In [1]:
'''
runs!
'''

from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import BooleanType, StringType
import os
import matplotlib.pyplot as plt
import pandas as pd


# Spark entry point shared by every analysis cell below.
spark = (
    SparkSession.builder
    .appName("GitHub Analysis")
    .getOrCreate()
)

# Parquet locations of each GitHub dataset, keyed by the name used throughout the notebook.
data_paths = dict(
    commits="gs://msca-bdp-data-open/final_project_git/commits",
    contents="gs://msca-bdp-data-open/final_project_git/contents",
    files="gs://msca-bdp-data-open/final_project_git/files",
    languages="gs://msca-bdp-data-open/final_project_git/languages",
    licenses="gs://msca-bdp-data-open/final_project_git/licenses",
)

# Populated by load_data(): dataset name -> loaded (sampled) Spark DataFrame.
dataframes = dict()

# Local folder for saving analysis results; created if it does not exist yet.
output_dir = "github_data_output"
os.makedirs(output_dir, exist_ok=True)

# Function to load data with sampling
def load_data(dataset, path, sample_fraction=0.1):
    """Read one parquet dataset, down-sample it and register it in ``dataframes``.

    ``languages`` and ``licenses`` are sampled at ten times ``sample_fraction``;
    every other dataset uses ``sample_fraction`` unchanged. The effective
    fraction is capped at 1.0, because ``DataFrame.sample`` without
    replacement rejects larger values. At 1.0 the sampling step is skipped,
    since it would keep every row anyway.

    Args:
        dataset: Key under which the result is stored in ``dataframes``.
        path: Location passed to ``spark.read.parquet``.
        sample_fraction: Base sampling fraction (default 0.1).

    Returns:
        The loaded DataFrame, or None if loading failed. The error is printed
        rather than raised, so one failing dataset does not stop the others.
    """
    try:
        print(f"Loading {dataset} dataset...")
        df = spark.read.parquet(path)

        # languages/licenses are sampled ten times more densely than the rest.
        if dataset in ("languages", "licenses"):
            fraction = sample_fraction * 10
        else:
            fraction = sample_fraction
        fraction = min(fraction, 1.0)
        if fraction < 1.0:
            df = df.sample(fraction=fraction, seed=42)

        dataframes[dataset] = df
        print(f"{dataset} dataset loaded successfully.")
        return df
    except Exception as e:
        print(f"Error loading {dataset} dataset: {e}")
        return None

# Load every dataset listed in data_paths; each result is registered in `dataframes`.
for dataset, path in list(data_paths.items()):
    load_data(dataset, path=path)

Loading commits dataset...


                                                                                

commits dataset loaded successfully.
Loading contents dataset...
contents dataset loaded successfully.
Loading files dataset...
files dataset loaded successfully.
Loading languages dataset...


                                                                                

languages dataset loaded successfully.
Loading licenses dataset...
licenses dataset loaded successfully.


In [2]:
'''
runs!
'''

# --- Data Cleaning ---
def clean_commits(df):
    """Filter and slim down the commits DataFrame.

    Steps:
    - keep rows whose ``author.time_sec`` is positive;
    - drop every commit whose ``difference`` array contains an entry with
      exactly one of ``old_path`` / ``new_path`` set;
    - drop the ``trailer``, ``difference``, ``difference_truncated`` and
      ``encoding`` columns;
    - de-duplicate on ``commit``.

    Returns None when ``df`` is None.

    NOTE(review): an entry with only one path set is probably what an added
    or deleted file looks like, so this filter may discard many legitimate
    commits. Also, ``exists`` over a null ``difference`` yields null, and the
    filter drops such rows. Confirm both effects are intended.
    """
    if df is None:
        return None
    print("Cleaning commits dataset...")

    def one_sided_change(entry):
        # True when exactly one of the two paths is missing.
        return (entry.old_path.isNull() & entry.new_path.isNotNull()) | (
            entry.old_path.isNotNull() & entry.new_path.isNull()
        )

    return (
        df.filter(F.col("author.time_sec") > 0)
        .filter(~F.exists(F.col("difference"), one_sided_change))
        .drop("trailer", "difference", "difference_truncated", "encoding")
        .dropDuplicates(["commit"])
    )

def clean_contents(df):
    """Validate and de-duplicate the contents DataFrame.

    Steps:
    - keep rows with non-null ``id`` and ``size`` and whose ``copies`` casts
      to an int greater than zero;
    - blank out ``content`` for rows flagged ``binary``;
    - de-duplicate on ``id``;
    - cast ``binary`` to boolean.

    Returns None when ``df`` is None.
    """
    if df is None:
        return None
    print("Cleaning contents dataset...")
    has_keys = F.col("id").isNotNull() & F.col("size").isNotNull()
    text_only_content = F.when(F.col("binary") == True, F.lit(None)).otherwise(F.col("content"))  # noqa: E712
    return (
        df.where(has_keys)
        # Temporary integer view of `copies`, used only for the filter.
        .withColumn("copies_int", F.col("copies").cast("int"))
        .where(F.col("copies_int") > 0)
        .drop("copies_int")
        .withColumn("content", text_only_content)
        .dropDuplicates(["id"])
        .withColumn("binary", F.col("binary").cast("boolean"))
    )

def clean_files(df):
    """Validate and de-duplicate the files DataFrame.

    Steps:
    - keep rows with non-null ``repo_name``, ``path`` and ``id``, a positive
      ``mode`` and a non-empty ``path``;
    - de-duplicate on ``id``;
    - keep only rows whose ``mode`` renders as a string of digits;
    - cast ``mode`` to long.

    Returns None when ``df`` is None.
    """
    if df is None:
        return None
    print("Cleaning files dataset...")
    required_present = (
        F.col("repo_name").isNotNull()
        & F.col("path").isNotNull()
        & F.col("id").isNotNull()
    )
    validated = (
        df.where(required_present)
        .where(F.col("mode") > 0)
        .where(F.length(F.col("path")) > 0)
        .dropDuplicates(["id"])
        .where(F.col("mode").cast("string").rlike("^[0-9]+$"))
    )
    return validated.withColumn("mode", F.col("mode").cast("long"))


# Replace each loaded dataset with its cleaned version; datasets that were
# never loaded are skipped.
for _name, _cleaner in (
    ("commits", clean_commits),
    ("contents", clean_contents),
    ("files", clean_files),
):
    if _name in dataframes:
        dataframes[_name] = _cleaner(dataframes[_name])

print("Data cleaning completed")

Cleaning commits dataset...
Cleaning contents dataset...
Cleaning files dataset...
Data cleaning completed


In [None]:
'''
runs!
'''


# --- Timeline Analysis ---

def analyze_timeline():
    """Plot commit counts per year (2008-2022) and per month of 2016.

    Reads ``dataframes["commits"]`` and derives year and month from
    ``author.time_sec``. That small projection is cached so both
    aggregations read the same rows: the commits sample sits downstream of
    the ``dropDuplicates`` shuffle in cleaning, so re-evaluating it once per
    action may yield different rows each time. The cache is released when
    the function ends.

    Prints a notice and returns None when the commits dataset is missing.
    Errors are printed, not raised.
    """
    if "commits" not in dataframes or dataframes["commits"] is None:
        print("Commits dataset missing. Skipping timeline analysis")
        return None

    def plot_counts(pdf, x_col, bar_label, xlabel, title):
        # Bar chart of `commit_count`, plus a red line joining the bar tops.
        plt.figure(figsize=(12, 6))
        plt.bar(pdf[x_col], pdf['commit_count'], color='skyblue', label=bar_label)
        plt.plot(pdf[x_col], pdf['commit_count'], "r-", label="Trendline")
        plt.xlabel(xlabel, fontsize=12)
        plt.ylabel('Number of Commits', fontsize=12)
        plt.title(title, fontsize=14)
        plt.xticks(rotation=45, ha="right", fontsize=10)
        plt.legend(fontsize=10)
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        ax = plt.gca()
        ax.spines['top'].set_visible(False)
        ax.spines['right'].set_visible(False)
        plt.tight_layout()
        plt.show()

    commit_dates = None
    try:
        print("Starting timeline analysis...")
        commit_ts = F.from_unixtime(F.col("author.time_sec"))
        commit_dates = (
            dataframes["commits"]
            .select(F.year(commit_ts).alias("year"), F.month(commit_ts).alias("month"))
            .cache()
        )

        yearly_commits = (
            commit_dates.filter(F.col("year").between(2008, 2022))
            .groupBy("year")
            .agg(F.count("*").alias("commit_count"))
            .orderBy("year")
        )
        monthly_commits_2016 = (
            commit_dates.filter(F.col("year") == 2016)
            .groupBy("month")
            .agg(F.count("*").alias("commit_count"))
            .orderBy("month")
        )

        # String x-values give one evenly spaced category per year/month, in row order.
        pandas_yearly_commits = yearly_commits.withColumn("year", F.col("year").cast("string")).toPandas()
        pandas_monthly_commits_2016 = monthly_commits_2016.withColumn("month", F.col("month").cast("string")).toPandas()

        plot_counts(pandas_yearly_commits, 'year', "Yearly Commits", 'Year', "Commits Over Time (2008 - 2022)")
        plot_counts(pandas_monthly_commits_2016, 'month', "Monthly Commits (2016)", 'Month', "Commits in 2016 by Month")

        print("Timeline analysis completed")
    except Exception as e:
        print(f"Error in timeline analysis: {e}")
    finally:
        if commit_dates is not None:
            commit_dates.unpersist()

analyze_timeline()


In [None]:
'''
runs!
'''
# --- Language Trends ---
def analyze_language_trends():
    """Plot the top-10 languages by total bytes and by number of repositories.

    Reads ``dataframes["languages"]``. Its ``language`` array column is
    exploded into one row per (repo, entry), keeping the entry's ``name``
    and ``bytes``.

    Prints a notice and returns None if the dataset is absent. Any error
    raised while querying or plotting is printed, not propagated.
    """
    if dataframes.get("languages") is None:
        print("Languages dataset missing. Skipping language analysis.")
        return None

    def show_bar_chart(pdf, value_column, y_label, chart_title):
        # One bar per language with rotated tick labels, rendered immediately.
        plt.figure(figsize=(12, 6))
        plt.bar(pdf['language'], pdf[value_column])
        plt.xlabel("Programming Language")
        plt.ylabel(y_label)
        plt.title(chart_title)
        plt.xticks(rotation=45, ha="right")
        plt.tight_layout()
        plt.show()

    try:
        print("Starting language trends analysis...")
        repo_languages = (
            dataframes["languages"]
            .select(F.col("repo_name"), F.explode("language").alias("language_info"))
            .select(
                F.col("repo_name"),
                F.col("language_info.name").alias("language"),
                F.col("language_info.bytes").alias("bytes"),
            )
        )
        top_by_bytes = (
            repo_languages.groupBy("language")
            .agg(F.sum("bytes").alias("total_bytes"))
            .orderBy(F.desc("total_bytes"))
            .limit(10)
            .toPandas()
        )
        top_by_repos = (
            repo_languages.groupBy("language")
            .agg(F.countDistinct("repo_name").alias("repo_count"))
            .orderBy(F.desc("repo_count"))
            .limit(10)
            .toPandas()
        )

        show_bar_chart(top_by_bytes, 'total_bytes', "Total Bytes", "Top 10 Languages by Total Bytes")
        show_bar_chart(top_by_repos, 'repo_count', "Number of Repos", "Top 10 Languages by Distribution")

        print("Language analysis completed")
    except Exception as e:
        print(f"Error in language analysis: {e}")

analyze_language_trends()

In [None]:
'''
runs!
'''

# --- License Distribution ---
def analyze_license_distribution():
    """Bar-chart the ten licenses attached to the most distinct repositories.

    Reads ``dataframes["licenses"]`` and counts distinct ``repo_name`` per
    ``license``. Prints a notice and returns None if the dataset is absent.
    Errors are printed rather than raised.
    """
    if dataframes.get("licenses") is None:
        print("Licenses dataset missing. Skipping license analysis.")
        return None
    try:
        print("Starting license distribution analysis...")
        top_licenses = (
            dataframes["licenses"]
            .groupBy("license")
            .agg(F.countDistinct("repo_name").alias("repo_count"))
            .orderBy(F.desc("repo_count"))
            .limit(10)
            .toPandas()
        )

        plt.figure(figsize=(12, 6))
        plt.bar(top_licenses['license'], top_licenses['repo_count'])
        plt.xlabel("License")
        plt.ylabel("Number of Repositories")
        plt.title("Top 10 License Distribution")
        plt.xticks(rotation=45, ha="right")
        plt.tight_layout()
        plt.show()

        print("License analysis completed.")
    except Exception as e:
        print(f"Error in license analysis: {e}")

analyze_license_distribution()

In [None]:
'''
runs! but unclear if correct
'''

# --- Repository Popularity ---
def analyze_repository_popularity():
    """Bar-chart the ten repositories with the most commits in the commits sample.

    ``repo_name`` is exploded first, giving one row per repository listed on
    a commit, so a commit listing several repositories counts once for
    each. "Popularity" here means commit volume in the sample.

    Prints a notice and returns None if the commits dataset is absent.
    Errors are printed rather than raised.
    """
    if dataframes.get("commits") is None:
        print("Commits dataset missing. Skipping repository popularity analysis.")
        return None
    try:
        print("Starting repository popularity analysis...")
        per_repo_rows = dataframes["commits"].select(F.explode("repo_name").alias("repo"))
        top_repos = (
            per_repo_rows.groupBy("repo")
            .agg(F.count("*").alias("commit_count"))
            .orderBy(F.desc("commit_count"))
            .limit(10)
            .toPandas()
        )

        plt.figure(figsize=(12, 6))
        plt.bar(top_repos['repo'], top_repos['commit_count'])
        plt.xlabel("Repository")
        plt.ylabel("Number of Commits")
        plt.title("Top 10 Most Popular Repositories")
        plt.xticks(rotation=45, ha="right")
        plt.tight_layout()
        plt.show()

        print("Repository popularity analysis completed.")
    except Exception as e:
        print(f"Error in repository popularity analysis: {e}")

analyze_repository_popularity()

In [None]:
# --- DS/AI Technology Tracking ---

'''
runs!
'''


def keyword_token_regex(keywords):
    """Return a regex that matches any of ``keywords`` as a standalone token.

    Intended for lower-cased text.
    - A keyword must not be glued to a neighbouring letter or digit, so a
      short keyword such as "ai" does not fire inside "email" or "maintain".
    - Whitespace inside a multi-word keyword also matches runs of spaces,
      "-" or "_", so slugs like "awesome-machine-learning" are recognised.
    - Only constructs shared by Python ``re`` and Java regex are used, so
      the pattern can be passed to Spark's ``Column.rlike``.
    - An empty keyword list yields a pattern that never matches.
    """
    import re

    alternatives = [
        r"[\s_-]+".join(re.escape(word) for word in keyword.lower().split())
        for keyword in keywords
        if keyword.strip()
    ]
    if not alternatives:
        return r"(?!)"
    return r"(?<![a-z0-9])(?:" + "|".join(alternatives) + r")(?![a-z0-9])"


def track_ds_ai_technologies():
    """Plot, per year (2008-2022), the number of distinct repositories with DS/AI commits.

    A commit counts as DS/AI when its repository name or message mentions
    one of ``ds_ai_keywords`` as a standalone token (see
    ``keyword_token_regex``). Matching is a native Spark ``rlike`` over the
    lower-cased "repo|message" text. ``concat_ws`` skips nulls, so a
    missing repo or message simply contributes nothing instead of failing.

    Prints a notice and returns None if the commits dataset is missing.
    Errors are printed rather than raised.
    """
    if "commits" not in dataframes or dataframes["commits"] is None:
        print("Commits dataset missing. Skipping DS/AI tracking")
        return None
    try:
        print("Starting DS/AI technology tracking...")
        ds_ai_keywords = ["machine learning", "data science", "ai", "neural network", "deep learning", "tensorflow", "pytorch", "scikit-learn"]
        ds_ai_pattern = keyword_token_regex(ds_ai_keywords)

        commits_df = dataframes["commits"]

        repo_rows = commits_df.select(
            F.explode("repo_name").alias("repo"),
            "message",
            F.year(F.from_unixtime("author.time_sec")).alias("year"),
        ).sample(0.1, seed=42)  # Aggressive sample

        # "|" is not a word character or separator, so no token can span repo and message.
        mentions_ds_ai = F.lower(F.concat_ws("|", F.col("repo"), F.col("message"))).rlike(ds_ai_pattern)

        ds_ai_repos = (
            repo_rows.filter(mentions_ds_ai)
            .filter(F.col("year").between(2008, 2022))
            .groupBy("year")
            # Distinct repositories, matching the column name and the axis label.
            .agg(F.countDistinct("repo").alias("unique_ds_ai_repos"))
            .orderBy("year")
        )

        # Convert to Pandas for plotting
        pandas_ds_ai_repos = ds_ai_repos.withColumn("year", F.col("year").cast("string")).toPandas()

        # Plotting
        plt.figure(figsize=(12, 6))
        plt.bar(pandas_ds_ai_repos['year'], pandas_ds_ai_repos['unique_ds_ai_repos'], color='skyblue', label="DS/AI Repos")
        plt.plot(pandas_ds_ai_repos['year'], pandas_ds_ai_repos['unique_ds_ai_repos'], "r-", label="Trendline")
        plt.xlabel("Year", fontsize=12)
        plt.ylabel("Number of Repositories", fontsize=12)
        plt.title("DS/AI Repository Growth Over Time (2008 - 2022)", fontsize=14)
        plt.xticks(rotation=45, ha="right", fontsize=10)
        plt.legend(fontsize=10)
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.gca().spines['top'].set_visible(False)
        plt.gca().spines['right'].set_visible(False)
        plt.tight_layout()
        plt.show()

        print("DS/AI technology tracking completed.")
    except Exception as e:
        print(f"Error in DS/AI technology tracking: {e}")

# Run the DS/AI tracking (prints progress messages and shows one chart).
track_ds_ai_technologies()

In [None]:
'''
container node errors

'''

# --- Additional Technology Analysis ---
def analyze_tech_popularity():
    """Compare how often selected technologies appear in repository names.

    Draws two bar charts:
    - distinct repositories whose (lower-cased) name contains each entry of
      ``tech_keywords``, over a small commit sample;
    - the same count restricted to DS/AI commits.

    A commit is DS/AI when its repository name or message mentions one of
    ``ds_ai_keywords`` as a standalone token, so "ai" does not match inside
    "email". The list is the same one ``track_ds_ai_technologies`` uses.

    All matching uses native Spark expressions over one cached sample. The
    chart counts come from a single aggregation, and the per-year counts
    from one more, rather than a Python UDF job per technology. The sample
    is unpersisted on exit.

    Returns:
        dict mapping each technology to a pandas DataFrame with columns
        ``year`` and ``count`` (DS/AI commit rows mentioning it, 2008-2022,
        years with a zero count omitted). Returns None when the commits
        dataset is missing or an error occurs; the error is printed.
    """
    if "commits" not in dataframes or dataframes["commits"] is None:
        print("Commits dataset missing. Skipping technology analysis.")
        return None

    def token_regex(keywords):
        # Standalone-token regex for lower-cased text, valid for Java `rlike`:
        # a keyword must not touch a letter/digit, and spaces inside a
        # keyword also match runs of whitespace, "-" or "_".
        import re
        alternatives = [
            r"[\s_-]+".join(re.escape(word) for word in keyword.lower().split())
            for keyword in keywords
            if keyword.strip()
        ]
        if not alternatives:
            return r"(?!)"
        return r"(?<![a-z0-9])(?:" + "|".join(alternatives) + r")(?![a-z0-9])"

    sampled_rows = None
    try:
        print("Starting technology popularity analysis...")
        tech_keywords = ["docker", "django", "spark", "redis"]
        ds_ai_keywords = ["machine learning", "data science", "ai", "neural network", "deep learning", "tensorflow", "pytorch", "scikit-learn"]

        commits_df = dataframes["commits"]

        # Aggressive sampling BEFORE explode, then 10% of the exploded rows;
        # cached because every aggregation below reads this same sample.
        sampled_rows = (
            commits_df.sample(fraction=0.005, seed=42)
            .select(
                F.explode("repo_name").alias("repo"),
                "message",
                F.year(F.from_unixtime("author.time_sec")).alias("year"),
            )
            .sample(fraction=0.1, seed=42)
            .withColumn("repo_lc", F.lower(F.coalesce(F.col("repo"), F.lit(""))))
            .withColumn(
                "is_ds_ai",
                F.lower(F.concat_ws("|", F.col("repo"), F.col("message"))).rlike(token_regex(ds_ai_keywords)),
            )
            .cache()
        )

        def mentions(tech):
            # Plain substring match on the lower-cased repository name.
            return F.col("repo_lc").contains(tech)

        # One pass yields both chart series. when() without otherwise() gives
        # null for non-matching rows, and countDistinct ignores nulls.
        counts_row = sampled_rows.agg(
            *[F.countDistinct(F.when(mentions(t), F.col("repo"))).alias(f"overall_{t}") for t in tech_keywords],
            *[F.countDistinct(F.when(mentions(t) & F.col("is_ds_ai"), F.col("repo"))).alias(f"ds_ai_{t}") for t in tech_keywords],
        ).first()

        overall_counts_df = pd.DataFrame({
            "technology": tech_keywords,
            "repo_count": [counts_row[f"overall_{t}"] for t in tech_keywords],
        })
        ds_ai_counts_df = pd.DataFrame({
            "technology": tech_keywords,
            "repo_count": [counts_row[f"ds_ai_{t}"] for t in tech_keywords],
        })

        # DS/AI commit rows per year mentioning each technology (no plot; returned).
        yearly_pdf = (
            sampled_rows.filter(F.col("is_ds_ai") & F.col("year").between(2008, 2022))
            .groupBy("year")
            .agg(*[F.sum(mentions(t).cast("int")).alias(t) for t in tech_keywords])
            .orderBy("year")
            .toPandas()
        )
        yearly_tech_counts = {
            t: yearly_pdf.loc[yearly_pdf[t] > 0, ["year", t]].rename(columns={t: "count"}).reset_index(drop=True)
            for t in tech_keywords
        }

        # Plotting
        plt.figure(figsize=(12, 6))
        plt.bar(overall_counts_df['technology'], overall_counts_df['repo_count'], color='skyblue', label="Total Repo Count")
        plt.xlabel('Technology', fontsize=12)
        plt.ylabel('Number of Repositories', fontsize=12)
        plt.title("Overall Technology Popularity", fontsize=14)
        plt.xticks(rotation=45, ha="right", fontsize=10)
        plt.legend(fontsize=10)
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.gca().spines['top'].set_visible(False)
        plt.gca().spines['right'].set_visible(False)
        plt.tight_layout()
        plt.show()

        plt.figure(figsize=(12, 6))
        plt.bar(ds_ai_counts_df['technology'], ds_ai_counts_df['repo_count'], color='lightcoral', label="DS/AI Repo Count")
        plt.xlabel('Technology', fontsize=12)
        plt.ylabel('Number of Repositories (AI/DS)', fontsize=12)
        plt.title("Technology Popularity in AI/DS Projects", fontsize=14)
        plt.xticks(rotation=45, ha="right", fontsize=10)
        plt.legend(fontsize=10)
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.gca().spines['top'].set_visible(False)
        plt.gca().spines['right'].set_visible(False)
        plt.tight_layout()
        plt.show()

        print("Technology popularity analysis completed.")
        return yearly_tech_counts  # Return the yearly tech counts in case they are needed.

    except Exception as e:
        print(f"Error in technology analysis: {e}")
        return None
    finally:
        if sampled_rows is not None:
            sampled_rows.unpersist()

analyze_tech_popularity()

In [None]:
# --- Commit Message Analysis ---
# Ordered (label, regex) rules for commit-message categories; the first match wins.
# Patterns are applied to the lower-cased message and anchor each keyword at a
# word start: "fix" matches "fixes" but not "prefix", "add" matches "added" but
# not "address", and "test" matches "tests" but not "latest".
COMMIT_CATEGORY_RULES = (
    ("Bug Fix", r"\b(?:bug|fix|hotfix)"),
    ("New Feature", r"\b(?:feature|add(?:s|ed|ing)?\b)"),
    ("Refactoring", r"\b(?:refactor|improv)"),
    ("Testing", r"\btest"),
)


def analyze_commit_messages():
    """Bar-chart commit counts per message category (see COMMIT_CATEGORY_RULES).

    Categorisation is a native Spark CASE WHEN built from ``rlike`` tests,
    evaluated in rule order. A null message is treated as empty text and
    lands in "Other". Messages matching no rule are also "Other".

    Prints a notice and returns None when the commits dataset is missing.
    Errors are printed rather than raised.
    """
    if "commits" not in dataframes or dataframes["commits"] is None:
        print("Commits dataset missing. Skipping commit message analysis.")
        return None
    try:
        print("Starting commit message analysis...")
        commits_df = dataframes["commits"]

        text = F.lower(F.coalesce(F.col("message"), F.lit("")))
        commit_type = None
        for label, pattern in COMMIT_CATEGORY_RULES:
            matched = text.rlike(pattern)
            commit_type = F.when(matched, label) if commit_type is None else commit_type.when(matched, label)
        commit_type = commit_type.otherwise("Other")

        commit_types = (
            commits_df.select(commit_type.alias("commit_type"))
            .groupBy("commit_type")
            .agg(F.count("*").alias("type_count"))
            .orderBy(F.desc("type_count"))
        )

        # Convert to Pandas for plotting
        pandas_commit_types = commit_types.toPandas()

        # Plotting
        plt.figure(figsize=(10, 6))
        plt.bar(pandas_commit_types["commit_type"], pandas_commit_types["type_count"])
        plt.xlabel("Commit Type")
        plt.ylabel("Count")
        plt.title("Commit Types")
        plt.xticks(rotation=45, ha="right")
        plt.tight_layout()
        plt.show()
        print("Commit message analysis completed.")
    except Exception as e:
        print(f"Error in commit message analysis: {e}")


# Run the commit-message categorisation (prints progress messages and shows one chart).
analyze_commit_messages()

In [None]:
def analyze_influential_committers():
    """Bar-chart the ten (author name, author email) pairs with the most commits.

    Null names and emails are grouped under "Unknown Author" and
    "Unknown Email". Bars are placed by position, with names used only as
    tick labels. Several top pairs can share a display name (different
    emails, or the placeholder), and plotting names directly as categorical
    x-values would draw those bars on top of each other.

    NOTE(review): author names/emails are personal data; review outputs before sharing.

    Prints a notice and returns None if the commits dataset is missing.
    Errors are printed rather than raised.
    """
    if "commits" not in dataframes or dataframes["commits"] is None:
        print("Commits dataset missing. Skipping committer analysis.")
        return None
    try:
        print("Starting committer analysis...")
        commits_df = dataframes["commits"]

        # Use coalesce to replace null author names and emails with placeholder
        top_committers = (
            commits_df.groupBy(
                F.coalesce(F.col("author.name"), F.lit("Unknown Author")).alias("author_name"),
                F.coalesce(F.col("author.email"), F.lit("Unknown Email")).alias("author_email"),
            )
            .agg(F.count("*").alias("commit_count"))
            .orderBy(F.desc("commit_count"))
            .limit(10)
        )

        # Convert to Pandas for plotting
        pandas_top_committers = top_committers.toPandas()

        # Plotting: one slot per row, so duplicate names stay separate bars.
        positions = list(range(len(pandas_top_committers)))
        plt.figure(figsize=(10, 6))
        plt.bar(positions, pandas_top_committers["commit_count"])
        plt.xticks(positions, pandas_top_committers["author_name"], rotation=45, ha="right")
        plt.xlabel("Committer")
        plt.ylabel("Commit Count")
        plt.title("Top 10 Committers")
        plt.tight_layout()
        plt.show()

        print("Committer analysis completed.")
    except Exception as e:
        print(f"Error in committer analysis: {e}")

analyze_influential_committers()

In [None]:
'''
container killed errors
24/12/13 23:05:59 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 2380.0 in stage 34.0 (TID 13108) (hub-msca-bdp-dphub-students-jdigiovanni-sw-6n81.c.msca-bdp-students.internal executor 7): ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container from a bad node: container_1734114672005_0014_01_000008 on host: hub-msca-bdp-dphub-students-jdigiovanni-sw-6n81.c.msca-bdp-students.internal. Exit status: 143. Diagnostics: [2024-12-13 23:05:59.131]Container killed on request. Exit code is 143
[2024-12-13 23:05:59.132]Container exited with a non-zero exit code 143. 
[2024-12-13 23:05:59.132]Killed by external signal


after a while: Approximate duplication ratio of commit messages: 0.13758643775426194




Approximate duplication ratio of commit messages: 0.08948844463774197
Text similarity analysis completed.
'''

# --- Text Similarity Analysis ---
def analyze_message_similarity():
    """Print the share of duplicated commit messages in a 5% commit sample.

    The ratio is 1 - (#distinct messages / #messages), where a null message
    counts as one distinct value, as ``DataFrame.distinct`` treats it.

    Both counts come from one aggregation over the same sample. The sample
    sits downstream of the ``dropDuplicates`` shuffle in cleaning, so
    computing the counts in two separate actions could draw two different
    samples and make the ratio inconsistent between runs.

    NOTE(review): a later cell redefines ``analyze_message_similarity``;
    consider renaming one of the two.

    Prints a notice and returns None if the commits dataset is missing.
    Errors are printed rather than raised.
    """
    if "commits" not in dataframes or dataframes["commits"] is None:
        print("Commits dataset missing. Skipping message similarity analysis.")
        return None
    try:
        print("Starting text similarity analysis...")
        commits_df = dataframes["commits"]

        # Sample the data first
        sampled_commits_df = commits_df.sample(fraction=0.05, seed=42)

        stats = sampled_commits_df.agg(
            F.count("*").alias("total"),
            F.countDistinct("message").alias("distinct_non_null"),
            # 1 when at least one message is null; null when the sample is empty.
            F.max(F.col("message").isNull().cast("int")).alias("has_null"),
        ).first()

        total_messages_count = stats["total"]
        unique_messages_count = stats["distinct_non_null"] + (stats["has_null"] or 0)
        duplication_ratio = 1.0 - (unique_messages_count / total_messages_count) if total_messages_count > 0 else 0.0

        print(f"Approximate duplication ratio of commit messages: {duplication_ratio}")
        print("Text similarity analysis completed.")
    except Exception as e:
        print(f"Error in text similarity analysis: {e}")

analyze_message_similarity()

In [None]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, MinHashLSH
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType
import matplotlib.pyplot as plt
import pandas as pd


def analyze_message_similarity():
    """Estimate pairwise Jaccard similarity of commit messages with MinHash LSH.

    Steps:
    1. Take a 1% sample of commit messages and drop null messages, which
       the Tokenizer cannot handle.
    2. Tokenise on whitespace and hash the tokens into 1000-dimensional
       term-frequency vectors.
    3. Use ``MinHashLSH.approxSimilarityJoin`` to find message pairs whose
       approximate Jaccard distance is below 0.5.
    4. Keep each unordered pair of distinct commits once, dropping
       self-pairs and mirrored pairs.
    5. Report ``similarity = 1 - distance`` and plot its histogram.

    The hashed sample is cached so both sides of the self-join, and the
    returned lazy DataFrame, see the same rows. It stays cached after return.

    Returns:
        Spark DataFrame with ``message1``, ``message2`` and ``similarity``,
        or None when the commits dataset is missing or an error occurs.
    """
    if "commits" not in dataframes or dataframes["commits"] is None:
        print("Commits dataset not loaded or cleaned. Skipping text similarity analysis.")
        return None

    try:
        print("Starting text similarity analysis...")
        commits_df = dataframes["commits"]

        # Limit data for performance reasons (more aggressive sampling).
        sample_commits_df = (
            commits_df.sample(fraction=0.01, seed=42)
            .filter(F.col("message").isNotNull())
            .select("commit", "message")
        )

        # 1. Tokenise and hash into term-frequency vectors. MinHash treats a
        #    vector as the set of its non-zero indices, so IDF weights would
        #    not change the result and are not computed.
        tokenizer = Tokenizer(inputCol="message", outputCol="words")
        wordsData = tokenizer.transform(sample_commits_df)
        hashingTF = HashingTF(inputCol="words", outputCol="features", numFeatures=1000)
        featurizedData = hashingTF.transform(wordsData)

        # MinHashLSH rejects all-zero vectors; a non-empty token list always
        # produces at least one non-zero term count.
        filteredData = featurizedData.filter(F.size(F.col("words")) > 0)

        # 2. LSH on the Vector column itself (MinHashLSH needs a Vector input).
        mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=5, seed=42)
        model = mh.fit(filteredData)
        hashedData = model.transform(filteredData).cache()

        # 3. Approximate self-join; keep each unordered pair of distinct commits once.
        similar_messages = (
            model.approxSimilarityJoin(hashedData, hashedData, 0.5, distCol="jaccard_distance")
            .filter(F.col("datasetA.commit") < F.col("datasetB.commit"))
            .select(
                F.col("datasetA.message").alias("message1"),
                F.col("datasetB.message").alias("message2"),
                (F.lit(1.0) - F.col("jaccard_distance")).alias("similarity"),
            )
        )

        # Only the scores are needed for the histogram; don't collect message text.
        pandas_similarity = similar_messages.select("similarity").toPandas()

        plt.figure(figsize=(10, 6))
        plt.hist(pandas_similarity['similarity'], bins=20, edgecolor='black')
        plt.xlabel("Similarity Score", fontsize=12)
        plt.ylabel("Frequency", fontsize=12)
        plt.title("Distribution of Jaccard Similarities", fontsize=14)
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.gca().spines['top'].set_visible(False)
        plt.gca().spines['right'].set_visible(False)
        plt.tight_layout()
        plt.show()

        print("Message similarities analysis completed")
        return similar_messages
    except Exception as e:
        print(f"Error in text similarity analysis: {e}")
        return None

# Run the analysis
similar_messages = analyze_message_similarity()

In [None]:
def analyze_language_trends_over_time():
    """Plot, for the five most common languages, distinct repositories with commits per year.

    Steps:
    1. Join a 0.5% commit sample, exploded to one row per repository, with
       a 10% sample of the languages table on repository name.
    2. Count distinct repositories per (language, commit year).
    3. Draw one line per language.

    The top five languages are those with the most rows in the languages
    sample. Returns None in all cases. It prints a notice and stops early
    if either dataset is missing, and prints rather than raises any error.
    """
    if "commits" not in dataframes or dataframes["commits"] is None or "languages" not in dataframes or dataframes["languages"] is None:
        print("Commits or Languages dataset missing. Skipping language trends over time analysis.")
        return None
    try:
        print("Starting language trends over time analysis...")

        commits_df = dataframes["commits"]
        languages_df = dataframes["languages"]

        # Sample commits and languages dataframes
        sampled_commits = commits_df.sample(fraction=0.005, seed=42)  # Aggressive sampling
        sampled_languages = languages_df.sample(fraction=0.1, seed=42)

        # Explode language data
        exploded_languages = sampled_languages.select("repo_name", F.explode("language").alias("language_info"))
        language_summary = exploded_languages.select("repo_name", F.col("language_info.name").alias("language"))

        # Top languages to track
        top_lang_list = (
            language_summary.groupBy("language")
            .agg(F.count("*").alias("count"))
            .orderBy(F.desc("count"))
            .limit(5)
            .toPandas()["language"]
            .tolist()
        )

        # Year of each commit, one row per repository it belongs to
        commit_year = sampled_commits.select(F.explode("repo_name").alias("repo"), F.year(F.from_unixtime("author.time_sec")).alias("year"))

        # Join commits with language data on repo name, keeping only tracked languages
        joined_df = commit_year.join(language_summary, commit_year.repo == language_summary.repo_name, "inner")
        language_over_time = (
            joined_df.filter(F.col("language").isin(top_lang_list))
            .groupBy("language", "year")
            .agg(F.countDistinct("repo").alias("repo_count"))
            .orderBy("year", "language")
        )
        pandas_language_over_time = language_over_time.toPandas()

        # Plotting
        plt.figure(figsize=(12, 6))
        for language in top_lang_list:
            data = pandas_language_over_time[pandas_language_over_time["language"] == language]
            # Numeric years keep the axis chronological. String categories are
            # placed in first-seen order, which breaks when the first language
            # plotted is missing some years.
            plt.plot(data["year"], data["repo_count"], marker="o", linestyle="-", label=language)

        years = sorted(pandas_language_over_time["year"].dropna().unique())
        plt.xticks(years, [str(int(y)) for y in years], rotation=45, ha="right", fontsize=10)
        plt.xlabel("Year", fontsize=12)
        plt.ylabel("Number of Repositories", fontsize=12)
        plt.title("Language Popularity Over Time", fontsize=14)
        plt.legend(fontsize=10)
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.gca().spines['top'].set_visible(False)
        plt.gca().spines['right'].set_visible(False)
        plt.tight_layout()
        plt.show()

        print("Language trends over time analysis completed.")
    except Exception as e:
        print(f"Error in language trends over time analysis: {e}")

analyze_language_trends_over_time()

In [None]:
def analyze_language_license_association():
    """Grouped bar chart of license counts for the five most common languages.

    Joins 10% samples of the licenses and languages tables on ``repo_name``
    and counts rows per (language, license). The top five languages are
    those with the most rows in the languages sample.

    Every license seen for those languages gets one bar per language group,
    ordered by total count. Bar width shrinks with the number of licenses,
    so a group never spills into its neighbour.

    Returns None. It prints a notice and stops early if either dataset is
    missing or nothing matches. Other errors are printed rather than raised.
    """
    if "licenses" not in dataframes or dataframes["licenses"] is None or "languages" not in dataframes or dataframes["languages"] is None:
        print("Licenses or Languages dataset missing. Skipping language-license association analysis.")
        return None
    try:
        print("Starting language-license association analysis...")
        licenses_df = dataframes["licenses"].sample(fraction=0.1, seed=42)
        languages_df = dataframes["languages"].sample(fraction=0.1, seed=42)

        # Explode the language data
        exploded_languages = languages_df.select("repo_name", F.explode("language").alias("language_info"))
        language_summary = exploded_languages.select("repo_name", F.col("language_info.name").alias("language"))

        # Get the top languages
        top_lang_list = (
            language_summary.groupBy("language")
            .agg(F.count("*").alias("count"))
            .orderBy(F.desc("count"))
            .limit(5)
            .toPandas()["language"]
            .tolist()
        )

        # Join licenses with language data on repo name; count licenses per tracked language
        joined_df = licenses_df.join(language_summary, "repo_name", "inner")
        pandas_language_license_counts = (
            joined_df.filter(F.col("language").isin(top_lang_list))
            .groupBy("language", "license")
            .agg(F.count("*").alias("repo_count"))
            .toPandas()
        )

        # Licenses ordered by total count across the top languages (largest first)
        licenses = (
            pandas_language_license_counts.groupby("license")["repo_count"].sum()
            .sort_values(ascending=False)
            .index.tolist()
        )
        if not licenses:
            print("No licensed repositories found for the top languages; nothing to plot.")
            return None

        # Plotting - grouped bar chart; the bars of one language fill 80% of its slot.
        group_width = 0.8
        bar_width = group_width / len(licenses)
        x = range(len(top_lang_list))

        plt.figure(figsize=(15, 8))
        for i, lic in enumerate(licenses):
            license_data = pandas_language_license_counts[pandas_language_license_counts['license'] == lic]
            license_counts = dict(zip(license_data["language"], license_data["repo_count"]))
            y = [license_counts.get(lang, 0) for lang in top_lang_list]
            plt.bar([pos + bar_width * i for pos in x], y, width=bar_width, label=lic)

        plt.xlabel('Programming Languages', fontsize=12)
        plt.ylabel('Number of Repositories', fontsize=12)
        plt.title('License Distribution per Programming Language (Top 5)', fontsize=14)
        # Centre each tick under its group of bars.
        plt.xticks([pos + bar_width * (len(licenses) - 1) / 2 for pos in x], top_lang_list, rotation=45, ha="right", fontsize=10)
        plt.legend(fontsize=10)
        plt.grid(axis='y', linestyle='--', alpha=0.7)
        plt.gca().spines['top'].set_visible(False)
        plt.gca().spines['right'].set_visible(False)
        plt.tight_layout()
        plt.show()

        print("Language-license association analysis completed.")
    except Exception as e:
        print(f"Error in language-license association analysis: {e}")

analyze_language_license_association()

In [None]:
def analyze_message_similarity_by_language(commit_fraction=0.005, language_fraction=0.1, top_n=5, seed=42):
    """Plot how often commit messages repeat within each of the most common languages.

    A sample of ``dataframes["commits"]`` is joined to a sample of
    ``dataframes["languages"]`` on repository name. For each of the ``top_n``
    languages with the most rows in the exploded language sample, the
    duplication ratio ``1 - distinct_messages / total_messages`` is computed
    and drawn as a bar chart.

    Parameters
    ----------
    commit_fraction : float
        Fraction passed to ``DataFrame.sample`` for the commits dataframe.
    language_fraction : float
        Fraction passed to ``DataFrame.sample`` for the languages dataframe.
    top_n : int
        How many of the most frequent languages to analyse.
    seed : int
        Seed used for both samples so reruns are reproducible.

    Returns
    -------
    dict or None
        ``{language: duplication_ratio}`` in top-language order on success.
        ``None`` when an input dataset is missing, or when an error occurs
        (the error is printed rather than raised).
    """
    if "commits" not in dataframes or dataframes["commits"] is None or "languages" not in dataframes or dataframes["languages"] is None:
        print("Commits or Languages dataset missing. Skipping message similarity by language analysis.")
        return None
    try:
        print("Starting message similarity by language analysis...")

        # Sample again on top of the load-time sampling to keep the join small.
        sampled_commits = dataframes["commits"].sample(fraction=commit_fraction, seed=seed)
        sampled_languages = dataframes["languages"].sample(fraction=language_fraction, seed=seed)

        # One row per (repo_name, language) pair.
        language_summary = (
            sampled_languages
            .select("repo_name", F.explode("language").alias("language_info"))
            .select("repo_name", F.col("language_info.name").alias("language"))
        )

        # One row per (repo, message); a commit listed under several repos appears once per repo.
        commit_repo_messages = sampled_commits.select(F.explode("repo_name").alias("repo"), "message")

        # Most frequent languages in the language sample.
        top_lang_list = [
            row["language"]
            for row in language_summary.groupBy("language").count().orderBy(F.desc("count")).limit(top_n).collect()
        ]

        # Restrict to the top languages before joining, so the join only carries rows we aggregate.
        top_language_repos = language_summary.filter(F.col("language").isin(top_lang_list))
        joined_df = commit_repo_messages.join(
            top_language_repos, commit_repo_messages.repo == top_language_repos.repo_name, "inner"
        )

        # Single aggregation instead of two Spark jobs per language.
        # The inner groupBy yields one row per distinct message (a NULL message forms its own
        # group, like distinct() does). The outer groupBy sums rows (total) and counts groups (unique).
        per_language = (
            joined_df
            .groupBy("language", "message").count()
            .groupBy("language")
            .agg(F.sum("count").alias("total"), F.count(F.lit(1)).alias("unique"))
            .collect()
        )
        counts = {row["language"]: (row["unique"], row["total"]) for row in per_language}

        # Keep top-language order; languages with no joined commits get a ratio of 0.0.
        duplication_ratios = {}
        for language in top_lang_list:
            unique_messages_count, total_messages_count = counts.get(language, (0, 0))
            duplication_ratios[language] = (
                1.0 - (unique_messages_count / total_messages_count) if total_messages_count > 0 else 0.0
            )

        # Plotting (explicit figure/axes; dict views converted to lists for matplotlib)
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.bar(list(duplication_ratios.keys()), list(duplication_ratios.values()))
        ax.set_xlabel("Programming Language")
        ax.set_ylabel("Duplication Ratio")
        ax.set_title(f"Duplication of Commit Messages per Programming Language (Top {top_n})")
        plt.setp(ax.get_xticklabels(), rotation=45, ha="right")
        fig.tight_layout()
        plt.show()

        print("Message similarity by language analysis completed.")
        return duplication_ratios

    except Exception as e:
        print(f"Error in message similarity by language analysis: {e}")
        return None
        
# Run the commit-message duplication analysis for the most common languages.
analyze_message_similarity_by_language()

Starting message similarity by language analysis...




In [None]:
# --- Logging Data Quality ---
import os
from pyspark.sql import functions as F
from pyspark.sql.types import LongType, IntegerType, FloatType, DoubleType

log_dir = "github_data_logs"  # destination for every per-dataset data-quality log file
if not os.path.isdir(log_dir):
    os.makedirs(log_dir, exist_ok=True)

def save_schema_and_count(dataset_name, schema, row_count, output_dir=None):
    """Write a dataset's schema text and row count to ``<dir>/<dataset_name>_schema.txt``.

    Parameters
    ----------
    dataset_name : str
        Prefix for the output file name.
    schema : str
        Pre-rendered schema text, written verbatim followed by a newline.
    row_count : int
        Row count recorded on the last line.
    output_dir : str, optional
        Directory to write into. Defaults to the notebook-level ``log_dir``.

    Returns
    -------
    str
        Path of the file that was written (overwritten if it already existed).
    """
    target_dir = log_dir if output_dir is None else output_dir
    path = os.path.join(target_dir, f"{dataset_name}_schema.txt")
    # Explicit UTF-8 so non-ASCII field names don't depend on the platform default encoding.
    with open(path, "w", encoding="utf-8") as f:
        f.write("Schema:\n")
        f.write(schema + "\n")
        f.write(f"Row count: {row_count}\n")
    return path

# NumericType is the common base of the Byte/Short/Integer/Long/Float/Double/Decimal types,
# so small-integer and Decimal columns are summarized too.
from pyspark.sql.types import NumericType

# Per-dataset data-quality pass: schema + row count, NULL counts per column,
# and summary statistics for numeric columns.
for dataset, df in dataframes.items():
    if df is None:
        print(f"Skipping data quality checks for {dataset} because it was not loaded.")
        continue
    print(f"Processing {dataset} for data quality checks...")

    # NOTE(review): _jdf is PySpark-internal; used to obtain the JVM tree-formatted schema.
    schema_str = df._jdf.schema().treeString()

    # One aggregation yields the row count and every column's NULL count,
    # instead of a df.count() scan followed by a second scan for the NULL sums.
    # Results are read positionally so duplicate column names cannot collide.
    quality_row = df.agg(
        F.count(F.lit(1)),
        # sum() over zero rows is NULL; coalesce keeps the count at 0 for empty datasets.
        *[F.coalesce(F.sum(df[col].isNull().cast("int")), F.lit(0)) for col in df.columns],
    ).collect()[0]
    row_count = quality_row[0]
    null_counts = dict(zip(df.columns, quality_row[1:]))

    save_schema_and_count(dataset, schema_str, row_count)
    print(f"Schema and row count saved for {dataset}.")

    with open(os.path.join(log_dir, f"{dataset}_nulls.txt"), "w", encoding="utf-8") as f:
        f.write("NULL Counts:\n")
        f.write(str(null_counts))

    print(f"NULL counts saved for {dataset}.")

    numerical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, NumericType)]
    if numerical_cols:
        # describe() reports count/mean/stddev/min/max, i.e. at most 5 rows.
        stats = df.select(numerical_cols).describe().limit(5).toPandas()
        print(f"Basic statistics for {dataset} (limited to 5 rows):\n{stats}")

    print("="*50)