# Introduction to PySpark

The first step is to create a SparkContext, the entry point that lets our code run on Spark.
Remember that only one SparkContext can be active at a time.

In [2]:
import pyspark

sc = pyspark.SparkContext('local[*]')

24/10/21 16:31:30 WARN Utils: Your hostname, Gianlucas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.10.84.118 instead (on interface en0)
24/10/21 16:31:30 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/21 16:31:30 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


The string `"local[*]"` tells Spark to run in local mode, i.e. on this machine rather than on an external cluster. The `*` tells Spark to use as many worker threads as there are logical cores on the machine.
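For reference, Spark's documentation describes three common forms of local master URL: `local` (one worker thread), `local[K]` (K worker threads) and `local[*]` (one worker thread per logical core). The helper `local_threads` below is hypothetical and not part of PySpark; it only spells that mapping out in plain Python, using `os.cpu_count()` as a stand-in for the number of logical cores Spark would detect:

```python
import os


def local_threads(master: str) -> int:
    """Hypothetical helper: worker threads requested by a simple local master URL.

    Only the forms "local", "local[K]" and "local[*]" are handled.
    """
    if master == "local":
        return 1  # a single worker thread: no parallelism
    if master.startswith("local[") and master.endswith("]"):
        inside = master[len("local["):-1]
        if inside == "*":
            # one thread per logical core (os.cpu_count() may return None)
            return os.cpu_count() or 1
        return int(inside)  # "local[4]" -> 4 threads
    raise ValueError(f"not a simple local master URL: {master!r}")


print(local_threads("local"))     # 1
print(local_threads("local[4]"))  # 4
print(local_threads("local[*]"))  # number of logical cores on this machine
```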

## Spark data structures and main functions

In this section we look at the main data structures and functions that Spark uses to parallelize code.

We start by reading our example file.

In [3]:
# read from the local (or distributed) file system; returns an RDD of strings, one per line
txt = sc.textFile('data/joyce.txt')

#count the lines
print(txt.count())

32372



The *textFile* method reads a file and returns an RDD of strings, one element per line. But what is an RDD?

**RDD: Resilient Distributed Dataset**

An RDD is an immutable, distributed collection of data elements (Scala or Java objects; in PySpark, Python objects), split into partitions that can be processed in parallel.
RDDs can hold both structured and unstructured data. They do not infer a schema from the input, which makes them well suited to data that have no predefined structure.

Using our RDD, we want to count the lines that contain the word "gold".

We will use the first of the main operators of distributed programming: **filter**.

In [3]:
lines = txt.filter(lambda line: 'gold' in line.lower())

print(lines.count())

143


Two main concepts here:

* **lambda** function: a small anonymous function defined inline. It can take any number of arguments, but its body is a single expression, whose value is returned. In our case, the lambda lowercases each line of the RDD with .lower() and checks whether it contains "gold". Lambdas are a concise way to pass small functions around in a functional-programming style.
* **filter**: one of the pillars of functional (and distributed) programming. It keeps only the elements that satisfy a condition, typically expressed as a lambda function, and discards the others.
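Spark's *filter* behaves like Python's built-in `filter`. The plain-Python sketch below (no Spark needed; the sample lines are made up) applies the same lambda to a small list. Note that the test is a substring match, so a word such as "golden" is counted too; the lines counted above may include such matches.

```python
# Made-up sample lines standing in for the RDD returned by sc.textFile
sample_lines = [
    "The gold ring was on the table.",
    "Nothing to see here.",
    "GOLDEN light on the water.",
    "A silver spoon.",
]

# Same predicate as in the Spark cell: lowercase the line, then test for the substring
gold_lines = list(filter(lambda line: "gold" in line.lower(), sample_lines))

print(len(gold_lines))  # 2
print(gold_lines)       # ['The gold ring was on the table.', 'GOLDEN light on the water.']
```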

Another way to create an RDD is the *parallelize* method, which distributes a local Python collection.

In [7]:
data = range(10000)
rdd = sc.parallelize(data, 2)  # distribute the data over 2 partitions

odds = rdd.filter(lambda x: x % 2 != 0)  # a transformation: nothing is computed yet

rdd.take(5)  # an action: returns the first 5 elements

[0, 1, 2, 3, 4]
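The second argument of *parallelize* is the number of partitions. As a rough, simplified sketch (the helper `split_into_partitions` is hypothetical, not Spark's actual code), cutting a collection into contiguous, nearly equal chunks looks like this. In PySpark you can inspect the real split with `rdd.getNumPartitions()` and `rdd.glom().collect()`, which returns the contents of each partition as a list.

```python
def split_into_partitions(data, num_slices):
    """Cut a sequence into num_slices contiguous, nearly equal chunks."""
    n = len(data)
    # boundaries 0, n/k, 2n/k, ..., n (integer division)
    bounds = [(i * n) // num_slices for i in range(num_slices + 1)]
    return [list(data[bounds[i]:bounds[i + 1]]) for i in range(num_slices)]


parts = split_into_partitions(range(10000), 2)
print([len(p) for p in parts])     # [5000, 5000]
print(parts[0][:3], parts[1][:3])  # [0, 1, 2] [5000, 5001, 5002]
```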

Now we look at another important function, **map**.

We want to map each element to its square.

In [8]:
squared_data = rdd.map(lambda x: x**2)
squared_data.take(10)

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

The **map** method is similar to **filter**, except that it transforms every element instead of selecting some of them, so the number of input and output elements **is the same**.
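A plain-Python comparison (no Spark needed) makes the difference concrete: *map* produces exactly one output per input, while *filter* may produce fewer.

```python
data = list(range(10))

squares = list(map(lambda x: x ** 2, data))      # one output per input
odds = list(filter(lambda x: x % 2 != 0, data))  # only the elements that pass the test

print(len(data), len(squares), len(odds))  # 10 10 5
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```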

It is important to stress that transformations on RDDs (such as *filter* and *map*) are delayed until a result is actually requested: processing is lazy. Spark records the chain of transformations and executes it only when an action asks for a result, in our case the method *take()*.
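To make lazy evaluation concrete, here is a toy, single-machine stand-in for an RDD. The class `LazyPipeline` is hypothetical and far simpler than Spark (no partitions, no scheduler): transformations only record a step and return a new pipeline, and work happens only when the action `take` pulls results through a chain of Python iterators.

```python
from itertools import islice


class LazyPipeline:
    """Toy stand-in for an RDD on one machine (hypothetical, not Spark code)."""

    def __init__(self, source, steps=()):
        self.source = source
        self.steps = tuple(steps)  # recorded transformations, never modified in place

    def map(self, func):
        # Transformation: record the step and return a new pipeline
        return LazyPipeline(self.source, self.steps + (("map", func),))

    def filter(self, pred):
        # Transformation: record the step and return a new pipeline
        return LazyPipeline(self.source, self.steps + (("filter", pred),))

    def take(self, n):
        # Action: chain lazy iterators (built-in map/filter), then pull only n results
        it = iter(self.source)
        for kind, func in self.steps:
            it = map(func, it) if kind == "map" else filter(func, it)
        return list(islice(it, n))


calls = []


def square(x):
    calls.append(x)  # record every real call
    return x * x


squared = LazyPipeline(range(10_000)).map(square)
print(len(calls))       # 0: defining the transformation computed nothing
print(squared.take(3))  # [0, 1, 4]
print(len(calls))       # 3: only the elements needed by take(3) were processed
```

Only three of the 10,000 elements went through `square`, which mirrors, in a much simpler form, why *take()* does not need to process the whole RDD.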

The last main function is **reduce**. Unlike *filter* and *map*, which return a new RDD, *reduce* is an action: it repeatedly applies a two-argument function (here a lambda) to combine all the elements into a single value.

In [9]:
less_data = rdd.filter(lambda x: x < 10)

reduced_data = less_data.reduce(lambda x1, x2: x1+x2)

#less_data.take(10)

reduced_data

45

Note that all these operations could be written as plain Python loops, but such loops would run sequentially on a single machine and could not be parallelized. Spark, on the contrary, splits each RDD into partitions and processes them in parallel.
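*reduce* parallelizes well because it can work in two stages: each partition is reduced on its own, then the partial results are combined. This is also why Spark's documentation asks for a commutative and associative function. The plain-Python sketch below (the two-way split is an assumption for illustration) reproduces the sum of 0 to 9 and shows that a non-associative function such as subtraction gives a different result once the data is split:

```python
from functools import reduce

data = list(range(10))             # the same values as less_data above
partitions = [data[:5], data[5:]]  # pretend the RDD is split into two partitions


def two_step_reduce(func, parts):
    # Step 1: reduce each partition on its own (Spark does this in parallel)
    partials = [reduce(func, part) for part in parts]
    # Step 2: combine the per-partition results into the final value
    return partials, reduce(func, partials)


partials, total = two_step_reduce(lambda a, b: a + b, partitions)
print(partials, total)  # [10, 35] 45

# Subtraction is not associative, so splitting the data changes the result
subtract = lambda a, b: a - b
print(reduce(subtract, data))                    # -45 (one sequential pass)
print(two_step_reduce(subtract, partitions)[1])  # 15
```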

### Complete example 

We now apply these concepts to James Joyce's *Ulysses*: we want to find the index of the shortest line that contains the word "mulligan" (Buck Mulligan is one of the characters).

Since we are interested in the line index, we attach an index to each line with *zipWithIndex*. The result is an RDD of key-value pairs, with the line as key and its index as value.

In [7]:
txt = sc.textFile('data/joyce.txt').zipWithIndex()
print(txt.count())
txt.take(5)

32372


[('Ulysses by James Joyce', 0),
 ('', 1),
 ('', 2),
 ('    --  I -- ', 3),
 ('', 4)]
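In plain Python, *zipWithIndex* corresponds to `enumerate` with the pair order swapped, since Spark puts the element first and its index second. The sample lines below are the first lines of the file, as printed above. According to Spark's documentation, on an RDD with more than one partition zipWithIndex also triggers a job to compute the partition sizes.

```python
# First lines of the file, as shown in the output above
lines = ["Ulysses by James Joyce", "", "", "    --  I -- "]

# zipWithIndex-style pairs: (element, index)
with_index = [(line, i) for i, line in enumerate(lines)]
print(with_index)
# [('Ulysses by James Joyce', 0), ('', 1), ('', 2), ('    --  I -- ', 3)]
```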

We keep the lines that contain "mulligan", together with their index. Each element is now a (line, index) tuple, so the text is accessed as *line[0]*.

In [8]:
lines = txt.filter(lambda line: 'mulligan' in line[0].lower())

print(lines.count())

lines.collect()

161


[('STATELY, PLUMP BUCK MULLIGAN CAME FROM THE STAIRHEAD, bearing a bowl of',
  7),
 ('Buck Mulligan peeped an instant under the mirror and then covered', 27),
 ("Buck Mulligan's gay voice went on.", 59),
 ('-- My name is absurd too: Malachi Mulligan, two dactyls. But it has a', 61),
 ('-- Tell me, Mulligan, Stephen said quietly.', 72),
 ('Buck Mulligan showed a shaven cheek over his right shoulder.', 78),
 ('-- A woful lunatic! Mulligan said. Were you in a funk?', 91),
 ('Buck Mulligan frowned at the lather on his razorblade. He hopped down', 98),
 ('dirty crumpled handkerchief. Buck Mulligan wiped the razorblade neatly.',
  109),
 ('-- Our mighty mother! Buck Mulligan said.', 128),
 ("you, Buck Mulligan said. I'm hyperborean as much as you. But to think of",
  139),
 ('Buck Mulligan wiped again his razorblade.', 163),
 ('Buck Mulligan attacked the hollow beneath his underlip.', 170),
 ("-- He can't wear them, Buck Mulligan told his face in the mirror.", 179),
 ('-- That fellow I was w

We map each (line, index) pair to a (line, index, len(line)) tuple.

In [9]:
lines_with_len = lines.map(lambda line: (line[0], line[1], len(line[0])))

lines_with_len.collect()

[('STATELY, PLUMP BUCK MULLIGAN CAME FROM THE STAIRHEAD, bearing a bowl of',
  7,
  71),
 ('Buck Mulligan peeped an instant under the mirror and then covered', 27, 65),
 ("Buck Mulligan's gay voice went on.", 59, 34),
 ('-- My name is absurd too: Malachi Mulligan, two dactyls. But it has a',
  61,
  69),
 ('-- Tell me, Mulligan, Stephen said quietly.', 72, 43),
 ('Buck Mulligan showed a shaven cheek over his right shoulder.', 78, 60),
 ('-- A woful lunatic! Mulligan said. Were you in a funk?', 91, 54),
 ('Buck Mulligan frowned at the lather on his razorblade. He hopped down',
  98,
  69),
 ('dirty crumpled handkerchief. Buck Mulligan wiped the razorblade neatly.',
  109,
  71),
 ('-- Our mighty mother! Buck Mulligan said.', 128, 41),
 ("you, Buck Mulligan said. I'm hyperborean as much as you. But to think of",
  139,
  72),
 ('Buck Mulligan wiped again his razorblade.', 163, 41),
 ('Buck Mulligan attacked the hollow beneath his underlip.', 170, 55),
 ("-- He can't wear them, Buck Mulli

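We keep only the shortest line. If several lines share the minimal length, the *<=* comparison keeps the left argument, i.e. the earlier line: PySpark's *reduce* combines the elements in their original order (first within each partition, then across partitions in partition order).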

In [10]:
shortest = lines_with_len.reduce(lambda line1, line2: line1 if line1[2] <= line2[2] 
                                 else line2)

print(shortest)

("Mulligan's cheek.", 292, 17)
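The same tie-breaking can be checked in plain Python on a few made-up (line, index, length) tuples. Python's built-in `min` with a `key` also returns the first minimal element; PySpark RDDs offer a similar `min(key=...)` method, e.g. `lines_with_len.min(key=lambda t: t[2])`, which should give the same result as the reduce above.

```python
from functools import reduce

# Made-up lines; "short one" and "also tiny" tie on the shortest length (9)
texts = ["Buck Mulligan said.", "short one", "also tiny"]
rows = [(t, i, len(t)) for i, t in enumerate(texts)]

# The reduce used above: "<=" keeps the left (earlier) tuple on ties
by_reduce = reduce(lambda a, b: a if a[2] <= b[2] else b, rows)

# Built-in alternative: min() also returns the first minimal element
by_min = min(rows, key=lambda row: row[2])

print(by_reduce)  # ('short one', 1, 9)
print(by_min)     # ('short one', 1, 9)
```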


# Search Engine

In this section we develop a small search engine using Spark data structures and operations. We treat each line of *Ulysses* as a separate document.

## Inverted index creation

This subsection is dedicated to the creation of our inverted index.
We start by loading the file lines into an RDD.

In [11]:
txt = sc.textFile('data/joyce.txt').zipWithIndex()
txt.take(10)

[('Ulysses by James Joyce', 0),
 ('', 1),
 ('', 2),
 ('    --  I -- ', 3),
 ('', 4),
 ('', 5),
 ('', 6),
 ('STATELY, PLUMP BUCK MULLIGAN CAME FROM THE STAIRHEAD, bearing a bowl of',
  7),
 ('lather on which a mirror and a razor lay crossed. A yellow dressinggown,',
  8),
 ('ungirdled, was sustained gently behind him by the mild morning air. He', 9)]
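As a reference point, an inverted index maps each token to the documents that contain it, here together with the term frequency. The plain-Python sketch below builds one for a made-up three-document corpus; the Spark pipeline in this section stores the same information as (token, (docID, TF)) pairs spread over an RDD.

```python
from collections import Counter, defaultdict

# Made-up mini-corpus: each line is a document, keyed by its line index
docs = {0: "the cat sat", 1: "the cat saw the dog", 2: "a dog"}

# token -> list of (docID, term frequency)
inverted = defaultdict(list)
for doc_id, text in docs.items():
    for token, tf in Counter(text.lower().split()).items():
        inverted[token].append((doc_id, tf))

print(inverted["the"])  # [(0, 1), (1, 2)]
print(inverted["dog"])  # [(1, 1), (2, 1)]
```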

### Term frequency
TF is the number of times a particular word appears in a document. The term frequency can be normalized (e.g. by the document length) or not; in this example we use the raw, non-normalized count. We map each (line, index) pair to new key-value pairs whose key is (docID, token) and whose value is the number 1, used to count occurrences. We use flatMap because each line produces a list of pairs: flatMap flattens these lists into a single RDD of pairs, whereas map would produce an RDD whose elements are lists.
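The map/flatMap difference can be seen on two made-up (line, index) pairs in plain Python, where a list comprehension plays the role of *map* and a nested comprehension plays the role of *flatMap*:

```python
# Made-up (line, index) pairs, like the elements of txt
docs = [("Stately plump Buck", 7), ("a bowl of lather", 8)]


def to_pairs(doc):
    """Turn one (line, index) pair into ((docID, token), 1) pairs."""
    text, doc_id = doc
    return [((doc_id, word), 1) for word in text.lower().split()]


nested = [to_pairs(d) for d in docs]                 # like map: one list per line
flat = [pair for d in docs for pair in to_pairs(d)]  # like flatMap: one flat sequence

print(len(nested), len(flat))  # 2 7
print(flat[:2])                # [((7, 'stately'), 1), ((7, 'plump'), 1)]
```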

In [12]:
index = txt.flatMap(lambda line: [((line[1] ,word),1) for word in 
                                  line[0].lower().split()])

index.take(20)



Here we group the pairs by their (docID, token) key and sum the 1s, which gives the term frequency of each token in each document.

In [11]:
postings = index.reduceByKey(lambda count1, count2: count1 + count2)  # sum the 1s of each (docID, token) key

postings.take(10)


Here we remap the pairs so that the token becomes the key and the value is (docID, TF).

In [10]:
tf=postings.map(lambda post: (post[0][1],(post[0][0],post[1])))

tf.take(10)


### Inverse Document Frequency

The IDF score measures how informative a word is across the **whole** corpus: the more documents contain a word, the lower its IDF.

We start by mapping each posting to a new key-value pair whose key is the token and whose value is (docID, TF, 1), where the 1 acts as a counter.

In [15]:
postings_2 = postings.map(lambda post: (post[0][1],(post[0][0],post[1],1)))

postings_2.take(10)

[('james', (0, 1, 1)),
 ('--', (3, 2, 1)),
 ('plump', (7, 1, 1)),
 ('from', (7, 1, 1)),
 ('the', (7, 1, 1)),
 ('bearing', (7, 1, 1)),
 ('a', (7, 1, 1)),
 ('lather', (8, 1, 1)),
 ('mirror', (8, 1, 1)),
 ('razor', (8, 1, 1))]

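A further map keeps only key = token and value = 1. Since each posting corresponds to a distinct (document, token) pair, every document that contains the token contributes exactly one 1.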

In [16]:
postings_3 = postings_2.map(lambda post:(post[0],post[1][2]))

postings_3.take(10)

[('james', 1),
 ('--', 1),
 ('plump', 1),
 ('from', 1),
 ('the', 1),
 ('bearing', 1),
 ('a', 1),
 ('lather', 1),
 ('mirror', 1),
 ('razor', 1)]

The last step reduces the values by key: summing the 1s of each token gives the number of documents (lines) in which it appears, i.e. its document frequency.

**reduceByKey** works like **reduce**, but it is applied separately to the values of each key, and it returns an RDD of (key, reduced value) pairs.
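A plain-Python sketch of what *reduceByKey* computes: group the values by key, then reduce each group with the given function. The helper `reduce_by_key` and the sample pairs are hypothetical. Spark additionally merges values locally within each partition before shuffling them across the cluster, which is one reason the function should be associative and commutative.

```python
from collections import defaultdict
from functools import reduce


def reduce_by_key(pairs, func):
    """Group the values of (key, value) pairs by key and reduce each group with func."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return {key: reduce(func, values) for key, values in groups.items()}


# Made-up (token, 1) pairs, one per (document, token) posting, as in postings_3
pairs = [("james", 1), ("the", 1), ("james", 1), ("mirror", 1), ("the", 1), ("the", 1)]

doc_freq = reduce_by_key(pairs, lambda a, b: a + b)
print(doc_freq)  # {'james': 2, 'the': 3, 'mirror': 1}
```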

In [17]:
postings_4 = postings_3.reduceByKey(lambda count1, count2: count1 + count2)  # key = token, the first element of each pair

postings_4.take(10) 

[('james', 26),
 ('lather', 3),
 ('mirror', 32),
 ('razor', 5),
 ('yellow', 41),
 ('dressinggown,', 1),
 ('bowl', 11),
 ('aloft', 3),
 ('halted,', 6),
 ('he', 3308)]

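In the next step we compute the actual IDF value. There are several ways to compute it, with different normalization techniques; we use the simplest one, idf = log10(N / df), where N is the total number of lines (documents) and df is the document frequency of the token.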

In [18]:
import math

num_lines = txt.count()

idf = postings_4.map(lambda term: (term[0],math.log10(num_lines/term[1])))

idf.take(10)

[('james', 3.0951961837496964),
 ('lather', 4.033048277000852),
 ('mirror', 3.005019553400609),
 ('razor', 3.811199527384496),
 ('yellow', 2.897385675000779),
 ('dressinggown,', 4.510169531720514),
 ('bowl', 3.4687768465622897),
 ('aloft', 4.033048277000852),
 ('halted,', 3.7320182813368707),
 ('he', 0.9906040308400055)]
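The formula can be checked against the output above with the standard library alone. The function `idf` is a hypothetical helper; N = 32372 (the line count printed earlier) and df = 3308 for "he" are taken from this notebook's outputs.

```python
import math


def idf(num_docs, doc_freq):
    """IDF as computed above: log10 of (number of documents / document frequency)."""
    return math.log10(num_docs / doc_freq)


N = 32372  # number of lines in the file, from txt.count()

print(idf(N, 3308))  # "he" appears in 3308 lines: about 0.9906, as in the output above
print(idf(N, 1))     # a token found in a single line gets the maximum, log10(N), about 4.5102
print(idf(N, N))     # a token present in every line would get 0.0
```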

Note that the index stores an entry only for the (token, document) pairs that actually occur: tokens absent from a document get no entry at all. This sparse representation saves a lot of space. When we need the IDF and TF-IDF of a query token for a document that does not contain it, we have to fill them in ourselves (as we will see at query time).

### Computing TF-IDF

We now have two RDDs keyed by token, one with the TF and the other with the IDF: let's join them.

In [19]:
rdd=tf.join(idf)

rdd.take(10)

[('james', ((0, 1), 3.0951961837496964)),
 ('james', ((3104, 1), 3.0951961837496964)),
 ('james', ((4584, 1), 3.0951961837496964)),
 ('james', ((7594, 1), 3.0951961837496964)),
 ('james', ((7716, 1), 3.0951961837496964)),
 ('james', ((8810, 1), 3.0951961837496964)),
 ('james', ((11294, 1), 3.0951961837496964)),
 ('james', ((11796, 1), 3.0951961837496964)),
 ('james', ((13638, 1), 3.0951961837496964)),
 ('james', ((15644, 1), 3.0951961837496964))]
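*join* is an inner join on the key: every TF entry is paired with the IDF of its token, and a key missing from either side is dropped (PySpark also offers *leftOuterJoin* to keep unmatched left entries). A plain-Python sketch of those semantics, using made-up pairs plus the IDF values printed above; the helper `inner_join` and the token "zebra" are hypothetical:

```python
from collections import defaultdict


def inner_join(left, right):
    """Pair every left value with every right value that shares its key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in right_by_key.get(key, [])]


tf_pairs = [("james", (0, 1)), ("james", (3104, 1)), ("joyce", (0, 1)), ("zebra", (5, 1))]
idf_pairs = [("james", 3.0951961837496964), ("joyce", 4.510169531720514)]

rows = inner_join(tf_pairs, idf_pairs)
for row in rows:
    print(row)
# ('james', ((0, 1), 3.0951961837496964))
# ('james', ((3104, 1), 3.0951961837496964))
# ('joyce', ((0, 1), 4.510169531720514))
```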

We use a map to compute the TF-IDF score (TF × IDF) of each entry, re-keying it by document ID, and we sort the RDD by that key.

In [20]:
tf_idf=rdd.map(lambda entry: (entry[1][0][0],(entry[0],entry[1][0][1],entry[1][1],
                                              entry[1][0][1]*entry[1][1]))).sortByKey()

tf_idf.take(10)

[(0, ('james', 1, 3.0951961837496964, 3.0951961837496964)),
 (0, ('by', 1, 1.4382875244143891, 1.4382875244143891)),
 (0, ('joyce', 1, 4.510169531720514, 4.510169531720514)),
 (0, ('ulysses', 1, 3.908109540392552, 3.908109540392552)),
 (3, ('i', 1, 1.160503547623885, 1.160503547623885)),
 (3, ('--', 2, 1.152997273997481, 2.305994547994962)),
 (7, ('bowl', 1, 3.4687768465622897, 3.4687768465622897)),
 (7, ('came', 1, 2.2383279251840156, 2.2383279251840156)),
 (7, ('of', 1, 0.6749229917242036, 0.6749229917242036)),
 (7, ('buck', 1, 2.472743033779891, 2.472743033779891))]

To visualize our index, we convert it to a Spark **DataFrame**. This is another Spark data structure, suited to **structured** data that follow a schema (here, named columns).

In [21]:
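tf_idf_2 = tf_idf.map(lambda entry: (entry[0], entry[1][0], entry[1][1], entry[1][2],
                                     entry[1][3]))  # flatten to (docID, token, TF, IDF, TF-IDF)

from pyspark.sql import SparkSession  # a SparkSession is needed to use DataFrames

spark = SparkSession(sc)

df = tf_idf_2.toDF(["DocumentId", "Token", "TF", "IDF", "TF-IDF"])
df.show()  # show() prints the table and returns None, so we keep df and call it separately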

+----------+----------+---+-------------------+-------------------+
|DocumentId|     Token| TF|                IDF|             TF-IDF|
+----------+----------+---+-------------------+-------------------+
|         0|     james|  1| 3.0951961837496964| 3.0951961837496964|
|         0|        by|  1| 1.4382875244143891| 1.4382875244143891|
|         0|     joyce|  1|  4.510169531720514|  4.510169531720514|
|         0|   ulysses|  1|  3.908109540392552|  3.908109540392552|
|         3|         i|  1|  1.160503547623885|  1.160503547623885|
|         3|        --|  2|  1.152997273997481|  2.305994547994962|
|         7|      bowl|  1| 3.4687768465622897| 3.4687768465622897|
|         7|      came|  1| 2.2383279251840156| 2.2383279251840156|
|         7|        of|  1| 0.6749229917242036| 0.6749229917242036|
|         7|      buck|  1|  2.472743033779891|  2.472743033779891|
|         7|  mulligan|  1|  2.453264680384042|  2.453264680384042|
|         7|stairhead,|  1|  4.510169531720514| 

### Search

We now implement the search. The index needs a few more tweaks first. We start by mapping each entry to a (docID, token, IDF, TF-IDF) tuple, dropping the TF.

In [22]:
tf_idf_scores = tf_idf_2.map(lambda term: (term[0], term[1], term[3], term[4]))

tf_idf_scores.take(10)

[(0, 'james', 3.0951961837496964, 3.0951961837496964),
 (0, 'by', 1.4382875244143891, 1.4382875244143891),
 (0, 'joyce', 4.510169531720514, 4.510169531720514),
 (0, 'ulysses', 3.908109540392552, 3.908109540392552),
 (3, 'i', 1.160503547623885, 1.160503547623885),
 (3, '--', 1.152997273997481, 2.305994547994962),
 (7, 'bowl', 3.4687768465622897, 3.4687768465622897),
 (7, 'came', 2.2383279251840156, 2.2383279251840156),
 (7, 'of', 0.6749229917242036, 0.6749229917242036),
 (7, 'buck', 2.472743033779891, 2.472743033779891)]

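We have to keep track of the norm of every document's TF-IDF vector, since the final score is the cosine similarity between query and document, which divides their dot product by both norms. As a first step we keep only the (docID, TF-IDF) pairs.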
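In formulas, with N the number of documents, tf_{t,d} the count of token t in document d and df_t its document frequency, the weights, the document norm and the score are as follows (assuming the query weights w_{t,q} are computed in the same way, from the query's own term counts):

$$
w_{t,d} = \mathrm{tf}_{t,d}\,\log_{10}\frac{N}{\mathrm{df}_t}
\qquad
\lVert d \rVert = \sqrt{\sum_{t \in d} w_{t,d}^{2}}
\qquad
\cos(q,d) = \frac{\sum_{t \in q} w_{t,q}\, w_{t,d}}{\lVert q \rVert \, \lVert d \rVert}
$$

Here w_{t,d} = 0 whenever t does not occur in d, which is why the sparse index can skip those entries.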

In [23]:
only_tf_idf = tf_idf_scores.map(lambda term: (term[0], term[-1]))

only_tf_idf.take(10)

[(0, 3.0951961837496964),
 (0, 1.4382875244143891),
 (0, 4.510169531720514),
 (0, 3.908109540392552),
 (3, 1.160503547623885),
 (3, 2.305994547994962),
 (7, 3.4687768465622897),
 (7, 2.2383279251840156),
 (7, 0.6749229917242036),
 (7, 2.472743033779891)]

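We compute the norm in three steps: square each TF-IDF value, sum the squares of each document with reduceByKey, and take the square root.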

In [24]:
squared_tf_idf = only_tf_idf.map(lambda term: (term[0], term[1]**2))

squared_tf_idf.take(10)

[(0, 9.580239415898685),
 (0, 2.068671002886072),
 (0, 20.34162920486004),
 (0, 15.273320179707286),
 (3, 1.3467684840476226),
 (3, 5.317610855382489),
 (7, 12.032412811246623),
 (7, 5.01011190065858),
 (7, 0.45552104475794936),
 (7, 6.114458111106978)]

In [25]:
norms = squared_tf_idf.reduceByKey(lambda item1, item2: item1 + item2)

norms.take(10)

[(0, 47.263859803352084),
 (8, 107.25051991263538),
 (12, 67.40331974857523),
 (16, 109.20370327524861),
 (20, 95.88344470005734),
 (24, 91.09253198587062),
 (28, 29.972942861663235),
 (32, 48.87547246782259),
 (36, 71.55775283808075),
 (40, 94.84875880510157)]

In [26]:
norms_sqrt = norms.map(lambda item: (item[0], math.sqrt(item[1])))

norms_sqrt.take(10)

[(0, 6.8748716208633365),
 (8, 10.356182690192144),
 (12, 8.209952481505312),
 (16, 10.450057572819809),
 (20, 9.792009226918514),
 (24, 9.544240775769994),
 (28, 5.474755050380176),
 (32, 6.9910995178028035),
 (36, 8.459181570227745),
 (40, 9.73903274484184)]
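As a sanity check, the three steps can be reproduced by hand for document 0 ("Ulysses by James Joyce"). This is plain Python, with the four TF-IDF values copied from the output above:

```python
import math

# TF-IDF weights of document 0, copied from the notebook output
doc0 = [3.0951961837496964, 1.4382875244143891, 4.510169531720514, 3.908109540392552]

squared_sum = sum(w ** 2 for w in doc0)  # what reduceByKey accumulated for key 0
norm = math.sqrt(squared_sum)            # what the final map computed

print(squared_sum, norm)  # about 47.2639 and 6.8749, matching norms and norms_sqrt for doc 0
```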

We reshape the entries into (docID, (token, IDF, TF-IDF)) pairs and join them with the norms RDD on the document ID.

In [27]:
tf_idf_scores_2 = tf_idf_scores.map(lambda item: (item[0], (item[1], item[2], 
                                                            item[3])))

tf_idf_scores_2.take(10)

[(0, ('james', 3.0951961837496964, 3.0951961837496964)),
 (0, ('by', 1.4382875244143891, 1.4382875244143891)),
 (0, ('joyce', 4.510169531720514, 4.510169531720514)),
 (0, ('ulysses', 3.908109540392552, 3.908109540392552)),
 (3, ('i', 1.160503547623885, 1.160503547623885)),
 (3, ('--', 1.152997273997481, 2.305994547994962)),
 (7, ('bowl', 3.4687768465622897, 3.4687768465622897)),
 (7, ('came', 2.2383279251840156, 2.2383279251840156)),
 (7, ('of', 0.6749229917242036, 0.6749229917242036)),
 (7, ('buck', 2.472743033779891, 2.472743033779891))]

In [28]:
final_index = tf_idf_scores_2.join(norms_sqrt)

final_index.take(10)

[(0, (('james', 3.0951961837496964, 3.0951961837496964), 6.8748716208633365)),
 (0, (('by', 1.4382875244143891, 1.4382875244143891), 6.8748716208633365)),
 (0, (('joyce', 4.510169531720514, 4.510169531720514), 6.8748716208633365)),
 (0, (('ulysses', 3.908109540392552, 3.908109540392552), 6.8748716208633365)),
 (8, (('lather', 4.033048277000852, 4.033048277000852), 10.356182690192144)),
 (8, (('mirror', 3.005019553400609, 3.005019553400609), 10.356182690192144)),
 (8, (('razor', 3.811199527384496, 3.811199527384496), 10.356182690192144)),
 (8, (('a', 0.7748900836600578, 2.3246702509801738), 10.356182690192144)),
 (8, (('and', 0.738802560862734, 0.738802560862734), 10.356182690192144)),
 (8, (('lay', 2.7393175200783704, 2.7393175200783704), 10.356182690192144))]

For simplicity, we flatten each entry into a (token, docID, IDF, TF-IDF, norm) tuple.

In [29]:
final_index_flatten = final_index.map(lambda item: (item[1][0][0], item[0], 
                                        item[1][0][1], item[1][0][2], item[1][1]))

final_index_flatten.take(10)

[('james', 0, 3.0951961837496964, 3.0951961837496964, 6.8748716208633365),
 ('by', 0, 1.4382875244143891, 1.4382875244143891, 6.8748716208633365),
 ('joyce', 0, 4.510169531720514, 4.510169531720514, 6.8748716208633365),
 ('ulysses', 0, 3.908109540392552, 3.908109540392552, 6.8748716208633365),
 ('lather', 8, 4.033048277000852, 4.033048277000852, 10.356182690192144),
 ('mirror', 8, 3.005019553400609, 3.005019553400609, 10.356182690192144),
 ('razor', 8, 3.811199527384496, 3.811199527384496, 10.356182690192144),
 ('a', 8, 0.7748900836600578, 2.3246702509801738, 10.356182690192144),
 ('and', 8, 0.738802560862734, 0.738802560862734, 10.356182690192144),
 ('lay', 8, 2.7393175200783704, 2.7393175200783704, 10.356182690192144)]

### Query time

In [30]:
query = "back to the barracks!".lower().split()  # tokenize like the index: lowercase, split on whitespace

# keep only the index entries for the query words, so that the RDD we work with is much smaller
filtered_rdd = final_index_flatten.filter(lambda term: term[0] in query)

#compute cosine similarity
norm_tfs_query = {}
candidates = {}
norms = {}
idf_query = {}
for word in query:
    tf_word = query.count(word)
    norm_tfs_query[word] = tf_word
    
    #we take only the docs with the current word
    tf_idf_vals = filtered_rdd.filter(lambda term : term[0] == word).map(lambda term: 
                                                (term[1], term[2], term[3], term[4])).collect()
    
    #we compute the scores
    for val in tf_idf_vals:
        docId = val[0]
        idf_score = val[1]
        tf_idf_score = val[2]
        norm = val[3]
        norms[docId] = norm
        
        idf_query[word] = idf_score
        
        if docId not in candidates:
            candidates[docId] = {word: [idf_score, tf_idf_score]}
        else:
            candidates[docId][word] = [idf_score, tf_idf_score]
    

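The following step may seem tricky. In the previous cell, each candidate document only records the query words it actually contains, so the other query words are missing from its entry. Since the cosine similarity compares the query and the document over the same terms, we now add those words by hand. Each gets its idf and a tf-idf of 0, because its term frequency in the document is 0. We did not store these zero entries earlier for efficiency reasons.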

In [32]:
for doc in candidates:
    for word in query:
        # idf_query only holds words that occur in at least one document
        if word not in candidates[doc] and word in idf_query:
            candidates[doc][word] = [idf_query[word], 0] #tf-idf is 0 since tf = 0

In [33]:
candidates

{696: {'back': [2.1036293512865596, 2.1036293512865596],
  'the': [0.47294229713824015, 0.47294229713824015],
  'to': [0.873080258190184, 0],
  'barracks!': [4.510169531720514, 0]},
 912: {'back': [2.1036293512865596, 2.1036293512865596],
  'to': [0.873080258190184, 0.873080258190184],
  'the': [0.47294229713824015, 0],
  'barracks!': [4.510169531720514, 0]},
 1656: {'back': [2.1036293512865596, 2.1036293512865596],
  'to': [0.873080258190184, 0],
  'the': [0.47294229713824015, 0],
  'barracks!': [4.510169531720514, 0]},
 2352: {'back': [2.1036293512865596, 2.1036293512865596],
  'the': [0.47294229713824015, 0.9458845942764803],
  'to': [0.873080258190184, 0],
  'barracks!': [4.510169531720514, 0]},
 4648: {'back': [2.1036293512865596, 2.1036293512865596],
  'the': [0.47294229713824015, 0.9458845942764803],
  'to': [0.873080258190184, 0],
  'barracks!': [4.510169531720514, 0]},
 5320: {'back': [2.1036293512865596, 2.1036293512865596],
  'to': [0.873080258190184, 0.873080258190184],
  ...

We now compute the score of each candidate document, namely its cosine similarity with the query.
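For reference, the score computed in the next cell can be written as follows. The query weight of a term is its frequency in the query times its idf (`norm_tfs_query[word] * idf`). The document weight $w_{t,d}$ is the tf-idf stored in the index, and is 0 when $t$ does not occur in $d$. $\lVert d \rVert$ is the document norm (`norms[doc]`). The sums run over the distinct words of the query.

$$
\mathrm{score}(q, d) = \frac{\sum_{t \in q} w_{t,q}\, w_{t,d}}{\lVert q \rVert \, \lVert d \rVert},
\qquad w_{t,q} = \mathrm{tf}_{t,q} \cdot \mathrm{idf}_t,
\qquad \lVert q \rVert = \sqrt{\sum_{t \in q} w_{t,q}^{2}}
$$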

In [34]:
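import math

top_scores = {}

for doc in candidates:
    query_tf_idf_scores = []
    cumulative_tf_idf = 0
    for word in candidates[doc]:
        # weight of the word in the query: frequency in the query * idf
        query_tf_idf = norm_tfs_query[word]*candidates[doc][word][0]
        query_tf_idf_scores.append(query_tf_idf)

        # weight of the word in the document (0 if the word does not occur in it)
        doc_tf_idf = candidates[doc][word][1]
        # accumulate the dot product between the query and document vectors
        cumulative_tf_idf += query_tf_idf * doc_tf_idf

    # Euclidean norm of the query vector
    query_norm = math.sqrt(sum(score**2 for score in query_tf_idf_scores))

    # cosine similarity = dot product / (query norm * document norm)
    top_scores[doc] = cumulative_tf_idf / (query_norm*norms[doc])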

Any ideas on how to parallelize this process too? :)

In [35]:
top_scores

{696: 0.09511518927479116,
 912: 0.12044774196071045,
 1656: 0.10942826681472262,
 2352: 0.10388096054847297,
 4648: 0.10138032557219151,
 5320: 0.11628631608633884,
 6944: 0.10650110715196727,
 7944: 0.0999437376923831,
 10376: 0.10581096355044949,
 15064: 0.11506559134745291,
 17512: 0.10763382543408068,
 17592: 0.13494163357062341,
 17888: 0.07448295676820173,
 17952: 0.097056329706566,
 19336: 0.10146328343481899,
 22624: 0.050367062066299394,
 24168: 0.12018988713828752,
 24208: 0.0918809028832492,
 25472: 0.11536725782741689,
 25696: 0.09168136473875302,
 28192: 0.09358572869101883,
 28728: 0.20159069014565176,
 28816: 0.0930391959632418,
 29440: 0.09987074992572356,
 31584: 0.14287036971438496,
 1801: 0.08703390827977445,
 1969: 0.07999608150158664,
 2545: 0.22253480593224356,
 3161: 0.08836431864457578,
 4185: 0.09572894815022671,
 4993: 0.08587636187319427,
 5313: 0.12150253355847412,
 5449: 0.08905950752904677,
 5985: 0.09173586053908642,
 8281: 0.10818537563634033,
 12217: 0...
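One way into the question about parallelizing the scoring: a candidate's score depends only on its own entries, its norm and the query weights. The body of the loop can therefore be pulled out into a function of a single document. The sketch below is our own rewrite, not part of the original notebook. `cosine_score` and its argument layout are hypothetical. It reads query words missing from the document as 0 through `dict.get`, which plays the role of the fill step.

```python
import math

def cosine_score(doc_tf_idf, query_tf, idf, doc_norm):
    # doc_tf_idf: {word: tf-idf of the word in this document}; absent words count as 0
    # query_tf:   {word: frequency of the word in the query}
    # idf:        {word: idf of the word}
    # doc_norm:   norm of the document's tf-idf vector
    dot = 0.0
    query_sq_sum = 0.0
    for word, tf in query_tf.items():
        if word not in idf:  # word occurs in no document: it has no idf, skip it
            continue
        query_weight = tf * idf[word]
        query_sq_sum += query_weight ** 2
        dot += query_weight * doc_tf_idf.get(word, 0.0)
    return dot / (math.sqrt(query_sq_sum) * doc_norm)
```

With the notebook's dictionaries, the call would be `cosine_score({w: v[1] for w, v in candidates[doc].items()}, norm_tfs_query, idf_query, norms[doc])`. A per-document function like this is the kind of thing one could pass to `mapValues` on a pair RDD of candidates. The small query dictionaries could be shipped inside the closure or as broadcast variables. That Spark version is left as an untested idea.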

We now get the 5 documents most similar to the query.

In [36]:
top5 = sorted(top_scores.items(), key=lambda item: item[1], reverse=True)[:5]
print(top5)

[(30, 0.6994154521368598), (2545, 0.22253480593224356), (3062, 0.2185938672542448), (17405, 0.21274200534036225), (4818, 0.20657810509052843)]
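Sorting every candidate just to keep five is more work than needed when there are many candidates. A standard-library alternative is `heapq.nlargest`, which is documented as equivalent to `sorted(iterable, key=key, reverse=True)[:n]`. The scores below are a few values copied from the outputs above.

```python
import heapq

# a few (docId, score) pairs taken from the outputs above
scores = {696: 0.09511518927479116, 2545: 0.22253480593224356,
          30: 0.6994154521368598, 28728: 0.20159069014565176,
          17592: 0.13494163357062341}

# keep the 3 highest-scoring documents without sorting the whole dictionary
top3 = heapq.nlargest(3, scores.items(), key=lambda item: item[1])
print(top3)  # [(30, 0.6994154521368598), (2545, 0.22253480593224356), (28728, 0.20159069014565176)]
```

In the notebook, the same call would be `heapq.nlargest(5, top_scores.items(), key=lambda item: item[1])`.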


To retrieve the text of the top 5 documents in a parallel fashion, we first turn the list into an RDD split into 2 partitions.

In [37]:
# distribute the 5 (docId, score) pairs over 2 partitions
top5_rdd = sc.parallelize(top5, 2)
top5_rdd.collect()

[(30, 0.6994154521368598),
 (2545, 0.22253480593224356),
 (3062, 0.2185938672542448),
 (17405, 0.21274200534036225),
 (4818, 0.20657810509052843)]

Since `join` matches the elements of two pair RDDs by key, we swap the pairs of our original dataset so that the docId becomes the key. Each element is now (docId, line) instead of (line, docId).

In [38]:
txt_reverse = txt.map(lambda pair: (pair[1], pair[0]))
txt_reverse.take(10)

[(0, 'Ulysses by James Joyce'),
 (1, ''),
 (2, ''),
 (3, '    --  I -- '),
 (4, ''),
 (5, ''),
 (6, ''),
 (7,
  'STATELY, PLUMP BUCK MULLIGAN CAME FROM THE STAIRHEAD, bearing a bowl of'),
 (8,
  'lather on which a mirror and a razor lay crossed. A yellow dressinggown,'),
 (9, 'ungirdled, was sustained gently behind him by the mild morning air. He')]

We join the top-5 RDD with the swapped text RDD on docId to retrieve the original lines, then sort the results by score in descending order.

In [39]:
# join yields (docId, (score, line)) pairs; sort them by score, highest first
results = top5_rdd.join(txt_reverse).sortBy(lambda item: item[1][0], ascending=False)

results.collect()

[(30, (0.6994154521368598, '-- Back to barracks! he said sternly.')),
 (2545, (0.22253480593224356, 'back anyhow.')),
 (3062, (0.2185938672542448, 'with her back to the fire too.')),
 (17405,
  (0.21274200534036225,
   'whatever he was doing to it and then he put it back and put his hands back')),
 (4818,
  (0.20657810509052843,
   'shook it again. Then he came back and put it back in the bucket. As you'))]
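To make the shape of `results` concrete, here is a plain-Python sketch of the inner-join semantics of a pair-RDD `join`. Every key present on both sides produces one `(key, (left_value, right_value))` element per matching combination. Keys found on only one side are dropped. `inner_join` is a hypothetical helper written for illustration. It says nothing about how Spark actually distributes the work across partitions.

```python
from collections import defaultdict

def inner_join(left, right):
    # group the right-hand values by key
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # emit (key, (left_value, right_value)) for every key present on both sides
    return [(key, (left_value, right_value))
            for key, left_value in left
            for right_value in right_by_key.get(key, [])]

top = [(30, 0.7), (2545, 0.2)]
lines = [(29, 'some line'), (30, '-- Back to barracks! he said sternly.'), (2545, 'back anyhow.')]
joined = inner_join(top, lines)
print(sorted(joined, key=lambda item: item[1][0], reverse=True))
```

Because one side holds only five keys, an alternative worth considering is to filter `txt_reverse` with a small set of docIds instead of joining.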