# Spark Overview

## Table of Contents
<ol style = "type:1">
    <li><a href = "#intro">Introduction</a></li>
    <li><a href = "#rddtransformaction">RDD Transformations and Actions</a></li>
    <li><a href = "#mapvsflatmap"> <code>map</code> vs <code>flatMap</code></a></li>
    <li><a href = "#keyandvaluepairs">Key and Value Pairs</a></li>
    <li><a href = "#reducebykey">Reduce-by-Key Statement</a></li>
    <li><a href = "#ref">References</a></li>
</ol>

## <a name = "intro">Introduction</a>

A Spark context represents the connection to a Spark cluster. It can be used to create **resilient distributed datasets (RDDs)** and broadcast variables on that cluster. (With the setup used here, only one Spark context can be active at a time.)

In [1]:
import findspark

In [2]:
findspark.init("/home/virchan/spark-3.3.1-bin-hadoop3")

In [3]:
from pyspark import SparkContext

In [4]:
# Output hidden
sc = SparkContext()

23/01/17 11:58:05 WARN Utils: Your hostname, UbuntuTest resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/01/17 11:58:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/17 11:58:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


We use the Jupyter `%%writefile` cell magic to quickly write a text file.

In [5]:
%%writefile example.txt
first line
second line
third line
fourth line

Writing example.txt


We now create an RDD.

In [6]:
textFile = sc.textFile("example.txt")

Here, `textFile` is the RDD, and `sc` is the Spark context that connects to a Spark cluster.

We can perform actions or transformations on RDDs. Loosely speaking, RDDs have <span style="color:blue"> actions </span> which <span style="color:blue"> return values </span> and <span style="color:orange"> transformations </span> which <span style="color:orange"> return pointers to new RDDs </span>.

<p>More precisely, an <strong><i><span style="color:blue"> action</span></i></strong> (resp. a <strong><i> <span style="color:orange">transformation</span></i></strong>) is a Spark operation that produces a <span style="color:blue"> local object</span> (resp. an <span style="color:orange"> RDD</span>).</p>

Let's start with the most basic action: `.count()`.

In [7]:
textFile.count()

                                                                                

4

The `.count()` action counts the number of elements in the RDD. In our case, an element is a line in `example.txt`. Next is the `.first()` action, which returns the first element:

In [8]:
textFile.first()

'first line'

For more complicated tasks, we can perform transformations. For example, we can use the `.filter()` transformation to return a new RDD of a subset of items in the file. Let's go and try looking for lines that contain the word "second".

In [9]:
secfind = textFile.filter(lambda line: "second" in line)

RDDs are lazily evaluated: a transformation only records what should be done, and nothing is executed until you perform an action. This is why the cell above ran so quickly.

Let's check:

In [10]:
secfind

PythonRDD[4] at RDD at PythonRDD.scala:53

What does this mean? The output only tells you that `secfind` is an RDD. It holds a recipe of instructions to follow, but Spark does not execute them until you call an action.
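As a rough analogy only (this is plain Python, not Spark), Python's built-in `filter` behaves in a similar way: creating it records the recipe, and the work happens when something consumes the result. The names `calls` and `contains_second` are made up for this sketch.

```python
# Plain-Python analogy for lazy evaluation; no Spark involved.
lines = ["first line", "second line", "third line", "fourth line"]
calls = []  # records every line the predicate actually inspects

def contains_second(line):
    calls.append(line)
    return "second" in line

# "Transformation": builds a lazy iterator, nothing is evaluated yet.
lazy_result = filter(contains_second, lines)
calls_before = len(calls)  # the predicate has not run

# "Action": consuming the iterator finally runs the predicate.
result = list(lazy_result)
calls_after = len(calls)   # every line has now been inspected

print(calls_before, calls_after, result)
```

The predicate runs zero times before the list is built and once per line afterwards, which mirrors how `.filter()` returns immediately while `.collect()` does the work.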

Now, let us perform the action `.collect()`.

In [11]:
secfind.collect()

['second line']

We can also perform another action:

In [12]:
secfind.count()

1

## <a name = "rddtransformaction">RDD Transformations and Actions</a>

Let's create a text file:

In [13]:
%%writefile example2.txt
first
second line
the third line
then a fourth line

Writing example2.txt


and perform some transformations and actions on it to get started.

In [14]:
sc.textFile("example2.txt")

example2.txt MapPartitionsRDD[7] at textFile at NativeMethodAccessorImpl.java:0

In [15]:
text_rdd = sc.textFile("example2.txt")

We perform the transformation that <u>splits every line into a list of words</u>.

In [16]:
words = text_rdd.map(lambda line: line.split())

In [17]:
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

Now, compare this with

In [18]:
text_rdd.collect()

['first', 'second line', 'the third line', 'then a fourth line']

Calling `.collect()` on the original RDD returns each line as a single string, whereas the mapped RDD holds one list of words per line.

## <a name = "mapvsflatmap"><code>map</code> vs <code>flatMap</code></a>

Repeat the previous steps with `flatMap`.

In [19]:
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

We now get a single flat list of all the words in the text file, rather than one list per line.
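The difference can be reproduced in plain Python, without Spark, as a small sketch: `map` corresponds to applying the function to each element, while `flatMap` applies it and then flattens the results by one level. The variable names here are illustrative.

```python
from itertools import chain

lines = ["first", "second line", "the third line", "then a fourth line"]

# Like map: one output element per input element, so we get a list of lists.
mapped = [line.split() for line in lines]

# Like flatMap: apply the same function, then flatten one level.
flat_mapped = list(chain.from_iterable(line.split() for line in lines))

print(len(mapped))       # one list per line
print(len(flat_mapped))  # one entry per word
```

The first result has as many elements as there are lines; the second has as many elements as there are words.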

## <a name = "keyandvaluepairs">Key and Value Pairs</a>

We use the code from the lecture notes to generate some fake data.

In [20]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing services.txt


In [21]:
services = sc.textFile("services.txt")

Look at the first two lines of the data:

In [22]:
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [23]:
# services.map(lambda line: line.split())

# # PythonRDD[17] at RDD at PythonRDD.scala:53

services.map(lambda line: line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [24]:
# Remove the leading hash sign from the header line
services.map(lambda line: line[1:] if line[0] == "#" else line).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']

Now, try

In [25]:
clean = services.map(lambda line: line[1:] if line[0] == "#" else line)

clean = clean.map(lambda line: line.split())

clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

Each line of `services` is now mapped to a list of fields, and the header no longer has the leading hash sign.
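The cleaning lambda indexes `line[0]`, which raises `IndexError` on an empty string, so a text file containing a blank line would likely break it. A slightly more defensive variant, sketched here as a plain function (the name `strip_hash` is hypothetical), uses `str.startswith` instead and could be passed to `.map()` in place of the lambda.

```python
def strip_hash(line):
    """Drop one leading '#' if present; safe on empty strings."""
    return line[1:] if line.startswith("#") else line

header = "#EventId    Timestamp    Customer   State    ServiceID    Amount"
print(strip_hash(header).split())
print(repr(strip_hash("")))  # an empty line passes through unchanged
```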

## <a name = "reducebykey">Reduce-by-Key Statement</a>

Next, we practice **field grabbing**. How can we get the total sales per state? We need to grab the "State" field and the "Amount" field.

In [26]:
pairs = clean.map(lambda lst: (lst[3], lst[-1]))

The `reduceByKey` method takes a function of two arguments (here a lambda expression) and assumes the data are already in (key, value) tuple form. More precisely, it treats the first element of each tuple as the key and uses the function to merge the values that share the same key. Compare SQL's `GROUP BY` with an aggregate.

In [27]:
rekey = pairs.reduceByKey(lambda amt1, amt2: amt1 + amt2)

In [28]:
rekey.collect()

                                                                                

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

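That looks wrong: the amounts are still strings, so `+` concatenated them instead of adding them. We need to convert them to `float` before adding.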
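A rough plain-Python model of the per-key merge shows what happened to the amounts. This is only a sketch of the semantics, not Spark's implementation (which also merges across partitions), and the helper name `reduce_by_key` is made up for the example.

```python
def reduce_by_key(pairs, func):
    """Merge values that share a key with func, in encounter order."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())

pairs = [("NY", "100.00"), ("TX", "450.00"), ("CA", "200.00"),
         ("CA", "500.00"), ("NY", "750.00"), ("TX", "200.00")]

# '+' on strings concatenates, which reproduces the odd amounts seen above.
as_strings = reduce_by_key(pairs, lambda a, b: a + b)

# Converting to float first gives numeric totals.
as_floats = reduce_by_key([(k, float(v)) for k, v in pairs], lambda a, b: a + b)

print(as_strings)
print(as_floats)
```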

In [29]:
rekey = pairs.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))

Finally, we get the total sales by state. Let's continue this analysis by sorting the output.

In [30]:
# Grab (State, Amount)
step1 = clean.map(lambda lst: (lst[3], lst[-1]))

# Reduce by Key
step2 = step1.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))

# Get rid of State, Amount titles
step3 = step2.filter(lambda x: not x[0] == "State")

# Sort Results by Amount
step4 = step3.sortBy(lambda stAmount: stAmount[1], ascending = False)

# Perform action
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
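One caveat with converting inside the reducer: the reducing function is only called when a key has at least two values. That is why the `('State', 'Amount')` header pair survived the reduce step without `float('Amount')` failing, and it also means a state with a single record would keep its amount as a string. Python's `functools.reduce` behaves the same way on a one-element list, which makes the effect easy to see without Spark. The sample amounts and the name `add_as_float` are illustrative assumptions.

```python
from functools import reduce

def add_as_float(a, b):
    return float(a) + float(b)

two_values = reduce(add_as_float, ["100.00", "750.00"])  # function runs once
one_value = reduce(add_as_float, ["300.00"])             # function never runs

print(type(two_values).__name__)
print(type(one_value).__name__)

# Converting up front keeps every total numeric, whatever the group size.
converted = reduce(lambda a, b: a + b, [float("300.00")])
print(type(converted).__name__)
```

A safer pattern, under these assumptions, is to filter out the header first and convert each amount to `float` in the `map` step, so the reducer only ever adds numbers.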

Tuple unpacking is good for readability.

In [31]:
x = ["ID", "State", "Amount"]

In [32]:
def func1(lst):
    return lst[-1]

The problem with `func1` is that it is hard to read when you come back to it later. After a couple of days, you would have to remember what index 3 or the last index refers to. Tuple unpacking makes the meaning of each position explicit.

In [33]:
def func2(id_st_amt):
    #unpack values
    (Id, st, amt) = id_st_amt
    return amt

Clearly, `func2` is more readable than `func1`.
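The same idea can be applied to the rows of the services data. The sketch below unpacks all six fields by name instead of using `lst[3]` and `lst[-1]`; the function name `state_and_amount` is hypothetical, and the field names are taken from the header line of `services.txt`.

```python
def state_and_amount(row):
    # Unpack all six fields by name instead of indexing with row[3], row[-1].
    event_id, timestamp, customer, state, service_id, amount = row
    return (state, float(amount))

row = ["201", "10/13/2017", "100", "NY", "131", "100.00"]
print(state_and_amount(row))
```

Because this version converts the amount to `float`, the header row would have to be filtered out before applying it.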

## <a name = "ref">References</a>

<ol style = "type:1">
    <li>Jose Portilla. Python for Data Science and Machine Learning Bootcamp.</li>
    <li>Apache Spark. <a href = "https://spark.apache.org/docs/latest/api/python/">https://spark.apache.org/docs/latest/api/python/</a>.</li>
</ol>