<div style="float:right; padding-top: 15px; padding-right: 15px">
    <div>
        <a href="https://whiteboxml.com">
            <img src="https://whiteboxml.com/static/img/logo/black_bg_white.svg" width="250">
        </a>
    </div>
</div>

# Spark

## 1. Introduction

* When a dataset is larger than your computer's memory (RAM, or even storage), [Big Data](https://en.wikipedia.org/wiki/Big_data) tools break it into chunks and process it piece by piece
* These tools automate the process and handle the details under the hood, with little extra code
* [Spark](https://spark.apache.org/) is one of the most popular Big Data frameworks
* The Spark DataFrame API resembles the pandas API, with some differences
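
To make the chunking idea concrete, here is a minimal plain-Python sketch (not Spark code) that computes a mean over a CSV stream while holding only one chunk in memory at a time. The helper names `iter_chunks` and `chunked_mean`, the chunk size, and the toy `price` column are assumptions made for illustration; Spark does comparable work across partitions and machines without this manual bookkeeping.

```python
import csv
import io
from itertools import islice


def iter_chunks(rows, chunk_size):
    """Yield lists of at most `chunk_size` rows from any row iterator."""
    rows = iter(rows)
    while True:
        chunk = list(islice(rows, chunk_size))
        if not chunk:
            return
        yield chunk


def chunked_mean(rows, column, chunk_size=2):
    """Compute the mean of `column`, keeping only one chunk in memory at a time."""
    total, count = 0.0, 0
    for chunk in iter_chunks(rows, chunk_size):
        total += sum(float(row[column]) for row in chunk)
        count += len(chunk)
    return total / count if count else None


# Hypothetical toy data standing in for a file too large to load at once
toy_csv = io.StringIO("price\n10\n20\n30\n40\n50\n")
mean_price = chunked_mean(csv.DictReader(toy_csv), "price")
print(mean_price)
```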

## 2. Installation

You need Java to run Spark: it is written in [Scala](https://www.scala-lang.org/), a JVM-based language with a functional style

### 2.1 Java installation

#### 2.1.1 Conda

```
conda install openjdk -y
```

#### 2.1.2 Apt

```
sudo apt install default-jdk
```

### 2.2 PySpark installation

#### 2.2.1 Conda

```
conda install pyspark -y
```

#### 2.2.2 Pip

```
pip install pyspark
```

## 3. Setup

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
            .appName('big_data_session') \
            .master('local[*]') \
            .config('spark.ui.showConsoleProgress', True) \
            .config('spark.sql.repl.eagerEval.enabled', True) \
            .config('spark.sql.session.timeZone', 'UTC') \
            .getOrCreate()

In [None]:
spark

## 4. Data source

All data used in this workshop comes from [datamarket.es](https://datamarket.es/), the reference website for retrieving external data in Spain. Two sources have been sampled:

- Renfe trips
- Supermarket products

## 5. Data processing

### 5.1 Renfe trips

In [None]:
DATA_PATH = '/home/ubuntu/Desktop/renfe.csv'

sdf = spark.read.option('quote', '"').option('escape', '\\').csv(DATA_PATH, 
                                                                 header=True, 
                                                                 inferSchema=True)

sdf

__VERY IMPORTANT: sdf is a Spark DataFrame, which means it is a distributed DataFrame, not a regular Python object that lives in RAM (like a pandas DataFrame)__

- From Databricks (the company founded by Spark's creators), on what a Spark DataFrame is:

_"In Spark, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a dataframe in Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs (Resilient Distributed Datasets)"_

- By default, Spark DataFrames do not live in the memory of a computer or cluster node; they are evaluated lazily, only when a computation needs a result
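
Lazy evaluation can be illustrated without Spark. The sketch below is a plain-Python analogy using generators, not Spark's actual mechanism: building the pipeline does no work, and rows are only processed once a result is requested (the counterpart of a Spark action such as `count()`). The `processed` list and the toy rows are assumptions for illustration.

```python
processed = []  # records which rows were actually read


def read_rows():
    """Pretend data source: yields rows one at a time and logs each read."""
    for row in [{"price": 10}, {"price": 55}, {"price": 70}]:
        processed.append(row["price"])
        yield row


# "Transformation": this only describes the pipeline, nothing runs yet
expensive = (row for row in read_rows() if row["price"] > 50)
rows_read_before_action = len(processed)

# "Action": asking for a result forces the pipeline to execute
n_expensive = sum(1 for _ in expensive)
rows_read_after_action = len(processed)

print(rows_read_before_action, n_expensive, rows_read_after_action)
```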

#### types

In [None]:
sdf.dtypes

In [None]:
from pyspark.sql import functions as sf

# Columns that should hold timestamps
date_cols_meta = ['departure', 'arrival', 'insert_date']

# Convert each of them to a timestamp column
for dt_col in date_cols_meta:
    sdf = sdf.withColumn(dt_col, sf.to_timestamp(dt_col))

sdf

#### sample data

In [None]:
sdf_sample = sdf.sample(fraction=0.1, withReplacement=False)

sdf_sample
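
Note that `fraction` is a per-row probability, so the sample holds approximately, not exactly, 10% of the rows (the PySpark documentation states that the fraction is not guaranteed to be exact). The plain-Python sketch below imitates this behaviour with a seeded random generator; the function name `bernoulli_sample` and the row count are assumptions for illustration, and Spark's internal sampler is implemented differently.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(10_000))  # hypothetical stand-in for DataFrame rows
sample = bernoulli_sample(rows, fraction=0.1, seed=42)

# Close to 1,000 rows, but usually not exactly 1,000
print(len(sample))
```

In PySpark, passing `seed=` to `sample` serves the same purpose of making the draw repeatable.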

#### persist data

In [None]:
SAMPLE_PATH = '/home/ubuntu/Desktop/renfe_sample'

sdf_sample.write.mode('overwrite').parquet(SAMPLE_PATH)

#### query data

In [None]:
sdf_sample = spark.read.parquet(SAMPLE_PATH)

sdf_sample

In [None]:
sdf_sample.select(['origin', 'destination']).limit(5)

#### filter data

In [None]:
meta_filter = sf.col('meta') != '{}'
duration_filter = sf.col('duration') < 4.0
seats_filter = sf.col('seats').isNotNull()

sdf_filtered = sdf_sample.filter(meta_filter & duration_filter & seats_filter)

sdf_filtered
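
Naming each condition before combining them, as in the cell above, also sidesteps a common pitfall: in Python, `&` binds more tightly than comparison operators, so conditions written inline need parentheses, e.g. `(sf.col('duration') < 4.0) & (sf.col('seats').isNotNull())`. The plain-Python sketch below shows the precedence rule with integers; the values are arbitrary.

```python
x, y = 5, 4

# Without parentheses Python parses this as the chained comparison x > (2 & y) > 1
without_parens = x > 2 & y > 1

# With parentheses each comparison is evaluated first, then combined
with_parens = (x > 2) & (y > 1)

print(without_parens, with_parens)
```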

#### create new columns

In [None]:
from pyspark.sql.types import IntegerType

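# Casting a timestamp to an integer yields epoch seconds; dividing by 3600 gives hours
seconds = sf.col('arrival').cast(IntegerType()) - sf.col('departure').cast(IntegerType())
sdf_filtered.withColumn('duration_computed', seconds / 3600)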
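
The arithmetic behind `duration_computed` can be checked by hand: casting a timestamp to an integer gives seconds since the Unix epoch, so the difference of the two casts is the trip length in seconds, and dividing by 3600 converts it to hours. A plain-Python sketch with hypothetical departure and arrival times:

```python
from datetime import datetime, timezone

# Hypothetical trip: departs 08:00 UTC, arrives 10:30 UTC on the same day
departure = datetime(2021, 3, 5, 8, 0, tzinfo=timezone.utc)
arrival = datetime(2021, 3, 5, 10, 30, tzinfo=timezone.utc)

# Counterpart of cast(IntegerType()): whole seconds since the Unix epoch
departure_s = int(departure.timestamp())
arrival_s = int(arrival.timestamp())

duration_hours = (arrival_s - departure_s) / 3600
print(duration_hours)
```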

#### make aggregations

In [None]:
sdf_filtered.groupby(['origin', 'destination']).agg({'price': 'mean'})

In [None]:
sdf_filtered.count()

#### apply custom functions

In [None]:
sdf_filtered.select(['meta']).show(truncate=False)

In [None]:
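import json

@sf.udf('integer')
def get_first_class_first_fare_seats(meta):
    # Seats of the first fare of the first class listed in `meta`, or 0 on failure
    try:
        # Assumes `meta` is a JSON string; json.loads avoids eval(), which would execute arbitrary code
        meta_dict = json.loads(meta)
        first_available_class = [*meta_dict][0]
        first_available_fare = [*meta_dict[first_available_class]][0]
        seats = meta_dict[first_available_class][first_available_fare]['seats']
        return seats

    except (TypeError, ValueError, LookupError):
        return 0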

In [None]:
sdf_filtered = sdf_filtered.withColumn('seats_first_class_first_fare', get_first_class_first_fare_seats(sf.col('meta')))

sdf_filtered
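
Because a UDF that silently returns 0 on any error can hide parsing problems, it helps to test the extraction logic as a plain Python function before wrapping it as a UDF. The sketch below assumes `meta` is a JSON string shaped as class → fare → `{"seats": n}`, which is inferred from the lookups in the UDF above; the function name, the sample payload, and the class and fare names are hypothetical.

```python
import json


def first_class_first_fare_seats(meta):
    """Return the seats of the first fare of the first class in `meta`, or 0."""
    try:
        meta_dict = json.loads(meta)
        first_class = next(iter(meta_dict))
        first_fare = next(iter(meta_dict[first_class]))
        return meta_dict[first_class][first_fare]["seats"]
    except (TypeError, ValueError, LookupError, StopIteration):
        return 0


# Hypothetical payload; real `meta` values may be structured differently
sample_meta = '{"Turista": {"Promo": {"seats": 12, "price": 35.5}}}'

print(first_class_first_fare_seats(sample_meta))  # valid payload
print(first_class_first_fare_seats("{}"))         # no classes listed
print(first_class_first_fare_seats(None))         # missing value
```

Once the function behaves as expected, `sf.udf(first_class_first_fare_seats, 'integer')` should turn it into a UDF equivalent to the decorated version.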

#### rename columns

In [None]:
sdf_filtered = sdf_filtered.withColumnRenamed('seats', 'seats_cheapest_class_cheapest_fare')

sdf_filtered

#### create virtual sql tables and query them

In [None]:
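# createOrReplaceTempView lets this cell be re-run; createTempView fails if the view already exists
sdf_filtered.createOrReplaceTempView('renfe')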

In [None]:
SQL_QUERY = """
select
origin,
destination,
avg(price) as mean_price, 
avg(seats_cheapest_class_cheapest_fare) as mean_seats
from renfe
group by origin, destination
order by mean_price desc
"""

In [None]:
routes_prices_sdf = spark.sql(SQL_QUERY)

routes_prices_sdf

#### transform Spark DataFrame into pandas DataFrame

In [None]:
routes_prices_df = routes_prices_sdf.toPandas()

routes_prices_df

### 5.2 Supermarket products

In [None]:
DATA_PATH = '/home/ubuntu/Desktop/supermarkets.csv'

sdf = spark.read.csv(DATA_PATH, 
                     header=True, 
                     inferSchema=True)

sdf

#### write partitioned

In [None]:
date_col = 'insert_date'

sdf = sdf.withColumn('year', sf.year(date_col))
sdf = sdf.withColumn('month', sf.month(date_col))
sdf = sdf.withColumn('day', sf.dayofmonth(date_col))

sdf

In [None]:
OUTPUT_PATH = '/home/ubuntu/Desktop/supermarkets_partitioned'

sdf.write.partitionBy('year', 'month', 'day').mode('overwrite').parquet(OUTPUT_PATH)
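
`partitionBy` writes one subdirectory per distinct combination of partition values, using Hive-style `column=value` directory names. The plain-Python sketch below reconstructs the directory a given row would land in; the helper `partition_dir` and the example date are assumptions for illustration, and the Parquet file names inside each directory are generated by Spark.

```python
from datetime import date
from pathlib import PurePosixPath


def partition_dir(base_path, insert_date):
    """Build the Hive-style partition directory for a row's insert date."""
    return PurePosixPath(
        base_path,
        f"year={insert_date.year}",
        f"month={insert_date.month}",
        f"day={insert_date.day}",
    )


# Hypothetical insert date
path = partition_dir("/home/ubuntu/Desktop/supermarkets_partitioned", date(2021, 3, 5))
print(path)
```

Filtering on `year`, `month` or `day` when reading the dataset back lets Spark skip directories that cannot match (partition pruning).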

#### eda

In [None]:
sdf.select('supermarket', 'name').distinct().groupby('supermarket').count()

In [None]:
sdf.select(sf.min(date_col))

In [None]:
sdf.select(sf.max(date_col))

In [None]:
max_insert_date = sdf.select(sf.max(date_col).alias('max_insert_date')).collect()[0]['max_insert_date']

max_insert_date

In [None]:
sdf.filter(sf.col(date_col) == max_insert_date) \
  .groupby('supermarket', 'category') \
  .agg(sf.min('price').alias('min_price')) \
  .sort('min_price')

In [None]:
supermarket_filter = sf.col('supermarket') == 'mercadona-es'
category_filter = sf.col('category') == 'panaderia_y_pasteleria_pan_de_horno'
date_filter = sf.col(date_col) == max_insert_date

sdf.filter(supermarket_filter & category_filter & date_filter).sort('price', sf.col('reference_price').desc())

<div style="padding-top: 25px; float: right">
    <div>    
        <i>&nbsp;&nbsp;© Copyright by</i>
    </div>
    <div>
        <a href="https://whiteboxml.com">
            <img src="https://whiteboxml.com/static/img/logo/black_bg_white.svg" width="125">
        </a>
    </div>
</div>