# In [None]:  (notebook cell marker)
# Launch with: pyspark --packages org.postgresql:postgresql:42.4.0

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import date as dt

# Build ONE SparkSession that has the PostgreSQL JDBC driver on its classpath.
# `spark.jars` is a static setting read when the session/context is first
# created. builder.getOrCreate() returns an already-running session, so if a
# plain session were created first, the jar setting below would be ignored.
spark_postgre = SparkSession \
    .builder \
    .appName("BA_BOT") \
    .config("spark.jars", "postgresql-42.4.0.jar") \
    .getOrCreate()

# Alias kept so any code referring to `spark` keeps working.
spark = spark_postgre


import os

# JDBC connection settings for the local PostgreSQL instance.
# The password may be supplied via the POSTGRES_PASSWORD environment variable;
# the previously hard-coded value is kept as the fallback so the notebook
# still runs unchanged.
# NOTE(review): avoid committing real credentials; prefer the env var.
jdbc_options = {
    "url": "jdbc:postgresql://localhost:5432/postgres",
    "user": "postgres",
    "password": os.environ.get("POSTGRES_PASSWORD", "SMM695"),
    "driver": "org.postgresql.Driver",
}

# Reader preconfigured with the connection options (kept for code that reuses it).
params = spark_postgre.read.format("jdbc").options(**jdbc_options)


def read_table(table_name):
    """Load one PostgreSQL table as a Spark DataFrame.

    A fresh DataFrameReader is built on every call. DataFrameReader.option()
    mutates the reader it is called on, so building a new one prevents the
    ``dbtable`` setting of one read from leaking into another.
    """
    return (
        spark_postgre.read
        .format("jdbc")
        .options(**jdbc_options)
        .option("dbtable", table_name)
        .load()
    )


df_0 = read_table("commit_info")
df_1 = read_table("update_info")
df_2 = read_table("added_deleted_lines")
df_3 = read_table("issue")
df_4 = read_table("comment")

df_0.printSchema()  # commit_info TABLE
df_1.printSchema()  # update_info TABLE
df_2.printSchema()  # added_deleted_lines TABLE
df_3.printSchema()  # issue TABLE
df_4.printSchema()  # comment TABLE

# New column lifespan_days: whole days between an issue's created_at and
# closed_at (datediff yields null when either timestamp is null).
df_3 = df_3.withColumn('lifespan_days', F.datediff(df_3.closed_at, df_3.created_at))
df_3.show()

df_0.describe(['deletions', 'insertions', 'lines', 'files']).show()
df_1.describe(['deleted_lines', 'nloc', 'complexity', 'token_count']).show()
df_3.describe(['n_comments', 'project', 'lifespan_days']).show()

# Issue table: prepare a logistic regression of project ~ n_comments + lifespan_days,
# where lifespan_days is the integer number of days from created_at to closed_at.
# Drop every row that has a null in ANY column. lifespan_days was already
# computed above, so it does not need to be recomputed after the drop.
# NOTE(review): this also discards rows whose nulls are in columns the model
# never uses; consider na.drop('any', subset=[...]) limited to the model columns.
df_3 = df_3.na.drop('any')

df_3.describe(['n_comments', 'project', 'lifespan_days']).show()
df_3.show()

df_3.groupby('project').count().show()  # so pytorch: 120 -> 112, tensorflow: 5358 -> 2005

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler

# Label: index the string column "project" into the numeric "project_idx".
idx_0 = StringIndexer().setInputCol("project").setOutputCol("project_idx")
df_3 = idx_0.fit(df_3).transform(df_3)
df_3.select(['lifespan_days', 'project', 'project_idx']).show()

# One-hot encoding of the project index, kept for inspection only.
# It must NOT go into the feature vector: it is a deterministic function of the
# label, so a model trained on it scores accuracy 1.0 by construction.
ohe_0 = OneHotEncoder().setInputCol("project_idx").setOutputCol("project_idx_ohe")
df_3 = ohe_0.fit(df_3).transform(df_3)  # fit once, reuse the result
df_3.select(['project', 'project_idx', 'project_idx_ohe']).show()

# Feature vector for project ~ n_comments + lifespan_days.
# NOTE(review): assumes n_comments is a numeric column (VectorAssembler rejects strings).
v_0 = VectorAssembler() \
     .setInputCols(["n_comments", "lifespan_days"]) \
     .setOutputCol('features_0')
df_3 = v_0.transform(df_3)
df_3.select(['features_0']).show()

# split data: 70% train / 30% held-out test
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

train, test = df_3.randomSplit([0.7, 0.3])

# Rebalance the TRAINING data ------------------------------------------------------------
# tensorflow issues vastly outnumber pytorch ones, so keep ~5% of the tensorflow
# rows and ~95% of the pytorch rows. Sampling from `train` (not from the full
# df_3) guarantees that no test row also ends up in the training set.
df_t = train.filter("project like '%tensorflow%'")
df_p = train.filter("project like '%pytorch%'")
big_train_t, small_train_t = df_t.randomSplit([0.95, 0.05])
big_train_p, small_train_p = df_p.randomSplit([0.95, 0.05])
# Train on small_train_t + big_train_p; evaluate on the untouched `test` split.
train_balanced = small_train_t.union(big_train_p)
train_balanced.groupBy(train_balanced.project).count().show()  # expected roughly 1:1

# The estimator has to be defined before use; previously it only appeared
# further down, so running the notebook top to bottom raised NameError here.
lr_0 = LogisticRegression(labelCol="project_idx", featuresCol="features_0")

# List every estimator parameter, with the parameter name in bold.
for param_no, param_line in enumerate(lr_0.explainParams().split('\n'), start=1):
    param_name, param_doc = param_line.split(':', 1)
    print("""{}. {} : {}
    """.format(param_no, '\033[1m' + param_name + '\033[0m', param_doc), flush=True)

fitLr_0 = lr_0.fit(train_balanced)
predictions = fitLr_0.transform(test)
predictions.select("project_idx", "prediction").show()
# A bare expression is only displayed inside a notebook; print it explicitly.
print(predictions.select('project_idx', 'rawPrediction', 'probability', 'prediction').toPandas().head(30))

# Metrics from the model summary are computed on the TRAINING data (train_balanced).
s_0 = fitLr_0.summary
objectiveHistory = s_0.objectiveHistory
print("""
Training set (balanced):
- Accuracy: {}
- Area Under ROC : {}
- False Positive Rate by Label: {}
- Precision by Label: {}
- Tot. Iterations: {}
- Objective History: 
{}
""".format(s_0.accuracy, s_0.areaUnderROC,
           s_0.falsePositiveRateByLabel, s_0.precisionByLabel,
           s_0.totalIterations, list(objectiveHistory)),
      flush=True)

# Metrics on the held-out test split (guarded against an empty split).
n_test = predictions.count()
if n_test:
    accuracy = predictions.filter(predictions.project_idx == predictions.prediction).count() / float(n_test)
    test_auc = BinaryClassificationEvaluator(
        labelCol="project_idx",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC",
    ).evaluate(predictions)
else:
    accuracy = test_auc = float('nan')
print("""
Test set:
- Accuracy: {}
- Area Under ROC: {}
""".format(accuracy, test_auc), flush=True)

# ---------------------------------------------------------------------------------------

# logistic regression on the full (imbalanced) training split
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

lr_0 = LogisticRegression(labelCol="project_idx", featuresCol="features_0")

# List every estimator parameter, with the parameter name in bold.
for param_no, param_line in enumerate(lr_0.explainParams().split('\n'), start=1):
    param_name, param_doc = param_line.split(':', 1)
    print("""{}. {} : {}
    """.format(param_no, '\033[1m' + param_name + '\033[0m', param_doc), flush=True)

fitLr_0 = lr_0.fit(train)
print(""" 

Coefficients:
============
{}


Intercept:
=========
{}

""".format(fitLr_0.coefficients, fitLr_0.intercept), flush=True)

test_predictions = fitLr_0.transform(test)
test_predictions.select("project_idx", "prediction").show()

# Metrics from the model summary are computed on the TRAINING data (`train`).
s_0 = fitLr_0.summary
objectiveHistory = s_0.objectiveHistory
print("""
Training set:
- Accuracy: {}
- Area Under ROC : {}
- False Positive Rate by Label: {}
- Precision by Label: {}
- Tot. Iterations: {}
- Objective History: 
{}
""".format(s_0.accuracy, s_0.areaUnderROC,
           s_0.falsePositiveRateByLabel, s_0.precisionByLabel,
           s_0.totalIterations, list(objectiveHistory)),
      flush=True)

# Metrics on the held-out test split (guarded against an empty split).
n_test_rows = test_predictions.count()
if n_test_rows:
    test_accuracy = test_predictions.filter(
        test_predictions.project_idx == test_predictions.prediction
    ).count() / float(n_test_rows)
    test_roc_auc = BinaryClassificationEvaluator(
        labelCol="project_idx",
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC",
    ).evaluate(test_predictions)
else:
    test_accuracy = test_roc_auc = float('nan')
print("""
Test set:
- Accuracy: {}
- Area Under ROC: {}
""".format(test_accuracy, test_roc_auc), flush=True)
