In [1]:
import os
import json
from difflib import unified_diff
from pyspark import ml
import numpy as np
from pyspark import SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import udf
from pyspark.ml import Pipeline, Transformer, PipelineModel
from pyspark.sql.types import *
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, RegexTokenizer, StopWordsRemover, CountVectorizer, VectorAssembler, StringIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.classification import DecisionTreeClassifier, NaiveBayes, NaiveBayesModel

from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType

# Record and print the kernel's current working directory.
cwd = os.getcwd()
print(cwd)

C:\Users\u0115374\Desktop\spark


In [7]:
# Read the three subsamples back from their parquet files.
# NOTE(review): this cell uses `spark`, which is only created in a later cell,
# so on a fresh kernel that cell has to be executed first.
sample_a, sample_b, sample_c = [
    spark.read.parquet(parquet_path)
    for parquet_path in ("sample_a.parquet", "sample_b.parquet", "sample_c.parquet")
]

In [None]:
# Create (or reuse) the Spark session first and take the SparkContext from it.
# Constructing SparkContext("local", "example") directly raises a ValueError when
# a context is already running, e.g. when this cell is executed a second time;
# getOrCreate() makes the cell safe to re-run with the same master and app name.
spark = SparkSession.builder.master("local").appName("example").getOrCreate()
sc = spark.sparkContext
spark

In [None]:
# Echo the SparkContext so the notebook displays it.
sc

In [None]:
# Load every file matched by the streaming_1a glob as JSON into one DataFrame.
df_a = spark.read.json('C:\\Users\\u0115374\\Documents\\PhD\\Courses\\Big Data\\Assignment3\\streaming_1a\\*')

In [None]:
# Load every file matched by the streaming_1b glob as JSON into one DataFrame.
df_b = spark.read.json('C:\\Users\\u0115374\\Documents\\PhD\\Courses\\Big Data\\Assignment3\\streaming_1b\\*')


In [None]:
# Load every file matched by the streaming_1c glob as JSON, then print the
# schema of the resulting DataFrame.
df_c = spark.read.json('C:\\Users\\u0115374\\Documents\\PhD\\Courses\\Big Data\\Assignment3\\streaming_1c\\*')
df_c.printSchema()

In [None]:
# Show the number of rows per label in the full streaming_1c data.
df_c.groupBy("label").count().show()

In [None]:
'''Try to create balanced subsamples of all stream folders'''
# Per-label sampling fractions, one dict per stream folder: every 'vandal' row
# is kept (fraction 1) while 'safe' and 'unsafe' rows are down-sampled.
fractions_a = {
    'safe': 0.01,
    'unsafe': 0.10,
    'vandal': 1,
}
fractions_b = {
    'safe': 0.01,
    'unsafe': 0.065,
    'vandal': 1,
}
fractions_c = {
    'safe': 0.012,
    'unsafe': 0.065,
    'vandal': 1,
}
# Disabled sampling calls these fractions were written for (fixed seed):
#sample_a = df_a.sampleBy('label', fractions=fractions_a, seed = 19052020)
#sample_b = df_b.sampleBy('label', fractions=fractions_b, seed = 19052020)
#sample_c = df_c.sampleBy('label', fractions=fractions_c, seed = 19052020)

In [8]:
# Print the per-label row counts of each subsample, in a / b / c order.
for subsample in (sample_a, sample_b, sample_c):
    subsample.groupBy("label").count().show()

+------+-----+
| label|count|
+------+-----+
|unsafe|  288|
|  safe|  227|
|vandal|  245|
+------+-----+

+------+-----+
| label|count|
+------+-----+
|unsafe|  450|
|  safe|  474|
|vandal|  416|
+------+-----+

+------+-----+
| label|count|
+------+-----+
|  safe|  584|
|unsafe|  636|
|vandal|  584|
+------+-----+



In [None]:
# Use subsample A as the working DataFrame for the cells that follow.
df = sample_a

In [None]:
#df_a.write.parquet("df_a.parquet")
#sample_a.write.parquet("sample_a.parquet")

#df_b.write.parquet("df_b.parquet")
#sample_b.write.parquet("sample_b.parquet")

#df_c.write.parquet("df_c.parquet")
#sample_c.write.parquet("sample_c.parquet")

In [None]:
def make_diff(old, new):
    """Return the added and removed lines between two texts as one string.

    Both texts are split on newlines and compared with ``difflib.unified_diff``.
    Only output lines starting with ``+`` or ``-`` are kept and joined with
    newlines. The ``---`` / ``+++`` header lines emitted by ``unified_diff``
    also start with those characters, so they are part of the result.

    Args:
        old: Previous text, or ``None`` (treated as empty text).
        new: New text, or ``None`` (treated as empty text).

    Returns:
        The filtered diff as a single string; ``''`` when both texts are equal.
    """
    # Guard against None inputs, which would otherwise fail on `.split` with an
    # AttributeError (presumably what a null text column yields — TODO confirm).
    old_lines = (old or '').split('\n')
    new_lines = (new or '').split('\n')
    return '\n'.join(
        line
        for line in unified_diff(old_lines, new_lines)
        if line.startswith(('+', '-'))
    )

# Spark UDF wrapper around make_diff that returns the diff as a string column.
udfMake_Diff = udf(make_diff, StringType())


def add_diff(df):
    """Return ``df`` with an extra ``diff`` column built from ``text_old`` and ``text_new``.

    ``withColumn`` does not modify ``df``; it produces a new DataFrame, so the
    result has to be returned for the added column to be usable by the caller.

    Args:
        df: DataFrame with ``text_old`` and ``text_new`` columns.

    Returns:
        A new DataFrame with the additional ``diff`` column.
    """
    return df.withColumn("diff", udfMake_Diff("text_old", "text_new"))

In [None]:
class DiffColTransformer(Transformer, ml.util.DefaultParamsWritable, ml.util.DefaultParamsReadable):
    """Pipeline stage that adds the ``diff`` column.

    Wraps ``udfMake_Diff`` in a ``Transformer`` so the diff computation can be
    used as a stage of an ML ``Pipeline``. The stage takes no parameters: it
    always reads ``text_old`` / ``text_new`` and writes ``diff``.
    """

    def __init__(self):
        super().__init__()

    def _transform(self, df: DataFrame) -> DataFrame:
        """Return ``df`` extended with the ``diff`` column."""
        diff_column = udfMake_Diff("text_old", "text_new")
        return df.withColumn("diff", diff_column)

In [None]:
# Fix the seed so the 70/30 train/test split is the same on every run; without a
# seed the split (and therefore the reported accuracy) changes between runs.
train, test = df.randomSplit([0.70, 0.30], seed=19052020)

In [None]:
# Text-feature stages: build the diff column, split it on non-word characters,
# drop stop words, count terms and re-weight the counts with IDF.
dct = DiffColTransformer()
tokenizer = RegexTokenizer(
    inputCol="diff",
    outputCol="words",
    pattern="\\W",
)
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
cv = CountVectorizer(inputCol="filtered", outputCol="vectors")
idf = IDF(inputCol="vectors", outputCol="features")

# Index the string "label" column into the numeric "target" column the classifier uses.
label_indexer = StringIndexer(inputCol="label", outputCol="target")

# Multinomial Naive Bayes on the "features" vectors.
nb = NaiveBayes(
    featuresCol="features",
    labelCol="target",
    modelType="multinomial",
    smoothing=1.0,
)

In [None]:
# Stage order matters: each stage reads a column written by an earlier one
# (diff -> words -> filtered -> vectors -> features, plus label -> target).
stages = [dct, tokenizer, remover, cv, idf, label_indexer, nb]

In [None]:
# Fit all pipeline stages on the training split.
model = Pipeline(stages=stages).fit(dataset = train)

In [None]:
# Apply the fitted pipeline to the held-out split; the cell echoes the resulting DataFrame.
model.transform(test)

In [None]:
# Evaluator that scores the "prediction" column against the indexed "target" label by accuracy.
evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction", metricName="accuracy")

In [None]:
# Accuracy of the fitted pipeline on the held-out split.
evaluator.evaluate(model.transform(test))

In [None]:
# Persist the fitted pipeline. A plain model.save('nbmodel') fails once the
# 'nbmodel' path exists, which breaks re-running the notebook, so overwrite it.
model.write().overwrite().save('nbmodel')

In [None]:
from threading import Thread

class StreamingThread(Thread):
    """Thread that runs a streaming context so the notebook cell does not block.

    ``run`` starts the context and waits for its termination; ``stop`` asks the
    context to stop while leaving the underlying SparkContext running.
    """

    def __init__(self, ssc):
        """Store the streaming context this thread drives.

        Args:
            ssc: Object providing ``start``, ``awaitTermination`` and ``stop``.
        """
        Thread.__init__(self)
        self.ssc = ssc

    def run(self):
        """Start the streaming context and block until it terminates."""
        # Use the context handed to the constructor. The bare name `ssc` only
        # resolved through a notebook global and could refer to another context.
        self.ssc.start()
        self.ssc.awaitTermination()

    def stop(self):
        """Stop the streaming context gracefully, keeping the SparkContext alive."""
        print('----- Stopping... this may take a few seconds -----')
        self.ssc.stop(stopSparkContext=False, stopGraceFully=True)

In [None]:
# Flag read by the (currently disabled) model-loading code inside process().
models_loaded = False


def process(time, rdd):
    """Show one batch of the stream as a DataFrame.

    Args:
        time: Batch time; only used in the printed banner.
        rdd: RDD holding the batch; parsed as JSON with ``spark.read.json``.

    Returns:
        None. Empty batches are skipped without printing anything.
    """
    batch_is_empty = rdd.isEmpty()
    if batch_is_empty:
        return None

    banner = "========= %s =========" % str(time)
    print(banner)

    # Parse the batch into a DataFrame and display it.
    df = spark.read.json(rdd)
    df.show()
    # The string below is disabled code (model loading and prediction), kept as is.
    '''# Load in the model if not yet loaded:
    if not globals()['models_loaded']:
        # load in your models here
        globals()['nbmodel'] = PipelineModel.load('nbmodel') # Replace '***' with:    [...].load('my_logistic_regression')
        globals()['models_loaded'] = True
        
    df_result = globals()['nbmodel'].transform(df)
    df_result.select('target').show()'''

In [None]:
# Widen the notebook's main container to 95% of the browser window.
# `display` and `HTML` come from the public IPython.display module; importing
# them from IPython.core.display is deprecated.
from IPython.display import display, HTML
display(HTML("<style>.container { width:95% !important; }</style>"))

In [None]:
# Streaming context on the existing SparkContext with a batch duration of 10.
ssc = StreamingContext(sc, 10)

In [None]:
# Read text lines from the socket source and pass every batch RDD to process().
lines = ssc.socketTextStream("seppe.net", 7778)
lines.foreachRDD(process)

In [None]:
# Run the streaming context on a background thread so the notebook stays usable.
ssc_t = StreamingThread(ssc)
ssc_t.start()

In [None]:
# Stop the stream; StreamingThread.stop() leaves the SparkContext running.
ssc_t.stop()