# **Installing PySpark on Google Colab**

Installing PySpark is not as straightforward as installing a typical Python library: in this setup, a simple `pip install` is not enough. Before anything else, you need to install its dependencies: Java 8 and Apache Spark 3.1.2 prebuilt for Hadoop 2.7.

In [None]:
# Install Java 8 (headless OpenJDK)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Download Spark 3.1.2 prebuilt for Hadoop 2.7
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
# Extract the Spark archive into the current directory (/content on Colab)
!tar xf /content/spark-3.1.2-bin-hadoop2.7.tgz

In [None]:
# Install findspark lib
!pip install -q findspark

The next step is to set the environment variables, so that the Colab environment can find where Java and Spark are installed.

You can set these variables from Python with the `os` library, through its `os.environ` mapping.

In [None]:
# Create env variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [None]:
# Import findspark, which locates the Spark installation (via SPARK_HOME) so pyspark can be imported like a regular library
import findspark
findspark.init()

With everything set up, let's run a local session to test if the installation was successful.



## **Using Spark with Python**
The initial step in utilizing Spark is to establish a connection with a cluster.

In practice, the cluster is hosted on remote machines (nodes) connected to one another. One of them, the master, is responsible for splitting up the data and the computations. The master is connected to the remaining computers in the cluster, known as workers: it sends data and computations to the workers for execution, and they return their results to the master.
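To make the master/worker split more concrete, here is a toy, single-machine model in plain Python (not Spark): a "master" function cuts the data into partitions, a thread pool stands in for the workers and computes a partial result per partition, and the master combines the partial results. Spark follows the same broad idea across machines, with much more machinery for scheduling and fault tolerance; the function names here are purely illustrative.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_partitions(data, n_partitions):
    """Master step: cut the data into roughly equal chunks."""
    if not data:
        return []
    size = -(-len(data) // n_partitions)  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]


def worker_task(partition):
    """Worker step: compute a partial result for one chunk."""
    return sum(partition)


def master_sum(data, n_workers=4):
    partitions = split_into_partitions(data, n_workers)
    # Send each partition to a "worker" and collect the partial results.
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        partial_sums = list(pool.map(worker_task, partitions))
    # Master step: combine the partial results into the final answer.
    return sum(partial_sums)


print(master_sum(list(range(1, 101))))  # 5050
```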

## **Creating a SparkSession**
Creating multiple `SparkSession` or `SparkContext` objects can lead to problems (only one `SparkContext` should be active per JVM), so it is recommended to use the `SparkSession.builder.getOrCreate()` method. This method returns the SparkSession that already exists in the environment, if there is one, or creates a new one if necessary.
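The name `getOrCreate()` describes a get-or-create (singleton-style) pattern. The plain-Python sketch below models that idea with a hypothetical `ToySession` class; it is not how PySpark is implemented internally, only an illustration of "reuse the existing object if there is one".

```python
class ToySession:
    """Hypothetical stand-in for a session object (not PySpark)."""

    _active = None  # the single session shared by the whole process

    def __init__(self, app_name):
        self.app_name = app_name

    @classmethod
    def get_or_create(cls, app_name):
        # Reuse the existing session if there is one; otherwise build it.
        if cls._active is None:
            cls._active = cls(app_name)
        return cls._active


first = ToySession.get_or_create("sparkFlights")
second = ToySession.get_or_create("anotherName")
print(first is second)   # True: the second call reused the first session
print(second.app_name)   # sparkFlights
```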

In [None]:
# Create SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master('local')\
        .appName('sparkFlights')\
        .getOrCreate()

In [None]:
# Check the SparkSession object
print(spark)

<pyspark.sql.session.SparkSession object at 0x7f88c82dce80>


In [None]:
# Print Spark version
print(spark.version)

3.1.2


## **Using DataFrames**

Spark DataFrames are accessed through the `SparkSession` created above, which wraps a `SparkContext` (available as `spark.sparkContext`). You can think of the `SparkContext` as your connection to the cluster and the `SparkSession` as your interface with that connection.

## **Importing Tables & Making Queries**

One of the advantages of the DataFrame interface is the ability to run SQL queries on tables in your Spark cluster.

We will import the `flights` table. This table contains a row for each flight that departed from either Portland International Airport (PDX) or Seattle-Tacoma International Airport (SEA) in 2014 and 2015.


In [None]:
arquivo = "/data/flights_small.csv"
flights = spark\
        .read.format("csv")\
        .option("inferSchema", "True")\
        .option("header", "True")\
        .csv(arquivo)

In [None]:
# Checking dataframe shape
print((flights.count(), len(flights.columns)))

(10000, 16)


In [None]:
# Show first 10 lines
flights.show(10)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
|2014|    1| 15|    1037|        7|    1

In [None]:
# Check the column types
flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [None]:
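# Import the col function to refer to a column by name.
# air_time (like several other numeric columns) was inferred as a string, probably because
# missing values are written as "NA", so it has to be cast to a number.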
from pyspark.sql.functions import col

In [None]:
# Creating "new_air_time" column by converting the values of the "air_time" column to integers. at the end we will remove "air_time" column
flights = flights.withColumn("new_air_time", col("air_time").cast("integer")).drop("air_time")

In [None]:
# Rename new_air_time back to air_time
flights = flights.withColumnRenamed("new_air_time","air_time")

In [None]:
flights.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)
 |-- air_time: integer (nullable = true)



## **Creating a Temporary View**

In PySpark, a temporary view is a logical table that represents the data in a DataFrame. By creating a temporary view, you can query and manipulate the DataFrame using SQL syntax.
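Spark SQL is not SQLite, but Python's built-in `sqlite3` module offers a small, runnable analogy for the idea: data that lives in your program is registered under a table name, and from then on a plain SQL string can query it. Here an in-memory table named `flights` plays the role of the temporary view; the rows are the first five flights shown earlier, trimmed to a few columns.

```python
import sqlite3

rows = [
    ("VX", "SEA", "LAX", 132),
    ("AS", "SEA", "HNL", 360),
    ("VX", "SEA", "SFO", 111),
    ("WN", "PDX", "SJC", 83),
    ("AS", "SEA", "BUR", 127),
]

# An in-memory database only lives as long as this connection,
# loosely like a temporary view only lives as long as its session.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE flights (carrier TEXT, origin TEXT, dest TEXT, air_time INTEGER)"
)
conn.executemany("INSERT INTO flights VALUES (?, ?, ?, ?)", rows)

# Once the data has a table name, ordinary SQL works against it.
result = conn.execute(
    "SELECT dest, air_time FROM flights WHERE origin = 'PDX'"
).fetchall()
print(result)  # [('SJC', 83)]
conn.close()
```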

In [None]:
# Create a temporary view called flights
flights.createOrReplaceTempView("flights")

# Write a query to get the contents of the temp view
query = "SELECT * FROM flights LIMIT 10"

# Run the query and store the result in another variable called flights10
flights10 = spark.sql(query)

# See the result
flights10.show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     679|  14|    43|     111|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|     569|  17|     5|      83|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     937|   7|    54|     127|
|2014|    1| 15|    1037|        7|    1

## **Global Temporary View**

`Temporary views` (such as the one we created above) in Spark SQL have session scope and will disappear if the session that created them is terminated. If you want to have a temporary view shared across all sessions and keep it active until the Spark application is terminated, you can create a global temporary view. The global temporary view is tied to a `global_temp` database maintained by the system, and we must use the qualified name to reference it.
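The difference between the two scopes can be modeled in plain Python with two hypothetical classes, `ToyApp` and `ScopedSession` (they are not Spark's catalog API): each session keeps its own dictionary of temporary views, while global views live in one dictionary owned by the application and are looked up through the `global_temp.` prefix.

```python
class ToyApp:
    """Hypothetical application holding the shared global_temp views."""

    def __init__(self):
        self.global_views = {}


class ScopedSession:
    """Hypothetical session with its own private temporary views."""

    def __init__(self, app):
        self.app = app
        self.temp_views = {}

    def create_temp_view(self, name, data):
        self.temp_views[name] = data

    def create_global_temp_view(self, name, data):
        self.app.global_views[name] = data

    def lookup(self, qualified_name):
        # Global views must be referenced with the global_temp prefix.
        if qualified_name.startswith("global_temp."):
            return self.app.global_views.get(qualified_name.split(".", 1)[1])
        return self.temp_views.get(qualified_name)


app = ToyApp()
s1, s2 = ScopedSession(app), ScopedSession(app)
s1.create_temp_view("flights", "session-only data")
s1.create_global_temp_view("flights", "shared data")

print(s2.lookup("flights"))              # None: s1's temp view is private
print(s2.lookup("global_temp.flights"))  # shared data
```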

In [None]:
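# Create a global temporary view (the "OrReplace" variant avoids an
# "already exists" error if this cell is run more than once)
flights.createOrReplaceGlobalTempView("flights")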

# The global temporary view is associated with a system-preserved database called global_temp.
spark.sql("SELECT * FROM global_temp.flights LIMIT 10").show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     679|  14|    43|     111|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|     569|  17|     5|      83|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     937|   7|    54|     127|
|2014|    1| 15|    1037|        7|    1

## **Converting PySpark DataFrame to Pandas DataFrame**

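Suppose you have run a query on your massive dataset and aggregated the result down to something more manageable.

Spark DataFrames make it easy to continue working with such a result in pandas through the `.toPandas()` method: calling it on a Spark DataFrame returns the corresponding pandas DataFrame. Keep in mind that `.toPandas()` collects all the rows into the driver's memory, so it should only be used on results small enough to fit there.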

In [None]:
# Count the number of flights for each origin-destination pair (the origins are SEA and PDX)
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"
flight_counts = spark.sql(query)
flight_counts.show()

+------+----+---+
|origin|dest|  N|
+------+----+---+
|   SEA| RNO|  8|
|   SEA| DTW| 98|
|   SEA| CLE|  2|
|   SEA| LAX|450|
|   PDX| SEA|144|
|   SEA| BLI|  5|
|   PDX| IAH| 57|
|   PDX| PHX|209|
|   SEA| SLC|225|
|   SEA| SBA| 23|
|   SEA| BWI| 29|
|   PDX| IAD| 23|
|   PDX| SFO|305|
|   SEA| KOA| 40|
|   PDX| MCI| 15|
|   SEA| SJC|213|
|   SEA| ABQ| 43|
|   SEA| SAT| 18|
|   PDX| ONT| 57|
|   SEA| LAS|364|
+------+----+---+
only showing top 20 rows



In [None]:
# Import pandas lib
import pandas as pd

# Converting Spark Dataframe to Pandas Dataframe
df_flight_counts = flight_counts.toPandas()

In [None]:
df_flight_counts.head(20)

   origin  dest    N
0     SEA   RNO    8
1     SEA   DTW   98
2     SEA   CLE    2
3     SEA   LAX  450
4     PDX   SEA  144
5     SEA   BLI    5
6     PDX   IAH   57
7     PDX   PHX  209
8     SEA   SLC  225
9     SEA   SBA   23


You may also want to go in the other direction and put a pandas DataFrame into a Spark cluster. The `spark.createDataFrame()` method takes a pandas DataFrame and returns a Spark DataFrame.

The resulting DataFrame is not registered in the SparkSession catalog. This means you can use all the Spark DataFrame methods on it, but you cannot reference it by name in other contexts.

For example, an SQL query (using the `.sql()` method) that references your DataFrame by name will result in an error. To access the data that way, you must first register it as a temporary view, for example with `.createOrReplaceTempView()`.

In [None]:
# Creating a Pandas Dataframe with CSV data
arq = "/data/airports.csv"
pd_temp = pd.read_csv(arq)

In [None]:
pd_temp.head()

   faa                           name        lat         lon   alt  tz  dst
0  04G              Lansdowne Airport  41.130472  -80.619583  1044  -5    A
1  06A  Moton Field Municipal Airport  32.460572  -85.680028   264  -5    A
2  06C            Schaumburg Regional  41.989341  -88.101243   801  -6    A
3  06N                Randall Airport  41.431912  -74.391561   523  -5    A
4  09J          Jekyll Island Airport  31.074472  -81.427778    11  -4    A


In [109]:
# Create a Spark DataFrame from the pandas DataFrame
airports = spark.createDataFrame(pd_temp)



In [110]:
airports.show(5)

+---+--------------------+----------+-----------+----+---+---+
|faa|                name|       lat|        lon| alt| tz|dst|
+---+--------------------+----------+-----------+----+---+---+
|04G|   Lansdowne Airport|41.1304722|-80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|32.4605722|-85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|41.9893408|-88.1012428| 801| -6|  A|
|06N|     Randall Airport| 41.431912|-74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|31.0744722|-81.4277778|  11| -4|  A|
+---+--------------------+----------+-----------+----+---+---+
only showing top 5 rows



## **Manipulating Data**

To add a new column in Spark, use the `.withColumn()` method, which takes two arguments: first, a string with the name of the new column, and then the column itself.

Updating a Spark DataFrame is slightly different from working with pandas because Spark DataFrames are immutable: they cannot be changed, so columns cannot be updated in place.

Instead, all these methods return a new DataFrame. To replace the original DataFrame, reassign the returned one to the same variable: `df = df.withColumn("newCol", df.oldCol + 1)`
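The "return a new object instead of modifying in place" style can be sketched in plain Python with a hypothetical, frozen `Frame` class (an illustration of the idea, not Spark's implementation): `with_column` builds a brand-new `Frame`, so the original variable still holds the old columns unless you reassign it.

```python
from dataclasses import dataclass
from typing import Callable, Tuple


@dataclass(frozen=True)
class Frame:
    """Hypothetical immutable table: column names plus rows of values."""

    columns: Tuple[str, ...]
    rows: Tuple[tuple, ...]

    def with_column(self, name: str, fn: Callable[[dict], object]) -> "Frame":
        # Build a brand-new Frame; self is never modified.
        new_rows = tuple(
            row + (fn(dict(zip(self.columns, row))),) for row in self.rows
        )
        return Frame(self.columns + (name,), new_rows)


toy_flights = Frame(("dest", "air_time"), (("LAX", 132), ("HNL", 360)))
with_hours = toy_flights.with_column("airtime_hrs", lambda r: r["air_time"] / 60)

print(toy_flights.columns)  # ('dest', 'air_time'): the original is unchanged
print(with_hours.rows)      # (('LAX', 132, 2.2), ('HNL', 360, 6.0))
```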

In [None]:
# Convert the air_time column from minutes to hours
flights.select(flights.air_time/60).show()

+------------------+
|   (air_time / 60)|
+------------------+
|               2.2|
|               6.0|
|              1.85|
|1.3833333333333333|
|2.1166666666666667|
|2.0166666666666666|
|               1.5|
|1.6333333333333333|
|              2.25|
|               3.3|
|2.1666666666666665|
| 2.566666666666667|
|2.1166666666666667|
|              3.05|
|              2.15|
|               1.5|
|1.2666666666666666|
|               3.6|
| 4.833333333333333|
|              1.85|
+------------------+
only showing top 20 rows



In [None]:
# Add the converted values as a new column called airtime_hrs
flights = flights.withColumn("airtime_hrs", flights.air_time/60)

In [None]:
flights.show(10)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|       airtime_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     679|  14|    43|     111|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|     569|  17|     5|      83|1.3833333333333333|
|2014|    3|  9|     754|  

You can also use the `.alias()` method to name a selected column. For example, you can compute the air time in hours on the fly and name the result `duration_hrs`, even though no such column exists in the DataFrame.

In [None]:
flights.select((flights.air_time/60).alias("duration_hrs")).show()

+------------------+
|      duration_hrs|
+------------------+
|               2.2|
|               6.0|
|              1.85|
|1.3833333333333333|
|2.1166666666666667|
|2.0166666666666666|
|               1.5|
|1.6333333333333333|
|              2.25|
|               3.3|
|2.1666666666666665|
| 2.566666666666667|
|2.1166666666666667|
|              3.05|
|              2.15|
|               1.5|
|1.2666666666666666|
|               3.6|
| 4.833333333333333|
|              1.85|
+------------------+
only showing top 20 rows



## **Selecting Data**
The Spark counterpart of SQL's `SELECT` is the `.select()` method. This method takes multiple arguments, one for each column you want to select (a single list of column names also works). These arguments can be column names as strings or column objects (using the syntax `df.colName`). When you pass a column object, you can perform operations like addition or subtraction on the column to modify the data it contains, just like with `.withColumn()`.

The difference between the `.select()` and `.withColumn()` methods is that `.select()` returns only the columns you specify, while `.withColumn()` returns all columns of the DataFrame, including the one you defined. It is often a good idea to drop unnecessary columns at the beginning of an operation so that you don't carry extra data while processing. In that case, you would use `.select()` and not `.withColumn()`.
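The difference can be pictured in plain Python on a list of row dictionaries. The two functions below are hypothetical stand-ins named after the Spark methods (they are not PySpark): `toy_select` keeps only the named columns, while `toy_with_column` keeps every column and appends one more. The rows are the first two flights shown earlier, trimmed.

```python
sample_rows = [
    {"tailnum": "N846VA", "origin": "SEA", "dest": "LAX", "air_time": 132},
    {"tailnum": "N559AS", "origin": "SEA", "dest": "HNL", "air_time": 360},
]


def toy_select(rows, *cols):
    """Keep only the named columns (the idea behind .select())."""
    return [{c: r[c] for c in cols} for r in rows]


def toy_with_column(rows, name, fn):
    """Keep every column and add one more (the idea behind .withColumn())."""
    return [{**r, name: fn(r)} for r in rows]


narrow = toy_select(sample_rows, "tailnum", "dest")
wide = toy_with_column(sample_rows, "airtime_hrs", lambda r: r["air_time"] / 60)
print(list(narrow[0]))  # ['tailnum', 'dest']
print(list(wide[0]))    # ['tailnum', 'origin', 'dest', 'air_time', 'airtime_hrs']
```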

The cells below show three equivalent ways to select the same columns: passing column names as strings, passing a list of names, and passing column objects.

In [None]:
# Selecting some columns from the main Dataframe
selected1 = flights.select("tailnum", "origin", "dest")
selected1.show()

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N846VA|   SEA| LAX|
| N559AS|   SEA| HNL|
| N847VA|   SEA| SFO|
| N360SW|   PDX| SJC|
| N612AS|   SEA| BUR|
| N646SW|   PDX| DEN|
| N422WN|   PDX| OAK|
| N361VA|   SEA| SFO|
| N309AS|   SEA| SAN|
| N564AS|   SEA| ORD|
| N323AS|   SEA| LAX|
| N305AS|   SEA| PHX|
| N433AS|   SEA| LAS|
| N765AS|   SEA| ANC|
| N713AS|   SEA| SFO|
| N27205|   PDX| SFO|
| N626AS|   SEA| SMF|
| N8634A|   SEA| MDW|
| N597AS|   SEA| BOS|
| N215AG|   PDX| BUR|
+-------+------+----+
only showing top 20 rows



In [None]:
# Selecting some columns from the main Dataframe - SECOND WAY
selected_columns = ["tailnum", "origin", "dest"]
selected2 = flights.select(selected_columns)
selected2.show()

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N846VA|   SEA| LAX|
| N559AS|   SEA| HNL|
| N847VA|   SEA| SFO|
| N360SW|   PDX| SJC|
| N612AS|   SEA| BUR|
| N646SW|   PDX| DEN|
| N422WN|   PDX| OAK|
| N361VA|   SEA| SFO|
| N309AS|   SEA| SAN|
| N564AS|   SEA| ORD|
| N323AS|   SEA| LAX|
| N305AS|   SEA| PHX|
| N433AS|   SEA| LAS|
| N765AS|   SEA| ANC|
| N713AS|   SEA| SFO|
| N27205|   PDX| SFO|
| N626AS|   SEA| SMF|
| N8634A|   SEA| MDW|
| N597AS|   SEA| BOS|
| N215AG|   PDX| BUR|
+-------+------+----+
only showing top 20 rows



In [None]:
# Selecting some columns from the main Dataframe - THIRD WAY
selected3 = flights.select(flights.tailnum, flights.origin, flights.dest)
selected3.show()

+-------+------+----+
|tailnum|origin|dest|
+-------+------+----+
| N846VA|   SEA| LAX|
| N559AS|   SEA| HNL|
| N847VA|   SEA| SFO|
| N360SW|   PDX| SJC|
| N612AS|   SEA| BUR|
| N646SW|   PDX| DEN|
| N422WN|   PDX| OAK|
| N361VA|   SEA| SFO|
| N309AS|   SEA| SAN|
| N564AS|   SEA| ORD|
| N323AS|   SEA| LAX|
| N305AS|   SEA| PHX|
| N433AS|   SEA| LAS|
| N765AS|   SEA| ANC|
| N713AS|   SEA| SFO|
| N27205|   PDX| SFO|
| N626AS|   SEA| SMF|
| N8634A|   SEA| MDW|
| N597AS|   SEA| BOS|
| N215AG|   PDX| BUR|
+-------+------+----+
only showing top 20 rows



## **Filtering Data**

This is Spark's counterpart to the `WHERE` clause in SQL. The `.filter()` method accepts either a string holding the expression that would follow `WHERE` in an SQL query, or a Spark column of boolean values (`True`/`False`).
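Conceptually, the condition `air_time > 120` is evaluated row by row, and only the rows where it is `True` are kept. As a plain-Python picture of that (not Spark code), here are the first five flights from the tables above, reduced to tail number and air time:

```python
first_five = [
    ("N846VA", 132),
    ("N559AS", 360),
    ("N847VA", 111),
    ("N360SW", 83),
    ("N612AS", 127),
]

# Keep a row only when the predicate is True, like WHERE air_time > 120.
long_flights = [(tail, t) for tail, t in first_five if t > 120]
print(long_flights)  # [('N846VA', 132), ('N559AS', 360), ('N612AS', 127)]
```

These are the same three flights that open the filtered output.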



In [None]:
# Filtering flights that took longer than 120 minutes
flights.filter("air_time > 120").show()

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|       airtime_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|               6.0|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     937|   7|    54|     127|2.1166666666666667|
|2014|    1| 15|    1037|        7|    1352|        2|     WN| N646SW|    48|   PDX| DEN|     991|  10|    37|     121|2.0166666666666666|
|2014|    4| 19|    1236|  

In [None]:
# Filtering flights that took longer than 120 minutes - show only the first 5 records
flights.filter(flights.air_time > 120).show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|       airtime_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|               6.0|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     937|   7|    54|     127|2.1166666666666667|
|2014|    1| 15|    1037|        7|    1352|        2|     WN| N646SW|    48|   PDX| DEN|     991|  10|    37|     121|2.0166666666666666|
|2014|    4| 19|    1236|  

Note that in the first case, we passed a string to `.filter()`. In SQL, we would write this filtering task as `SELECT * FROM flights WHERE air_time > 120`.

Spark's `.filter()` can accept any expression that could go in the WHERE clause of an SQL query (in this case, `"air_time > 120"`), as long as it is passed as a string. In the second case, we passed a column of boolean values instead.

Another way to filter data is to build the boolean conditions first and then chain `.filter()` calls:


In [None]:
# Creating the first filter
filterA = flights.origin == "SEA"

# Creating the second filter
filterB = flights.dest == "PDX"

# Filter the DataFrame with filterA first, then with filterB
filtering = flights.filter(filterA).filter(filterB)

In [None]:
filtering.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+-------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|        airtime_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+-------------------+
|2014|   10|  1|    1717|       -8|    1819|        4|     OO| N810SK|  4546|   SEA| PDX|     129|  17|    17|      28| 0.4666666666666667|
|2014|    9| 26|    2339|      144|      29|      142|     OO| N822SK|  4612|   SEA| PDX|     129|  23|    39|      29|0.48333333333333334|
|2014|    8| 18|    1728|       -2|    1822|        0|     OO| N586SW|  5440|   SEA| PDX|     129|  17|    28|      41| 0.6833333333333333|
|2014|    2|  4|    2053|       -7|    2144|       -4|     OO| N223SW|  5433|   SEA| PDX|     129|  20|    53|      29|0.48333333333333334|
|2014|    2|  9|    
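Chaining two `.filter()` calls keeps only the rows that pass both conditions, so it should be equivalent to a single filter on the combined condition; in PySpark that is written `flights.filter(filterA & filterB)`, and each side needs its own parentheses when the comparisons are written inline, because `&` binds more tightly than `==`. A plain-Python check of that equivalence on a few hypothetical origin/destination pairs:

```python
routes = [("SEA", "PDX"), ("SEA", "LAX"), ("PDX", "SEA"), ("SEA", "PDX")]


def is_from_sea(route):
    return route[0] == "SEA"


def is_to_pdx(route):
    return route[1] == "PDX"


# Two filters applied one after the other...
chained = [r for r in routes if is_from_sea(r)]
chained = [r for r in chained if is_to_pdx(r)]

# ...keep exactly the rows that one combined AND condition keeps.
combined = [r for r in routes if is_from_sea(r) and is_to_pdx(r)]

print(chained == combined, len(chained))  # True 2
```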

## **Aggregating**

All common aggregation methods, such as `.min()`, `.max()`, and `.count()`, are `GroupedData` methods. A `GroupedData` object is created by calling the `.groupBy()` method on a DataFrame. For example, to find the minimum value of a column `col` in a DataFrame `df`, you can use `df.groupBy().min("col").show()`.
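Calling `.groupBy()` with no column names puts every row into a single group, which is why `.min()` or `.max()` then returns one value for the whole DataFrame. A plain-Python sketch of grouping with and without a key (the `group_min` helper is hypothetical; the origin/distance pairs come from flights shown earlier):

```python
from collections import defaultdict

origin_distance = [("SEA", 954), ("SEA", 2677), ("PDX", 569), ("PDX", 991)]


def group_min(rows, key_fn=lambda row: ()):
    """Minimum distance per group; the default key puts all rows in one group."""
    groups = defaultdict(list)
    for row in rows:
        groups[key_fn(row)].append(row[1])
    return {key: min(values) for key, values in groups.items()}


print(group_min(origin_distance))                      # {(): 569}
print(group_min(origin_distance, lambda row: row[0]))  # {'SEA': 954, 'PDX': 569}
```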

In [None]:
# Find the longest flight duration from SEA to other cities
flights.filter(flights.origin == "SEA").groupBy().max("airtime_hrs").show()

+-----------------+
| max(airtime_hrs)|
+-----------------+
|6.816666666666666|
+-----------------+



In [None]:
# Find the shortest flight distance from PDX to other cities.
flights.filter(flights.origin == "PDX").groupBy().min("distance").show()

+-------------+
|min(distance)|
+-------------+
|          106|
+-------------+



- Use the `.avg()` method to get the average air time of Delta Airlines flights (where the `carrier` column has the value `"DL"`) that departed from SEA. The departure airport is stored in the `origin` column. `show()` the result.
- Use the `.sum()` method to get the total number of hours all planes in this dataset spent in the air, by creating a column called `duration_hrs` from the `air_time` column. `show()` the result.

In [None]:
# Use the .avg() method to obtain the average flight time of Delta Airlines flights that departed from SEA.
flights.filter(flights.carrier == "DL").filter(flights.origin == "SEA").groupBy().avg("air_time").show()

+------------------+
|     avg(air_time)|
+------------------+
|188.20689655172413|
+------------------+



In [None]:
# Use the .sum() method to calculate the total number of hours all airplanes in this dataset spent in the air by creating a column called duration_hrs from the air_time column.
flights.withColumn("duration_hrs", flights.air_time/60).groupBy().sum("duration_hrs").show()

+------------------+
| sum(duration_hrs)|
+------------------+
|25289.600000000126|
+------------------+



There is also the `.agg()` method. It lets you pass an aggregate column expression that uses any of the aggregate functions from the `pyspark.sql.functions` submodule.

This submodule contains many useful functions for calculating things like the standard deviation. Each of these aggregate functions takes a column (or a column name) and, inside `.agg()`, is computed separately for every group of the `GroupedData` object.
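`F.stddev()` computes the *sample* standard deviation, which is why the output column further down is labelled `stddev_samp(dep_delay)`. Python's `statistics.stdev` uses the same n - 1 formula, so it can serve as a hand check of a grouped result. The (month, dest, dep_delay) rows below are hypothetical; note that `statistics.stdev` raises an error for a group with fewer than two values.

```python
import statistics
from collections import defaultdict

# Hypothetical (month, dest, dep_delay) rows.
rows = [
    (7, "OMA", 2), (7, "OMA", 5),
    (12, "COS", 0), (12, "COS", 2),
]

# Group the delays by (month, dest), like groupBy("month", "dest").
delays = defaultdict(list)
for month, dest, dep_delay in rows:
    delays[(month, dest)].append(dep_delay)

# statistics.stdev divides by n - 1 (sample standard deviation).
stddevs = {key: statistics.stdev(values) for key, values in delays.items()}
print(stddevs)
```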

Let's Practice:

*   Import the `pyspark.sql.functions` submodule as F.
*   Find the standard deviation of `dep_delay` using the `.agg()` method with the `F.stddev()` function.

In [None]:
# Import pyspark.sql.functions as F
import pyspark.sql.functions as F

# Group by month and dest
by_month_dest = flights.groupBy("month", "dest")

In [None]:
# Get Standard deviation
by_month_dest.agg(F.stddev("dep_delay")).show()

+-----+----+----------------------+
|month|dest|stddev_samp(dep_delay)|
+-----+----+----------------------+
|    4| PHX|    15.003380033491737|
|    1| RDM|     8.830749846821778|
|    5| ONT|    18.895178691342874|
|    7| OMA|    2.1213203435596424|
|    8| MDW|    14.467659032985843|
|    6| DEN|    13.536905534420026|
|    5| IAD|    3.8078865529319543|
|   12| COS|    1.4142135623730951|
|   11| ANC|    18.604716401245316|
|    5| AUS|     4.031128874149275|
|    5| COS|     33.38163167571851|
|    2| PSP|     4.878524367060187|
|    4| ORD|    11.593882803741764|
|   10| DFW|     45.53019017606675|
|   10| DCA|    0.7071067811865476|
|    8| JNU|     40.79368823727514|
|   11| KOA|    1.8708286933869707|
|   10| OMA|    5.8594652770823155|
|    6| ONT|     25.98316762829351|
|    3| MSP|    21.556779370817555|
+-----+----+----------------------+
only showing top 20 rows



## **Joining**

A join combines two different tables based on a column they share, called the key. Examples of keys here include the tailnum and carrier columns from the flights table.

For example, joining the flights table with the planes table, which holds information about each airplane, adds all the columns from the planes table to the flights table. To fill in these columns, Spark looks up the tail number of each flight in the planes table and uses the matching row to fill in the new columns.

In PySpark, joins are performed using the DataFrame method `.join()`. This method takes three arguments.

*    The first is the second DataFrame that you want to join with the first one.
*    The second argument, `on`, is the name of the key column as a string (or a list of names when there are several keys). The key column name(s) must be the same in each table.
*    The third argument, `how`, specifies the type of join to perform, for example `"inner"` or `"leftouter"`.
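As a plain-Python sketch of what `how="leftouter"` means (not Spark code): every row of the left table is kept, the matching value from the right table is attached by key, and left rows with no match get `None` (Spark displays `null`) in the new column. The airport names come from the description of the dataset; the sketch assumes each key appears at most once on the right side, and `left_outer_join` is a hypothetical helper.

```python
airport_names = {
    "SEA": "Seattle-Tacoma International Airport",
    "PDX": "Portland International Airport",
}
flight_rows = [
    {"flight": 4546, "dest": "PDX"},
    {"flight": 1780, "dest": "LAX"},  # LAX is not in airport_names
]


def left_outer_join(left, right, on):
    """Keep every left row; fill the new column with None when there is no match."""
    return [{**row, "name": right.get(row[on])} for row in left]


for row in left_outer_join(flight_rows, airport_names, on="dest"):
    print(row)
```

An inner join would instead drop the LAX flight, since it has no match.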

Let's Practice:
*    Join flights with the airports DataFrame on the dest column by calling the `.join()` method on flights. Save the result as `flights_with_airports`.


In [118]:
# Check both dataframes
airports.show()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [119]:
flights.show(20)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|distance|hour|minute|air_time|       airtime_hrs|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+----+------+--------+------------------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     954|   6|    58|     132|               2.2|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|    2677|  10|    40|     360|               6.0|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     679|  14|    43|     111|              1.85|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|     569|  17|     5|      83|1.3833333333333333|
|2014|    3|  9|     754|  

In [120]:
# Rename column faa to dest
airports = airports.withColumnRenamed("faa", "dest")

In [121]:
# Join DataFrames
flights_with_airports = flights.join(airports, on="dest", how="leftouter")

In [122]:
# Check the joined DataFrame
flights_with_airports.show()

+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+----+------+--------+------------------+--------------------+---------+-----------+----+---+---+
|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|distance|hour|minute|air_time|       airtime_hrs|                name|      lat|        lon| alt| tz|dst|
+----+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+--------+----+------+--------+------------------+--------------------+---------+-----------+----+---+---+
| MSY|2014|    8| 16|    1014|       -1|    1644|      -16|     AS| N549AS|   788|   SEA|    2086|  10|    14|     248| 4.133333333333334|Louis Armstrong N...|29.993389| -90.258028|   4| -6|  A|
| MSY|2014|   10| 29|    1009|       -6|    1613|      -47|     AS| N532AS|   788|   SEA|    2086|  10|     9|     224|3.7333333333333334|Louis Armstrong N...|29.993389| -90.258028|   4| -6|  A|
| MSY|2014|    8| 18|    

As an extra step, I will also join the result with the planes table.

In [123]:
arquivo = "/data/planes.csv"
planes = spark\
        .read.format("csv")\
        .option("inferSchema", "True")\
        .option("header", "True")\
        .csv(arquivo)

In [124]:
planes.show()

+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
|tailnum|year|                type|    manufacturer|   model|engines|seats|speed|   engine|
+-------+----+--------------------+----------------+--------+-------+-----+-----+---------+
| N102UW|1998|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N103US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N104UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N105UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N107US|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N108UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N109UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA|Turbo-fan|
| N110UW|1999|Fixed wing multi ...|AIRBUS INDUSTRIE|A320-214|      2|  182|   NA

In [125]:
# tailnum is the key column shared by both tables.
# Both tables also have a year column, so the joined table ends up with two columns named year.
flights_airports_planes = flights_with_airports.join(planes, on="tailnum", how="leftouter")

In [126]:
flights_airports_planes.show()

+-------+----+----+-----+---+--------+---------+--------+---------+-------+------+------+--------+----+------+--------+------------------+--------------------+---------+-----------+----+---+---+----+--------------------+------------+---------+-------+-----+-----+---------+
|tailnum|dest|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|flight|origin|distance|hour|minute|air_time|       airtime_hrs|                name|      lat|        lon| alt| tz|dst|year|                type|manufacturer|    model|engines|seats|speed|   engine|
+-------+----+----+-----+---+--------+---------+--------+---------+-------+------+------+--------+----+------+--------+------------------+--------------------+---------+-----------+----+---+---+----+--------------------+------------+---------+-------+-----+-----+---------+
| N549AS| MSY|2014|    8| 16|    1014|       -1|    1644|      -16|     AS|   788|   SEA|    2086|  10|    14|     248| 4.133333333333334|Louis Armstrong N...|29.993389| -90.2580