<a href="https://colab.research.google.com/github/suriarasai/BEAD2024/blob/main/colab/05_FlightDelays_using_RDD.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Resilient Distributed Data Set
Resilient Distributed Datasets (RDDs) are collections of immutable JVM objects that are distributed across an Apache Spark cluster. An RDD is the fundamental dataset type of Apache Spark; any action on a Spark DataFrame eventually gets translated into a highly optimized execution of transformations and actions on RDDs.

## Setup PySpark
Spark needs a few prerequisites on the machine before we can use it. The code below installs pyspark and related tools.

In [None]:
# install pyspark using pip
!pip install --ignore-install -q pyspark
# install findspark using pip
!pip install --ignore-install -q findspark



## Spark Session
The code below creates a Spark session object and sets the port for the Spark UI.

In [None]:
from pyspark.sql import SparkSession
# Local session; the Spark UI is served on port 4050
spark = (
    SparkSession.builder.master("local")
    .appName("Flight Delay")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)


## Creating RDD
There are two ways to create an RDD in PySpark: call the parallelize() method on an in-memory collection (a list or an array of elements), or reference a file (or files) stored locally or on an external source, as shown later in this notebook.

Parallelized collections are created by calling SparkContext's parallelize method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel.

The following code snippet creates an RDD (myRDD) using the parallelize() method of the session's SparkContext (spark.sparkContext, often abbreviated as sc):

In [None]:
myRDD = spark.sparkContext.parallelize([('Suria', 21), ('Venkat', 18), ('Liu Fan',16), ('Bob', 18), ('Scott', 17)])
myRDD.take(5)

[('Suria', 21), ('Venkat', 18), ('Liu Fan', 16), ('Bob', 18), ('Scott', 17)]


Data can also be read from a file stored locally or in Hadoop HDFS, AWS S3, Azure WASB, Google Cloud Storage, or Databricks storage (DBFS). Examples of each are listed below, where sc stands for spark.sparkContext:
1. sc.textFile('/local folder/filename.csv')
2. sc.textFile('hdfs://folder/filename.csv')
3. sc.textFile('s3://bucket/folder/filename.csv')
4. sc.textFile('wasb://bucket/folder/filename.csv')
5. sc.textFile('gs://bucket/folder/filename.csv')
6. sc.textFile('dbfs://folder/filename.csv')

For this workshop, use two data files from the Git repository: airport-codes-na.txt and departuredelays.csv.

Upload them to Colab with the file upload button below, or place them in your Google Drive.

Note that the files can be copied from this [URL](https://github.com/suriarasai/BEAD2024/tree/main/data).

To demonstrate the upload facility, we upload the file airport-codes-na.txt through the file explorer.



In [None]:
from google.colab import files
uploaded = files.upload()

Saving airport-codes-na.txt to airport-codes-na.txt


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Alternatively, upload the data folder to your Google Drive and read the file from the mounted drive, as shown below.

In [None]:
airportRDD = spark.sparkContext.textFile("/content/drive/MyDrive/data/airport-data/airport-codes-na.txt").map(lambda element: element.split("\t"))



## Process the RDD
Because we are reading the data as a plain text file, the header row is parsed like any other record, so it needs to be stripped before the remaining records are processed.

In [None]:
airportRDD.take(5)


[['City', 'State', 'Country', 'IATA'],
 ['Abbotsford', 'BC', 'Canada', 'YXX'],
 ['Aberdeen', 'SD', 'USA', 'ABR'],
 ['Abilene', 'TX', 'USA', 'ABI'],
 ['Akron', 'OH', 'USA', 'CAK']]
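
One common way to drop the header is to compare every row with the first one. The sketch below shows the idea on a small in-memory list in plain Python, so it runs without a Spark session. The sample lines and the names `header` and `data_rows` are illustrative assumptions, and the RDD calls named in the comments are the likely PySpark counterparts rather than code taken from this notebook.

```python
# Plain-Python sketch of header removal; the sample lines are assumed for illustration.
lines = [
    "City\tState\tCountry\tIATA",
    "Abbotsford\tBC\tCanada\tYXX",
    "Aberdeen\tSD\tUSA\tABR",
]

# Same parsing step as the notebook: split each line on tabs.
rows = [line.split("\t") for line in lines]

# Likely RDD counterpart: header = airportRDD.first()
header = rows[0]

# Likely RDD counterpart: airportRDD.filter(lambda row: row != header)
data_rows = [row for row in rows if row != header]

print(data_rows)
```

Comparing against the header value, rather than relying on position, keeps working even when the data is spread over several partitions.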

Let us take a look at the total number of records.

In [None]:
airportRDD.count()


527

The code below returns the number of partitions in the RDD.

In [None]:
airportRDD.getNumPartitions()

1

In [None]:
# Setup the RDD: flights
flights = (
    spark.sparkContext
    .textFile('/content/drive/MyDrive/data/airport-data/departuredelays.csv')
    .map(lambda element: element.split(","))
)

In [None]:
flights.take(5)

[['date', 'delay', 'distance', 'origin', 'destination'],
 ['01011245', '6', '602', 'ABE', 'ATL'],
 ['01020600', '-8', '369', 'ABE', 'DTW'],
 ['01021245', '-2', '602', 'ABE', 'ATL'],
 ['01020605', '-4', '602', 'ABE', 'ATL']]

# Exploration
Let us try some transformation functions on the dataset.

Airports in Washington State

In [None]:
# Use filter() to keep rows where the second column == "WA"
(
    airportRDD
    .map(lambda c: (c[0], c[1]))
    .filter(lambda c: c[1] == "WA")
    .take(5)
)

[('Bellingham', 'WA'),
 ('Moses Lake', 'WA'),
 ('Pasco', 'WA'),
 ('Pullman', 'WA'),
 ('Seattle', 'WA')]

This time we try flatMap().

In [None]:
# Filter only second column == "WA",
# select first two columns within the RDD,
# and flatten out all values
(
    airportRDD
    .filter(lambda c: c[1] == "WA")
    .map(lambda c: (c[0], c[1]))
    .flatMap(lambda x: x)
    .take(10)
)

['Bellingham',
 'WA',
 'Moses Lake',
 'WA',
 'Pasco',
 'WA',
 'Pullman',
 'WA',
 'Seattle',
 'WA']
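
The difference between map() and flatMap() can be seen without Spark. The following is a plain-Python analogue, not Spark code: `itertools.chain.from_iterable` stands in for the flattening step, and the three sample pairs are copied from the output above.

```python
from itertools import chain

pairs = [("Bellingham", "WA"), ("Moses Lake", "WA"), ("Pasco", "WA")]

# map-like behaviour: one output element per input element, tuples stay intact.
mapped = list(map(lambda pair: pair, pairs))

# flatMap-like behaviour: each returned iterable is unpacked into the result.
flattened = list(chain.from_iterable(map(lambda pair: pair, pairs)))

print(mapped)
print(flattened)
```

The mapped list still holds three tuples, while the flattened list holds six strings.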

The third column holds the country. The distinct values in this dataset are listed below; the value 'Country' appears because the header row has not been removed.

In [None]:
# Provide the distinct elements for the
# third column of airports representing
# countries
(
    airportRDD
    .map(lambda c: c[2])
    .distinct()
    .take(5)
)

['Country', 'Canada', 'USA']

A sample of the origin airports that this flight data covers.

In [None]:
# Provide a sample based on 0.1% (fraction 0.001) of the
# flights RDD data specific to the fourth
# column (origin airport code of flight)
# without replacement (False) using random
# seed of 123
(
    flights
    .map(lambda c: c[3])
    .sample(False, 0.001, 123)
    .take(5)
)

['ABQ', 'AEX', 'AGS', 'ANC', 'ATL']
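
When sampling without replacement, the fraction is the probability that each element is kept, so 0.001 corresponds to roughly 0.1% of the records and the sample size is only approximately `fraction * count`. The sketch below imitates that behaviour in plain Python. The helper `bernoulli_sample` and the synthetic population are assumptions for illustration, and Spark's own random generator differs, so the selected values will not match Spark's output.

```python
import random

def bernoulli_sample(items, fraction, seed):
    """Keep each item independently with probability `fraction`."""
    rng = random.Random(seed)
    return [item for item in items if rng.random() < fraction]

# Synthetic stand-in for the origin column: 100,000 record ids.
population = list(range(100_000))

picked = bernoulli_sample(population, 0.001, 123)

# The size is close to 0.001 * 100_000 = 100, but not exactly 100.
print(len(picked))
```

Reusing the same seed reproduces the same sample, which is why the notebook passes 123 explicitly.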

Joining flight data with airport data on the airport code, for example matching JFK to its state, NY.

In [None]:
# Flights data
#  e.g. (u'JFK', u'01010900')
flt = flights.map(lambda c: (c[3], c[0]))

# Airports data
# e.g. (u'JFK', u'NY')
air = airportRDD.map(lambda c: (c[3], c[1]))

# Execute inner join between RDDs
flt.join(air).take(5)

[('ABE', ('01011245', 'PA')),
 ('ABE', ('01020600', 'PA')),
 ('ABE', ('01021245', 'PA')),
 ('ABE', ('01020605', 'PA')),
 ('ABE', ('01031245', 'PA'))]
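
An inner join of two key-value collections emits `(key, (left_value, right_value))` for every pair of records that share a key and drops keys that appear on only one side. The plain-Python sketch below mirrors that shape. The helper `inner_join` and the `ZZZ` row are hypothetical and added only for illustration; unlike this sketch, Spark does not guarantee the order of the joined records.

```python
from collections import defaultdict

def inner_join(left, right):
    """Join two lists of (key, value) pairs on the key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]

# (origin, date) pairs, including the header row and a hypothetical unmatched code.
flt = [("origin", "date"), ("ABE", "01011245"), ("ABE", "01020600"), ("ZZZ", "01010000")]

# (IATA, state) pairs, including the header row.
air = [("IATA", "State"), ("ABE", "PA"), ("ABI", "TX")]

joined = inner_join(flt, air)
print(joined)
```

The two header rows carry different keys ('origin' and 'IATA'), so they never match and drop out of the joined result.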

Programmatically controlling partitions:

In [None]:
# Let's re-partition this to 8 so we can have 8
# partitions
flights2 = flights.repartition(8)

# Checking the number of partitions for the flights2 RDD
flights2.getNumPartitions()


8

Add index column to existing RDD.

In [None]:
# View each row within RDD + the index
# i.e. output is in form ([row], idx)
ac = airportRDD.map(lambda c: (c[0], c[3]))
ac.zipWithIndex().take(5)

[(('City', 'IATA'), 0),
 (('Abbotsford', 'YXX'), 1),
 (('Aberdeen', 'ABR'), 2),
 (('Abilene', 'ABI'), 3),
 (('Akron', 'CAK'), 4)]
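
Each element produced by zipWithIndex() is a single `(row, idx)` tuple, so a function passed to filter() or map() receives one argument and has to index into it. The plain-Python sketch below uses `enumerate` as a stand-in for zipWithIndex() and shows that a two-parameter lambda fails when called with one tuple. The variable names are illustrative assumptions.

```python
rows = [("City", "IATA"), ("Abbotsford", "YXX"), ("Aberdeen", "ABR")]

# Stand-in for zipWithIndex(): (row, idx) pairs.
indexed = [(row, idx) for idx, row in enumerate(rows)]

# Works: a one-argument function that indexes into the pair.
without_header = [pair[0] for pair in indexed if pair[1] > 0]

# Fails: a two-parameter lambda is called with a single tuple.
two_arg = lambda row, idx: idx > 0
try:
    two_arg(indexed[0])
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(without_header)
print(error_message)
```

Because RDD transformations are lazy, an error of this kind only surfaces once an action such as take() or collect() runs.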

Combine the airports in Washington State and British Columbia with a union

In [None]:
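# Create `a` RDD of Washington airports.
# Each zipWithIndex() element is one (row, idx) tuple,
# so the lambdas take a single argument.
a = (
    airportRDD
    .zipWithIndex()
    .filter(lambda pair: pair[1] > 0)  # drop the header row (idx 0)
    .map(lambda pair: pair[0])         # keep only the row
    .filter(lambda c: c[1] == "WA")
)

# Create `b` RDD of British Columbia airports
b = (
    airportRDD
    .zipWithIndex()
    .filter(lambda pair: pair[1] > 0)  # drop the header row (idx 0)
    .map(lambda pair: pair[0])         # keep only the row
    .filter(lambda c: c[1] == "BC")
)

# Union WA and BC airports (lazy: returns a new RDD without computing it)
a.union(b)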


UnionRDD[50] at union at NativeMethodAccessorImpl.java:0

In [None]:
# Setup the RDD: airports
airports = (
    spark.sparkContext
    .textFile('/content/drive/MyDrive/data/airport-data/airport-codes-na.txt')
    .map(lambda element: element.split("\t"))
)

airports.take(5)

[['City', 'State', 'Country', 'IATA'],
 ['Abbotsford', 'BC', 'Canada', 'YXX'],
 ['Aberdeen', 'SD', 'USA', 'ABR'],
 ['Abilene', 'TX', 'USA', 'ABI'],
 ['Akron', 'OH', 'USA', 'CAK']]

In [None]:
# Setup the RDD: flights
flights2 = (
    spark.sparkContext
    .textFile('/content/drive/MyDrive/data/airport-data/departuredelays.csv', minPartitions=8)
    .map(lambda line: line.split(","))
)

flights2.take(5)


[['date', 'delay', 'distance', 'origin', 'destination'],
 ['01011245', '6', '602', 'ABE', 'ATL'],
 ['01020600', '-8', '369', 'ABE', 'DTW'],
 ['01021245', '-2', '602', 'ABE', 'ATL'],
 ['01020605', '-4', '602', 'ABE', 'ATL']]

In [None]:
# Print to console the first 3 elements of
# the airports RDD
airportRDD.take(3)

[['City', 'State', 'Country', 'IATA'],
 ['Abbotsford', 'BC', 'Canada', 'YXX'],
 ['Aberdeen', 'SD', 'USA', 'ABR']]

In [None]:
# Return all airports elements
# filtered by WA state
airportRDD.filter(lambda c: c[1] == "WA").collect()

[['Bellingham', 'WA', 'USA', 'BLI'],
 ['Moses Lake', 'WA', 'USA', 'MWH'],
 ['Pasco', 'WA', 'USA', 'PSC'],
 ['Pullman', 'WA', 'USA', 'PUW'],
 ['Seattle', 'WA', 'USA', 'SEA'],
 ['Spokane', 'WA', 'USA', 'GEG'],
 ['Walla Walla', 'WA', 'USA', 'ALW'],
 ['Wenatchee', 'WA', 'USA', 'EAT'],
 ['Yakima', 'WA', 'USA', 'YKM']]

In [None]:
# Calculate the total delay of flights
# between SEA (origin) and SFO (dest):
# convert the delay column to int
# and sum the values
flights\
 .filter(lambda c: c[3] == 'SEA' and c[4] == 'SFO')\
 .map(lambda c: int(c[1]))\
 .reduce(lambda x, y: x + y)

22293
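
The filter, map, and reduce steps above can be traced on a few rows in plain Python. In the sketch below the two SEA to SFO rows are made-up values for illustration and are not taken from the dataset, and `functools.reduce` stands in for the RDD reduce().

```python
from functools import reduce

flights_rows = [
    ["date", "delay", "distance", "origin", "destination"],
    ["01010710", "31", "590", "SEA", "SFO"],   # made-up row
    ["01010955", "-5", "590", "SEA", "SFO"],   # made-up row
    ["01011245", "6", "602", "ABE", "ATL"],
]

# Keep only SEA -> SFO flights; the header row fails this test and is dropped.
sea_sfo = [row for row in flights_rows if row[3] == "SEA" and row[4] == "SFO"]

# Convert the delay column from str to int.
delays = [int(row[1]) for row in sea_sfo]

# Add the delays pairwise, as reduce(lambda x, y: x + y) does.
total_delay = reduce(lambda x, y: x + y, delays)
print(total_delay)
```

Because the header row never passes the filter, `int()` is never applied to the literal string 'delay', which is why this cell works even though the header was not removed.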

# Write From RDD To File
PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the resulting Java objects using pickle. When saving an RDD of key-value pairs to SequenceFile, PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables. The following Writables are automatically converted:

| Writable | Python Type |
|---|---|
| Text | str |
| IntWritable | int |
| FloatWritable | float |
| DoubleWritable | float |
| BooleanWritable | bool |
| BytesWritable | bytearray |
| NullWritable | None |
| MapWritable | dict |

Similarly to text files, SequenceFiles can be saved and loaded by specifying the path. The key and value classes can be specified, but for standard Writables this is not required.

In [None]:
rdd = spark.sparkContext.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
rdd.saveAsSequenceFile("/content/drive/MyDrive/data/airport-data/sample.txt")

# Closing Comments
RDDs are the most basic, low-level API: they give more control over the data but benefit from fewer built-in optimizations. RDDs are also slower than both DataFrames and Datasets for simple operations such as grouping, because they lack query optimization and schema inference.

DataFrames, by contrast, provide a higher-level API that is optimized for performance and easier to work with for structured data.
Workshop Ends Here