### Spark RDD notes

Link to follow: https://data-flair.training/blogs/spark-rdd-tutorial/

#### What are RDDs and what are their features?

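RDD stands for Resilient Distributed Dataset.
- Key abstraction of the Spark ecosystem
- Fault tolerant (lost partitions are recomputed from the lineage graph, a DAG of the transformations that produced them)
- Meant for distributed computing (partitions are processed in parallel on multiple nodes)
- Immutable (changed only through coarse-grained transformations, each of which produces a new RDD)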
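To make "recover with the help of a DAG" concrete, here is a toy plain-Python model of lineage-based recovery. It is not how Spark is implemented; the class `ToyRDD` and its methods are hypothetical. Each derived dataset stores only its parent and the function applied to it, so a lost partition can be rebuilt by replaying that chain from the source data.

```python
# Toy model of lineage-based fault recovery (hypothetical class, not Spark's API).

class ToyRDD:
    def __init__(self, parent=None, fn=None, source=None):
        self.parent = parent      # lineage edge to the upstream dataset
        self.fn = fn              # function applied to every element of the parent
        self.source = source      # list of partitions; only set on the root dataset
        self.cache = {}           # partitions materialized so far

    def map(self, fn):
        # Coarse-grained and immutable: returns a new dataset, never edits this one.
        return ToyRDD(parent=self, fn=fn)

    def compute(self, i):
        # Build partition i, walking the lineage back towards the source if needed.
        if i not in self.cache:
            if self.source is not None:
                self.cache[i] = list(self.source[i])
            else:
                self.cache[i] = [self.fn(x) for x in self.parent.compute(i)]
        return self.cache[i]


base = ToyRDD(source=[[1, 2], [3, 4]])
doubled = base.map(lambda x: x * 2)
plus_one = doubled.map(lambda x: x + 1)

first = plus_one.compute(1)
print(first)        # [7, 9]

# Simulate losing partition 1 of both derived datasets (e.g. a node crashed).
del plus_one.cache[1]
del doubled.cache[1]

recovered = plus_one.compute(1)
print(recovered)    # [7, 9], rebuilt by replaying the lineage
```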

Features of RDD:
- in-memory computation
- lazy evaluation
- fault tolerance
- immutability (safe to share across processes!)
- partitioning: the data is split into logical partitions that are spread across the nodes of the cluster
- coarse-grained operations
- location stickiness (very important! RDDs can declare preferred locations for each partition, and the DAG scheduler tries to run each task on a node that already holds that partition's data, so less data has to move over the network)

#### RDD vs Distributed Shared Memory (DSM)

* note: coarse-grained transformations apply the same operation to every element of the dataset at once (e.g. a map or filter over the whole RDD) rather than updating individual elements. The latter would be fine-grained.

* What is "straggler mitigation"? When all parallel workers have to sync up (say, for a reduce operation), one slow node can keep all the others waiting; that node is called the straggler. Straggler mitigation is any strategy for dealing with such a node, e.g. launching a backup copy of its task elsewhere.

RDD | DSM
----|----
Reads are coarse- or fine-grained | Reads are fine-grained
Writes are coarse-grained | Writes are fine-grained
Fault recovery using the lineage graph | Fault recovery using checkpointing
Straggler mitigation using backup tasks | Straggler mitigation is hard to achieve
Spills to disk if low on RAM | System performance degrades if low on RAM
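The backup-task idea from the table can be modelled in a few lines of plain Python. This is a toy with made-up task durations, not Spark's scheduler; in Spark the real feature is speculative execution, switched on with the `spark.speculation` setting (off by default), and the `multiplier` parameter below only mimics the spirit of `spark.speculation.multiplier`. A backup copy is assumed to start once a task runs longer than `multiplier` times the median task time, and to take about the median time on a healthy node.

```python
# Toy simulation of straggler mitigation with backup tasks (not Spark's scheduler).

def stage_time(durations, speculate=False, multiplier=1.5):
    """A stage finishes when its slowest task finishes."""
    if not speculate:
        return max(durations)
    median = sorted(durations)[len(durations) // 2]
    threshold = multiplier * median          # when a task starts to count as slow
    finish_times = []
    for d in durations:
        if d > threshold:
            # Launch a backup copy at `threshold`; assume it needs `median` time.
            # Whichever copy finishes first wins, the other is discarded.
            finish_times.append(min(d, threshold + median))
        else:
            finish_times.append(d)
    return max(finish_times)


tasks = [10, 11, 9, 10, 60]                  # the last task is a straggler
print(stage_time(tasks))                     # 60
print(stage_time(tasks, speculate=True))     # 25.0
```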


```python
from pyspark.sql import SparkSession
# SparkSession (Spark 2.0+) is the single entry point: it wraps the SparkContext
# (spark.sparkContext) and replaces SQLContext/HiveContext; streaming and MLlib
# code can be driven from it too
```

```python
# create the session ('local' = one worker thread on this machine)
spark = SparkSession.builder.appName('RDD-practice').master('local').getOrCreate()
```

# RDD Transformations

A transformation on an RDD always results in a new RDD; the input RDD is not modified, since both are immutable. It is important to recognize two types of transformations:
* Narrow Transformations
* Wide Transformations

### Narrow transformations:
These are transformations where each output partition depends on only one input partition. Imagine an RDD split into 5 partitions: a filter can run on each partition independently, without looking at any other partition, so no data has to move between nodes.

Example operations:
* map / flatMap / mapPartitions
* filter
* sample
* union

### Wide transformations:
These are transformations where an output partition can need data from many input partitions, so records have to be shuffled (redistributed) across the network. For example, groupByKey has to bring all values of a key together, wherever they started.

Example operations:
* intersection
* groupByKey
* reduceByKey
* join (unless both RDDs are already partitioned the same way)
* cartesian
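For contrast, here is a toy plain-Python model of the shuffle behind a wide transformation such as groupByKey. The helper `shuffle` is hypothetical, and `key % num_out` stands in for Spark's hash partitioner (integer keys keep the example deterministic). The `sources` sets show that an output partition can receive records from every input partition, which is what makes the transformation wide.

```python
# Toy model of a shuffle (hypothetical helper, not Spark's code).
from collections import defaultdict

input_partitions = [
    [(1, "a"), (2, "b"), (3, "c")],
    [(2, "d"), (4, "e")],
    [(1, "f"), (3, "g"), (4, "h")],
]
num_out = 2

def shuffle(partitions, num_out):
    out = [defaultdict(list) for _ in range(num_out)]
    sources = [set() for _ in range(num_out)]   # which input partitions fed each output
    for src, part in enumerate(partitions):
        for key, value in part:
            dest = key % num_out                 # stand-in for hash partitioning
            out[dest][key].append(value)
            sources[dest].add(src)
    return [dict(p) for p in out], sources

grouped, sources = shuffle(input_partitions, num_out)
print(grouped[0])   # {2: ['b', 'd'], 4: ['e', 'h']}
print(grouped[1])   # {1: ['a', 'f'], 3: ['c', 'g']}
print(sources)      # [{0, 1, 2}, {0, 2}]
```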

# Actions
Actions trigger execution of the DAG: the recorded transformations are actually computed, and a result is returned to the driver (or written to storage). take(), count(), reduce(), collect(), etc. are actions.

### Creating an RDD

Three ways:
* Parallelize an existing collection
* Read from a file
* Transform another RDD

# Method 1: parallelize

Note that an RDD can hold elements of any type: in PySpark, any Python object that PySpark can serialize.

```python
data = [("Num"+str(i), i) for i in range(10)]
rdd1 = spark.sparkContext.parallelize(data)
```

```python
rdd1.count()
```
```
10
```

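```python
# collect() brings every element back to the driver as a Python list
# (fine for small RDDs, risky for large ones)
rdd1.collect()
```
```
[('Num0', 0),
 ('Num1', 1),
 ('Num2', 2),
 ('Num3', 3),
 ('Num4', 4),
 ('Num5', 5),
 ('Num6', 6),
 ('Num7', 7),
 ('Num8', 8),
 ('Num9', 9)]
```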

```python
def f1():
    print("test func")

def f2():
    print("test func2")

rdd2 = spark.sparkContext.parallelize([f1, f2])
```

```python
# can even parallelize functions!
rdd2.collect()[1]()
```
```
test func2
```


# Method 2: read from HDFS/a file store

Two-step process:
* Read the CSV/Parquet file into a DataFrame
* Use `df.rdd` to convert it into an RDD

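```python
# note: despite its name, rdd_file is a DataFrame; without header=True the
# header row is read as data (see _c0='policyNumber' in the output below)
rdd_file = spark.read.csv("/home/shiv/projects/exide/datasets/tdf-prod-jul-18-5k.csv", inferSchema=True)
```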

```python
rdd_file.head()
```
```
Row(_c0='policyNumber', _c1='clientNumber', _c2='agentNumber', _c3='productCode', _c4='firstPremiumMode', _c5='policyPaymentFrequency', _c6='amlCategory', _c7='policyPremium', _c8='policyStartDate', _c9='op_uwDecision', _c10='admitted', _c11='claimStatus', _c12='pEducation', _c13='pCity', _c14='pGender', _c15='pMaritalStatus', _c16='pOccupation', _c17='pState', _c18='pPin', _c19='pCountry', _c20='pDOB', _c21='agentBranchCode', _c22='agentClass', _c23='agentType', _c24='agentTimeAtExide', _c25='agentTimeToLicenseExpiry', _c26='cabrPartnerRefName', _c27='cabrBranch', _c28='productMainCategory', _c29='agentDesignation', _c30='addressProof', _c31='oldClass', _c32='newClass', _c33='op_classChange', _c34='insuredClientNumber', _c35='iSmokingStatus', _c36='agentBranchState', _c37='policyChannel', _c38='policyChannelDescription', _c39='agentBankKey', _c40='agentBank', _c41='agentBankBranch', _c42='agentBankCity', _c43='agentBankState', _c44='agentsClientNumber', _c45='agentDOB', _c46='agentGen
```

```python
type(rdd_file)
```
```
pyspark.sql.dataframe.DataFrame
```

```python
# to convert the DataFrame to an RDD (of Row objects), use its .rdd attribute:
df_as_rdd = rdd_file.rdd
```

```python
type(df_as_rdd) # tada
```
```
pyspark.rdd.RDD
```

# Caching and Persistence

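* cache() - persists with the default storage level, which is MEMORY_ONLY for RDDs
* persist() - lets you choose the storage level (e.g. MEMORY_ONLY, MEMORY_AND_DISK, DISK_ONLY)

You would want to persist an RDD that several actions reuse: otherwise every action recomputes it from its lineage, and keeping the computed partitions in memory avoids that repeated computation and I/O.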

```python
# this is how you persist/cache
print(rdd1.is_cached)
rdd1.cache()
print("RDD cache status", rdd1.is_cached)
```
```
False
RDD cache status True
```


```python
# to unpersist or uncache()
rdd1.unpersist()
print(rdd1.is_cached)
```
```
False
```


# Paired RDDs

RDDs whose elements are key-value pairs are called paired RDDs (pair RDDs).

Why are they important?
* Because values that share the same key can be grouped or combined across partitions and nodes. Keys do not have to be unique: operations such as reduceByKey() and join() work by matching keys.

### Creating a paired RDD

```python
# the lambda can be any function that returns a (key, value) tuple;
# here the whole ('NumX', X) tuple becomes the key and its length (2) the value
paired_rdd = rdd1.map(lambda x: (x, len(x)))
```

```python
paired_rdd.collect()
```
```
[(('Num0', 0), 2),
 (('Num1', 1), 2),
 (('Num2', 2), 2),
 (('Num3', 3), 2),
 (('Num4', 4), 2),
 (('Num5', 5), 2),
 (('Num6', 6), 2),
 (('Num7', 7), 2),
 (('Num8', 8), 2),
 (('Num9', 9), 2)]
```

```python
p_rdd2 = rdd1.map(f1)
```

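```python
# This errors out because map() calls f1 once per element, but f1 takes no arguments.
# Because of lazy evaluation, the mistake is not caught by map() above; it only
# surfaces when collect() actually runs the job.
p_rdd2.collect()
```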

```
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 15.0 failed 1 times, most recent failure: Lost task 0.0 in stage 15.0 (TID 15, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/shiv/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/home/shiv/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/shiv/spark-2.3.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
TypeError: f1() takes 0 positional arguments but 1 was given
```


# The difference between Map and Reduce

```python
# groupByKey shuffles the data so that all values with the same key end up together
# (every key in paired_rdd is unique, so each group holds a single value)
paired_rdd.groupByKey().collect()
```
```
[(('Num0', 0), <pyspark.resultiterable.ResultIterable at 0x7ff7051ed748>),
 (('Num1', 1), <pyspark.resultiterable.ResultIterable at 0x7ff7051ed048>),
 (('Num2', 2), <pyspark.resultiterable.ResultIterable at 0x7ff7051ed898>),
 (('Num3', 3), <pyspark.resultiterable.ResultIterable at 0x7ff7051ed7b8>),
 (('Num4', 4), <pyspark.resultiterable.ResultIterable at 0x7ff7051ed978>),
 (('Num5', 5), <pyspark.resultiterable.ResultIterable at 0x7ff7051ed9e8>),
 (('Num6', 6), <pyspark.resultiterable.ResultIterable at 0x7ff7051eda58>),
 (('Num7', 7), <pyspark.resultiterable.ResultIterable at 0x7ff7051ed8d0>),
 (('Num8', 8), <pyspark.resultiterable.ResultIterable at 0x7ff7051edb38>),
 (('Num9', 9), <pyspark.resultiterable.ResultIterable at 0x7ff7051edba8>)]
```

```python
# a better example: two overlapping ranges, so some keys appear twice
d1 = [("Num"+str(i), i) for i in range(10)]
d2 = [("Num"+str(i), i) for i in range(3,12)]
rdd3 = spark.sparkContext.parallelize(d1+d2)
```

```python
# keys Num3 to Num9 are repeated
rdd3.collect()
```
```
[('Num0', 0),
 ('Num1', 1),
 ('Num2', 2),
 ('Num3', 3),
 ('Num4', 4),
 ('Num5', 5),
 ('Num6', 6),
 ('Num7', 7),
 ('Num8', 8),
 ('Num9', 9),
 ('Num3', 3),
 ('Num4', 4),
 ('Num5', 5),
 ('Num6', 6),
 ('Num7', 7),
 ('Num8', 8),
 ('Num9', 9),
 ('Num10', 10),
 ('Num11', 11)]
```

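#### Question: is the key always the 0th element in the collection?

Yes: PySpark's pair-RDD operations (groupByKey(), reduceByKey(), join(), ...) treat each element as a 2-tuple (key, value), so element 0 is the key and element 1 is the value.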

```python
# duplicate keys are merged: each key now maps to an iterable holding all of its
# values (no values are dropped)
rdd3.groupByKey().collect()
```
```
[('Num0', <pyspark.resultiterable.ResultIterable at 0x7ff7051e5be0>),
 ('Num1', <pyspark.resultiterable.ResultIterable at 0x7ff7051e5f98>),
 ('Num2', <pyspark.resultiterable.ResultIterable at 0x7ff7051e5a58>),
 ('Num3', <pyspark.resultiterable.ResultIterable at 0x7ff7051e5c88>),
 ('Num4', <pyspark.resultiterable.ResultIterable at 0x7ff7051e5278>),
 ('Num5', <pyspark.resultiterable.ResultIterable at 0x7ff7051e5390>),
 ('Num6', <pyspark.resultiterable.ResultIterable at 0x7ff7051e5320>),
 ('Num7', <pyspark.resultiterable.ResultIterable at 0x7ff7051e51d0>),
 ('Num8', <pyspark.resultiterable.ResultIterable at 0x7ff7051e5c18>),
 ('Num9', <pyspark.resultiterable.ResultIterable at 0x7ff7051e5ba8>),
 ('Num10', <pyspark.resultiterable.ResultIterable at 0x7ff7051e5438>),
 ('Num11', <pyspark.resultiterable.ResultIterable at 0x7ff7051e5518>)]
```

```python
grouped = rdd3.groupByKey()
```

```python
# note that the type is PipelinedRDD: a PySpark subclass of RDD that chains
# consecutive Python transformations together; it supports the same API as a normal RDD
type(grouped)
```
```
pyspark.rdd.PipelinedRDD
```

#### reduceByKey and other similar functions

```python
# take(3) fetches only the first three elements instead of collecting everything
rdd3.take(3)
```
```
[('Num0', 0), ('Num1', 1), ('Num2', 2)]
```

```python
# reduceByKey syntax:
# pass a function of two values, f(accumulated, value), that merges two values
# belonging to the same key. It should be associative and commutative, because
# Spark merges values within each partition first and then merges the
# per-partition results. The reduction happens per key, not across keys.
interm = rdd3.reduceByKey(lambda accumulator, value: accumulator*value)
```

```python
type(interm)
```
```
pyspark.rdd.PipelinedRDD
```

```python
# Ta-da! Keys are retained and each key's values are merged with the function:
# repeated keys Num3..Num9 become value*value, keys seen once keep their single value
interm.collect()
```
```
[('Num0', 0),
 ('Num1', 1),
 ('Num2', 2),
 ('Num3', 9),
 ('Num4', 16),
 ('Num5', 25),
 ('Num6', 36),
 ('Num7', 49),
 ('Num8', 64),
 ('Num9', 81),
 ('Num10', 10),
 ('Num11', 11)]
```
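Why the function passed to reduceByKey should be associative and commutative: a toy plain-Python model (the helper `reduce_by_key` is hypothetical, not Spark's code) that, like Spark, combines values inside each partition first and then merges the partial results. Multiplication gives the same answer however the data is split; subtraction does not.

```python
# Toy model of reduceByKey's two-level combine (hypothetical helper).

def reduce_by_key(partitions, fn):
    merged = {}
    for part in partitions:
        local = {}
        for key, value in part:                   # combine inside the partition
            local[key] = fn(local[key], value) if key in local else value
        for key, value in local.items():          # merge the partial results
            merged[key] = fn(merged[key], value) if key in merged else value
    return merged

data = [("a", 2), ("a", 3), ("a", 4), ("b", 10)]
one_part = [data]
two_parts = [data[:1], data[1:]]

mul = lambda x, y: x * y
sub = lambda x, y: x - y
print(reduce_by_key(one_part, mul), reduce_by_key(two_parts, mul))
# {'a': 24, 'b': 10} {'a': 24, 'b': 10}   same answer for any split
print(reduce_by_key(one_part, sub), reduce_by_key(two_parts, sub))
# {'a': -5, 'b': 10} {'a': 3, 'b': 10}    answer depends on the partitioning
```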

### An example of my own to see if I have gotten the hang of the PySpark ecosystem