# **Big Data with Spark in Google Colab**

## Spark and Colaboratory setup

In [None]:
# Install spark-related depdencies for Python

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz
!tar xf spark-2.4.3-bin-hadoop2.7.tgz

!pip install -q findspark
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/37/98/244399c0daa7894cdf387e7007d5e8b3710a79b67f3fd991c0b0b644822d/pyspark-2.4.3.tar.gz (215.6MB)
[K     |████████████████████████████████| 215.6MB 98kB/s 
[?25hCollecting py4j==0.10.7 (from pyspark)
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 40.7MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Stored in directory: /root/.cache/pip/wheels/8d/20/f0/b30e2024226dc112e256930dd2cd4f06d00ab053c86278dcf3
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.3


In [None]:
# Tell Spark where the JVM and the unpacked Spark distribution live

import os

os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-2.4.3-bin-hadoop2.7",
)

In [None]:
# Mount your Google Drive inside the Colab VM at /content/gdrive.
# The call is interactive: it asks for an authorization code before mounting.

from google.colab import drive
drive.mount('/content/gdrive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /content/gdrive


## Data download to Google Drive

In [None]:
# Download datasets directly to your Google Drive "Colab Datasets" folder

import os

import requests


def download_file(file_url, destination, chunk_size=1024 * 1024, timeout=60):
    """Stream the content at `file_url` into the file `destination`.

    Parameters:
        file_url: HTTP(S) address of the file to fetch.
        destination: local path to write; its parent folder is created if missing.
        chunk_size: number of bytes requested per read while streaming.
        timeout: seconds to wait for the server before giving up.

    Raises:
        requests.HTTPError: when the server answers with an error status, so an
            error page is never saved under the dataset's file name.
    """
    os.makedirs(os.path.dirname(destination), exist_ok=True)

    # The context manager releases the connection even if writing fails.
    with requests.get(file_url, stream=True, timeout=timeout) as r:
        r.raise_for_status()
        with open(destination, "wb") as file:
            for block in r.iter_content(chunk_size=chunk_size):
                if block:  # ignore empty chunks
                    file.write(block)


# 2007 and 2008 data
for year in (2007, 2008):
    download_file(
        f"http://stat-computing.org/dataexpo/2009/{year}.csv.bz2",
        f"/content/gdrive/My Drive/Colab Datasets/{year}.csv.bz2",
    )

## Import tools from PySpark


In [None]:
# Tools we need to connect to the Spark server, load our data, clean it, and prepare, execute, and evaluate a model

from pyspark import SparkContext
from pyspark.sql import SparkSession

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

from pyspark.sql.functions import isnan, when, count, col

## Set Constants

In [None]:
# Spark application settings
APP_NAME = "Flight Delays"
SPARK_URL = "local[*]"

# Locations of the compressed yearly datasets on the mounted Google Drive
CSV_2007 = "/content/gdrive/My Drive/Colab Datasets/2007.csv.bz2"
CSV_2008 = "/content/gdrive/My Drive/Colab Datasets/2008.csv.bz2"

# Train/test split settings
TRAINING_DATA_RATIO = 0.7
RANDOM_SEED = 141109

# Random forest hyperparameters
RF_NUM_TREES = 8
RF_MAX_DEPTH = 4
RF_NUM_BINS = 32

## Connect to the server and load data

In [None]:
# Connect to the Spark server (reuses the running session if there is one)

spark = SparkSession.builder.appName(APP_NAME).master(SPARK_URL).getOrCreate()

# Load datasets
# Both files are read with the same options: the first line is the header and
# column types are inferred from the data.

csv_options = {"header": "true", "inferschema": "true"}

df_2007 = spark.read.options(**csv_options).csv(CSV_2007)
df_2008 = spark.read.options(**csv_options).csv(CSV_2008)

# We concatenate both datasets.
# union() replaces unionAll(), which is deprecated since Spark 2.0. Columns are
# matched by position, not by name.
# NOTE(review): assumes both files list their columns in the same order — confirm.

df = df_2007.union(df_2008)

## Prepare, clean and validate the data



In [None]:
# Report the dataset dimensions before any cleaning

row_total = df.count()
column_total = len(df.columns)

print(f"The shape is {row_total:d} rows by {column_total:d} columns.")

The shape is 14462943 rows by 29 columns.


In [None]:
# Count the missing entries (NaN or null) across every column

# One aggregate per column: count() only tallies the rows where when() matched.
missing_per_column = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df.columns
]
null_counts = df.select(missing_per_column).toPandas().to_dict(orient='records')
total_missing = sum(null_counts[0].values())

print(f"We have {total_missing:d} null values in this dataset.")

We have 14248147 null values in this dataset.


In [None]:
# Drop the CancellationCode column, then every row that still contains a null
# NOTE(review): presumably CancellationCode is removed because it holds most of
# the nulls counted above — confirm.

# Refer to the column by name: drop() is a no-op when the name is absent, so
# this cell can be re-run, whereas df.CancellationCode raises once the column
# is gone.
df = df.drop("CancellationCode")
df = df.na.drop()

In [None]:
# Re-count the missing entries to confirm that cleaning removed all of them

# One aggregate per column: count() only tallies the rows where when() matched.
missing_per_column = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df.columns
]
null_counts = df.select(missing_per_column).toPandas().to_dict(orient='records')
total_missing = sum(null_counts[0].values())

print(f"We have {total_missing:d} null values in this dataset.")

We have 0 null values in this dataset.


In [None]:
# Report the dataset dimensions once cleaning is done

row_total = df.count()
column_total = len(df.columns)

print(f"The shape is {row_total:d} rows by {column_total:d} columns.")

The shape is 14379556 rows by 28 columns.


## Set up and run our classifier in Spark

In [None]:
# What is the type of each column?
# NOTE(review): in the saved output several numeric-looking columns (e.g. DepTime,
# ArrDelay) are typed 'string' — presumably the CSV holds non-numeric placeholders; confirm.

df.dtypes

[('Year', 'int'),
 ('Month', 'int'),
 ('DayofMonth', 'int'),
 ('DayOfWeek', 'int'),
 ('DepTime', 'string'),
 ('CRSDepTime', 'int'),
 ('ArrTime', 'string'),
 ('CRSArrTime', 'int'),
 ('UniqueCarrier', 'string'),
 ('FlightNum', 'int'),
 ('TailNum', 'string'),
 ('ActualElapsedTime', 'string'),
 ('CRSElapsedTime', 'string'),
 ('AirTime', 'string'),
 ('ArrDelay', 'string'),
 ('DepDelay', 'string'),
 ('Origin', 'string'),
 ('Dest', 'string'),
 ('Distance', 'int'),
 ('TaxiIn', 'string'),
 ('TaxiOut', 'string'),
 ('Cancelled', 'int'),
 ('Diverted', 'int'),
 ('CarrierDelay', 'string'),
 ('WeatherDelay', 'string'),
 ('NASDelay', 'string'),
 ('SecurityDelay', 'string'),
 ('LateAircraftDelay', 'string')]

In [None]:
# Columns fed to the model as features (all typed 'int' in the schema above)

feature_cols = [
    'Year',
    'Month',
    'DayofMonth',
    'DayOfWeek',
    'CRSDepTime',
    'CRSArrTime',
    'FlightNum',
    'Distance',
    'Diverted',
]

In [None]:
# Generate and create our new feature vector column

assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# drop() ignores a missing column name, so it does nothing on the first run.
# It makes the cell re-runnable: without it, transform() rejects a DataFrame
# that already contains the output column.
df = assembler.transform(df.drop("features"))

In [None]:
# Preview the label next to the assembled feature vector

label_and_features = df.select("Cancelled", "features")
label_and_features.show(5)

+---------+--------------------+
|Cancelled|            features|
+---------+--------------------+
|        0|[2007.0,1.0,1.0,1...|
|        0|[2007.0,1.0,1.0,1...|
|        0|[2007.0,1.0,1.0,1...|
|        0|[2007.0,1.0,1.0,1...|
|        0|[2007.0,1.0,1.0,1...|
+---------+--------------------+
only showing top 5 rows



In [None]:
# Build the training indexers

# Generate a labelIndexer (fitted on the whole dataset, before the split)
labelIndexer = StringIndexer(inputCol="Cancelled", outputCol="indexedLabel").fit(df)

# Generate the indexed feature vector: features with at most 4 distinct values
# are treated as categorical, the others are left as continuous
featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(df)

# Split the data into training and test sets; seeded so that re-running the
# notebook does not draw a different split every time
(trainingData, testData) = df.randomSplit(
    [TRAINING_DATA_RATIO, 1 - TRAINING_DATA_RATIO], seed=RANDOM_SEED)

# Configure the RandomForest model (it is trained later, by pipeline.fit).
# Every hyperparameter comes from the constants cell instead of silently
# falling back to the library defaults.
rf = RandomForestClassifier(
    labelCol="indexedLabel",
    featuresCol="indexedFeatures",
    numTrees=RF_NUM_TREES,
    maxDepth=RF_MAX_DEPTH,
    maxBins=RF_NUM_BINS,
    seed=RANDOM_SEED,
)

# Chain indexers and the forest model in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf])

In [None]:
# Train the model: fit the pipeline stages on the training split only

model = pipeline.fit(trainingData)

In [None]:
# Make predictions: apply the fitted pipeline to the held-out test split

predictions = model.transform(testData)

## Evaluate our model

In [None]:
# Compare the predictions with the indexed true labels and report the result
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="indexedLabel",
    predictionCol="prediction",
)
accuracy = evaluator.evaluate(predictions)
test_error = 1.0 - accuracy

print(f"Test Error = {test_error:g}")
print(f"Accuracy = {accuracy:g}")

Test Error = 0.0149426
Accuracy = 0.985057
