# PySpark from Google Colab

In this lab we will explore and test the basic functionality of Spark, and see how to load dataset files in Google Colab.

As you will see, installing PySpark is straightforward, so you should be able to install it easily on any computer at your disposal with the command: pip install pyspark.

We start the notebook by installing pyspark.

In [None]:
!pip install pyspark

## Get the dataset

To give you a fast way to get the dataset we have prepared for this lab, we created a link to a file containing it in another Google account, and wrote down all the steps needed to place the file in the current path.

This file is 2007.csv; it contains information about flights during the year 2007.

Now, execute the following code cell.

In [None]:
!gdown --id "13yfm1bNdMBSaNp896pzrg4oCVd6JrVeP"
!unzip Spark_Tutorial1.zip
!bzip2 -d Spark_Tutorial1/2007.csv.bz2
!mv Spark_Tutorial1/2007.csv .
!rm -r __MACOSX
!rm Spark_Tutorial1.zip
!rm -r Spark_Tutorial1
!ls

## Reading the file

In [None]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName('testSparkSession').getOrCreate()

df = spark.read.format("csv").option("header", "true").option("nullValue","NA").option("inferSchema", "true").load("2007.csv")

We have now loaded the entire file into a DataFrame named "df".

Next we will ask Spark to print the schema associated with the data, that is, the columns of the table and the data type of each column.

In [None]:
df.printSchema()

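We can also ask Spark how many partitions it has created. This number depends on the size of the input file and on the number of CPU cores available to Spark, so it may change from one system to another.

The idea behind making partitions on a single computer is to allow each CPU core to process a different part of the data. Note that the session above was created with master "local[1]", which makes Spark use a single worker thread; "local[*]" would use all the available cores.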

In [None]:
df.rdd.getNumPartitions()

## Basic operations

### Remove columns

Spark offers a simple transformation to do that. Remember that transformations are not executed immediately.

In [None]:
df2 = df.drop("FlightNum","TailNum","UniqueCarrier")

We can also do it another way: instead of removing columns, we can create a new DataFrame with only the columns we want to work with.

In [None]:
df2 = df.select("Origin", "Dest", "ArrDelay", "DepDelay")
df2.show()

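From now on we will work with the contents of df2, but first we want to remove all the entries with an NA value in the columns ArrDelay or DepDelay. Note that na.drop() called without arguments removes every row that has a null in any column of df2.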

In [None]:
df3 = df2.na.drop()

### Add columns

We can add new columns with new information. For instance, the DataFrame has information about the departure delay (DepDelay) and the arrival delay (ArrDelay). With that, we can create a new column that holds the sum of the two.

In [None]:
from pyspark.sql.functions import expr
df4 = df3.withColumn("SumDelay", expr("ArrDelay + DepDelay"))

Now we can check the results by executing an action, which will trigger the execution of all the accumulated transformations.

In [None]:
df4.select("DepDelay", "ArrDelay", "SumDelay").show(10)

Notice that the action was executed very quickly! That is because the transformations were only applied to the first 10 entries of our DataFrame.

In case we are interested in knowing the maximum and minimum delays, it will take more time, because we need to traverse the entire DataFrame.

In [None]:
from pyspark.sql.functions import max, min
df4.select(max("SumDelay"),min("SumDelay")).show()

And we can also find the mean:

In [None]:
from pyspark.sql.functions import avg
df4.select(max("SumDelay"),min("SumDelay"),avg("SumDelay")).show()

### Storing intermediate results

We do not have to re-execute all the transformations every time. We can ask Spark to keep the result of the transformations already applied to a DataFrame, so that any transformations we add later continue from there.

The intermediate results are kept in RAM when they fit and spilled to disk otherwise; Spark decides this. Note that cache() is itself lazy: the data is actually stored the first time an action runs on the DataFrame.

In [None]:
df4.cache()

### Filter operations

Filter operations allow us to create new DataFrames that contain only the rows of another DataFrame that satisfy a condition.

There are two transformations that allow us to do that: "where" and "filter".

In [None]:
df5 = df4.where("SumDelay < 0")
df5.show()

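Now we can check how many flights have a negative total delay, out of all the flights in df3.

In [None]:
print(df3.count())  # all flights without NA values
print(df5.count())  # flights with a negative total delay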

We can also apply more than one filter, one after the other.

In [None]:
df5 = df4.where("SumDelay < 0").where("Origin == 'JFK'")
df5.show()
df5.count()

The "filter" transformation behaves exactly like "where"; in fact, "where" is an alias for "filter".

We can repeat the operation we did previously, using "filter" instead of "where".

In [None]:
from pyspark.sql.functions import col
i = 0
city = "JFK"
df5 = df4.filter(col("SumDelay") < i).filter(col("Origin") == city)
df5.count()

#### Test

Could you find the total, maximum, minimum and average delay for flights that depart from 'JFK' airport?

In [None]:
from pyspark.sql.functions import sum
# your code here

### Sort operations

In [None]:
from pyspark.sql.functions import asc, desc
df5 = df4.sort(asc("SumDelay"))

Here we sorted the data by the total delay.

This is an ascending order, from smallest to largest.

Again, this is a transformation, and it is not executed until we run an action such as "show".

In [None]:
df5.show()

We can also sort in descending order. In this case we will ask for only the first 5 elements.

In [None]:
df5 = df4.sort(desc("SumDelay")).limit(5)
df5.show()

### Obtaining unique elements

How many different origin airports are in the DataFrame?

In [None]:
df5 = df4.select("Origin").distinct()
df5.count()

#### Test

How many destinations are there?

In [None]:
# Your code here

Additionally, what does the following operation do?

In [None]:
df4.select("Origin","Dest").distinct().count()

### Accessing data from Python

We will show how to dump part of df4 into a Python variable and how to read it.

In [None]:
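dades = df4.limit(5).collect()  # a Python list with the first 5 rows
print(dades)        # the whole list of Row objects
print(dades[0])     # the first Row
print(dades[0][3])  # the fourth field of the first Row (DepDelay)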

## Writing into files

We have seen how to read a csv file from disk. Now we will see how to do the opposite.

Let's save df4 to disk. Remember that it has n partitions.

In [None]:
df4.rdd.getNumPartitions()

By default, if we store df4, we do not obtain a single file but as many files as there are partitions.

To have all the data in a single partition, and therefore in a single file, we can use "coalesce".

In [None]:
df4_one = df4.coalesce(1)
df4_one.rdd.getNumPartitions()

df4_one has all the data in a single partition.

Now we can save it. Note that Spark creates a directory named df4_one.csv that contains the data in a single part file.

In [None]:
df4_one.write.csv('df4_one.csv')

### Test

What happens if you try to sort the elements in df4_one? Will it be slower?

Does the number of CPU cores affect the result?

In [None]:
# Your code here