# GBA 6430 - Big Data Technology in Business
# Dr. Mohammad Salehan
# Spark Transformations

`SparkContext` is automatically stored in a variable named `sc`.

<img align="center" src="https://cis4567-salehan.s3.amazonaws.com/img/Spark+Cluster.png">

In [None]:
sc.version

The `SparkSession` is stored in a variable named `spark`.

In [2]:
spark

<pyspark.sql.session.SparkSession object at 0x7f4fd70c5370>

## SparkContext.parallelize()
`sc.parallelize()` creates a parallelized collection (an RDD) from a local Python collection. This allows Spark
to distribute the data across multiple nodes instead of depending on a single node to process it.

In [5]:
myRDD = sc.parallelize(
[('Mike', 19), ('June', 18), ('Rachel',16), ('Rob', 18), ('Scott', 17), ('Jack', 25)])

* The `collect` <b>action</b> returns all values in the RDD from the Spark worker nodes to the driver.
* A Spark driver (aka an application’s driver process) is a `JVM` process that hosts the `SparkContext` for a `Spark application`. It acts as the master of a Spark application.
* Calling `collect` on a large dataset has performance implications, because it transfers large volumes of data from the Spark worker nodes to the driver.

In [6]:
myRDD.collect()

[('Mike', 19), ('June', 18), ('Rachel', 16), ('Rob', 18), ('Scott', 17), ('Jack', 25)]

The `take(n)` action returns the first n elements of the RDD instead of the whole dataset.

In [7]:
myRDD.take(3)

[('Mike', 19), ('June', 18), ('Rachel', 16)]

In [6]:
myRDD.getNumPartitions() #returns the number of partitions the data was split into

2

As a rule of thumb, use at least 2 partitions per CPU core in the cluster; the Spark documentation suggests 2 to 4.
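To make the idea of partitions concrete, here is a minimal plain-Python sketch (no Spark required) that cuts a list into contiguous, near-equal slices, roughly the way a parallelized collection is divided among partitions. The helper `split_into_partitions` is hypothetical and only illustrates the idea; it is not Spark's actual implementation.

```python
def split_into_partitions(data, num_partitions):
    """Cut `data` into `num_partitions` contiguous, near-equal slices."""
    n = len(data)
    return [
        data[i * n // num_partitions:(i + 1) * n // num_partitions]
        for i in range(num_partitions)
    ]

# Same records as the myRDD example above
people = [('Mike', 19), ('June', 18), ('Rachel', 16),
          ('Rob', 18), ('Scott', 17), ('Jack', 25)]

partitions = split_into_partitions(people, 2)
for index, part in enumerate(partitions):
    print(index, part)
```

Each slice could then be processed by a different worker, which is what makes the number of partitions an upper limit on parallelism.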

## Reading Data From Files
<table align='left'><tr><td>Storage type</td><td>Example</td></tr>
<tr><td>Local files</td><td>sc.textFile('/local folder/filename.csv')</td></tr>
<tr><td>Hadoop HDFS</td><td>sc.textFile('hdfs://folder/filename.csv')</td></tr>
<tr><td>S3</td><td>sc.textFile('s3://bucket/folder/filename.csv')</td></tr></table>

In [8]:
airports = sc.textFile('s3://cis4567-salehan/Spark/Data/airport-codes-na.txt') 
# you may need to update this address based on file location on your system
airports.take(5)

['City\tState\tCountry\tIATA', 'Abbotsford\tBC\tCanada\tYXX', 'Aberdeen\tSD\tUSA\tABR', 'Abilene\tTX\tUSA\tABI', 'Akron\tOH\tUSA\tCAK']

In [9]:
airports.getNumPartitions() # textFile() defaults to a minimum of 2 partitions

2

In [10]:
airports.count() #each line is one item

527

## Transformations
- Narrow transformation – all the elements required to compute the records in a single partition live in a single partition of the parent `RDD`, so no data has to move between partitions. `map()` and `filter()` are narrow transformations.
- Wide transformation – the elements required to compute the records in a single partition may live in many partitions of the parent `RDD`, so data must be shuffled across partitions. `groupByKey()` and `reduceByKey()` are wide transformations.
<table><tr><td><img src='https://cis4567-salehan.s3.amazonaws.com/img/spark-narrow-transformation-2.png'></td><td><img src='https://cis4567-salehan.s3.amazonaws.com/img/spark-wide-transformation-1.png'/></td></tr></table>
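The difference can be sketched in plain Python by modelling partitions as lists. This is only an illustration under simplifying assumptions: the sample records and the helper `group_by_key` are hypothetical, and keys are routed to output partitions by their sorted position to keep the result predictable, whereas Spark hashes the key.

```python
from collections import defaultdict

# Two "partitions" of (state, city) pairs, modelled as plain lists
parent = [
    [('WA', 'Seattle'), ('CA', 'Fresno')],
    [('CA', 'Oakland'), ('WA', 'Pasco')],
]

# Narrow: each output partition is computed from exactly one parent partition
narrow = [[(state, city.upper()) for state, city in part] for part in parent]

# Wide: grouping by key must gather records from every parent partition
def group_by_key(partitions, num_out):
    keys = sorted({k for part in partitions for k, _ in part})
    target = {k: i % num_out for i, k in enumerate(keys)}
    out = [defaultdict(list) for _ in range(num_out)]
    for part in partitions:
        for k, v in part:
            out[target[k]][k].append(v)   # record may cross partitions here
    return [dict(d) for d in out]

wide = group_by_key(parent, 2)
print(narrow)
print(wide)
```

In the wide case, both `'CA'` records end up in the same output partition even though they started in different parent partitions; that movement is the shuffle.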

### map function
Use the `.map()` function to transform the data from a list of strings to a list of lists.  
A `lambda` is an anonymous function (that is, a function defined without a name) composed of a single expression.  
The following code replaces each line with the list of its fields, split on the TAB character.

In [11]:
airports = airports.map(
    lambda line: line.split("\t"))
airports.take(5)

[['City', 'State', 'Country', 'IATA'], ['Abbotsford', 'BC', 'Canada', 'YXX'], ['Aberdeen', 'SD', 'USA', 'ABR'], ['Abilene', 'TX', 'USA', 'ABI'], ['Akron', 'OH', 'USA', 'CAK']]
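The same idea can be tried without a cluster using Python's built-in `map`, which applies a function to each element of a local list much as `RDD.map()` applies it to each element of an RDD. This is only a local analogy; the two sample lines are copied from the airports file, and the names `split_tab` and `split_tab_named` are made up for the example.

```python
lines = ['City\tState\tCountry\tIATA', 'Abbotsford\tBC\tCanada\tYXX']

# A lambda and a named function are interchangeable as the mapped function
split_tab = lambda line: line.split("\t")

def split_tab_named(line):
    return line.split("\t")

rows = list(map(split_tab, lines))
rows_named = list(map(split_tab_named, lines))
print(rows)
print(rows == rows_named)
```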

In [12]:
flights = sc.textFile('s3://cis4567-salehan/Spark/Data/departuredelays.csv') 
flights.getNumPartitions()

2

In [13]:
flights.take(5)

['date,delay,distance,origin,destination', '01011245,6,602,ABE,ATL', '01020600,-8,369,ABE,DTW', '01021245,-2,602,ABE,ATL', '01020605,-4,602,ABE,ATL']

In [14]:
flights = sc.textFile(
    's3://cis4567-salehan/Spark/Data/departuredelays.csv', 
    minPartitions=8).map(
    lambda line: line.split(","))

In [15]:
def split_by_comma (line):
    return line.split(",")

In [16]:
sc.textFile(
    's3://cis4567-salehan/Spark/Data/departuredelays.csv', 
    minPartitions=8).map(split_by_comma).take(5)

[['date', 'delay', 'distance', 'origin', 'destination'], ['01011245', '6', '602', 'ABE', 'ATL'], ['01020600', '-8', '369', 'ABE', 'DTW'], ['01021245', '-2', '602', 'ABE', 'ATL'], ['01020605', '-4', '602', 'ABE', 'ATL']]

In [16]:
flights.getNumPartitions()

8

In [17]:
# Use map() to extract out the first two columns
airports.map(lambda c: (c[0], c[1])).take(5)

[('City', 'State'), ('Abbotsford', 'BC'), ('Aberdeen', 'SD'), ('Abilene', 'TX'), ('Akron', 'OH')]

### filter function
The `filter(f)` transformation returns a new `RDD` containing only the elements for which the function `f` returns true.

In [20]:
# Use filter() to keep rows where the second column == "WA"
# Use \ to continue a statement on the next line

airports\
.map(lambda c: (c[0], c[1]))\
.filter(lambda c: c[1] == "WA")\
.take(5)

[('Bellingham', 'WA'), ('Moses Lake', 'WA'), ('Pasco', 'WA'), ('Pullman', 'WA'), ('Seattle', 'WA')]

In [17]:
# Here we do the same thing as above without a lambda function
def f(x):
    if x[1] == "WA":
        return True
    return False

(
airports
.map(lambda c: (c[0], c[1]))
.filter(f)
.take(5)
)

[('Bellingham', 'WA'), ('Moses Lake', 'WA'), ('Pasco', 'WA'), ('Pullman', 'WA'), ('Seattle', 'WA')]

## Class exercise
Filter airports RDD to include airports from Washington and California.

In [19]:
def f(x):
    if x[1] == "WA":
        return True
    elif x[1] == "CA":
        return True
    return False

(
airports
.map(lambda c: (c[0], c[1]))
.filter(f)
.collect()
)

[('Bakersfield', 'CA'), ('Bellingham', 'WA'), ('Burbank', 'CA'), ('Carlsbad', 'CA'), ('Chico', 'CA'), ('El Centro', 'CA'), ('Eureka', 'CA'), ('Fresno', 'CA'), ('Inyokern', 'CA'), ('Long Beach', 'CA'), ('Los Angeles', 'CA'), ('Los Angeles', 'CA'), ('Modesto', 'CA'), ('Monterey', 'CA'), ('Moses Lake', 'WA'), ('Oakland', 'CA'), ('Ontario', 'CA'), ('Orange County', 'CA'), ('Oxnard', 'CA'), ('Palm Springs', 'CA'), ('Pasco', 'WA'), ('Pullman', 'WA'), ('Redding', 'CA'), ('Sacramento', 'CA'), ('San Diego', 'CA'), ('San Francisco', 'CA'), ('San Francisco', 'CA'), ('San Jose', 'CA'), ('San Luis Obispo', 'CA'), ('Santa Barbara', 'CA'), ('Santa Maria', 'CA'), ('Santa Rosa', 'CA'), ('Seattle', 'WA'), ('Spokane', 'WA'), ('Visalia', 'CA'), ('Walla Walla', 'WA'), ('Wenatchee', 'WA'), ('Yakima', 'WA')]

In [22]:
airports\
.map(lambda c: (c[0], c[1]))\
.filter(lambda c: c[1] in ("WA","CA"))\
.take(5)

[('Bakersfield', 'CA'), ('Bellingham', 'WA'), ('Burbank', 'CA'), ('Carlsbad', 'CA'), ('Chico', 'CA')]

### distinct function
The `distinct()` transformation returns a new RDD containing the distinct elements of the
source RDD.

In [23]:
# Provide the distinct elements for the
# third column of airports representing
# countries
(
airports
.map(lambda c: c[2])
.distinct()
.take(5)
)

['Country', 'USA', 'Canada']

## Class exercise
Retrieve distinct values for state from airports RDD.

In [25]:
(
airports
.map(lambda c: c[1])
.distinct()
.collect()
)

['State', 'BC', 'SD', 'TX', 'OH', 'CO', 'NY', 'NM', 'LA', 'NE', 'AK', 'WI', 'NWT', 'NC', 'ME', 'PQ', 'CA', 'WV', 'MO', 'WY', 'SC', 'VA', 'PEI', 'TN', 'MB', 'NF', 'KS', 'AR', 'NS', 'HI', 'CT', 'Hawaii', 'RI', '', 'YT', 'DE', 'GA', 'PA', 'MI', 'NJ', 'MD', 'MA', 'WA', 'MN', 'MT', 'AL', 'ND', 'IL', 'IN', 'ID', 'IA', 'VT', 'AB', 'MS', 'FL', 'ON', 'NV', 'OK', 'OR', 'AZ', 'NB', 'Nunavut', 'NH', 'KY', 'SK', 'UT']

### sample function
The `sample(withReplacement, fraction, seed)` transformation samples a fraction of the data, with or without replacement (the withReplacement parameter), based on a random seed.

In [26]:
flights.count()

1391579

In [27]:
# Provide a sample based on 0.001 of
# flights RDD data specific to the fourth
# column (origin city of flight)
# without replacement (False) using random
# seed of 123
(
flights
.map(lambda c: c[3])
.sample(False, 0.001, 123)
.count()
)

1405

The `sample` function does not return an exact sample size. When sampling without replacement, Spark uses `Bernoulli sampling`: each element is included independently with probability `fraction`. The size of the result is therefore only approximately `fraction` times the size of the RDD (here 1,405 elements instead of about 1,392).
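The effect is easy to reproduce locally. The sketch below is a plain-Python illustration of Bernoulli sampling, not Spark's implementation; the helper `bernoulli_sample` is hypothetical, and because it uses Python's random number generator its counts will not match Spark's for the same seed.

```python
import random

def bernoulli_sample(items, fraction, seed):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)
    return [item for item in items if rng.random() < fraction]

population = range(100_000)

# Expected size is 100_000 * 0.001 = 100, but each run lands near it, not on it
sizes = [len(bernoulli_sample(population, 0.001, seed)) for seed in (1, 2, 3)]
print(sizes)

# The same seed always reproduces the same sample
repeatable = (bernoulli_sample(population, 0.001, 123)
              == bernoulli_sample(population, 0.001, 123))
print(repeatable)
```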

### repartition function
The `repartition(n)` transformation repartitions the RDD into n partitions by randomly
reshuffling and uniformly distributing data across the network. This can improve
performance by allowing more tasks to run in parallel.

In [28]:
#The flights RDD has 8 partitions
flights.getNumPartitions()

8

In [29]:
# Let's repartition this to 16 so we can have 16
# partitions
flights2 = flights.repartition(16)
# Checking the number of partitions for the flights2 RDD
flights2.getNumPartitions()

16
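What "uniformly distributing" means can be sketched with lists standing in for partitions. The hypothetical helper below deals elements out round-robin; this is an assumption made for illustration, and Spark's real shuffle is more involved.

```python
def repartition(partitions, n):
    """Redistribute all elements round-robin into n new partitions."""
    new_parts = [[] for _ in range(n)]
    position = 0
    for part in partitions:
        for item in part:
            new_parts[position % n].append(item)
            position += 1
    return new_parts

skewed = [list(range(10)), [10], [11]]   # one large partition, two tiny ones
balanced = repartition(skewed, 4)

print([len(p) for p in skewed])     # [10, 1, 1]
print([len(p) for p in balanced])   # [3, 3, 3, 3]
```

Every element has to be moved to build the new layout, which is why repartitioning is a wide (shuffle) operation and should not be done more often than necessary.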

To remove the header from your data, you can use the following code:

In [30]:
header = airports.first()
airports = airports.filter(lambda row: row != header)
airports.take(5)

[['Abbotsford', 'BC', 'Canada', 'YXX'], ['Aberdeen', 'SD', 'USA', 'ABR'], ['Abilene', 'TX', 'USA', 'ABI'], ['Akron', 'OH', 'USA', 'CAK'], ['Alamosa', 'CO', 'USA', 'ALS']]

In [31]:
header = flights.first()
flights = flights.filter(lambda row: row != header)
flights.take(5)

[['01011245', '6', '602', 'ABE', 'ATL'], ['01020600', '-8', '369', 'ABE', 'DTW'], ['01021245', '-2', '602', 'ABE', 'ATL'], ['01020605', '-4', '602', 'ABE', 'ATL'], ['01031245', '-4', '602', 'ABE', 'ATL']]

In [32]:
flights.count()

1391578

### reduceByKey function
- The `reduceByKey(f)` transformation merges the values for each key of a (key, value) `RDD` using the function f.
- The f function should be commutative and associative so that it can be computed correctly
in parallel.
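The requirement on f can be demonstrated with a small plain-Python sketch. The helper `reduce_by_key` is hypothetical: it combines values inside each partition first and then merges the partial results, which is the general shape of a parallel reduction but not Spark's code. The four delay values are the first four `ABE` delays from the flights file.

```python
def reduce_by_key(partitions, f):
    """Combine values per key inside each partition, then across partitions."""
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = f(local[key], value) if key in local else value
        partials.append(local)
    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = f(merged[key], value) if key in merged else value
    return merged

delays = [('ABE', 6), ('ABE', -8), ('ABE', -2), ('ABE', -4)]
layout_a = [delays[:2], delays[2:]]   # records split 2 + 2
layout_b = [delays[:3], delays[3:]]   # same records split 3 + 1

add = lambda x, y: x + y   # commutative and associative
sub = lambda x, y: x - y   # neither

print(reduce_by_key(layout_a, add), reduce_by_key(layout_b, add))
print(reduce_by_key(layout_a, sub), reduce_by_key(layout_b, sub))
```

Addition gives the same total however the records are partitioned, while subtraction gives a result that depends on the layout, so it is not safe to use with `reduceByKey`.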

In [33]:
# Determine sum of delays by origin airport
(
flights
.map(lambda c: (c[3], int(c[1]))) # map to (origin, delay)
.reduceByKey(lambda x, y: x + y) # the first element is used as the key
.take(5)
)

[('ACT', 392), ('BMI', 7817), ('BPT', 1936), ('BZN', 7226), ('CRP', 10579)]

### sortByKey function
The `sortByKey(ascending=True)` transformation orders a (key, value) RDD by key and returns an RDD in
ascending or descending order.

In [37]:
# Takes the origin code and delays
# runs a group by origin code via reduceByKey()
# sorting by the key (origin code)
(
flights
.map(lambda c: (c[3], int(c[1])))
.reduceByKey(lambda x, y: x + y)
.sortByKey(False)
.take(10)
)

[('YUM', 1546), ('YAK', -1190), ('XNA', 31616), ('WRG', -450), ('VPS', 11967), ('VLD', 2598), ('TYS', 25690), ('TYR', 2358), ('TXK', 419), ('TWF', -449)]

### sortBy function
The `sortBy(f, ascending=True)` transformation orders an RDD by the value that the specified function `f` returns for each element and returns an RDD in
ascending or descending order.

In [36]:
(
flights
.map(lambda c: (c[3], int(c[1])))
.reduceByKey(lambda x, y: x + y)
.sortBy(lambda x: x[1], False)
.take(10)
)

[('ORD', 1193929), ('ATL', 1151087), ('DEN', 899406), ('DFW', 679249), ('IAH', 587978), ('LAX', 565490), ('SFO', 501670), ('EWR', 452791), ('MCO', 445070), ('LAS', 441797)]
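The difference between sorting by key and sorting by an arbitrary function can be checked locally with Python's built-in `sorted`, whose `key` argument plays the role of `f`. This is only a local analogy; the four pairs are taken from the outputs above.

```python
totals = [('YUM', 1546), ('ORD', 1193929), ('ATL', 1151087), ('YAK', -1190)]

# Like sortByKey(False): order by the first element, descending
by_key_desc = sorted(totals, key=lambda kv: kv[0], reverse=True)

# Like sortBy(lambda x: x[1], False): order by the second element, descending
by_value_desc = sorted(totals, key=lambda kv: kv[1], reverse=True)

print(by_key_desc)
print(by_value_desc)
```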

## Class exercise
Show the top 10 airports with the most flights.

In [41]:
(
flights
.map(lambda c: (c[3], 1))
.reduceByKey(lambda x, y: x + y)
.sortBy(lambda x: x[1],False)
.take(10)
)

[('ATL', 91484), ('DFW', 68482), ('ORD', 64228), ('LAX', 54086), ('DEN', 53148), ('IAH', 43361), ('PHX', 40155), ('SFO', 39483), ('LAS', 33107), ('CLT', 28402)]

A transformation takes an existing `RDD` and transforms it into one or more output `RDDs`. It is also a lazy process: nothing is computed until an action (e.g., `take`) is executed.
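Python's built-in `map` is lazy in a similar way, which makes it a convenient local analogy for this behavior (it is not how Spark schedules work). In the sketch below, the hypothetical function `traced_split` records each call so that we can see when the work actually happens; the two sample lines come from the flights file.

```python
log = []

def traced_split(line):
    log.append(line)          # record that work actually happened
    return line.split(",")

lines = ['01011245,6,602,ABE,ATL', '01020600,-8,369,ABE,DTW']

lazy_rows = map(traced_split, lines)   # defines the work; nothing runs yet
calls_before = len(log)

first = next(lazy_rows)                # asking for a result triggers the work
calls_after = len(log)

print(calls_before, calls_after, first)
```

Only one line was processed to produce the first result, just as `take(5)` on an RDD does not need to process the whole dataset.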

In [42]:
# Flights data
# e.g. (u'JFK', u'01010900')
flt = flights.map(lambda c: (c[3], c[0]))
# Airports data
# e.g. (u'JFK', u'NY')
air = airports.map(lambda c: (c[3], c[1]))
# Execute inner join between RDDs
output = flt.join(air)

In [43]:
output.take(5)

[('AVP', ('01010640', 'PA')), ('AVP', ('01010550', 'PA')), ('AVP', ('01020545', 'PA')), ('AVP', ('01020600', 'PA')), ('AVP', ('01021653', 'PA'))]
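The shape of this output follows from what an inner join does: every value on the left is paired with every value on the right that shares its key, and keys present on only one side are dropped. A minimal plain-Python sketch, assuming small in-memory lists and a hypothetical helper `inner_join` (the `'XXX'` and `'SEA'` records are made up to show unmatched keys):

```python
from collections import defaultdict

def inner_join(left, right):
    """Pair every left value with every right value that shares its key."""
    right_index = defaultdict(list)
    for key, value in right:
        right_index[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_index.get(key, [])
    ]

flt_local = [('AVP', '01010640'), ('AVP', '01010550'), ('XXX', '01020600')]
air_local = [('AVP', 'PA'), ('SEA', 'WA')]

joined = inner_join(flt_local, air_local)
print(joined)
```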