# FIT5202 Group Assessment - Phase 2

In [1]:
// import packages
import org.apache.spark.ml.feature.{HashingTF, IDF, StringIndexer, RegexTokenizer}
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.DataFrame
import scala.io.Source

Intitializing Scala interpreter ...

Spark Web UI available at http://51665ba4a540:4040
SparkContext available as 'sc' (version = 2.4.3, master = local[*], app id = local-1558858190018)
SparkSession available as 'spark'


import org.apache.spark.ml.feature.{HashingTF, IDF, StringIndexer, RegexTokenizer}
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.sql.DataFrame
import scala.io.Source


### IMPORT REDDIT COMMENTS CSVS AND COMBINE INTO SPARK DATAFRAME


In [2]:
// set file import options
var dfPos = spark.read
          .option("quote", "\"")
          .option("escape", "\"")
          .option("wholeFile", true)
          .option("multiline",true)
          .option("header", true)
          .option("inferSchema", "true")
            .csv("./comments_positive.csv")
var dfNeg = spark.read
          .option("quote", "\"")
          .option("escape", "\"")
          .option("wholeFile", true)
          .option("multiline",true)
          .option("header", true)
          .option("inferSchema", "true")
            .csv("./comments_negative.csv")

dfPos: org.apache.spark.sql.DataFrame = [id: string, parent_id: string ... 13 more fields]
dfNeg: org.apache.spark.sql.DataFrame = [id: string, parent_id: string ... 13 more fields]


In [3]:
// combine the pos and neg dataframes together
var df = dfPos.union(dfNeg)

df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, parent_id: string ... 13 more fields]


In [4]:
// view the data to show everything is working as expected
df.show(5)

+-------+----------+------------+--------+--------------------+-----+----+------------+----------------+--------------+--------------------+------------+----------+-------------+-----------------------+
|     id| parent_id|subreddit_id| link_id|                text|score| ups|      author|controversiality|parent_link_id|         parent_text|parent_score|parent_ups|parent_author|parent_controversiality|
+-------+----------+------------+--------+--------------------+-----+----+------------+----------------+--------------+--------------------+------------+----------+-------------+-----------------------+
|c092j8m|t1_c092gss|    t5_2qh2p|t3_8eyy3|This isn't Twitte...| 9582|9582|  nraustinii|               0|      t3_8eyy3|     Fucking faggot.|       -7526|     -7526|   Glorificus|                      0|
|c4imcva|t1_c4im948|    t5_2qh1i|t3_t0ynr|Well, it is exact...| 9531|9531|     Lynfect|               0|      t3_t0ynr|Elaborate on this...|        3841|      3841|     eeeeevil|          

### REMOVE ROWS WITH SHALLOW TEXT
Comments with one or two words are typically low scored  
and do not lend insight since they are overwhelmingly
made up of extremely common words

In [5]:
// make a copy of the unaltered dataframe
val df2 = df
// count the number of unaltered rows
df2.count()

df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, parent_id: string ... 13 more fields]
res1: Long = 4000000


Filter out rows where text char length is below 3

In [6]:
// remove rows with text fields less than 3 characters in length
df = df.where(length(col("text")) >= 3)
// count the remaining rows
df.count()

df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, parent_id: string ... 13 more fields]
res2: Long = 3991922


### TRIM THE DATASET TO THE SELECTED COLUMNS AND CREATE NEW DATAFRAME DROP ROWS WITH NAs

In [7]:
// define function to return dataframe of selected columns
def reduceToSelectFields(loadedCsv:org.apache.spark.sql.DataFrame): org.apache.spark.sql.DataFrame = {
    var trimmedCol = loadedCsv.col("text")
    var newDf = loadedCsv.withColumn("comments",trimmedCol).select("id",
                                                                   "score",
                                                                   "parent_score",
                                                                   "subreddit_id",
                                                                   "comments")
    return newDf.na.drop()
 }

reduceToSelectFields: (loadedCsv: org.apache.spark.sql.DataFrame)org.apache.spark.sql.DataFrame


In [8]:
// dataframe of selected columns: text and id 
val temp = reduceToSelectFields(df)

temp: org.apache.spark.sql.DataFrame = [id: string, score: int ... 3 more fields]


### TOKENIZE THE DATA USING REGEX TOKENIZER FROM MLLIB

In [9]:
// define the regex tokenizer
val regexTokenizer = new RegexTokenizer()
  .setInputCol("comments")
  .setOutputCol("words")
  .setPattern("\\w*[-']?\\w?").setGaps(false) // CAPTURE WORDS WITH APOSTROPHES AND HYPHENS INSIDE
  .setToLowercase(true)

regexTokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_ca67bfe91ca4


In [10]:
// apply the tokenizer to the dataframe
var wordsData = regexTokenizer.transform(temp)

wordsData: org.apache.spark.sql.DataFrame = [id: string, score: int ... 4 more fields]


In [11]:
// view a row to affirm the tokenizer has applied correctly
wordsData.select("words").take(1)

res3: Array[org.apache.spark.sql.Row] = Array([WrappedArray(this, isn't, twitter, try, to, comment, on, the, article, and, not, your, current, activities)])


### REMOVE STOPWORDS USING CUSTOM STOPWORD LIST FROM EDA
Use the StopWordsRemover function from ML Lib

In [12]:
// load list of stopwords
val fileLines = Source.fromFile("./lex_stopwords.txt").getLines.toList
// save stopwords as an array
val stpWordList = fileLines.toArray

fileLines: List[String] = List(v, abst, na, beginnings, ., m, haven, y, why, ask, said, would, accordingly, overall, somewhere, wouldnt, become, tis, ;, other, adj, itd, selves, was, ll, indeed, 'll, mostly, ol, does, won, same, i've, ,, briefly, anyway, it, cannot, specifying, begins, f, name, mustn, out, _, gotten, over, looking, act, howbeit, everybody, 6, 1, eight, hereby, mg, thru, probably, <, readily, yet, nowhere, my, words, yourselves, seem, wasn, unto, anyways, behind, now, I, :, by, upon, also, do, part, ought, seemed, keep	keeps, formerly, brief, co, becoming, enough, hereafter, research, shows, unless, gone, mightn, awfully, /, most, 0f, others, they, whole, used, afterwards, did, ones, for, shes, these, otherwise, usually, didn, necessary, both, such, couldnt, be, whoever,...

In [13]:
// view a word from the stopwords array
stpWordList(1)

res4: String = abst


In [14]:
// define input and output for stopword removal
val sWRemover = new StopWordsRemover()
    .setInputCol("words")
    .setOutputCol("words_stopped")
    .setStopWords(stpWordList)

sWRemover: org.apache.spark.ml.feature.StopWordsRemover = stopWords_a30b7ddf6cfb


In [15]:
// remove stopwords
wordsData = sWRemover.transform(wordsData)

wordsData: org.apache.spark.sql.DataFrame = [id: string, score: int ... 5 more fields]


#### Check how rows many have no tokens, then remove these rows

In [16]:
// view number of rows
wordsData.count()

res5: Long = 3991922


In [17]:
// remove rows with empty text fields
var wordsData2 = wordsData.where(size(col("words_stopped")) > 0)
// view the remaining number of rows
wordsData2.count()

wordsData2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, score: int ... 5 more fields]
res6: Long = 3914568


In [18]:
// update the dataframe
wordsData = wordsData2

wordsData: org.apache.spark.sql.DataFrame = [id: string, score: int ... 5 more fields]


#### Save negative and positive datsets separately for later EDA use
Export to seprate jsons

In [19]:
// write the negative dataset to json for later EDA
wordsData2.where(col("score") < 0).select("score","words_stopped","words").coalesce(1).write.json("neg_import")

In [20]:
// write the negative dataset to json for later EDA
wordsData2.where(col("score") >= 0).select("score","words_stopped","words").coalesce(1).write.json("pos_import")

### CREATE TERM FREQUENCY VECTORS USING HASHINGTF

In [21]:
// define input and output for tf-idf vectorization
val hashingTF = new HashingTF().setInputCol("words_stopped").setOutputCol("Features")
.setNumFeatures(20000)

hashingTF: org.apache.spark.ml.feature.HashingTF = hashingTF_5a9f5ca840fb


In [22]:
// vectorize the tokens
val featureData = hashingTF.transform(wordsData)

featureData: org.apache.spark.sql.DataFrame = [id: string, score: int ... 6 more fields]


In [23]:
// view a row from from the vectorized dataframe
featureData.take(1)

res9: Array[org.apache.spark.sql.Row] = Array([c092j8m,9582,-7526,t5_2qh2p,This isn't Twitter: try to comment on the article, and not your current activities.,WrappedArray(this, isn't, twitter, try, to, comment, on, the, article, and, not, your, current, activities),WrappedArray(twitter, comment, article, current, activities),(20000,[1294,3186,4470,6710,16622],[1.0,1.0,1.0,1.0,1.0])])


In [24]:
// view a row from the vectorized dataframe in more readable format
val str = featureData.col("features")(1).toString()
print(str)

features[1]

str: String = features[1]


In [25]:
featureData.show(1)

+-------+-----+------------+------------+--------------------+--------------------+--------------------+--------------------+
|     id|score|parent_score|subreddit_id|            comments|               words|       words_stopped|            Features|
+-------+-----+------------+------------+--------------------+--------------------+--------------------+--------------------+
|c092j8m| 9582|       -7526|    t5_2qh2p|This isn't Twitte...|[this, isn't, twi...|[twitter, comment...|(20000,[1294,3186...|
+-------+-----+------------+------------+--------------------+--------------------+--------------------+--------------------+
only showing top 1 row



### EXPORT TO JSON
Features to Export:  ID, PARENT_SCORE, SUBREDDIT_IT, SCORE, WORD TOKENS (WORDS_STOPPED) AND TF VECTOR HASHES  
**NOTE**: The following cells create a directory in which to hold the ouput json file.  
YOU MUST DELETE THE DIRECTORY TO **RE-RUN** THE CODE HERE ELSE AN ERROR WILL OCCUR.  
**NOTE**: The json file created here will be uploaded to Google drive.  
Link provided in the report.

In [26]:
// export the vectorized dataframe to json
featureData.select("id", "parent_score","subreddit_id","score","words_stopped", "Features").coalesce(1).write.json("output_dir")

org.apache.spark.sql.AnalysisException:  path file:/home/FIT5202_Group_Project/reddit-comment-score-prediction/output_dir already exists.;

In [None]:
// view the dimensions of the contents being exported to JSON
featureData.select("id", "parent_score","subreddit_id","score","words_stopped", "Features").take(1)