## PySpark Friends of Friends

We are going to reproduce the Friends of Friends Hadoop example in Spark.
First things first, let's run it as a map/reduce program in Spark.

Node A with neighbors B and C propose candidate triples to it's neighbors
* B A C to node B (A<C, else B C A)
* C A B to node C (A<B else C B A)

All triples will get two proposals from it's neighbors and reduce them. If there are two matching proposals, we have a triple.

#### Map/Reduce Implementation

The code below `lines_to_triples` implements the proposal to be run in `flatMap`.

In [1]:
import numpy as np

def line_to_triples(line: str):
    fids = np.array(line.split(), dtype=int)
    ret = []
    for i in range(1, len(fids) - 1):
        for j in  range(i + 1, len(fids)):
            source = fids[0]
            fi, fj = fids[i], fids[j]
            if source < fi:
                ret.append([fj, source, fi])
            else:
                ret.append([fj, fi, source])
            if source < fj:
                ret.append([fi, source, fj])
            else:
                ret.append([fi, fj, source])
    return ret    

Simple test to show what it does.

In [2]:
line_to_triples ("1 5 8 7 9")

[[8, 1, 5],
 [5, 1, 8],
 [7, 1, 5],
 [5, 1, 7],
 [9, 1, 5],
 [5, 1, 9],
 [7, 1, 8],
 [8, 1, 7],
 [9, 1, 8],
 [8, 1, 9],
 [9, 1, 7],
 [7, 1, 9]]


Now, complete the program. You will have to use the wordcount style `<triple>, 1` to get a simpler reducer to work.

In [3]:
from pyspark import SparkContext

inputdir = "../data/simple.input"
outdir = "/tmp/outputsimple.mre"

from pyspark import SparkContext
sc = SparkContext("local", "App Name",)
rdd = sc.textFile(f"{inputdir}/*")
rdd = rdd.flatMap(line_to_triples)
### TODO add a WordCount style reducer
rdd = rdd.map(lambda x: (str(x), 1))
rdd = rdd.reduceByKey(lambda x, y: x + y)
### TODO Identify triples of count >=2
rdd = rdd.filter(lambda x: x[1]>1)
### TODO format output to just triples
rdd = rdd.map(lambda x: x[0])
rdd.saveAsTextFile(outdir)

sc.stop()             

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/15 22:08:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

In [16]:
sc.stop()

#### Join Implementation

In previous years when we ran this on distributed memory, we found that an implementation using a `join` was faster. This is because join has efficient implementation (based on a hash map) that do not have to send all the triples around. The concept of the program is:
* output all triples (same as before)
* add an partition number to each triple
* perform a self-join on the triples with partition numbers
* use the partition index to identify when the triples came from different lists
    * self-join will produce joined triples from the same partition 
* output triples when two proposals came from different partitions
    * i.e. two proposal from different friends lists
    * you need to avoid additional output when there is no match
    
This is a pretty different usage of Spark.  The function `mapPartitions` allows the programmer to write functions that apply to an entire partition, rather than each individual element of an RDD.  The function `mapPartitionsWithIndex` makes the partition identifier available to differentiate behavior, analagous to have the thread ID in OpenMP.

The following helper function will add an index to the triple.

In [17]:
### add a parition index to each triple.
def add_index (idx, part):
    for p in part:
        yield str(p), str(idx)

You will need to write a function to identify when the joined triples have different indexes.

In [6]:
def filter_diff_idx (x):
    
    idxs = set()
    for i in x[1]:
        idxs.add(i)
    if len(idxs) > 1:
        return True
    else:
        return False


In [7]:
from pyspark import SparkContext

inputdir = "../data/simple.input"
outdir = "/tmp/output.join"

from pyspark import SparkContext
sc = SparkContext("local", "App Name",)
rdd = sc.textFile(f"{inputdir}/*")
rdd = rdd.flatMap(line_to_triples)
rdd = rdd.mapPartitionsWithIndex(add_index)
# TODO self join
rdd = rdd.join(rdd)
# TODO filter out matches from same partition
rdd = rdd.filter(lambda x: x[1][0] != x[1][1])
# TODO cleanup output
...
rdd.saveAsTextFile(outdir)

sc.stop()

                                                                                

The following `sc.stop()` can be called alone to clean up crashed environments.

In [8]:
sc.stop()

It turns out (we will see below) that this version is not faster on shared-memory. It does a lot more computation and the reduction in traffic is not as important, because memory is faster than networking.


### List Merge Version

A better idea might be to use the memory of Spark to make things faster. We are going to make the assumption that each Spark worker can hold two entire friends lists at once.  So this idea here is:
* merge pairs of lists representing, e.g. from 100 and 200, into an RDD. You should generate an RDD with entries like:
    * `'100, 200', array([300, 319, 400])` from 100  
    * `'100, 200', array([219, 300, 400])` from 200
    * note that we sorted friends in the key so that the keys will match.
    * also note that list only contains the remaining friends, i.e. not 100 or 200
* group these lists together
    * '100, 200', [array([219, 300, 400]), array([300, 319, 400])]`
* when you have two arrays in the value, compute their intersection
    * '100, 200', [300, 400]
* output the corresponding triples
    * be careful here. it's subtle to get the output right.
    
You have write any helper functions that you need. I'm including prototypes for the ones I used.

In [9]:
### Output the remaining list of friends for each friend.
def pair_lists(line: str):
    # TODO
    pass

In [10]:
pair_lists ("2 1 5 8 7 9")

Sample output from pair_list
```
[['1, 2', array([5, 8, 7, 9])],
 ['2, 5', array([1, 8, 7, 9])],
 ['2, 8', array([1, 5, 7, 9])],
 ['2, 7', array([1, 5, 8, 9])]]
```

In [11]:
### Output the appropriate triples after intersection.
def output_triples(x):
    output = []
    # TODO
    return output
        

In [12]:
sc.stop()

In [13]:
from pyspark import SparkContext

inputdir = "../data/simple.input"
outdir = "/tmp/output.merge"

from pyspark import SparkContext
sc = SparkContext("local", "App Name",)
rdd = sc.textFile(f"{inputdir}/*")
rdd = rdd.flatMap(pair_lists)
# group by and turn into lists
rdd = rdd.groupByKey().mapValues(list)
# keep only keys with two inputs
rdd = rdd.filter(lambda x: len(x[1])==2)
# TODO intersect the two lists
...
rdd = rdd.flatMap(output_triples)
rdd.saveAsTextFile(outdir)

sc.stop()

23/11/15 22:08:58 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 830, in main
    process()
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 820, in process
    out_iter = func(split_index, iterator)
  File "/opt/conda/lib/python3.10/site-packages/pyspark/rdd.py", line 5405, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/conda/lib/python3.10/site-packages/pyspark/rdd.py", line 5405, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/conda/lib/python3.10/site-packages/pyspark/rdd.py", line 828, in func
    return f(iterator)
  File "/opt/conda/lib/python3.10/site-packages/pyspark/rdd.py", line 4156, in combine
    merger.mergeValues(iterator)
  File "/opt/conda/lib/python3.10/s

Py4JJavaError: An error occurred while calling o164.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:106)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1091)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1089)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1062)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1027)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$3(PairRDDFunctions.scala:1009)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1008)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$2(PairRDDFunctions.scala:965)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:963)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$2(RDD.scala:1593)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1593)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1579)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1579)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:564)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:563)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (991fc5c4f86c executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 830, in main
    process()
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 820, in process
    out_iter = func(split_index, iterator)
  File "/opt/conda/lib/python3.10/site-packages/pyspark/rdd.py", line 5405, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/conda/lib/python3.10/site-packages/pyspark/rdd.py", line 5405, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/conda/lib/python3.10/site-packages/pyspark/rdd.py", line 828, in func
    return f(iterator)
  File "/opt/conda/lib/python3.10/site-packages/pyspark/rdd.py", line 4156, in combine
    merger.mergeValues(iterator)
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/shuffle.py", line 256, in mergeValues
    for k, v in iterator:
TypeError: 'NoneType' object is not iterable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2284)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2316)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:83)
	... 51 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 830, in main
    process()
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 820, in process
    out_iter = func(split_index, iterator)
  File "/opt/conda/lib/python3.10/site-packages/pyspark/rdd.py", line 5405, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/conda/lib/python3.10/site-packages/pyspark/rdd.py", line 5405, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/opt/conda/lib/python3.10/site-packages/pyspark/rdd.py", line 828, in func
    return f(iterator)
  File "/opt/conda/lib/python3.10/site-packages/pyspark/rdd.py", line 4156, in combine
    merger.mergeValues(iterator)
  File "/opt/conda/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/shuffle.py", line 256, in mergeValues
    for k, v in iterator:
TypeError: 'NoneType' object is not iterable

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:767)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:749)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


### Timings

Run on the full dataset to compare performance.

I've removed the code, but I've left the timing information for your reference.
  * M/R OK
  * Join slowest
  * Merge fastest
Conclusion: different implementations better for different architectures.

##### Map/Reduce version

In [None]:
%%timeit -n1 -r1
from pyspark import SparkContext

inputdir = "../data/fof.input"
outdir = "/tmp/outputfof.mr"

from pyspark import SparkContext
sc = SparkContext("local", "App Name",)
rdd = sc.textFile(f"{inputdir}/*")
rdd = rdd.flatMap(line_to_triples)
...
rdd.saveAsTextFile(outdir)

sc.stop()                

#### Join version

In [None]:
%%timeit -n1 -r1
from pyspark import SparkContext

inputdir = "../data/fof.input"
outdir = "/tmp/outputfof.join"

from pyspark import SparkContext
sc = SparkContext("local", "App Name",)
rdd = sc.textFile(f"{inputdir}/*")
rdd = rdd.flatMap(line_to_triples)
...
rdd.saveAsTextFile(outdir)

sc.stop()

#### List merge version

In [None]:
%%timeit -n1 -r1
from pyspark import SparkContext

inputdir = "../data/fof.input"
outdir = "/tmp/outputfof.merge"

from pyspark import SparkContext
sc = SparkContext("local", "App Name",)
rdd = sc.textFile(f"{inputdir}/*")
...
rdd.saveAsTextFile(outdir)

sc.stop()