In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 316.9/316.9 MB 4.6 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=3c5b79a0242040925eed11516d01d19fc9f399e424b348ff854e7cdd7e0ae424
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, regexp_replace, split, size, expr, concat_ws
from pyspark.ml.feature import NGram, Tokenizer
from pyspark.ml import Pipeline
import re

# Obtain the Spark session used by the rest of the notebook
spark = (
    SparkSession.builder
    .appName("GitHubNGrams")
    .getOrCreate()
)

# Read the GitHub events file (one JSON document per line) into a DataFrame
json_data = spark.read.json("10K.github.jsonl")

# Keep only the rows whose event type is PushEvent
push_events = json_data.where(col("type") == "PushEvent")

# Define a function to extract and preprocess commit messages
def process_commit_messages(commit_messages):
    """Normalize a collection of commit messages into one clean string.

    Each message is lowercased, and every run of non-word characters or
    underscores (punctuation, whitespace, ``_``) is treated as a word break.
    The resulting words from all messages are joined with single spaces.

    Args:
        commit_messages: Iterable of commit message strings. May be ``None``
            or empty, and individual entries may be ``None``; such values
            contribute no words.

    Returns:
        str: The words separated by exactly one space, with no leading or
        trailing whitespace. Returns ``''`` when there are no words.
    """
    if not commit_messages:
        return ''

    words = []
    for msg in commit_messages:
        if msg is None:
            continue
        # `\W` alone does not match '_' (it counts as a word character), so the
        # underscore is listed explicitly to split snake_case into words.
        cleaned = re.sub(r'[\W_]+', ' ', msg.lower())
        # split() with no argument drops empty pieces, so no stray or doubled
        # spaces reach the output (a whitespace-splitting tokenizer downstream
        # would otherwise be at risk of producing empty tokens).
        words.extend(cleaned.split())
    return ' '.join(words)

# Register the cleaning function with Spark; the returned object is used as a
# column function below
process_commit_messages_udf = spark.udf.register(
    "process_commit_messages",
    process_commit_messages,
)

# Add a "processed_commits" column computed from each event's commit messages
processed_data = push_events.withColumn(
    "processed_commits",
    process_commit_messages_udf(col("payload.commits.message")),
)

# Two-stage pipeline: split the cleaned text into tokens, then build 3-grams
tokenizer = Tokenizer(inputCol="processed_commits", outputCol="tokenized_words")
ngram = NGram(n=3, inputCol="tokenized_words", outputCol="ngrams_result")
pipeline = Pipeline(stages=[tokenizer, ngram])
model = pipeline.fit(processed_data)

# Derive the "first_five_words" column in three steps:
#   1. keep at most the first five entries of the 3-gram array
#      (despite the column name, these are 3-grams, not single words);
#   2. for rows with two or fewer tokens, use the tokens themselves instead;
#   3. flatten the array into one comma-separated string.
result = (
    model.transform(processed_data)
    .withColumn(
        "first_five_words",
        expr("slice(ngrams_result, 1, case when size(ngrams_result) >= 5 then 5 else size(ngrams_result) end)"),
    )
    .withColumn(
        "first_five_words",
        expr("case when size(tokenized_words) <= 2 then tokenized_words else first_five_words end"),
    )
    .withColumn("first_five_words", concat_ws(", ", col("first_five_words")))
)

# Write the author login and the derived column as CSV with a header row,
# coalesced to a single partition and overwriting any previous output
(
    result
    .select("actor.display_login", "first_five_words")
    .coalesce(1)
    .write
    .csv("output.csv", header=True, mode="overwrite")
)

# Shut the Spark session down now that the output has been written
spark.stop()