## Spark Notebook Setup

In [1]:
import $ivy.`org.apache.spark::spark-sql:2.4.0`
import org.apache.spark.sql._
import org.apache.spark.{SparkConf, SparkContext}

[32mimport [39m[36m$ivy.$[39m
[32mimport [39m[36morg.apache.spark.sql._[39m
[32mimport [39m[36morg.apache.spark.{SparkConf, SparkContext}[39m

In [2]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)

[32mimport [39m[36morg.apache.log4j.{Level, Logger}[39m

In [3]:
// Notebook-aware Spark session. The "local[*]" master runs Spark inside this
// JVM instead of connecting to an external cluster.
val spark = NotebookSparkSession
  .builder()
  .master("local[*]")
  .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@6ebbb45f

## Preprocessing Data

In [4]:
// Relative locations of the three input files read by the cells below.
val moviePath: String = "notebooks/data/movies.csv"                 // movie records (CSV with header)
val stopWordsPath: String = "notebooks/data/stopwords.txt"          // stop-word list
val lemmatizationPath: String = "notebooks/data/lemmatization.txt"  // lemmatization table

[36mmoviePath[39m: [32mString[39m = [32m"notebooks/data/movies.csv"[39m
[36mstopWordsPath[39m: [32mString[39m = [32m"notebooks/data/stopwords.txt"[39m
[36mlemmatizationPath[39m: [32mString[39m = [32m"notebooks/data/lemmatization.txt"[39m

In [5]:
// Reads the movie CSV (header row, multi-line fields, `"` as the escape
// character) and keeps, for every row, its zipWithIndex position together with
// string columns 0, 1 and 7. Later cells destructure this tuple as
// (id, year, title, plot).
val moviesRaw = spark.read
    .option("header", "true")
    .option("multiline", "true")
    .option("escape", "\"")
    .csv(moviePath)
    .rdd
    .zipWithIndex
    .map { case (row, rowIndex) =>
        (rowIndex, row.getString(0), row.getString(1), row.getString(7))
    }

// Print the first few records as a sanity check.
moviesRaw.take(5).foreach(record => println(record))

(0,1901,Kansas Saloon Smashers,A bartender is working at a saloon, serving drinks to customers. After he fills a stereotypically Irish man's bucket with beer, Carrie Nation and her followers burst inside. They assault the Irish man, pulling his hat over his eyes and then dumping the beer over his head. The group then begin wrecking the bar, smashing the fixtures, mirrors, and breaking the cash register. The bartender then sprays seltzer water in Nation's face before a group of policemen appear and order everybody to leave.[1])
(1,1901,Love by the Light of the Moon,The moon, painted with a smiling face hangs over a park at night. A young couple walking past a fence learn on a railing and look up. The moon smiles. They embrace, and the moon's smile gets bigger. They then sit down on a bench by a tree. The moon's view is blocked, causing him to frown. In the last scene, the man fans the woman with his hat because the moon has left the sky and is perched over her shoulder to see everything

[36mmoviesRaw[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[([32mLong[39m, [32mString[39m, [32mString[39m, [32mString[39m)] = MapPartitionsRDD[9] at map at cmd5.sc:8

In [6]:
// Stop-word set: every line of the file is trimmed, lower-cased and stripped
// of anything that is not a letter a-z, then collected to the driver as a Set.
val stopWords = spark.read
    .textFile(stopWordsPath)
    .rdd
    .map { line =>
        val lowered = line.trim.toLowerCase
        lowered.replaceAll("[^a-z]", "")
    }
    .collect()
    .toSet

[36mstopWords[39m: [32mSet[39m[[32mString[39m] = [33mSet[39m(
  [32m"used"[39m,
  [32m"e"[39m,
  [32m"down"[39m,
  [32m"side"[39m,
  [32m"wanting"[39m,
  [32m"interesting"[39m,
  [32m"number"[39m,
  [32m"behind"[39m,
  [32m"ways"[39m,
  [32m"for"[39m,
  [32m"s"[39m,
  [32m"find"[39m,
  [32m"pointing"[39m,
  [32m"further"[39m,
  [32m"works"[39m,
  [32m"working"[39m,
  [32m"x"[39m,
  [32m"largely"[39m,
  [32m"any"[39m,
  [32m"ended"[39m,
  [32m"across"[39m,
  [32m"years"[39m,
  [32m"young"[39m,
  [32m"area"[39m,
  [32m"this"[39m,
  [32m"in"[39m,
  [32m"needing"[39m,
  [32m"myself"[39m,
  [32m"have"[39m,
  [32m"needed"[39m,
  [32m"your"[39m,
  [32m"off"[39m,
  [32m"once"[39m,
  [32m"point"[39m,
  [32m"are"[39m,
  [32m"is"[39m,
  [32m"his"[39m,
  [32m"why"[39m,
...

In [7]:
// Lookup table built from the lemmatization file. Each line is split on
// whitespace; the second field becomes the key and the first field the value
// (judging by the sample output: inflected form -> lemma).
val lemmatization = spark.read
    .textFile(lemmatizationPath)
    .rdd
    .map { line =>
        val fields = line.split("\\s+")
        val key = fields(1)
        val value = fields(0)
        (key, value)
    }
    .collect()
    .toMap

// Show a handful of entries.
lemmatization.take(5).foreach(entry => println(entry))

(professed,profess)
(pathogens,pathogen)
(purifies,purify)
(phosphates,phosphate)
(buns,bun)


[36mlemmatization[39m: [32mMap[39m[[32mString[39m, [32mString[39m] = [33mMap[39m(
  [32m"professed"[39m -> [32m"profess"[39m,
  [32m"pathogens"[39m -> [32m"pathogen"[39m,
  [32m"purifies"[39m -> [32m"purify"[39m,
  [32m"phosphates"[39m -> [32m"phosphate"[39m,
  [32m"buns"[39m -> [32m"bun"[39m,
  [32m"fathering"[39m -> [32m"father"[39m,
  [32m"soapiest"[39m -> [32m"soapy"[39m,
  [32m"basils"[39m -> [32m"basil"[39m,
  [32m"showdowns"[39m -> [32m"showdown"[39m,
  [32m"redcurrants"[39m -> [32m"redcurrant"[39m,
  [32m"night-watchmen"[39m -> [32m"night-watchman"[39m,
  [32m"regularizing"[39m -> [32m"regularize"[39m,
  [32m"boutiques"[39m -> [32m"boutique"[39m,
  [32m"satsumas"[39m -> [32m"satsuma"[39m,
  [32m"healings"[39m -> [32m"healing"[39m,
  [32m"breaks"[39m -> [32m"break"[39m,
  [32m"cut-backs"[39m -> [32m"cut-back"[39m,
  [32m"sneezed"[39m -> [32m"sneeze"[39m,
  [32m"forgotten"[39m -> [32m"forget"[39

In [8]:
// Tokenises every plot: lower-case it, replace everything except a-z and
// spaces with a space, split on single spaces, drop stop words and empty
// tokens, map each token through the lemmatization table (unknown tokens are
// kept as they are) and finally discard results of length 0 or 1.
val movies = moviesRaw.map { case (id, year, title, plot) =>
    val rawTokens = plot.toLowerCase.replaceAll("[^a-z ]", " ").split(" ")

    val contentTokens = rawTokens.filter { token =>
        !stopWords.contains(token) && !token.isEmpty
    }

    val processedPlot = contentTokens
        .map(token => lemmatization.getOrElse(token, token))
        .filter(lemma => lemma.length > 1)

    (id, year, title, processedPlot)
}

// Arrays print as JVM object references here; this only confirms the shape.
movies.take(5).foreach(record => println(record))

(0,1901,Kansas Saloon Smashers,[Ljava.lang.String;@22b3e273)
(1,1901,Love by the Light of the Moon,[Ljava.lang.String;@1d78f48)
(2,1901,The Martyred Presidents,[Ljava.lang.String;@1837d478)
(3,1901,Terrible Teddy, the Grizzly King,[Ljava.lang.String;@52fafc08)
(4,1902,Jack and the Beanstalk,[Ljava.lang.String;@6190c083)


[36mmovies[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[([32mLong[39m, [32mString[39m, [32mString[39m, [32mArray[39m[[32mString[39m])] = MapPartitionsRDD[21] at map at cmd8.sc:1

In [9]:
val firstEntry = movies.first()

[36mfirstEntry[39m: ([32mLong[39m, [32mString[39m, [32mString[39m, [32mArray[39m[[32mString[39m]) = (
  [32m0L[39m,
  [32m"1901"[39m,
  [32m"Kansas Saloon Smashers"[39m,
  [33mArray[39m(
    [32m"bartender"[39m,
    [32m"saloon"[39m,
    [32m"serve"[39m,
    [32m"drink"[39m,
    [32m"customer"[39m,
    [32m"fill"[39m,
    [32m"stereotypically"[39m,
    [32m"irish"[39m,
    [32m"bucket"[39m,
    [32m"beer"[39m,
    [32m"carrie"[39m,
    [32m"nation"[39m,
    [32m"follower"[39m,
    [32m"burst"[39m,
    [32m"inside"[39m,
    [32m"assault"[39m,
    [32m"irish"[39m,
    [32m"pull"[39m,
    [32m"hat"[39m,
    [32m"eye"[39m,
    [32m"dump"[39m,
    [32m"beer"[39m,
    [32m"head"[39m,
    [32m"begin"[39m,
    [32m"wreck"[39m,
    [32m"bar"[39m,
    [32m"smash"[39m,
    [32m"fixture"[39m,
    [32m"mirror"[39m,
    [32m"break"[39m,
    [32m"cash"[39m,
    [32m"register"[39m,
    [32m"bartender"[39m,
    [32m"

## Obtaining a Vocabulary

In [10]:
// Every distinct token that appears in any processed plot.
val vocabulary = movies
    .flatMap(movie => movie._4.toSeq)
    .distinct()

// Sample of the vocabulary.
vocabulary.take(10).foreach(word => println(word))

mistretta
shh
sammee
bone
gerven
nothin
rostom
mislabel
fred
bresac


[36mvocabulary[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[[32mString[39m] = MapPartitionsRDD[25] at distinct at cmd10.sc:2

In [11]:
// Pairs each vocabulary word with its zipWithIndex position, index first.
val vocabularyIndexed = vocabulary
    .zipWithIndex
    .map { case (word, index) => (index, word) }

// Sample of the (index, word) pairs.
vocabularyIndexed.take(10).foreach(pair => println(pair))

(0,mistretta)
(1,shh)
(2,sammee)
(3,bone)
(4,gerven)
(5,nothin)
(6,rostom)
(7,mislabel)
(8,fred)
(9,bresac)


[36mvocabularyIndexed[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[([32mLong[39m, [32mString[39m)] = MapPartitionsRDD[27] at map at cmd11.sc:3

## Calculating Document Frequencies (DF)

In [12]:
// Number of movie records and number of distinct vocabulary words.
val numDocs = movies.count()
val vocabularySize = vocabularyIndexed.count().toInt

[36mnumDocs[39m: [32mLong[39m = [32m34886L[39m
[36mvocabularySize[39m: [32mInt[39m = [32m112737[39m

In [13]:
/**
 * Converts a document frequency into an inverse document frequency.
 *
 * @param df        document frequency of a term
 * @param totalDocs total number of documents
 * @return the natural logarithm of `totalDocs / df`
 */
def dfToIdf(df: Int, totalDocs: Long): Double = {
    val docsPerOccurrence = totalDocs.toDouble / df.toDouble
    math.log(docsPerOccurrence)
}

defined [32mfunction[39m [36mdfToIdf[39m

In [14]:
// For every word: count the movies whose processed plot contains it (each
// plot is de-duplicated first), attach a zipWithIndex id, and convert the
// count into an IDF value using numDocs as the corpus size.
val vocabIdfs = movies
    .flatMap(movie => movie._4.distinct.toSeq)
    .map(word => (word, 1))
    .reduceByKey((left, right) => left + right)
    .zipWithIndex
    .map { case ((word, docCount), wordId) =>
        (word, wordId, dfToIdf(docCount, numDocs))
    }

// Sample of the (word, id, idf) triples.
vocabIdfs.take(10).foreach(triple => println(triple))

(mistretta,0,10.45984088157808)
(shh,1,10.45984088157808)
(sammee,2,10.45984088157808)
(bone,3,5.066213335225719)
(gerven,4,10.45984088157808)
(nothin,5,9.36122859290997)
(rostom,6,9.36122859290997)
(mislabel,7,9.36122859290997)
(fred,8,4.519669628857649)
(bresac,9,10.45984088157808)


[36mvocabIdfs[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[([32mString[39m, [32mLong[39m, [32mDouble[39m)] = MapPartitionsRDD[33] at map at cmd14.sc:7

// Print the first ten (word, id, idf) triples.
vocabIdfs.take(10).foreach(triple => println(triple))

In [19]:
// Term frequencies: one (token, movieId, occurrences) triple for every
// distinct token of every movie's processed plot.
val tokenCounts = movies.flatMap { case (id, _, _, plotTokens) =>
    plotTokens
        .groupBy(identity)
        .map { case (token, occurrences) => (token, id, occurrences.length) }
        .toList
}

// Sample of the (token, movieId, count) triples.
tokenCounts.take(10).foreach(triple => println(triple))

(eye,0,1)
(register,0,1)
(wreck,0,1)
(begin,0,1)
(break,0,1)
(nation,0,2)
(appear,0,1)
(head,0,1)
(serve,0,1)
(burst,0,1)


[36mtokenCounts[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[([32mString[39m, [32mLong[39m, [32mInt[39m)] = MapPartitionsRDD[34] at flatMap at cmd19.sc:2

In [16]:
// Print the first ten (word, id, idf) triples.
vocabIdfs.take(10).foreach(triple => println(triple))

(mistretta,0,10.45984088157808)
(shh,1,10.45984088157808)
(sammee,2,10.45984088157808)
(bone,3,5.066213335225719)
(gerven,4,10.45984088157808)
(nothin,5,9.36122859290997)
(rostom,6,9.36122859290997)
(mislabel,7,9.36122859290997)
(fred,8,4.519669628857649)
(bresac,9,10.45984088157808)


In [17]:
import scala.collection.immutable.Map

[32mimport [39m[36mscala.collection.immutable.Map[39m

In [25]:
/**
 * Euclidean (L2) norm of a vector.
 *
 * @param v the vector components
 * @return the square root of the sum of the squared components (0.0 for an empty list)
 */
def calc_magnitude(v: List[Double]): Double = {
  math.sqrt(v.map(x => x * x).sum)
}

/**
 * Cosine similarity between two sparse vectors, each represented as a map
 * from component id to weight. Ids missing from a map count as weight 0.
 *
 * @param doc_map_1 first sparse vector
 * @param doc_map_2 second sparse vector
 * @return dot product divided by the product of the two magnitudes; 0.0 when
 *         either vector has zero magnitude (including an empty map), because
 *         the quotient would otherwise be 0/0 = NaN
 */
def calc_cosign(doc_map_1: Map[Long, Double], doc_map_2: Map[Long, Double]): Double = {
  // Only ids present in both maps contribute to the dot product. Folding over
  // doc_map_1 keeps the summation order of a plain map-then-sum.
  val dot_product = doc_map_1.foldLeft(0.0) { case (acc, (k, v1)) =>
    doc_map_2.get(k).fold(acc)(v2 => acc + v1 * v2)
  }

  val denominator =
    calc_magnitude(doc_map_1.values.toList) * calc_magnitude(doc_map_2.values.toList)

  // Guard against division by zero for all-zero or empty vectors.
  if (denominator == 0.0) 0.0 else dot_product / denominator
}


defined [32mfunction[39m [36mcalc_magnitude[39m
defined [32mfunction[39m [36mcalc_cosign[39m

In [20]:
// Per-document TF-IDF vectors: join term counts with IDF values on the token
// string, multiply tf by idf, and gather each document's (wordId -> weight)
// pairs into a Map. The result is persisted because later cells query it
// repeatedly.
val tfIdfs = {
    val countsByToken = tokenCounts.map { case (token, docId, tf) => (token, (docId, tf)) }
    val idfsByToken = vocabIdfs.map { case (word, wordId, idf) => (word, (wordId, idf)) }

    countsByToken
        .join(idfsByToken)
        .map { case (_, ((docId, tf), (wordId, idf))) => (docId, (wordId, tf * idf)) }
        .groupByKey()
        .mapValues(weights => weights.toMap)
        .persist()
}

// Sample of the (docId, vector) pairs.
tfIdfs.take(10).foreach(entry => println(entry))

(19021,Map(111056 -> 2.83327067528742, 44857 -> 5.844720364736821, 23194 -> 3.9087605465346753, 57729 -> 3.231452430004476, 92707 -> 2.6446338193889924, 61171 -> 5.4831071391575055, 50482 -> 3.742036186554389, 13110 -> 3.2851165717417037, 79290 -> 3.6257321427642415, 85724 -> 5.0088024280123795, 99912 -> 2.1852289353685275, 11913 -> 3.273696577055755, 108599 -> 4.089939898749853, 104728 -> 3.124590565737507, 97570 -> 2.302183866381607, 84535 -> 2.8435573199976956, 12924 -> 3.567199240405991, 98148 -> 5.3007855823635515, 110782 -> 48.83346850509067, 89834 -> 1.8170728672537764, 56765 -> 2.3771297473404998, 51877 -> 4.1499226033515635, 44939 -> 1.0841555770150595, 23995 -> 5.372244546345696, 68355 -> 4.27157675849549, 61153 -> 2.624657126311332, 1836 -> 6.72623900616664, 98646 -> 2.27895993958169, 4740 -> 4.538262461934265, 56422 -> 4.0431085990657545, 92495 -> 6.609693279868021, 650 -> 8.792914874807488, 78273 -> 20.91968176315616, 94631 -> 4.196442618986456, 53999 -> 3.683333889205897,

[36mtfIdfs[39m: [32morg[39m.[32mapache[39m.[32mspark[39m.[32mrdd[39m.[32mRDD[39m[([32mLong[39m, [32mMap[39m[[32mLong[39m, [32mDouble[39m])] = MapPartitionsRDD[42] at mapValues at cmd20.sc:6

In [21]:
import org.apache.spark.rdd.RDD

[32mimport [39m[36morg.apache.spark.rdd.RDD[39m

In [22]:
/**
 * Cosine similarity between the TF-IDF vectors of two documents.
 *
 * @param doc1   id of the first document (a key of `tfIdfs`)
 * @param doc2   id of the second document (a key of `tfIdfs`)
 * @param tfIdfs sparse TF-IDF vectors keyed by document id
 * @return the value `calc_cosign` computes for the two documents' vectors
 * @throws NoSuchElementException if either id has no entry in `tfIdfs`
 */
def cosineBetweenDocs(doc1: Int, doc2: Int, tfIdfs: RDD[(Long, Map[Long, Double])]): Double = {
    // `lookup` can restrict the search to a single partition when the RDD has
    // a known partitioner, instead of filtering and collecting the whole RDD.
    // A missing id is reported with the offending id in the message.
    def vectorFor(docId: Int): Map[Long, Double] =
        tfIdfs.lookup(docId.toLong).headOption.getOrElse(
            throw new NoSuchElementException(s"No TF-IDF vector found for document id $docId")
        )

    val doc1TfIdfs = vectorFor(doc1)
    val doc2TfIdfs = vectorFor(doc2)

    calc_cosign(doc1TfIdfs, doc2TfIdfs)
}

defined [32mfunction[39m [36mcosineBetweenDocs[39m

In [24]:
// Similarity between two different movies (ids 0 and 1).
cosineBetweenDocs(doc1 = 0, doc2 = 1, tfIdfs = tfIdfs)

[36mres24[39m: [32mDouble[39m = [32m0.02227586306780432[39m

In [23]:
// Similarity of a movie with itself (id 4), as a sanity check.
cosineBetweenDocs(doc1 = 4, doc2 = 4, tfIdfs = tfIdfs)

[36mres23[39m: [32mDouble[39m = [32m1.0[39m