Importing libraries

In [1]:
import numpy as np 
import pandas as pd
import urllib.request
import requests
import gzip
import json


import pyspark
from pyspark.sql import DataFrame as Sdf2
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

Getting Data

In [2]:
# Download the hourly GH Archive dumps for 2015-01-01 .. 2015-01-03 (hours 0-23)
# and store them back-to-back in a single local file.
#
# The output file is opened once in 'wb' mode so that re-running this cell
# (for example after an interrupted run) rebuilds the file from scratch instead
# of appending a second copy of the data to whatever was written before.
with open('dataPS.json.gz', 'wb') as f:
    for j in range(1, 4):
        for i in range(0, 24):
            url = 'https://data.gharchive.org/2015-01-0' + str(j) + '-' + str(i) + '.json.gz'
            # The context manager releases the streamed connection even when the
            # body is never read; the timeout keeps a stalled request from
            # hanging the cell indefinitely.
            with requests.get(url, stream=True, timeout=60) as response:
                if response.status_code == 200:
                    # response.raw yields the bytes exactly as sent by the server.
                    f.write(response.raw.read())
                else:
                    # Best-effort download: report the missing hour and carry on.
                    print('Skipped ' + url + ' (HTTP ' + str(response.status_code) + ')')

KeyboardInterrupt: 

Starting Spark Session

In [None]:
# Create the Spark session for this notebook (or reuse one that already exists),
# running locally under the application name "FlatJson".
spark = (
    SparkSession.builder
    .appName("FlatJson")
    .master("local[*]")
    .getOrCreate()
)
sc = spark.sparkContext

# Read the downloaded archive file into a DataFrame.
df = spark.read.json('dataPS.json.gz')

Flattening Json Data Dynamically

In [None]:

# Flatten array of structs and structs
def _complex_fields(df2):
    """Return ``{column name: data type}`` for every array or struct column of ``df2``."""
    return {
        field.name: field.dataType
        for field in df2.schema.fields
        if isinstance(field.dataType, (ArrayType, StructType))
    }


def flatten(df2):
    """Return ``df2`` with every struct and array column flattened away.

    The schema is re-scanned after each step, and the first remaining complex
    column is processed until none are left:

    * a struct column ``c`` is replaced by one column per sub-field ``k``,
      named ``c_k``;
    * an array column is replaced by its elements via ``explode_outer``, i.e.
      one output row per array element (rows whose array is null or empty are
      kept with a null value).

    Note that exploding arrays multiplies rows: after flattening, a row count
    counts array elements, not the original input records.

    Parameters
    ----------
    df2 : pyspark.sql.DataFrame
        DataFrame with an arbitrarily nested schema.

    Returns
    -------
    pyspark.sql.DataFrame
        DataFrame whose schema has no ArrayType or StructType columns.
    """
    complex_fields = _complex_fields(df2)
    while complex_fields:
        # Always work on the first remaining complex column.
        col_name, col_type = next(iter(complex_fields.items()))

        if isinstance(col_type, StructType):
            # Promote each sub-field to a top-level column, then drop the struct.
            expanded = [
                col(col_name + '.' + sub_field.name).alias(col_name + '_' + sub_field.name)
                for sub_field in col_type
            ]
            df2 = df2.select("*", *expanded).drop(col_name)
        elif isinstance(col_type, ArrayType):
            # Turn the array elements into rows; the element type (possibly a
            # struct or another array) is handled on a later iteration.
            df2 = df2.withColumn(col_name, explode_outer(col_name))

        # The schema changed, so recompute what is still nested.
        complex_fields = _complex_fields(df2)
    return df2

df=flatten(df)

How often did each event type occur in the extracted data? (Using PySpark)

In [None]:
# Tally the flattened rows by event type and print the table.
# NOTE(review): flatten() explodes array columns, so an event that carried an
# array may contribute several rows here — confirm this is the intended count.
(
    df.groupby('type')
    .count()
    .show()
)


Labelling users by whether or not they act through an organisation (Using PySpark)

In [None]:
# Tag each row: a null org_id means "Individual_users", anything else "Org_User".
df = df.withColumn(
    "User_Group",
    when(df["org_id"].isNull(), lit("Individual_users"))
    .otherwise(lit("Org_User")),
)

Event distribution for org users and individual users (Using PySpark)

In [None]:
# Row counts per (user group, event type) ...
Event_Distribution = df.groupby(['User_Group', 'type']).count()

# ... reshaped to one row per user group with one column per event type.
(
    Event_Distribution
    .groupby('User_Group')
    .pivot('type')
    .sum('count')
    .show()
)

What are the most popular languages on GitHub, based on the extracted data? (Using PySpark)

In [None]:
# Count rows per pull-request base-repo language and rank them, most frequent
# first. Without the sort, show() prints only its default first 20 rows in
# whatever order Spark returns them, which does not answer "most popular".
# Rows with no language value are still counted, as a separate null group.
(
    df.groupby('payload_pull_request_base_repo_language')
    .count()
    .sort('count', ascending=False)
    .show()
)

Do org users trigger more events than individual users? (Using PySpark)

In [None]:
# Number of flattened rows attributed to each user group.
(
    df.groupby('User_Group')
    .count()
    .show()
)

Programming languages used by each user group (Using PySpark)

In [None]:
# Row counts per (user group, pull-request base-repo language) ...
Language_User_Group = (
    df.groupby(['User_Group', 'payload_pull_request_base_repo_language'])
    .count()
)

# ... reshaped to one row per language with one column per user group.
(
    Language_User_Group
    .groupby('payload_pull_request_base_repo_language')
    .pivot('User_Group')
    .sum('count')
    .show()
)

Number of people working on multiple repositories (individual vs. organisational) (Using PySpark)

In [None]:
# First grouping collapses the data to one row per (group, actor, repo);
# the second counts how many such repo rows each (group, actor) has.
multiple_repo_actors_PS = (
    df.groupby(['User_Group', 'actor_id', 'repo_id'])
    .count()
    .groupby(['User_Group', 'actor_id'])
    .count()
)

# Keep actors seen on more than one repo and count them per user group.
(
    multiple_repo_actors_PS
    .filter(col('count') > 1)
    .groupby('User_Group')
    .count()
    .show()
)

Average commits vs. distinct commits per push event, by user type (PySpark)

In [None]:
# Mean payload_size and payload_distinct_size of PushEvent rows per user group.
# The aggregated DataFrame is bound to avg_commits *before* calling show():
# show() only prints and returns None, so chaining it into the assignment
# would leave avg_commits as None.
# NOTE(review): flatten() explodes array columns, so a push carrying an array
# may appear as several rows and weigh more in this mean — confirm whether
# rows should be de-duplicated per event first.
avg_commits = (
    df.filter(df['type'] == 'PushEvent')
    .select(['User_Group', 'payload_size', 'payload_distinct_size'])
    .groupby('User_Group')
    .mean()
)
avg_commits.show()

Top organisations to target for GitHub Pro (calculated using number of events) (Using PySpark)

In [None]:
# Rank organisations by how many rows they account for; print the top 15.
(
    df.where(col('org_login').isNotNull())
    .groupby('org_login')
    .count()
    .orderBy(col('count').desc())
    .show(15)
)


Top organisations to target for GitHub Pro (calculated using number of distinct users) (Using PySpark)

In [None]:
# Rank organisations by the number of distinct actors seen under them.
(
    df.where(col('org_login').isNotNull())
    .select(['org_login', 'actor_id', 'type'])
    # Collapse to one row per (organisation, actor) ...
    .groupby(['org_login', 'actor_id'])
    .count()
    .select(['org_login', 'actor_id'])
    # ... so that this count is the number of actors per organisation.
    .groupby('org_login')
    .count()
    .orderBy(col('count').desc())
    .show()
)

Checking Number of Rows

In [None]:
# Total number of rows in the flattened DataFrame. flatten() explodes array
# columns into rows, so this can exceed the number of records originally read.
df.count()

Most Starred Repo (PySpark)

In [None]:
# Rank repositories by their number of WatchEvent rows, highest first.
(
    df.where(col('type') == 'WatchEvent')
    .groupby('repo_name')
    .count()
    .orderBy(col('count').desc())
    .show()
)