In [1]:
import findspark

findspark.init()

In [2]:
from pyspark import SparkContext
sc = SparkContext()
sc

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("final_model").getOrCreate()

In [4]:
data = spark.read.parquet("s3://ppol567lab2/data_text.parquet")

In [19]:
data.printSchema()

root
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- body: string (nullable = true)
 |-- author_patreon_flair: boolean (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- collapsed: boolean (nullable = true)
 |-- score: long (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- comment: integer (nullable = true)
 |-- post: integer (nullable = true)
 |-- moderator: integer (nullable = true)
 |-- admin: integer (nullable = true)
 |-- body_clean: string (nullable = true)
 |-- body_tokenized: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- body_no_stopw: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawfeatures: vector (nullable = true)
 |-- feats: vector (nullable = true)



In [21]:
from pyspark.ml.feature import VectorAssembler

#assembling vector to run LR 
vectorAssembler = VectorAssembler(inputCols = ['score',
                                              'collapsed', 'moderator', 'feats', 'comment',], 
                                  outputCol = 'features')

model_df = vectorAssembler.transform(data)
model_df = model_df.select('features', 'controversiality')

In [22]:
model_df.printSchema()

root
 |-- features: vector (nullable = true)
 |-- controversiality: long (nullable = true)



In [23]:
#split into training and test
training, test = model_df.randomSplit([0.6, 0.4], 123)


In [24]:
training.count()

26297781

In [None]:
test.count()

In [14]:
from pyspark.ml.classification import LogisticRegression

#running first model
lr = LogisticRegression(featuresCol = 'features', labelCol = 'controversiality', maxIter=10)
lr_model = lr.fit(training)

In [15]:
#saving output for later use
lr_model.save("s3://ppol567lab2/lr_model.model")


In [22]:
# Make predictions.
predictions = lr_model.transform(test)

In [23]:
predictions.show()

+-------------------+----------------+--------------------+--------------------+----------+
|           features|controversiality|       rawPrediction|         probability|prediction|
+-------------------+----------------+--------------------+--------------------+----------+
|     (262148,[],[])|               0|[3.84760059684005...|[0.97911464607619...|       0.0|
|     (262148,[],[])|               0|[3.84760059684005...|[0.97911464607619...|       0.0|
|     (262148,[],[])|               0|[3.84760059684005...|[0.97911464607619...|       0.0|
|     (262148,[],[])|               0|[3.84760059684005...|[0.97911464607619...|       0.0|
|     (262148,[],[])|               0|[3.84760059684005...|[0.97911464607619...|       0.0|
|     (262148,[],[])|               0|[3.84760059684005...|[0.97911464607619...|       0.0|
|     (262148,[],[])|               0|[3.84760059684005...|[0.97911464607619...|       0.0|
|     (262148,[],[])|               0|[3.84760059684005...|[0.97911464607619...|

In [37]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

#select only prediction and label columns
preds_and_labels = predictions.select(['prediction','controversiality'])

preds_and_labels = preds_and_labels.withColumn("prediction",preds_and_labels.prediction.cast('float'))
preds_and_labels = preds_and_labels.withColumn("controversiality",preds_and_labels.controversiality.cast('float'))

metrics = BinaryClassificationMetrics(preds_and_labels.rdd.map(tuple))


In [38]:
# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under ROC = 0.501353127128873


In [5]:
#changing column from boolean to int
data = data.withColumn("author_patreon_flair",data.author_patreon_flair.cast('int'))


In [17]:
from pyspark.sql.functions import size

#creating body length column
data = data.withColumn('body_length', size(data.body_tokenized))

In [9]:
from pyspark.ml.feature import VectorAssembler

#assembling new vector
vectorAssembler = VectorAssembler(inputCols = ['score', 'body_length',
                                              'collapsed', 'moderator', 'feats', 'comment', 'author_patreon_flair'], 
                                  outputCol = 'features')

model_df2 = vectorAssembler.transform(data)
model_df2 = model_df2.select('features', 'controversiality')

In [10]:
training, test = model_df2.randomSplit([0.6, 0.4], 123)


In [13]:
from pyspark.ml.classification import LogisticRegression

#running new model
lr = LogisticRegression(featuresCol = 'features', labelCol = 'controversiality', maxIter=10)
lr_model = lr.fit(training)

In [14]:
# Make predictions.
predictions = lr_model.transform(test)

In [15]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics

#select only prediction and label columns
preds_and_labels = predictions.select(['prediction','controversiality'])

preds_and_labels = preds_and_labels.withColumn("prediction",preds_and_labels.prediction.cast('float'))
preds_and_labels = preds_and_labels.withColumn("controversiality",preds_and_labels.controversiality.cast('float'))

metrics = BinaryClassificationMetrics(preds_and_labels.rdd.map(tuple))


In [16]:
# Area under ROC curve
print("Area under ROC = %s" % metrics.areaUnderROC)

Area under ROC = 0.5116195465971471
