In [18]:
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.sql import SQLContext
from pyspark.sql.functions import col, avg, lit, udf, count, stddev
from pyspark.sql.types import FloatType
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF, HashingTF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.stat import Summarizer
from functools import reduce
from datetime import datetime
from data_utils import get_date_range, get_data, clear_data
import matplotlib
import matplotlib.pyplot as plt
import pandas
import os
import numpy as np
import json
import requests

In [19]:
spark = SparkSession.builder.getOrCreate()
sqlContext = SQLContext(spark.sparkContext)

In [20]:
# Sentiment 140 Dataset (Tweet + sentiment)
df = sqlContext.read.format('com.databricks.spark.csv').options(header='false').load('training.1600000.processed.noemoticon.csv', schema='Category INTEGER, Timestamp LONG, Date STRING, Query STRING, Username STRING, Text STRING')
df.printSchema()

root
 |-- Category: integer (nullable = true)
 |-- Timestamp: long (nullable = true)
 |-- Date: string (nullable = true)
 |-- Query: string (nullable = true)
 |-- Username: string (nullable = true)
 |-- Text: string (nullable = true)



In [21]:
# Show categories (4 = positive, 0 = negative)
df = df.where(col("Category").isNotNull())
df.groupBy("Category").count().orderBy(col("count").desc()).show()

+--------+------+
|Category| count|
+--------+------+
|       0|800000|
|       4|800000|
+--------+------+



In [22]:
def create_model(regParam, vocabSize, elasticNetParam, df):
    
    print("regParam: %f, vocabSize: %d, elasticNetParam: %f" % (regParam, vocabSize, elasticNetParam))

    # Tokenizer
    regexTokenizer = RegexTokenizer(inputCol="Text", outputCol="words", pattern="@\\S*|\\W|http\\S*")

    # Stop words
    stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered")
    stopwordsRemover.setStopWords(StopWordsRemover.loadDefaultStopWords("english"))

    # Bag of words
    countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=vocabSize, minDF=5)

    # String indexer
    label_stringIdx = StringIndexer(inputCol = "Category", outputCol = "label")

    # Apply the pipeline
    pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
    pipelineFit = pipeline.fit(df)
    dataset = pipelineFit.transform(df)
    
    # Training and test splits
    (trainingData, testData) = dataset.randomSplit([0.9, 0.1], seed = 100)
    
    # Train and evaluate the model
    lr = LogisticRegression(maxIter=10, regParam=regParam, elasticNetParam=elasticNetParam)
    model = lr.fit(trainingData)
    predictions = model.transform(testData)
    evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
    f1_score = evaluator.evaluate(predictions)
    
    return [model, pipelineFit, trainingData, testData, f1_score]

In [7]:
# Find best regParam and vocabSize combination
for regParam in [0.0, 0.125, 0.25, 0.5]:
    for vocabSize in [100, 1000, 10000, 100000]:
        model, pipelineFit, trainingData, testData, f1_score = create_model(regParam, vocabSize, 0.0, df)
        print("F1-score: %f" % f1_score)
        
"""
Results 

regParam: 0.000000, vocabSize: 100
F1 Score: 0.645073
regParam: 0.000000, vocabSize: 1000
F1 Score: 0.744660
> regParam: 0.000000, vocabSize: 10000
F1 Score: 0.777977
regParam: 0.000000, vocabSize: 100000
F1 Score: 0.776525
regParam: 0.125000, vocabSize: 100
F1 Score: 0.644638
regParam: 0.125000, vocabSize: 1000
F1 Score: 0.743337
regParam: 0.125000, vocabSize: 10000
F1 Score: 0.775839
regParam: 0.125000, vocabSize: 100000
F1 Score: 0.777695
regParam: 0.250000, vocabSize: 100
F1 Score: 0.643508
regParam: 0.250000, vocabSize: 1000
F1 Score: 0.742042
regParam: 0.250000, vocabSize: 10000
F1 Score: 0.774576
regParam: 0.250000, vocabSize: 100000
F1 Score: 0.777197
regParam: 0.500000, vocabSize: 100
F1 Score: 0.643352
regParam: 0.500000, vocabSize: 1000
F1 Score: 0.740815
regParam: 0.500000, vocabSize: 10000
F1 Score: 0.772755
regParam: 0.500000, vocabSize: 100000
F1 Score: 0.775840

"""


regParam: 0.000000, vocabSize: 100, elasticNetParam: 0.000000
F1-score: 0.643338
regParam: 0.000000, vocabSize: 1000, elasticNetParam: 0.000000
F1-score: 0.741295
regParam: 0.000000, vocabSize: 10000, elasticNetParam: 0.000000
F1-score: 0.776351
regParam: 0.000000, vocabSize: 100000, elasticNetParam: 0.000000
F1-score: 0.774960
regParam: 0.125000, vocabSize: 100, elasticNetParam: 0.000000
F1-score: 0.643418
regParam: 0.125000, vocabSize: 1000, elasticNetParam: 0.000000
F1-score: 0.739739
regParam: 0.125000, vocabSize: 10000, elasticNetParam: 0.000000
F1-score: 0.773997
regParam: 0.125000, vocabSize: 100000, elasticNetParam: 0.000000
F1-score: 0.775814
regParam: 0.250000, vocabSize: 100, elasticNetParam: 0.000000
F1-score: 0.642606
regParam: 0.250000, vocabSize: 1000, elasticNetParam: 0.000000
F1-score: 0.738675
regParam: 0.250000, vocabSize: 10000, elasticNetParam: 0.000000
F1-score: 0.772538
regParam: 0.250000, vocabSize: 100000, elasticNetParam: 0.000000
F1-score: 0.775309
regParam: 

'\nResults \n\nregParam: 0.000000, vocabSize: 100\nF1 Score: 0.645073\nregParam: 0.000000, vocabSize: 1000\nF1 Score: 0.744660\n> regParam: 0.000000, vocabSize: 10000\nF1 Score: 0.777977\nregParam: 0.000000, vocabSize: 100000\nF1 Score: 0.776525\nregParam: 0.125000, vocabSize: 100\nF1 Score: 0.644638\nregParam: 0.125000, vocabSize: 1000\nF1 Score: 0.743337\nregParam: 0.125000, vocabSize: 10000\nF1 Score: 0.775839\nregParam: 0.125000, vocabSize: 100000\nF1 Score: 0.777695\nregParam: 0.250000, vocabSize: 100\nF1 Score: 0.643508\nregParam: 0.250000, vocabSize: 1000\nF1 Score: 0.742042\nregParam: 0.250000, vocabSize: 10000\nF1 Score: 0.774576\nregParam: 0.250000, vocabSize: 100000\nF1 Score: 0.777197\nregParam: 0.500000, vocabSize: 100\nF1 Score: 0.643352\nregParam: 0.500000, vocabSize: 1000\nF1 Score: 0.740815\nregParam: 0.500000, vocabSize: 10000\nF1 Score: 0.772755\nregParam: 0.500000, vocabSize: 100000\nF1 Score: 0.775840\n\n'

In [8]:
model, pipelineFit, trainingData, testData, f1_score = create_model(0.0, 10000, 0.0, df)

regParam: 0.000000, vocabSize: 10000, elasticNetParam: 0.000000


In [9]:
test_df = spark.createDataFrame(
    [
        ("name", "Your code sucks and you will never get a job."),
        ("name", "I love your library, very excellent code sir."),
        ("name", "Why did you choose Java? It is so f***ing slow and terrible."),
        ("name", "Yes, thank you very much friend."),
        ("name", "I should be around 0.5."),
    ],
    ["Name", "Text"]
)

model.transform(pipelineFit.transform(test_df)).select(col('probability'), col('prediction'), col('Text')).show()

+--------------------+----------+--------------------+
|         probability|prediction|                Text|
+--------------------+----------+--------------------+
|[0.06269704531033...|       1.0|Your code sucks a...|
|[0.94724971501712...|       0.0|I love your libra...|
|[0.11565798243482...|       1.0|Why did you choos...|
|[0.92590959580645...|       0.0|Yes, thank you ve...|
|[0.48689805666691...|       1.0|I should be aroun...|
+--------------------+----------+--------------------+



In [59]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='false').load('data_april2021.csv', schema='Repo STRING, Language STRING, Comment STRING, Forks_Count INTEGER, Stargazers_count INTEGER, Open_issues_count INTEGER, Date TIMESTAMP')

df.createOrReplaceTempView("events")

In [60]:
def get_js_data():    
    lang_df = spark.sql("SELECT Repo, Comment as Text, Language FROM events WHERE Language IN ('JavaScript')").dropna()

    # Sentiment
    union_df = model.transform(pipelineFit.transform(lang_df)).select(col('probability'), col('Language'), col('Text'))
    # Pandas
    pd = union_df.toPandas()
    s = pd['probability'].apply(lambda x: pandas.Series(x.toArray()))
    pd['p'] = 1 - s[0]
    pd = pd.drop('probability', 1)
    
    return pd

def get_java_data():    
    lang_df = spark.sql("SELECT Repo, Comment as Text, Language FROM events WHERE Language IN ('Java')").dropna()

    # Sentiment
    union_df = model.transform(pipelineFit.transform(lang_df)).select(col('probability'), col('Language'), col('Text'))
    # Pandas
    pd = union_df.toPandas()
    s = pd['probability'].apply(lambda x: pandas.Series(x.toArray()))
    pd['p'] = 1 - s[0]
    pd = pd.drop('probability', 1)
    
    return pd

def get_python_data():    
    lang_df = spark.sql("SELECT Repo, Comment as Text, Language FROM events WHERE Language IN ('Python')").dropna()

    # Sentiment
    union_df = model.transform(pipelineFit.transform(lang_df)).select(col('probability'), col('Language'), col('Text'))
    # Pandas
    pd = union_df.toPandas()
    s = pd['probability'].apply(lambda x: pandas.Series(x.toArray()))
    pd['p'] = 1 - s[0]
    pd = pd.drop('probability', 1)
    
    return pd


def get_google_data():    
    google_df = spark.sql("SELECT Repo, Comment as Text FROM events WHERE Repo LIKE '%google/%'").dropna()
    
    # Sentiment
    union_df =  model.transform(pipelineFit.transform(google_df)).select(col('probability'), col('Repo'), col('Text'))
    # Pandas
    pd = union_df.toPandas()
    s = pd['probability'].apply(lambda x: pandas.Series(x.toArray()))
    pd['p'] = 1 - s[0]
    pd = pd.drop('probability', 1)
    
    return pd

def get_amazon_data():    
    amazon_df = spark.sql("SELECT Repo, Comment as Text FROM events WHERE Repo LIKE '%aws/%'").dropna()
    
    # Sentiment
    union_df =  model.transform(pipelineFit.transform(amazon_df)).select(col('probability'), col('Repo'), col('Text'))
    # Pandas
    pd = union_df.toPandas()
    s = pd['probability'].apply(lambda x: pandas.Series(x.toArray()))
    pd['p'] = 1 - s[0]
    pd = pd.drop('probability', 1)
    return pd

def get_facebook_data():    
    facebook_df = spark.sql("SELECT Repo, Comment as Text FROM events WHERE Repo LIKE '%facebook/%'").dropna()
    
    union_df = model.transform(pipelineFit.transform(facebook_df)).select(col('probability'), col('Repo'), col('Text'))
    # Pandas
    pd = union_df.toPandas()
    s = pd['probability'].apply(lambda x: pandas.Series(x.toArray()))
    pd['p'] = 1 - s[0]
    pd = pd.drop('probability', 1)
    
    return pd

In [61]:
# results = [['Python', 0.510208], ['Go', 0.574086], ['JavaScript',0.521960], ['C++',0.404080], ['Java', 0.296523]]
# df = pandas.DataFrame(results, columns = ['Name', 'P'])

In [62]:
lang = get_facebook_data()
lang.to_csv('facebook2021.csv', index=False)

In [63]:
lang = get_amazon_data()
lang.to_csv('amazon2021.csv', index=False)

In [64]:
lang = get_google_data()
lang.to_csv('google2021.csv', index=False)

In [65]:
lang = get_js_data()
lang.to_csv('js2021.csv', index=False)

In [66]:
lang = get_python_data()
lang.to_csv('python2021.csv', index=False)

In [67]:
lang = get_java_data()
lang.to_csv('java2021.csv', index=False)