### Apache Spark

* Apache Spark is an open-source, distributed processing system used for big data workloads.


* It's an enhancement to Hadoop's MapReduce.
  * Processes and retains data in memory for subsequent steps, rather than writing intermediate results to disk.
  * For smaller workloads, Spark’s data processing speeds are up to 100x faster than Hadoop's MapReduce.

* Spark was written in Scala
  * Runs on the JVM
 
* Includes libraries to support SQL queries, machine learning (MLlib), graph data analysis (GraphX) and streaming data analysis.
  * Tools for pipeline construction and evaluation

* Enhanced security.

* Ideal for real-time processing as it utilizes in-memory caching and optimized query execution for fast queries against data of any size.  

* Provides many operators beyond map and reduce
  * A wide range of functions for SQL-like operations, ML and working with graph data
  * Over 80 high-level operators beyond Map and Reduce.

### Spark and Functional Programming

* Spark uses functional programming to manipulate data
  * Paradigm used in popular languages such as Common Lisp, Scheme, Clojure, OCaml and Haskell
  
* Functional programming is a data oriented paradigm.
  * Functional programming decomposes a problem into a set of functions.
  * Logic is implemented by applying and composing functions.

* It's centered around the idea that data should be manipulated by functions without maintaining any external state.
  * No global variables
  * This is the style in which lambda functions are typically written: they take inputs and return a value without touching outside state.
 
* In functional programming, we need to always return new data instead of manipulating the data in-place.
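
The points above can be sketched in plain Python (no Spark involved). The snippet contrasts an in-place update with a functional version that returns new data, then composes `map`, `filter` and `reduce`. The names `prices`, `add_tax_in_place` and `add_tax` are made up for this illustration.

```python
from functools import reduce

prices = [10.0, 20.0, 30.0]

def add_tax_in_place(values, rate):
    """Non-functional style: mutates the list it receives."""
    for i in range(len(values)):
        values[i] = values[i] * (1 + rate)

def add_tax(values, rate):
    """Functional style: leaves the input untouched and returns a new list."""
    return [v * (1 + rate) for v in values]

# The in-place version changes its argument, so we hand it a copy.
mutable_copy = list(prices)
add_tax_in_place(mutable_copy, 0.5)

# The functional version returns new data and leaves `prices` alone.
taxed = add_tax(prices, 0.5)

# Logic built by composing functions: map, then filter, then reduce.
expensive = filter(lambda v: v > 20, map(lambda v: v * 1.5, prices))
total = reduce(lambda acc, v: acc + v, expensive, 0.0)

print(prices)        # the original list is unchanged
print(taxed)
print(total)
```

Because `add_tax` never touches its input, it can be applied to different chunks of the data independently, which is the property Spark relies on when it distributes work.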

### Spark Operations and RDDs

* Spark provides its own distributed data abstraction called resilient distributed datasets, or RDDs.
  * An RDD is an abstraction that represents a read-only collection of objects partitioned across machines.
  * RDDs are fault tolerant and are accessed via parallel operations.

* RDDs can be cached in memory, making it efficient to iterate on the same data
  * Ideal for iterative operations such as optimization or some ML algorithms
  * Fast operation speed makes it ideal for command-line-based queries.

### Core Components of Spark

![](https://www.dropbox.com/s/azebxe8nv5nsqne/spark_architecture.png?dl=1)

* **Spark Core**
  * Basic functionality:
    * APIs that define RDDs
    * operations and actions to process RDDs
    
* **Spark SQL**
    * APIs to interact with Apache Hive's variant of SQL called Hive Query Language (HiveQL).
    * Database tables are represented as RDDs, and Spark SQL queries are transformed into Spark operations

* **Spark Streaming**
    * Enables the processing and manipulation of live streams of data.
    
* **MLlib**
  * Implementation of machine learning algorithms using Spark on RDDs
  * Basic algorithms for classification, regression, etc.

* **GraphX**
  * Functionality for manipulating graphs and performing parallel graph operations and computations
  * A sort of large-scale Neo4J
 

### Spark Paradigm

A Spark program typically follows a simple paradigm:


![](https://www.dropbox.com/s/do918x5bpeh8oh4/cluster_mode.png?dl=1)


1. The main program is the `driver`.
2. The program has one or more workers, called executors.
   * Executors run code sent to them by the driver on their partitions of the RDD.
3. Results are then sent back to the driver for aggregation or compilation.
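
The three steps above can be imitated on a single machine. This is only an analogy, not how Spark is implemented: a thread pool stands in for the executors, and the helper names `split_into_partitions` and `partial_sum_of_squares` are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor

def split_into_partitions(data, n):
    """Driver side: cut the data into at most n roughly equal chunks."""
    size = (len(data) + n - 1) // n  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]

def partial_sum_of_squares(partition):
    """Executor side: work on one partition only."""
    return sum(x * x for x in partition)

data = list(range(1, 11))  # the numbers 1..10
partitions = split_into_partitions(data, 4)

# The pool plays the role of the executors; pool.map sends each
# partition to a worker and returns the partial results in order.
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_results = list(pool.map(partial_sum_of_squares, partitions))

# Driver side: aggregate the partial results.
total = sum(partial_results)

print(partitions)
print(partial_results, total)
```

Each worker only ever sees its own partition, and the driver only sees the small partial results, which is what keeps the pattern scalable.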

* In local mode (or Spark in-process), the executor runs in the same process as the driver (my laptop here)

* See the cluster mode document for more details.

https://spark.apache.org/docs/latest/cluster-overview.html

### Setting Up a Docker Cluster

* Manually installing Spark and all its components can be a daunting task.
  * Manually deploying, configuring and optimizing a Spark cluster is complex and time consuming.
* It's easy to use Docker to install Spark locally
  * We will use the following Docker image: `jupyter/all-spark-notebook`
* There are other Docker images, including `jupyter/pyspark-notebook`, which does not include the jobs dashboard `http://localhost:4040`

```
docker run --rm -p 4040:4040 -p 8888:8888 -v $(pwd):/home/jovyan/work jupyter/all-spark-notebook
```

* This configuration creates a master and compute nodes locally in a Docker instance
 
* While you're probably not going to need to, you can log into the running container using: `docker exec -it <CONTAINER_ID> bash`

where `<CONTAINER_ID>` is the ID of the container currently running the `jupyter/all-spark-notebook` image (you can find it with `docker ps`).

### Spark Execution using PySpark


* Interact with the Scala Interface using the PySpark Python library
  * Wrapper that uses almost exactly the same function and attribute names

The Docker instance has all the libraries installed and ready to go.

* Make sure you run a Jupyter notebook on the Docker instance
  * If the code below fails, this means you're not running in the Docker instance


In [37]:
from pyspark import SparkContext
# sc.stop()
sc = SparkContext()


21/09/22 22:28:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [38]:
SparkContext?

In [3]:
print(f"Spark version is {sc.version}")

print(f"Python version is {sc.pythonVer}")

print(f"The name of the master is {sc.master}")


Spark version is 3.1.2
Python version is 3.9
The name of the master is local[*]


In [4]:
sc.getConf().getAll()


[('spark.app.startTime', '1632265750213'),
 ('spark.driver.port', '36283'),
 ('spark.app.id', 'local-1632265751791'),
 ('spark.driver.host', '804949680baf'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.driver.extraJavaOptions',
  '-Dio.netty.tryReflectionSetAccessible=true'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.executor.extraJavaOptions',
  '-Dio.netty.tryReflectionSetAccessible=true'),
 ('spark.ui.showConsoleProgress', 'true')]

### Creating a Text RDD

* You can create RDDs in a number of ways:

  * `parallelize()`: function to transform Python collections (list-like data structures) into RDDs
    * Distributes the passed list and makes it fault-tolerant.

* Another easy way to create RDDs is to read in a file with `textFile()`

* Creates an RDD where every object is a line of the input text file

* Once an RDD is created, we can access its `map`, `reduce` and `filter` methods
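  * `map`, `filter` and other operations we will cover are called 'transformations' (`reduce`, by contrast, is an action)
  * A transformation on an RDD yields a new RDD
  * `flatMap` is also commonly used; it applies a function that returns a sequence to each element and flattens the results, much like `itertools.chain.from_iterable(map(f, data))`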
 
* To get (see) results, we need to perform an 'action' on an RDD
  * Actions such as `take()`, `takeSample()`, `count()` and `mean()` return data to the driver.
 
* More on the distinction between transformations and actions later 
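
Before moving to Spark, the difference between `map`, `flatMap` and `filter` can be previewed with plain Python iterators. This is an analogy rather than Spark code: Python's `map` and `filter` are lazy, and `list()` plays the role of an action here. The sample `lines` are made up.

```python
from itertools import chain

lines = ["to be or", "", "not to be"]

# map: exactly one output element per input element (here, a list of words).
mapped = list(map(str.split, lines))

# flatMap analogue: apply the function, then flatten the results by one level.
flat_mapped = list(chain.from_iterable(map(str.split, lines)))

# filter: keep only the elements that satisfy a predicate.
long_words = list(filter(lambda w: len(w) > 2, flat_mapped))

print(mapped)
print(flat_mapped)
print(long_words)
```

Note how the empty line yields an empty list under `map` but simply disappears under the `flatMap` analogue.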

In [32]:
# Install using the following if not already installed
!pip install randomuser
from randomuser import RandomUser

# Generate a single user
user = RandomUser({"nat": "us"})
print(f"user object  is {user}")
def get_user_info(u):

    user_dict = {
        "user_id": u.get_id()["number"], 
        "first_name": u.get_first_name(), 
        "last_name": u.get_last_name(), 
        "state": u.get_state(),
        "zip": u.get_zipcode(),
        "lat_long": u.get_coordinates()
    }
    return user_dict

user_json = get_user_info(user)
print(f"user json representation  is")
user_json


user object  is <randomuser.RandomUser object at 0x7f0230456910>
user json representation  is


{'user_id': '468-88-4219',
 'first_name': 'Aubree',
 'last_name': 'Hunt',
 'state': 'Ohio',
 'zip': 45039,
 'lat_long': {'latitude': '55.0586', 'longitude': '13.8443'}}

In [33]:
my_users = RandomUser.generate_users(5000, {"nat": "us"})
print(len(my_users))
my_users[0:3]

5000


[<randomuser.RandomUser at 0x7f02304560d0>,
 <randomuser.RandomUser at 0x7f0230456610>,
 <randomuser.RandomUser at 0x7f0230456430>]

In [34]:
# Convert each RandomUser object into a dictionary

user_dicts = list(map(get_user_info, my_users))
user_dicts[0:3]

[{'user_id': '895-76-0473',
  'first_name': 'Christopher',
  'last_name': 'Morgan',
  'state': 'Nebraska',
  'zip': 73093,
  'lat_long': {'latitude': '6.4442', 'longitude': '-78.5063'}},
 {'user_id': '218-53-2453',
  'first_name': 'Riley',
  'last_name': 'Franklin',
  'state': 'Indiana',
  'zip': 43053,
  'lat_long': {'latitude': '86.3108', 'longitude': '81.9157'}},
 {'user_id': '290-41-4495',
  'first_name': 'Tammy',
  'last_name': 'Gutierrez',
  'state': 'Delaware',
  'zip': 11862,
  'lat_long': {'latitude': '10.2281', 'longitude': '-141.5519'}}]

In [35]:
# Write the users to a JSON file such that each line is a separate, self-contained, valid JSON object.
# This format is called newline-delimited JSON.
# Note: the `data/` directory must already exist.

import json

with open("data/random_user_dicts.json", "w") as f:
    for o in user_dicts:
        # Coordinates come back as strings; store them as numbers.
        o["lat_long"]["latitude"] = float(o["lat_long"]["latitude"])
        o["lat_long"]["longitude"] = float(o["lat_long"]["longitude"])

        json.dump(o, f)
        f.write("\n")
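
As a quick check of why newline-delimited JSON is convenient, the sketch below writes a couple of records and reads them back one line at a time. The records and the temporary file name `sample_users.json` are made up for illustration; they only mimic the shape of the dictionaries above.

```python
import json
import os
import tempfile

# Hypothetical sample records, shaped like the user dictionaries.
records = [
    {"user_id": "000-00-0001", "state": "Ohio",
     "lat_long": {"latitude": 1.5, "longitude": -2.5}},
    {"user_id": "000-00-0002", "state": "Idaho",
     "lat_long": {"latitude": 3.0, "longitude": 4.0}},
]

path = os.path.join(tempfile.mkdtemp(), "sample_users.json")

# Write one JSON object per line.
with open(path, "w") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")

# Read the file back: each non-empty line parses on its own,
# so the file can be split by line and processed in parallel.
with open(path) as f:
    loaded = [json.loads(line) for line in f if line.strip()]

print(loaded == records)
```

Because every line is a complete object, a line-oriented reader such as `textFile()` can hand different lines to different workers without any of them needing the rest of the file.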


In [39]:
users_rdd = sc.parallelize(user_dicts)
users_rdd_size  = users_rdd.count()
print(f"The number of objects in my RDD is: {users_rdd_size}")
users_rdd.takeSample(False, 3)

                                                                                

The number of objects in my RDD is: 5000


[{'user_id': '214-08-0103',
  'first_name': 'Joe',
  'last_name': 'Black',
  'state': 'Delaware',
  'zip': 57903,
  'lat_long': {'latitude': 62.858, 'longitude': 109.5403}},
 {'user_id': '327-55-8527',
  'first_name': 'Hailey',
  'last_name': 'Shaw',
  'state': 'New Mexico',
  'zip': 82415,
  'lat_long': {'latitude': 25.672, 'longitude': 62.809}},
 {'user_id': '381-98-6816',
  'first_name': 'Walter',
  'last_name': 'Perez',
  'state': 'Ohio',
  'zip': 99746,
  'lat_long': {'latitude': 88.8267, 'longitude': 15.5566}}]

In [15]:
select_users_rdd = users_rdd.filter(lambda x: x['state'] in ["Nebraska", "Hawaii", "Idaho"])
select_users_rdd

PythonRDD[8] at RDD at PythonRDD.scala:53

In [16]:
# Collecting the results means gathering them from all the worker nodes
select_users_rdd.collect()

[{'user_id': '816-75-8167',
  'first_name': 'Mason',
  'last_name': 'Patterson',
  'state': 'Idaho',
  'zip': 64947},
 {'user_id': '994-86-8073',
  'first_name': 'Ida',
  'last_name': 'Carpenter',
  'state': 'Idaho',
  'zip': 86327},
 {'user_id': '380-06-4689',
  'first_name': 'Joe',
  'last_name': 'Ray',
  'state': 'Idaho',
  'zip': 42562},
 {'user_id': '160-51-2868',
  'first_name': 'Nicole',
  'last_name': 'Hill',
  'state': 'Hawaii',
  'zip': 73687},
 {'user_id': '990-61-6010',
  'first_name': 'Avery',
  'last_name': 'Webb',
  'state': 'Idaho',
  'zip': 53372},
 {'user_id': '842-56-6565',
  'first_name': 'Carrie',
  'last_name': 'Rhodes',
  'state': 'Hawaii',
  'zip': 18556},
 {'user_id': '500-92-3559',
  'first_name': 'Alfred',
  'last_name': 'Hoffman',
  'state': 'Nebraska',
  'zip': 77520},
 {'user_id': '146-32-0395',
  'first_name': 'Dan',
  'last_name': 'Long',
  'state': 'Hawaii',
  'zip': 28104},
 {'user_id': '814-67-3737',
  'first_name': 'Roberta',
  'last_name': 'Ryan',
 

In [43]:
# Building an RDD from a text file.
text = sc.textFile('data/pride_and_prejudice.txt', minPartitions=4)
# Number of partitions in the RDD
text.getNumPartitions()

4

In [44]:
text_rdd_size = text.count()
print(f"number of objects in the RDD is {text_rdd_size}")

with open("data/pride_and_prejudice.txt") as f:
    nb_lines = len(f.readlines())
print(f"number of lines in the text file is {nb_lines}")


number of objects in the RDD is 14579
number of lines in the text file is 14579


In [45]:
subset_x = text.take(10)
print(f"len of subset_x is: {len(subset_x)}\n")
print(f"type of subset_x is: {type(subset_x)}\n")
print(f"subset_x is:\n{subset_x}")


      


len of subset_x is: 10

type of subset_x is: <class 'list'>

subset_x is:
['The Project Gutenberg eBook of Pride and Prejudice, by Jane Austen', '', 'This eBook is for the use of anyone anywhere in the United States and', 'most other parts of the world at no cost and with almost no restrictions', 'whatsoever. You may copy it, give it away or re-use it under the terms', 'of the Project Gutenberg License included with this eBook or online at', 'www.gutenberg.org. If you are not located in the United States, you', 'will have to check the laws of the country where you are located before', 'using this eBook.', '']


In [46]:
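import re

def clean_split_line(line):
    # Remove digits, then replace runs of non-word characters with a space
    a = re.sub(r'\d+', '', line)
    b = re.sub(r'\W+', ' ', a)
    # Upper-case the line and split it on whitespace
    return b.upper().split()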

words = text.map(clean_split_line)
words.take(60)

[['THE',
  'PROJECT',
  'GUTENBERG',
  'EBOOK',
  'OF',
  'PRIDE',
  'AND',
  'PREJUDICE',
  'BY',
  'JANE',
  'AUSTEN'],
 [],
 ['THIS',
  'EBOOK',
  'IS',
  'FOR',
  'THE',
  'USE',
  'OF',
  'ANYONE',
  'ANYWHERE',
  'IN',
  'THE',
  'UNITED',
  'STATES',
  'AND'],
 ['MOST',
  'OTHER',
  'PARTS',
  'OF',
  'THE',
  'WORLD',
  'AT',
  'NO',
  'COST',
  'AND',
  'WITH',
  'ALMOST',
  'NO',
  'RESTRICTIONS'],
 ['WHATSOEVER',
  'YOU',
  'MAY',
  'COPY',
  'IT',
  'GIVE',
  'IT',
  'AWAY',
  'OR',
  'RE',
  'USE',
  'IT',
  'UNDER',
  'THE',
  'TERMS'],
 ['OF',
  'THE',
  'PROJECT',
  'GUTENBERG',
  'LICENSE',
  'INCLUDED',
  'WITH',
  'THIS',
  'EBOOK',
  'OR',
  'ONLINE',
  'AT'],
 ['WWW',
  'GUTENBERG',
  'ORG',
  'IF',
  'YOU',
  'ARE',
  'NOT',
  'LOCATED',
  'IN',
  'THE',
  'UNITED',
  'STATES',
  'YOU'],
 ['WILL',
  'HAVE',
  'TO',
  'CHECK',
  'THE',
  'LAWS',
  'OF',
  'THE',
  'COUNTRY',
  'WHERE',
  'YOU',
  'ARE',
  'LOCATED',
  'BEFORE'],
 ['USING', 'THIS', 'EBOOK'],
 [],
 [

In [47]:
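import re

def clean_split_line(line):
    # Remove digits, then replace runs of non-word characters with a space
    a = re.sub(r'\d+', '', line)
    b = re.sub(r'\W+', ' ', a)
    # Upper-case the line and split it on whitespace
    return b.upper().split()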

words = text.flatMap(clean_split_line)
words.take(60)

['THE',
 'PROJECT',
 'GUTENBERG',
 'EBOOK',
 'OF',
 'PRIDE',
 'AND',
 'PREJUDICE',
 'BY',
 'JANE',
 'AUSTEN',
 'THIS',
 'EBOOK',
 'IS',
 'FOR',
 'THE',
 'USE',
 'OF',
 'ANYONE',
 'ANYWHERE',
 'IN',
 'THE',
 'UNITED',
 'STATES',
 'AND',
 'MOST',
 'OTHER',
 'PARTS',
 'OF',
 'THE',
 'WORLD',
 'AT',
 'NO',
 'COST',
 'AND',
 'WITH',
 'ALMOST',
 'NO',
 'RESTRICTIONS',
 'WHATSOEVER',
 'YOU',
 'MAY',
 'COPY',
 'IT',
 'GIVE',
 'IT',
 'AWAY',
 'OR',
 'RE',
 'USE',
 'IT',
 'UNDER',
 'THE',
 'TERMS',
 'OF',
 'THE',
 'PROJECT',
 'GUTENBERG',
 'LICENSE',
 'INCLUDED']

In [50]:
words.count()

126018

In [51]:
# Pair each word with a count of 1, as in the map step of MapReduce

words_mapped = words.map(lambda x: (x,1))
words_mapped.take(10)

[('THE', 1),
 ('PROJECT', 1),
 ('GUTENBERG', 1),
 ('EBOOK', 1),
 ('OF', 1),
 ('PRIDE', 1),
 ('AND', 1),
 ('PREJUDICE', 1),
 ('BY', 1),
 ('JANE', 1)]

In [54]:
sorted_map = words_mapped.sortByKey()
sorted_map

PythonRDD[32] at RDD at PythonRDD.scala:53

In [36]:
sorted_map.take(20000)[18000:18080]

                                                                                

[('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTH', 1),
 ('BOTTLE', 1),
 ('BOTTLE', 1),
 ('BOTTOM', 1),
 ('BOUGHT', 1),
 ('BOUGHT', 1),
 ('BOUGHT', 1),
 ('BOUGHT', 1),
 ('BOUGHT', 1),
 ('BOUND', 1),
 ('BOUND', 1),
 ('BOUND', 1),
 ('BOUNDARY', 1),
 ('BOUNDARY', 1),
 ('BOUNDLESS', 1),
 ('BOUNDS', 1),
 ('BOUNTY', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1),
 ('BOURGH', 1)

In [66]:
counts = words_mapped.reduceByKey(lambda x,y: x+y)
counts.collect()

[('PRIDE', 52),
 ('UNITED', 22),
 ('OTHER', 227),
 ('WORLD', 68),
 ('NO', 501),
 ('GIVE', 127),
 ('LICENSE', 18),
 ('WWW', 9),
 ('ARE', 361),
 ('TO', 4245),
 ('DATE', 5),
 ('UPDATED', 2),
 ('ENGLISH', 1),
 ('CHARACTER', 65),
 ('PRODUCED', 13),
 ('ILLUSTRATED', 1),
 ('THAT', 1555),
 ('POSSESSION', 10),
 ('LITTLE', 187),
 ('KNOWN', 58),
 ('VIEWS', 11),
 ('CONSIDERED', 23),
 ('AS', 1193),
 ('ONE', 273),
 ('THEIR', 439),
 ('MR', 784),
 ('JUST', 72),
 ('TOLD', 69),
 ('ME', 427),
 ('ANSWER', 65),
 ('WHO', 288),
 ('TELL', 71),
 ('HEARING', 24),
 ('ENOUGH', 106),
 ('WHY', 53),
 ('YOUNG', 130),
 ('MONDAY', 8),
 ('FOUR', 35),
 ('MUCH', 327),
 ('AGREED', 13),
 ('MICHAELMAS', 2),
 ('SERVANTS', 13),
 ('WEEK', 29),
 ('NAME', 34),
 ('BINGLEY', 307),
 ('OH', 96),
 ('FIVE', 32),
 ('YEAR', 29),
 ('FINE', 31),
 ('CAN', 223),
 ('NONSENSE', 8),
 ('THEREFORE', 75),
 ('VISIT', 53),
 ('PERHAPS', 76),
 ('PARTY', 58),
 ('THAN', 285),
 ('CONSIDER', 33),
 ('YOUR', 446),
 ('ESTABLISHMENT', 6),
 ('WILLIAM', 46),
 (

Since in functional programming we always return new data instead of manipulating the data in-place, we can rewrite the above cleanly by chaining the transformations:

```
from operator import add

counts_test_2 = text.flatMap(clean_split_line).map(lambda x: (x,1)).reduceByKey(add)
counts_test_2.take(100)
```
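
To see what `reduceByKey` computes, here is a single-machine sketch in plain Python. The helper `reduce_by_key` is hypothetical and only mirrors the result; Spark itself combines values within each partition and then merges across partitions, which is why the function passed in should be associative and commutative.

```python
from operator import add

def reduce_by_key(pairs, func):
    """Combine the values that share a key using func (single-machine stand-in)."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

words = "the cat and the hat and the bat".upper().split()

# Same shape as words.map(lambda x: (x, 1)) in the Spark version.
pairs = [(word, 1) for word in words]

counts = reduce_by_key(pairs, add)
print(counts)
```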

In [70]:
%%time
counts_test_2 = text.flatMap(clean_split_line).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)
counts_test_2.take(100)


CPU times: user 15 ms, sys: 14.3 ms, total: 29.3 ms
Wall time: 471 ms


[('PRIDE', 52),
 ('UNITED', 22),
 ('OTHER', 227),
 ('WORLD', 68),
 ('NO', 501),
 ('GIVE', 127),
 ('LICENSE', 18),
 ('WWW', 9),
 ('ARE', 361),
 ('TO', 4245),
 ('DATE', 5),
 ('UPDATED', 2),
 ('ENGLISH', 1),
 ('CHARACTER', 65),
 ('PRODUCED', 13),
 ('ILLUSTRATED', 1),
 ('THAT', 1555),
 ('POSSESSION', 10),
 ('LITTLE', 187),
 ('KNOWN', 58),
 ('VIEWS', 11),
 ('CONSIDERED', 23),
 ('AS', 1193),
 ('ONE', 273),
 ('THEIR', 439),
 ('MR', 784),
 ('JUST', 72),
 ('TOLD', 69),
 ('ME', 427),
 ('ANSWER', 65),
 ('WHO', 288),
 ('TELL', 71),
 ('HEARING', 24),
 ('ENOUGH', 106),
 ('WHY', 53),
 ('YOUNG', 130),
 ('MONDAY', 8),
 ('FOUR', 35),
 ('MUCH', 327),
 ('AGREED', 13),
 ('MICHAELMAS', 2),
 ('SERVANTS', 13),
 ('WEEK', 29),
 ('NAME', 34),
 ('BINGLEY', 307),
 ('OH', 96),
 ('FIVE', 32),
 ('YEAR', 29),
 ('FINE', 31),
 ('CAN', 223),
 ('NONSENSE', 8),
 ('THEREFORE', 75),
 ('VISIT', 53),
 ('PERHAPS', 76),
 ('PARTY', 58),
 ('THAN', 285),
 ('CONSIDER', 33),
 ('YOUR', 446),
 ('ESTABLISHMENT', 6),
 ('WILLIAM', 46),
 (

### Spark and Lazy Evaluation


* Transformations on an RDD are delayed until an action is performed

  * Similar to Python generators

  * This is called lazy evaluation

* You can chain many transformations on the same RDD without causing any execution. 
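
The generator analogy can be made concrete in plain Python. In this sketch (the function name `risky` is made up), a division by zero sits inside a generator and raises nothing until the values are actually requested.

```python
def risky(values):
    """Generator: the body does not run until a value is requested."""
    for v in values:
        yield len(v) / 0  # raises ZeroDivisionError only when executed

lazy = risky(["a", "bb"])              # no error yet: nothing has been computed
positive = (x for x in lazy if x > 0)  # chaining another step is still lazy

try:
    list(positive)                     # consuming the chain triggers the error
    error_raised = False
except ZeroDivisionError:
    error_raised = True

print(error_raised)
```

Here `list()` plays the role that an action such as `collect()` plays on an RDD.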

In [75]:
text.take(1)

['The Project Gutenberg eBook of Pride and Prejudice, by Jane Austen']

In [79]:
# The following won't raise an error until an action is performed
data = text.map(lambda x: len(x)/0).filter(lambda x: x>0)


In [80]:
# The following will generate an error since the transformation dividing by 0
# is now executed.
# The `ZeroDivisionError: division by zero` is buried among many Scala error messages.

data.collect()

21/09/22 22:49:37 ERROR Executor: Exception in task 0.0 in stage 30.0 (TID 84)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_50/1492525221.py", line 2, in <lambda>
ZeroDivisionError: division by zero

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 30.0 failed 1 times, most recent failure: Lost task 0.0 in stage 30.0 (TID 84) (804949680baf executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_50/1492525221.py", line 2, in <lambda>
ZeroDivisionError: division by zero

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2207)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1079)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2196)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2217)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2236)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2261)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 604, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 596, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_50/1492525221.py", line 2, in <lambda>
ZeroDivisionError: division by zero

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:517)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:652)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:635)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:470)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:315)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:313)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:307)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:307)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:294)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:288)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1030)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2236)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more


### The Spark Computation DAG

* Lazy evaluation is possible because Spark maintains a graph (DAG) of the transformations
* The transformations in the graph are optimized and executed once an action is triggered

* A simple example of an execution is:

```python
data_2 =  data_1.map(lambda x: x+2)
# do some work here
data_3 =  data_2.map(lambda x: x-2)
```

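* Neither transformation runs until an action is called on `data_3`; at that point Spark pipelines the two `map` calls into a single pass over each partition.
  * `data_3` ends up with the same values as `data_1`
  * Note that Spark cannot look inside Python lambdas on RDDs, so both functions are still applied to each element; plan-level optimization is the job of the Catalyst optimizer in Spark SQL.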

* See the following blog post about the Catalyst optimizer.

https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

* More on this when we cover Spark SQL
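
To make the idea of recorded transformations concrete, here is a toy sketch in plain Python. `LazyDataset` is a made-up class, not part of Spark, and it keeps a simple linear chain rather than a general DAG; it only illustrates that transformations are recorded first and run together when an action is called.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them on an action."""

    def __init__(self, source, lineage=()):
        self._source = source
        self._lineage = lineage  # tuple of ("map" | "filter", function) pairs

    def map(self, func):
        # Return a new dataset; nothing is computed here.
        return LazyDataset(self._source, self._lineage + (("map", func),))

    def filter(self, pred):
        return LazyDataset(self._source, self._lineage + (("filter", pred),))

    def collect(self):
        """Action: push each element through the whole chain in one pass."""
        out = []
        for item in self._source:
            keep = True
            for kind, func in self._lineage:
                if kind == "map":
                    item = func(item)
                elif not func(item):
                    keep = False
                    break
            if keep:
                out.append(item)
        return out

calls = []  # records every time the first lambda actually runs

data_1 = LazyDataset([1, 2, 3])
data_2 = data_1.map(lambda x: calls.append(x) or x + 2)
data_3 = data_2.map(lambda x: x - 2)

calls_before_action = len(calls)  # still 0: only the lineage was recorded
result = data_3.collect()

print(calls_before_action, result, len(calls))
```

Each call to `map` returns a new object and leaves the earlier ones usable, which matches the read-only nature of RDDs described earlier.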
