## Imports and data extraction from GitHub Archive

In [2]:
import requests
import gzip
import json
import traceback


from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql import types as T

###### Note: Because of memory constraints in the Databricks Community environment, only the first 12 hours of data from 2015-01-01 are used for further analysis.

In [4]:
# Module-level accumulator: extract_data() appends every parsed event here.
# NOTE: calling extract_data() again without resetting this list duplicates events.
events_list = []


def extract_data(hours=12):
    """Download hourly GH Archive dumps for 2015-01-01 and append their events to events_list.

    For each hour in range(hours) the gzipped archive is fetched, saved in the
    current working directory under its original file name, then read back line
    by line; every non-blank line is parsed as one JSON event.

    Args:
        hours: number of hourly files to fetch, starting at hour 0 (default 12,
            matching the original hard-coded range).

    Returns:
        None. Results are accumulated in the module-level events_list.

    Raises:
        requests.HTTPError: if an archive download returns a 4xx/5xx status.
    """
    for hour in range(hours):
        url = 'http://data.gharchive.org/2015-01-01-'+str(hour)+'.json.gz'
        r = requests.get(url)
        # Fail here on an HTTP error; otherwise the error body would be saved
        # as a ".json.gz" file and only blow up later inside gzip.
        r.raise_for_status()
        file_path = "./"+url.split("/")[-1]

        with open(file_path, 'wb') as f:
            f.write(r.content)

        with gzip.open(file_path, 'rb') as f:
            for line in f:
                # Skip blank lines so they do not crash json.loads.
                if not line.strip():
                    continue
                events_list.append(json.loads(line.decode("utf-8")))
    return


In [5]:
# Download and parse the hourly archives; parsed events accumulate in events_list.
extract_data()

In [6]:
# Sanity check of the extracted data: container type, number of events,
# and the type of a single parsed event.
print(type(events_list))
print(len(events_list))
# Raises IndexError if nothing was extracted.
print(type(events_list[0]))

###### Combined all 12 json.gz files into a single json and dumped in DBFS (DataBricks File System).

In [8]:
file_path = "/dbfs/mnt/2015-01-01.json"

# Write one JSON object per line (JSON Lines) instead of a single huge array.
# Spark's JSON reader is line-oriented, so this yields the same rows while
# avoiding one giant record that a single task must parse in one go.
with open(file_path, 'w') as fout:
    for event in events_list:
        fout.write(json.dumps(event))
        fout.write("\n")

###### Loaded the json as PySpark Dataframe: master_df. It contains raw json dump.

In [10]:
# Load the combined event dump into Spark as the raw master DataFrame.
# Presumably this is the file written to /dbfs/mnt/ by the previous cell,
# addressed here through the dbfs:/ URI scheme.
file_path = "dbfs:/mnt/2015-01-01.json"
master_df = spark.read.json(file_path)

In [11]:
# Inspect the schema Spark inferred from the raw JSON events.
master_df.printSchema()

In [12]:
# Total number of rows (events) loaded into master_df.
master_df.count()

## DataFrame 1 : main_df

In [14]:
# Flatten the identifying fields of each event into top-level columns.
main_columns = [
    F.col("actor.id").alias("actor_id"),
    F.col("org.id").alias("org_id"),
    F.col("id").alias("event_id"),
    F.col("created_at"),
    F.col("repo.id").alias("repo_id"),
    F.col("type").alias("event_type"),
]
main_df = master_df.select(*main_columns)
main_df.show()

##### Data wrangling & exploration. Changed the datatypes of a few columns, split the timestamp into hour column, etc.

In [17]:
# Derive created_hour from created_at: take the part after "T", then the
# part before the first ":"; the helper columns are dropped afterwards.
# NOTE: this cell consumes created_at, so it cannot be re-run on its own output.
main_df = (
    main_df
    .withColumn("created_time", F.split(F.col("created_at"), "T").getItem(1))
    .withColumn("created_hour", F.split(F.col("created_time"), ":").getItem(0))
    .drop("created_at")
    .drop("created_time")
)

# Cast the identifier columns to long and the hour to integer.
for id_column in ("actor_id", "org_id", "event_id", "repo_id"):
    main_df = main_df.withColumn(id_column, F.col(id_column).cast(T.LongType()))
main_df = main_df.withColumn("created_hour", F.col("created_hour").cast(T.IntegerType()))

# show() and printSchema() print by themselves and return None, so wrapping
# them in print() only added a stray "None" line to the output.
main_df.show()
main_df.printSchema()

In [18]:
# Summary statistics of main_df before missing values are filled.
main_df.describe().show()

###### Replaced missing values in main_df with sentinel values (-1 for numeric columns, "Missing" for event_type) instead of leaving Nulls or NaNs.

In [20]:
# Sentinel values used in place of nulls: -1 for numeric columns,
# "Missing" for the event type.
main_fill_values = {
    'actor_id': -1,
    'org_id': -1,
    'event_id': -1,
    'repo_id': -1,
    'event_type': "Missing",
    'created_hour': -1,
}
main_df = main_df.na.fill(main_fill_values)
main_df.show()

In [21]:
# Summary statistics of main_df after nulls were replaced by sentinel values.
main_df.describe().show()

##### Data Aggregation on main_df

In [23]:
# Distinct-value counts for the key columns of main_df.
distinct_count_labels = (
    ("Unique Actors: ", 'actor_id'),
    ("Unique Orgs: ", 'org_id'),
    ("Unique Repos: ", 'repo_id'),
    ("Unique Event Types: ", 'event_type'),
)
for label, column in distinct_count_labels:
    print(label, main_df.select(column).distinct().count())

# Collect the distinct event types to the driver as a list of Row objects.
event_types = list(main_df.select('event_type').distinct().collect())
print(event_types)

In [24]:
# Event volume per event type (count of event_id values in each group).
main_df.groupby("event_type").agg(F.count("event_id")).show()

In [25]:
# Percentage of distinct actors that appear on at least one event carrying
# an org_id other than the -1 sentinel.
org_actor_count = main_df.filter(main_df.org_id != -1).select('actor_id').distinct().count()
all_actor_count = main_df.select('actor_id').distinct().count()
print((org_actor_count/all_actor_count)*100)

In [26]:
# Percentage of distinct repositories that appear on at least one event
# carrying an org_id other than the -1 sentinel.
org_repo_count = main_df.filter(main_df.org_id != -1).select('repo_id').distinct().count()
all_repo_count = main_df.select('repo_id').distinct().count()
print((org_repo_count/all_repo_count)*100)

## DataFrame 2: org_df

In [28]:
# One row per organization id, with its login and URL taken from the raw events.
org_df = (
    master_df
    .select("org.id", "org.login", "org.url")
    .dropDuplicates(["id"])
)

###### cleaning data and imputing missing values of org_df

In [30]:
# Inspect the column names and types of org_df.
org_df.printSchema()

In [31]:
# Summary statistics of org_df.
org_df.describe().show()

In [32]:
# Peek at a random sample (fraction 0.010) of org_df; no seed is passed,
# so the sampled rows can differ between runs.
org_df.sample(fraction = 0.010).show()

In [33]:
# Replace nulls in org_df with sentinel values: -1 for the id,
# "Missing" for the text columns.
org_fill_values = {'id': -1, 'login': "Missing", 'url': "Missing"}
org_df = org_df.na.fill(org_fill_values)

## DataFrame 3: payload_df

In [35]:
# Keep the event id next to every field of the nested payload struct,
# expanded into top-level columns.
payload_columns = [F.col("id").alias("event_id"), F.col("payload.*")]
payload_df = master_df.select(*payload_columns)
payload_df.printSchema()

##### Select the fields of interest from payload_df and write them to a separate dataframe

In [37]:
# Preview the first rows of the expanded payload.
payload_df.show()

In [38]:
# Summary statistics of payload_df.
payload_df.describe().show()

## DataFrame 4: repo_df

In [40]:
# Repository details come from the base repository of each pull request;
# rows with a null in any selected column are dropped.
base_repo = "pull_request.base.repo."
repo_columns = [
    F.col("event_id").alias("event_id"),
    F.col(base_repo + "name").alias("repo_name"),
    F.col(base_repo + "id").alias("repo_id"),
    F.col(base_repo + "watchers_count"),
    F.col(base_repo + "forks_count"),
    F.col(base_repo + "language"),
    F.col("pull_request.id").alias("pull_req_id"),
]
repo_df = payload_df.select(*repo_columns).na.drop()

##### Extract all details related to a repository and the pull requests to the repository from payload_df dataframe

In [42]:
# Preview the first rows of repo_df.
repo_df.show()

In [43]:
# Inspect the column names and types of repo_df.
repo_df.printSchema()

In [44]:
# Summary statistics of repo_df.
repo_df.describe().show()

###### Cleaning the repo_df by casting the fields to appropriate datatypes

In [46]:
# Cast event_id to long and confirm the resulting schema.
event_id_as_long = F.col("event_id").cast(T.LongType())
repo_df = repo_df.withColumn("event_id", event_id_as_long)
repo_df.printSchema()

##### Aggregating and exploring the repo_df

In [48]:
# Distinct-value counts for the key columns of repo_df. The original printed
# the label "Unique Repos: " twice; the second count is over repo_name, so it
# now has its own label and the two lines can be told apart.
repo_count_labels = (
    ("Unique Pull Reqs: ", "pull_req_id"),
    ("Unique Repos: ", "repo_id"),
    ("Unique Events: ", "event_id"),
    ("Unique Repo Names: ", "repo_name"),
)
for label, column in repo_count_labels:
    print(label, repo_df.select(F.col(column)).distinct().count())

# Distinct languages, collected to the driver and shown as the cell output.
repo_df.select(F.col("language")).distinct().collect()

## DataFrame 5: Top500_repos_df

###### Creating a dataframe to contain top500_repos_df by joining repo_df, main_df, org_df and using watchers_count and forks_count fields

In [51]:
# Row counts of the two join inputs, for comparison with the joined results below.
print(repo_df.count())
print(main_df.count())

In [52]:
# Distinct (repo_id, org_id) pairs observed in main_df.
repo_org_df = main_df.select("repo_id", "org_id").distinct()
repo_org_df.count()

In [53]:
# Register both inputs as temp views so they can be joined with Spark SQL.
repo_df.createOrReplaceTempView("repo")
repo_org_df.createOrReplaceTempView("repo_org")

# Attach org_id to every repo_df row via the repo_id -> org_id pairs.
temp1 = spark.sql("select o.org_id, r.* from repo r INNER JOIN repo_org o ON r.repo_id == o.repo_id")

In [54]:
# Row count after attaching org_id.
temp1.count()

In [55]:
# Register the intermediate result and the organization table as temp views.
temp1.createOrReplaceTempView("temp1")
org_df.createOrReplaceTempView("org")

# Attach the organization login to every row by matching org_id to org.id.
temp2 = spark.sql("select o.login, t.* from temp1 t INNER JOIN org o ON t.org_id == o.id")

In [56]:
# Row count after attaching the organization login.
temp2.count()

In [57]:
# Preview the joined repository / organization rows.
temp2.show()

#### Aggregating the fields to identify the top500 repositories

In [59]:
# Rank repositories that belong to a known organization by watchers and forks.
# The original chained two orderBy() calls; the second replaces the first, so
# forks_count never took part in the ranking. A single orderBy() sorts by
# watchers_count first and uses forks_count to break ties.
# NOTE(review): temp2 appears to hold one row per pull-request event, so summing
# the counts per repo scales them by the number of events — confirm whether
# max() was intended instead.
top500_repos_df = (
    temp2
    .where(F.col("login") != "Missing")
    .groupby("repo_id")
    .agg(
        F.sum("watchers_count").alias("watchers_count"),
        F.sum("forks_count").alias("forks_count"),
    )
    .orderBy(F.col("watchers_count").desc(), F.col("forks_count").desc())
    .limit(500)
)
top500_repos_df.show()

In [60]:
# Confirm that the aggregated result is still a Spark DataFrame.
print(type(top500_repos_df))

###### top10_orgs_df is created by adding organization details to top500_repos_df and aggregating it by organization

In [62]:
top500_repos_df.createOrReplaceTempView("top500_repos_df")
temp2.createOrReplaceTempView("temp2")

# The original query read from "top_repo_df", a view that is never registered;
# the view created above is "top500_repos_df".
# DISTINCT collapses the per-event rows of temp2 into one row per
# (login, repo), so a later count per login counts repositories, not events.
temp4 = spark.sql("select distinct t2.login, t1.* from top500_repos_df t1 INNER JOIN temp2 t2 ON t1.repo_id == t2.repo_id")

In [63]:
# Count the repo_id values of temp4 per organization login and keep the
# ten logins with the highest count.
top10_orgs_df = (
    temp4
    .groupby("login")
    .agg(F.count("repo_id").alias("popular_repo_count"))
    .orderBy(F.col("popular_repo_count").desc())
    .limit(10)
)
top10_orgs_df.show()

###### Dumping the DataFrames as csv files to create visualizations in Tableau.

In [65]:
# Export each DataFrame as CSV for Tableau.
# - mode="overwrite": the default save mode raises once the target exists,
#   which made this cell fail on every re-run of the notebook.
# - header=True: write the column names so the files are self-describing.
# NOTE(review): these paths go through the Spark writer, while the earlier
# Spark read used a "dbfs:/mnt/..." URI — confirm the files land where
# Tableau expects them.
csv_exports = [
    (main_df, '/dbfs/mnt/main.csv'),
    (repo_df, '/dbfs/mnt/repo.csv'),
    (org_df, '/dbfs/mnt/org.csv'),
    (top500_repos_df, '/dbfs/mnt/top500_repos.csv'),
    (top10_orgs_df, '/dbfs/mnt/top10_orgs.csv'),
]
for frame, target in csv_exports:
    frame.write.csv(target, header=True, mode="overwrite")

In [66]:
# Register the final DataFrames as temp views so they can be queried with SQL.
view_names = (
    (main_df, "main_table"),
    (repo_df, "repo_table"),
    (org_df, "org_table"),
    (top500_repos_df, "top500_repos_table"),
    (top10_orgs_df, "top10_orgs_table"),
)
for frame, view_name in view_names:
    frame.createOrReplaceTempView(view_name)

#### Question 1: 

###### There are more than 20 types of GitHub events (as per the GitHub Archive data). In the data we analyzed, 14 distinct event types occurred over the span of 12 hours. What are the top 5 event types by volume? Do the top 5 event types differ between org_users and individual_users?

#### Question 2: 

###### There are two user segments in the GitHub Archive data (the archive doesn't contain any details about GitHub Enterprise). Org_users are part of an organization and pay to collaborate with their team through GitHub. Individual users are the other segment: users who do not pay for the GitHub service.

###### How different is the event activity volume amongst org_users and individual_users? Do org_users trigger more events or individual users?

#### Question 3: 

###### What are the most popular repositories in GitHub? How do we decide the popular repositories? Although not complete, we use "watchers_count" and "forks_count" as a measure of popular repositories. And, which organizations maintain those repositories?