In [2]:
import os
import sys
import re 
import nltk
from pyspark.ml.feature import StopWordsRemover
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StringType, ArrayType
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, col


In [95]:
spark = SparkSession.builder \
         .appName("big-data-project") \
         .getOrCreate()

# data = spark.read.csv("gnm_comments.csv", header=True) #gives a dataframe

# data = spark.sparkContext.textFile('gnm_articles.csv') #gives an rdd

data = spark.read.csv("SFU_constructiveness_toxicity_corpus.csv", header=True) #gives a dataframe


In [96]:
data2 = data.select(['comment_text','is_constructive'])
data2 = data2.where(col("comment_text").isNotNull())
# data2.show()

def sentiment(x):
    if x=='yes': 
        return float(1)
    else: 
        return float(0)

sentiment_udf = udf(lambda x: sentiment(x), FloatType())
data2 = data2.withColumn("sentiment", sentiment_udf("is_constructive"))

data2 = data2.withColumn("comment_text", lower(col("comment_text")))

data2.show(30)

+--------------------+---------------+---------+
|        comment_text|is_constructive|sentiment|
+--------------------+---------------+---------+
|while technology ...|            yes|      1.0|
|everyone is still...|            yes|      1.0|
|you've never used...|             no|      0.0|
|you may be using ...|             no|      0.0|
|of course we all ...|             no|      0.0|
|simpson claims th...|            yes|      1.0|
|we have multiple ...|            yes|      1.0|
|a good start to s...|            yes|      1.0|
|all of this energ...|            yes|      1.0|
|time for the elde...|            yes|      1.0|
|canada has done m...|             no|      0.0|
|a few of the comm...|            yes|      1.0|
|there is a differ...|            yes|      1.0|
|it's absolutely n...|            yes|      1.0|
|i honestly cannot...|            yes|      1.0|
|why does the glob...|             no|      0.0|
|playing the race ...|             no|      0.0|
|the historic head..

In [97]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator


In [98]:
(train_set, validation_set) = data2.randomSplit([0.7,0.3], seed = 2000)

In [99]:
tokenizer = Tokenizer(inputCol="comment_text", outputCol="words")
hashtf = HashingTF(numFeatures=2**16, inputCol="words", outputCol='tf')
idf = IDF(inputCol='tf', outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
label_stringIdx = StringIndexer(inputCol = "sentiment", outputCol = "label")
pipeline = Pipeline(stages=[tokenizer, hashtf, idf, label_stringIdx])

pipelineFit = pipeline.fit(train_set)
train_df = pipelineFit.transform(train_set)
validation_df = pipelineFit.transform(validation_set)
train_df.count()
train_df.show()


+--------------------+--------------------+---------+--------------------+--------------------+--------------------+-----+
|        comment_text|     is_constructive|sentiment|               words|                  tf|            features|label|
+--------------------+--------------------+---------+--------------------+--------------------+--------------------+-----+
| but this last pa...|                null|      0.0|[, but, this, las...|(65536,[1518,3331...|(65536,[1518,3331...|  1.0|
|      !!!!!!!!!!!!!!|                  no|      0.0|    [!!!!!!!!!!!!!!]|(65536,[27630],[1...|(65536,[27630],[0...|  1.0|
|!!!!!''turks shoo...|                  no|      0.0|[!!!!!''turks, sh...|(65536,[8436,1045...|(65536,[8436,1045...|  1.0|
|"""canada's carte...|                  no|      0.0|["""canada's, car...|(65536,[1752,2820...|(65536,[1752,2820...|  1.0|
|"""union bosses""...|                  no|      0.0|["""union, bosses...|(65536,[170,637,1...|(65536,[170,637,1...|  1.0|
|"@informed albe

In [100]:
from pyspark.ml.classification import LogisticRegression

lr = LogisticRegression(maxIter=100)
lrModel = lr.fit(train_df)
predictions = lrModel.transform(validation_df)



In [101]:
predictions.select(['comment_text','sentiment','label','rawPrediction']).show()
trainingSummary = lrModel.summary


+--------------------+---------+-----+--------------------+
|        comment_text|sentiment|label|       rawPrediction|
+--------------------+---------+-----+--------------------+
|             """what|      0.0|  1.0|[-16.556949459227...|
|"of course you ig...|      0.0|  1.0|[8.58827612161051...|
|' the problem is ...|      0.0|  1.0|[5.77610890326717...|
|'''rouba al-fatta...|      1.0|  0.0|[-2.2271023186132...|
|'...whether the f...|      0.0|  1.0|[-29.457463998710...|
|'he’s hardly the ...|      1.0|  0.0|[22.6459438700175...|
|'he’s hardly the ...|      1.0|  0.0|[67.1813873056786...|
|'honey, do you re...|      0.0|  1.0|[3.10616648978955...|
|'if she was a man...|      1.0|  0.0|[34.0667181172660...|
|'the world has co...|      0.0|  1.0|[-12.061915383038...|
|'this $37-billion...|      1.0|  0.0|[48.6080923891138...|
|'what future do r...|      1.0|  0.0|[-1.1694210706198...|
|'yes to uber. no ...|      0.0|  1.0|[-38.883508202324...|
|a few of the comm...|      1.0|  0.0|[4

In [102]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)


0.7995129870129875