# PySpark from Google Colab

In this lab we will explore and test the basic functionality of Spark, and see how to upload dataset files in Google Colab.

As you will see, installing pyspark is straightforward, so you should be able to install it easily on any computer at your disposal with the command: pip install pyspark.

We start the notebook by installing pyspark.

In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.1.1.tar.gz (212.3 MB)
     |████████████████████████████████| 212.3 MB 27.3 MB/s eta 0:00:01
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
     |████████████████████████████████| 198 kB 29.3 MB/s eta 0:00:01
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=8f1e9f9f91d1ab96ef4820fd6428be24c9147aa23d341ee35740527223abdafa
  Stored in directory: /Users/marcosplazagonzalez/Library/Caches/pip/wheels/b3/0e/81/264aeed961e43b9f6ba9ec81c8c540d2d7dccc52c6b51cbf22
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


## Get the dataset

To make it quick to get the dataset prepared for this lab, we created a link to a file containing it in another Google account and wrote down all the steps needed to get the file into the current directory.

This file is 2007.csv, and contains information about flights during the year 2007.

Now, execute the following code cell.

In [3]:
!gdown --id "13yfm1bNdMBSaNp896pzrg4oCVd6JrVeP"
!unzip Spark_Tutorial1.zip
!bzip2 -d Spark_Tutorial1/2007.csv.bz2
!mv Spark_Tutorial1/2007.csv .
!rm -r __MACOSX
!rm Spark_Tutorial1.zip
!rm -r Spark_Tutorial1
!ls

Downloading...
From: https://drive.google.com/uc?id=13yfm1bNdMBSaNp896pzrg4oCVd6JrVeP
To: /Users/marcosplazagonzalez/Desktop/UB Ing. Informatica Cuarto/Segundo semestre/ProgramacioParalela/Lab/Lab3Spark/Spark_Tutorial1.zip
121MB [00:02, 41.3MB/s] 
Archive:  Spark_Tutorial1.zip
   creating: Spark_Tutorial1/
  inflating: Spark_Tutorial1/.DS_Store  
   creating: __MACOSX/
   creating: __MACOSX/Spark_Tutorial1/
  inflating: __MACOSX/Spark_Tutorial1/._.DS_Store  
  inflating: Spark_Tutorial1/2007.csv.bz2  
  inflating: __MACOSX/Spark_Tutorial1/._2007.csv.bz2  
  inflating: Spark_Tutorial1/README.txt  
  inflating: __MACOSX/Spark_Tutorial1/._README.txt  
  inflating: Spark_Tutorial1/tutorial1.pdf  
  inflating: __MACOSX/Spark_Tutorial1/._tutorial1.pdf  
  inflating: __MACOSX/._Spark_Tutorial1  
2007.csv        SparkLab1.ipynb


## Reading the file

In [4]:
import pyspark
from pyspark.sql import SparkSession
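spark = SparkSession.builder.master("local[1]").appName('testSparkSession').getOrCreate()

# header: the first line holds the column names; nullValue: "NA" strings become nulls;
# inferSchema: Spark makes an extra pass over the data to guess each column's type
df = (spark.read.format("csv").option("header", "true").option("nullValue", "NA")
      .option("inferSchema", "true").load("2007.csv"))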

We have now loaded the file into a DataFrame named "df".

Next we ask Spark to print the schema of the data, that is, the columns of the table and the type of the data contained in each column.

In [5]:
df.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- DepTime: integer (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- ArrTime: integer (nullable = true)
 |-- CRSArrTime: integer (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- ActualElapsedTime: integer (nullable = true)
 |-- CRSElapsedTime: integer (nullable = true)
 |-- AirTime: integer (nullable = true)
 |-- ArrDelay: integer (nullable = true)
 |-- DepDelay: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- Distance: integer (nullable = true)
 |-- TaxiIn: integer (nullable = true)
 |-- TaxiOut: integer (nullable = true)
 |-- Cancelled: integer (nullable = true)
 |-- CancellationCode: string (nullable = true)
 |-- Diverted: integer (nullable = true)

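We can also ask Spark how many partitions it has created. This number depends on the size of the input file and on the Spark configuration, for example on how many CPU cores Spark is allowed to use, so it may be different on your system.

The purpose of partitioning the data on a single computer is to let each CPU core process a different part of the data in parallel. Note, however, that the session above was created with master("local[1]"), which runs Spark with a single worker thread; master("local[*]") would use all available cores.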

In [6]:
df.rdd.getNumPartitions()

6
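To make the idea concrete, here is a plain-Python sketch of the split, process, combine pattern behind partitions. It is an analogy, not Spark code: split_into_partitions and parallel_max are hypothetical helpers. The sketch splits a list of made-up values into partitions, lets a pool of worker threads reduce each partition to a partial maximum, and then combines the partial results. Threads are used only to keep the sketch short; because of Python's GIL they do not actually speed up CPU-bound work, whereas Spark runs its tasks in the JVM.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(rows, num_partitions):
    """Split rows into num_partitions contiguous chunks of (almost) equal size."""
    size, remainder = divmod(len(rows), num_partitions)
    partitions, start = [], 0
    for i in range(num_partitions):
        end = start + size + (1 if i < remainder else 0)
        partitions.append(rows[start:end])
        start = end
    return partitions


def parallel_max(rows, num_partitions=6):
    """Compute the maximum by reducing each partition in its own worker thread."""
    partitions = split_into_partitions(rows, num_partitions)
    with ThreadPoolExecutor(max_workers=num_partitions) as pool:
        # Each worker reduces one (non-empty) partition to a partial result
        partial_maxima = list(pool.map(max, [p for p in partitions if p]))
    # The partial results are combined into the final answer
    return max(partial_maxima)


values = [8, 21, 70, 56, -2, 13, 103, 7, 91, -4]  # hypothetical values
print(split_into_partitions(values, 3))  # [[8, 21, 70, 56], [-2, 13, 103], [7, 91, -4]]
print(parallel_max(values, num_partitions=3))  # 103
```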

## Basic operations

### Remove columns

Spark offers a simple transformation for this, "drop". Remember that transformations are not executed immediately.

In [7]:
df2 = df.drop("FlightNum","TailNum","UniqueCarrier")
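The difference between transformations and actions can be illustrated with a small plain-Python sketch. LazyTable is a hypothetical toy class, not part of PySpark: a transformation such as drop only records a step and returns a new object, and nothing is read or computed until an action (here collect) is called. The FlightNum and TailNum values are made up.

```python
class LazyTable:
    """Toy model of a lazily evaluated table (hypothetical, not PySpark)."""

    def __init__(self, source, steps=()):
        self.source = source       # function that produces the input rows
        self.steps = tuple(steps)  # recorded transformations, not yet run

    def drop(self, *columns):
        # Transformation: record the step and return a new LazyTable
        def step(rows):
            return [{k: v for k, v in row.items() if k not in columns} for row in rows]
        return LazyTable(self.source, self.steps + (step,))

    def collect(self):
        # Action: only now is the source read and every recorded step applied
        rows = self.source()
        for step in self.steps:
            rows = step(rows)
        return rows


reads = []


def read_source():
    reads.append(1)  # count how many times the "file" is actually read
    return [{"Origin": "SMF", "Dest": "ONT", "FlightNum": 123, "TailNum": "N1"}]


table = LazyTable(read_source).drop("FlightNum", "TailNum")
print(len(reads))       # 0: defining the transformation read nothing
print(table.collect())  # [{'Origin': 'SMF', 'Dest': 'ONT'}]
print(len(reads))       # 1
```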

We can also do it the other way around: instead of removing columns, we can create a new DataFrame with only the columns we want to work with.

In [8]:
df2 = df.select("Origin", "Dest", "ArrDelay", "DepDelay")
df2.show()

+------+----+--------+--------+
|Origin|Dest|ArrDelay|DepDelay|
+------+----+--------+--------+
|   SMF| ONT|       1|       7|
|   SMF| PDX|       8|      13|
|   SMF| PDX|      34|      36|
|   SMF| PDX|      26|      30|
|   SMF| PDX|      -3|       1|
|   SMF| PDX|       3|      10|
|   SMF| PHX|      47|      56|
|   SMF| PHX|      -2|       9|
|   SMF| PHX|      44|      47|
|   SMF| PHX|      -7|       3|
|   SMF| PHX|     -11|       1|
|   SMF| PHX|      52|      52|
|   SMF| SAN|      45|      53|
|   SMF| SAN|     -17|      -5|
|   SMF| SAN|      -5|       6|
|   SMF| SAN|      33|      44|
|   SMF| SAN|      -9|       0|
|   SMF| SAN|      -7|       2|
|   SMF| SAN|     -11|       1|
|   SMF| SAN|      36|      29|
+------+----+--------+--------+
only showing top 20 rows



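From now on we will work with the contents of df2, but first we want to remove all the entries with an NA value in the ArrDelay or DepDelay columns. Called without arguments, na.drop() removes every row that has a null in any of df2's columns, which includes these two.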

In [9]:
df3 = df2.na.drop()
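If only some columns matter, PySpark's na.drop also accepts a subset argument, for example df2.na.drop(subset=["ArrDelay", "DepDelay"]). The plain-Python sketch below is a hypothetical helper, not PySpark code: it mimics that default "any" behaviour on a list of dictionaries, with None standing for a null value and made-up rows.

```python
def drop_nulls(rows, subset=None):
    """Keep only rows with no None in the given columns (all columns if subset is None)."""
    kept = []
    for row in rows:
        columns = subset if subset is not None else row.keys()
        if all(row[c] is not None for c in columns):
            kept.append(row)
    return kept


rows = [  # hypothetical rows; None plays the role of a null ("NA") value
    {"Origin": "SMF", "Dest": "ONT", "ArrDelay": 1, "DepDelay": 7},
    {"Origin": "SMF", "Dest": "PDX", "ArrDelay": None, "DepDelay": 13},
    {"Origin": None, "Dest": "PDX", "ArrDelay": 34, "DepDelay": 36},
]
print(len(drop_nulls(rows)))                                   # 1
print(len(drop_nulls(rows, subset=["ArrDelay", "DepDelay"])))  # 2
```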

### Add columns

We can also add new columns with new information. For instance, the DataFrame contains the departure delay (DepDelay) and the arrival delay (ArrDelay) of each flight, so we can create a new column with the sum of the two.

In [10]:
from pyspark.sql.functions import expr
df4 = df3.withColumn("SumDelay", expr("ArrDelay + DepDelay"))
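The same column can also be built with Column objects instead of a SQL string, for example df3.withColumn("SumDelay", col("ArrDelay") + col("DepDelay")) with col imported from pyspark.sql.functions. Row by row the computation is a plain addition, sketched below in plain Python (with_sum_delay is a hypothetical helper) on the (ArrDelay, DepDelay) values of the first rows of df2 shown above. As in Spark SQL arithmetic, a null operand gives a null result, which is one reason to drop the NA rows first.

```python
def with_sum_delay(rows):
    """Return copies of rows with SumDelay = ArrDelay + DepDelay.

    Like Spark SQL arithmetic, the result is None (null) if either input is None.
    """
    result = []
    for row in rows:
        arr, dep = row["ArrDelay"], row["DepDelay"]
        total = None if arr is None or dep is None else arr + dep
        result.append({**row, "SumDelay": total})
    return result


# (ArrDelay, DepDelay) pairs from the first rows of df2.show() above
pairs = [(1, 7), (8, 13), (34, 36), (26, 30), (-3, 1)]
rows = [{"ArrDelay": a, "DepDelay": d} for a, d in pairs]
print([r["SumDelay"] for r in with_sum_delay(rows)])  # [8, 21, 70, 56, -2]
```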

Now we can check the results by executing an action, which triggers the execution of all the accumulated transformations.

In [11]:
df4.select("DepDelay", "ArrDelay", "SumDelay").show(10)

+--------+--------+--------+
|DepDelay|ArrDelay|SumDelay|
+--------+--------+--------+
|       7|       1|       8|
|      13|       8|      21|
|      36|      34|      70|
|      30|      26|      56|
|       1|      -3|      -2|
|      10|       3|      13|
|      56|      47|     103|
|       9|      -2|       7|
|      47|      44|      91|
|       3|      -7|      -4|
+--------+--------+--------+
only showing top 10 rows



In [15]:
df4.show(10)

+------+----+--------+--------+--------+
|Origin|Dest|ArrDelay|DepDelay|SumDelay|
+------+----+--------+--------+--------+
|   SMF| ONT|       1|       7|       8|
|   SMF| PDX|       8|      13|      21|
|   SMF| PDX|      34|      36|      70|
|   SMF| PDX|      26|      30|      56|
|   SMF| PDX|      -3|       1|      -2|
|   SMF| PDX|       3|      10|      13|
|   SMF| PHX|      47|      56|     103|
|   SMF| PHX|      -2|       9|       7|
|   SMF| PHX|      44|      47|      91|
|   SMF| PHX|      -7|       3|      -4|
+------+----+--------+--------+--------+
only showing top 10 rows



Notice that the action ran very quickly! That is because Spark only had to compute enough rows to display the first 10 entries, instead of applying the transformations to the whole DataFrame.
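The effect can be imitated in plain Python with a generator. This is only an analogy: roughly, Spark starts by scanning a small number of partitions and scans more only if it has not yet collected enough rows. When rows are produced on demand, taking the first 10 computes just 10 of them, whereas an aggregation such as max has to consume every row. The data and the sum_delays helper are hypothetical.

```python
from itertools import islice

stats = {"computed": 0}  # how many rows the pipeline has actually computed


def sum_delays(pairs):
    """Lazily yield ArrDelay + DepDelay for each pair, counting the rows computed."""
    for arr, dep in pairs:
        stats["computed"] += 1
        yield arr + dep


pairs = [(i % 50 - 10, i % 30) for i in range(100_000)]  # hypothetical (ArrDelay, DepDelay)

first_ten = list(islice(sum_delays(pairs), 10))  # like show(10)
print(stats["computed"])  # 10: only the rows needed for the output

stats["computed"] = 0
largest = max(sum_delays(pairs))  # like max("SumDelay")
print(stats["computed"])  # 100000: the aggregation traverses every row
```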

If we want to know the maximum and minimum delays, it takes longer, because Spark has to traverse the entire DataFrame.

In [12]:
# Note: this import shadows Python's built-in max and min in this notebook
from pyspark.sql.functions import max, min
df4.select(max("SumDelay"),min("SumDelay")).show()

+-------------+-------------+
|max(SumDelay)|min(SumDelay)|
+-------------+-------------+
|         5199|         -617|
+-------------+-------------+



And we can also find the mean:

In [13]:
from pyspark.sql.functions import avg
df4.select(max("SumDelay"),min("SumDelay"),avg("SumDelay")).show()

+-------------+-------------+-----------------+
|max(SumDelay)|min(SumDelay)|    avg(SumDelay)|
+-------------+-------------+-----------------+
|         5199|         -617|21.55425998256014|
+-------------+-------------+-----------------+
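When the data is split into partitions, the mean cannot be obtained by averaging the per-partition averages, because partitions can have different sizes. The usual approach, which is roughly what Spark's avg aggregate does, is to reduce each partition to a (sum, count) pair, merge the pairs, and divide once at the end. A plain-Python sketch with hypothetical values and helper names:

```python
def partial_sum_count(partition):
    """Per-partition step: reduce a partition to a (sum, count) pair."""
    return (sum(partition), len(partition))


def merge(a, b):
    """Combine two (sum, count) pairs."""
    return (a[0] + b[0], a[1] + b[1])


def distributed_mean(partitions):
    """Merge the per-partition (sum, count) pairs and divide once at the end."""
    total, count = 0, 0
    for pair in map(partial_sum_count, partitions):
        total, count = merge((total, count), pair)
    return total / count


partitions = [[8, 21, 70], [56, -2], [13, 103, 7, 91, -4]]  # hypothetical values
print(distributed_mean(partitions))  # 36.3
naive = sum(sum(p) / len(p) for p in partitions) / len(partitions)
print(naive)  # 34.0: wrong, because the partitions have different sizes
```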



### Storing intermediate results

Spark does not need to re-execute all the transformations every time we run an action. We can ask Spark to keep the result of the transformations applied so far to a DataFrame, so that the transformations and actions we add next continue from there instead of starting again from the CSV file.

In the PySpark version used here (3.1.1), cache() on a DataFrame uses the MEMORY_AND_DISK storage level: Spark keeps the cached partitions in RAM and spills to disk those that do not fit.

In [14]:
df4.cache()

DataFrame[Origin: string, Dest: string, ArrDelay: int, DepDelay: int, SumDelay: int]
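Note that cache() is itself lazy: it marks df4 for caching and returns it (the DataFrame[...] line printed above), and the data is only stored when the next action runs; df4.unpersist() releases it later. The plain-Python sketch below uses a hypothetical CachedPipeline class, only an analogy for Spark's behaviour, to show the benefit: the transformations run once, on the first action, and later actions reuse the stored result. The (ArrDelay, DepDelay) pairs are taken from the first rows above.

```python
class CachedPipeline:
    """Hypothetical toy: compute rows on the first action, then reuse them."""

    def __init__(self, compute):
        self.compute = compute  # function that runs all the transformations
        self.cached = None      # nothing is stored until the first action
        self.runs = 0           # how many times the transformations were executed

    def _rows(self):
        if self.cached is None:
            self.runs += 1
            self.cached = self.compute()
        return self.cached

    def count(self):
        return len(self._rows())

    def first(self):
        return self._rows()[0]


pipeline = CachedPipeline(lambda: [a + d for a, d in [(1, 7), (8, 13), (34, 36)]])
print(pipeline.runs)     # 0: marking for caching computes nothing
print(pipeline.count())  # 3: the first action runs the transformations
print(pipeline.first())  # 8: served from the cache
print(pipeline.runs)     # 1
```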

### Filter operations

Filter operations allow us to create new DataFrames that satisfy a condition over the data in another DataFrame.

There are two transformations that do this: "where" and "filter".

In [16]:
df5 = df4.where("SumDelay < 0")
df5.show()

+------+----+--------+--------+--------+
|Origin|Dest|ArrDelay|DepDelay|SumDelay|
+------+----+--------+--------+--------+
|   SMF| PDX|      -3|       1|      -2|
|   SMF| PHX|      -7|       3|      -4|
|   SMF| PHX|     -11|       1|     -10|
|   SMF| SAN|     -17|      -5|     -22|
|   SMF| SAN|      -9|       0|      -9|
|   SMF| SAN|      -7|       2|      -5|
|   SMF| SAN|     -11|       1|     -10|
|   SMF| SAN|      -6|       3|      -3|
|   SMF| SAN|     -14|       0|     -14|
|   SMF| SAN|      -9|      -5|     -14|
|   SMF| SNA|      -4|       0|      -4|
|   SMF| SNA|      -8|       2|      -6|
|   SMF| SNA|     -16|      -4|     -20|
|   SMF| SNA|      -7|       0|      -7|
|   SMF| SNA|     -15|      -4|     -19|
|   SNA| MDW|     -18|       0|     -18|
|   SNA| OAK|      -1|       0|      -1|
|   SNA| OAK|       0|      -1|      -1|
|   SNA| OAK|      -2|       0|      -2|
|   SNA| OAK|      -1|       0|      -1|
+------+----+--------+--------+--------+
only showing top 20 rows

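Now we can check how many flights have a negative total delay (SumDelay < 0). Note that a notebook cell only displays the value of its last expression, so the output below is the result of df5.count(); df3.count() is computed but not shown.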

In [17]:
df3.count()
df5.count()

3676937

We can also apply more than one filter, one after the other.

In [18]:
df5 = df4.where("SumDelay < 0").where("Origin == 'JFK'")
df5.show()
df5.count()

+------+----+--------+--------+--------+
|Origin|Dest|ArrDelay|DepDelay|SumDelay|
+------+----+--------+--------+--------+
|   JFK| CLE|     -13|     -19|     -32|
|   JFK| CLE|     -25|      -3|     -28|
|   JFK| CLE|     -25|      -8|     -33|
|   JFK| CLE|       4|     -14|     -10|
|   JFK| CLE|     -17|      -2|     -19|
|   JFK| CLE|      -4|      -3|      -7|
|   JFK| CLE|      -4|      -3|      -7|
|   JFK| CLE|      -1|      -8|      -9|
|   JFK| CLE|     -17|     -10|     -27|
|   JFK| CLE|     -30|      -9|     -39|
|   JFK| CLE|       3|      -5|      -2|
|   JFK| CLE|      -6|     -10|     -16|
|   JFK| CLE|     -20|      -4|     -24|
|   JFK| CLE|     -12|      -6|     -18|
|   JFK| CLE|       5|      -8|      -3|
|   JFK| IAD|     -15|      -3|     -18|
|   JFK| IAD|      -4|       1|      -3|
|   JFK| IAD|     -13|     -16|     -29|
|   JFK| IAD|      -7|       6|      -1|
|   JFK| CLT|      -8|       0|      -8|
+------+----+--------+--------+--------+
only showing top 20 rows

53321
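Chaining two where calls keeps only the rows that satisfy both conditions, so it is equivalent to a single filter that combines them, for example df4.where("SumDelay < 0 AND Origin = 'JFK'"), or with Column objects df4.filter((col("SumDelay") < 0) & (col("Origin") == "JFK")). The parentheses are required because in Python & binds more tightly than the comparison operators. Spark's optimizer typically merges consecutive filters into one anyway. The plain-Python check below illustrates the equivalence on a few rows, partly copied from the outputs above and partly made up.

```python
rows = [  # (Origin, SumDelay); the last row is made up for illustration
    ("SMF", 8), ("SMF", -2), ("JFK", -32), ("JFK", -28), ("JFK", 15),
]

# Two filters applied one after the other, like .where(...).where(...)
chained = [r for r in rows if r[1] < 0]
chained = [r for r in chained if r[0] == "JFK"]

# A single filter with both conditions combined
combined = [r for r in rows if r[1] < 0 and r[0] == "JFK"]

print(chained == combined)  # True
print(len(combined))        # 2
```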

In fact, "where" is an alias of "filter": both accept either a SQL expression string or a condition built with Column objects.

We can do the same operation we did before, but using "filter" instead of "where".

In [19]:
from pyspark.sql.functions import col
i = 0
city = "JFK"
df5 = df4.filter(col("SumDelay") < i).filter(col("Origin") == city)
df5.count()

53321

#### Test

Could you find the total, maximum, minimum and average delay for flights that start from 'JFK' airport? 

In [24]:
from pyspark.sql.functions import sum
# your code here
df_test = df4.filter(col("Origin") == city).select(sum("SumDelay"), max("SumDelay"),min("SumDelay"),avg("SumDelay"))
df_test.show()

+-------------+-------------+-------------+-----------------+
|sum(SumDelay)|max(SumDelay)|min(SumDelay)|    avg(SumDelay)|
+-------------+-------------+-------------+-----------------+
|      4290965|         3111|          -90|35.16175687302823|
+-------------+-------------+-------------+-----------------+



## Sort operations



In [25]:
from pyspark.sql.functions import asc, desc
df5 = df4.sort(asc("SumDelay"))

Here we sorted the data by the total delay.

This is an ascending order, from the smallest to the largest value.

Again, this is a transformation, so it is not executed until we run an action such as "show".

In [26]:
df5.show()

+------+----+--------+--------+--------+
|Origin|Dest|ArrDelay|DepDelay|SumDelay|
+------+----+--------+--------+--------+
|   AKN| ANC|    -312|    -305|    -617|
|   CHA| ATL|    -175|    -165|    -340|
|   ANC| FAI|    -162|    -165|    -327|
|   AUS| ATL|    -132|    -124|    -256|
|   ATL| AVL|    -116|    -111|    -227|
|   ANC| SEA|     -13|    -168|    -181|
|   SAN| OKC|    -157|     -19|    -176|
|   HNL| KOA|     -89|     -82|    -171|
|   SFO| HNL|       8|    -169|    -161|
|   ADK| ANC|     -83|     -72|    -155|
|   SJC| SBA|     -82|     -67|    -149|
|   ITO| HNL|     -74|     -71|    -145|
|   ADK| ANC|     -78|     -67|    -145|
|   GNV| ATL|     -82|     -62|    -144|
|   ANC| SEA|      -5|    -137|    -142|
|   PHL| SJU|     -79|     -60|    -139|
|   KOA| HNL|     -72|     -65|    -137|
|   ADK| ANC|     -79|     -58|    -137|
|   YAK| JNU|     -73|     -64|    -137|
|   DHN| ATL|     -75|     -60|    -135|
+------+----+--------+--------+--------+
only showing top 20 rows

We can also sort in descending order. This time we only ask for the first 5 rows.

In [27]:
df5 = df4.sort(desc("SumDelay")).limit(5)
df5.show()

+------+----+--------+--------+--------+
|Origin|Dest|ArrDelay|DepDelay|SumDelay|
+------+----+--------+--------+--------+
|   PBI| DTW|    2598|    2601|    5199|
|   ALO| MSP|    1942|    1956|    3898|
|   HNL| MSP|    1848|    1831|    3679|
|   FWA| DTW|    1715|    1736|    3451|
|   FAI| MSP|    1665|    1689|    3354|
+------+----+--------+--------+--------+
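Sorting the whole DataFrame only to keep 5 rows does more work than needed. For small limits, Spark's planner typically replaces the sort-then-limit pattern with a top-k operation, which appears as TakeOrderedAndProject in the output of df5.explain(). The plain-Python sketch below shows the same idea with heapq.nlargest, which keeps only k candidates instead of sorting everything; the values are partly taken from the output above and partly made up.

```python
import heapq

# SumDelay values in no particular order (partly from the output above)
sum_delays = [8, 21, 70, 56, -2, 13, 103, 7, 91, -4, 5199, 3898, 3679, 3451, 3354]

# Full sort, then keep the first 5: O(n log n) work
top5_sorted = sorted(sum_delays, reverse=True)[:5]

# Top-k selection: O(n log k) work, keeping at most k candidates in a heap
top5_heap = heapq.nlargest(5, sum_delays)

print(top5_heap)                 # [5199, 3898, 3679, 3451, 3354]
print(top5_heap == top5_sorted)  # True
```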



### Obtaining unique elements

How many different origin airports are there in the DataFrame?

In [28]:
df5 = df4.select("Origin").distinct()
df5.count()

304

#### Test

How many different destinations are there?

In [30]:
# Your code here
df_test2 = df4.select("Dest").distinct()
df_test2.count()

304

Additionally, what does the following operation do?

In [31]:
df4.select("Origin","Dest").distinct().count()

5032

#### Answering the last question:
```
df4.select("Origin","Dest").distinct().count()
```
This counts the number of distinct (Origin, Dest) pairs, that is, the number of different routes in the data: 5032 in total.
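Conceptually, distinct() on two columns keeps one copy of each (Origin, Dest) tuple. In plain Python the same counts can be obtained with sets; the pairs below are those of the first 10 rows of df4 shown earlier.

```python
# (Origin, Dest) pairs from the first 10 rows of df4.show(10) above
rows = [("SMF", "ONT")] + [("SMF", "PDX")] * 5 + [("SMF", "PHX")] * 4

distinct_origins = {origin for origin, _ in rows}
distinct_dests = {dest for _, dest in rows}
distinct_routes = set(rows)  # one entry per distinct (Origin, Dest) pair

print(len(distinct_origins), len(distinct_dests), len(distinct_routes))  # 1 3 3
```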

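### Accessing data from Python

Now we show how to bring some rows of df4 into a Python variable with "collect", and how to read them. collect() returns the rows to the driver program as a Python list, so it should only be used on small results; that is why we first keep only 5 rows with limit(5).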

In [36]:
dades = df4.limit(5).collect()
dades

[Row(Origin='SMF', Dest='ONT', ArrDelay=1, DepDelay=7, SumDelay=8),
 Row(Origin='SMF', Dest='PDX', ArrDelay=8, DepDelay=13, SumDelay=21),
 Row(Origin='SMF', Dest='PDX', ArrDelay=34, DepDelay=36, SumDelay=70),
 Row(Origin='SMF', Dest='PDX', ArrDelay=26, DepDelay=30, SumDelay=56),
 Row(Origin='SMF', Dest='PDX', ArrDelay=-3, DepDelay=1, SumDelay=-2)]

In [37]:
dades[0]

Row(Origin='SMF', Dest='ONT', ArrDelay=1, DepDelay=7, SumDelay=8)

In [38]:
dades[0][3]

7
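A Row behaves like a tuple, so dades[0][3] returns the fourth field, DepDelay. PySpark's Row also allows access by field name, as row.DepDelay or row["DepDelay"], and row.asDict() converts it to a dictionary; name-based access keeps working if the column order changes. The snippet below imitates this with a plain collections.namedtuple (an analogy, not PySpark's Row class; FlightRow is a hypothetical name), using the first row collected above.

```python
from collections import namedtuple

# Stand-in for pyspark.sql.Row with the same fields as df4
FlightRow = namedtuple("FlightRow", ["Origin", "Dest", "ArrDelay", "DepDelay", "SumDelay"])
row = FlightRow(Origin="SMF", Dest="ONT", ArrDelay=1, DepDelay=7, SumDelay=8)

print(row[3])         # 7: positional access, like dades[0][3]
print(row.DepDelay)   # 7: access by field name
print(row._asdict())  # the namedtuple counterpart of Row.asDict()
```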

## Writing into files

We have seen how to read a CSV file from disk. Now we will do the opposite.

Let's save df4 to disk. First, remember that it is split into several partitions:

In [39]:
df4.rdd.getNumPartitions()

6

By default, if we write df4 to disk we do not obtain a single file, but one file per partition.

To put all the data in a single partition, and therefore in a single output file, we can use "coalesce".

In [40]:
df4_one = df4.coalesce(1)
df4_one.rdd.getNumPartitions()

1

df4_one has all the data in a single partition.
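coalesce(n) reduces the number of partitions by merging existing ones, so it avoids a full shuffle; repartition(n) instead redistributes all the rows (a full shuffle) and can also increase the number of partitions. The plain-Python sketch below, with hypothetical helpers and made-up rows, shows the difference: coalesce merges whole partitions as they are (neighbouring ones in this sketch; Spark may group them differently, for example by data locality), while repartitioning by a key moves every row to the partition chosen by hashing its key.

```python
def coalesce(partitions, n):
    """Merge existing partitions into n groups without splitting any of them."""
    groups = [[] for _ in range(n)]
    for i, part in enumerate(partitions):
        # Whole partitions are assigned to groups; their rows stay together
        groups[i * n // len(partitions)].extend(part)
    return groups


def repartition_by_key(partitions, n, key):
    """Redistribute every row to partition hash(key(row)) % n (a full shuffle)."""
    groups = [[] for _ in range(n)]
    for part in partitions:
        for row in part:
            groups[hash(key(row)) % n].append(row)
    return groups


parts = [[("SMF", 8), ("SMF", 21)], [("JFK", -32)], [("PBI", 5199)], [("ALO", 3898), ("HNL", 3679)]]
print(coalesce(parts, 1))  # one partition with all 6 rows, in their original order
print(coalesce(parts, 2))  # the first two partitions merged, and the last two merged
# Row placement below depends on Python's per-process string hashing
print(repartition_by_key(parts, 2, key=lambda r: r[0]))
```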

Now we can save it to a file.

In [41]:
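# header=True keeps the column names; mode='overwrite' allows re-running the cell
df4_one.write.csv('df4_one.csv', header=True, mode='overwrite')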
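Writing a DataFrame with write.csv creates a directory with the given name rather than a single file. Inside it, Spark writes one part file per partition plus an empty _SUCCESS marker file, and it only writes a header row when header=True is passed. The plain-Python sketch below imitates that layout with the standard csv module; write_partitions is a hypothetical helper, and the file names are simplified (Spark's real part-file names also contain a unique identifier). The rows are (Origin, Dest, SumDelay) values from the first rows of df4.

```python
import csv
import os
import tempfile


def write_partitions(partitions, out_dir, header=None):
    """Write one CSV file per partition into out_dir, plus an empty _SUCCESS marker."""
    os.makedirs(out_dir, exist_ok=True)
    for i, part in enumerate(partitions):
        path = os.path.join(out_dir, f"part-{i:05d}.csv")  # simplified Spark-style name
        with open(path, "w", newline="") as f:
            writer = csv.writer(f)
            if header is not None:  # Spark writes a header only with header=True
                writer.writerow(header)
            writer.writerows(part)
    # Empty marker file signalling that the whole write finished
    open(os.path.join(out_dir, "_SUCCESS"), "w").close()
    return sorted(os.listdir(out_dir))


header = ["Origin", "Dest", "SumDelay"]
rows = [("SMF", "ONT", 8), ("SMF", "PDX", 21), ("SMF", "PDX", 70),
        ("SMF", "PDX", 56), ("SMF", "PDX", -2), ("SMF", "PDX", 13)]
six_partitions = [[r] for r in rows]  # like df4: one file per partition
one_partition = [rows]                # like df4_one after coalesce(1)

with tempfile.TemporaryDirectory() as tmp:
    print(write_partitions(six_partitions, os.path.join(tmp, "df4.csv"), header))
    print(write_partitions(one_partition, os.path.join(tmp, "df4_one.csv"), header))
```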

### Test

What happens if you try to sort the elements in df4_one? Will it be slower?

Does the number of CPU cores affect the result?

In [60]:
# Your code here
import time

t = time.process_time()
df_beforemerge = df4.sort(desc("SumDelay")).limit(5)
elapsed_time = time.process_time() - t

In [61]:
print("Elapsed time before merging partitions is {}".format(elapsed_time))
print(df4.rdd.getNumPartitions())
df_beforemerge.show()

Elapsed time before merging partitions is 0.0061909999999993914
6
+------+----+--------+--------+--------+
|Origin|Dest|ArrDelay|DepDelay|SumDelay|
+------+----+--------+--------+--------+
|   PBI| DTW|    2598|    2601|    5199|
|   ALO| MSP|    1942|    1956|    3898|
|   HNL| MSP|    1848|    1831|    3679|
|   FWA| DTW|    1715|    1736|    3451|
|   FAI| MSP|    1665|    1689|    3354|
+------+----+--------+--------+--------+



In [62]:
t2 = time.process_time()
df_aftermerge = df4_one.sort(desc("SumDelay")).limit(5)
elapsed_time2 = time.process_time() - t2

In [63]:
print("Elapsed time after merging partitions is {}".format(elapsed_time2))
print(df4_one.rdd.getNumPartitions())
df_aftermerge.show()

Elapsed time after merging partitions is 0.005943999999999505
1
+------+----+--------+--------+--------+
|Origin|Dest|ArrDelay|DepDelay|SumDelay|
+------+----+--------+--------+--------+
|   PBI| DTW|    2598|    2601|    5199|
|   ALO| MSP|    1942|    1956|    3898|
|   HNL| MSP|    1848|    1831|    3679|
|   FWA| DTW|    1715|    1736|    3451|
|   FAI| MSP|    1665|    1689|    3354|
+------+----+--------+--------+--------+



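The two measured times are almost identical (about 0.006 seconds), so this experiment does not show that sorting is faster with a single partition. In fact, it does not measure the sort at all: sort and limit are transformations, so the timed lines only build the query plan, and the sort actually runs later, inside show(). Moreover, time.process_time() only counts the CPU time of the Python process, while Spark does its work in a separate JVM process. To compare both DataFrames we would have to time an action, such as collect(), with a wall-clock timer. Regarding the number of CPU cores, note that this session was started with master("local[1]"), so Spark uses a single worker thread in both cases.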
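A minimal sketch of a fairer measurement, assuming the df4 and df4_one DataFrames and the desc import from above. time_action is a hypothetical helper: it times an action, which is where Spark actually runs the sort, with the wall-clock timer time.perf_counter(), repeating the call a few times and keeping the best result to reduce the effect of one-off costs. It deliberately avoids min(), because the built-in min is shadowed by the PySpark import earlier in this notebook.

```python
import time


def time_action(action, repeats=3):
    """Run a zero-argument function `repeats` times; return (best wall-clock seconds, last result)."""
    best, result = float("inf"), None
    for _ in range(repeats):
        start = time.perf_counter()
        result = action()
        elapsed = time.perf_counter() - start
        if elapsed < best:  # avoids min(), which may be shadowed by PySpark's min
            best = elapsed
    return best, result


# Usage with the DataFrames above (collect() forces Spark to run the sort):
# t_many, _ = time_action(lambda: df4.sort(desc("SumDelay")).limit(5).collect())
# t_one, _ = time_action(lambda: df4_one.sort(desc("SumDelay")).limit(5).collect())
# print(t_many, t_one)

elapsed, top5 = time_action(lambda: sorted(range(100_000), reverse=True)[:5])
print(top5)  # [99999, 99998, 99997, 99996, 99995]
```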