In [88]:
// Read the Shakespeare corpus into an RDD holding one element per text line.
val lines = sc.textFile("shakespeare.txt")
// Print the first 20 lines as a sanity check.
lines.take(20).foreach(line => println(line))

1609

THE SONNETS

by William Shakespeare



                     1
  From fairest creatures we desire increase,
  That thereby beauty's rose might never die,
  But as the riper should by time decease,
  His tender heir might bear his memory:
  But thou contracted to thine own bright eyes,
  Feed'st thy light's flame with self-substantial fuel,
  Making a famine where abundance lies,
  Thy self thy foe, to thy sweet self too cruel:
  Thou that art now the world's fresh ornament,
  And only herald to the gaudy spring,
  Within thine own bud buriest thy content,


lines: org.apache.spark.rdd.RDD[String] = shakespeare.txt MapPartitionsRDD[71] at textFile at <console>:65


In [2]:
// Keep only the lines that contain at least one character.
val rdd1 = lines.filter(_.nonEmpty)
// Preview the first 20 surviving lines.
rdd1.take(20).foreach(line => println(line))

1609
THE SONNETS
by William Shakespeare
                     1
  From fairest creatures we desire increase,
  That thereby beauty's rose might never die,
  But as the riper should by time decease,
  His tender heir might bear his memory:
  But thou contracted to thine own bright eyes,
  Feed'st thy light's flame with self-substantial fuel,
  Making a famine where abundance lies,
  Thy self thy foe, to thy sweet self too cruel:
  Thou that art now the world's fresh ornament,
  And only herald to the gaudy spring,
  Within thine own bud buriest thy content,
  And tender churl mak'st waste in niggarding:
    Pity the world, or else this glutton be,
    To eat the world's due, by the grave and thee.
                     2
  When forty winters shall besiege thy brow,


rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:27


In [3]:
// Strip every punctuation character (POSIX class \p{Punct}) from each line.
// Characters are deleted, not replaced by a space, so "self-substantial"
// becomes "selfsubstantial".
val rdd2 = rdd1.map(_.replaceAll("""[\p{Punct}]""", ""))
// Preview the first 20 cleaned lines.
rdd2.take(20).foreach(line => println(line))

1609
THE SONNETS
by William Shakespeare
                     1
  From fairest creatures we desire increase
  That thereby beautys rose might never die
  But as the riper should by time decease
  His tender heir might bear his memory
  But thou contracted to thine own bright eyes
  Feedst thy lights flame with selfsubstantial fuel
  Making a famine where abundance lies
  Thy self thy foe to thy sweet self too cruel
  Thou that art now the worlds fresh ornament
  And only herald to the gaudy spring
  Within thine own bud buriest thy content
  And tender churl makst waste in niggarding
    Pity the world or else this glutton be
    To eat the worlds due by the grave and thee
                     2
  When forty winters shall besiege thy brow


rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at map at <console>:27


In [90]:
// Break each line into tokens on single spaces and flatten into one RDD.
// Runs of spaces (e.g. the indentation) yield empty tokens at this stage.
val rdd3 = rdd2.flatMap(_.split(" "))
// Preview the first 20 tokens.
rdd3.take(20).foreach(token => println(token))

1609
THE
SONNETS
by
William
Shakespeare
















rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[74] at flatMap at <console>:66


In [91]:
// Drop the empty tokens produced by splitting on consecutive spaces.
val rdd4 = rdd3.filter(word => !word.isEmpty)
// Preview the first 20 real words.
rdd4.take(20).foreach(word => println(word))

1609
THE
SONNETS
by
William
Shakespeare
1
From
fairest
creatures
we
desire
increase
That
thereby
beautys
rose
might
never
die


rdd4: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[75] at filter at <console>:66


In [92]:
// Normalise every word to lower case so that counting is case-insensitive.
val rdd5 = rdd4.map(_.toLowerCase)
// Preview the first 20 normalised words.
rdd5.take(20).foreach(word => println(word))

1609
the
sonnets
by
william
shakespeare
1
from
fairest
creatures
we
desire
increase
that
thereby
beautys
rose
might
never
die


rdd5: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[76] at map at <console>:66


In [93]:
// Load the list of verb forms (one per line) and pull it back to the driver.
// Because of collect(), `verbs` is a local Array[String], not an RDD.
val verbs = sc.textFile("all_verbs.txt").collect()
// Preview the first 20 verb forms.
verbs.take(20).foreach(verb => println(verb))

abash
abashed
abashed
abashes
abashing
abate
abated
abated
abates
abating
abide
abode
abode
abides
abiding
absorb
absorbed
absorbed
absorbs
absorbing


verbs: Array[String] = Array(abash, abashed, abashed, abashes, abashing, abate, abated, abated, abates, abating, abide, abode, abode, abides, abiding, absorb, absorbed, absorbed, absorbs, absorbing, accept, accepted, accepted, accepts, accepting, accompany, accompanied, accompanied, accompanies, accompanying, ache, ached, ached, aches, aching, achieve, achieved, achieved, achieves, achieving, acquire, acquired, acquired, acquires, acquiring, act, acted, acted, acts, acting, add, added, added, adds, adding, address, addressed, addressed, addresses, addressing, adjust, adjusted, adjusted, adjusts, adjusting, admire, admired, admired, admires, admiring, admit, admitted, admitted, admits, admitting, advise, advised, advised, advises, advising, afford, afforded, afforded, affords, affording,...


In [94]:
// Filter out the non-verb words.
// `verbs` is a driver-side Array, so Array.contains would scan it linearly for
// every single word. Convert it to a Set once (O(1) membership tests) and
// broadcast it so each executor receives one read-only copy instead of having
// the collection serialised into every task closure.
val verbSet = sc.broadcast(verbs.toSet)
val rdd6 = rdd5.filter(word => verbSet.value.contains(word))
// Preview the first 20 words recognised as verbs.
rdd6.take(20).foreach(println)

desire
increase
rose
die
bear
contracted
own
eyes
lights
making
lies
spring
own
waste
be
eat
dig
gazed
be
held


rdd6: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[79] at filter at <console>:68


In [9]:
// Pair every verb occurrence with an initial count of 1.
val rdd7 = rdd6.map(verb => verb -> 1)
// Preview the first 20 (verb, 1) pairs.
rdd7.take(20).foreach(pair => println(pair))

(desire,1)
(increase,1)
(rose,1)
(die,1)
(bear,1)
(contracted,1)
(own,1)
(eyes,1)
(lights,1)
(making,1)
(lies,1)
(spring,1)
(own,1)
(waste,1)
(be,1)
(eat,1)
(dig,1)
(gazed,1)
(be,1)
(held,1)


rdd7: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[10] at map at <console>:27


In [10]:
// Sum the per-occurrence counts to get the frequency of each verb form.
val rdd8 = rdd7.reduceByKey(_ + _)
// Preview 20 (verb form, frequency) pairs; order after the shuffle is arbitrary.
rdd8.take(20).foreach(pair => println(pair))

(float,2)
(agree,20)
(healing,2)
(shot,45)
(guide,24)
(opening,11)
(urging,9)
(practises,1)
(surge,9)
(maintained,2)
(counted,9)
(carried,33)
(order,92)
(handled,4)
(hidden,8)
(shunning,2)
(valuing,1)
(stinks,1)
(shaping,1)
(hatches,7)


rdd8: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[11] at reduceByKey at <console>:27


In [97]:
// Read the verb dictionary file: one comma-separated row per verb.
import scala.io.Source
val dict = Source.fromFile("verb_dict.txt")

// Build verb_dict : Array[Array[String]], one inner array per dictionary row.
// - The whole file is materialised in a single pass instead of re-allocating
//   the outer array on every appended row.
// - Blank lines are skipped and at most the first six fields of a row are
//   kept, so a short or empty line no longer throws
//   ArrayIndexOutOfBoundsException.
// - The file handle is always released, even if reading fails.
val verb_dict: Array[Array[String]] =
    try {
        dict.getLines()
            .filter(_.trim.nonEmpty)
            .map(_.split(",").take(6))
            .toArray
    } finally {
        dict.close()
    }

import scala.io.Source
dict: scala.io.BufferedSource = <iterator>
verb_dict: Array[Array[String]] = Array(Array(abash, abash, abashed, abashed, abashes, abashing), Array(abate, abate, abated, abated, abates, abating), Array(abide, abide, abode, abode, abides, abiding), Array(absorb, absorb, absorbed, absorbed, absorbs, absorbing), Array(accept, accept, accepted, accepted, accepts, accepting), Array(accompany, accompany, accompanied, accompanied, accompanies, accompanying), Array(ache, ache, ached, ached, aches, aching), Array(achieve, achieve, achieved, achieved, achieves, achieving), Array(acquire, acquire, acquired, acquired, acquires, acquiring), Array(act, act, acted, acted, acts, acting), Array(add, add, added, added, adds, adding), Array(address, address, addressed, addressed, add...


In [99]:
// Build a lookup from every verb form to its base verb (e.g. had -> have).
// The first element of each verb_dict row (v.head) is used as the base verb
// for every form in that row.
// The original cell parallelised `large`, which is not defined anywhere in
// this notebook; verb_dict is the dictionary built by the previous cell.
// A row lists its base form twice, so (have -> have) is generated twice;
// reduceByKey collapses such duplicates into a single record per form.
// NOTE(review): if one form is listed under two different bases, the base
// that survives is whichever value happens to be merged last — confirm that
// this is acceptable or replace it with a deterministic rule.
val verbs2 = sc.parallelize(verb_dict)
    .flatMap(v => v.map(v2 => (v2, v.head)))
    .reduceByKey((k1, k2) => if (k1 == k2) k1 else k2)

// Join the (form -> base) lookup with the (form -> frequency) counts on the
// verb form, keep only the (base, frequency) value of each match, then sum
// the frequencies of all forms that share the same base verb.
val t2 = verbs2.join(rdd8).values.reduceByKey(_ + _)

// Preview 20 (base verb, total frequency) pairs.
t2.take(20).foreach(println)

(float,5)
(engrave,1)
(call,686)
(offer,121)
(agree,45)
(guide,41)
(sort,88)
(surge,13)
(improve,1)
(include,4)
(order,104)
(type,3)
(squeeze,3)
(limp,7)
(attend,223)
(flee,90)
(select,2)
(prefer,21)
(soothe,6)
(contract,38)


verbs2: org.apache.spark.rdd.RDD[(String, String)] = ShuffledRDD[98] at reduceByKey at <console>:77
t2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[103] at reduceByKey at <console>:81


In [100]:
// Print the 10 base verbs with the highest total frequency, most frequent first.
t2.sortBy(entry => entry._2, ascending = false)
    .take(10)
    .foreach(entry => println(entry))

(be,26727)
(have,7848)
(do,6416)
(come,3610)
(make,2892)
(love,2501)
(go,2476)
(let,2384)
(say,2356)
(know,2251)
