
### The case for distributed computing

* What happens if the data to be analyzed is too large?
  * e.g. cannot be stored on a single machine

* What if the computation is too complex?
  * e.g., in interactive mode, it is unacceptably slow

* What if you have to deal with both situations?

* Can you scale up? Can you scale out?

### Scaling Up: Local Machine

* Scaling up occurs within the same system hosting the data and running the computation
  * Simple to carry out from a physical standpoint
  * From a programmatic standpoint, it's managed by the operating system and programming libraries; it does not require additional frameworks
* Scalability is typically limited by the OS and the physical resources of the system.
  * The RAM upper bound is determined by the OS or the number of slots available on the motherboard
* The cost of a single machine at the highest configuration may be prohibitive

* May not meet the demands of the workload at hand


### Buying 256 GB of RAM 

* Single stick 256GB, DDR4 PC4-23400

![](https://www.dropbox.com/s/b5v6sypv5mk8ber/Screen%20Shot%202022-09-05%20at%208.35.03%20PM.png?dl=1)

### Maximum Number of Logical CPUs

```Red Hat defines a logical CPU as any schedulable entity. So every core/thread in a multicore/thread processor is a logical CPU.``` [ref](https://access.redhat.com/articles/rhel-limits#maximum-logical-cpus-1)

![](https://www.dropbox.com/s/1svoylj8jghz6n6/Screen%20Shot%202022-09-05%20at%208.19.22%20PM.png?dl=1)



### Maximum Memory Supported

![](https://www.dropbox.com/s/dlm7ofmqgko5wb4/Screen%20Shot%202022-09-05%20at%208.24.37%20PM.png?dl=1)
From [ref](https://access.redhat.com/articles/rhel-limits#maximum-logical-cpus-1)

### Storage limits

![](https://www.dropbox.com/s/7hgqjm0gcrzcuej/Screen%20Shot%202022-09-05%20at%208.25.54%20PM.png?dl=1)
From [ref](https://access.redhat.com/articles/rhel-limits#maximum-logical-cpus-1)

### Distributed Systems

* Distributed systems divide the workload among multiple machines

The typical process involves: 
  * A master node responsible for distributing the job and ensuring its completion
  * Worker nodes that perform the actual work
 
* Can be substantially more cost-effective compared to scaling up to the maximum configuration.

* Distributed systems are typically complex due to the following requirements:
  * Physically deploying and maintaining the platform; cabling, switches, machines, power management, etc.
  * A new paradigm for implementing distributed tasks
     * Think for instance, split-apply-combine paradigm
  * Framework to manage the platform
    * For instance, a master node needs to know how to split a file, assign workers, communicate the data, and gather and combine the results
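
The split-apply-combine idea mentioned above can be sketched in plain Python on a single machine. This is only an illustration: the function names (`split_lines`, `count_words`, `combine_counts`) and the sample data are hypothetical, and a real framework would also handle data transfer and worker failures.

```python
from collections import Counter
from functools import reduce


def split_lines(lines, n_chunks):
    """Split: deal the lines into n_chunks chunks, one per (imaginary) worker."""
    return [lines[i::n_chunks] for i in range(n_chunks)]


def count_words(chunk):
    """Apply: count the words of one chunk, independently of the other chunks."""
    return Counter(word for line in chunk for word in line.split())


def combine_counts(partial_counts):
    """Combine: merge the per-chunk counts into a single result."""
    return reduce(lambda a, b: a + b, partial_counts, Counter())


lines = ["a b a", "b c", "a c c", "d"]
chunks = split_lines(lines, n_chunks=2)
total = combine_counts([count_words(chunk) for chunk in chunks])
print(total)
```

In a distributed setting, each call to `count_words` would run on a different worker and only the small per-chunk counters would travel back to the master node.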


### Distributed Systems

* The hardware specs may not be the same across machines, which adds another layer of complexity

<img src="https://www.dropbox.com/s/8mncw4ffe8uajol/networking.jpg?dl=1" width="700" height="600">


### Distributed Systems Requirements

* Among other requirements, a distributed system should provide:

  * Communication: nodes must be able to communicate with each other
  * Fault tolerance: both data and computation should not be affected by system faults   
  * Scalable processing: workload increases can be accommodated, to some extent, by increasing computational resources

### Apache Spark

* Apache Spark is an open-source, distributed processing system used for big data workloads.
  * Runs on a cluster

* It's an enhancement to Hadoop's MapReduce
  * Processes and retains data in memory for subsequent steps
  * For smaller workloads, Spark’s data processing speeds are up to 100x faster than Hadoop's MapReduce

* Written in Scala and runs in the JVM


### Apache Spark

* Ideal for real-time processing as it utilizes in-memory caching and optimized query execution for fast queries against data of any size.  

* Provides a richer ecosystem of functionality
  * Compared to Hadoop, Spark provides over 80 high-level operators beyond map and reduce
    * Tools for pipeline construction and evaluation
  * Includes libraries to support SQL queries, machine learning (MLlib), graph data analysis (GraphX) and streaming data analysis


### What is Apache Spark

* [See Video](https://www.databricks.com/spark/about)

### Spark and Functional Programming

* To manipulate data, Spark uses functional programming
  * The functional programming paradigm is used in many popular languages including Common Lisp, Scheme, Clojure, OCaml, and Haskell
  
* Functional programming is a data oriented paradigm
  * Decomposes a problem into a set of functions.
  * Logic is implemented by applying and composing functions.

* The idea is that functions should be able to manipulate data without maintaining any external state.
  * No global variables
 
* In functional programming, we need to always return new data instead of manipulating the data in-place.
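
The points above can be illustrated with a minimal plain-Python sketch (not Spark code; the names `double`, `numbers`, `doubled` and `large` are illustrative only). Each step is a function applied to data, no global state is modified, and every step returns new data.

```python
from functools import reduce


def double(x):
    """A pure function: the result depends only on the argument."""
    return 2 * x


numbers = (1, 2, 3, 4, 5)                         # immutable input
doubled = tuple(map(double, numbers))             # new data, input untouched
large = tuple(filter(lambda x: x > 4, doubled))   # another new collection
total = reduce(lambda a, b: a + b, large, 0)      # fold the values into one

print(numbers, doubled, large, total)
```

Because no step modifies `numbers`, the same input can safely be reused by other computations, which is what makes this style easy to parallelize.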

### Core Components of Spark
<img src="https://www.dropbox.com/s/azebxe8nv5nsqne/spark_architecture.png?dl=1" width="900" height="600">


### Core Components of Spark

* **Spark Core**
  * Basic functionality:
    * APIs that define datasets (RDDs)
    * operations and actions to process RDDs
    
* **Spark SQL**
    * Option to comply with the ANSI SQL standard
    * APIs to interact with Apache Hive's variant of SQL called Hive Query Language (HiveQL).
    * DB tables are RDDs and Spark SQL queries are transformed into Spark operations

* **Spark Streaming**
    * Enables the processing and manipulation of live streaming data.
    
* **MLlib**
  * Implementation of machine learning algorithms using Spark on RDDs
  * Basic algorithms for classification, regression, etc.

* **GraphX**
  * Functionality for manipulating graphs and performing parallel graph operations and computations
  * A sort of large-scale Neo4J

### Spark Operations and RDDs

* Spark provides its own distributed data framework called resilient distributed datasets or RDDs
  * RDD is an abstraction that represents a read-only collection of objects that are partitioned across machines
  * Partitioning ensures fault tolerance and maximizes job efficiency
    * RDDs can be accessed via parallel operations

* RDDs are cached in memory, making it efficient to iterate on the same data
  * Ideal for operations such as optimization or some ML algorithms
  * Fast operation speed makes it ideal for command-line-based queries.

### Spark Paradigm

* A Spark program typically follows a simple paradigm:


1. The main program is the `application driver`
2. The program has one or more workers, called executors
  * These run code sent to them by the driver on their partitions of the data
  * The computation is dispatched to the data, not the data to the computation
3. Results are then sent back to the driver for aggregation or compilation
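
The three steps above can be mimicked on one machine with a thread pool standing in for the executors. This is a rough analogy under stated assumptions, not how Spark is implemented; `make_partitions`, `task` and `run_job` are hypothetical names.

```python
from concurrent.futures import ThreadPoolExecutor


def make_partitions(data, n):
    """Cut the data into at most n contiguous partitions."""
    size = max(1, -(-len(data) // n))  # ceiling division, at least 1
    return [data[i:i + size] for i in range(0, len(data), size)]


def task(partition):
    """The code the driver sends out: sum of squares of one partition."""
    return sum(x * x for x in partition)


def run_job(data, n_executors=4):
    """Driver: partition the data, dispatch the task, aggregate the results."""
    partitions = make_partitions(data, n_executors)
    with ThreadPoolExecutor(max_workers=n_executors) as pool:
        partial_results = list(pool.map(task, partitions))
    return sum(partial_results)


result = run_job(list(range(1, 11)))
print(result)
```

Only the small partial sums come back to the driver; the partitions themselves stay where the task runs.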




### Spark Job Sequence

<img src="https://www.dropbox.com/s/tv0vaxyuxzmxud4/Page3.jpg?dl=1" width="900" height="600">


### Spark Application Manager

<img src="https://www.dropbox.com/s/3fb4tw290rc3dy2/Page4.jpg?dl=1" width="900" height="600">



### Spark Execution using PySpark


* Interact with the Scala Interface using the PySpark Python library

  * Wrapper that uses almost exactly the same function and attribute names

![](https://www.dropbox.com/s/do918x5bpeh8oh4/cluster_mode.png?dl=1)

### Setting a Docker Cluster

* Manually installing Spark and all its components can be a daunting task.
 * Manually deploying, configuring and optimizing a Spark cluster is complex and time consuming
 
* Easy to deploy on the cloud. See for instance:
  * [Amazon's EMR](https://aws.amazon.com/emr/features/spark/)
  * Databricks' free [micro solution](https://www.databricks.com/product/faq/community-edition) and paid offerings
  * Various other providers, including Google (Dataproc), Microsoft (HDInsight), etc.
  * A common approach to occasional jobs
 


### Installing Via Docker For ICS438

* It is easy to install Spark locally using Docker. We will use the following Docker image:
  
```  
jupyter/all-spark-notebook
```
* There are other Docker images, including `jupyter/pyspark-notebook`, which does not include the jobs dashboard at `http://localhost:4040`

* We will run the infrastructure as follows:

```
docker run --rm -p 4040:4040 -p 8888:8888 -v $(pwd):/home/jovyan/work jupyter/all-spark-notebook
```

* This configuration creates a master and compute nodes locally in a Docker instance
 
* While you're probably not going to need to, you can log into the running container using: `docker exec -it <CONTAINER_ID> bash`

 * where `<CONTAINER_ID>` is the ID of the container currently running the `jupyter/all-spark-notebook` image


* The Docker instance has all the libraries installed and ready to go.

* Make sure you run a Jupyter notebook on the Docker instance
  * If the code below fails, this means you're not running in the Docker instance

In [4]:
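# pyspark is preinstalled in the Docker image; elsewhere, run: pip install pyspark

from pyspark import SparkContext

# Reuse the running SparkContext if there is one, otherwise create it
sc = SparkContext.getOrCreate()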



In [5]:
SparkContext?

Init signature:
SparkContext(
    master: Optional[str] = None,
    appName: Optional[str] = None,
    sparkHome: Optional[str] = None,
    pyFiles: Optional[List[str]] = None,
    environment: Optional[Dict[str, Any]] = None,
    batchSize: int = 0,

In [None]:
print(f"Spark version is {sc.version}")

print(f"Python version is {sc.pythonVer}")

print(f"The name of the master is {sc.master}")


In [None]:
sc.getConf().getAll()


### Creating a Text RDD

* You can create RDDs in a number of ways:

    * `parallelize()`: function to transform Python collections (list-like data structures) into `RDD`s
        * Distributes the passed list and makes it fault-tolerant

* Another easy way to create RDDs is to read in a file with `textFile()`
    * Creates an RDD where every object is a line of the input text file

 

### Fundamental Operations on RDDs

* Once an RDD is created, we can access its `map`, `filter` and `reduce` methods
 * `map` and `filter`, and others we will cover, are called 'transformations'; `reduce` is an action
 * A transformation on an `RDD` yields a new `RDD`
 * `flatMap` is also commonly used; it is equivalent to a `map` followed by `itertools.chain.from_iterable()`
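
As a plain-Python analogue of the operations listed above (this is not PySpark; the sample `lines` are made up), the difference between `map` and `flatMap` is whether the per-element results are kept nested or flattened into one sequence:

```python
from functools import reduce
from itertools import chain

lines = ["to be or", "not to be"]

# map: one output element (here, a list of words) per input element
mapped = list(map(str.split, lines))

# flatMap analogue: map, then flatten the per-line lists into one sequence
flat_mapped = list(chain.from_iterable(map(str.split, lines)))

# filter: keep only the elements that satisfy the predicate
filtered = list(filter(lambda word: len(word) > 2, flat_mapped))

# reduce: fold all elements into a single value (total number of characters)
total_chars = reduce(lambda a, b: a + b, map(len, flat_mapped))

print(mapped)
print(flat_mapped)
print(filtered, total_chars)
```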


### Transformations Versus Actions

* Transformation: transform the data. They include:
    * Narrow-dependency transformations:
      * Data partitions can be processed independently, e.g., filtering values or dropping a column
    * Wide-dependency transformations: 
      * Dependency across partitions, e.g., groupBy, aggregate


* Actions: trigger the work.
  * Each action triggers a Spark job

Ex. Why split those? 
  * E.g.: query optimization. For example, groupBy then filter is better optimized as filter then groupBy.
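
The benefit of reordering can be seen in a small plain-Python sketch. It assumes hypothetical flight records of the form `(carrier, duration)` and counts how many rows reach the grouping step under each ordering; both orderings give the same answer, but filtering first groups fewer rows.

```python
from collections import Counter

# Hypothetical flight records: (carrier, duration in minutes)
flights = [("AA", 90), ("AA", 150), ("UA", 200), ("UA", 60), ("DL", 130)]


def group_then_filter(rows):
    """Group every row by carrier first, then drop the short flights."""
    groups = {}
    for carrier, duration in rows:
        groups.setdefault(carrier, []).append(duration)
    rows_grouped = sum(len(durations) for durations in groups.values())
    counts = Counter()
    for carrier, durations in groups.items():
        for duration in durations:
            if duration > 120:
                counts[carrier] += 1
    return dict(counts), rows_grouped


def filter_then_group(rows):
    """Drop the short flights first, so fewer rows reach the grouping step."""
    kept = [(carrier, duration) for carrier, duration in rows if duration > 120]
    counts = Counter(carrier for carrier, _ in kept)
    return dict(counts), len(kept)


slow_counts, slow_rows = group_then_filter(flights)
fast_counts, fast_rows = filter_then_group(flights)
print(slow_counts == fast_counts, slow_rows, fast_rows)
```

In a cluster, grouping is a wide-dependency step that moves rows between nodes, so reducing the number of rows before it matters far more than it does in this toy example.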

### Example Query


```python
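from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Read the CSV, using its first row as the header and inferring column types
flights_df = spark.read.option("header", "true").option("inferSchema", "true").csv("flights_info.csv")
flights_data_partitioned_df = flights_df.repartition(4)
# Transformations only build the plan; nothing runs yet
counts_df = (
    flights_data_partitioned_df.where("duration > 120")
    .select("dep", "dest", "carrier", "duration")
    .groupBy("carrier")
    .count()
)
# collect() is the action that triggers the job
counts_df.collect()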
```


### PySpark: Job, Stages and Tasks

<img src="https://www.dropbox.com/s/5qa1fb7p867i787/Page5.jpg?dl=1" width="900" height="600">


* Stages are run in succession, potentially across separate nodes
* Tasks are run in parallel
* Computation is dispatched to the data.

    
    

<img src="https://www.dropbox.com/s/4n6cyf2n60zs46v/Page6.jpg?dl=1" width="900" height="600">


In [6]:
pip install randomuser

Collecting randomuser
  Downloading randomuser-1.6.tar.gz (5.0 kB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: randomuser
  Building wheel for randomuser (setup.py) ... done
  Created wheel for randomuser: filename=randomuser-1.6-py3-none-any.whl size=5067 sha256=7a0cf14e8630ad5f5a2fd977269a710c9b3d3be26a06c2cb0bb3d08a88aff43c
  Stored in directory: /home/jovyan/.cache/pip/wheels/b8/f3/19/6a938647065b4bb2471a9d063647d14d4fcc3236731f4e2b53
Successfully built randomuser
Installing collected packages: randomuser
Successfully installed randomuser-1.6
Note: you may need to restart the kernel to use updated packages.


In [8]:
# Install with `pip install randomuser` if not already installed

from randomuser import RandomUser

# Generate a single user
user = RandomUser({"nat": "us"})
print(f"user object  is {user}")


def get_user_info(u):
    """Extract a few fields of a RandomUser into a plain dictionary."""
    user_dict = {
        "user_id": u.get_id()["number"], 
        "first_name": u.get_first_name(), 
        "last_name": u.get_last_name(), 
        "state": u.get_state(),
        "zip": u.get_zipcode(),
        "lat_long": u.get_coordinates()
    }
    return user_dict

user_json = get_user_info(user)
print(f"user json representation  is\n {user_json}")



user object  is <randomuser.RandomUser object at 0x4066ef5870>
user json representation  is
 {'user_id': '960-45-6374', 'first_name': 'Julian', 'last_name': 'Bowman', 'state': 'Maryland', 'zip': 11844, 'lat_long': {'latitude': '84.1334', 'longitude': '-110.5892'}}


In [10]:
my_users = RandomUser.generate_users(5000, {"nat": "us"})
print(len(my_users))
my_users[0:3]

5000


[<randomuser.RandomUser at 0x407921f5e0>,
 <randomuser.RandomUser at 0x407921e260>,
 <randomuser.RandomUser at 0x407921e590>]

In [11]:
# Convert each RandomUser object into a dictionary

user_dicts = list(map(get_user_info, my_users))

user_dicts[0:3]

[{'user_id': '988-44-7974',
  'first_name': 'Julio',
  'last_name': 'White',
  'state': 'Michigan',
  'zip': 64953,
  'lat_long': {'latitude': '-85.8992', 'longitude': '-35.8376'}},
 {'user_id': '848-14-4659',
  'first_name': 'Isaac',
  'last_name': 'Phillips',
  'state': 'Indiana',
  'zip': 70288,
  'lat_long': {'latitude': '-34.5206', 'longitude': '1.8089'}},
 {'user_id': '419-81-7907',
  'first_name': 'Gary',
  'last_name': 'Wells',
  'state': 'Connecticut',
  'zip': 37970,
  'lat_long': {'latitude': '-49.4423', 'longitude': '11.3928'}}]

In [13]:
users_rdd = sc.parallelize(user_dicts)
users_rdd_size  = users_rdd.count()
print(f"The number of objects in my RDD is: {users_rdd_size}")
users_rdd.takeSample(False, 3)

The number of objects in my RDD is: 5000


[{'user_id': '675-04-0332',
  'first_name': 'Oscar',
  'last_name': 'Chambers',
  'state': 'Oregon',
  'zip': 17604,
  'lat_long': {'latitude': '60.8608', 'longitude': '-82.8117'}},
 {'user_id': '302-13-3578',
  'first_name': 'Julie',
  'last_name': 'Watson',
  'state': 'Georgia',
  'zip': 86710,
  'lat_long': {'latitude': '87.7597', 'longitude': '-27.8451'}},
 {'user_id': '707-49-4477',
  'first_name': 'Manuel',
  'last_name': 'Burns',
  'state': 'Florida',
  'zip': 83279,
  'lat_long': {'latitude': '64.0851', 'longitude': '99.6517'}}]

In [14]:
select_users_rdd = users_rdd.filter(lambda x: x['state'] in ["Nebraska", "Hawaii", "Idaho"])
select_users_rdd

PythonRDD[8] at RDD at PythonRDD.scala:53

In [14]:
# collect() gathers the results from all the worker nodes back to the driver
select_users_rdd.collect()[:10]

[{'user_id': '048-73-1753',
  'first_name': 'Albert',
  'last_name': 'Scott',
  'state': 'Nebraska',
  'zip': 94069,
  'lat_long': {'latitude': '73.8760', 'longitude': '-13.5365'}},
 {'user_id': '018-56-7439',
  'first_name': 'Veronica',
  'last_name': 'Woods',
  'state': 'Idaho',
  'zip': 85540,
  'lat_long': {'latitude': '35.9805', 'longitude': '-0.2080'}},
 {'user_id': '808-14-7364',
  'first_name': 'June',
  'last_name': 'Baker',
  'state': 'Nebraska',
  'zip': 89660,
  'lat_long': {'latitude': '-76.5984', 'longitude': '-51.9512'}},
 {'user_id': '753-04-9746',
  'first_name': 'Marvin',
  'last_name': 'Beck',
  'state': 'Idaho',
  'zip': 62505,
  'lat_long': {'latitude': '37.4552', 'longitude': '-169.8550'}},
 {'user_id': '553-71-2902',
  'first_name': 'Anita',
  'last_name': 'Hernandez',
  'state': 'Nebraska',
  'zip': 37841,
  'lat_long': {'latitude': '-0.7319', 'longitude': '-57.2282'}},
 {'user_id': '770-54-5308',
  'first_name': 'Adam',
  'last_name': 'Powell',
  'state': 'Idah

In [15]:
# Building an RDD from a text file.
text = sc.textFile('data/pride_and_prejudice.txt', minPartitions=4)
# Number of partitions in the RDD
text.getNumPartitions()

4

In [16]:
text_rdd_size = text.count()
print(f"number of objects in the RDD is {text_rdd_size}")

# Count the lines directly to check that the RDD holds one object per line
with open("data/pride_and_prejudice.txt") as f:
    nb_lines = len(f.readlines())
print(f"number of lines in the text file is {nb_lines}")


number of objects in the RDD is 14579
number of lines in the text file is 14579


In [17]:
subset_x = text.take(10)
print(f"len of subset_x is: {len(subset_x)}\n")
print(f"type of subset_x is: {type(subset_x)}\n")
print(f"subset_x is:\n{subset_x}")

len of subset_x is: 10

type of subset_x is: <class 'list'>

subset_x is:
['The Project Gutenberg eBook of Pride and Prejudice, by Jane Austen', '', 'This eBook is for the use of anyone anywhere in the United States and', 'most other parts of the world at no cost and with almost no restrictions', 'whatsoever. You may copy it, give it away or re-use it under the terms', 'of the Project Gutenberg License included with this eBook or online at', 'www.gutenberg.org. If you are not located in the United States, you', 'will have to check the laws of the country where you are located before', 'using this eBook.', '']


In [18]:
import re

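def clean_split_line(line):
    # Remove digits, replace runs of non-word characters with a space,
    # then upper-case the line and split it into words
    a = re.sub(r'\d+', '', line)
    b = re.sub(r'[\W]+', ' ', a)
    return b.upper().split()

# map keeps one list of words per input line
words = text.map(clean_split_line)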
words.take(60)

[['THE',
  'PROJECT',
  'GUTENBERG',
  'EBOOK',
  'OF',
  'PRIDE',
  'AND',
  'PREJUDICE',
  'BY',
  'JANE',
  'AUSTEN'],
 [],
 ['THIS',
  'EBOOK',
  'IS',
  'FOR',
  'THE',
  'USE',
  'OF',
  'ANYONE',
  'ANYWHERE',
  'IN',
  'THE',
  'UNITED',
  'STATES',
  'AND'],
 ['MOST',
  'OTHER',
  'PARTS',
  'OF',
  'THE',
  'WORLD',
  'AT',
  'NO',
  'COST',
  'AND',
  'WITH',
  'ALMOST',
  'NO',
  'RESTRICTIONS'],
 ['WHATSOEVER',
  'YOU',
  'MAY',
  'COPY',
  'IT',
  'GIVE',
  'IT',
  'AWAY',
  'OR',
  'RE',
  'USE',
  'IT',
  'UNDER',
  'THE',
  'TERMS'],
 ['OF',
  'THE',
  'PROJECT',
  'GUTENBERG',
  'LICENSE',
  'INCLUDED',
  'WITH',
  'THIS',
  'EBOOK',
  'OR',
  'ONLINE',
  'AT'],
 ['WWW',
  'GUTENBERG',
  'ORG',
  'IF',
  'YOU',
  'ARE',
  'NOT',
  'LOCATED',
  'IN',
  'THE',
  'UNITED',
  'STATES',
  'YOU'],
 ['WILL',
  'HAVE',
  'TO',
  'CHECK',
  'THE',
  'LAWS',
  'OF',
  'THE',
  'COUNTRY',
  'WHERE',
  'YOU',
  'ARE',
  'LOCATED',
  'BEFORE'],
 ['USING', 'THIS', 'EBOOK'],
 [],
 [

In [20]:
import re

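def clean_split_line(line):
    # Remove digits, replace runs of non-word characters with a space,
    # then upper-case the line and split it into words
    a = re.sub(r'\d+', '', line)
    b = re.sub(r'[\W]+', ' ', a)
    return b.upper().split()

# flatMap flattens the per-line word lists into a single RDD of words
words = text.flatMap(clean_split_line)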
words.take(60)

['THE',
 'PROJECT',
 'GUTENBERG',
 'EBOOK',
 'OF',
 'PRIDE',
 'AND',
 'PREJUDICE',
 'BY',
 'JANE',
 'AUSTEN',
 'THIS',
 'EBOOK',
 'IS',
 'FOR',
 'THE',
 'USE',
 'OF',
 'ANYONE',
 'ANYWHERE',
 'IN',
 'THE',
 'UNITED',
 'STATES',
 'AND',
 'MOST',
 'OTHER',
 'PARTS',
 'OF',
 'THE',
 'WORLD',
 'AT',
 'NO',
 'COST',
 'AND',
 'WITH',
 'ALMOST',
 'NO',
 'RESTRICTIONS',
 'WHATSOEVER',
 'YOU',
 'MAY',
 'COPY',
 'IT',
 'GIVE',
 'IT',
 'AWAY',
 'OR',
 'RE',
 'USE',
 'IT',
 'UNDER',
 'THE',
 'TERMS',
 'OF',
 'THE',
 'PROJECT',
 'GUTENBERG',
 'LICENSE',
 'INCLUDED']

In [21]:
words.count()

126018

In [20]:
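# Pair each word with a count of 1.
# Note: the output shown below appears to come from a run in which `words` was built
# with map() rather than flatMap(), so each key is a whole line's list of words.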

words_mapped = words.map(lambda x: (x,1))
words_mapped.take(10)

[(['THE',
   'PROJECT',
   'GUTENBERG',
   'EBOOK',
   'OF',
   'PRIDE',
   'AND',
   'PREJUDICE',
   'BY',
   'JANE',
   'AUSTEN'],
  1),
 ([], 1),
 (['THIS',
   'EBOOK',
   'IS',
   'FOR',
   'THE',
   'USE',
   'OF',
   'ANYONE',
   'ANYWHERE',
   'IN',
   'THE',
   'UNITED',
   'STATES',
   'AND'],
  1),
 (['MOST',
   'OTHER',
   'PARTS',
   'OF',
   'THE',
   'WORLD',
   'AT',
   'NO',
   'COST',
   'AND',
   'WITH',
   'ALMOST',
   'NO',
   'RESTRICTIONS'],
  1),
 (['WHATSOEVER',
   'YOU',
   'MAY',
   'COPY',
   'IT',
   'GIVE',
   'IT',
   'AWAY',
   'OR',
   'RE',
   'USE',
   'IT',
   'UNDER',
   'THE',
   'TERMS'],
  1),
 (['OF',
   'THE',
   'PROJECT',
   'GUTENBERG',
   'LICENSE',
   'INCLUDED',
   'WITH',
   'THIS',
   'EBOOK',
   'OR',
   'ONLINE',
   'AT'],
  1),
 (['WWW',
   'GUTENBERG',
   'ORG',
   'IF',
   'YOU',
   'ARE',
   'NOT',
   'LOCATED',
   'IN',
   'THE',
   'UNITED',
   'STATES',
   'YOU'],
  1),
 (['WILL',
   'HAVE',
   'TO',
   'CHECK',
   'THE',
   'LA

In [21]:
sorted_map = words_mapped.sortByKey()
sorted_map

PythonRDD[21] at RDD at PythonRDD.scala:53

In [31]:
sample = sorted_map.sample(withReplacement=False, fraction= 0.001)
sample.collect()

[('A', 1),
 ('A', 1),
 ('A', 1),
 ('ALL', 1),
 ('AND', 1),
 ('AND', 1),
 ('AND', 1),
 ('ASSEMBLED', 1),
 ('AT', 1),
 ('BE', 1),
 ('BE', 1),
 ('BEFORE', 1),
 ('BEFORE', 1),
 ('BENNET', 1),
 ('BUT', 1),
 ('BUT', 1),
 ('BY', 1),
 ('CANDOUR', 1),
 ('COACH', 1),
 ('COLONEL', 1),
 ('COLONEL', 1),
 ('CONFIRMATION', 1),
 ('CONTRIVED', 1),
 ('CRIED', 1),
 ('DARCY', 1),
 ('DAY', 1),
 ('DESIRED', 1),
 ('DIFFIDENCE', 1),
 ('DON', 1),
 ('EBOOK', 1),
 ('EDWARD', 1),
 ('ELIZABETH', 1),
 ('EMBARRASSMENT', 1),
 ('EVERYTHING', 1),
 ('EXAGGERATION', 1),
 ('HAD', 1),
 ('HAD', 1),
 ('HAD', 1),
 ('HE', 1),
 ('HER', 1),
 ('HER', 1),
 ('HERTFORDSHIRE', 1),
 ('HIM', 1),
 ('HIS', 1),
 ('HIS', 1),
 ('HOPE', 1),
 ('IN', 1),
 ('IN', 1),
 ('INDEED', 1),
 ('INDUCEMENT', 1),
 ('IS', 1),
 ('IT', 1),
 ('IT', 1),
 ('ITS', 1),
 ('KNOW', 1),
 ('LAUGHED', 1),
 ('LINES', 1),
 ('LOOSE', 1),
 ('MANNERS', 1),
 ('ME', 1),
 ('MORE', 1),
 ('MUST', 1),
 ('MYSELF', 1),
 ('NO', 1),
 ('NOT', 1),
 ('OCCASION', 1),
 ('OF', 1),
 ('OF', 

In [33]:
counts = words_mapped.reduceByKey(lambda x,y: x+y)
counts.collect()[:50]

[('PRIDE', 52),
 ('UNITED', 22),
 ('OTHER', 227),
 ('WORLD', 68),
 ('NO', 501),
 ('GIVE', 127),
 ('LICENSE', 18),
 ('WWW', 9),
 ('ARE', 361),
 ('TO', 4245),
 ('DATE', 5),
 ('UPDATED', 2),
 ('ENGLISH', 1),
 ('CHARACTER', 65),
 ('PRODUCED', 13),
 ('ILLUSTRATED', 1),
 ('THAT', 1555),
 ('POSSESSION', 10),
 ('LITTLE', 187),
 ('KNOWN', 58),
 ('VIEWS', 11),
 ('CONSIDERED', 23),
 ('AS', 1193),
 ('ONE', 273),
 ('THEIR', 439),
 ('MR', 784),
 ('JUST', 72),
 ('TOLD', 69),
 ('ME', 427),
 ('ANSWER', 65),
 ('WHO', 288),
 ('TELL', 71),
 ('HEARING', 24),
 ('ENOUGH', 106),
 ('WHY', 53),
 ('YOUNG', 130),
 ('MONDAY', 8),
 ('FOUR', 35),
 ('MUCH', 327),
 ('AGREED', 13),
 ('MICHAELMAS', 2),
 ('SERVANTS', 13),
 ('WEEK', 29),
 ('NAME', 34),
 ('BINGLEY', 307),
 ('OH', 96),
 ('FIVE', 32),
 ('YEAR', 29),
 ('FINE', 31),
 ('CAN', 223)]
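
What `reduceByKey` does with the `(word, 1)` pairs can be sketched in plain Python. This is a simplified model, not Spark's implementation: the helper `reduce_by_key` and the two hand-written partitions are hypothetical. Values are first combined per key inside each partition, and the partial results are then merged across partitions.

```python
def reduce_by_key(func, partitions):
    """Toy model of reduceByKey over a list of partitions of (key, value) pairs."""
    # Step 1: combine the values of each key locally, inside every partition
    partials = []
    for partition in partitions:
        local = {}
        for key, value in partition:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)

    # Step 2: merge the per-partition results key by key
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


partitions = [
    [("THE", 1), ("OF", 1), ("THE", 1)],
    [("OF", 1), ("AND", 1), ("THE", 1)],
]
counts = reduce_by_key(lambda x, y: x + y, partitions)
print(counts)
```

Combining locally first means only one pair per key and partition has to cross the network, which is why the reducing function should be associative and commutative.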

In [35]:
%%time
# Because each transformation returns a new RDD instead of modifying data in place,
# the steps above can be chained into a single expression:
counts_test_2 = text.flatMap(clean_split_line).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)
counts_test_2.take(100)


CPU times: user 40.9 ms, sys: 7.41 ms, total: 48.4 ms
Wall time: 2.9 s


[('PRIDE', 52),
 ('UNITED', 22),
 ('OTHER', 227),
 ('WORLD', 68),
 ('NO', 501),
 ('GIVE', 127),
 ('LICENSE', 18),
 ('WWW', 9),
 ('ARE', 361),
 ('TO', 4245),
 ('DATE', 5),
 ('UPDATED', 2),
 ('ENGLISH', 1),
 ('CHARACTER', 65),
 ('PRODUCED', 13),
 ('ILLUSTRATED', 1),
 ('THAT', 1555),
 ('POSSESSION', 10),
 ('LITTLE', 187),
 ('KNOWN', 58),
 ('VIEWS', 11),
 ('CONSIDERED', 23),
 ('AS', 1193),
 ('ONE', 273),
 ('THEIR', 439),
 ('MR', 784),
 ('JUST', 72),
 ('TOLD', 69),
 ('ME', 427),
 ('ANSWER', 65),
 ('WHO', 288),
 ('TELL', 71),
 ('HEARING', 24),
 ('ENOUGH', 106),
 ('WHY', 53),
 ('YOUNG', 130),
 ('MONDAY', 8),
 ('FOUR', 35),
 ('MUCH', 327),
 ('AGREED', 13),
 ('MICHAELMAS', 2),
 ('SERVANTS', 13),
 ('WEEK', 29),
 ('NAME', 34),
 ('BINGLEY', 307),
 ('OH', 96),
 ('FIVE', 32),
 ('YEAR', 29),
 ('FINE', 31),
 ('CAN', 223),
 ('NONSENSE', 8),
 ('THEREFORE', 75),
 ('VISIT', 53),
 ('PERHAPS', 76),
 ('PARTY', 58),
 ('THAN', 285),
 ('CONSIDER', 33),
 ('YOUR', 446),
 ('ESTABLISHMENT', 6),
 ('WILLIAM', 46),
 (

In [75]:
text.take(1)

['The Project Gutenberg eBook of Pride and Prejudice, by Jane Austen']

In [None]:
# The following won't raise an error until an action is performed

data_s1 = text.map(lambda x: len(x)/0)
data_s2 = data_s1.filter(lambda x: x>0)



In [None]:
# The following raises an error because the action forces the transformation
# that divides by 0 to be executed.
# The `ZeroDivisionError: division by zero` is buried in many Scala error messages.

data_s2.collect()

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 12.0 failed 1 times, most recent failure: Lost task 1.0 in stage 12.0 (TID 46) (bd518172607a executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_113/403181734.py", line 3, in <lambda>
ZeroDivisionError: division by zero

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2293)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 686, in main
    process()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/worker.py", line 678, in process
    serializer.dump_stream(out_iter, outfile)
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 273, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/util.py", line 81, in wrapper
    return f(*args, **kwargs)
  File "/tmp/ipykernel_113/403181734.py", line 3, in <lambda>
ZeroDivisionError: division by zero

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more


### The Spark Computation DAG

* Lazy evaluation is possible because Spark maintains a graph (DAG) of the transformations
* The transformations in the graph are optimized and executed once an action is triggered

* A simple example of an execution is:

```python
data_2 =  data_1.map(lambda x: x+2)
# do some work here
data_3 =  data_2.map(lambda x: x-2)
```

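* In principle, the above transformations need not be run because together they do not change the value of `x`.
  * `data_3` is equal to `data_1`
  * An optimizer can only apply this kind of simplification to expressions it can inspect, such as DataFrame and SQL expressions; Python lambdas passed to RDD methods are opaque to Spark and are still executed

* See the following blog post about the Catalyst optimizer.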

https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html

* More on this when we cover Adaptive Query Execution 
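
Lazy evaluation as described in this section can be illustrated with a toy class that only records transformations and runs them when an action is called. This is a minimal sketch, not Spark's implementation: `LazyDataset` is a hypothetical name, and the recorded list of steps is a linear stand-in for the DAG.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them on collect()."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)

    def map(self, func):
        # Transformation: return a new dataset with one more recorded step
        return LazyDataset(self._source, self._steps + (("map", func),))

    def filter(self, predicate):
        # Transformation: nothing is computed here either
        return LazyDataset(self._source, self._steps + (("filter", predicate),))

    def collect(self):
        # Action: replay the recorded steps over the source data
        items = iter(self._source)
        for kind, func in self._steps:
            items = map(func, items) if kind == "map" else filter(func, items)
        return list(items)


data = LazyDataset([1, 2, 3])

bad = data.map(lambda x: x / 0)  # no error yet: the step is only recorded
failed = False
try:
    bad.collect()                # the division runs only now
except ZeroDivisionError:
    failed = True

ok = data.map(lambda x: x + 2).map(lambda x: x - 2).collect()
print(failed, ok)
```

Because the full list of steps is known before anything runs, a real engine has the opportunity to reorder or simplify them before execution; this toy version simply replays them in order.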
