In [1]:
from pyspark.streaming import StreamingContext
from pyspark.sql import Row
from pyspark.sql.functions import udf, struct, array, col, lit
from pyspark.sql.types import StringType

In [2]:
sc
spark

In [3]:
# read RDDs
import os
rootdir = 'C:/Pr/AA_Big_Data/Assignment_3/spark/save_sub'

files_list = []

for subdir, dirs, files in os.walk(rootdir):
    for name in files:
        if "part" in name.lower() and not ".crc" in name.lower():
            #files_list.append( sc.textFile(os.path.join(subdir,name)) )
            files_list.append(os.path.join(subdir,name))

In [4]:
df = spark.read.json(sc.textFile(','.join(files_list)))
df.show(5)

+--------------------+-----+--------------------+--------------------+--------------------+---------------+--------------------+
|             comment|label|           name_user|            text_new|            text_old|     title_page|            url_page|
+--------------------+-----+--------------------+--------------------+--------------------+---------------+--------------------+
|→‎Theatrical care...| safe|           John B123|{{short descripti...|{{short descripti...|Dorothea Jordan|//en.wikipedia.or...|
|                    | safe|           TSUBAME98|{{BLP sources|dat...|{{BLP sources|dat...|      Kan Otake|//en.wikipedia.or...|
|→‎top:hatnote not...| safe|             Uanfala|The '''Motozintle...|{{for|the languag...|  Motozintlecos|//en.wikipedia.or...|
|Add image caption...| safe|         Robby.is.on|{{short descripti...|{{short descripti...|  Joseph Akpala|//en.wikipedia.or...|
|Removed external ...| safe|Anthonygalluccisc...|{{wikt|Kemet|ke'm...|{{wikt|Kemet|ke'm...|      

In [5]:
#check data structure
print(df.columns)
type(df)

['comment', 'label', 'name_user', 'text_new', 'text_old', 'title_page', 'url_page']


pyspark.sql.dataframe.DataFrame

In [6]:
#df.toPandas().to_csv('C:/Pr/AA_Big_Data/Assignment_3/spark/csv/mycsv.csv')

In [5]:
#define diff
from difflib import unified_diff

def make_diff(old, new):
    return '\n'.join([ l for l in unified_diff(old.split('\n'), new.split('\n')) if l.startswith('+') or l.startswith('-') ])

In [6]:
#make diff column
diff = make_diff(df.first().text_old, df.first().text_new)
df_withdiff = df.withColumn("diff", lit(diff))
df_withdiff.select('diff').show()


+--------------------+
|                diff|
+--------------------+
|--- 

+++ 

-|ali...|
|--- 

+++ 

-|ali...|
|--- 

+++ 

-|ali...|
|--- 

+++ 

-|ali...|
+--------------------+



In [7]:
#change column
df_wd = df_withdiff.withColumnRenamed('label', 'label_string')

In [8]:
#show top 5 rows
#print(df_withdiff.columns)
print(df_withdiff.show(5))

+--------------------+-----+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             comment|label|         name_user|            text_new|            text_old|          title_page|            url_page|                diff|
+--------------------+-----+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                    | safe|      WikiMaster37|{{pp-vandalism|sm...|{{pp-vandalism|sm...|List of World Ser...|//en.wikipedia.or...|--- 

+++ 

-|ali...|
|→‎Members of the ...| safe|Mr Serjeant Buzfuz|{{short descripti...|{{short descripti...|Gaspé (Province o...|//en.wikipedia.or...|--- 

+++ 

-|ali...|
|         →‎Biography| safe|      SabellaAsher|{{Use dmy dates|d...|{{Use dmy dates|d...|        Sabine Huynh|//en.wikipedia.or...|--- 

+++ 

-|ali...|
|→‎The English Pro...| safe|        Jenhawk777|[[Persecution]] i...|[[Persecution]

In [14]:
#For model pipeline: tokenize diff column, remove stop words
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="diff", outputCol="words", pattern="\\W")
# stop words
add_stopwords = ["http","https","amp","rt","t","c","the"] 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
# bag of words count
#do not use, this uses count vectors instead of TF-IDF!
#countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=3)

In [15]:
#index categorical variable
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "label_string", outputCol = "label")

+--------------------+------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|             comment|label_string|         name_user|            text_new|            text_old|          title_page|            url_page|                diff|               words|            filtered|            features|label|
+--------------------+------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----+
|                    |        safe|      WikiMaster37|{{pp-vandalism|sm...|{{pp-vandalism|sm...|List of World Ser...|//en.wikipedia.or...|--- 

+++ 

-|ali...|[align, left, sty...|[align, left, sty...|(73,[0,1,2,3,4,5,...|  0.0|
|→‎Members of the ...|        safe|Mr Serjeant Buzfuz|{{short descripti...|{{short d

In [16]:
# set seed for reproducibility
#print obs count
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Training Dataset Count: 3
Test Dataset Count: 1


In [17]:
#do TF-IDF embeding, fit a pipeline, fit a logisti
from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=2) #minDocFreq: remove sparse terms
#define the pipeline
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])
#fit a pipeline and transform
pipelineFit = pipeline.fit(df_wd)
dataset = pipelineFit.transform(df_wd)
#split dataset into train and test
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
#fit logistic regression
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
#predict on test set
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("diff","label_string","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

+------------------------------+------------+-----------+-----+----------+
|                          diff|label_string|probability|label|prediction|
+------------------------------+------------+-----------+-----+----------+
|--- 

+++ 

-|align=left st...|        safe|  [1.0,0.0]|  0.0|       0.0|
+------------------------------+------------+-----------+-----+----------+



In [18]:
#evaluate model accuracy
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

1.0

In [19]:
# save logit model
from pyspark.ml import PipelineModel
outpath = 'C:/Pr/AA_Big_Data/Assignment_3/spark/output'
lrModel.write().overwrite().save(outpath)

In [36]:
#from pyspark.ml.classification import LogisticRegressionModel
#model_in = LogisticRegressionModel.load(outpath)