<a href="https://colab.research.google.com/github/pratikgujral/AutoRCCar/blob/master/Learn_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In this notebook, we'll learn how to use Spark using Python!

# Prerequisites
- Familiarity with Python


# Introduction
![Spark logo](https://spark.apache.org/docs/latest/api/python/_static/spark-logo-hd.png)

Spark is a tool for doing parallel computation with large datasets. Spark integrates with Python very well. PySpark is the Python package that makes the magic happen. You'll use this package to work with data about flights from Portland and Seattle. You'll learn to wrangle this data and build a whole machine learning pipeline to predict whether or not flights will be delayed. Get ready to put some Spark in your Python code and dive into the world of high-performance machine learning!

# Part 1: Getting to know PySpark
In this part, we'll learn how Spark manages data and how you can read and write tables from Python.

## What is Spark?
Spark is a platform for **cluster computing**. Spark lets you spread data and computations over clusters with multiple nodes (think of each node as a separate computer). Splitting up your data makes it easier to work with very large datasets because each node only works with a small amount of data.

![Cluster Computing](https://i.pinimg.com/originals/bf/c4/17/bfc4173a6e383fb815935fad9a8d9c11.png)

As each node works on its own subset of the total data, it also carries out a part of the total calculations required, so that both data processing and computation are performed in parallel over the nodes in the cluster. Parallel computation can make certain types of programming tasks much faster, namely those that split into pieces which can be processed independently.
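
The split, compute, and combine pattern described above can be sketched in plain Python. This is only an analogy, not Spark: threads from the standard library stand in for worker nodes, the helper names `split_into_chunks` and `partial_sum_of_squares` are made up for this illustration, and no real speed-up should be expected from such a small example.

```python
from concurrent.futures import ThreadPoolExecutor


def split_into_chunks(data, n_chunks):
    """Split a list into n_chunks contiguous pieces of near-equal size."""
    size, remainder = divmod(len(data), n_chunks)
    chunks, start = [], 0
    for i in range(n_chunks):
        end = start + size + (1 if i < remainder else 0)
        chunks.append(data[start:end])
        start = end
    return chunks


def partial_sum_of_squares(chunk):
    """The work a single 'worker' does on its own subset of the data."""
    return sum(x * x for x in chunk)


data = list(range(1, 1001))

# The 'master' splits the data into one chunk per worker.
chunks = split_into_chunks(data, n_chunks=4)

# Each worker (a thread here, standing in for a node) processes its chunk.
with ThreadPoolExecutor(max_workers=4) as pool:
    partial_results = list(pool.map(partial_sum_of_squares, chunks))

# The 'master' combines the partial results into the final answer.
total = sum(partial_results)
print(partial_results, total)
```

The combined result matches what a single loop over all the data would produce; only the way the work is divided changes.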

However, with greater computing power comes greater complexity.

Deciding whether or not Spark is the best solution for your problem takes some experience, but you can consider questions like:
 - Is my data too big to work with on a single machine?
 - Can my calculations be easily parallelized?

## Using Spark in Python
The first step in using Spark is connecting to a cluster.

In practice, the cluster will be hosted on a remote machine that's connected to all other nodes. There will be one computer, called the **master** (sometimes also called a **frontend node**), that manages splitting up the data and the computations. The master is connected to the rest of the computers in the cluster, which are called **workers**. The master sends the workers data and calculations to run, and they send their results back to the master.

When you're just getting started with Spark, it's simpler to run a cluster locally. Thus, in this notebook, instead of connecting to another computer, all computations run in a local Spark session on the machine that hosts the notebook (set up in the installation section below).

Creating the connection is as simple as creating an instance of the `SparkContext` class. The class constructor takes a few optional arguments that allow you to specify the attributes of the cluster you're connecting to.

An object holding all these attributes can be created with the `SparkConf()` constructor. Take a look at the documentation for all the details!

Later in this notebook, we'll create a `SparkContext` called `sc` and use it to check the connection.



## Installing and setting up Spark
Run the code below. If it throws an error, update the link to the Apache Spark distribution in the `wget` command, and use the matching version later when setting the environment variables.

In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

Now that we have installed Spark and Java in Colab, it is time to set the environment path that enables us to run PySpark in our Colab environment. Set the location of Java and Spark by running the following code:

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

Running a local Spark Session

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

## Reading the datasets

In [0]:
url_airports_dataset = r"https://assets.datacamp.com/production/repositories/1237/datasets/6e5c4ac2a4799338ba7e13d54ce1fa918da644ba/airports.csv"
url_flights_dataset = r"https://assets.datacamp.com/production/repositories/1237/datasets/fa47bb54e83abd422831cbd4f441bd30fd18bd15/flights_small.csv"
url_planes_datasets = r"https://assets.datacamp.com/production/repositories/1237/datasets/231480a2696c55fde829ce76d936596123f12c0c/planes.csv"

## Getting to know `SparkContext`

In [0]:
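from pyspark import SparkContext
# getOrCreate() reuses the context behind the SparkSession created earlier;
# calling SparkContext() directly would fail because only one context can be active.
sc = SparkContext.getOrCreate()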

# Verify SparkContext
print(sc)

# Print Spark version
print(sc.version)

## Using DataFrames
Spark's core data structure is the **Resilient Distributed Dataset (RDD)**. This is a low level object that lets Spark work its magic by splitting data across multiple nodes in the cluster. However, RDDs are hard to work with directly, so in this notebook we'll be using the Spark DataFrame abstraction built on top of RDDs.

The Spark DataFrame was designed to behave a lot like a SQL table (a table with variables in the columns and observations in the rows). Not only are they easier to understand, DataFrames are also more optimized for complicated operations than RDDs.

When you start modifying and combining columns and rows of data, there are many ways to arrive at the same result, but some often take much longer than others. When using RDDs, it's up to the data scientist to figure out the right way to optimize the query, but the DataFrame implementation has much of this optimization built in!
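
To make "many ways to arrive at the same result" concrete, here is a toy illustration in plain Python, not Spark. It compares two orderings of the same work: joining every flight to its airport name and then filtering, versus filtering first and joining only the rows that survive. The data and function names are made up, and the sketch only illustrates the kind of reordering a query optimizer can apply; it does not describe how Spark's optimizer is implemented.

```python
# Made-up rows standing in for a flights table and an airports lookup.
flights = [
    {"origin": "SEA", "dest": "LAX"},
    {"origin": "PDX", "dest": "SFO"},
    {"origin": "PDX", "dest": "LAX"},
    {"origin": "SEA", "dest": "SFO"},
    {"origin": "PDX", "dest": "DEN"},
    {"origin": "PDX", "dest": "DEN"},
]
airports = {
    "LAX": "Los Angeles Intl",
    "SFO": "San Francisco Intl",
    "DEN": "Denver Intl",
}


def join_then_filter(flights, airports):
    """Attach the airport name to every flight, then keep only SEA departures."""
    joined = [dict(f, name=airports[f["dest"]]) for f in flights]
    result = [row for row in joined if row["origin"] == "SEA"]
    return result, len(joined)


def filter_then_join(flights, airports):
    """Keep only SEA departures first, then attach the airport name."""
    kept = [f for f in flights if f["origin"] == "SEA"]
    joined = [dict(f, name=airports[f["dest"]]) for f in kept]
    return joined, len(joined)


result_a, rows_joined_a = join_then_filter(flights, airports)
result_b, rows_joined_b = filter_then_join(flights, airports)

print(result_a == result_b)          # same answer either way
print(rows_joined_a, rows_joined_b)  # but different amounts of join work
```

Both functions return the same rows, but the second one joins far fewer of them. With RDDs, choosing the cheaper ordering is left to you.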

To start working with Spark DataFrames, you first have to create a `SparkSession` object from your `SparkContext`. You can think of the `SparkContext` as your connection to the cluster and the `SparkSession` as your interface with that connection.

Remember, for the rest of the notebook, we'll have a `SparkSession` called `spark` available in our workspace!

## Creating a SparkSession
We'll have a `SparkSession` called `spark`, but what if you're not sure there already is one? Creating multiple SparkSessions and SparkContexts can cause issues, so it's best practice to use the `SparkSession.builder.getOrCreate()` method. This returns an existing `SparkSession` if there's already one in the environment, or creates a new one if necessary!

In [0]:
from pyspark.sql import SparkSession

# Creating a new PySpark Session
my_spark = SparkSession.builder.getOrCreate()
print(my_spark)

---

## Viewing Tables
Now that we have created a `SparkSession`, we can start poking around to see what data is in our cluster!

Our `SparkSession` has an attribute called `catalog` which lists all the data inside the cluster. This attribute has a few methods for extracting different pieces of information.

One of the most useful is the `.listTables()` method, which returns the names of all the tables in your cluster as a list.

In [0]:
print(spark.catalog.listTables()) # Output -> [Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

### Are you query-ious?
One of the advantages of the DataFrame interface is that you can run SQL queries on the tables in your Spark cluster.

As the output above shows, one of the tables in your cluster is the `flights` table. This table contains a row for every flight that left Portland International Airport (PDX) or Seattle-Tacoma International Airport (SEA) in 2014 and 2015.

Running a query on this table is as easy as using the `.sql()` method on your `SparkSession`. This method takes a string containing the query and returns a DataFrame with the results!

If you look closely, you'll notice that the table `flights` is only mentioned in the query, not as an argument to any of the methods. This is because there isn't a local object in your environment that holds that data, so it wouldn't make sense to pass the table as an argument.

Before that, we'll create a `SparkSession` called `spark`.

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Writing the query to fetch first 10 rows of flights
query = 'FROM flights SELECT * LIMIT 10'

# Executing the query using the .sql() method of SparkSession
flights10 = spark.sql(query)

# Using the DataFrame method .show() to print flights10
flights10.show()



---


### Panda-fying a Spark DataFrame

Suppose you've run a query on your huge dataset and aggregated it down to something a little more manageable.

Sometimes it makes sense to then take that table and work with it locally using a tool like `pandas`. Spark DataFrames make that easy with the `.toPandas()` method. Calling this method on a Spark DataFrame returns the corresponding `pandas` DataFrame. It's as simple as that!

This time the query counts the number of flights to each airport from SEA and PDX.

Of course, we'll first create a `SparkSession` called `spark` in our workspace!

In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

query = 'SELECT origin, dest, COUNT(*) AS N FROM flights GROUP BY origin, dest'

# Running the query
flight_counts = spark.sql(query)

# Converting the resulting table into a pandas DataFrame
pd_counts = flight_counts.toPandas()

# Print the head of pd_counts
print(pd_counts.head())

"""
OUTPUT
  origin dest    N
0    SEA  RNO    8
1    SEA  DTW   98
2    SEA  CLE    2
3    SEA  LAX  450
4    PDX  SEA  144
"""

---

### Put some Spark in your data
In the previous section, you saw how to move data from Spark to `pandas`. However, maybe you want to go the other direction, and put a `pandas` DataFrame into a Spark cluster! The `SparkSession` class has a method for this as well.

The `.createDataFrame()` method takes a `pandas` DataFrame and returns a Spark DataFrame.

The output of this method is stored locally, not in the `SparkSession` `catalog`. This means that you can use all the Spark DataFrame methods on it, but you can't access the data in other contexts.

For example, a SQL query (using the `.sql()` method) that references your DataFrame will throw an error. To access the data in this way, you have to save it as a temporary table.

You can do this using the `.createTempView()` Spark DataFrame method, which takes as its only argument the name of the temporary table you'd like to register. This method registers the DataFrame as a table in the catalog, but as this table is temporary, it can only be accessed from the specific `SparkSession` used to create the Spark DataFrame.

There is also the method `.createOrReplaceTempView()`. This safely creates a new temporary table if nothing was there before, or updates an existing table if one was already defined. You'll use this method to avoid running into problems with duplicate tables.

Check out the diagram to see all the different ways your Spark data structures interact with each other.

![alt](https://s3.amazonaws.com/assets.datacamp.com/production/course_4452/datasets/spark_figure.png)

In [0]:
from pyspark.sql import SparkSession
import numpy as np
import pandas as pd

spark = SparkSession.builder.getOrCreate()

# Creating a temporary pandas DataFrame
pd_temp = pd.DataFrame(np.random.random(10))

# Creating a Spark DataFrame called spark_temp by calling the .createDataFrame() method with pd_temp as the argument
spark_temp = spark.createDataFrame(pd_temp)

# Examining the tables in the catalog
print(spark.catalog.listTables()) # OUTPUT -> [Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
# Note that it DOES NOT list spark_temp, because that DataFrame exists only locally and is not registered in the Spark catalog

# Registering spark_temp as a temporary table named 'temp' using the createOrReplaceTempView() method. The name of the table is set by passing the desired name as an argument to the method
spark_temp.createOrReplaceTempView('temp')

# Examining the list of tables once again. This time, it will list our newly created temp DataFrame as a table
print(spark.catalog.listTables()) 
# OUTPUT-> [Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True), Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

---


### Dropping the middle man
Now you know how to put data into Spark via `pandas`, but you're probably wondering why deal with `pandas` at all? Wouldn't it be easier to just read a text file straight into Spark? Of course it would!

Luckily, your `SparkSession` has a `.read` attribute which has several methods for reading different data sources into Spark DataFrames. Using these you can create a DataFrame from a .csv file just like with regular pandas DataFrames!

The variable `file_path` is a string with the path to the file `airports.csv`. This file contains information about different airports all over the world.

In [0]:
!wget "https://assets.datacamp.com/production/repositories/1237/datasets/6e5c4ac2a4799338ba7e13d54ce1fa918da644ba/airports.csv" -O ../tmp/airports.csv

--2020-02-02 09:53:31--  https://assets.datacamp.com/production/repositories/1237/datasets/6e5c4ac2a4799338ba7e13d54ce1fa918da644ba/airports.csv
Resolving assets.datacamp.com (assets.datacamp.com)... 52.85.77.45, 52.85.77.63, 52.85.77.62, ...
Connecting to assets.datacamp.com (assets.datacamp.com)|52.85.77.45|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 84548 (83K)
Saving to: ‘../tmp/airports.csv’


2020-02-02 09:53:31 (4.96 MB/s) - ‘../tmp/airports.csv’ saved [84548/84548]



In [0]:
spark = SparkSession.builder.getOrCreate()

file_path = '../tmp/airports.csv'

# Reading the CSV file into a Spark DataFrame
airports = spark.read.csv(file_path, header=True)
print(type(airports)) # Note the Datatype is <class 'pyspark.sql.dataframe.DataFrame'>

# Show the data
airports.show()

<class 'pyspark.sql.dataframe.DataFrame'>
+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.81064



---



# Part 2: Manipulating data
In this part, we'll learn about the `pyspark.sql` module, which provides optimized data queries to our Spark session.

## Creating columns
In this section, you'll learn how to use the methods defined by Spark's DataFrame class to perform common data operations.

Let's look at performing column-wise operations. In Spark you can do this using the `.withColumn()` method, which takes two arguments. First, a string with the name of your new column, and second the new column itself.

The new column must be an object of class `Column`. Creating one of these is as easy as extracting a column from your DataFrame using `df.colName`.

Updating a Spark DataFrame is somewhat different than working in `pandas` because the Spark DataFrame is immutable. This means that it can't be changed, and so columns can't be updated in place.

Thus, all these methods return a **new** DataFrame. To overwrite the original DataFrame you must reassign the returned DataFrame using the method like so:



> `df = df.withColumn("newCol", df.oldCol + 1)`


The above code creates a DataFrame with the same columns as df plus a new column, `newCol`, where every entry is equal to the corresponding entry from `oldCol`, plus one.

To overwrite an existing column, just pass the name of that particular column as the first argument!

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Use the spark.table() method with the argument "flights" to create a DataFrame containing the values of the flights table in the .catalog. Save it as flights
flights = spark.table('flights')

# Show the head using .show() method. The column air_time contains the duration of the flight in minutes.
flights.show()

# Updating flights to contain a new column duration_hrs, that contains the duration of each flight in hours.
flights = flights.withColumn('duration_hrs', flights.air_time / 60)

---

### Filtering Data
Now that you have a bit of SQL know-how under your belt, it's easier to talk about the analogous operations using Spark DataFrames.

Let's take a look at the `.filter()` method. As you might suspect, this is the Spark counterpart of SQL's WHERE clause. The `.filter()` method takes either an expression that would follow the WHERE clause of a SQL expression as a string, or a Spark Column of boolean (True/False) values.

For example, the following two expressions will produce the same output:

> `flights.filter("air_time > 120").show()`

> `flights.filter(flights.air_time > 120).show()`

Notice that in the first case, we pass a string to `.filter()`. In SQL, we would write this filtering task as `SELECT * FROM flights WHERE air_time > 120`. Spark's `.filter()` can accept any expression that could go in the WHERE clause of a SQL query (in this case, `"air_time > 120"`), as long as it is passed as a string. Notice that in this case, we do not reference the name of the table in the string -- as we wouldn't in the SQL request.

In the second case, we actually pass a **column of boolean values** to `.filter()`. Remember that `flights.air_time > 120` returns a column of boolean values that has True in place of those records in `flights.air_time` that are over 120, and False otherwise.

Remember, a `SparkSession` called `spark` is already in your workspace, along with the Spark DataFrame `flights`.

In [0]:
# Filter flights by passing a string to filter() method to find all flights that flew over 1000 miles distance.
long_flights1 = flights.filter('distance > 1000')

# Filter flights by passing a column of boolean values
long_flights2 = flights.filter(flights.distance > 1000)

# Print the data to check they're equal
long_flights1.show()
long_flights2.show()

---

### Selecting
The Spark variant of SQL's `SELECT` is the `.select()` method. This method takes multiple arguments - one for each column you want to select. These arguments can either be the column name as a string (one for each column) or a column object (using the `df.colName` syntax). When you pass a column object, you can perform operations like addition or subtraction on the column to change the data contained in it, much like inside `.withColumn()`.

#### Difference between `.select()` and `.withColumn()`
The difference between `.select()` and `.withColumn()` methods is that `.select()` returns only the columns you specify, while `.withColumn()` returns all the columns of the DataFrame in addition to the one you defined. It's often a good idea to drop columns you don't need at the beginning of an operation so that you're not dragging around extra data as you're wrangling. In this case, you would use `.select()` and not `.withColumn()`.


In [0]:
# BY STRINGS: Selecting the columns tailnum, origin, and dest from flights by passing the column names as strings.
selected1 = flights.select('tailnum', 'origin', 'dest')

# BY COLUMN OBJECTS: Selecting the columns origin, dest, and carrier using the df.colName syntax
temp = flights.select(flights.origin, flights.dest, flights.carrier)

# Defining boolean filter conditions and filtering the selected columns with them
filterA = flights.origin == "SEA"
filterB = flights.dest == "PDX"
selected2 = temp.filter(filterA).filter(filterB)

# Part 3: Getting started with machine learning pipelines
PySpark has built-in, cutting-edge machine learning routines, along with utilities to create full machine learning pipelines. We'll learn about them in this part.

# Part 4: Model tuning and selection
In this last part, we'll apply what we've learned to create a model that predicts which flights will be delayed.