In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import round, col, sum as spark_sum, lit, count
from pyspark.sql.types import StructType, StructField, StringType, LongType
from sqlalchemy import create_engine
import pandas as pd
import os
import functools

In [2]:
# PostgreSQL connection details
POSTGRES_URL = "jdbc:postgresql://postgres:5432/github_repos"
POSTGRES_USER = "admin"
POSTGRES_PASSWORD = "password"

# Create Spark Session
spark = SparkSession.builder \
    .appName("GitHubReposAnalysis") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.5.0") \
    .getOrCreate()

# Path to JSON files (Mounted in Docker)
json_path = "/home/jovyan/data"

In [3]:
# Define JSON schema
github_repo_schema = StructType([
    StructField("id", LongType(), True),
    StructField("repo_name", StringType(), True),
    StructField("full_name", StringType(), True),
    StructField("description", StringType(), True),
    StructField("created", StringType(), True),
    StructField("language", StringType(), True),
    StructField("type", StringType(), True),
    StructField("username", StringType(), True),
    StructField("stars", LongType(), True),
    StructField("forks", LongType(), True),
    StructField("subscribers", LongType(), True),
    StructField("open_issues", LongType(), True),
    StructField("topics", StringType(), True),
    StructField("search_term", StringType(), True)
])

# Function to read all JSON files
def read_json_files(path):
    files = [f for f in os.listdir(path) if f.endswith('.json')]
    if not files:
        raise ValueError("No JSON files found in the specified path")

    dfs = []
    for file in files:
        search_term = file.replace('.json', '')
        try:
            df = spark.read.schema(github_repo_schema).json(os.path.join(path, file))
            df = df.withColumn("search_term", lit(search_term))
            dfs.append(df)
        except Exception as e:
            print(f"Error reading file {file}: {e}")
    
    return functools.reduce(DataFrame.unionByName, dfs) if dfs else None

# Load JSON data
repos_df = read_json_files(json_path)

if repos_df is None:
    print("No data loaded, exiting.")
    exit()

# Show schema and sample rows
repos_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- repo_name: string (nullable = true)
 |-- full_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- created: string (nullable = true)
 |-- language: string (nullable = true)
 |-- type: string (nullable = true)
 |-- username: string (nullable = true)
 |-- stars: long (nullable = true)
 |-- forks: long (nullable = true)
 |-- subscribers: long (nullable = true)
 |-- open_issues: long (nullable = true)
 |-- topics: string (nullable = true)
 |-- search_term: string (nullable = false)



In [4]:
repos_df.show(5)

+--------+--------------------+--------------------+-----------------------------+-------------------+----------+------------+-----------+-----+-----+-----------+-----------+--------------------+-----------+
|      id|           repo_name|           full_name|                  description|            created|  language|        type|   username|stars|forks|subscribers|open_issues|              topics|search_term|
+--------+--------------------+--------------------+-----------------------------+-------------------+----------+------------+-----------+-----+-----+-----------+-----------+--------------------+-----------+
|14098069|free-programming-...|justjavac/free-pr...|:books: 免费的计算机编程类...|2013-11-04 01:59:19|      NULL|        User|  justjavac|87946|25155|       5917|         28|["python","javasc...|    Angular|
|24195339|             angular|     angular/angular|         The modern web de...|2014-09-18 16:12:01|TypeScript|Organization|    angular|80075|21048|       3109|       1598|["a

In [5]:
# 1. Programming Languages Table
programming_lang_df = repos_df.groupBy("language") \
    .agg(
        col("language").alias("language_name"),
        count("*").alias("repo_count")
    ).filter(col("language").isNotNull()) \
    .select("language_name", "repo_count")

programming_lang_df.show()

+-------------+----------+
|language_name|repo_count|
+-------------+----------+
|           C#|       336|
|         Less|         5|
|   JavaScript|      5293|
|         SCSS|        31|
|   Emacs Lisp|        18|
|         Dart|       855|
|          Vue|       113|
|   Dockerfile|       114|
|  Objective-C|       192|
|   ApacheConf|         4|
|        Swift|        84|
|   TypeScript|      2816|
|          CSS|       227|
|       Elixir|        16|
|         HTML|       524|
|        Scala|      1178|
|          PHP|       159|
|       Kotlin|       830|
|           Go|      1868|
|          Pug|         1|
+-------------+----------+
only showing top 20 rows



In [7]:
# 2. Organizations Stars Table
organizations_stars_df = repos_df.filter(col("type") == "Organization") \
    .groupBy("username") \
    .agg(
        col("username").alias("organization_name"),
        spark_sum("stars").alias("total_stars")
    ).select("organization_name", "total_stars")

organizations_stars_df.show()

+-----------------+-----------+
|organization_name|total_stars|
+-----------------+-----------+
|            afklm|        355|
|          ngParty|        355|
|       angular-ui|      55925|
|     InfomediaLtd|        416|
|         swimlane|      15526|
|            akveo|      79256|
|        ui-router|        305|
|          getmeli|       4318|
|         TrilonIO|       4104|
|        scotch-io|       2861|
|            oppia|       4505|
|        microsoft|    1011553|
|        getsentry|      60685|
|          AzureAD|        731|
|         prettier|      88444|
|angular-fullstack|      12269|
|              opf|       5527|
|         compodoc|       8396|
|     chromelyapps|       2793|
|  testing-library|      27022|
+-----------------+-----------+
only showing top 20 rows



In [8]:
# 3. Search Terms Relevance Table (Rounded)
search_terms_relevance_df = repos_df.groupBy("search_term") \
    .agg(
        round(spark_sum(1.5 * col("forks") + 1.32 * col("subscribers") + 1.04 * col("stars")), 2).alias("relevance_score")
    )

search_terms_relevance_df.show()

+----------------+---------------+
|     search_term|relevance_score|
+----------------+---------------+
|         Angular|     2888727.66|
|             Cpp|     4377811.44|
|            Dart|     1028218.32|
|   Deep-Learning|     6559100.02|
|          Django|     1488257.16|
|          Docker|     4403817.56|
|        ethereum|     1157287.26|
|           Flask|      884822.12|
|         Flutter|           NULL|
|          Gatsby|      361724.48|
|          Golang|     5407407.26|
|          Hadoop|       584015.9|
|           Julia|      384773.28|
|          Kotlin|      1936763.9|
|      Kubernetes|     3540183.02|
|Machine-Learning|      6925617.8|
|          NextJS|      631042.74|
|          NodeJS|     4331014.68|
|            OOPs|       23416.26|
|         PyTorch|     3357329.08|
+----------------+---------------+
only showing top 20 rows



In [9]:
# Function to write DataFrame to PostgreSQL
def write_to_postgres(df, table_name):
    df.write \
        .format("jdbc") \
        .option("url", POSTGRES_URL) \
        .option("dbtable", table_name) \
        .option("user", POSTGRES_USER) \
        .option("password", POSTGRES_PASSWORD) \
        .option("driver", "org.postgresql.Driver") \
        .mode("overwrite") \
        .save()

In [10]:
# Write data to PostgreSQL tables
write_to_postgres(programming_lang_df, "programming_lang")
write_to_postgres(organizations_stars_df, "organizations_stars")
write_to_postgres(search_terms_relevance_df, "search_terms_relevance")

print("✅ Data successfully written to PostgreSQL!")

✅ Data successfully written to PostgreSQL!


In [11]:
# Use SQLAlchemy to connect and read data into Pandas
db_url = "postgresql://admin:password@postgres:5432/github_repos"
engine = create_engine(db_url)

# Read search terms relevance table
query = "SELECT * FROM search_terms_relevance;"
df = pd.read_sql(query, engine)

# Display results
df.head()

Unnamed: 0,search_term,relevance_score
0,Angular,2888727.66
1,Cpp,4377811.44
2,Dart,1028218.32
3,Deep-Learning,6559100.02
4,Django,1488257.16


In [12]:
# Read search terms relevance table
query = "SELECT * FROM programming_lang;"
df = pd.read_sql(query, engine)

# Display results
df.head()

Unnamed: 0,language_name,repo_count
0,C#,336
1,Less,5
2,JavaScript,5293
3,SCSS,31
4,Emacs Lisp,18


In [13]:
# Read search terms relevance table
query = "SELECT * FROM organizations_stars;"
df = pd.read_sql(query, engine)

# Display results
df.head()

Unnamed: 0,organization_name,total_stars
0,afklm,355
1,ngParty,355
2,angular-ui,55925
3,InfomediaLtd,416
4,swimlane,15526
