# DIC2020 - A2.1 Calculation of 200 most discriminative terms per category for amazon review dataset with Spark and Scala

## Imports

In [1]:
import scala.util.parsing.json._

Intitializing Scala interpreter ...

Spark Web UI available at http://c100.local:8088/proxy/application_1587827373944_4203
SparkContext available as 'sc' (version = 2.4.0-cdh6.3.2, master = yarn, app id = application_1587827373944_4203)
SparkSession available as 'spark'


import scala.util.parsing.json._


## Constants

In [32]:
val INPUT = "hdfs:///user/pknees/amazon-reviews/full/reviews_devset.json"
val STOPWORDS = "hdfs:///user/e11944050/stopwords.txt"
val DELIMS = "[.!?,;:()\\[\\]{}\\-_\"\\`~#&*%$\\/\\s\\d]"
val N_DOCS_IN_CAT_KEY = "_nDocs"
val TOP_N = 200


INPUT: String = hdfs:///user/pknees/amazon-reviews/full/reviews_devset.json
STOPWORDS: String = hdfs:///user/e11944050/stopwords.txt
DELIMS: String = [.!?,;:()\[\]{}\-_"\`~#&*%$\/\s\d]
N_DOCS_IN_CAT_KEY: String = _nDocs
TOP_N: Int = 200


## Link reviews dataset in hdfs

In [4]:
val amazonReviewSet = sc.textFile(INPUT)

amazonReviewSet: org.apache.spark.rdd.RDD[String] = hdfs:///user/pknees/amazon-reviews/full/reviews_devset.json MapPartitionsRDD[1] at textFile at <console>:30


## Count reviews in different categories

In [31]:
def mapDocsPerCat(JSONString: String) : (String, Int) = {
    val parsed = JSON.parseFull(JSONString)
    parsed match {
        case Some(e:Map[String,String]) => (e("category"), 1)  //parse category from JSON and emit (parsedCategory, 1)
        case _ => ("_error", 0)
    }  
}

               case Some(e:Map[String,String]) => (e("category"), 1)
                           ^
mapDocsPerCat: (JSONString: String)(String, Int)


In [5]:
val nDocsPerCat = amazonReviewSet.map(review => mapDocsPerCat(review)) //for each review, parse category and emit 1
                                .reduceByKey(_+_) //reduce by category and sum emitted 1s
                                .collectAsMap() //collect results as map and keep in memory

nDocsPerCat: scala.collection.Map[String,Int] = Map(Kindle_Store -> 3205, Electronic -> 7825, Automotive -> 1374, Pet_Supplie -> 1235, Clothing_Shoes_and_Jewelry -> 5749, Baby -> 916, Musical_Instrument -> 500, Grocery_and_Gourmet_Food -> 1297, Book -> 22507, Movies_and_TV -> 4607, Tools_and_Home_Improvement -> 1926, Sports_and_Outdoor -> 3269, CDs_and_Vinyl -> 3749, Apps_for_Android -> 2638, Home_and_Kitche -> 4254, Office_Product -> 1243, Health_and_Personal_Care -> 2982, Digital_Music -> 836, Cell_Phones_and_Accessorie -> 3447, Beauty -> 2023, Toys_and_Game -> 2253, Patio_Lawn_and_Garde -> 994)


## Link stopwords in hdfs

In [6]:
val stopWords = sc.textFile(STOPWORDS)

stopWords: org.apache.spark.rdd.RDD[String] = hdfs:///user/e11944050/stopwords.txt MapPartitionsRDD[5] at textFile at <console>:30


## Collect stopwords as set and keep in memory

In [7]:
val stopWordsSet = stopWords.collect().toSet

stopWordsSet: scala.collection.immutable.Set[String] = Set(serious, latterly, looks, particularly, used, down, regarding, entirely, it's, regardless, moreover, please, "", ourselves, able, that's, behind, for, despite, maybe, viz, further, corresponding, any, wherein, across, name, allows, there's, this, haven't, instead, in, ought, myself, have, your, off, once, i'll, are, is, his, oh, why, rd, knows, too, among, course, greetings, somewhat, everyone, seen, likely, said, try, already, soon, nobody, got, given, using, less, am, consider, hence, than, accordingly, isn't, four, didn't, anyhow, want, three, forth, whereby, himself, specify, yes, throughout, inasmuch, but, you're, whether, sure, below, co, best, plus, becomes, what, unto, different, would, although, elsewhere, causes, anoth...

## Parse reviewTexts and categories from JSON

In [33]:
def parseReviews(JSONString: String)  = {
    val parsed = JSON.parseFull(JSONString)
    parsed match {
        case Some(e: Map[String, String]) => {
            val reviewText = e("reviewText")
            val category = e("category")
            (reviewText, category)         
        }
        case _ => ("_error", "_error")
    }
}

               case Some(e: Map[String, String]) => {
                            ^
parseReviews: (JSONString: String)(String, Any)


In [34]:
val parsedReviews = amazonReviewSet.map(json => parseReviews(json))
//containing (reviewText, category) tuples

parsedReviews: org.apache.spark.rdd.RDD[(String, Any)] = MapPartitionsRDD[28] at map at <console>:31


## Create tuples of (term, category), for every term. Casefold, remove stopwords and 1 char terms  

In [10]:
val termCats = parsedReviews.flatMap(rc => (rc._1.split(DELIMS) //reviewCat pairs
                             .distinct //only emit each term,cat pair once per review
                             .map(x => x.toLowerCase) //casefolding (before stopWord filterinng)
                             .filter(x => !stopWordsSet.contains(x)) //remove stopwords
                             .filter(x => x.length > 1) //only keep chars with len > 1
                             .map(x => (x, rc._2)))) //make tuple of term and category

termCats: org.apache.spark.rdd.RDD[(String, String)] = MapPartitionsRDD[7] at flatMap at <console>:33


## Reduce by terms and create maps containing document occurrences in each category

In [11]:
def reduceTermCats(term: String, categories: Iterable[String]) : (String, Map[String, Int]) = {
    var tempMap = categories.groupBy(identity).mapValues(_.size) //group by category and count occurrence
    val nDocs = tempMap.foldLeft(0)(_+_._2) //calculate sum of all map values
    tempMap += (N_DOCS_IN_CAT_KEY -> nDocs) //add total number of documents containing term to map  
    (term, tempMap)
}

reduceTermCats: (term: String, categories: Iterable[String])(String, Map[String,Int])


In [12]:
val termOccs = termCats.groupByKey()
                            .map((tc) => reduceTermCats(tc._1, tc._2))
//containing (term, Map[category, numOccurrences])

termOccs: org.apache.spark.rdd.RDD[(String, Map[String,Int])] = MapPartitionsRDD[9] at map at <console>:32


## Calculate chi square value for every term and category combination

In [13]:
def calcChiSquare(category: String, catOccMap: Map[String, Int]) : (String, BigDecimal) = {
    var temp =  catOccMap.get(category)
    temp match {
        case Some(e: Int) => {
            val a = BigDecimal(e)
            temp = catOccMap.get(N_DOCS_IN_CAT_KEY)
            temp match {
                case Some(e2: Int) => {
                    val b = BigDecimal(e2).-(a)
                    temp = nDocsPerCat.get(category)
                    temp match {
                        case Some(e3: Int) => {
                            val c = BigDecimal(e3).-(a)
                            val n = nDocsPerCat.foldLeft(0)(_+_._2) //calculate sum of all map values
                            val d = n.-(e3).-(b)
                            val nomChi = ((a.*(d)).-((b.*(c)))).pow(2).*(n)
                            val denomChi = (a.+(b)).*((a.+(c))).*((b.+(d))).*((c.+(d)))
                            val chi = nomChi./(denomChi)
                            (category, chi)    
                        }
                        case _ => {("_error", BigDecimal(0))}
                    }           
                }
                case _ => {("_error", BigDecimal(0))}
            }
        }
        case _ => {("_error", BigDecimal(0))}  
    }    
}

calcChiSquare: (category: String, catOccMap: Map[String,Int])(String, BigDecimal)


In [14]:
val catTermChis = termOccs.flatMap(termOcc => termOcc._2 // pairs of (term, MAP<category, occurrence>), take each MAP
                             .filter(catOcc => !catOcc._1.equals(N_DOCS_IN_CAT_KEY)) //remove key N_DOCS_IN_CAT_KEY 
                             .map(catOcc => calcChiSquare(catOcc._1, termOcc._2)) //calcchiSquare for remaining keys 
                                                                                     //(all occurrences in categories)
                             .map(catChi => (catChi._1, (termOcc._1, catChi._2)))) //map result to (category, (term, chi)) format
//containing (category, (term, chiSquare))

catTermChis: org.apache.spark.rdd.RDD[(String, (String, BigDecimal))] = MapPartitionsRDD[10] at flatMap at <console>:33


## Reduce over category, sort by descending chi square values and only keep top 200

In [15]:
def reduceCatTermChis(category: String, termChis: Iterable[(String, BigDecimal)]) : (String, Array[(String, BigDecimal)]) = {
    val sortedTermChis = scala.util.Sorting.stableSort(termChis.toList, 
                                                       (e1: (String, BigDecimal), e2: (String, BigDecimal)) => e1._2 > e2._2)
    (category, sortedTermChis.take(TOP_N))
}

reduceCatTermChis: (category: String, termChis: Iterable[(String, BigDecimal)])(String, Array[(String, BigDecimal)])


In [35]:
val termsPerCat = catTermChis.groupByKey()
            .map(ctc => reduceCatTermChis(ctc._1, ctc._2))
            .sortByKey() //sort by categories
//containing (category, (term, chiSquare))
//only top 200 terms per category
//sorted by category

termsPerCat: org.apache.spark.rdd.RDD[(String, Array[(String, BigDecimal)])] = ShuffledRDD[33] at sortByKey at <console>:33


## Create string representation of termChiPairs and save to file

In [17]:
val stringRep = termsPerCat.map(line => line._1 + " " + 
                                line._2.map(tuple => tuple._1 + ":" + tuple._2)
                                .mkString(" "))


stringRep: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at map at <console>:29


**Remove file from hdfs before saving new one**

In [18]:
!hadoop fs -rm -r a2/term_chi 

rm: `a2/term_chi': No such file or directory


In [19]:
stringRep.saveAsTextFile("a2/term_chi")

## Create dictionary and save it to file

In [20]:
val dict = termsPerCat.flatMap(line => line._2
                               .map(termChi => termChi._1))
                        .distinct
                        .sortBy(term => term)

dict: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at sortBy at <console>:32


**Remove file from hdfs before saving new one**

In [21]:
!hadoop fs -rm -r a2/dict 

20/05/06 14:21:50 INFO fs.TrashPolicyDefault: Moved: 'hdfs://nameservice1/user/e11944050/a2/dict' to trash at: hdfs://nameservice1/user/e11944050/.Trash/Current/user/e11944050/a2/dict


In [22]:
dict.saveAsTextFile("a2/dict")

## Construct final output file via shell script

In [23]:
! . make_output.sh a2/term_chi a2/dict