# Spark Fundamentals

Basic RDDs - Don't freak, these are just references for future needs
======================================

Common RDD Constructors
-----------------------

Expression                               |Meaning
----------                               |-------
`sc.parallelize(iterable)`               |Create RDD of elements of some iterable
`sc.textFile(path)`                      |Create RDD of lines from file

Common Transformations
----------------------

Expression                               |Meaning
----------                               |-------
`filter(boolean condition)`              |Returns for where some boolean condition is True
`map(some function)`                     |Applies some function
`flatMap(some function)`                 |Apply some function that returns an iterator and flatten the entire output
`sample(withReplacement=True, ratio)`    |Sample the data by some ratio
`distinct()`                             |Remove duplicates in RDD
`sortBy(key function, ascending=True)`   |Sort elements by key defined in function in designated order
`randomSplit([ratio1, ratio2], seed)`    |Splits your data into two depening on ratio array

Common Key Pair RDD Transformations
----------------------------------

Expression                               |Meaning
----------                               |-------
`groupByKey(key value rdd)`              |Collapse a key value RDD by the key, and keeps the values in a iterable
`reduceByKey(some function)`             |Collapse a key value RDD by the key, and combines the values by some function
`mapValues(some function)`               |Apply some function to the values of some key value RDD
`flatMapValues(some function)`           |Apply some function that turns a key and iterable value RDD into key value RDD
`keys()`                                 |Returns the keys of a key value RDD
`values()`                               |Returns the values of a key value RDD

Common Multiple RDD Transformations
----------------------------------

Expression                               |Meaning
----------                               |-------
`union(another rdd)`                     |Append another RDD to current RDD
`join(another rdd)`                      |Join another RDD to current RDD by matching keys
`leftOuterJoin(another rdd)`             |Join another RDD to current RDD where another RDD has matching keys
`rightOuterJoin(another rdd)`            |Join current RDD to other RDD where current RDD has matching keys
`zip(another rdd)`                       |Combines two RDD to form a key value pair RDD

Common Actions
--------------

Expression                             |Meaning
----------                             |-------
`collect()`                            |Convert RDD to in-memory list 
`take(n)`                              |First n elements of RDD 
`top(n)`                               |Top n elements of RDD
`takeSample(withReplacement=True, n)`  |Create sample of n elements with replacement
`sum()`                                |Find element sum (assumes numeric elements)
`mean()`                               |Find element mean (assumes numeric elements)
`stdev()`                              |Find element deviation (assumes numeric elements)
`takeOrdered(n, function)`             |Returns n ordered elements as sorted by the value returned by the function

### Step 1: import pyspark

In [1]:
import pyspark as ps

### Step 2: initialize a spark context (RDD manager)

In [2]:
sc = ps.SparkContext('local[*]')

### Step 3:  Construct an RDD with the data (we will be using churn.csv)

In [3]:
churn_rdd = sc.textFile('data/churn.csv')

### Step 4: Lets look at the first two lines to understand the format that textFile creates

In [53]:
churn_rdd.first()

"State,Account Length,Area Code,Phone,Int'l Plan,VMail Plan,VMail Message,Day Mins,Day Calls,Day Charge,Eve Mins,Eve Calls,Eve Charge,Night Mins,Night Calls,Night Charge,Intl Mins,Intl Calls,Intl Charge,CustServ Calls,Churn?"

In [14]:
print(churn_rdd.take(1))

["State,Account Length,Area Code,Phone,Int'l Plan,VMail Plan,VMail Message,Day Mins,Day Calls,Day Charge,Eve Mins,Eve Calls,Eve Charge,Night Mins,Night Calls,Night Charge,Intl Mins,Intl Calls,Intl Charge,CustServ Calls,Churn?"]


### Step 5: We need to split the data by commas.

In [8]:
def split_by_commas(text):
    vals = text.split(',')
    return (vals)

In [9]:
churn_split = churn_rdd.map(split_by_commas)

In [15]:
print(churn_split.take(5))
# question: based on what rule, does it make lists?

[['State', 'Account Length', 'Area Code', 'Phone', "Int'l Plan", 'VMail Plan', 'VMail Message', 'Day Mins', 'Day Calls', 'Day Charge', 'Eve Mins', 'Eve Calls', 'Eve Charge', 'Night Mins', 'Night Calls', 'Night Charge', 'Intl Mins', 'Intl Calls', 'Intl Charge', 'CustServ Calls', 'Churn?'], ['KS', '128', '415', '382-4657', 'no', 'yes', '25', '265.100000', '110', '45.070000', '197.400000', '99', '16.780000', '244.700000', '91', '11.010000', '10.000000', '3', '2.700000', '1', 'False.'], ['OH', '107', '415', '371-7191', 'no', 'yes', '26', '161.600000', '123', '27.470000', '195.500000', '103', '16.620000', '254.400000', '103', '11.450000', '13.700000', '3', '3.700000', '1', 'False.'], ['NJ', '137', '415', '358-1921', 'no', 'no', '0', '243.400000', '114', '41.380000', '121.200000', '110', '10.300000', '162.600000', '104', '7.320000', '12.200000', '5', '3.290000', '0', 'False.'], ['OH', '84', '408', '375-9999', 'yes', 'no', '0', '299.400000', '71', '50.900000', '61.900000', '88', '5.260000', '

In [16]:
headers = churn_split.first()

In [17]:
headers

['State',
 'Account Length',
 'Area Code',
 'Phone',
 "Int'l Plan",
 'VMail Plan',
 'VMail Message',
 'Day Mins',
 'Day Calls',
 'Day Charge',
 'Eve Mins',
 'Eve Calls',
 'Eve Charge',
 'Night Mins',
 'Night Calls',
 'Night Charge',
 'Intl Mins',
 'Intl Calls',
 'Intl Charge',
 'CustServ Calls',
 'Churn?']

In [18]:
churn_split = churn_split.filter(lambda x: x!= headers)

In [29]:
churn_split.take(2)

[['KS',
  '128',
  '415',
  '382-4657',
  'no',
  'yes',
  '25',
  '265.100000',
  '110',
  '45.070000',
  '197.400000',
  '99',
  '16.780000',
  '244.700000',
  '91',
  '11.010000',
  '10.000000',
  '3',
  '2.700000',
  '1',
  'False.'],
 ['OH',
  '107',
  '415',
  '371-7191',
  'no',
  'yes',
  '26',
  '161.600000',
  '123',
  '27.470000',
  '195.500000',
  '103',
  '16.620000',
  '254.400000',
  '103',
  '11.450000',
  '13.700000',
  '3',
  '3.700000',
  '1',
  'False.']]

### Step 6: Extract the headers

In [6]:
#

### Step 7: Remove the header from the data

In [7]:
#

### Step 8: Finding total churn

In [63]:
(churn_split.map(lambda x: x[-1]=='True.'))

PythonRDD[95] at RDD at PythonRDD.scala:53

### Step 9: Finding total churn per State

In [45]:
churn_split.map(lambda x : (x[0],x[-1]=="True."))\
.reduceByKey(lambda x,y: x+y).take(10)

[('KS', 13),
 ('OH', 10),
 ('MO', 7),
 ('LA', 4),
 ('WV', 10),
 ('RI', 6),
 ('NY', 15),
 ('VA', 5),
 ('TX', 18),
 ('CO', 9)]

### Step 10: Finding average Customer Service Calls per Churn for each State

In [46]:
headers

['State',
 'Account Length',
 'Area Code',
 'Phone',
 "Int'l Plan",
 'VMail Plan',
 'VMail Message',
 'Day Mins',
 'Day Calls',
 'Day Charge',
 'Eve Mins',
 'Eve Calls',
 'Eve Charge',
 'Night Mins',
 'Night Calls',
 'Night Charge',
 'Intl Mins',
 'Intl Calls',
 'Intl Charge',
 'CustServ Calls',
 'Churn?']

In [None]:
def casting_function(X):
    return int(X[0]), X[1], int(X[2]), X[3], int(X[4]), float(X[5])

(
    churn_rdd.map(lambda x: (x[0], (int(x[-2]), x[-1] != 'False.')))
             .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
             .mapValues(lambda x: x[0] / x[1])
             .take(20)
)

In [61]:
(churn_split.map(lambda x: (x[0],(x[-1]=='True.',int(x[-2])))\
.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))\
#.mapValues(lambda x: x[0]/x[1]))
               ).take(20))

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 45.0 failed 1 times, most recent failure: Lost task 0.0 in stage 45.0 (TID 59, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/anaconda3/envs/pyspark_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/anaconda3/envs/pyspark_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/anaconda3/envs/pyspark_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/anaconda3/envs/pyspark_env/lib/python3.7/site-packages/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "/anaconda3/envs/pyspark_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-61-149dd8a0523d>", line 1, in <lambda>
AttributeError: 'tuple' object has no attribute 'reduceByKey'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.api.python.PythonRDD$.runJob(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD.runJob(PythonRDD.scala)
	at sun.reflect.GeneratedMethodAccessor72.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/anaconda3/envs/pyspark_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/anaconda3/envs/pyspark_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/anaconda3/envs/pyspark_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 393, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/anaconda3/envs/pyspark_env/lib/python3.7/site-packages/pyspark/rdd.py", line 1354, in takeUpToNumLeft
    yield next(iterator)
  File "/anaconda3/envs/pyspark_env/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<ipython-input-61-149dd8a0523d>", line 1, in <lambda>
AttributeError: 'tuple' object has no attribute 'reduceByKey'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:592)
	at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:575)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.api.python.PythonRDD$$anonfun$3.apply(PythonRDD.scala:153)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


#### Joins

In [11]:
#cust_rdd = 


In [12]:
#state_rdd = 

In [13]:
# Join both:

### Caching RDDs to leverage the in memory usage

In [14]:
cached_churn_rdd = churn_rdd.persist()

### Practice #1: What's the min, mean, and max night charge for users that churned?

### Practice #2: How many of the churned users have Vmail plan?

### Practice #3: Which state has the most day calls?