# Step 0

Importing the modules used in this notebook; some of the imports may be redundant.

In [1]:
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.feature.ChiSqSelectorModel
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
import org.apache.spark.ml.feature.{HashingTF, IDF}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.json4s._
import org.json4s.jackson.JsonMethods._
import java.io.{ PrintWriter, File, FileOutputStream }
import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.classification.{LinearSVC, OneVsRest}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

sc

Intitializing Scala interpreter ...

Spark Web UI available at http://captain01.os.hpc.tuwien.ac.at:9999/proxy/application_1683456198714_2611
SparkContext available as 'sc' (version = 3.2.3, master = yarn, app id = application_1683456198714_2611)
SparkSession available as 'spark'




The data could have been loaded directly into a *DataFrame*, but to keep the code short I reused the RDD-based loading code from `Part 1` and converted the result with `toDF()`.

In [5]:
val stopwords = sc.textFile("stopwods.txt").collect()
val rdd = sc.textFile("hdfs:///user/dic23_shared/amazon-reviews/full/reviews_devset.json")
val data = rdd.map { row =>
  val json_row = parse(row)
  // compact(...) returns the JSON-encoded value, so string fields keep their surrounding quotes
  (compact(json_row \ "category"), compact(json_row \ "reviewText"))
}
val reviews = data.toDF("category","reviewText")

stopwords: Array[String] = Array(a, aa, able, about, above, absorbs, accord, according, accordingly, across, actually, after, afterwards, again, against, ain, album, album, all, allow, allows, almost, alone, along, already, also, although, always, am, among, amongst, an, and, another, any, anybody, anyhow, anyone, anything, anyway, anyways, anywhere, apart, app, appear, appreciate, appropriate, are, aren, around, as, aside, ask, asking, associated, at, available, away, awfully, b, baby, bb, be, became, because, become, becomes, becoming, been, before, beforehand, behind, being, believe, below, beside, besides, best, better, between, beyond, bibs, bike, book, books, both, brief, bulbs, but, by, c, came, camera, can, cannot, cant, car, case, cause, causes, cd, certain, certainly, changes,...


In [7]:
val newReviews = new StringIndexer()
    .setInputCol("category")
    .setOutputCol("label")
    .fit(reviews)
    .transform(reviews)

newReviews: org.apache.spark.sql.DataFrame = [category: string, reviewText: string ... 1 more field]


# Pipeline

## Tokenizer

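Cleans the strings, tokenizes them, and converts everything to lower case. The pattern is a regular-expression character class that lists the delimiter characters (most of them escaped); because `gaps` is `true`, the text is split wherever one of these characters occurs. Tokens shorter than two characters are dropped.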

In [8]:
val tokenizer = new RegexTokenizer()
            .setInputCol("reviewText")
            .setOutputCol("terms")
            .setPattern("[\\(\\)\\[\\]\\{\\}\\.\\!\\?,;:\\+\\='\\-\\_\\\"`~#@&*\\%€$§/\\\\1234567890\t\\s+]")
            .setMinTokenLength(2)
            .setToLowercase(true)

tokenizer: org.apache.spark.ml.feature.RegexTokenizer = RegexTokenizer: uid=regexTok_611423317d6d, minTokenLength=2, gaps=true, pattern=[\(\)\[\]\{\}\.\!\?,;:\+\='\-\_\"`~#@&*\%€$§/\\1234567890	\s+], toLowercase=true
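
To make the splitting behaviour concrete, here is a minimal plain-Scala sketch that does not need Spark. It assumes a shortened delimiter class (`delimiters`) rather than the full pattern above, and the helper `tokenize` and the sample sentence are hypothetical names and data used only for illustration. It follows the same three steps as the tokenizer configuration: lowercase, split on delimiters, drop tokens shorter than the minimum length.

```scala
// Shortened stand-in for the delimiter character class
// (assumption: NOT the full pattern used in the notebook).
val delimiters = "[\\.\\!\\?,;:'\\-%0-9\\s]"

// Lowercase the text, split wherever a delimiter occurs,
// and keep only tokens of at least minTokenLength characters.
def tokenize(text: String, minTokenLength: Int = 2): List[String] =
  text.toLowerCase
    .split(delimiters)
    .filter(_.length >= minTokenLength)
    .toList

val tokens = tokenize("It's a GREAT camera, 100% recommended!")
println(tokens) // List(it, great, camera, recommended)
```

Adjacent delimiters produce empty strings and the apostrophe leaves a lone `s`; both are removed by the length filter, which is why `setMinTokenLength(2)` matters here.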


## Remover

Straightforward: removes the stopwords loaded above from each token list.

In [9]:
val remover = new StopWordsRemover()
            .setInputCol("terms")
            .setOutputCol("terms_remover")
            .setStopWords(stopwords)

remover: org.apache.spark.ml.feature.StopWordsRemover = StopWordsRemover: uid=stopWords_8121a1a3402c, numStopWords=596, locale=en_US, caseSensitive=false


## CountVectorizer

In [10]:
val cv_model = new CountVectorizer()
            .setInputCol("terms_remover")
            .setOutputCol("features")
            .setMinDF(30)

cv_model: org.apache.spark.ml.feature.CountVectorizer = cntVec_ab4669ff5784


## IDF

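This takes the term-count vectors produced by the *CountVectorizer* as input and rescales each feature by its inverse document frequency, so that terms occurring in many reviews get a lower weight.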

In [11]:
val idf = new IDF()
            .setInputCol("features")
            .setOutputCol("weightedFeatures")

idf: org.apache.spark.ml.feature.IDF = idf_8f715138721c
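
For reference, the weighting that Spark's documentation gives for `IDF` can be written as follows, where $m$ is the number of documents, $\mathrm{df}(t)$ is the number of documents containing term $t$, and $\mathrm{tf}(t, d)$ is the count of $t$ in document $d$ (the symbol names are chosen here for illustration):

$$
\mathrm{idf}(t) = \log\frac{m + 1}{\mathrm{df}(t) + 1},
\qquad
\mathrm{tfidf}(t, d) = \mathrm{tf}(t, d) \cdot \mathrm{idf}(t)
$$

A term that appears in every document gets $\mathrm{idf}(t) = \log 1 = 0$, so it contributes nothing to `weightedFeatures`.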


## Selector

This is our `ChiSqSelector()`. It selects the top 2000 features according to the chi-squared test against the `label` column.

In [12]:
val selector = new ChiSqSelector()
          .setNumTopFeatures(2000)
          .setFeaturesCol("weightedFeatures")
          .setLabelCol("label")
          .setOutputCol("chiFeatures")

selector: org.apache.spark.ml.feature.ChiSqSelector = chiSqSelector_099907002b37
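
As a rough illustration of the statistic behind the selection, the following plain-Scala sketch computes the chi-squared value for a single 2x2 contingency table. The counts are made up, and the table layout (term present/absent versus one class/all other classes) is a simplifying assumption; Spark's selector builds its tables from the distinct feature values and all labels, so this is a sketch of the test, not of the library's implementation.

```scala
// Observed counts (made-up numbers for illustration only):
// rows = term present / term absent, columns = class A / other classes.
val observed = Array(Array(30.0, 10.0), Array(20.0, 40.0))

val rowSums = observed.map(_.sum)
val colSums = observed.transpose.map(_.sum)
val total = rowSums.sum

// Sum over all cells of (observed - expected)^2 / expected,
// where expected = rowSum * colSum / total under independence.
val chiSq = (for {
  i <- observed.indices
  j <- observed(i).indices
} yield {
  val expected = rowSums(i) * colSums(j) / total
  math.pow(observed(i)(j) - expected, 2) / expected
}).sum

println(f"chi-squared = $chiSq%.3f") // chi-squared = 16.667
```

The larger the statistic, the stronger the evidence that the term and the class are not independent, which is what makes the feature a candidate for selection.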


## PART 3 - Normalizer

Normalizes each sample individually to unit Lp norm. Here p = 2, so every feature vector is scaled to Euclidean length 1.

In [13]:
val normalizer = new Normalizer()
            .setInputCol("chiFeatures")
            .setOutputCol("normFeatures")
            .setP(2.0)

normalizer: org.apache.spark.ml.feature.Normalizer = Normalizer: uid=normalizer_ca7d2151cc08, p=2.0
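
The effect of the normalization can be sketched in plain Scala on a dense array. The helper `normalize` is a hypothetical name introduced for illustration, and leaving an all-zero vector unchanged is an assumption of this sketch rather than a statement about the library.

```scala
// Scale a vector to unit Lp norm; p = 2.0 gives the Euclidean norm.
// Assumption of this sketch: an all-zero vector is returned unchanged.
def normalize(v: Array[Double], p: Double = 2.0): Array[Double] = {
  val norm = math.pow(v.map(x => math.pow(math.abs(x), p)).sum, 1.0 / p)
  if (norm == 0.0) v else v.map(_ / norm)
}

// The vector (3, 4) has Euclidean length 5, so it becomes roughly (0.6, 0.8).
val unit = normalize(Array(3.0, 4.0))
println(unit.mkString(", "))
```

After this step, reviews of very different lengths yield feature vectors of the same length, so a linear classifier compares their direction rather than their magnitude.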
