# 01 Introduction and Data Preparation - GitHub Developer Engagement

# I - Introduction

## A: Background

The GitHub open source community provides a unique opportunity to both businesses and independent developers. For businesses, it provides a means to open software and other products to a large base of inspectors who can quickly detect issues and provide feedback. For independent developers, the open source platform provides a means to improve their technical skill set, work with more experienced practitioners, and grow their professional network.

The success of this platform depends on the employees of the organizations who maintain the GitHub repositories. These employees play a variety of roles, such as attending to code issues, performing code reviews, managing pull requests, and making commits. Maintaining a good work environment for these employees is important for sustaining their engagement.




## B: Aim Of This Analysis

This analysis studies the effect of two groups of potential predictor variables on employee engagement/disengagement: i) workload-related factors (i.e. the number of commits by the employee in their initial months) and ii) the stability of the work environment (i.e. staff changes in the employee’s initial months). We consider an employee “possibly_disengaged” if we see a drastic reduction in their number of commits in later months compared to their participation in the beginning months. If we see no such drastic reduction, we label the employee “no_signs”.


## C: Hypothesis

In terms of workload-related factors, it is hypothesized that the magnitude of workload in the beginning months predicts future disengagement. For example, overworked employees (i.e. those making a very high number of commits in the beginning months) would tend to disengage in later months.

In terms of factors related to work environment stability, it is hypothesized that frequent changes in the team of committers managing a repo contribute to an unstable work environment and predict future employee disengagement.


## D: Selected Paper

This project looks to replicate the analysis of the selected paper (see Section 6 for the citation). Instead of GitHub commit activity, the selected paper looks into the activity summary reports of the developers to predict whether a developer will become disengaged and eventually leave the company. The selected paper considers “factors” (i.e. features) such as working hours (i.e. as a proxy for the magnitude of workload), the nature of the projects the employee has worked on, and overall contribution statistics (e.g. number of projects). The paper tests various classification models, such as Random Forest and SVM, to predict whether the employee will leave the company within the next 3 years.


# II - Initial Set-up of Data

### Import Libraries

In [None]:
# please note that it may take up to 15 min to run the whole notebook

In [None]:
import os
os.environ["HADOOP_HOME"] = "S:/Courses/MIE1512/Material/Spark/winutils/"
os.environ["JAVA_HOME"] = "C:/progra~2/Java/jdk1.8.0_221/"
os.environ["JRE_HOME"] = "C:/progra~2/Java/jdk1.8.0_221/jre/"

In [None]:
import findspark
findspark.init("S:/Courses/MIE1512/Material/Spark/spark-2.3.2-bin-hadoop2.7/")

import pyspark
sc = pyspark.SparkContext(appName="myAppName")
spark = pyspark.sql.SQLContext(sc)

In [None]:
import pyspark.sql.types as T 
import pyspark.sql.functions as F
import seaborn as sns

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

In [None]:
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, LinearSVC
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import BinaryClassificationEvaluator, ClusteringEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import matplotlib.pyplot as plt
import numpy as np

%matplotlib inline

### Load and clean csv

In [None]:
#load the Apache commits csv
commits_df_raw = spark.read.csv("commits_apache_2.csv",
                           header=True,
                           inferSchema=True)
commits_df_raw.createOrReplaceTempView("commits_raw")

all_table = spark.sql("""
SELECT * FROM commits_raw
""")
all_table.createOrReplaceTempView("repos")

In [None]:
#check schema
all_table.printSchema()

root
 |-- committer_name: string (nullable = true)
 |-- author_timestamp: string (nullable = true)
 |-- author_name: string (nullable = true)
 |-- repo_names: string (nullable = true)
 |-- committer_timestamp: string (nullable = true)
 |-- repo_org: string (nullable = true)



In [None]:
# check that it loaded properly
all_table.limit(5).toPandas()

Unnamed: 0,committer_name,author_timestamp,author_name,repo_names,committer_timestamp,repo_org
0,Karl Wright,2013-09-18 20:48:59 UTC,Karl Wright,apache/manifoldcf,2013-09-18 20:48:59 UTC,
1,Zheng Zhong,2006-02-23 18:06:50 UTC,Zheng Zhong,apache/portals-pluto,2006-02-23 18:06:50 UTC,
2,Simone Tripodi,2010-03-23 19:58:08 UTC,Simone Tripodi,apache/bval,2010-03-23 19:58:08 UTC,
3,Stian Soiland-Reyes,2015-02-06 16:40:39 UTC,Stian Soiland-Reyes,apache/incubator-taverna-maven-parent,2015-02-06 16:40:39 UTC,
4,Paul Dick,2001-04-03 22:12:24 UTC,Paul Dick,apache/xalan-c,2001-04-03 22:12:24 UTC,


### Data Cleaning

Three steps are performed:
1. remove the trailing "UTC" from the timestamp strings
2. convert the timestamp columns to the timestamp data type
3. add a column for the organization of the repo, i.e. all characters before the / in the repo_names column
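
As a plain-Python illustration of what the cleaning cells below do to a single row (the notebook itself does this with Spark UDFs and SQL), the sketch strips the trailing " UTC", parses the timestamp, and takes the text before the / as the organization. `clean_row` is a hypothetical helper introduced only for this illustration; the sample values come from the first row of the table above.

```python
from datetime import datetime


def clean_row(timestamp_text, repo_name):
    """Return (parsed timestamp, organization) for one raw row."""
    # Drop the trailing " UTC" marker only when it is actually present.
    if timestamp_text.endswith(" UTC"):
        timestamp_text = timestamp_text[: -len(" UTC")]
    parsed = datetime.strptime(timestamp_text, "%Y-%m-%d %H:%M:%S")
    # Everything before the first "/" is treated as the organization.
    org = repo_name.split("/", 1)[0]
    return parsed, org


ts, org = clean_row("2013-09-18 20:48:59 UTC", "apache/manifoldcf")
print(ts, org)
```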

In [None]:
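# this function removes the trailing " UTC" (4 characters) that we notice in author/committer_timestamp content
def remove_last_chars(col):
    # pass nulls through unchanged so the UDF does not fail on missing timestamps
    if col is None:
        return None
    return col[:-4]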

In [None]:
#define a UDF and apply UDF
udf_remove_last_chars = udf(remove_last_chars, StringType())
all_table = all_table.withColumn("committer_timestamp", udf_remove_last_chars(all_table["committer_timestamp"]))
all_table = all_table.withColumn("author_timestamp", udf_remove_last_chars(all_table["author_timestamp"]))
all_table.limit(5).toPandas()

Unnamed: 0,committer_name,author_timestamp,author_name,repo_names,committer_timestamp,repo_org
0,Karl Wright,2013-09-18 20:48:59,Karl Wright,apache/manifoldcf,2013-09-18 20:48:59,
1,Zheng Zhong,2006-02-23 18:06:50,Zheng Zhong,apache/portals-pluto,2006-02-23 18:06:50,
2,Simone Tripodi,2010-03-23 19:58:08,Simone Tripodi,apache/bval,2010-03-23 19:58:08,
3,Stian Soiland-Reyes,2015-02-06 16:40:39,Stian Soiland-Reyes,apache/incubator-taverna-maven-parent,2015-02-06 16:40:39,
4,Paul Dick,2001-04-03 22:12:24,Paul Dick,apache/xalan-c,2001-04-03 22:12:24,


In [None]:
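#Cast as timestamp
# re-register the view first so that the query reads the columns with "UTC" removed
all_table.createOrReplaceTempView("repos")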
all_table = spark.sql("""
SELECT committer_name, 
CAST(unix_timestamp(committer_timestamp) as timestamp) as committer_timestamp,
CAST(unix_timestamp(author_timestamp) as timestamp) as author_timestamp,
author_name, repo_names
FROM repos
""")

In [None]:
# this function creates a new column i.e. the organization the repo belongs to
def create_repo_org(col):
    new_col = col.split("/")[0]
    return new_col

In [None]:
#define a UDF and apply UDF
udf_create_repo_org = udf(create_repo_org, StringType())
all_table = all_table.withColumn("repo_org", udf_create_repo_org(all_table["repo_names"]))

In [None]:
#check schema
all_table.printSchema()

root
 |-- committer_name: string (nullable = true)
 |-- committer_timestamp: timestamp (nullable = true)
 |-- author_timestamp: timestamp (nullable = true)
 |-- author_name: string (nullable = true)
 |-- repo_names: string (nullable = true)
 |-- repo_org: string (nullable = true)



In [None]:
all_table.limit(5).toPandas()

Unnamed: 0,committer_name,committer_timestamp,author_timestamp,author_name,repo_names,repo_org
0,Karl Wright,2013-09-18 20:48:59,2013-09-18 20:48:59,Karl Wright,apache/manifoldcf,apache
1,Zheng Zhong,2006-02-23 18:06:50,2006-02-23 18:06:50,Zheng Zhong,apache/portals-pluto,apache
2,Simone Tripodi,2010-03-23 19:58:08,2010-03-23 19:58:08,Simone Tripodi,apache/bval,apache
3,Stian Soiland-Reyes,2015-02-06 16:40:39,2015-02-06 16:40:39,Stian Soiland-Reyes,apache/incubator-taverna-maven-parent,apache
4,Paul Dick,2001-04-03 22:12:24,2001-04-03 22:12:24,Paul Dick,apache/xalan-c,apache


In [None]:
#save as temptable repos.
all_table.createOrReplaceTempView("repos")

# III - Feature Engineering

1. <b>first_commit and period_N: </b> 

    first_commit: This column contains the timestamp of the developer's first commit to the organization (e.g. Apache). It is used to calculate the periods relative to the developer (see period_N).

    period_N: These are all of type timestamp, calculated as first_commit + 4N months, where N is an integer from 1 to 6 (inclusive). Each period spans 4 months, e.g. period_2 = first_commit + 8 months. The analysis looks into the commit activity for the first 2 years relative to the developer's first_commit, i.e. up to N = 6.

2. <b>P(N)_ numb_commits_org:</b>  Number of commits by the developer to the organization in each period. It is hypothesized that people who are overworked (i.e. make a high number of commits) in the baseline period (i.e. the first period, see 15. Y-Variable for more info) tend to disengage in future periods. This is an adapted version of the feature p{N}_ hour_ sum in the paper.

3. <b>P(N)_ numb_commits_repo:</b> Number of commits by the developer for each period for a given repo. This feature is used to calculate P(N)_ multi_ratio.
    

4. <b>P(N)_ repo_comm_absent:</b> For each successive period, the number of committers who committed to the repo in the previous period but did not commit in the next period. E.g. P(1)_ repo_comm_absent represents the comparison of period_2 to period_1. It is hypothesized that frequent changes in the committer team personnel for a repo from one period to another contribute to an unstable work environment and predict future developer disengagement. This is an adapted version of the feature p(N)_ person_change in the paper.


5. <b>P(N)_ repo_comm_new:</b> For each successive period, the number of committers who did not commit to the repo in the previous period but committed in the next period. E.g. P(1)_repo_comm_new represents the comparison of period_2 to period_1. It is hypothesized that frequent changes in the committer team personnel for a repo from one period to another contribute to an unstable work environment and predict future developer disengagement.

6. <b>P(N)_ repo_committers: </b>Number of developers committing to the repo during each period. Similar to the stance of the paper, the number of project members is an indicator of the project size. A small project size usually means more workload for each individual in the project, thereby predicting future developer disengagement.

7. <b>total_repo_comm_absent:</b> Total number of absentees for the repo over all periods, i.e. the sum of P(N)_ repo_comm_absent.


8. <b>total_repo_comm_new:</b> Total number of new committers for the repo over all periods, i.e. the sum of P(N)_ repo_comm_new.


9. <b>agg_absent_periods:</b> Number of periods in which the value of P(n)_repo_comm_absent is > 0. This is an aggregate form of P(n)_repo_comm_absent and an adapted version of the feature less_zero in the paper.


10. <b>new_ppl_periods:</b> Number of periods in which the value of P(n)_repo_comm_new is > 0. This is an aggregate form of P(n)_repo_comm_new and an adapted version of the feature larger_zero in the paper.


11. <b>no_change_periods:</b> Number of periods in which the values of P(n)_repo_comm_new and P(n)_repo_comm_absent are both 0. It is hypothesized that the larger this number, the greater the sense of work environment stability, which in turn supports developer engagement. This is an adapted version of the feature equal_zero in the paper.

12. <b>P(n)_ multi_ratio:</b> Ratio of the developer's commits to a given repo to their commits across all repos in the organization, for each period. It is calculated as: P(N)_numb_commits_repo / P(N)_numb_commits_org. It is used to calculate avg_multi_ratio.


13. <b>total_commits_org:</b> This feature was removed from further analysis during V2.

14. <b>avg_multi_ratio:</b> Average ratio of the developer's commits to a repo relative to all repos within the organization. It is calculated as the average of P(N)_multi_ratio over all six periods. This is an adapted version of the feature multi_project in the paper. Similar to the paper, contributing to multiple repos is like contributing to “multiple projects (which) might mean higher workload”. It is hypothesized that developers facing a high workload tend to disengage in the future.

15. <b> Y-Variable:</b> The future employee engagement i.e. the y-variable, is calculated as follows: 

   1. Number of commits between first_commit and period_1 is considered the baseline.
   
   2. If, in both period_5 and period_6, the number of commits is more than 80% below the baseline, then the person is “possibly_disengaged”; otherwise the person is labelled “no_signs”, since they do not show signs of disengagement.
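
The y-variable rule above can be sketched in plain Python. This is only an illustration under the stated definition, not the notebook's Spark implementation; `engagement_label` is a hypothetical name, and the sample counts are made up. "More than 80% below the baseline" is read as "fewer than 20% of the baseline commits remain", written with integers to avoid floating-point edge cases.

```python
def engagement_label(baseline, p5_commits, p6_commits):
    """Label a developer from the baseline count and the period 5 and 6 counts."""

    def dropped(count):
        # count < 0.2 * baseline, rewritten as 5 * count < baseline (integers only).
        return 5 * count < baseline

    # Both late periods must show the drop for the "possibly_disengaged" label.
    if dropped(p5_commits) and dropped(p6_commits):
        return "possibly_disengaged"
    return "no_signs"


print(engagement_label(43, 3, 0))
print(engagement_label(43, 3, 20))
```

Requiring the drop in both periods means a single quiet period is not enough to flag a developer.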

### 1. first_commit and period_N
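
As a plain-Python sketch of how the period boundaries relate to first_commit (the notebook computes them with Spark interval arithmetic in the cells that follow), each period_N is first_commit shifted by 4N calendar months. `add_months` and `period_ends` are hypothetical helpers, and clamping the day to the length of the target month is an assumption of this sketch.

```python
import calendar
from datetime import datetime


def add_months(start, months):
    """Shift a datetime by whole calendar months, clamping the day if needed."""
    month_index = start.month - 1 + months
    year = start.year + month_index // 12
    month = month_index % 12 + 1
    # Assumption: a day such as the 31st falls back to the last day of a shorter month.
    day = min(start.day, calendar.monthrange(year, month)[1])
    return start.replace(year=year, month=month, day=day)


def period_ends(first_commit, n_periods=6, months_per_period=4):
    """Return {N: first_commit + 4N months} for N = 1..n_periods."""
    return {
        n: add_months(first_commit, n * months_per_period)
        for n in range(1, n_periods + 1)
    }


ends = period_ends(datetime(2016, 7, 11, 8, 38, 37))
print(ends[1])  # end of the baseline period
print(ends[6])  # two years after the first commit
```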

In [None]:
all_table = spark.sql("""
SELECT row_number() over (order by committer_name ASC, committer_timestamp ASC, repo_names ASC) as ID,
committer_name, committer_timestamp, repo_names, repo_org, 
MIN(committer_timestamp) over (partition by committer_name, repo_org) as first_commit
FROM repos
""")

In [None]:
all_table.limit(2).toPandas()

Unnamed: 0,ID,committer_name,committer_timestamp,repo_names,repo_org,first_commit
0,1,"""Erik """"Ealanrian"""" Jansman""",2016-07-11 08:38:37,apache/celix,apache,2016-07-11 08:38:37
1,2,"""James """"Chuck"""" Williams""",2006-06-08 22:38:37,apache/axis2-java,apache,2006-06-08 22:38:37


In [None]:
all_table.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- committer_name: string (nullable = true)
 |-- committer_timestamp: timestamp (nullable = true)
 |-- repo_names: string (nullable = true)
 |-- repo_org: string (nullable = true)
 |-- first_commit: timestamp (nullable = true)



In [None]:
#note that period_1 is the baseline period
all_table = all_table.withColumn('period_1', all_table.first_commit + F.expr('INTERVAL 4 MONTH'))

all_table = all_table.withColumn('period_2', all_table.first_commit + F.expr('INTERVAL 8 MONTH'))
all_table = all_table.withColumn('period_3', all_table.first_commit + F.expr('INTERVAL 12 MONTH'))
all_table = all_table.withColumn('period_4', all_table.first_commit + F.expr('INTERVAL 16 MONTH'))
all_table = all_table.withColumn('period_5', all_table.first_commit + F.expr('INTERVAL 20 MONTH'))
all_table = all_table.withColumn('period_6', all_table.first_commit + F.expr('INTERVAL 24 MONTH'))

#below not included
all_table = all_table.withColumn('period_7', all_table.first_commit + F.expr('INTERVAL 28 MONTH'))
all_table = all_table.withColumn('period_8', all_table.first_commit + F.expr('INTERVAL 32 MONTH'))
all_table = all_table.withColumn('period_9', all_table.first_commit + F.expr('INTERVAL 36 MONTH'))
all_table = all_table.withColumn('period_10', all_table.first_commit + F.expr('INTERVAL 40 MONTH'))
all_table = all_table.withColumn('period_11', all_table.first_commit + F.expr('INTERVAL 44 MONTH'))

# for last_months
all_table = all_table.withColumn('period_12', all_table.first_commit + F.expr('INTERVAL 12 MONTH'))
all_table = all_table.withColumn('period_18', all_table.first_commit + F.expr('INTERVAL 18 MONTH'))

# A trial and error process was taken up in V1 to define suitable y-variable. 
# The months below are created but are not used in ML model. However, they may still appear in tables below. 
# This will be cleaned up in V2.
all_table = all_table.withColumn('period_25', all_table.first_commit + F.expr('INTERVAL 25 MONTH'))
all_table = all_table.withColumn('period_32', all_table.first_commit + F.expr('INTERVAL 32 MONTH'))
all_table = all_table.withColumn('period_33', all_table.first_commit + F.expr('INTERVAL 33 MONTH'))
all_table = all_table.withColumn('period_34', all_table.first_commit + F.expr('INTERVAL 34 MONTH'))
all_table = all_table.withColumn('period_35', all_table.first_commit + F.expr('INTERVAL 35 MONTH'))
all_table = all_table.withColumn('period_36', all_table.first_commit + F.expr('INTERVAL 36 MONTH'))

In [None]:
all_table.createOrReplaceTempView("repos")

In [None]:
all_table.limit(2).toPandas()

Unnamed: 0,ID,committer_name,committer_timestamp,repo_names,repo_org,first_commit,period_1,period_2,period_3,period_4,...,period_10,period_11,period_12,period_18,period_25,period_32,period_33,period_34,period_35,period_36
0,1,"""Erik """"Ealanrian"""" Jansman""",2016-07-11 08:38:37,apache/celix,apache,2016-07-11 08:38:37,2016-11-11 08:38:37,2017-03-11 08:38:37,2017-07-11 08:38:37,2017-11-11 08:38:37,...,2019-11-11 08:38:37,2020-03-11 08:38:37,2017-07-11 08:38:37,2018-01-11 08:38:37,2018-08-11 08:38:37,2019-03-11 08:38:37,2019-04-11 08:38:37,2019-05-11 08:38:37,2019-06-11 08:38:37,2019-07-11 08:38:37
1,2,"""James """"Chuck"""" Williams""",2006-06-08 22:38:37,apache/axis2-java,apache,2006-06-08 22:38:37,2006-10-08 22:38:37,2007-02-08 22:38:37,2007-06-08 22:38:37,2007-10-08 22:38:37,...,2009-10-08 22:38:37,2010-02-08 22:38:37,2007-06-08 22:38:37,2007-12-08 22:38:37,2008-07-08 22:38:37,2009-02-08 22:38:37,2009-03-08 23:38:37,2009-04-08 22:38:37,2009-05-08 22:38:37,2009-06-08 22:38:37


### 2. P(N)_ numb_commits_org

The following steps are taken to create this feature:

1. We take all commits occurring within the period in question.
2. We perform a count per (i.e. group by) committer_name and per organization.
3. The resulting table represents all commits made by the developer to the organization within that time period.
4. We join this table to all_table.
5. These steps are repeated for each time period, i.e. N, N+1, N+2, ..., N+6.
6. Rows that are null (i.e. no commits by the developer during the time period) are filled with 0.
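
Because the same query is repeated for every period, the per-period SQL could be generated from one template instead of being written out by hand. The following is a minimal sketch, assuming the view name repos and the column names used in this notebook; `period_count_sql` is a hypothetical helper. It uses half-open intervals (start inclusive, end exclusive), a choice made here so that a commit falling exactly on a period boundary is counted in exactly one period.

```python
def period_count_sql(n):
    """Build the per-organization commit-count query for period n (1-based)."""
    if n < 1:
        raise ValueError("period number must be >= 1")
    # Period 1 starts at the developer's first commit; later periods start
    # where the previous period ended.
    start = "first_commit" if n == 1 else f"period_{n - 1}"
    end = f"period_{n}"
    return (
        f"SELECT count(1) AS P{n}_numb_commits_org, "
        "repo_org AS org, committer_name AS com_name\n"
        "FROM repos\n"
        f"WHERE committer_timestamp >= {start} AND committer_timestamp < {end}\n"
        "GROUP BY repo_org, committer_name"
    )


print(period_count_sql(2))
```

With a live Spark session, each query could then be registered in a loop, e.g. `spark.sql(period_count_sql(n)).createOrReplaceTempView(f"P{n}")` for each `n`.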

In [None]:
## don't change repos to any other name in any of the sub-tables!

In [None]:
q = spark.sql("""

SELECT count(1) as P1_numb_commits_org , repo_org as org, committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= first_commit AND committer_timestamp < period_1
)
GROUP BY org, com_name

""")
q.createOrReplaceTempView("P1")

In [None]:
#Sample P1 table showing count of commits in first timespan. Notice that the count is grouped by org and committer_name
#we will right join this table by org name and com_name to the master table i.e. repos
spark.sql("""

SELECT *
FROM P1
LIMIT 2

""").toPandas()

Unnamed: 0,P1_numb_commits_org,org,com_name
0,43,apache,Aaron Myers
1,1,apache,Aurélien Pupier


In [None]:
spark.sql("""

SELECT count(1) as P1_numb_commits_org , repo_org as org, committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= first_commit AND committer_timestamp < period_1
)
GROUP BY org, com_name

LIMIT 5

""").toPandas()

Unnamed: 0,P1_numb_commits_org,org,com_name
0,43,apache,Aaron Myers
1,1,apache,Aurélien Pupier
2,21,apache,Danny Chan
3,2,apache,Evgeny Stanilovskiy
4,2,apache,Fridolin Jackstadt


In [None]:
repos_spark = spark.sql("""

SELECT count(1) as P2_numb_commits_org , repo_org as org, committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_1 AND committer_timestamp < period_2
)
GROUP BY org, com_name

""")
repos_spark.createOrReplaceTempView("P2")

In [None]:
repos_spark = spark.sql("""

SELECT count(1) as P3_numb_commits_org, repo_org as org, committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_2 AND committer_timestamp < period_3
)
GROUP BY org, com_name

""")
repos_spark.createOrReplaceTempView("P3")

In [None]:
t = spark.sql("""

SELECT count(1) as P4_numb_commits_org, repo_org as org , committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_3 AND committer_timestamp < period_4
)
GROUP BY org, com_name

""")
t.createOrReplaceTempView("P4")

In [None]:
r = spark.sql("""

SELECT count(1) as P5_numb_commits_org, repo_org as org , committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_4 AND committer_timestamp < period_5
)
GROUP BY org, com_name

""")
r.createOrReplaceTempView("P5")

In [None]:
f = spark.sql("""

SELECT count(1) as P6_numb_commits_org, repo_org as org , committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_5 AND committer_timestamp < period_6
)
GROUP BY org, com_name

""")
f.createOrReplaceTempView("P6")

In [None]:
q = spark.sql("""

SELECT count(1) as P7_numb_commits_org, repo_org as org , committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_6 AND committer_timestamp < period_7
)
GROUP BY org, com_name

""")
q.createOrReplaceTempView("P7")

In [None]:
s = spark.sql("""

SELECT count(1) as P8_numb_commits_org, repo_org as org , committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_7 AND committer_timestamp < period_8
)
GROUP BY org, com_name

""")
s.createOrReplaceTempView("P8")

In [None]:
e = spark.sql("""

SELECT count(1) as P9_numb_commits_org, repo_org as org , committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_8 AND committer_timestamp < period_9
)
GROUP BY org, com_name

""")
e.createOrReplaceTempView("P9")

In [None]:
s = spark.sql("""

SELECT count(1) as P10_numb_commits_org, repo_org as org , committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_9 AND committer_timestamp < period_10
)
GROUP BY org, com_name

""")
s.createOrReplaceTempView("P10")

In [None]:
e = spark.sql("""

SELECT count(1) as P11_numb_commits_org, repo_org as org , committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_10 AND committer_timestamp < period_11
)
GROUP BY org, com_name

""")
e.createOrReplaceTempView("P11")

In [None]:
f = spark.sql("""

SELECT count(1) as last_commits, repo_org as org, committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_12 AND committer_timestamp < period_18
)
GROUP BY org, com_name

""")
f.createOrReplaceTempView("last_months")

### join

In [None]:
#don't change repos name in first join!

In [None]:
# we now join each of the tables one-by-one

In [None]:
# Do NOT change repos to repos_2 inside this query; only save the result as repos_2. This is the first join.
repos_spark = spark.sql("""

SELECT repos.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,

P1_numb_commits_org

FROM
(
SELECT *
FROM P1
) 

t RIGHT JOIN repos ON t.org = repos.repo_org AND t.com_name = repos.committer_name
ORDER BY ID ASC

""")
repos_spark.createOrReplaceTempView("repos_2")


In [None]:
f = spark.sql("""

SELECT repos_2.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_org, P2_numb_commits_org

FROM
(
SELECT *
FROM P2
) 
t RIGHT JOIN repos_2 ON t.org = repos_2.repo_org AND t.com_name = repos_2.committer_name
ORDER BY ID ASC

""")
f.createOrReplaceTempView("repos_2")

In [None]:
s = spark.sql("""

SELECT repos_2.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_org, P2_numb_commits_org, P3_numb_commits_org

FROM
(
SELECT *
FROM P3
) 
t RIGHT JOIN repos_2 ON t.org = repos_2.repo_org   AND t.com_name = repos_2.committer_name
ORDER BY ID ASC

""")
s.createOrReplaceTempView("repos_2")

In [None]:
repos_spark = spark.sql("""

SELECT repos_2.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_org, P2_numb_commits_org, P3_numb_commits_org, 
P4_numb_commits_org

FROM
(
SELECT *
FROM P4
) 
t RIGHT JOIN repos_2 ON t.org = repos_2.repo_org  AND t.com_name = repos_2.committer_name
ORDER BY ID ASC

""")
repos_spark.createOrReplaceTempView("repos_2")

In [None]:
q = spark.sql("""

SELECT repos_2.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_org, P2_numb_commits_org, P3_numb_commits_org, 
P4_numb_commits_org, P5_numb_commits_org

FROM
(
SELECT *
FROM P5
) 
t RIGHT JOIN repos_2 ON t.org = repos_2.repo_org   AND t.com_name = repos_2.committer_name
ORDER BY ID ASC

""")
q.createOrReplaceTempView("repos_2")

In [None]:
repos_spark = spark.sql("""

SELECT repos_2.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit, 

P1_numb_commits_org, P2_numb_commits_org, P3_numb_commits_org, 
P4_numb_commits_org, P5_numb_commits_org,P6_numb_commits_org

FROM
(
SELECT *
FROM P6
) 
t RIGHT JOIN repos_2 ON t.org = repos_2.repo_org   AND t.com_name = repos_2.committer_name
ORDER BY ID ASC

""")
repos_spark.createOrReplaceTempView("repos_2")

In [None]:
s = spark.sql("""

SELECT repos_2.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_org, P2_numb_commits_org, P3_numb_commits_org, 
P4_numb_commits_org, P5_numb_commits_org,P6_numb_commits_org, 
P7_numb_commits_org

FROM
(
SELECT *
FROM P7
) 
t RIGHT JOIN repos_2 ON t.org = repos_2.repo_org   AND t.com_name = repos_2.committer_name
ORDER BY ID ASC

""")
s.createOrReplaceTempView("repos_2")

In [None]:
a = spark.sql("""

SELECT repos_2.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_org, P2_numb_commits_org, P3_numb_commits_org, 
P4_numb_commits_org, P5_numb_commits_org,P6_numb_commits_org, 
P7_numb_commits_org, P8_numb_commits_org

FROM
(
SELECT *
FROM P8
) 
t RIGHT JOIN repos_2 ON t.org = repos_2.repo_org   AND t.com_name = repos_2.committer_name
ORDER BY ID ASC

""")
a.createOrReplaceTempView("repos_2")

In [None]:
repos_spark = spark.sql("""

SELECT repos_2.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_org, P2_numb_commits_org, P3_numb_commits_org, 
P4_numb_commits_org, P5_numb_commits_org,P6_numb_commits_org, 
P7_numb_commits_org,P8_numb_commits_org,P9_numb_commits_org

FROM
(
SELECT *
FROM P9
) 
t RIGHT JOIN repos_2 ON t.org = repos_2.repo_org   AND t.com_name = repos_2.committer_name
ORDER BY ID ASC

""")
repos_spark.createOrReplaceTempView("repos_2")

In [None]:
e = spark.sql("""

SELECT repos_2.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_org, P2_numb_commits_org, P3_numb_commits_org, 
P4_numb_commits_org, P5_numb_commits_org,P6_numb_commits_org, 
P7_numb_commits_org,P8_numb_commits_org,P9_numb_commits_org,
P10_numb_commits_org 

FROM
(
SELECT *
FROM P10
) 
t RIGHT JOIN repos_2 ON t.org = repos_2.repo_org   AND t.com_name = repos_2.committer_name
ORDER BY ID ASC

""")
e.createOrReplaceTempView("repos_2")

In [None]:
repos_spark = spark.sql("""

SELECT repos_2.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_org, P2_numb_commits_org, P3_numb_commits_org, 
P4_numb_commits_org, P5_numb_commits_org,P6_numb_commits_org, 
P7_numb_commits_org,P8_numb_commits_org,P9_numb_commits_org,
P10_numb_commits_org, P11_numb_commits_org

FROM
(
SELECT *
FROM P11
) 
t RIGHT JOIN repos_2 ON t.org = repos_2.repo_org   AND t.com_name = repos_2.committer_name
ORDER BY ID ASC

""")
repos_spark.createOrReplaceTempView("repos_2")

In [None]:
repos_spark = repos_spark.fillna(0, subset=['P1_numb_commits_org', 'P2_numb_commits_org', 'P3_numb_commits_org', 
                                        'P4_numb_commits_org', 'P5_numb_commits_org', 'P6_numb_commits_org',
                                       'P7_numb_commits_org', 'P8_numb_commits_org', 'P9_numb_commits_org',
                                       'P10_numb_commits_org', 'P11_numb_commits_org'])

In [None]:
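# note: last_commits (from the last_months view) is not joined into repos_spark above, so there is no such column to fill here
repos_spark = repos_spark.fillna(0, subset=['last_commits'])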

In [None]:
repos_spark.createOrReplaceTempView("repos_2")

In [None]:
spark.sql("""

SELECT count(1) as count
FROM repos_2

""").toPandas()

Unnamed: 0,count
0,1048536


In [None]:
#we notice that some committers probably do "batch" commit
# spark.sql("""

# SELECT distinct(committer_name), (P1_numb_commits_org)
# FROM repos_2
# WHERE P1_numb_commits_org > 80
# LIMIT 5

# """).toPandas()

In [None]:
# spark.sql("""

# SELECT count(1) as count, y_block_1
# FROM
# (
# SELECT 
#     (CASE WHEN ((P1_numb_commits_org * 0.2) > P2_numb_commits_org 
#     AND (P1_numb_commits_org * 0.2) > P3_numb_commits_org)  
#     THEN "disengaged_1"
#     ELSE "engaged" END) y_block_1
    
# FROM repos_2
# )
# GROUP BY y_block_1

# """).toPandas()

In [None]:
# spark.sql("""

# SELECT count(1) as count, y_block_2
# FROM
# (
# SELECT 
#     (CASE WHEN ((P1_numb_commits_org * 0.2) > P3_numb_commits_org 
#     AND (P1_numb_commits_org * 0.2) > P4_numb_commits_org)  
#     THEN "disengaged_2"
#     ELSE "engaged" END) y_block_2
    
# FROM repos_2
# )
# GROUP BY y_block_2

# """).toPandas()

In [None]:
# spark.sql("""

# SELECT count(1) as count, y_block_3
# FROM
# (
# SELECT 
#     (CASE WHEN ((P1_numb_commits_org * 0.2) > P4_numb_commits_org 
#     AND (P1_numb_commits_org * 0.2) > P5_numb_commits_org)  
#     THEN "disengaged_3"
#     ELSE "engaged" END) y_block_3
    
# FROM repos_2
# )
# GROUP BY y_block_3

# """).toPandas()

In [None]:
# spark.sql("""

# SELECT count(1) as count, y_block_4
# FROM
# (
# SELECT 
#     (CASE WHEN ((P1_numb_commits_org * 0.2) > P5_numb_commits_org 
#     AND (P1_numb_commits_org * 0.2) > P6_numb_commits_org)  
#     THEN "disengaged_4"
#     ELSE "engaged" END) y_block_4
    
# FROM repos_2
# )
# GROUP BY y_block_4

# """).toPandas()

In [None]:
# spark.sql("""

# SELECT count(1) as count, y_block_5
# FROM
# (
# SELECT 
#     (CASE WHEN ((P1_numb_commits_org * 0.2) > P6_numb_commits_org 
#     AND (P1_numb_commits_org * 0.2) > P7_numb_commits_org)  
#     THEN "disengaged_5"
#     ELSE "engaged" END) y_block_5
    
# FROM repos_2
# )
# GROUP BY y_block_5

# """).toPandas()

In [None]:
# spark.sql("""

# SELECT count(1) as count, y_block_6
# FROM
# (
# SELECT 
#     (CASE WHEN ((P1_numb_commits_org * 0.2) > P7_numb_commits_org 
#     AND (P1_numb_commits_org * 0.2) > P8_numb_commits_org)  
#     THEN "disengaged_6"
#     ELSE "engaged" END) y_block_6
    
# FROM repos_2
# )
# GROUP BY y_block_6

# """).toPandas()

In [None]:
# spark.sql("""

# SELECT count(1) as count, y_block_7
# FROM
# (
# SELECT 
#     (CASE WHEN ((P1_numb_commits_org * 0.2) > P8_numb_commits_org 
#     AND (P1_numb_commits_org * 0.2) > P9_numb_commits_org)  
#     THEN "disengaged_7"
#     ELSE "engaged" END) y_block_7
    
# FROM repos_2
# )
# GROUP BY y_block_7

# """).toPandas()

In [None]:
# spark.sql("""

# SELECT count(1) as count, y_block_8
# FROM
# (
# SELECT 
#     (CASE WHEN ((P1_numb_commits_org * 0.2) > P9_numb_commits_org 
#     AND (P1_numb_commits_org * 0.2) > P10_numb_commits_org)  
#     THEN "disengaged_8"
#     ELSE "engaged" END) y_block_8
    
# FROM repos_2
# )
# GROUP BY y_block_8

# """).toPandas()

### 3. P(N)_ numb_commits_repo


The following steps are taken to create this feature:

1. We take all commits occurring within the period in question.
2. We perform a count per (i.e. group by) committer_name and per repo.
3. The resulting table represents all commits made by the developer to the repo within that time period.
4. We join this table to all_table.
5. These steps are repeated for each time period, i.e. N, N+1, N+2, ..., N+6. In V1, this was only done up to P2.
6. Rows that are null (i.e. no commits by the developer during the time period) are filled with 0.
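
To show how the per-repo counts feed into P(N)_ multi_ratio, here is a small plain-Python sketch for a single period. It is an illustration only, not the notebook's Spark code: `repo_share` is a hypothetical helper, the committer names and commits are made-up sample data, and all repos are assumed to belong to one organization.

```python
from collections import Counter


def repo_share(commits):
    """commits: (committer, repo) pairs that fall inside one period.

    Returns {(committer, repo): repo commits / org commits} for that period.
    """
    commits = list(commits)
    # Commits per (committer, repo), i.e. the per-repo count.
    per_repo = Counter(commits)
    # Commits per committer across all repos, i.e. the per-organization count.
    per_org = Counter(committer for committer, _ in commits)
    return {key: count / per_org[key[0]] for key, count in per_repo.items()}


sample = [
    ("alice", "apache/bval"),
    ("alice", "apache/bval"),
    ("alice", "apache/celix"),
    ("bob", "apache/celix"),
]
shares = repo_share(sample)
print(shares[("alice", "apache/bval")])
```

A share of 1.0 means the developer committed to a single repo in that period; smaller values indicate work spread across several repos.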

### make table for each successive month duration

In [None]:
s = spark.sql("""

SELECT count(1) as P1_numb_commits_repo , repo_names as rep_names, committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= first_commit AND committer_timestamp < period_1
)
GROUP BY rep_names, com_name

""")
s.createOrReplaceTempView("P1_repo")

In [None]:
f = spark.sql("""

SELECT count(1) as P2_numb_commits_repo , repo_names as rep_names, committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_1 AND committer_timestamp < period_2
)
GROUP BY rep_names, com_name

""")
f.createOrReplaceTempView("P2_repo")

In [None]:
q = spark.sql("""

SELECT count(1) as P3_numb_commits_repo , repo_names as rep_names, committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_2 AND committer_timestamp < period_3
)
GROUP BY rep_names, com_name

""")
q.createOrReplaceTempView("P3_repo")

In [None]:
repos_spark = spark.sql("""

SELECT count(1) as P4_numb_commits_repo , repo_names as rep_names, committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_3 AND committer_timestamp < period_4
)
GROUP BY rep_names, com_name

""")
repos_spark.createOrReplaceTempView("P4_repo")

In [None]:
d = spark.sql("""

SELECT count(1) as P5_numb_commits_repo , repo_names as rep_names, committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_4 AND committer_timestamp < period_5
)
GROUP BY rep_names, com_name

""")
d.createOrReplaceTempView("P5_repo")

In [None]:
i = spark.sql("""

SELECT count(1) as P6_numb_commits_repo , repo_names as rep_names, committer_name as com_name
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_5 AND committer_timestamp < period_6
)
GROUP BY rep_names, com_name

""")
i.createOrReplaceTempView("P6_repo")

### join

In [None]:
#don't change repos name in first join.

In [None]:
s = spark.sql("""

SELECT repos.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_repo

FROM
(
SELECT *
FROM P1_repo
) 
t RIGHT JOIN repos ON t.rep_names = repos.repo_names AND t.com_name = repos.committer_name
ORDER BY ID ASC

""")
s.createOrReplaceTempView("repos_3")

In [None]:
f = spark.sql("""

SELECT repos_3.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_repo, P2_numb_commits_repo

FROM
(
SELECT *
FROM P2_repo
) 
t RIGHT JOIN repos_3 ON t.rep_names = repos_3.repo_names  AND t.com_name = repos_3.committer_name
ORDER BY ID ASC

""")
f.createOrReplaceTempView("repos_3")

In [None]:
# spark.sql("""

# SELECT *
# FROM repos_3
# ORDER BY P1_numb_commits_org ASC
# LIMIT 2


# """).toPandas()

In [None]:
a = spark.sql("""

SELECT repos_3.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_repo, P2_numb_commits_repo, P3_numb_commits_repo

FROM
(
SELECT *
FROM P3_repo
) 
t RIGHT JOIN repos_3 ON t.rep_names = repos_3.repo_names  AND t.com_name = repos_3.committer_name
ORDER BY ID ASC

""")
a.createOrReplaceTempView("repos_3")

In [None]:
r = spark.sql("""

SELECT repos_3.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_repo, P2_numb_commits_repo, P3_numb_commits_repo,
P4_numb_commits_repo

FROM
(
SELECT *
FROM P4_repo
) 
t RIGHT JOIN repos_3 ON t.rep_names = repos_3.repo_names  AND t.com_name = repos_3.committer_name
ORDER BY ID ASC

""")
r.createOrReplaceTempView("repos_3")

In [None]:
y = spark.sql("""

SELECT repos_3.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_repo, P2_numb_commits_repo, P3_numb_commits_repo,
P4_numb_commits_repo, P5_numb_commits_repo


FROM
(
SELECT *
FROM P5_repo
) 
t RIGHT JOIN repos_3 ON t.rep_names = repos_3.repo_names  AND t.com_name = repos_3.committer_name
ORDER BY ID ASC

""")
y.createOrReplaceTempView("repos_3")

In [None]:
repos_spark = spark.sql("""

SELECT repos_3.ID ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_repo, P2_numb_commits_repo,P3_numb_commits_repo,
P4_numb_commits_repo, P5_numb_commits_repo, P6_numb_commits_repo


FROM
(
SELECT *
FROM P6_repo
) 
t RIGHT JOIN repos_3 ON t.rep_names = repos_3.repo_names  AND t.com_name = repos_3.committer_name
ORDER BY ID ASC

""")

In [None]:
repos_spark = repos_spark.fillna(0, subset=['P1_numb_commits_repo', 'P2_numb_commits_repo',
                                       'P3_numb_commits_repo', 'P4_numb_commits_repo',
                                       'P5_numb_commits_repo', 'P6_numb_commits_repo'])

In [None]:
repos_spark.createOrReplaceTempView("repos_3")

In [None]:
spark.sql("""

SELECT count(1) as count
FROM repos_3

""").show()

+-------+
|  count|
+-------+
|1048536|
+-------+



In [None]:
# spark.sql("""

# SELECT count(1) as count
# FROM repos_3

# """).show()

In [None]:
# spark.sql("""

# SELECT count(1) as count
# FROM repos_join

# """).show()

In [None]:
repos_spark.columns

['ID',
 'committer_name',
 'committer_timestamp',
 'repo_names',
 'repo_org',
 'first_commit',
 'P1_numb_commits_org',
 'P2_numb_commits_org',
 'P3_numb_commits_org',
 'P4_numb_commits_org',
 'P5_numb_commits_org',
 'P6_numb_commits_org',
 'P7_numb_commits_org',
 'P8_numb_commits_org',
 'P9_numb_commits_org',
 'P10_numb_commits_org',
 'P11_numb_commits_org',
 'P1_numb_commits_repo',
 'P2_numb_commits_repo',
 'P3_numb_commits_repo',
 'P4_numb_commits_repo',
 'P5_numb_commits_repo',
 'P6_numb_commits_repo']

## 4. P(N)_ repo_comm_absent

The following steps are taken to create this feature:

1. We take all commits occurring within the period in question (e.g. the first).
2. We take all commits occurring in the period that follows it (e.g. the second).
3. For each repo, we count the distinct committer names that appear in the first period but not in the second, and store the result in a temp table.
4. We join this table to all_table.
5. These steps are repeated for each pair of successive periods, here P1 through P5.
6. Rows that are null after the join (i.e. no committer left the repo between the two periods) are filled with 0.
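
The comparison in step 3 amounts to a per-repo set difference. The sketch below illustrates this on made-up rows in plain Python. The helper names and the toy data are assumptions for illustration only; the notebook itself performs this step in Spark SQL with `NOT EXISTS`.

```python
from collections import defaultdict


def committers_by_repo(rows):
    """Group (committer_name, repo_names) pairs into {repo: set of committers}."""
    grouped = defaultdict(set)
    for committer, repo in rows:
        grouped[repo].add(committer)
    return grouped


def absent_counts(current_rows, next_rows):
    """Count, per repo, the committers seen in the current period but not the next."""
    current = committers_by_repo(current_rows)
    following = committers_by_repo(next_rows)
    counts = {}
    for repo, names in current.items():
        gone = names - following.get(repo, set())
        if gone:  # repos that lost nobody are omitted, like the SQL GROUP BY result
            counts[repo] = len(gone)
    return counts


# Toy data: duplicate commits by the same person count once per repo.
p_at_1 = [("ann", "repo_a"), ("bob", "repo_a"), ("ann", "repo_a"), ("cat", "repo_b")]
p_at_2 = [("ann", "repo_a"), ("dan", "repo_a")]
print(absent_counts(p_at_1, p_at_2))
```

Repos missing from the result correspond to the null rows that step 6 fills with 0.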

In [None]:
s = spark.sql("""

SELECT committer_name, repo_names 
FROM repos
WHERE committer_timestamp >= first_commit AND committer_timestamp < period_1

""")
s.createOrReplaceTempView("P_at_1")

In [None]:
f = spark.sql("""

SELECT committer_name, repo_names 
FROM repos
WHERE committer_timestamp >= period_1 AND committer_timestamp < period_2

""")
f.createOrReplaceTempView("P_at_2")

In [None]:
a = spark.sql("""

SELECT committer_name, repo_names 
FROM repos
WHERE committer_timestamp >= period_2 AND committer_timestamp < period_3

""")
a.createOrReplaceTempView("P_at_3")

In [None]:
q = spark.sql("""

SELECT committer_name, repo_names 
FROM repos
WHERE committer_timestamp >= period_3 AND committer_timestamp < period_4

""")
q.createOrReplaceTempView("P_at_4")

In [None]:
t = spark.sql("""

SELECT committer_name, repo_names 
FROM repos
WHERE committer_timestamp >= period_4 AND committer_timestamp < period_5

""")
t.createOrReplaceTempView("P_at_5")

In [None]:
e = spark.sql("""

SELECT committer_name, repo_names 
FROM repos
WHERE committer_timestamp >= period_5 AND committer_timestamp < period_6

""")
e.createOrReplaceTempView("P_at_6")

### make temp tables that compare the tables for each pair of successive periods

In [None]:
x = spark.sql("""

SELECT count(distinct(comm_name)) as P1_repo_comm_absent, rep_name
FROM
(
SELECT t1.committer_name as comm_name, t1.repo_names as rep_name
FROM P_at_1 t1
WHERE NOT EXISTS (SELECT t2.committer_name FROM P_at_2 t2 WHERE t1.committer_name = t2.committer_name AND 
                    t1.repo_names = t2.repo_names)
)
GROUP BY rep_name

""")
x.createOrReplaceTempView("P_2_1_repo_comm_absent")

In [None]:
e = spark.sql("""

SELECT count(distinct(comm_name)) as P2_repo_comm_absent, rep_name
FROM
(
SELECT t1.committer_name as comm_name, t1.repo_names as rep_name
FROM P_at_2 t1
WHERE NOT EXISTS (SELECT t2.committer_name FROM P_at_3 t2 WHERE t1.committer_name = t2.committer_name AND 
                    t1.repo_names = t2.repo_names)
)
GROUP BY rep_name

""")
e.createOrReplaceTempView("P_3_2_repo_comm_absent")

In [None]:
a = spark.sql("""

SELECT count(distinct(comm_name)) as P3_repo_comm_absent, rep_name
FROM
(
SELECT t1.committer_name as comm_name, t1.repo_names as rep_name
FROM P_at_3 t1
WHERE NOT EXISTS (SELECT t2.committer_name FROM P_at_4 t2 WHERE t1.committer_name = t2.committer_name AND 
                    t1.repo_names = t2.repo_names)
)
GROUP BY rep_name

""")
a.createOrReplaceTempView("P_4_3_repo_comm_absent")

In [None]:
w = spark.sql("""

SELECT count(distinct(comm_name)) as P4_repo_comm_absent, rep_name
FROM
(
SELECT t1.committer_name as comm_name, t1.repo_names as rep_name
FROM P_at_4 t1
WHERE NOT EXISTS (SELECT t2.committer_name FROM P_at_5 t2 WHERE t1.committer_name = t2.committer_name AND 
                    t1.repo_names = t2.repo_names)
)
GROUP BY rep_name

""")
w.createOrReplaceTempView("P_5_4_repo_comm_absent")

In [None]:
r = spark.sql("""

SELECT count(distinct(comm_name)) as P5_repo_comm_absent, rep_name
FROM
(
SELECT t1.committer_name as comm_name, t1.repo_names as rep_name
FROM P_at_5 t1
WHERE NOT EXISTS (SELECT t2.committer_name FROM P_at_6 t2 WHERE t1.committer_name = t2.committer_name AND 
                    t1.repo_names = t2.repo_names)
)
GROUP BY rep_name

""")
r.createOrReplaceTempView("P_6_5_repo_comm_absent")

### joining to repos

In [None]:
q = spark.sql("""

SELECT repos.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_repo_comm_absent

FROM
(
SELECT *
FROM P_2_1_repo_comm_absent
) 
t RIGHT JOIN repos ON t.rep_name = repos.repo_names 

""")
q.createOrReplaceTempView("repos_4")

In [None]:
repos_4_spark = spark.sql("""

SELECT repos_4.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_repo_comm_absent,P2_repo_comm_absent

FROM
(
SELECT *
FROM P_3_2_repo_comm_absent
) 
t RIGHT JOIN repos_4 ON t.rep_name = repos_4.repo_names 

""")

repos_4_spark.createOrReplaceTempView("repos_4")

In [None]:
a = spark.sql("""

SELECT repos_4.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_repo_comm_absent,P2_repo_comm_absent,P3_repo_comm_absent

FROM
(
SELECT *
FROM P_4_3_repo_comm_absent
) 
t RIGHT JOIN repos_4 ON t.rep_name = repos_4.repo_names 

""")
a.createOrReplaceTempView("repos_4")

In [None]:
q = spark.sql("""

SELECT repos_4.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_repo_comm_absent,P2_repo_comm_absent,P3_repo_comm_absent,
P4_repo_comm_absent

FROM
(
SELECT *
FROM P_5_4_repo_comm_absent
) 
t RIGHT JOIN repos_4 ON t.rep_name = repos_4.repo_names 

""")
q.createOrReplaceTempView("repos_4")

In [None]:
repos_4_spark = spark.sql("""

SELECT repos_4.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  


P1_repo_comm_absent,P2_repo_comm_absent,P3_repo_comm_absent,
P4_repo_comm_absent,P5_repo_comm_absent

FROM
(
SELECT *
FROM P_6_5_repo_comm_absent
) 
t RIGHT JOIN repos_4 ON t.rep_name = repos_4.repo_names 

""")
repos_4_spark.createOrReplaceTempView("repos_4")

In [None]:
repos_4_spark = repos_4_spark.fillna(0, subset=['P1_repo_comm_absent','P2_repo_comm_absent',
                                        'P3_repo_comm_absent','P4_repo_comm_absent',
                                        'P5_repo_comm_absent'])

In [None]:
repos_4_spark.createOrReplaceTempView("repos_4")

## 5. P(N)_ repo_comm_new

The following steps are taken to create this feature:

1. We take all commits occurring within the period in question (e.g. the first).
2. We take all commits occurring in the period that follows it (e.g. the second).
3. For each repo, we count the distinct committer names that appear in the second period but not in the first, and store the result in a temp table.
4. We join this table to all_table.
5. These steps are repeated for each pair of successive periods, here P1 through P5.
6. Rows that are null after the join (i.e. no new committer joined the repo between the two periods) are filled with 0.
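
Because the five comparison queries differ only in which `P_at_N` views they read, they could be produced in a loop. This is a hedged sketch: `new_committers_sql` and `new_committers_view` are hypothetical helpers, and the view and column names simply follow the naming pattern used in this notebook.

```python
def new_committers_sql(n):
    """Spark SQL text counting committers seen in period n+1 but not in period n."""
    earlier, later = f"P_at_{n}", f"P_at_{n + 1}"
    return (
        f"SELECT count(DISTINCT t2.committer_name) AS P{n}_repo_comm_new, "
        "t2.repo_names AS rep_name\n"
        f"FROM {later} t2\n"
        "WHERE NOT EXISTS (\n"
        f"    SELECT 1 FROM {earlier} t1\n"
        "    WHERE t1.committer_name = t2.committer_name\n"
        "      AND t1.repo_names = t2.repo_names)\n"
        "GROUP BY t2.repo_names"
    )


def new_committers_view(n):
    """Temp view name for the comparison of period n+1 against period n."""
    return f"P_{n + 1}_{n}_repo_comm_new"


# (view name, query) pairs for the five successive-period comparisons.
plan = [(new_committers_view(n), new_committers_sql(n)) for n in range(1, 6)]
for view_name, _ in plan:
    print(view_name)
```

Running `spark.sql(query).createOrReplaceTempView(view_name)` for each pair would register the same views that the cells below create one by one.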

In [None]:
q = spark.sql("""

SELECT count(distinct(comm_name)) as P1_repo_comm_new, rep_name
FROM
(
SELECT t2.committer_name as comm_name, t2.repo_names as rep_name
FROM P_at_2 t2
WHERE NOT EXISTS (SELECT t1.committer_name FROM P_at_1 t1 WHERE t1.committer_name = t2.committer_name AND 
                    t1.repo_names = t2.repo_names)
)
GROUP BY rep_name

""")
q.createOrReplaceTempView("P_2_1_repo_comm_new")

In [None]:
q = spark.sql("""

SELECT count(distinct(comm_name)) as P2_repo_comm_new, rep_name
FROM
(
SELECT t2.committer_name as comm_name, t2.repo_names as rep_name
FROM P_at_3 t2
WHERE NOT EXISTS (SELECT t1.committer_name FROM P_at_2 t1 WHERE t1.committer_name = t2.committer_name AND 
                    t1.repo_names = t2.repo_names)
)
GROUP BY rep_name

""")
q.createOrReplaceTempView("P_3_2_repo_comm_new")

In [None]:
s = spark.sql("""

SELECT count(distinct(comm_name)) as P3_repo_comm_new, rep_name
FROM
(
SELECT t2.committer_name as comm_name, t2.repo_names as rep_name
FROM P_at_4 t2
WHERE NOT EXISTS (SELECT t1.committer_name FROM P_at_3 t1 WHERE t1.committer_name = t2.committer_name AND 
                    t1.repo_names = t2.repo_names)
)
GROUP BY rep_name

""")
s.createOrReplaceTempView("P_4_3_repo_comm_new")

In [None]:
r = spark.sql("""

SELECT count(distinct(comm_name)) as P4_repo_comm_new, rep_name
FROM
(
SELECT t2.committer_name as comm_name, t2.repo_names as rep_name
FROM P_at_5 t2
WHERE NOT EXISTS (SELECT t1.committer_name FROM P_at_4 t1 WHERE t1.committer_name = t2.committer_name AND 
                    t1.repo_names = t2.repo_names)
)
GROUP BY rep_name

""")
r.createOrReplaceTempView("P_5_4_repo_comm_new")

In [None]:
f = spark.sql("""

SELECT count(distinct(comm_name)) as P5_repo_comm_new, rep_name
FROM
(
SELECT t2.committer_name as comm_name, t2.repo_names as rep_name
FROM P_at_6 t2
WHERE NOT EXISTS (SELECT t1.committer_name FROM P_at_5 t1 WHERE t1.committer_name = t2.committer_name AND 
                    t1.repo_names = t2.repo_names)
)
GROUP BY rep_name

""")
f.createOrReplaceTempView("P_6_5_repo_comm_new")

### joining

In [None]:
s = spark.sql("""

SELECT repos.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_repo_comm_new

FROM
(
SELECT *
FROM P_2_1_repo_comm_new
) 
t RIGHT JOIN repos ON t.rep_name = repos.repo_names 

""")
s.createOrReplaceTempView("repos_5")

In [None]:
f = spark.sql("""

SELECT repos_5.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_repo_comm_new, P2_repo_comm_new

FROM
(
SELECT *
FROM P_3_2_repo_comm_new
) 
t RIGHT JOIN repos_5 ON t.rep_name = repos_5.repo_names 

""")
f.createOrReplaceTempView("repos_5")

In [None]:
g = spark.sql("""

SELECT repos_5.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_repo_comm_new, P2_repo_comm_new, P3_repo_comm_new

FROM
(
SELECT *
FROM P_4_3_repo_comm_new
) 
t RIGHT JOIN repos_5 ON t.rep_name = repos_5.repo_names 

""")
g.createOrReplaceTempView("repos_5")

In [None]:
s = spark.sql("""

SELECT repos_5.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit, 

P1_repo_comm_new, P2_repo_comm_new, P3_repo_comm_new, 
P4_repo_comm_new

FROM
(
SELECT *
FROM P_5_4_repo_comm_new
) 
t RIGHT JOIN repos_5 ON t.rep_name = repos_5.repo_names 

""")
s.createOrReplaceTempView("repos_5")

In [None]:
all_table = spark.sql("""

SELECT repos_5.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_repo_comm_new, P2_repo_comm_new, P3_repo_comm_new, 
P4_repo_comm_new, P5_repo_comm_new

FROM
(
SELECT *
FROM P_6_5_repo_comm_new
) 
t RIGHT JOIN repos_5 ON t.rep_name = repos_5.repo_names 

""")

In [None]:
all_table = all_table.fillna(0, subset=['P1_repo_comm_new', 'P2_repo_comm_new', 
                                        'P3_repo_comm_new', 'P4_repo_comm_new',
                                        'P5_repo_comm_new'])

In [None]:
all_table.createOrReplaceTempView("repos_5")

## 6. P(N)_ repo_committers

The following steps are taken to create this feature:

1. We take all commits occurring within the period in question.
2. We count the distinct committer_name values per (i.e. group by) repo.
3. The resulting table holds the number of unique developers who committed to each repo within that period.
4. We join this table to all_table.
5. These steps are repeated for each period, here P1 through P6.
6. Rows that are null after the join (i.e. nobody committed to the repo during the period) are filled with 0.
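
The counting rule can be checked on toy data before running it in Spark. The sketch below assumes half-open windows (start included, end excluded) and, as a simplification, uses global integer boundaries instead of the per-row `first_commit` and `period_N` columns; the function name and the data are made up for illustration.

```python
def distinct_committers(commits, start, end):
    """Count distinct committers per repo for start <= timestamp < end."""
    seen = {}
    for committer, repo, timestamp in commits:
        if start <= timestamp < end:
            seen.setdefault(repo, set()).add(committer)
    return {repo: len(names) for repo, names in seen.items()}


commits = [
    ("ann", "repo_a", 0),
    ("ann", "repo_a", 5),    # second commit by ann, still one committer
    ("bob", "repo_a", 30),   # exactly on the boundary between the two windows
    ("cat", "repo_b", 12),
    ("dan", "repo_a", 45),
]

p1 = distinct_committers(commits, 0, 30)
p2 = distinct_committers(commits, 30, 60)
print(p1, p2)
```

With half-open windows the boundary commit by `bob` lands in exactly one period (the second); a strict `>` on the lower bound would drop it from both.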

In [None]:
f = spark.sql("""

SELECT count(distinct(comm_name)) as P1_repo_committers, rep_names
FROM
(
SELECT committer_name as comm_name, repo_names as rep_names
FROM repos
WHERE committer_timestamp >= first_commit AND committer_timestamp < period_1
)
GROUP BY rep_names

""")
f.createOrReplaceTempView("P1_person_table")

In [None]:
a = spark.sql("""

SELECT count(distinct(committer_name)) as P2_repo_committers, repo_names as rep_names
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_1 AND committer_timestamp < period_2
)
GROUP BY rep_names

""")
a.createOrReplaceTempView("P2_person_table")

In [None]:
s = spark.sql("""

SELECT count(distinct(committer_name)) as P3_repo_committers, repo_names as rep_names
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_2 AND committer_timestamp < period_3
)
GROUP BY rep_names

""")
s.createOrReplaceTempView("P3_person_table")

In [None]:
f = spark.sql("""

SELECT count(distinct(committer_name)) as P4_repo_committers, repo_names as rep_names
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_3 AND committer_timestamp < period_4
)
GROUP BY rep_names

""")
f.createOrReplaceTempView("P4_person_table")

In [None]:
s = spark.sql("""

SELECT count(distinct(committer_name)) as P5_repo_committers, repo_names as rep_names
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_4 AND committer_timestamp < period_5
)
GROUP BY rep_names

""")
s.createOrReplaceTempView("P5_person_table")

In [None]:
g = spark.sql("""

SELECT count(distinct(committer_name)) as P6_repo_committers, repo_names as rep_names
FROM
(
SELECT *
FROM repos
WHERE committer_timestamp >= period_5 AND committer_timestamp < period_6
)
GROUP BY rep_names

""")
g.createOrReplaceTempView("P6_person_table")

### Joining

In [None]:
g = spark.sql("""

SELECT repos.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_repo_committers

FROM
(
SELECT *
FROM P1_person_table
) 
t RIGHT JOIN repos ON t.rep_names = repos.repo_names 

""")
g.createOrReplaceTempView("repos_6")

In [None]:
g = spark.sql("""

SELECT repos_6.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_repo_committers, P2_repo_committers

FROM
(
SELECT *
FROM P2_person_table
) 
t RIGHT JOIN repos_6 ON t.rep_names = repos_6.repo_names 

""")
g.createOrReplaceTempView("repos_6")

In [None]:
s = spark.sql("""

SELECT repos_6.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,

P1_repo_committers, P2_repo_committers, P3_repo_committers

FROM
(
SELECT *
FROM P3_person_table
) 
t RIGHT JOIN repos_6 ON t.rep_names = repos_6.repo_names 

""")
s.createOrReplaceTempView("repos_6")

In [None]:
s = spark.sql("""

SELECT repos_6.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_repo_committers, P2_repo_committers, P3_repo_committers, 
P4_repo_committers

FROM
(
SELECT *
FROM P4_person_table
) 
t RIGHT JOIN repos_6 ON t.rep_names = repos_6.repo_names 

""")
s.createOrReplaceTempView("repos_6")

In [None]:
s = spark.sql("""

SELECT repos_6.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_repo_committers, P2_repo_committers, P3_repo_committers, 
P4_repo_committers, P5_repo_committers

FROM
(
SELECT *
FROM P5_person_table
) 
t RIGHT JOIN repos_6 ON t.rep_names = repos_6.repo_names 

""")
s.createOrReplaceTempView("repos_6")

In [None]:
all_table = spark.sql("""

SELECT repos_6.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_repo_committers, P2_repo_committers, P3_repo_committers, 
P4_repo_committers, P5_repo_committers, P6_repo_committers

FROM
(
SELECT *
FROM P6_person_table
) 
t RIGHT JOIN repos_6 ON t.rep_names = repos_6.repo_names 
ORDER BY ID ASC

""")


In [None]:
all_table = all_table.fillna(0, subset=['P1_repo_committers', 'P2_repo_committers', 
                                        'P3_repo_committers', 'P4_repo_committers', 
                                        'P5_repo_committers', 'P6_repo_committers'])

In [None]:
all_table.createOrReplaceTempView("repos_6")

## 7. total_repo_comm_absent

The following steps are taken to create this feature:

1. We add up the columns P(N)_repo_comm_absent (N=1 to N=5) and store the sum in all_table as tot_repo_comm_absent.
2. Rows that are null are filled with 0.

In [None]:
all_table = spark.sql("""

SELECT repos_4.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

(P1_repo_comm_absent+P2_repo_comm_absent+P3_repo_comm_absent+P4_repo_comm_absent+P5_repo_comm_absent) tot_repo_comm_absent

FROM repos_4
ORDER BY ID ASC

""")



In [None]:
all_table = all_table.fillna(0, subset=['tot_repo_comm_absent'])

In [None]:
all_table.createOrReplaceTempView("repos_7")

## 8. total_repo_comm_new

The following steps are taken to create this feature:

1. We add up the columns P(N)_repo_comm_new (N=1 to N=5) and store the sum in all_table as total_repo_comm_new.

In [None]:
all_table = spark.sql("""

SELECT repos_5.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

(P1_repo_comm_new+ P2_repo_comm_new+P3_repo_comm_new+ P4_repo_comm_new+P5_repo_comm_new) total_repo_comm_new

FROM repos_5
ORDER BY ID ASC

""")



In [None]:
all_table = all_table.fillna(0, subset=['total_repo_comm_new'])

In [None]:
all_table.createOrReplaceTempView("repos_8")

## 9. agg_absent_periods

The following steps are taken to create this feature:

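1. A list comprehension over the five P(N)_repo_comm_absent columns uses F.when to map each non-zero value to 1 and everything else to 0. The sum of these flags is added to all_table as agg_absent_periods, i.e. the number of periods in which the repo lost at least one committer.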
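
The flag-and-sum logic can be illustrated on a single row in plain Python. This is only a sketch of the idea: `count_active_periods` is a hypothetical helper and the row values are made up. It names the period columns explicitly rather than relying on their position, which is one way to avoid depending on a column slice.

```python
def count_active_periods(row, columns):
    """Count how many of the given columns hold a non-zero value in this row."""
    return sum(1 if row.get(col, 0) != 0 else 0 for col in columns)


# Explicit column names for periods P1..P5.
absent_cols = [f"P{n}_repo_comm_absent" for n in range(1, 6)]

row = {
    "ID": 7,
    "P1_repo_comm_absent": 2,
    "P2_repo_comm_absent": 0,
    "P3_repo_comm_absent": 1,
    "P4_repo_comm_absent": 0,
    "P5_repo_comm_absent": 0,
}
row["agg_absent_periods"] = count_active_periods(row, absent_cols)
print(row["agg_absent_periods"])
```

The same explicit column list could be passed to the PySpark list comprehension in place of `all_table.columns[-5:]`.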


In [None]:
all_table = spark.sql("""

SELECT *
FROM repos_4
ORDER BY ID ASC

""")

In [None]:
# adjust the slice if the position of the P(N)_repo_comm_absent columns changes
all_table.columns[-5:]

['P1_repo_comm_absent',
 'P2_repo_comm_absent',
 'P3_repo_comm_absent',
 'P4_repo_comm_absent',
 'P5_repo_comm_absent']

In [None]:
all_table = all_table.withColumn("agg_absent_periods",sum([F.when(F.col(cl) != 0, 1).otherwise(0) for cl in all_table.columns[-5:]]))

In [None]:
all_table.columns

['ID',
 'committer_name',
 'committer_timestamp',
 'repo_names',
 'repo_org',
 'first_commit',
 'P1_repo_comm_absent',
 'P2_repo_comm_absent',
 'P3_repo_comm_absent',
 'P4_repo_comm_absent',
 'P5_repo_comm_absent',
 'agg_absent_periods']

In [None]:
all_table_1 = all_table.select([c for c in all_table.columns if c not in 
                                    {'P1_repo_comm_absent',
 'P2_repo_comm_absent',
 'P3_repo_comm_absent',
 'P4_repo_comm_absent',
 'P5_repo_comm_absent'}])

In [None]:
all_table_1.columns

['ID',
 'committer_name',
 'committer_timestamp',
 'repo_names',
 'repo_org',
 'first_commit',
 'agg_absent_periods']

In [None]:
all_table_1.createOrReplaceTempView("repos_9")  # registerTempTable is deprecated

In [None]:
# spark.sql("""

# SELECT AVG(agg_absent_periods), MIN(agg_absent_periods), MAX(agg_absent_periods)
# FROM repos_9

# """).show()

## 10. new_ppl_periods

The following steps are taken to create this feature:


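1. A list comprehension over the five P(N)_repo_comm_new columns uses F.when to map each non-zero value to 1 and everything else to 0. The sum of these flags is added to all_table as new_ppl_periods, i.e. the number of periods in which the repo gained at least one new committer.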


In [None]:
all_table = spark.sql("""

SELECT *
FROM repos_5
ORDER BY ID ASC

""")

In [None]:
# adjust the slice if the position of the P(N)_repo_comm_new columns changes
all_table.columns[-5:]

['P1_repo_comm_new',
 'P2_repo_comm_new',
 'P3_repo_comm_new',
 'P4_repo_comm_new',
 'P5_repo_comm_new']

In [None]:
all_table = all_table.withColumn("new_ppl_periods",sum([F.when(F.col(cl) != 0, 1).otherwise(0) for cl in all_table.columns[-5:]]))


In [None]:
all_table.columns

['ID',
 'committer_name',
 'committer_timestamp',
 'repo_names',
 'repo_org',
 'first_commit',
 'P1_repo_comm_new',
 'P2_repo_comm_new',
 'P3_repo_comm_new',
 'P4_repo_comm_new',
 'P5_repo_comm_new',
 'new_ppl_periods']

In [None]:
all_table_1 = all_table.select([c for c in all_table.columns if c not in 
                                    {'P1_repo_comm_new',
 'P2_repo_comm_new',
 'P3_repo_comm_new',
 'P4_repo_comm_new',
 'P5_repo_comm_new'}])

In [None]:
all_table_1.columns

['ID',
 'committer_name',
 'committer_timestamp',
 'repo_names',
 'repo_org',
 'first_commit',
 'new_ppl_periods']

In [None]:
all_table_1.createOrReplaceTempView("repos_10")  # registerTempTable is deprecated

In [None]:
# spark.sql("""

# SELECT AVG(new_ppl_periods), MIN(new_ppl_periods), MAX(new_ppl_periods)
# FROM repos_10

# """).show()

In [None]:
# all_table = all_table.fillna(0, subset=['P1_repo_committers', 'P2_repo_committers'])

In [None]:
# all_table.createOrReplaceTempView("repos")

## 11. no_change_periods

The following steps are taken to create this feature:

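1. For each period, F.when sets P(N)_no_change_periods to 1 when both P(N)_repo_comm_absent and P(N)_repo_comm_new are non-zero (i.e. the repo both lost and gained committers in that period) and to 0 otherwise. Note that a period with no personnel change at all would instead require both counts to equal 0.
2. A list comprehension over the five flag columns sums them into no_change_periods, which is added to all_table.
3. Rows that are null are filled with 0.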

In [None]:
all_table = spark.sql("""

SELECT ID, committer_name, committer_timestamp, 
repo_names, repo_org,first_commit, 

P1_repo_comm_absent, P2_repo_comm_absent, P3_repo_comm_absent, 
P4_repo_comm_absent, P5_repo_comm_absent,


P1_repo_comm_new, P2_repo_comm_new,P3_repo_comm_new,
P4_repo_comm_new, P5_repo_comm_new

FROM
(
SELECT ID as t_ID , committer_name as t_committer_name, committer_timestamp as t_committer_timestamp, 
repo_names as  t_repo_names, repo_org  as t_repo_org, first_commit as t_first_commit,

P1_repo_comm_absent, P2_repo_comm_absent, P3_repo_comm_absent, 
P4_repo_comm_absent, P5_repo_comm_absent

FROM repos_4
) 

t JOIN repos_5 ON t.t_ID = repos_5.ID
    
ORDER BY ID ASC

""")
all_table.createOrReplaceTempView("repos_4_5")

In [None]:
all_table.columns

['ID',
 'committer_name',
 'committer_timestamp',
 'repo_names',
 'repo_org',
 'first_commit',
 'P1_repo_comm_absent',
 'P2_repo_comm_absent',
 'P3_repo_comm_absent',
 'P4_repo_comm_absent',
 'P5_repo_comm_absent',
 'P1_repo_comm_new',
 'P2_repo_comm_new',
 'P3_repo_comm_new',
 'P4_repo_comm_new',
 'P5_repo_comm_new']

In [None]:
# to extend this code for more periods

all_table = all_table.withColumn("P1_no_change_periods",
                F.when((all_table["P1_repo_comm_absent"] != 0) & (all_table["P1_repo_comm_new"] != 0), 1).otherwise(0))

all_table = all_table.withColumn("P2_no_change_periods",
                F.when((all_table["P2_repo_comm_absent"] != 0) & (all_table["P2_repo_comm_new"] != 0), 1).otherwise(0))  


all_table = all_table.withColumn("P3_no_change_periods",
                F.when((all_table["P3_repo_comm_absent"] != 0) & (all_table["P3_repo_comm_new"] != 0), 1).otherwise(0))  


all_table = all_table.withColumn("P4_no_change_periods",
                F.when((all_table["P4_repo_comm_absent"] != 0) & (all_table["P4_repo_comm_new"] != 0), 1).otherwise(0))  

all_table = all_table.withColumn("P5_no_change_periods",
                F.when((all_table["P5_repo_comm_absent"] != 0) & (all_table["P5_repo_comm_new"] != 0), 1).otherwise(0))  


In [None]:
# to edit slice integer depending on position of concerning columns
all_table.columns[-5:]

['P1_no_change_periods',
 'P2_no_change_periods',
 'P3_no_change_periods',
 'P4_no_change_periods',
 'P5_no_change_periods']

In [None]:
# change slice integer to reflect the cols P(N)_no_change_periods
all_table = all_table.withColumn("no_change_periods",
sum([F.when(F.col(cl) != 0, 1).otherwise(0) for cl in all_table.columns[-5:]]))

In [None]:
all_table.columns

['ID',
 'committer_name',
 'committer_timestamp',
 'repo_names',
 'repo_org',
 'first_commit',
 'P1_repo_comm_absent',
 'P2_repo_comm_absent',
 'P3_repo_comm_absent',
 'P4_repo_comm_absent',
 'P5_repo_comm_absent',
 'P1_repo_comm_new',
 'P2_repo_comm_new',
 'P3_repo_comm_new',
 'P4_repo_comm_new',
 'P5_repo_comm_new',
 'P1_no_change_periods',
 'P2_no_change_periods',
 'P3_no_change_periods',
 'P4_no_change_periods',
 'P5_no_change_periods',
 'no_change_periods']

In [None]:
all_table_1 = all_table.select([c for c in all_table.columns if c not in 
                                    {'P1_repo_comm_absent',
 'P2_repo_comm_absent',
 'P3_repo_comm_absent',
 'P4_repo_comm_absent',
 'P5_repo_comm_absent',
 'P1_repo_comm_new',
 'P2_repo_comm_new',
 'P3_repo_comm_new',
 'P4_repo_comm_new',
 'P5_repo_comm_new'}])

In [None]:
all_table_1.columns

['ID',
 'committer_name',
 'committer_timestamp',
 'repo_names',
 'repo_org',
 'first_commit',
 'P1_no_change_periods',
 'P2_no_change_periods',
 'P3_no_change_periods',
 'P4_no_change_periods',
 'P5_no_change_periods',
 'no_change_periods']

In [None]:
# all_table = all_table.fillna(0, subset=['P1_repo_committers', 'P2_repo_committers'])

In [None]:
all_table_1.createOrReplaceTempView("repos_11")

## 12. P(n)_ multi_ratio
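
The cells in this section divide each period's per-repo commit count by the committer's org-wide commit count for the same period, then replace undefined ratios with 0. The plain-Python sketch below shows that rule, together with the six-period average used later for avg_multi_ratio. The function names and sample counts are illustrative assumptions, and the zero-denominator handling assumes Spark returns null for division by zero (its non-ANSI behaviour), which fillna then turns into 0.

```python
def multi_ratio(repo_commits, org_commits):
    """Share of a committer's org-wide commits that went to this repo.

    Returns 0.0 when the org-wide count is zero or missing, mirroring
    a null ratio that is subsequently filled with 0.
    """
    if not org_commits:
        return 0.0
    return repo_commits / org_commits


def avg_multi_ratio(repo_counts, org_counts):
    """Average the per-period ratios over all periods supplied."""
    ratios = [multi_ratio(r, o) for r, o in zip(repo_counts, org_counts)]
    return sum(ratios) / len(ratios)


# Made-up counts for periods P1..P6.
repo_counts = [2, 0, 3, 0, 0, 1]
org_counts = [4, 0, 3, 5, 0, 2]
print(avg_multi_ratio(repo_counts, org_counts))
```

A ratio near 1 means the committer worked almost exclusively on this repo in that period; a low ratio means their commits were spread over several repos in the organization.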

In [None]:
repos_spark = spark.sql("""

SELECT ID, committer_name, committer_timestamp, 
repo_names, repo_org,first_commit, 

P1_numb_commits_org, P2_numb_commits_org, P3_numb_commits_org, 
P4_numb_commits_org, P5_numb_commits_org,P6_numb_commits_org, 
P7_numb_commits_org,P8_numb_commits_org,P9_numb_commits_org,
P10_numb_commits_org, P11_numb_commits_org,


P1_numb_commits_repo, P2_numb_commits_repo,P3_numb_commits_repo,
P4_numb_commits_repo, P5_numb_commits_repo, P6_numb_commits_repo

FROM
(
SELECT ID as t_ID , committer_name as t_committer_name, committer_timestamp as t_committer_timestamp, 
repo_names as  t_repo_names, repo_org  as t_repo_org, first_commit as t_first_commit,

P1_numb_commits_org, P2_numb_commits_org, P3_numb_commits_org, 
P4_numb_commits_org, P5_numb_commits_org,P6_numb_commits_org, 
P7_numb_commits_org,P8_numb_commits_org,P9_numb_commits_org,
P10_numb_commits_org, P11_numb_commits_org

FROM repos_2
) 

t JOIN repos_3 ON t.t_ID = repos_3.ID
    
ORDER BY ID ASC

""")
repos_spark.createOrReplaceTempView("repos_org_repo")

In [None]:
all_table = spark.sql("""

SELECT repos_org_repo.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

P1_numb_commits_repo/P1_numb_commits_org as P1_multi_ratio, 
P2_numb_commits_repo/P2_numb_commits_org as P2_multi_ratio,
P3_numb_commits_repo/P3_numb_commits_org as P3_multi_ratio,
P4_numb_commits_repo/P4_numb_commits_org as P4_multi_ratio,
P5_numb_commits_repo/P5_numb_commits_org as P5_multi_ratio,
P6_numb_commits_repo/P6_numb_commits_org as P6_multi_ratio

FROM repos_org_repo
""")

In [None]:
all_table = all_table.fillna(0, subset=['P1_multi_ratio', 'P2_multi_ratio',
                                       'P3_multi_ratio', 'P4_multi_ratio',
                                       'P5_multi_ratio', 'P6_multi_ratio'])

In [None]:
all_table.createOrReplaceTempView("repos_12")

## 13. total_commits_org

In [None]:
all_table = spark.sql("""

SELECT repos_2.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit

FROM repos_2
""")

In [None]:
# all_table = all_table.fillna(0, subset=['total_commits_org'])

In [None]:
all_table.createOrReplaceTempView("repos_13")

## 14. avg_multi_ratio

In [None]:
all_table = spark.sql("""

SELECT repos_12.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

(P1_multi_ratio+P2_multi_ratio+P3_multi_ratio+P4_multi_ratio+P5_multi_ratio+P6_multi_ratio) / 6 as avg_multi_ratio

FROM repos_12
""")

In [None]:
all_table = all_table.fillna(0, subset=['avg_multi_ratio'])

In [None]:
all_table.createOrReplaceTempView("repos_14")

## 15. y variable
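
The labelling rule used in the query below can be stated as a small function: a committer is labelled "disengaged" when 20% of their P1 org-wide commit count exceeds their counts in both P5 and P6, and "no_signs" otherwise. This is a sketch for checking the rule on sample numbers; `engagement_label` and its `threshold` parameter are hypothetical names, not part of the notebook.

```python
def engagement_label(p1, p5, p6, threshold=0.2):
    """Label a committer from org-wide commit counts in periods 1, 5 and 6."""
    cutoff = p1 * threshold
    # Both later periods must fall strictly below the cutoff.
    if cutoff > p5 and cutoff > p6:
        return "disengaged"
    return "no_signs"


print(engagement_label(100, 5, 19))   # both later counts below 20% of P1
print(engagement_label(100, 5, 25))   # P6 is above the cutoff
print(engagement_label(0, 0, 0))      # no activity in P1, so no drop can be observed
```

Because the comparison is strict, a committer with no commits in P1 can never be labelled "disengaged".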

In [None]:
all_data = spark.sql("""

SELECT repos_2.ID, 
committer_name,  committer_timestamp,  repo_names,  repo_org,  first_commit,  

-- Flag a committer when both the P5 and P6 commit counts fall below 20% of the P1 count
(CASE WHEN ((P1_numb_commits_org * 0.2) > P5_numb_commits_org AND (P1_numb_commits_org * 0.2) > P6_numb_commits_org)
        THEN 'disengaged'
        ELSE 'no_signs' END) AS y

FROM repos_2

""")


In [None]:
all_data.createOrReplaceTempView("repos_15")
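
The labelling rule in the `CASE` expression can be restated as a plain-Python sketch, which makes the edge cases easier to see. The function name `label_engagement` and the `threshold` parameter are hypothetical. The 0.2 factor, the strict comparisons and the two label strings come from the query above.

```python
from typing import Optional


def label_engagement(p1: Optional[int], p5: Optional[int], p6: Optional[int],
                     threshold: float = 0.2) -> str:
    """Label a committer from org-level commit counts in periods 1, 5 and 6."""
    # A NULL operand makes the SQL comparison NULL, so CASE falls through to ELSE.
    if p1 is None or p5 is None or p6 is None:
        return "no_signs"
    cutoff = p1 * threshold
    # Both later periods must be strictly below the cutoff.
    if cutoff > p5 and cutoff > p6:
        return "disengaged"
    return "no_signs"


print(label_engagement(100, 10, 15))  # disengaged
print(label_engagement(100, 10, 30))  # no_signs (P6 is above the cutoff)
print(label_engagement(0, 0, 0))      # no_signs (0 is not strictly greater than 0)
```

One consequence of the strict comparison is that a committer with no commits in period 1 can never be labelled `disengaged`.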

# IV - Save Tables Locally

In [None]:
# Load each temp view back as a DataFrame; spark.table(name) is equivalent to SELECT * FROM name
spark_repos_2 = spark.table("repos_2")
spark_repos_3 = spark.table("repos_3")
spark_repos_4 = spark.table("repos_4")
spark_repos_5 = spark.table("repos_5")
spark_repos_6 = spark.table("repos_6")
spark_repos_7 = spark.table("repos_7")
spark_repos_8 = spark.table("repos_8")
spark_repos_9 = spark.table("repos_9")
spark_repos_10 = spark.table("repos_10")
spark_repos_11 = spark.table("repos_11")
spark_repos_12 = spark.table("repos_12")
spark_repos_13 = spark.table("repos_13")
spark_repos_14 = spark.table("repos_14")
spark_repos_15 = spark.table("repos_15")

In [None]:
# Inner-join repos_3 with repos_2 on the shared key columns, then export the result
all_table_1 = spark_repos_3.join(spark_repos_2, ["ID", "committer_name", "committer_timestamp",
                                                 "repo_names", "repo_org", "first_commit"], "inner")
all_table_1.toPandas().to_csv('W:/Study 2019-2020/Data Analytics/Labs/all_table_1.csv')

In [None]:
spark_repos_4.toPandas().to_csv('W:/Study 2019-2020/Data Analytics/Labs/spark_repos_4.csv')

In [None]:
spark_repos_5.toPandas().to_csv('W:/Study 2019-2020/Data Analytics/Labs/spark_repos_5.csv')

In [None]:
spark_repos_6.toPandas().to_csv('W:/Study 2019-2020/Data Analytics/Labs/spark_repos_6.csv')

In [None]:
spark_repos_7.toPandas().to_csv('W:/Study 2019-2020/Data Analytics/Labs/spark_repos_7.csv')

In [None]:
spark_repos_8.toPandas().to_csv('W:/Study 2019-2020/Data Analytics/Labs/spark_repos_8.csv')

In [None]:
spark_repos_9.toPandas().to_csv('W:/Study 2019-2020/Data Analytics/Labs/spark_repos_9.csv')

In [None]:
spark_repos_10.toPandas().to_csv('W:/Study 2019-2020/Data Analytics/Labs/spark_repos_10.csv')

In [None]:
spark_repos_11.toPandas().to_csv('W:/Study 2019-2020/Data Analytics/Labs/spark_repos_11.csv')

In [None]:
spark_repos_12.toPandas().to_csv('W:/Study 2019-2020/Data Analytics/Labs/spark_repos_12.csv')

In [None]:
spark_repos_13.toPandas().to_csv('W:/Study 2019-2020/Data Analytics/Labs/spark_repos_13.csv')

In [None]:
spark_repos_14.toPandas().to_csv('W:/Study 2019-2020/Data Analytics/Labs/spark_repos_14.csv')

In [None]:
spark_repos_15.toPandas().to_csv('W:/Study 2019-2020/Data Analytics/Labs/spark_repos_15.csv')
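
The export cells above repeat the same output folder in every call. A minimal sketch of one way to build those paths in a single place is shown below. `OUTPUT_DIR` and `csv_path` are hypothetical names, and the folder is simply the one already hard-coded in the cells above.

```python
from pathlib import Path

# Folder taken from the export cells above; change it in one place if it moves.
OUTPUT_DIR = Path("W:/Study 2019-2020/Data Analytics/Labs")


def csv_path(table_name: str, out_dir: Path = OUTPUT_DIR) -> str:
    """Return '<out_dir>/<table_name>.csv' using forward slashes."""
    return (out_dir / f"{table_name}.csv").as_posix()


print(csv_path("spark_repos_4"))

# Intended use in the notebook (not run here, it needs the Spark DataFrames):
# spark_repos_4.toPandas().to_csv(csv_path("spark_repos_4"))
```

Note that `toPandas()` collects the whole table onto the driver, so this pattern only suits tables that fit in local memory.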