In [None]:
!pip install plotly

In [None]:
import time
import os
import shutil
from itertools import islice
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import plotly.express as px

from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import findspark
findspark.init()

from google.cloud import storage

# Create the Spark session, or reuse the one already running in this kernel.
# Executor/driver sizing and dynamic-allocation bounds are set on the builder.
spark = (
    SparkSession.builder
    .appName("YourAppName")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .config("spark.driver.memory", "16g")
    .config("spark.driver.maxResultSize", "16g")
    .config("spark.dynamicAllocation.enabled", "true")
    .config("spark.dynamicAllocation.minExecutors", "4")
    .config("spark.dynamicAllocation.maxExecutors", "100")
    .getOrCreate()
)

# Notebook display options: evaluate DataFrames eagerly when they are shown,
# truncating each rendered cell value to 200 characters.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark.conf.set("spark.sql.repl.eagerEval.maxCharsPerCell", 200)

print(spark.version)

# Read from folder into spark df

You will see 5 sub-folders, each containing a collection of parquet files.  A single folder can be read into a Spark DataFrame:

* Commits (gs://msca-bdp-data-open/final_project_git/commits): This contains information about the commits made to repositories. Each commit has metadata such as the author, committer, commit date, SHA, parent commit(s), and commit message.

* Contents (gs://msca-bdp-data-open/final_project_git/contents): Provides the content of the files in the repositories. This is useful if you're looking to analyze source code or documents within repositories.

* Files (gs://msca-bdp-data-open/final_project_git/files): This contains metadata about the files in the repositories such as the file path, the mode, and the blob ID which links back to the content.

* Languages (gs://msca-bdp-data-open/final_project_git/languages): Each repository often has code written in one or more languages. This table provides an aggregation of the number of bytes of code for each language in a repository.

* Licenses (gs://msca-bdp-data-open/final_project_git/licenses): Contains information on the licenses used by repositories.
 

In [None]:
# Optional: uncomment the line below to list the contents of the commits folder.
#!hadoop fs -ls "gs://msca-bdp-data-open/final_project_git/commits"

In [None]:
# Read into spark df
%time
commits_spDf = spark.read.parquet("gs://msca-bdp-data-open/final_project_git/commits")
contents_spDf = spark.read.parquet("gs://msca-bdp-data-open/final_project_git/contents")
files_spDf = spark.read.parquet("gs://msca-bdp-data-open/final_project_git/files")
languages_spDf = spark.read.parquet("gs://msca-bdp-data-open/final_project_git/languages")
licenses_spDf = spark.read.parquet("gs://msca-bdp-data-open/final_project_git/licenses")

In [None]:
# Checking preliminary data schema
%time
tables = [commits_spDf,contents_spDf,files_spDf,languages_spDf,licenses_spDf]

for i in tables:
    print(f"Table \n {i}")
    i.printSchema()

# EDA
What is the timeline of the data?  Do you see significant peaks and valleys?

Do you see any data collection gaps?

Do you see any outliers?  Remove obvious outliers before plotting the timeline

Do you see any spikes?  Are these spikes caused by real activities / events?

In [None]:
from pyspark.sql.functions import col, from_unixtime

# Derive timestamp columns from the author/committer epoch seconds, then cast
# each of them to a calendar date.
commits_spDf = (
    commits_spDf
    .withColumn("author_timestamp", from_unixtime(col("author.date.seconds")))
    .withColumn("committer_timestamp", from_unixtime(col("committer.date.seconds")))
    .withColumn("author_commit_date", col("author_timestamp").cast("date"))
    .withColumn("committer_commit_date", col("committer_timestamp").cast("date"))
)

# Peek at a few rows of the new timestamp columns
commits_spDf.select(["committer_timestamp", "author_timestamp"]).show(n=5)

In [None]:
# Peek at a few rows of the derived date columns
commits_spDf.select(["committer_commit_date", "author_commit_date"]).show(n=5)

In [None]:
from pyspark.sql.functions import count

# Commits per author date, sorted chronologically for visualization
author_timeline_df = (
    commits_spDf
    .groupBy("author_commit_date")
    .agg(count("*").alias("author_commit_count"))
    .orderBy("author_commit_date")
)
author_timeline_df.show(10)

# Commits per committer date, sorted chronologically for visualization
committer_timeline_df = (
    commits_spDf
    .groupBy("committer_commit_date")
    .agg(count("*").alias("committer_commit_count"))
    .orderBy("committer_commit_date")
)
committer_timeline_df.show(10)

In [None]:
# Convert the per-day author commit counts to a pandas DataFrame for plotting.
author_timeline_pdDf = author_timeline_df.toPandas()

In [None]:
# Show both ends of the author timeline: the first 5 and the last 50 dates
display(author_timeline_pdDf.head(5), author_timeline_pdDf.tail(50))

In [None]:
# Convert the per-day committer commit counts to a pandas DataFrame for plotting.
committer_timeline_pdDf = committer_timeline_df.toPandas()

In [None]:
# Show both ends of the committer timeline: the first 5 and the last 50 dates
display(committer_timeline_pdDf.head(5), committer_timeline_pdDf.tail(50))

In [None]:
# Parallel lists: each title is the column-name prefix used by the frame at the same index
titles = ["author", "committer"]
pdDf = [author_timeline_pdDf, committer_timeline_pdDf]

In [None]:
# Report the earliest and latest commit date found in each timeline.
for timeline_pd, role in zip(pdDf, titles):
    # A parsed 'Date' column is added to the frame in place
    timeline_pd['Date'] = pd.to_datetime(timeline_pd[f'{role}_commit_date'])
    earliest = timeline_pd['Date'].min()
    latest = timeline_pd['Date'].max()
    print(f"For table {role}")
    print(f"Earliest commit date in record is {earliest}")
    print(f"Latest commit date in record is {latest}")

In [None]:
# Draw one line chart per timeline (author, committer) of commit count by date.
for timeline_pd, role in zip(pdDf, titles):
    date_col = f'{role}_commit_date'
    count_col = f'{role}_commit_count'

    # Make sure the x-axis column holds datetimes (converted in place)
    timeline_pd[date_col] = pd.to_datetime(timeline_pd[date_col])

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(timeline_pd[date_col], timeline_pd[count_col], marker='o', label=f'{role} timeline')

    # Labels, title, legend and grid
    ax.set_xlabel('Commit Date')
    ax.set_ylabel('Commit Count')
    ax.set_title(f'{role.capitalize()} Commit Timeline')
    ax.legend()
    ax.grid(True)

    plt.show()

In [None]:
# Inspect the last 5560 dates of the committer timeline
committer_timeline_pdDf.iloc[-5560:]

# Remove outliers that are likely to be errors

Notice that there are many future dates after 2022-11-27 that have only 2 commits each. We do not want to keep these because they are most likely errors, so we filter out all dates on and after 2022-11-27.

In [None]:
# Filter rows where the dates are before 2022-11-27
from pyspark.sql.functions import year,col

cutoff_date = "2022-11-27"

# Compare the typed date columns against a real date value. The *_timestamp
# columns are strings produced by from_unixtime, so `timestamp < "2022-11-27"`
# is a lexicographic string comparison and is only correct while every value
# sorts like a plain 4-digit-year date. Rows with a null date are dropped,
# as they were before.
cutoff = F.to_date(F.lit(cutoff_date))
commits_spDf = commits_spDf.filter(
    (col("author_commit_date") < cutoff) & (col("committer_commit_date") < cutoff)
)

# View the filtered data
commits_spDf.select("committer_timestamp", "author_timestamp").show(5)

In [None]:
# Recompute the daily commit counts on the filtered commits.
# Author timeline: one row per author date, sorted chronologically
author_timeline_df = (
    commits_spDf
    .groupBy("author_commit_date")
    .agg(count("*").alias("author_commit_count"))
    .orderBy("author_commit_date")
)
author_timeline_df.show(10)

# Committer timeline: one row per committer date, sorted chronologically
committer_timeline_df = (
    commits_spDf
    .groupBy("committer_commit_date")
    .agg(count("*").alias("committer_commit_count"))
    .orderBy("committer_commit_date")
)
committer_timeline_df.show(10)

In [None]:
# Convert the filtered per-day author commit counts to a pandas DataFrame.
author_timeline_pdDf_filtered = author_timeline_df.toPandas()

In [None]:
# Convert the filtered per-day committer commit counts to a pandas DataFrame.
committer_timeline_pdDf_filtered = committer_timeline_df.toPandas()

In [None]:
# Check the last 30 dates remaining after the cutoff filter
committer_timeline_pdDf_filtered.iloc[-30:]

In [None]:
# Scatter plots of the filtered timelines, one figure per role.
titles = ['author', 'committer']
pdDf = [author_timeline_pdDf_filtered, committer_timeline_pdDf_filtered]

for timeline_pd, role in zip(pdDf, titles):
    date_col = f'{role}_commit_date'
    count_col = f'{role}_commit_count'

    # Make sure the x-axis column holds datetimes (converted in place)
    timeline_pd[date_col] = pd.to_datetime(timeline_pd[date_col])

    fig, ax = plt.subplots(figsize=(10, 6))
    ax.scatter(timeline_pd[date_col], timeline_pd[count_col], marker='o', s=10, label=f'{role} timeline')

    # Labels, title, legend and grid
    ax.set_xlabel('Commit Date')
    ax.set_ylabel('Commit Count')
    ax.set_title(f'{role.capitalize()} Commit Timeline')
    ax.legend()
    ax.grid(True)

    plt.show()

# Popular Language Over Time

In [None]:
from pyspark.sql.functions import explode, col

# Step 1: Explode repo_name in commits_spDf (one row per commit/repository pair)
commits_exploded = commits_spDf.withColumn("repo_name_exploded", explode(col("repo_name")))

# Step 2: Explode the nested `language` column so each (repo, language) pair is
# its own row. Without this, `language` stays a nested value per repository and
# any later groupBy on it groups by the repository's whole language list rather
# than by individual language.
# NOTE(review): assumes `language` is array<struct<name, bytes>> — confirm with
# languages_spDf.printSchema().
languages_exploded = languages_spDf.select(
    col("repo_name"),
    explode(col("language")).alias("language_entry"),
)

# Step 3: Join the exploded languages and exploded commits on repo_name
merged_df = languages_exploded.alias("lang").join(
    commits_exploded.alias("commit"),
    col("lang.repo_name") == col("commit.repo_name_exploded"),
    "inner"
)

# Step 4: Select relevant columns including time columns
result_df = merged_df.select(
    col("lang.repo_name").alias("repo_name"),                      # repository name
    col("lang.language_entry.name").alias("language"),             # single language name
    col("lang.language_entry.bytes").alias("language_bytes"),      # bytes of that language
    col("commit.author_commit_date").alias("author_commit_date"),
    col("commit.committer_commit_date").alias("committer_commit_date"),
)



In [None]:
# Number of joined rows per (committer date, language) pair
language_time = (
    result_df
    .groupBy("committer_commit_date", "language")
    .agg(count("*").alias("Count"))
)

In [None]:
# Step 4: Convert to Pandas for aggregation and plotting
# (columns come from `language_time`: committer_commit_date, language, Count)
result_pd = language_time.toPandas()

In [None]:
bucket_write = "gs://msca-bdp-students-bucket/shared_data/xiuan/final_language_count.csv"
# DataFrame.to_csv returns None when it is given a path, so the result must not
# be assigned back to `result_pd` — that would discard the DataFrame.
result_pd.to_csv(bucket_write)

In [None]:
# Preview the first 5 rows of the language counts
result_pd.iloc[:5]

In [None]:


# Step 5: Parse dates and aggregate.
# `result_pd` is built from `language_time`, whose columns are
# `committer_commit_date`, `language` and `Count`; the trend is therefore
# computed from those columns (there is no `author_commit_date` or
# `language_bytes` column in this frame).
# errors="coerce" turns dates pandas cannot represent into NaT; groupby then
# leaves those rows out instead of the whole cell failing.
result_pd["committer_commit_date"] = pd.to_datetime(
    result_pd["committer_commit_date"], errors="coerce"
)
# Bucket by calendar month, stored as month-start timestamps so the x-axis is
# an ordinary datetime axis rather than pandas Period objects.
result_pd["year_month"] = (
    result_pd["committer_commit_date"].dt.to_period("M").dt.to_timestamp()
)

# Aggregate data by language and time
trend_data = (
    result_pd
    .groupby(["year_month", "language"], as_index=False)["Count"]
    .sum()
)

# Step 6: Prepare data for plotting — keep the 5 languages with the largest total count
top_languages = trend_data.groupby("language")["Count"].sum().nlargest(5).index
trend_plot_data = trend_data[trend_data["language"].isin(top_languages)]

# Plot trends
plt.figure(figsize=(14, 8))
sns.lineplot(
    data=trend_plot_data,
    x="year_month",
    y="Count",
    hue="language"
)
plt.title("Trends of Most Popular Programming Languages Over Time")
plt.xlabel("Year-Month")
plt.ylabel("Commit Count")
plt.legend(title="Language")
plt.grid(True)
plt.xticks(rotation=45)
plt.show()


# License distribution
What is the distribution of licenses across GitHub repositories?
Are certain programming languages more likely to be associated with a particular license?

In [None]:
# Count rows per license value, most frequent first.
# Fixes: the DataFrame read earlier is `licenses_spDf` (not `license_spDf`); an
# aggregate such as count() needs groupBy/agg rather than withColumn; and the
# pandas conversion method is `toPandas()`.
license_count = (
    licenses_spDf
    .groupBy("License")
    .agg(F.count("*").alias("Count"))
    .orderBy(F.desc("Count"))
)
license_count.toPandas()