In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GitHub Data Processing") \
    .config("spark.driver.extraClassPath", "D:/Coding/Data Engineering/Mentorship/Github Most Popular repos/spark_jars/mysql-connector-j-9.2.0.jar").getOrCreate()

In [2]:
df = spark.read.json("archive/*.json")  # Replace with actual file path
df.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- created: string (nullable = true)
 |-- description: string (nullable = true)
 |-- forks: long (nullable = true)
 |-- full_name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- language: string (nullable = true)
 |-- open_issues: long (nullable = true)
 |-- repo_name: string (nullable = true)
 |-- stars: long (nullable = true)
 |-- subscribers: long (nullable = true)
 |-- topics: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- type: string (nullable = true)
 |-- username: string (nullable = true)



In [3]:
df.show(5)

+---------------+-------------------+--------------------+-----+--------------------+---------+--------+-----------+-----------------+-----+-----------+--------------------+------------+---------+
|_corrupt_record|            created|         description|forks|           full_name|       id|language|open_issues|        repo_name|stars|subscribers|              topics|        type| username|
+---------------+-------------------+--------------------+-----+--------------------+---------+--------+-----------+-----------------+-----+-----------+--------------------+------------+---------+
|           null|2014-02-25 08:00:08|Apache Spark - A ...|25357|        apache/spark| 17165658|   Scala|        242|            spark|32296|       2080|[python, scala, r...|Organization|   apache|
|           null|2017-08-09 19:39:59|Distributed train...| 2027|     horovod/horovod| 99846383|  Python|        298|          horovod|12219|        334|[tensorflow, uber...|Organization|  horovod|
|           nul

In [4]:
from pyspark.sql.functions import *

df_lang = df.groupBy("language").agg(count("*").alias("num_repos"))
df_lang = df_lang.sort(desc("num_repos"))
df_lang.show()

+----------------+---------+
|        language|num_repos|
+----------------+---------+
|      JavaScript|     5293|
|          Python|     4861|
|      TypeScript|     2816|
|              Go|     1868|
|            null|     1425|
|            Java|     1418|
|Jupyter Notebook|     1291|
|           Scala|     1178|
|             C++|      952|
|            Dart|      855|
|          Kotlin|      830|
|           Julia|      806|
|            HTML|      524|
|           Shell|      520|
|              C#|      336|
|               C|      321|
|             CSS|      227|
|     Objective-C|      192|
|            Ruby|      174|
|            Rust|      165|
+----------------+---------+
only showing top 20 rows



In [5]:
df_org = df.filter(col("type") == "Organization") \
    .groupBy("username") \
    .agg(sum("stars").alias("total_stars")) \
    .withColumnRenamed("username", "organization_name")

df_org = df_org.sort(desc("total_stars"))
df_org.show()


+-----------------+-----------+
|organization_name|total_stars|
+-----------------+-----------+
|        microsoft|    1011553|
|       tensorflow|     859765|
|         facebook|     627455|
|           apache|     433004|
|          angular|     352533|
|      huggingface|     306537|
|           google|     289449|
|          pytorch|     255202|
|      gothinkster|     218946|
|      storybookjs|     208451|
|       kubernetes|     206820|
|       keras-team|     195019|
|           vercel|     180974|
|          alibaba|     172746|
|          Tencent|     159319|
|          flutter|     159010|
|           nodejs|     144071|
|           docker|     142690|
|          ohmyzsh|     141449|
|             dmlc|     139827|
+-----------------+-----------+
only showing top 20 rows



In [6]:
df_relevance = df.withColumn(
    "relevance_score",
    expr("1.5 * forks + 1.32 * subscribers + 1.04 * stars")
).select("repo_name", "relevance_score")

df_relevance = df_relevance.sort(desc("relevance_score"))
df_relevance.show()

+--------------------+---------------+
|           repo_name|relevance_score|
+--------------------+---------------+
|          tensorflow|      309867.30|
|          tensorflow|      309857.60|
|          tensorflow|      309854.02|
|               react|      256810.22|
|   developer-roadmap|      246966.98|
|             ohmyzsh|      186208.60|
|             flutter|      179141.36|
|              vscode|      170980.94|
|              models|      147932.40|
|        react-native|      143317.90|
|        react-native|      143222.38|
|              opencv|      141480.16|
|          kubernetes|      141126.72|
|free-programming-...|      137006.78|
|free-programming-...|      137004.24|
|free-programming-...|      136770.88|
|    create-react-app|      136334.56|
|            three.js|      133498.50|
|          ant-design|      131601.74|
|                node|      127568.42|
+--------------------+---------------+
only showing top 20 rows



In [7]:
jdbc_url = "jdbc:mysql://localhost:3306/github_data"
db_properties = {
    "user": "root",
    "password": "0000",
    "driver": "com.mysql.cj.jdbc.Driver"
}

In [None]:
df_lang.write.jdbc(url=jdbc_url, table="programming_lang", mode="overwrite", properties=db_properties)
df_org.write.jdbc(url=jdbc_url, table="organizations_stars", mode="overwrite", properties=db_properties)
df_relevance.write.jdbc(url=jdbc_url, table="search_terms_relevance", mode="overwrite", properties=db_properties)