# Porting an analysis from local to distributed

<a href = "http://yogen.io"><img src="http://yogen.io/assets/logo.svg" alt="yogen" style="width: 200px; float: right;"/></a>

Now comes the opportunity to put into practice what we have just learned!

### If you are running this notebook in Google Colab

Copy the following to a code cell and run it. It will install and set up Spark for you.

```python
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.uvigo.es/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar -xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark

import os
import findspark
from pyspark.sql import SparkSession

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"
findspark.init()
spark = SparkSession.builder.master("local[*]").getOrCreate()
```

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.uvigo.es/spark/spark-2.4.6/spark-2.4.6-bin-hadoop2.7.tgz
!tar -xf spark-2.4.6-bin-hadoop2.7.tgz
!pip install -q findspark pyspark==2.4.6
 
import os
import findspark
from pyspark.sql import SparkSession
 
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.6-bin-hadoop2.7"
findspark.init()
spark = SparkSession.builder.master("local[*]").getOrCreate()



# Guided exercise

Recreate, in Spark, the boxplot we made in the pandas section!

Since matplotlib's `boxplot` needs all the raw data in memory, which is unfeasible with Big Data, we will calculate the quartiles ourselves.

Once the analysis is ported, we will be able to run it on the whole historical series! You can find it at https://transtats.bts.gov (dataset: Reporting Carrier On-Time Performance).

## Workflow

The basic idea is the same one we applied in the Amadeus Challenge:

* Build a prototype with small data: in this section, we will use `06-intro_to_pandas_practical.ipynb` as our ready-made prototype.

* Modify your prototype so that it works with Big Data: in this case, that means porting it to Spark.

* Test your "Big Data" prototype with small data: we will first test it locally with a sample, then upload it to a cluster and test it with Big Data.

    * You can run your analyses on your own cluster, with a storage bucket in Google Cloud Storage. More in notebook #4!

* Run your prototype with Big Data.
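For the local test, a small sample file is handy. A plain-Python sketch that keeps the header plus the first rows of the (gzipped) CSV; `head_sample` and the file names are hypothetical. It assumes no record spans several lines, and the first rows are not a random sample (the file may well be ordered by date), so treat the result only as a smoke test.

```python
import gzip
from itertools import islice


def head_sample(src_path, dst_path, n_rows=10000):
    """Copy the header plus the first n_rows lines of a (possibly gzipped) CSV file.

    Assumes no record spans several lines. The first rows are NOT a random sample.
    """
    opener = gzip.open if src_path.endswith(".gz") else open
    with opener(src_path, "rt") as src, open(dst_path, "w") as dst:
        # islice stops after the header line plus n_rows data lines
        dst.writelines(islice(src, n_rows + 1))


# Example (placeholder file names):
# head_sample("flights.csv.gz", "sample.csv", n_rows=5000)
```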

    


## Modify the prototype so that it works with Big Data

### Read CSV

We'll use the `spark.read.csv` method (a `DataFrameReader`), with `header = True` so that the first line is used for the column names.

In [None]:
df = spark.read.csv('/content/On_Time_Reporting_Carrier_On_Time_Performance_(1987_present)_2018_12.csv.gz', header = True)
df.show(10)


### Select relevant columns

Literally the same syntax as Pandas!

```python
df = df[['FlightDate', 'DayOfWeek', 'Reporting_Airline', 'Tail_Number', 'Flight_Number_Reporting_Airline', 'Origin',
         'OriginCityName', 'OriginStateName', 'Dest', 'DestCityName', 'DestStateName',
         'DepTime', 'DepDelay', 'AirTime', 'Distance']]

df
```

In [None]:
df = df[['FlightDate', 'DayOfWeek', 'Reporting_Airline', 'Tail_Number', 'Flight_Number_Reporting_Airline', 'Origin', 
                'OriginCityName', 'OriginStateName', 'Dest', 'DestCityName', 'DestStateName',
                'DepTime', 'DepDelay', 'AirTime', 'Distance']]
 
df

DataFrame[FlightDate: string, DayOfWeek: string, Reporting_Airline: string, Tail_Number: string, Flight_Number_Reporting_Airline: string, Origin: string, OriginCityName: string, OriginStateName: string, Dest: string, DestCityName: string, DestStateName: string, DepTime: string, DepDelay: string, AirTime: string, Distance: string]

### Extract "Hour" variable

The `DepTime` values were read as strings in HHMM format (for example, `0638` means 6:38). We need the hour as an integer: dividing by 100 and casting the result to an integer drops the minutes.
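A pure-Python sketch of the same arithmetic, for checking a few values by hand. `hour_from_deptime` is a hypothetical helper for illustration, and it assumes a missing departure time arrives as `None` or an empty string (Spark itself yields a null `Hour` when `DepTime` is null).

```python
def hour_from_deptime(dep_time):
    """Return the departure hour for an HHMM string such as '0638', or None if missing."""
    if dep_time is None or dep_time.strip() == "":
        return None  # mirrors the null Hour Spark produces for a null DepTime
    # '0638' -> 638.0 -> 6.38 -> 6: int() truncates, like casting to IntegerType
    return int(float(dep_time) / 100)


for dep_time in ["1048", "0638", "2400", None]:
    print(dep_time, "->", hour_from_deptime(dep_time))
```

Note that `2400` gives hour 24, which is why the totals further down have a bucket for hour 24 in addition to hour 0.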

In [None]:
df.select(df['DepTime']).show(10)

+-------+
|DepTime|
+-------+
|   1048|
|   0638|
|   1710|
|   1318|
|   0953|
|   1646|
|   1813|
|   1450|
|   0953|
|   1219|
+-------+
only showing top 10 rows



In [None]:
from pyspark.sql import types

# DepTime is HHMM: dividing by 100 and casting to int keeps only the hour (e.g. 1048 -> 10)
df2 = df.withColumn('Hour', (df['DepTime'] / 100).cast(types.IntegerType()))

## Generate the relative distributions

To be able to handle the data, we need to reduce its size. Since we want to describe a discrete distribution, we can simply count, for each hour, how many times each value of `DepDelay` appears; this gives one discrete distribution per hour. We also need the total number of flights per hour, so that we can turn those counts into relative frequencies.
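To see how much this shrinks the data, here is a pure-Python sketch of the two counts on a few made-up `(hour, delay)` pairs, with `collections.Counter` standing in for `groupBy(...).count()`. However many flights there are, the result has at most one entry per distinct combination of keys.

```python
from collections import Counter

# Toy (hour, delay) pairs standing in for the flights table; the values are made up
flights = [(6, -3.0), (6, -3.0), (6, 12.0), (7, 0.0), (7, 45.0), (7, 0.0), (7, 0.0)]

# Local equivalent of df2.groupBy('Hour').count()
totals_per_hour = Counter(hour for hour, _ in flights)

# Local equivalent of df2.groupBy('DepDelay', 'Hour').count()
totals_per_delay_hour = Counter((delay, hour) for hour, delay in flights)

print(totals_per_hour)
print(totals_per_delay_hour)
```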

### Totals

In [None]:
totals_per_hour = df2.groupBy('Hour').count()
totals_per_hour.show(25)

+----+-----+
|Hour|count|
+----+-----+
|  12|36925|
|  22|15646|
|null| 6526|
|   1|  846|
|  13|33163|
|   6|37398|
|  16|34761|
|   3|  200|
|  20|28354|
|   5|22597|
|  19|30966|
|  15|35691|
|  17|36591|
|   9|34355|
|   4| 1374|
|   8|36357|
|  23| 5835|
|   7|35336|
|  10|34600|
|  24|   40|
|  21|19028|
|  11|36740|
|  14|34619|
|   2|  276|
|   0| 2283|
+----+-----+
only showing top 25 rows



In [None]:
totals_per_delay_hour = df2.groupBy('DepDelay', 'Hour').count()
totals_per_delay_hour.show(25)
totals_per_delay_hour

+--------+----+-----+
|DepDelay|Hour|count|
+--------+----+-----+
|    0.00|  10| 2004|
|    7.00|  13|  399|
|   46.00|   9|   37|
|  105.00|  17|   26|
|   38.00|  14|   78|
|  184.00|  14|    4|
|   13.00|  11|  288|
|   90.00|  20|   32|
|   78.00|  15|   29|
|   49.00|  11|   28|
|   75.00|   7|    7|
|  147.00|  12|   10|
|   34.00|  23|   23|
|   46.00|   8|   25|
|  166.00|  17|    8|
|  243.00|  20|    4|
|  -15.00|   5|   97|
|  -14.00|  13|  151|
|  111.00|   9|    7|
|   68.00|  11|   18|
|  168.00|  22|    7|
|  484.00|  15|    2|
|  126.00|  10|    7|
|  266.00|  23|    2|
|  214.00|  10|    1|
+--------+----+-----+
only showing top 25 rows



DataFrame[DepDelay: string, Hour: int, count: bigint]

In [None]:
df.count()

593842

In [None]:
joined = totals_per_delay_hour.join(totals_per_hour, on = 'Hour')
joined.show()

+----+--------+-----+-----+
|Hour|DepDelay|count|count|
+----+--------+-----+-----+
|  10|    0.00| 2004|34600|
|  13|    7.00|  399|33163|
|   9|   46.00|   37|34355|
|  17|  105.00|   26|36591|
|  14|   38.00|   78|34619|
|  14|  184.00|    4|34619|
|  11|   13.00|  288|36740|
|  20|   90.00|   32|28354|
|  15|   78.00|   29|35691|
|  11|   49.00|   28|36740|
|   7|   75.00|    7|35336|
|  12|  147.00|   10|36925|
|  23|   34.00|   23| 5835|
|   8|   46.00|   25|36357|
|  17|  166.00|    8|36591|
|  20|  243.00|    4|28354|
|   5|  -15.00|   97|22597|
|  13|  -14.00|  151|33163|
|   9|  111.00|    7|34355|
|  11|   68.00|   18|36740|
+----+--------+-----+-----+
only showing top 20 rows



In [None]:
# Divide each (DepDelay, Hour) count by the total number of flights in that hour
relative_freqs = joined.select('Hour',
                               'DepDelay',
                               (totals_per_delay_hour['count'] / totals_per_hour['count']).alias('relative_freq')).cache()

# Equivalent alternative with withColumn:
# relative_freqs = joined.withColumn('relative_freq', totals_per_delay_hour['count'] / totals_per_hour['count']).cache()

relative_freqs.show(10)



### Distributions

In [None]:
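from pyspark.sql import types, functions

def tupleize(delay, rel_freq):
    # DepDelay was read as a string: convert it so that later sorting is numeric.
    # Returning None for a missing delay lets collect_list (which skips nulls) drop it.
    if delay is None:
        return None
    return float(delay), rel_freq

# The return type must be a StructType (a list of StructFields), not a StructField
tupleize_udf = functions.udf(tupleize, returnType = types.StructType([types.StructField('a', types.DoubleType()),
                                                                     types.StructField('b', types.DoubleType())]))

tupled = relative_freqs.withColumn('tuples', tupleize_udf('DepDelay', 'relative_freq'))
tupled.printSchema()
tupled.show()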

In [None]:
distributions = tupled.groupby('Hour').agg(functions.collect_list('tuples'))
distributions.printSchema()
distributions.show()

# collect_list -> creates a DataFrame with the Hour and, for each hour, a list
# gathering every (DepDelay, relative_freq) tuple that belongs to that hour:
# each individual DepDelay value at that specific hour,
# together with the fraction of the total it represents.

In [None]:
# The data is now manageable: we could bring it down to pandas and finish there
import pandas as pd

distributions.toPandas()

# But having come this far, we'll carry on in PySpark

Now we join both tables and calculate, for each hour, the fraction of that hour's total that each level of `DepDelay` represents.
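The join-and-divide step is easy to sketch with dictionaries: the hour plays the role of the join key, and each `(delay, hour)` count is divided by its hour's total. The counts below are made up. A useful sanity check is that, within each hour, the relative frequencies add up to 1; if every row instead comes out as exactly 1.0, the division is using the same column twice.

```python
from collections import Counter

# Toy counts shaped like the outputs of the two groupBy().count() calls (made-up values)
totals_per_hour = Counter({6: 3, 7: 4})
totals_per_delay_hour = Counter({(-3.0, 6): 2, (12.0, 6): 1, (0.0, 7): 3, (45.0, 7): 1})

# "Join" on the hour and divide each (delay, hour) count by that hour's total
relative_freqs = {
    (hour, delay): count / totals_per_hour[hour]
    for (delay, hour), count in totals_per_delay_hour.items()
}

# Sanity check: within each hour, the relative frequencies should add up to 1
for hour in totals_per_hour:
    print(hour, sum(f for (h, _), f in relative_freqs.items() if h == hour))
```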

### Generate distributions

We have to group on the hour. Each group will be a bunch of delays and the corresponding frequencies.

These groups are definitely manageable: the number of levels will be on the order of a few hundred to a couple of thousand. We can collect them into lists straight away.
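Grouping on the hour and collecting each group into a list can be sketched locally with a `defaultdict(list)`, as a stand-in for `groupBy('Hour').agg(functions.collect_list(...))`. The frequencies are made up. Spark's `collect_list` does not promise any particular order inside each list, so nothing downstream should rely on one; that is why the lists get sorted afterwards.

```python
from collections import defaultdict

# Toy relative frequencies keyed by (hour, delay); the values are made up
relative_freqs = {(6, -3.0): 2 / 3, (6, 12.0): 1 / 3, (7, 0.0): 0.75, (7, 45.0): 0.25}

# Local counterpart of collect_list: one list of (delay, freq) pairs per hour
distributions = defaultdict(list)
for (hour, delay), freq in relative_freqs.items():
    distributions[hour].append((delay, freq))

for hour, pairs in sorted(distributions.items()):
    print(hour, pairs)
```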

Now it's easy to use a UDF to merge the two lists and sort them.

Careful! A UDF declared without a return type returns `StringType` by default. If we keep that string return type, it can be problematic later.
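Merging and sorting is plain Python once each hour's pairs are in hand. This sketch (the pairing mirrors the `zipsort` helper in the job file further down) also shows why types matter here: if `DepDelay` is still a string, as happens when the CSV is read without `inferSchema=True`, `sorted` orders the delays lexicographically rather than numerically.

```python
def zipsort(delays, freqs):
    """Pair each delay with its frequency and sort the pairs by delay."""
    return sorted(zip(delays, freqs))


# With string delays the sort is lexicographic: '105.00' lands before '7.00'
as_strings = zipsort(["7.00", "-14.00", "105.00", "-2.00"], [0.1, 0.2, 0.3, 0.4])
# With numeric delays the sort is numeric, which is what the quartiles need
as_floats = zipsort([7.0, -14.0, 105.0, -2.0], [0.1, 0.2, 0.3, 0.4])

print([d for d, _ in as_strings])  # ['-14.00', '-2.00', '105.00', '7.00']
print([d for d, _ in as_floats])   # [-14.0, -2.0, 7.0, 105.0]
```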

### Calculating the quartiles

We are finally ready to calculate the quartiles! We will use a UDF.

The input to our custom function will be one of the distributions encoded as above: a list of tuples `(value, relative_frequency)`. The five values we need (minimum, first quartile, median, third quartile and maximum) are those at which the cumulative relative frequency crosses 0.0, 0.25, 0.5, 0.75 and 1.0. Once a distribution is sorted by value, we can iterate over it while keeping track of the fraction of the total we have seen so far, and record the value at which each threshold is crossed.
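The same threshold-crossing idea can also be written without an explicit loop: build the running total once with `itertools.accumulate`, then use `bisect.bisect_right` to find, for each threshold, the first position where the running total exceeds it. This is a sketch of an alternative formulation (the name `quartiles_bisect` is ours), not the approach this notebook uses; it should agree with a loop that records a value whenever `previous <= threshold < running_total`. It assumes the frequencies add up to roughly 1.

```python
from bisect import bisect_right
from itertools import accumulate


def quartiles_bisect(distribution, thresholds=(0.0, 0.25, 0.5, 0.75)):
    """Return [min, Q1, median, Q3, max] from (value, relative_frequency) pairs."""
    ordered = sorted(distribution)
    values = [v for v, _ in ordered]
    # Running total of the relative frequencies (assumed to reach about 1.0)
    cumulative = list(accumulate(f for _, f in ordered))
    # bisect_right returns the first index whose running total is > threshold
    result = [values[bisect_right(cumulative, t)] for t in thresholds]
    result.append(values[-1])  # the maximum is simply the largest value
    return result


# Toy distribution: delay -> relative frequency (made-up numbers adding up to 1)
distribution = [(10, 0.1), (-5, 0.2), (30, 0.1), (0, 0.2), (5, 0.4)]
print(quartiles_bisect(distribution))  # [-5, 0, 5, 5, 30]
```

Here the median and Q3 coincide because the delay of 5 holds 40% of the flights, so the running total jumps past both 0.5 and 0.75 at that value.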

In [None]:
import random 

delays = list(range(-20, 20, 3))
random.shuffle(delays)
delays

[7, 19, 1, 10, -2, -17, 4, -5, 13, -8, 16, -14, -11, -20]

In [None]:
ns = freqs = [ random.randint(0, 5000) for _ in delays ]
rel_freqs = [ n / sum(ns) for n in ns ]
rel_freqs

[0.12468401297142712,
 0.02627480019406072,
 0.01761867068406404,
 0.019074126088399763,
 0.024359727293618978,
 0.09708142889972678,
 0.11342338431682966,
 0.04774915098434747,
 0.12274340576564614,
 0.11618108929346577,
 0.10938896407323238,
 0.08193958583356739,
 0.08778694175624951,
 0.011694711845364247]

In [None]:
distribution = list(zip(delays, rel_freqs))
distribution

[(7, 0.12468401297142712),
 (19, 0.02627480019406072),
 (1, 0.01761867068406404),
 (10, 0.019074126088399763),
 (-2, 0.024359727293618978),
 (-17, 0.09708142889972678),
 (4, 0.11342338431682966),
 (-5, 0.04774915098434747),
 (13, 0.12274340576564614),
 (-8, 0.11618108929346577),
 (16, 0.10938896407323238),
 (-14, 0.08193958583356739),
 (-11, 0.08778694175624951),
 (-20, 0.011694711845364247)]

In [None]:
def quartiles(distribution):

  result = []
  acc = 0

  for delay, rel_freq in sorted(distribution):
    prev = acc
    acc += rel_freq

    if prev == 0.0 and acc > 0.0:
      result.append(delay)

    if prev <= .25 and acc > .25:
      result.append(delay)

    if prev <= .50 and acc > .50:
      result.append(delay)

    if prev <= .75 and acc > .75:
      result.append(delay)

  result.append(delay)

  return result

quartiles(distribution)

[-20, -11, 4, 13, 19]

Apply to the dataframe:

In [None]:
quartiles_udf = functions.udf(quartiles)

In [None]:
calculated_quartiles = distributions.select('Hour', quartiles_udf('collect_list(tuples)')).cache()
calculated_quartiles.printSchema()
calculated_quartiles.show()

### Plotting

We got it! Let's move this over to pandas for convenient handling.

In [None]:
pd_df = calculated_quartiles.toPandas()
pd_df.head()

And we are ready to plot!

In [None]:
for hour in pd_df['Hour']:
  print(hour)

In [None]:
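import ast

# The UDF returned each list as a string (its default return type);
# ast.literal_eval safely turns that string back into a Python list
for quartiles in pd_df['quartiles(collect_list(tuples))']:
  print(ast.literal_eval(quartiles))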

In [None]:
import ast

xs = []
ys = []

# One point per quartile: x is the hour, y is the quartile value
for _, hour, quartiles in pd_df.itertuples():
  for q in ast.literal_eval(quartiles):
    xs.append(hour)
    ys.append(q)

xs, ys

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

plt.scatter(xs, ys)
plt.gcf().set_size_inches(12, 6)
plt.gca().set_ylim(-25, 50)

## Test your "Big Data" prototype with small data

### Summary

This is the whole process so far, collected in one place:

In [None]:
from __future__ import print_function
from pyspark.sql import types, functions, SparkSession
import sys

# Input path and output path: both are taken as script arguments (sys.argv[1] and sys.argv[2])
if __name__=='__main__':

    spark = SparkSession.builder.getOrCreate()

    file = sys.argv[1]
    out = sys.argv[2]

    df = spark.read.csv(file, header = True, inferSchema = True)
    # DepTime is HHMM, so dividing by 100 and casting to int gives the hour
    df2 = df.withColumn('Hour', (df['DepTime'] / 100).cast(types.IntegerType()))
    totals_per_hour = df2.groupBy('Hour').count()
    totals_per_delay_hour = df2.groupBy('DepDelay', 'Hour').count()

### PySpark job

To run the process on a cluster, we need to turn it into a PySpark job file.

We need to tidy up the function definitions, add the relevant imports, and change the input and output to use command-line arguments.

We will put the result in a file called `mysparkjob.py`:

```python
from __future__ import print_function
from pyspark.sql import types, functions, SparkSession
import sys

def zipsort(a, b):
    # Pair each delay with its relative frequency and sort the pairs by delay
    return sorted(zip(a, b))

def quartiles(histogram):
    # histogram: list of [value, relative_frequency] pairs, sorted by value.
    # Independent ifs (not elif), so a single bin can cross several thresholds.
    area = 0
    result = []

    for value, percentage in histogram:
        previous = area
        area += percentage
        if previous == 0 and area > 0:
            result.append(value)
        if previous <= .25 and area > .25:
            result.append(value)
        if previous <= .5 and area > .5:
            result.append(value)
        if previous <= .75 and area > .75:
            result.append(value)

    result.append(value)
    return result

if __name__=='__main__':

    file = sys.argv[1]
    out = sys.argv[2]

    spark = SparkSession.builder.getOrCreate()
    df = spark.read.csv(file, header=True, inferSchema=True)
    df = df.select(['FlightDate', 'DayOfWeek', 'Reporting_Airline', 'Tail_Number', 'Flight_Number_Reporting_Airline', 'Origin',
                    'OriginCityName', 'OriginStateName', 'Dest', 'DestCityName', 'DestStateName',
                    'DepTime', 'DepDelay', 'AirTime', 'Distance'])

    # DepTime is HHMM, so dividing by 100 and casting to int gives the hour.
    # Drop rows without an hour or a delay (e.g. cancelled flights): collect_list skips
    # nulls, which would otherwise misalign the 'delays' and 'relatives' lists below.
    df2 = (df.withColumn('Hour', (df['DepTime'] / 100).cast(types.IntegerType()))
             .dropna(subset=['Hour', 'DepDelay']))

    totals = df2.groupBy('Hour').count().withColumnRenamed('count', 'total')
    counts = df2.groupBy(['Hour', 'DepDelay']).count()
    annotated = counts.join(totals, on='Hour')
    frequencies = annotated.withColumn('relative', annotated['count'] / annotated['total'])
    groups = (frequencies.groupBy('Hour')
                         .agg(functions.collect_list('DepDelay').alias('delays'),
                              functions.collect_list('relative').alias('relatives')))

    # Typed UDFs: without a returnType, the results would come back as strings
    zipsort_typed = functions.udf(zipsort, types.ArrayType(types.ArrayType(types.FloatType())))
    distributions = groups.withColumn('distributions', zipsort_typed('delays', 'relatives'))

    quartiles_udf = functions.udf(quartiles, returnType=types.ArrayType(types.FloatType()))

    result = distributions.select('Hour',
                                  quartiles_udf('distributions').alias('quartiles'))

    result.write.json(out)
    spark.stop()
```
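The job reads its two positional arguments directly from `sys.argv`, so a missing argument shows up as an `IndexError`. If you prefer a usage message instead, the standard library's `argparse` can parse the same two arguments. A minimal sketch; `parse_args` and the file names are placeholders of our own:

```python
import argparse


def parse_args(argv=None):
    """Parse the input path and output directory (hypothetical helper for mysparkjob.py)."""
    parser = argparse.ArgumentParser(description="DepDelay quartiles per departure hour")
    parser.add_argument("input", help="CSV file with the on-time performance data")
    parser.add_argument("output", help="directory where Spark will write the JSON results")
    # argv=None makes argparse read sys.argv[1:], as the job would under spark-submit
    return parser.parse_args(argv)


args = parse_args(["flights_sample.csv", "out"])
print(args.input, args.output)
```

Inside the job, `file, out = args.input, args.output` would then take the place of the two `sys.argv` lines.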

### Running with spark-submit

If the following works, we are ready to test it in the cluster!

```bash
spark-submit mysparkjob.py On_Time_On_Time_Performance_2015_8.csv out.csv
```
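After `spark-submit` finishes, `out.csv` will actually be a directory: `result.write.json(out)` writes one or more `part-*` files in JSON Lines format (one JSON object per line), plus a `_SUCCESS` marker. A small local sanity check, sketched in plain Python under the assumption that the columns are named `Hour` and `quartiles` as in the job; `load_quartiles` and `check_quartiles` are our own helper names:

```python
import glob
import json
import os


def load_quartiles(out_dir):
    """Map each Hour to its quartile list, reading the JSON Lines part files in out_dir."""
    rows = {}
    for path in sorted(glob.glob(os.path.join(out_dir, "part-*"))):
        with open(path) as f:
            for line in f:
                if line.strip():
                    record = json.loads(line)
                    rows[record.get("Hour")] = record.get("quartiles")
    return rows


def check_quartiles(rows):
    """Each hour should have five non-decreasing values: min, Q1, median, Q3, max."""
    for hour, qs in rows.items():
        assert qs is not None and len(qs) == 5, (hour, qs)
        assert qs == sorted(qs), (hour, qs)


# Usage after the job has run locally:
# check_quartiles(load_quartiles("out.csv"))
```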

In [None]:
!spark-submit 