## Docker Spark setup

This notebook is meant to run on a spark 2 docker container. First i'll describe the steps to set it up.

On a Linux based system install Docker and Docker-compose.Create this file : docker-compose.yml. The contents is listed below.  Then run: docker-compose build . Afterwards run this command : docker-compose build -d



```
version: "2"

services:
  master:
    image: singularities/spark
    command: start-spark master
    hostname: master
    ports:
      - "6066:6066"
      - "7070:7070"
      - "8080:8080"
      - "50070:50070"
      - "8888:8888"
  worker:
    image: singularities/spark
    command: start-spark worker master
    environment:
      SPARK_WORKER_CORES: 1
      SPARK_WORKER_MEMORY: 2g
    links:
      - master
```

```
With docker ps , check if the master and worker containers are running.
Connect to the master node:
docker exec -it [container id master] bash
On the master node continue with setting up as described below.
```

## Spark and conda env setup

```
First install Anaconda 4 (latest version) on the Docker container with Spark Master. Then install a new Conda environment for Spark, using python 3.5 (3.6 has a bug).  

conda create -n spark python=3.5
source activate spark
conda install notebook ipykernel
ipython kernel install --user --name spark --display-name spark

Make jupyter start script, and run it:
PYSPARK_PYTHON=/root/anaconda3/envs/spark/bin/python
PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS='notebook --ip=0.0.0.0 --port=8888' $SPARK_HOME/bin/pyspark

Now go to the url it gives (http://0.0.0.0:8888/<some code>)
, Run the nodebook sections.
```


In [84]:
#Start this in spark conda env to test


from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
import pyspark.sql.functions as fn


## Example data

```
This example works if you clone https://github.com/PacktPublishing/Learning-PySpark

and make sure its in /root/learningPySpark on the Docker container with Spark Master. 

To install git on this container run command: apt-get install git
, on github (or bitbucket) create a repository so you can save changes from the container and push it to Github. Use the following commands on the Docker container to init and push the data :

git init
git add <your file>
git commit -m "first commit"
git remote add origin https://github.com/michelnossin/pyspark_training_docker.git
git push -u origin master
```

In [111]:
#Execute this section to test Spark
flights = "file:/root/learningPySpark/Chapter03/flight-data/departuredelays.csv" 
airports = "file:/root/learningPySpark/Chapter03/flight-data/airport-codes-na.txt" 
airports_df = spark.read.csv(airports,header='true',inferSchema='true',sep='\t')
airports_df.createOrReplaceTempView("airports")
flights_df = spark.read.csv(flights,header='true')
flights_df.createOrReplaceTempView("flights")
flights_df.cache()

DataFrame[date: string, delay: string, distance: string, origin: string, destination: string]

## First look at data:

```
source activate spark
python -m pip install pandas
```

In [112]:
import pandas as pd

In [113]:
spark.sql("select count(1) from flights").toPandas()

Unnamed: 0,count(1)
0,1391578


In [114]:
flights_df.limit(5).toPandas()

Unnamed: 0,date,delay,distance,origin,destination
0,1011245,6,602,ABE,ATL
1,1020600,-8,369,ABE,DTW
2,1021245,-2,602,ABE,ATL
3,1020605,-4,602,ABE,ATL
4,1031245,-4,602,ABE,ATL


In [115]:
airports_df.limit(5).toPandas()

Unnamed: 0,City,State,Country,IATA
0,Abbotsford,BC,Canada,YXX
1,Aberdeen,SD,USA,ABR
2,Abilene,TX,USA,ABI
3,Akron,OH,USA,CAK
4,Alamosa,CO,USA,ALS


## Cleaning data

Your data can be stained with duplicates, missing observations and outliers, non- existent addresses, wrong phone numbers and area codes, inaccurate geographical coordinates, wrong dates, incorrect labels, mixtures of upper and lower cases, trailing spaces, and many other more subtle problems. It is your job to clean it, irrespective of whether you are a data scientist or data engineer,

### Duplicate rows check and remove

In [116]:
def showDuplicateRowsCount(df):
    'Show row count with full duplicated rows'
    print("====Checking table duplicate rows =====")
    print('Count of rows: {0}'.format(df.count()))
    print('Count of distinct rows: {0}'.format(df.distinct().count()))
    print('===> nr of duplicate rows {0}'.format(df.count()-df.distinct().count()))
def showDuplicatesColumnCount(df,col):
    'Show duplicate rows based on a specific (id) col.'
    print("=====Checking col {0}".format(col))
    print('Count of values: {0}'.format(df.count()))
    distinct_col_count = df.select([
           c for c in df.columns if c != col
       ]).distinct().count()
    print('Count of distinct column values: {0}'.format(distinct_col_count))
    print ("====> duplicate count {0}".format(df.count() - distinct_col_count))
def showDuplicateColumnsCount(df):
    'Show duplicate rows based on all columns in a dataframe'
    for col in df.columns:
        showDuplicatesColumn(df,col)
def dropDuplicateColumn(df,col):
    'drop rows with duplicate columns based on certain (id) column'
    df = df.dropDuplicates(subset=[
       c for c in df.columns if c != col
    ])
def getDFDuplicateColumns(df,col,new_col):
    duplicate_df = df.select([
           c for c in df.columns if c != col
       ]).distinct()
    
    return(duplicate_df.withColumn(new_col, \
                            fn.monotonically_increasing_id()))
    

In [117]:
#Check duplicates rows, same value?
df_flights = spark.sql("select * from flights")
showDuplicateRowsCount(df_flights)
df_airports= spark.sql("select * from airports")
showDuplicateRowsCount(df_airports)


====Checking table duplicate rows =====
Count of rows: 1391578
Count of distinct rows: 1391071
===> nr of duplicate rows 507
====Checking table duplicate rows =====
Count of rows: 526
Count of distinct rows: 526
===> nr of duplicate rows 0


In [81]:
#pure duplicates, could drop them, however without flightname we cant be
#sure if 2 flights leave at the same schedule time with the same route

#df_flights =df_flights.dropDuplicates()
#showDuplicateRowsCount(df_flights)

### Duplicate columns check

In [118]:
#airports IATA should be uniq. It seems 15 rows have identical data 
#but different IATA code
showDuplicatesColumnCount(df_airports,'IATA')

=====Checking col IATA
Count of values: 526
Count of distinct column values: 511
====> duplicate count 15


```
We could call dropDuplicateColumn(df_airports,'IATA')

However this would delete rows without knowing the correct IATA. 
The Flights tables does not have uniq field like flightname,
so will not delete any rows there either.
```

In [120]:
df_duplicate_airports = getDFDuplicateColumns(df_airports,'IATA','new_id')
#df_duplicate_airports.toPandas()