## Intro to PySpark

In [1]:
from pyspark import SparkContext

In [2]:
whos

Variable       Type    Data/Info
--------------------------------
SparkContext   type    <class 'pyspark.context.SparkContext'>


In [3]:
# Create the spark context
sc = SparkContext()

In [4]:
sc

A `SparkContext` represents the connection to a Spark cluster. It can be used to *create RDDs* and to *broadcast variables* on that cluster.

With this configuration, only one SparkContext can be active at a time.

**Basic Operations**

First, write a text file.

In [5]:
whos

Variable       Type            Data/Info
----------------------------------------
SparkContext   type            <class 'pyspark.context.SparkContext'>
sc             SparkContext    <SparkContext master=loca<...>*] appName=pyspark-shell>


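#### Writing out a file with the `%%writefile` cell magic
 This works much like `cat > filename` in a shell:
 cat > filename
 stuff into file
 ctrl-D
 
 The magic command must be the first line of the cell, so a comment cannot come before it. With `%%writefile`, everything after that first line is written to the file, e.g.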

In [6]:
%%writefile example.text
first line
second line
third line
fourth line

Overwriting example.text


In [7]:
%less example.text

We can read this text file into an RDD with the `textFile` method of the SparkContext we created. It can read from HDFS, a local filesystem, or any Hadoop-supported filesystem, and it returns an *RDD of strings*, one string per line.



In [8]:
# creating an RDD using the textFile method
textFile = sc.textFile('example.text')

In [9]:
type(textFile)

pyspark.rdd.RDD

We can now perform operations (actions and transformations) on the RDD.

**Actions** *return values to the driver program.*  
**Transformations** *return pointers to new RDDs.*

#### These are actions:

In [13]:
# Count the lines
textFile.count()

4

In [14]:
# Get the first element
textFile.first()

'first line'

#### These are transformations

They return quickly because RDDs are lazily evaluated: none of the work described by a transformation is carried out until you call an *action*. *Transformations* can be thought of as a 'recipe'.
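To get a feel for lazy evaluation without a cluster, here is a plain-Python analogy (not Spark code). A generator expression plays the role of the transformation, and `list()` plays the role of the action. The names `contains_second`, `calls` and `recipe` are made up for this sketch.

```python
# Plain-Python analogy: generators are lazy, much like RDD transformations.
calls = []  # records every line the predicate actually inspects

def contains_second(line):
    calls.append(line)
    return 'second' in line

lines = ['first line', 'second line', 'third line', 'fourth line']

# "Transformation": builds the recipe, the predicate has not run yet
recipe = (line for line in lines if contains_second(line))
calls_before = len(calls)

# "Action": consuming the generator finally runs the predicate on every line
result = list(recipe)
calls_after = len(calls)

print(calls_before, calls_after, result)
```

The predicate is never called while the recipe is being defined; it only runs once the result is requested.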


In [18]:
# Only return elements that satisfy the conditions
# Look for lines that contain the word 'second'
secfind = textFile.filter(lambda line: 'second' in line)

In [19]:
type(secfind)

pyspark.rdd.PipelinedRDD

In [20]:
secfind

PythonRDD[7] at RDD at PythonRDD.scala:53

In [21]:
# collect() is an action, so this actually runs the filter and returns the results
secfind.collect()

['second line']

In [22]:
secfind.count()

1

## RDD Transformations And Actions

### Terms and definitions:  
**RDD** - Resilient Distributed Dataset  
**Transformation** - Spark operation that produces an RDD  
**Action** - Spark Operation that produces a local object  
**Spark Job** - Sequence of transformations on data with a final action

#### Creating an RDD  
- **sc.parallelize(array)** - create an RDD from the elements of an array (or list)
- **sc.textFile(path/to/file)** - create RDD of lines from a file


#### RDD Transformations
**We can use transformations to create a set of instructions we want to perform on the RDD (before we call an action and actually execute them).**  
- **filter(lambda x: x % 2 == 0)** - discard non-even elements
- **map(lambda x: x * 2)** - multiply each RDD element by 2
- **map(lambda x: x.split())** - split each string into words
- **flatMap(lambda x: x.split())** - split each string into words and flatten the sequence
- **sample(withReplacement=True, fraction=0.25)** - create a sample of roughly 25% of the elements, with replacement
- **union(RDD)** - append an RDD to an existing RDD
- **distinct()** - remove duplicates from the RDD
- **sortBy(lambda x: x, ascending=False)** - sort elements in descending order
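As a rough mental model, each transformation in the list above has a plain-Python counterpart on an ordinary list. The sketch below is not Spark code and runs eagerly; the sample data (`nums`, `lines`) is invented for illustration.

```python
from itertools import chain

nums = [3, 1, 2, 2, 4]
lines = ['second line', 'the third line']

evens = [x for x in nums if x % 2 == 0]    # like filter(lambda x: x % 2 == 0)
doubled = [x * 2 for x in nums]            # like map(lambda x: x * 2)
nested = [s.split() for s in lines]        # like map(lambda x: x.split())
flat = list(chain.from_iterable(nested))   # like flatMap(lambda x: x.split())
unique = sorted(set(nums))                 # like distinct(); sorted only for a stable display
descending = sorted(nums, reverse=True)    # like sortBy(lambda x: x, ascending=False)
combined = nums + [5, 6]                   # like union(other); duplicates are kept

print(nested)
print(flat)
```

Comparing `nested` with `flat` shows the difference between `map` and `flatMap`: the first keeps one list per input line, the second merges all words into a single sequence.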

#### RDD Actions
**Once you have your 'recipe' of transformations ready, what you will do next is execute them by calling an action. Here are some common actions:**  
- **collect()** - convert RDD into in-memory list
- **take(3)** - take the first 3 elements of an RDD
- **top(3)** - the 3 largest elements of an RDD, in descending order
- **takeSample(withReplacement=True, num=3)** - create a sample of 3 elements with replacement
- **sum()** - find the sum of the elements (assumes numeric elements)
- **mean()** - find the mean of the elements (assumes numeric elements)
- **stdev()** - find the standard deviation of the elements (assumes numeric elements)
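The actions above can also be mirrored in plain Python to check what values to expect. This is a sketch on an invented list, not Spark code. One detail worth knowing: on an RDD, `stdev()` is the population standard deviation, while `sampleStdev()` is the sample version, which corresponds to `statistics.pstdev` and `statistics.stdev` here.

```python
import heapq
import random
import statistics

nums = [4, 8, 15, 16, 23, 42]

first_three = nums[:3]                  # like take(3)
top_three = heapq.nlargest(3, nums)     # like top(3): largest first
rng = random.Random(0)                  # fixed seed so the sketch is repeatable
sample = rng.choices(nums, k=3)         # like takeSample(True, 3): with replacement
total = sum(nums)                       # like sum()
mean = statistics.mean(nums)            # like mean()
pop_std = statistics.pstdev(nums)       # like stdev(): divides by n
sample_std = statistics.stdev(nums)     # like sampleStdev(): divides by n - 1

print(first_three, top_three, total, mean)
```

Because the sample version divides by n - 1 instead of n, `sample_std` is always a little larger than `pop_std` for the same data.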

## Examples

In [24]:
%%writefile example2.txt
first
second line
the third line
then a fourth line


Overwriting example2.txt


In [26]:
whos

Variable       Type            Data/Info
----------------------------------------
SparkContext   type            <class 'pyspark.context.SparkContext'>
sc             SparkContext    <SparkContext master=loca<...>*] appName=pyspark-shell>
secfind        PipelinedRDD    PythonRDD[7] at RDD at PythonRDD.scala:53
textFile       RDD             example.text MapPartition<...>MethodAccessorImpl.java:0


In [29]:
# Read the text file into an RDD object
sc.textFile('example2.txt')

example2.txt MapPartitionsRDD[14] at textFile at NativeMethodAccessorImpl.java:0

In [30]:
# create a reference to the rdd
text_rdd = sc.textFile('example2.txt')

In [31]:
# now map a function or lambda expression over each line and collect the results
words = text_rdd.map(lambda line: line.split())

In [32]:
words.collect()

[['first'],
 ['second', 'line'],
 ['the', 'third', 'line'],
 ['then', 'a', 'fourth', 'line']]

In [33]:
# compare with the original
text_rdd.collect()

['first', 'second line', 'the third line', 'then a fourth line']

In [35]:
# Use the flatMap method to get one flat list of words
text_rdd.flatMap(lambda line: line.split()).collect()

['first',
 'second',
 'line',
 'the',
 'third',
 'line',
 'then',
 'a',
 'fourth',
 'line']

Let's look at key-value pairs as an RDD. First we need to make a new text file.

In [36]:
%%writefile services.txt
#EventId    Timestamp    Customer   State    ServiceID    Amount
201       10/13/2017      100       NY       131          100.00
204       10/18/2017      700       TX       129          450.00
202       10/15/2017      203       CA       121          200.00
206       10/19/2017      202       CA       131          500.00
203       10/17/2017      101       NY       173          750.00
205       10/19/2017      202       TX       121          200.00

Writing services.txt


In [37]:
services = sc.textFile('services.txt')

In [38]:
# Cleaning the textfile and manipulating the data into the correct format
# Notice how this is just 1 string per line
services.take(2)

['#EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00']

In [40]:
# Notice the output is now a list of lists
services.map(lambda line: line.split()).take(3)

[['#EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00']]

In [46]:
# Strip the leading '#' from the header line (the one starting with #EventId)
services.map(lambda line: line[1:] if line[0] == '#' else line).collect()

['EventId    Timestamp    Customer   State    ServiceID    Amount',
 '201       10/13/2017      100       NY       131          100.00',
 '204       10/18/2017      700       TX       129          450.00',
 '202       10/15/2017      203       CA       121          200.00',
 '206       10/19/2017      202       CA       131          500.00',
 '203       10/17/2017      101       NY       173          750.00',
 '205       10/19/2017      202       TX       121          200.00']
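One caveat with `line[0] == '#'`: indexing an empty string raises `IndexError`, so a blank line in the input would break the job. A minimal sketch of a more defensive version, using `str.startswith`, is shown below in plain Python. The helper name `strip_comment_marker` and the shortened sample lines are made up for this example.

```python
def strip_comment_marker(line):
    """Drop one leading '#' if present; safe for empty strings."""
    return line[1:] if line.startswith('#') else line

# Shortened, invented sample lines, including an empty one
sample_lines = ['#EventId    Timestamp', '201       10/13/2017', '']
cleaned = [strip_comment_marker(line) for line in sample_lines]

print(cleaned)
```

The same function could be passed to `services.map(...)` in place of the lambda.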

In [47]:
clean = services.map(lambda line: line[1:] if line[0] == '#' else line)

In [48]:
clean = clean.map(lambda line: line.split())

In [50]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

### Now that the data is cleaned up, review lambda expressions with the ByKey methods.

These methods assume that the data is in the form of (key, value) pairs.


In [51]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

Let's get total sales by state, using a *reduceByKey* call.

A note: in `lambda lst:` below, each element passed to the lambda is a list of fields, not a string.

Return a tuple with the State column as the key and the Amount column as the value:

In [52]:
clean.map(lambda lst: (lst[3],lst[-1])).collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [53]:
pairs = clean.map(lambda lst: (lst[3],lst[-1]))

In [55]:
pairs.collect()

[('State', 'Amount'),
 ('NY', '100.00'),
 ('TX', '450.00'),
 ('CA', '200.00'),
 ('CA', '500.00'),
 ('NY', '750.00'),
 ('TX', '200.00')]

In [57]:
# reduceByKey takes a function (here a lambda) and assumes the data is already in (key, value) tuples
# It treats the first item in each tuple as the key.
# It then uses the function to combine all the values that share a key, two at a time
rekey = pairs.reduceByKey(lambda amt1, amt2: amt1 + amt2)

In [58]:
# Notice that the amounts are still strings (numbers within '' quotes), so + concatenated them
rekey.collect()

[('State', 'Amount'),
 ('NY', '100.00750.00'),
 ('TX', '450.00200.00'),
 ('CA', '200.00500.00')]

In [59]:
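# fix that by converting to float inside the reducer
# (a key with only one value, like the header, never reaches the reducer and stays a string)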
rekey = pairs.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))

In [61]:
# Now we see total sales per state
rekey.collect()

[('State', 'Amount'), ('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]
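Why does the header survive as `('State', 'Amount')` while the other amounts become floats? The reducing function is only called when a key has at least two values. A simplified plain-Python model of `reduceByKey` makes this visible. The helper `reduce_by_key` is hypothetical, and it ignores partitions, which real Spark combines separately before merging.

```python
from functools import reduce

pairs = [('State', 'Amount'), ('NY', '100.00'), ('TX', '450.00'),
         ('CA', '200.00'), ('CA', '500.00'), ('NY', '750.00'), ('TX', '200.00')]

def reduce_by_key(pairs, func):
    """Group values by key, then fold each group with func (simplified model)."""
    groups = {}
    for key, value in pairs:
        groups.setdefault(key, []).append(value)
    # reduce() on a one-element list returns that element without calling func
    return {key: reduce(func, values) for key, values in groups.items()}

totals = reduce_by_key(pairs, lambda a, b: float(a) + float(b))
print(totals)
```

The `'State'` key has a single value, so `float()` is never applied to `'Amount'`. The same reasoning means a state with only one sale would keep its amount as a string, which is a reason to convert to `float` before reducing.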

In [62]:
clean.collect()

[['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
 ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
 ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
 ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
 ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
 ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
 ['205', '10/19/2017', '202', 'TX', '121', '200.00']]

#### Going through the steps

Take the clean data, tally amounts by state, remove the State/Amount header tuple, and sort the data.

In [66]:
# Perform the transformations
# grab states and amounts and return them as a tuple (note the () around the two values)
step1 = clean.map(lambda lst: (lst[3], lst[-1]))
# reduce by key
step2 = step1.reduceByKey(lambda amt1, amt2: float(amt1) + float(amt2))
# get rid of the State/Amount header pair
step3 = step2.filter(lambda x: x[0] != 'State')
# sort results by amount, largest first
step4 = step3.sortBy(lambda stAmount: stAmount[1], ascending=False)


In [67]:
# Perform the action
step4.collect()

[('NY', 850.0), ('CA', 700.0), ('TX', 650.0)]
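An alternative ordering of the same steps is to drop the header first and convert amounts to `float` before tallying, so every total is numeric even when a state has a single row. Below is a plain-Python sketch of that ordering on the rows shown earlier. It is not Spark code, and the variable names are chosen only for this example.

```python
from collections import defaultdict

rows = [
    ['EventId', 'Timestamp', 'Customer', 'State', 'ServiceID', 'Amount'],
    ['201', '10/13/2017', '100', 'NY', '131', '100.00'],
    ['204', '10/18/2017', '700', 'TX', '129', '450.00'],
    ['202', '10/15/2017', '203', 'CA', '121', '200.00'],
    ['206', '10/19/2017', '202', 'CA', '131', '500.00'],
    ['203', '10/17/2017', '101', 'NY', '173', '750.00'],
    ['205', '10/19/2017', '202', 'TX', '121', '200.00'],
]

# 1. drop the header row first (the filter step, moved to the front)
records = [row for row in rows if row[3] != 'State']
# 2. build (state, amount) pairs with the amount already converted to float
pairs = [(row[3], float(row[-1])) for row in records]
# 3. tally amounts per state (the reduce-by-key step)
totals = defaultdict(float)
for state, amount in pairs:
    totals[state] += amount
# 4. sort by amount, largest first
ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)

print(ranked)
```

In Spark terms this would mean filtering before the `map`, converting in the `map`, and reducing with plain addition.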

#### Use Tuple Unpacking for Readability

e.g.

In [68]:
x = ['ID','State','Amount']

In [71]:
# Index-based access, in the style of step1. Muddy: it is not obvious what lst[-1] holds.
def func1(lst):
    return lst[-1]

In [69]:
# Function 2 is much more readable
def func2(id_st_amt):
    # unpack the values into named variables
    Id, st, amt = id_st_amt
    return amt

In [72]:
func1(x)

'Amount'

In [73]:
func2(x)

'Amount'
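The same idea applies to the sort key used in the pipeline earlier. Python 3 does not allow tuple unpacking in a lambda's parameter list, so a small named function is one way to get readable names. The function name `amount_of` is made up for this sketch, which runs on a plain list rather than an RDD.

```python
def amount_of(state_amount):
    # unpack the (state, amount) pair into named variables
    state, amount = state_amount
    return amount

totals = [('NY', 850.0), ('TX', 650.0), ('CA', 700.0)]

# sort by amount, largest first, using the named key function
ranked = sorted(totals, key=amount_of, reverse=True)
print(ranked)
```

A function like this could be passed to `sortBy` in place of `lambda stAmount: stAmount[1]`.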