In [1]:
from __future__ import print_function
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext


In [2]:
conf = SparkConf().setAppName("CS143 Project 2B")
conf = conf.setMaster("local[*]")
sc   = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
sc.addPyFile("cleantext.py")
#main(sqlContext)

comments = sqlContext.read.json("comments-minimal.json.bz2")
#comments.write.parquet("comments-minimal.json.bz2")
#submissions = sqlContext.read.json("submissions.json.bz2")
labels = sqlContext.read.csv("labeled_data.csv")

# Task 2

1. The functional dependencies on labeled_data.csv is input_id -> (labeldem, labelgop, labeldjt). The input_id is a superkey hence this is in BCNF. 

2. Comments has the schema (Author(char), Body(text), Can_Gild(boolean), Collapsed(Boolean), Contraversality(int), created_utc(int?), edited(boolean), gilded(int), id(char), is_submitter(boolean), link_id(char), parent_id(char), retrieved_on(int?), score(int), stickied(boolean), subreddit(text). We see there is some redundancy. For example, author determines Can_Gild. However, there don't seem to be many redundancies overall.


Write SQL query: SELECT comment, input_id, labeldel, labelgop, labeldjt 
                 FROM comment C INNER JOIN labeled_data L ON C.id = L.input_id



In [None]:
#labels.printSchema()

comments.first()
#labels.show()

In [4]:
comments.createOrReplaceTempView("Comments")
labels.createOrReplaceTempView("Labels")

X_y = sqlContext.sql("SELECT C.body, L._c0, L._c1, L._c2, L._c3 FROM Comments C INNER JOIN Labels L ON C.id = L._c0")

#X_y.show()

In [5]:
from cleantext import sanitize

X_y.createOrReplaceTempView("Train")
just_comm = sqlContext.sql("SELECT body FROM Train")
#just_comm = just_comm.astype(str)


In [6]:
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
#add the n_grams
from pyspark.sql.functions import udf
sanitize_udf = udf(sanitize)
sanitized_df = X_y.select("body", "_c1", "_c2", "_c3", sanitize_udf("body").alias("n_grams"))

concat_string_arrays_udf = udf(lambda out: (' '.join(out)).split(), ArrayType(StringType(), True))
concat_string_arrays_df = sanitized_df.select("body", "_c1", "_c2", "_c3", concat_string_arrays_udf("n_grams").alias("n_grams_formatted"))

In [7]:
concat_string_arrays_df.limit(2).show(2, False)
#sanitized_df.show(2, False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+---+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [8]:
#count vectorizer
from pyspark.ml.feature import CountVectorizer
cv = CountVectorizer(inputCol="n_grams_formatted", outputCol="features", minDF = 5, binary=True)

model = cv.fit(concat_string_arrays_df)

#result = model.transform(comments_df)
#result.show(truncate=False)

In [9]:
result = model.transform(concat_string_arrays_df)
result.show(truncate=False)

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+---+---+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [10]:
from pyspark.sql import functions as F
pos = result.select("body", "features", F.when(X_y._c1 == 1, 1).otherwise(0).alias("dem"), F.when(X_y._c2 == 1, 1).otherwise(0).alias("gop"), F.when(X_y._c3 == 1, 1).otherwise(0).alias("label"))
pos.show(20)
neg = result.select("body", "features", F.when(X_y._c1 == -1, 1).otherwise(0).alias("dem"), F.when(X_y._c2 == -1, 1).otherwise(0).alias("gop"), F.when(X_y._c3 == -1, 1).otherwise(0).alias("label"))
neg.show(20)

+--------------------+--------------------+---+---+-----+
|                body|            features|dem|gop|label|
+--------------------+--------------------+---+---+-----+
|No it isnt. I cal...|(3532,[1,2,4,6,7,...|  0|  0|    1|
|Good move by the ...|(3532,[0,1,2,8,14...|  0|  1|    1|
|Well, that's it. ...|(3532,[0,1,2,4,5,...|  0|  0|    0|
|&gt;"I also know ...|(3532,[0,1,4,5,7,...|  0|  0|    0|
|&gt;He is asking ...|(3532,[1,2,6,7,8,...|  0|  0|    0|
|Donald Trump is b...|(3532,[0,1,3,6,7,...|  0|  0|    0|
|Hillary was guilt...|(3532,[0,1,2,4,5,...|  0|  0|    0|
|Even by liberal d...|(3532,[0,1,2,3,4,...|  0|  0|    0|
|Can you imagine i...|(3532,[0,1,2,3,4,...|  0|  0|    0|
|So this is the po...|(3532,[0,1,2,4,5,...|  0|  0|    0|
|How can developin...|(3532,[0,1,2,6,9,...|  0|  0|    0|
|I see you. Can't ...|(3532,[1,3,4,6,8,...|  0|  0|    0|
|&gt; Sane people ...|(3532,[0,1,2,4,6,...|  0|  0|    1|
|Oh man we just ne...|(3532,[2,12,16,28...|  0|  0|    0|
|As a baby boo

# Task 7

In [11]:
# Bunch of imports (may need more)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Initialize two logistic regression models.
# Replace labelCol with the column containing the label, and featuresCol with the column containing the features.
poslr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10).setThreshold(.2)
neglr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10).setThreshold(.25)

In [12]:
# This is a binary classifier so we need an evaluator that knows how to deal with binary classifiers.
posEvaluator = BinaryClassificationEvaluator()
negEvaluator = BinaryClassificationEvaluator()
# There are a few parameters associated with logistic regression. We do not know what they are a priori.
# We do a grid search to find the best parameters. We can replace [1.0] with a list of values to try.
# We will assume the parameter is 1.0. Grid search takes forever.
posParamGrid = ParamGridBuilder().addGrid(poslr.regParam, [1.0]).build()
negParamGrid = ParamGridBuilder().addGrid(neglr.regParam, [1.0]).build()

In [13]:
# We initialize a 5 fold cross-validation pipeline.
posCrossval = CrossValidator(
    estimator=poslr,
    evaluator=posEvaluator,
    estimatorParamMaps=posParamGrid,
    numFolds=5)
negCrossval = CrossValidator(
    estimator=neglr,
    evaluator=negEvaluator,
    estimatorParamMaps=negParamGrid,
    numFolds=5)
# Although crossvalidation creates its own train/test sets for
# tuning, we still need a labeled test set, because it is not
# accessible from the crossvalidator (argh!)
# Split the data 50/50
posTrain, posTest = pos.randomSplit([0.5, 0.5])
negTrain, negTest = neg.randomSplit([0.5, 0.5])
# Train the models
print("Training positive classifier...")
posModel = posCrossval.fit(posTrain)
print("Training negative classifier...")
negModel = negCrossval.fit(negTrain)

# Once we train the models, we don't want to do it again. We can save the models and load them again later.
# posModel.save("www/pos.model")
# negModel.save("www/neg.model")

Training positive classifier...
Training negative classifier...


# Task 8

In [14]:
# obtains data for testing
comments = sqlContext.read.json("comments-minimal.json.bz2")
comments.createOrReplaceTempView("Comments")
#comments.printSchema()
submissions = sqlContext.read.json("submissions.json.bz2")
submissions.createOrReplaceTempView("Submissions")
#submissions.printSchema()
#submissions.limit(1).show()

eval_data = sqlContext.sql("SELECT Comments.body as body, Comments.id as id FROM Comments JOIN Submissions ON Comments.link_id = Concat('t3_', Submissions.id)")



# Task 9

In [15]:
# prepares data for testing

eval_data.createOrReplaceTempView("Eval_data")
#eval_data = sqlContext.sql("SELECT * FROM Eval_data")
eval_data = sqlContext.sql("SELECT body, id FROM Eval_data WHERE Eval_data.body NOT LIKE '/s' AND Eval_data.body NOT LIKE '&gt'")

#eval_data.where(not("/s" in eval_data.body) and not("&gt" in eval_data.body))

sanitize_udf = udf(sanitize)
#eval_data.show()
sanitized_df = eval_data.select("body", "id", sanitize_udf("body").alias("n_grams"))

concat_string_arrays_udf = udf(lambda out: (' '.join(out)).split(), ArrayType(StringType(), True))
concat_string_arrays_df = sanitized_df.select("body", "id", concat_string_arrays_udf("n_grams").alias("n_grams_formatted"))

#posTest = concat_string_arrays_df.select("body", "state", "created_utc", "score", "controversiality", "distinguished", "retrieved_on", "n_grams_formatted", F.when(concat_string_arrays_df._c3 == 1, 1).otherwise(0).alias("label"))
#negTest =  concat_string_arrays_df.select("body", "state", "created_utc", "score", "controversiality", "distinguished", "retrieved_on", "n_grams_formatted", F.when(concat_string_arrays_df._c3 == -1, 1).otherwise(0).alias("label"))


#run the model on the data




In [16]:

posTestCounts = model.transform(concat_string_arrays_df)
posResult = posModel.transform(posTestCounts)
negTestCounts = model.transform(concat_string_arrays_df)
negResult = negModel.transform(negTestCounts)

In [17]:

#posResult.show()
#posResult.write.format("posResult.csv")
#negResult.show()
#negResult.write.format("negResult.csv")

# Task 10

In [18]:
posResult.createOrReplaceTempView("PosResult")
negResult.createOrReplaceTempView("NegResult")
resultCount = sqlContext.sql("SELECT COUNT(*) FROM PosResult")
resultCount.show()
posResultCount = sqlContext.sql("SELECT COUNT(*) FROM PosResult WHERE prediction == 1")
posResultCount.show()
negResultCount = sqlContext.sql("SELECT COUNT(*) FROM NegResult WHERE prediction == 1")
negResultCount.show()

+--------+
|count(1)|
+--------+
|10481025|
+--------+

+--------+
|count(1)|
+--------+
| 4131453|
+--------+

+--------+
|count(1)|
+--------+
| 9020610|
+--------+



In [63]:
comments.createOrReplaceTempView("Comments")
posResultWComment = sqlContext.sql("SELECT Comments.body as body, Comments.id as id, Comments.author_flair_text as state, Comments.created_utc as created_utc, Comments.score as score, Comments.controversiality as controversiality, Comments.distinguished as distinguished, PosResult.prediction as prediction FROM Comments JOIN PosResult ON Comments.id == PosResult.id")
posResultWComment.createOrReplaceTempView("PosResultWComment")



In [64]:
negResultWComment = sqlContext.sql("SELECT Comments.body as body, Comments.id as id, Comments.author_flair_text as state, Comments.created_utc as created_utc, Comments.score as score, Comments.controversiality as controversiality, Comments.distinguished as distinguished, NegResult.prediction as prediction FROM Comments JOIN NegResult ON Comments.id == NegResult.id")
negResultWComment.createOrReplaceTempView("NegResultWComment")


In [84]:
submissions = sqlContext.read.json("submissions.json.bz2")
submissions.printSchema()

root
 |-- archived: boolean (nullable = true)
 |-- author: string (nullable = true)
 |-- author_cakeday: boolean (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- brand_safe: boolean (nullable = true)
 |-- can_gild: boolean (nullable = true)
 |-- can_mod_post: boolean (nullable = true)
 |-- contest_mode: boolean (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- crosspost_parent: string (nullable = true)
 |-- crosspost_parent_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- archived: boolean (nullable = true)
 |    |    |-- author: string (nullable = true)
 |    |    |-- author_cakeday: boolean (nullable = true)
 |    |    |-- author_flair_css_class: string (nullable = true)
 |    |    |-- author_flair_text: string (nullable = true)
 |    |    |-- brand_safe: boolean (nullable = true)
 |    |    |-- can_gild: boolean (nullable = true)
 |    |    |-- can_m

In [85]:
submissions.createOrReplaceTempView("Submssions")

posResultWSubmission = sqlContext.sql("SELECT Submissions.score as score, Submissions.id as id, PosResult.prediction as prediction FROM Submissions JOIN PosResult ON Submissions.id == PosResult.id")
posResultWSubmission.createOrReplaceTempView("PosResultWSubmission")

negResultWSubmission = sqlContext.sql("SELECT Submissions.score as score, Submissions.id as id, NegResult.prediction as prediction FROM Submissions JOIN NegResult ON Submissions.id == NegResult.id")
negResultWSubmission.createOrReplaceTempView("NegResultWSubmission")



In [93]:
#percent by day of year
posDays = sqlContext.sql("SELECT FROM_UNIXTIME(created_utc, 'Y-mm-d') AS date, SUM(prediction) / COUNT(prediction) as pct FROM PosResultWComment GROUP BY date")
negDays = sqlContext.sql("SELECT FROM_UNIXTIME(created_utc, 'Y-mm-d') AS date, SUM(prediction) / COUNT(prediction) as pct FROM NegResultWComment GROUP BY date")


posState = sqlContext.sql("SELECT state, SUM(prediction) / COUNT(prediction) as pct FROM PosResultWComment WHERE state = 'Alabama' OR state ='Alaska' OR state ='Arizona' OR state ='Arkansas' OR state='California' OR state ='Colorado' OR state ='Connecticut' OR state ='Delaware' OR state ='District of Columbia' OR state ='Florida' OR state ='Georgia' OR state ='Hawaii' OR state ='Idaho' OR state ='Illinois' OR state ='Indiana' OR state ='Iowa' OR state ='Kansas' OR state ='Kentucky' OR state ='Louisiana' OR state ='Maine' OR state ='Maryland' OR state ='Massachusetts' OR state ='Michigan' OR state ='Minnesota' OR state ='Mississippi' OR state ='Missouri' OR state ='Montana' OR state ='Nebraska' OR state ='Nevada' OR state ='New Hampshire' OR state ='New Jersey' OR state ='New Mexico' OR state ='New York' OR state ='North Carolina' OR state ='North Dakota' OR state ='Ohio' OR state ='Oklahoma' OR state ='Oregon' OR state ='Pennsylvania' OR state ='Rhode Island' OR state ='South Carolina' OR state ='South Dakota' OR state ='Tennessee' OR state ='Texas' OR state ='Utah' OR state ='Vermont' OR state ='Virginia' OR state ='Washington' OR state ='West Virginia' OR state ='Wisconsin' OR state ='Wyoming' GROUP BY state")
negState = sqlContext.sql("SELECT state, SUM(prediction) / COUNT(prediction) as pct FROM NegResultWComment WHERE state = 'Alabama' OR state ='Alaska' OR state ='Arizona' OR state ='Arkansas' OR state='California' OR state ='Colorado' OR state ='Connecticut' OR state ='Delaware' OR state ='District of Columbia' OR state ='Florida' OR state ='Georgia' OR state ='Hawaii' OR state ='Idaho' OR state ='Illinois' OR state ='Indiana' OR state ='Iowa' OR state ='Kansas' OR state ='Kentucky' OR state ='Louisiana' OR state ='Maine' OR state ='Maryland' OR state ='Massachusetts' OR state ='Michigan' OR state ='Minnesota' OR state ='Mississippi' OR state ='Missouri' OR state ='Montana' OR state ='Nebraska' OR state ='Nevada' OR state ='New Hampshire' OR state ='New Jersey' OR state ='New Mexico' OR state ='New York' OR state ='North Carolina' OR state ='North Dakota' OR state ='Ohio' OR state ='Oklahoma' OR state ='Oregon' OR state ='Pennsylvania' OR state ='Rhode Island' OR state ='South Carolina' OR state ='South Dakota' OR state ='Tennessee' OR state ='Texas' OR state ='Utah' OR state ='Vermont' OR state ='Virginia' OR state ='Washington' OR state ='West Virginia' OR state ='Wisconsin' OR state ='Wyoming' GROUP BY state")

posCommScores = sqlContext.sql("SELECT score, SUM(prediction) / COUNT(prediction) as pct FROM PosResultWComment GROUP BY score")
negCommScores = sqlContext.sql("SELECT score, SUM(prediction) / COUNT(prediction) as pct FROM NegResultWComment GROUP BY score")

posSubScores = sqlContext.sql("SELECT score, SUM(prediction) / COUNT(prediction) as pct FROM PosResultWSubmission GROUP BY score")
negSubScores = sqlContext.sql("SELECT score, SUM(prediction) / COUNT(prediction) as pct FROM NegResultWSubmission GROUP BY score")

posControversiality = sqlContext.sql("SELECT controversiality, SUM(prediction) / COUNT(prediction) as pct FROM PosResultWComment GROUP BY controversiality")
negControversiality = sqlContext.sql("SELECT controversiality, SUM(prediction) / COUNT(prediction) as pct FROM NegResultWComment GROUP BY controversiality")

posDistinguished = sqlContext.sql("SELECT distinguished, SUM(prediction) / COUNT(prediction) as pct FROM PosResultWComment GROUP BY distinguished")
negDistinguished = sqlContext.sql("SELECT distinguished, SUM(prediction) / COUNT(prediction) as pct FROM NegResultWComment GROUP BY distinguished")



In [96]:
# posState.show(50, False)
# negState.show(50, False)

# posCommScores.show(posCommScores.count(), False)
# negCommScores.show(negCommScores.count(), False)

# posDays.show(366, False)
# negDays.show(366, False)

# posControversiality(posControversiality.count(), False)
# negControversiality(negControversiality.count(), False)

# posDistinguished(posDistinguished.count(), False)
# negDistinguished(negDistinguished.count(), False)


In [97]:
posDays = sqlContext.sql("SELECT FROM_UNIXTIME(created_utc, 'Y-mm-d') AS date, SUM(prediction) / COUNT(prediction) as pct FROM PosResultWComment GROUP BY date").explain(True)

== Parsed Logical Plan ==
'Aggregate ['date], ['FROM_UNIXTIME('created_utc, Y-mm-d) AS date#5336, ('SUM('prediction) / 'COUNT('prediction)) AS pct#5337]
+- 'UnresolvedRelation `PosResultWComment`

== Analyzed Logical Plan ==
date: string, pct: double
Aggregate [from_unixtime(created_utc#4090L, Y-mm-d, Some(America/Los_Angeles))], [from_unixtime(created_utc#4090L, Y-mm-d, Some(America/Los_Angeles)) AS date#5336, (sum(prediction#4094) / cast(count(prediction#4094) as double)) AS pct#5337]
+- SubqueryAlias posresultwcomment
   +- Project [body#3322 AS body#4087, id#3332 AS id#4088, author_flair_text#3321 AS state#4089, created_utc#3328L AS created_utc#4090L, score#3338L AS score#4091L, controversiality#3327L AS controversiality#4092L, distinguished#3329 AS distinguished#4093, prediction#3526 AS prediction#4094]
      +- Join Inner, (id#3332 = id#3495)
         :- SubqueryAlias comments
         :  +- Relation[author#3318,author_cakeday#3319,author_flair_css_class#3320,author_flair_text#33