## Reading Git Final Project

In [2]:
import os
import subprocess
import datetime
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from pyspark.sql.functions import *
from pyspark.sql.types import *

# Let pandas display up to 100 rows, with no cap on column count or cell width.
for option_name, option_value in (
    ('display.max_rows', 100),
    ('display.max_columns', None),
    ('display.max_colwidth', None),
):
    pd.set_option(option_name, option_value)

In [3]:
# Turn on eager evaluation so a DataFrame left as the last expression of a
# cell is rendered as a table in the notebook.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

In [4]:
# Root GCS folder of the project data; each dataset below is read from a
# sub-folder of this location with spark.read.parquet.
gcs_folder = 'gs://msca-bdp-data-open/final_project_git'

#### Check data size in GCS

In [5]:
# Ask gsutil for the summarized (-s), human-readable (-h) size of the data folder.
# The command is an argument list run without a shell, so the path is handed to
# gsutil verbatim and is never re-parsed by a shell.
cmd = ['gsutil', 'du', '-s', '-h', gcs_folder]

# The context manager closes the stdout pipe and waits for the child on exit.
with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) as p:
    for line in p.stdout:
        # Drop the trailing newline so print() does not add a blank line.
        print(f'Total directory size: {line.rstrip()}')

retval = p.returncode  # gsutil exit status, available once the child has terminated
if retval != 0:
    # stderr is merged into stdout, so the lines printed above are error text, not a size.
    print(f'gsutil exited with status {retval}')

Total directory size: 1.36 TiB     gs://msca-bdp-data-open/final_project_git



### Read Git data from GCS

#### Languages
Programming languages by repository, as reported by GitHub's [list-languages API](https://developer.github.com/v3/repos/#list-languages).

In [6]:
%%time   
    
df_languages = spark.read.parquet(os.path.join(gcs_folder, 'languages'))
print(f'Records read from dataframe *languages*: {df_languages.count():,.0f}')



Records read from dataframe *languages*: 3,325,634
CPU times: user 5.64 ms, sys: 10.5 ms, total: 16.1 ms
Wall time: 8.6 s


                                                                                

In [7]:
# Print the column names and types of the languages dataframe.
df_languages.printSchema()

root
 |-- repo_name: string (nullable = true)
 |-- language: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- bytes: long (nullable = true)



#### Licenses
Open source license SPDX code for each repository, as detected by GitHub's [Licenses API](https://developer.github.com/v3/licenses/).

In [8]:
%%time   
    
df_licenses = spark.read.parquet(os.path.join(gcs_folder, 'licenses'))
print(f'Records read from dataframe *licenses*: {df_licenses.count():,.0f}')

Records read from dataframe *licenses*: 3,325,634
CPU times: user 4.85 ms, sys: 169 µs, total: 5.02 ms
Wall time: 901 ms


In [9]:
# Print the column names and types of the licenses dataframe.
df_licenses.printSchema()

root
 |-- repo_name: string (nullable = true)
 |-- license: string (nullable = true)



#### Commits
Unique Git commits from open source repositories on GitHub, pre-grouped by repositories they appear in.

In [10]:
%%time   
    
df_commits = spark.read.parquet(os.path.join(gcs_folder, 'commits'))
print(f'Records read from dataframe *commits*: {df_commits.count():,.0f}')



Records read from dataframe *commits*: 265,419,190
CPU times: user 1.08 s, sys: 257 ms, total: 1.34 s
Wall time: 4min 36s


                                                                                

In [11]:
# Print the column names and types of the commits dataframe (nested structs included).
df_commits.printSchema()

root
 |-- commit: string (nullable = true)
 |-- tree: string (nullable = true)
 |-- parent: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- author: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- time_sec: long (nullable = true)
 |    |-- tz_offset: long (nullable = true)
 |    |-- date: struct (nullable = true)
 |    |    |-- seconds: long (nullable = true)
 |    |    |-- nanos: long (nullable = true)
 |-- committer: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- email: string (nullable = true)
 |    |-- time_sec: long (nullable = true)
 |    |-- tz_offset: long (nullable = true)
 |    |-- date: struct (nullable = true)
 |    |    |-- seconds: long (nullable = true)
 |    |    |-- nanos: long (nullable = true)
 |-- subject: string (nullable = true)
 |-- message: string (nullable = true)
 |-- trailer: array (nullable = true)
 |    |-- element: struct (contains

#### Contents
Unique file contents of text files under 1 MiB on the HEAD branch.  
Can be joined to the `files` dataset on the `id` column to identify the repository and file path.

In [None]:
%%time   
    
df_contents = spark.read.parquet(os.path.join(gcs_folder, 'contents'))
print(f'Records read from dataframe *commits*: {df_contents.count():,.0f}')



Records read from dataframe *commits*: 281,191,977
CPU times: user 2.08 s, sys: 477 ms, total: 2.56 s
Wall time: 8min 39s


                                                                                

In [None]:
# Print the column names and types of the contents dataframe.
df_contents.printSchema()

root
 |-- id: string (nullable = true)
 |-- size: long (nullable = true)
 |-- content: string (nullable = true)
 |-- binary: boolean (nullable = true)
 |-- copies: long (nullable = true)



#### Files
File metadata for all files at HEAD.  
Join with `contents` dataset on id columns to search text.

In [None]:
%%time   
    
df_files = spark.read.parquet(os.path.join(gcs_folder, 'files'))
print(f'Records read from dataframe *files*: {df_files.count():,.0f}')



Records read from dataframe *files*: 2,309,424,945
CPU times: user 341 ms, sys: 72.5 ms, total: 413 ms
Wall time: 1min 26s


                                                                                

In [None]:
# Print the column names and types of the files dataframe.
df_files.printSchema()

root
 |-- repo_name: string (nullable = true)
 |-- ref: string (nullable = true)
 |-- path: string (nullable = true)
 |-- mode: long (nullable = true)
 |-- id: string (nullable = true)
 |-- symlink_target: string (nullable = true)



In [18]:
# Build a small, cleaned sample of the commits data with only the columns needed.
#
# Commits are kept when their author timestamp (epoch seconds) lies strictly
# between the two bounds below. The upper bound is a fixed timestamp, not the
# time the cell is run, so re-running the cell selects the same window.
MIN_AUTHOR_TIME_SEC = 1199167201  # 2008-01-01 06:00:01 UTC
MAX_AUTHOR_TIME_SEC = 1733594559  # 2024-12-07 18:02:39 UTC
SAMPLE_FRACTION = 0.01            # sampling fraction passed to DataFrame.sample
SAMPLE_SEED = 42                  # fixed seed for the sample

author_time = col("author.time_sec")
commits_sample = (
    df_commits
    .filter((author_time > MIN_AUTHOR_TIME_SEC) & (author_time < MAX_AUTHOR_TIME_SEC))
    .sample(withReplacement=False, fraction=SAMPLE_FRACTION, seed=SAMPLE_SEED)
)

# Flatten the nested author fields and explode `repo_name` into one row per
# repository; then derive a calendar `date` column from the epoch timestamp.
cleaned_c = (
    commits_sample
    .select(col("commit"),
            col("author.name").alias("author_name"),
            col("author.time_sec").alias("timestamp"),
            col("subject"),
            col("message"),
            explode(col("repo_name")).alias("repo_name"))
    .withColumn('date', from_unixtime('timestamp').cast(DateType()))
)

In [19]:
# Preview the first 5 rows of the cleaned commit sample.
cleaned_c.show(5)

[Stage 20:>                                                         (0 + 1) / 1]

+--------------------+--------------------+----------+--------------------+--------------------+--------------------+----------+
|              commit|         author_name| timestamp|             subject|             message|           repo_name|      date|
+--------------------+--------------------+----------+--------------------+--------------------+--------------------+----------+
|be41ab16c41b28840...|conda-forge-coord...|1596039929|Updated the yacro...|Updated the yacro...|conda-forge/feeds...|2020-07-29|
|7c87590a3c12094e1...|Яшина Александра ...|1408946175|[NA]: по письмо о...|[NA]: по письмо о...|2gis/nuclear-aggr...|2014-08-25|
|68e5e9e74b0d619e2...|                ilor|1237148567|        rm empty dir|rm empty dir

git...|battle-for-wesnot...|2009-03-15|
|e4723fd447eb44da5...|            echristo|1394064053|constify a few ac...|constify a few ac...|tarunprabhu/Drago...|2014-03-06|
|005a597f5da0a69e6...|NetKAN inflator R...|1633019394|NetKAN added mod ...|NetKAN added mod ...| 

                                                                                

In [None]:
import datetime
import pytz

# Current wall-clock time in the US/Central zone, formatted as the cell's output.
central_tz = pytz.timezone('US/Central')
datetime.datetime.now(central_tz).strftime("%a, %d %B %Y %H:%M:%S")

In [20]:
# Pair each repository's language names with its license, matching on repo_name.
repo_language_names = df_languages.select("repo_name", "language.name")
df_lan_license = repo_language_names.join(df_licenses, on="repo_name")

In [21]:
# Row count of the languages/licenses join (not of the raw languages table).
print(f'Records in joined dataframe *lan_license*: {df_lan_license.count():,.0f}')



Records read from dataframe *languages*: 3,325,634


                                                                                

In [None]:
# Persist the languages/licenses join as parquet, replacing any earlier output.
lan_license_out = 'gs://ads-sjhuang-final/notebooks/sjhuang/lan_license01'
df_lan_license.write.mode("overwrite").parquet(lan_license_out)

                                                                                

In [23]:
# Attach language and license info to every sampled commit; the left join
# keeps commits even when their repository has no match.
sample_1 = cleaned_c.join(df_lan_license, 'repo_name', 'left')

In [None]:
# Preview five rows of the joined sample; as the last expression of the cell,
# the result is what the notebook displays.
sample_1.limit(5)

                                                                                

repo_name,commit,author_name,timestamp,subject,message,date,name,license
001szymon/uBlock,fb62bbc29e437a3e5...,gorhill,1404316949,"this fixes #12, #37","this fixes #12, #37",2014-07-02,"[CSS, HTML, JavaS...",gpl-3.0
001szymon/uBlock,60295031dfba85299...,gorhill,1404428298,making videos ava...,making videos ava...,2014-07-03,"[CSS, HTML, JavaS...",gpl-3.0
001szymon/uBlock,a5725cb6073b6f4ab...,gorhill,1425424377,for network reque...,for network reque...,2015-03-03,"[CSS, HTML, JavaS...",gpl-3.0
001szymon/uBlock,62c8ffbcc4e75073a...,AlexVallat,1426012456,Merge branch 'mas...,Merge branch 'mas...,2015-03-10,"[CSS, HTML, JavaS...",gpl-3.0
001szymon/uBlock,8a19f32373e355022...,Deathamns,1419021597,Remove duplicate ...,Remove duplicate ...,2014-12-19,"[CSS, HTML, JavaS...",gpl-3.0


In [None]:
# Persist the joined commit sample as parquet, replacing any earlier output.
sample_out = 'gs://ads-sjhuang-final/notebooks/sjhuang/sample01'
sample_1.write.mode("overwrite").parquet(sample_out)

                                                                                