<div style="float:right; padding-top: 15px; padding-right: 15px">
    <div>
        <a href="https://whiteboxml.com">
            <img src="https://whiteboxml.com/static/img/logo/black_bg_white.svg" width="250">
        </a>
    </div>
</div>

# Spark

## 1. Introduction

* When a dataset is larger than your computer's memory (RAM, or even disk), [Big Data](https://en.wikipedia.org/wiki/Big_data) tools split it into chunks and process it step by step
* These tools automate that process and handle the details under the hood, with little extra code
* [Spark](https://spark.apache.org/) is one of the most popular Big Data frameworks
* The Spark DataFrame API resembles the pandas API, with some differences
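
The idea of processing a dataset chunk by chunk can be sketched in plain Python. This is only an illustration of the principle, not how Spark is implemented; `iter_chunks` and `chunked_mean` are hypothetical helpers and the prices are toy values.

```python
from typing import Iterable, Iterator, List


def iter_chunks(values: Iterable[float], chunk_size: int) -> Iterator[List[float]]:
    """Yield consecutive chunks of at most chunk_size items."""
    chunk: List[float] = []
    for value in values:
        chunk.append(value)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:  # last, possibly shorter, chunk
        yield chunk


def chunked_mean(values: Iterable[float], chunk_size: int) -> float:
    """Compute a mean while holding only one chunk in memory at a time."""
    total, count = 0.0, 0
    for chunk in iter_chunks(values, chunk_size):
        total += sum(chunk)
        count += len(chunk)
    if count == 0:
        raise ValueError('cannot compute the mean of an empty dataset')
    return total / count


prices = [107.7, 100.4, 85.1, 80.15, 75.4]  # toy values for illustration
print(chunked_mean(iter(prices), chunk_size=2))
```

Only running totals survive between chunks, which is why the full dataset never has to fit in memory at once.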

## 2. Installation

You need Java to run Spark, because Spark is written in [Scala](https://www.scala-lang.org/), a functional-style language that runs on the JVM

### 2.1 Java installation

#### 2.1.1 Conda

```
conda install openjdk -y
```

#### 2.1.2 Apt

```
sudo apt install default-jdk
```

### 2.2 PySpark installation

#### 2.2.1 Conda

```
conda install pyspark -y
```

#### 2.2.2 Pip

```
pip install pyspark
```

## 3. Setup

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
            .appName('big_data_session') \
            .master('local[*]') \
            .config('spark.ui.showConsoleProgress', True) \
            .config('spark.sql.repl.eagerEval.enabled', True) \
            .config('spark.sql.session.timeZone', 'UTC') \
            .getOrCreate()

In [2]:
spark

## 4. Data source

All data used in this workshop is downloaded from [datamarket.es](https://datamarket.es/), a reference website for retrieving external data in Spain. Two sources have been sampled:

- Renfe trips
- Supermarket products

## 5. Data processing

### 5.1 Renfe trips

In [3]:
DATA_PATH = '/home/ubuntu/Desktop/renfe.csv'

sdf = spark.read.option('quote', '"').option('escape', '\\').csv(DATA_PATH, 
                                                                 header=True, 
                                                                 inferSchema=True)

sdf

id,company,origin,destination,departure,arrival,duration,vehicle_type,vehicle_class,price,fare,seats,meta,insert_date
19,renfe,MADRID,BARCELONA,2019-04-18 12:30:00,2019-04-18 15:30:00,3.0,AVE,Turista,107.7,Flexible,,{},2019-04-11 21:49:46
23,renfe,MADRID,BARCELONA,2019-04-18 14:00:00,2019-04-18 16:30:00,2.5,AVE,Turista,100.4,Promo,,{},2019-04-11 21:49:46
33,renfe,MADRID,BARCELONA,2019-04-18 19:30:00,2019-04-18 22:40:00,3.17,AVE,Turista,85.1,Promo,,{},2019-04-11 21:49:46
35,renfe,MADRID,BARCELONA,2019-04-18 21:25:00,2019-04-18 23:55:00,2.5,AVE,,,,,{},2019-04-11 21:49:46
60,renfe,MADRID,BARCELONA,2019-05-18 16:30:00,2019-05-18 19:15:00,2.75,AVE,Turista Plus,80.15,Promo,,{},2019-04-11 21:49:48
67,renfe,MADRID,BARCELONA,2019-05-18 12:30:00,2019-05-18 15:30:00,3.0,AVE,Turista,75.4,Promo,,{},2019-04-11 21:49:48
80,renfe,MADRID,BARCELONA,2019-05-22 07:20:00,2019-05-22 09:50:00,2.5,AVE,Turista,100.4,Promo,,{},2019-04-11 21:50:04
89,renfe,MADRID,BARCELONA,2019-04-22 06:30:00,2019-04-22 09:20:00,2.83,AVE,Turista,75.4,Promo,,{},2019-04-11 21:50:04
109,renfe,MADRID,BARCELONA,2019-05-22 08:20:00,2019-05-22 11:05:00,2.75,AVE,Turista,40.95,Promo,,{},2019-04-11 21:50:04
123,renfe,MADRID,BARCELONA,2019-05-22 07:15:00,2019-05-22 16:37:00,9.37,R. EXPRES,Turista,43.25,Adulto ida,,{},2019-04-11 21:50:04


__VERY IMPORTANT INFO: sdf is a Spark DataFrame, which means it is a distributed DataFrame, not a typical Python object that lives in RAM (such as a pandas DataFrame)__

- From Databricks (the company founded by Spark's creators), on what a Spark DataFrame is:

_"In Spark, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a dataframe in Python, but with richer optimizations under the hood. DataFrames can be constructed from a wide array of sources such as: structured data files, tables in Hive, external databases, or existing RDDs (Resilient Distributed Datasets)"_

- By default, Spark DataFrames do not live in the memory of a computer or cluster node; they are evaluated lazily, only when a computation requires their contents

#### types

In [4]:
sdf.dtypes

[('id', 'int'),
 ('company', 'string'),
 ('origin', 'string'),
 ('destination', 'string'),
 ('departure', 'timestamp'),
 ('arrival', 'timestamp'),
 ('duration', 'double'),
 ('vehicle_type', 'string'),
 ('vehicle_class', 'string'),
 ('price', 'double'),
 ('fare', 'string'),
 ('seats', 'int'),
 ('meta', 'string'),
 ('insert_date', 'timestamp')]

In [5]:
from pyspark.sql import functions as sf

date_cols_meta = ['departure', 'arrival', 'insert_date']

for dt_col in date_cols_meta:
    sdf = sdf.withColumn(dt_col, sf.to_timestamp(dt_col))

sdf

id,company,origin,destination,departure,arrival,duration,vehicle_type,vehicle_class,price,fare,seats,meta,insert_date
19,renfe,MADRID,BARCELONA,2019-04-18 12:30:00,2019-04-18 15:30:00,3.0,AVE,Turista,107.7,Flexible,,{},2019-04-11 21:49:46
23,renfe,MADRID,BARCELONA,2019-04-18 14:00:00,2019-04-18 16:30:00,2.5,AVE,Turista,100.4,Promo,,{},2019-04-11 21:49:46
33,renfe,MADRID,BARCELONA,2019-04-18 19:30:00,2019-04-18 22:40:00,3.17,AVE,Turista,85.1,Promo,,{},2019-04-11 21:49:46
35,renfe,MADRID,BARCELONA,2019-04-18 21:25:00,2019-04-18 23:55:00,2.5,AVE,,,,,{},2019-04-11 21:49:46
60,renfe,MADRID,BARCELONA,2019-05-18 16:30:00,2019-05-18 19:15:00,2.75,AVE,Turista Plus,80.15,Promo,,{},2019-04-11 21:49:48
67,renfe,MADRID,BARCELONA,2019-05-18 12:30:00,2019-05-18 15:30:00,3.0,AVE,Turista,75.4,Promo,,{},2019-04-11 21:49:48
80,renfe,MADRID,BARCELONA,2019-05-22 07:20:00,2019-05-22 09:50:00,2.5,AVE,Turista,100.4,Promo,,{},2019-04-11 21:50:04
89,renfe,MADRID,BARCELONA,2019-04-22 06:30:00,2019-04-22 09:20:00,2.83,AVE,Turista,75.4,Promo,,{},2019-04-11 21:50:04
109,renfe,MADRID,BARCELONA,2019-05-22 08:20:00,2019-05-22 11:05:00,2.75,AVE,Turista,40.95,Promo,,{},2019-04-11 21:50:04
123,renfe,MADRID,BARCELONA,2019-05-22 07:15:00,2019-05-22 16:37:00,9.37,R. EXPRES,Turista,43.25,Adulto ida,,{},2019-04-11 21:50:04


#### sample data

In [6]:
sdf_sample = sdf.sample(fraction=0.1, withReplacement=False)

sdf_sample

id,company,origin,destination,departure,arrival,duration,vehicle_type,vehicle_class,price,fare,seats,meta,insert_date
67,renfe,MADRID,BARCELONA,2019-05-18 12:30:00,2019-05-18 15:30:00,3.0,AVE,Turista,75.4,Promo,,{},2019-04-11 21:49:48
80,renfe,MADRID,BARCELONA,2019-05-22 07:20:00,2019-05-22 09:50:00,2.5,AVE,Turista,100.4,Promo,,{},2019-04-11 21:50:04
89,renfe,MADRID,BARCELONA,2019-04-22 06:30:00,2019-04-22 09:20:00,2.83,AVE,Turista,75.4,Promo,,{},2019-04-11 21:50:04
152,renfe,MADRID,BARCELONA,2019-04-22 12:30:00,2019-04-22 15:30:00,3.0,AVE,Preferente,181.5,Flexible,,{},2019-04-11 21:50:04
313,renfe,MADRID,SEVILLA,2019-06-02 20:35:00,2019-06-02 23:15:00,2.67,AVE,Turista Plus,64.05,Promo,,{},2019-04-11 21:50:38
338,renfe,MADRID,SEVILLA,2019-06-02 18:00:00,2019-06-02 20:32:00,2.53,AVE,Preferente,69.4,Promo,,{},2019-04-11 21:50:38
370,renfe,MADRID,SEVILLA,2019-05-05 14:00:00,2019-05-05 16:32:00,2.53,AVE,Turista,53.4,Promo,,{},2019-04-11 21:50:48
380,renfe,MADRID,SEVILLA,2019-05-05 19:00:00,2019-05-05 21:38:00,2.63,AVE,Turista,60.3,Promo,,{},2019-04-11 21:50:48
415,renfe,MADRID,SEVILLA,2019-05-06 09:00:00,2019-05-06 11:38:00,2.63,AVE,Turista,76.3,Flexible,,{},2019-04-11 21:50:52
431,renfe,MADRID,SEVILLA,2019-05-06 19:00:00,2019-05-06 21:38:00,2.63,AVE,Turista,53.4,Promo,,{},2019-04-11 21:50:52
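
Note that `fraction=0.1` is a per-row probability, so the sample size is only approximately 10% of the rows. The sketch below is a simplified plain-Python model of sampling without replacement, not Spark's actual implementation; `bernoulli_sample` is a hypothetical helper. `DataFrame.sample` also accepts a `seed` argument when a reproducible sample is needed.

```python
import random
from typing import List, Sequence


def bernoulli_sample(rows: Sequence[int], fraction: float, seed: int) -> List[int]:
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(1000))  # hypothetical row ids
sample_a = bernoulli_sample(rows, fraction=0.1, seed=42)
sample_b = bernoulli_sample(rows, fraction=0.1, seed=42)

print(len(sample_a))         # close to 100, but usually not exactly 100
print(sample_a == sample_b)  # same seed, same sample
```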


#### persist data

In [7]:
SAMPLE_PATH = '/home/ubuntu/Desktop/renfe_sample'

sdf_sample.write.mode('overwrite').parquet(SAMPLE_PATH)

#### query data

In [8]:
sdf_sample = spark.read.parquet(SAMPLE_PATH)

sdf_sample

id,company,origin,destination,departure,arrival,duration,vehicle_type,vehicle_class,price,fare,seats,meta,insert_date
19790784,renfe,MADRID,MALAGA,2020-03-02 13:00:00,2020-03-02 16:05:00,3.08,AVE,Turista,81.2,Flexible,,"{""Turista"": {""Fle...",2020-02-22 00:56:13
19790789,renfe,MADRID,MALAGA,2020-04-17 19:30:00,2020-04-17 22:23:00,2.88,AVE,Turista,64.15,Promo,,"{""Turista"": {""Pro...",2020-02-22 00:56:13
19790821,renfe,MADRID,MALAGA,2020-04-13 14:35:00,2020-04-13 17:21:00,2.77,AVE,Turista,50.35,Promo,,"{""Turista"": {""Pro...",2020-02-22 00:56:14
19790849,renfe,MADRID,MALAGA,2020-04-21 11:35:00,2020-04-21 14:17:00,2.7,AVE,Turista,43.85,Promo,,"{""Turista"": {""Pro...",2020-02-22 00:56:15
19790911,renfe,MADRID,SEVILLA,2020-02-22 17:00:00,2020-02-22 19:38:00,2.63,AVE,Turista,77.1,Flexible,,"{""Turista"": {""Fle...",2020-02-22 00:56:17
19791062,renfe,MADRID,SEVILLA,2020-03-18 09:00:00,2020-03-18 11:38:00,2.63,AVE,Turista,53.95,Promo,,"{""Turista"": {""Pro...",2020-02-22 00:56:33
19791121,renfe,MADRID,SEVILLA,2020-03-15 13:10:00,2020-03-15 20:26:00,7.27,MD-LD,Turista con enlace,34.35,Promo +,,"{""Turista con enl...",2020-02-22 00:56:36
19791202,renfe,MADRID,SEVILLA,2020-03-20 21:25:00,2020-03-21 00:10:00,2.75,AV City,Turista,49.7,Promo,,"{""Turista"": {""Pro...",2020-02-22 00:56:41
19791275,renfe,MADRID,SEVILLA,2020-03-26 08:30:00,2020-03-26 11:14:00,2.73,ALVIA,Turista,67.9,Flexible,,"{""Turista"": {""Fle...",2020-02-22 00:56:45
19791396,renfe,MADRID,SEVILLA,2020-03-09 15:05:00,2020-03-09 18:30:00,3.42,LD-AVE,,,,,{},2020-02-22 00:56:50


In [9]:
sdf_sample.select(['origin', 'destination']).limit(5)

origin,destination
MADRID,MALAGA
MADRID,MALAGA
MADRID,MALAGA
MADRID,MALAGA
MADRID,SEVILLA


#### filter data

In [10]:
meta_filter = sf.col('meta') != '{}'
duration_filter = sf.col('duration') < 4.0
seats_filter = sf.col('seats').isNotNull()

sdf_filtered = sdf_sample.filter(meta_filter & duration_filter & seats_filter)

sdf_filtered

id,company,origin,destination,departure,arrival,duration,vehicle_type,vehicle_class,price,fare,seats,meta,insert_date
37402874,renfe,MADRID,BARCELONA,2020-10-18 19:00:00,2020-10-18 21:30:00,2.5,AVE,Turista,128.5,Flexible,259,"{""Turista"": {""Fle...",2020-09-28 16:10:...
37403016,renfe,MADRID,BARCELONA,2020-11-02 07:30:00,2020-11-02 10:40:12,3.17,AVE,Turista,91.5,Promo +,258,"{""Turista"": {""Pro...",2020-09-28 16:10:...
37403170,renfe,MADRID,BARCELONA,2020-10-27 07:00:00,2020-10-27 09:30:00,2.5,AVE,Turista,86.1,Promo +,334,"{""Turista"": {""Pro...",2020-09-28 16:10:...
37403380,renfe,MADRID,BARCELONA,2020-11-22 14:30:00,2020-11-22 17:25:12,2.92,AVE,Turista,108.9,Flexible,246,"{""Turista"": {""Fle...",2020-09-28 16:10:...
37403501,renfe,MADRID,BARCELONA,2020-11-17 09:30:00,2020-11-17 12:34:12,3.07,AVE,Turista Plus,87.55,Promo +,50,"{""Turista"": {""Fle...",2020-09-28 16:10:...
37403561,renfe,BARCELONA,MADRID,2020-11-10 19:00:00,2020-11-10 22:00:00,3.0,AVE,Turista,108.9,Flexible,263,"{""Turista"": {""Fle...",2020-09-28 16:11:...
37404034,renfe,BARCELONA,MADRID,2020-11-26 06:05:00,2020-11-26 09:20:00,3.25,AVE,Turista,108.9,Flexible,263,"{""Turista"": {""Fle...",2020-09-28 16:11:...
37404067,renfe,BARCELONA,MADRID,2020-11-15 19:00:00,2020-11-15 22:00:00,3.0,AVE,Turista,108.9,Flexible,331,"{""Turista"": {""Fle...",2020-09-28 16:11:...
37404135,renfe,BARCELONA,MADRID,2020-10-06 17:00:00,2020-10-06 19:45:00,2.75,AVE,Turista,108.9,Flexible,295,"{""Turista"": {""Fle...",2020-09-28 16:11:...
37404388,renfe,MADRID,CÓRDOBA,2020-11-11 14:35:00,2020-11-11 16:17:00,1.7,AVE,Turista,27.25,Promo +,294,"{""Turista"": {""Pro...",2020-09-28 16:12:...


#### create new columns

In [11]:
from pyspark.sql.types import IntegerType

sdf_filtered.withColumn('duration_computed', (sf.col('arrival').cast(IntegerType()) - sf.col('departure').cast(IntegerType())) / 3600)

id,company,origin,destination,departure,arrival,duration,vehicle_type,vehicle_class,price,fare,seats,meta,insert_date,duration_computed
37402874,renfe,MADRID,BARCELONA,2020-10-18 19:00:00,2020-10-18 21:30:00,2.5,AVE,Turista,128.5,Flexible,259,"{""Turista"": {""Fle...",2020-09-28 16:10:...,2.5
37403016,renfe,MADRID,BARCELONA,2020-11-02 07:30:00,2020-11-02 10:40:12,3.17,AVE,Turista,91.5,Promo +,258,"{""Turista"": {""Pro...",2020-09-28 16:10:...,3.17
37403170,renfe,MADRID,BARCELONA,2020-10-27 07:00:00,2020-10-27 09:30:00,2.5,AVE,Turista,86.1,Promo +,334,"{""Turista"": {""Pro...",2020-09-28 16:10:...,2.5
37403380,renfe,MADRID,BARCELONA,2020-11-22 14:30:00,2020-11-22 17:25:12,2.92,AVE,Turista,108.9,Flexible,246,"{""Turista"": {""Fle...",2020-09-28 16:10:...,2.92
37403501,renfe,MADRID,BARCELONA,2020-11-17 09:30:00,2020-11-17 12:34:12,3.07,AVE,Turista Plus,87.55,Promo +,50,"{""Turista"": {""Fle...",2020-09-28 16:10:...,3.07
37403561,renfe,BARCELONA,MADRID,2020-11-10 19:00:00,2020-11-10 22:00:00,3.0,AVE,Turista,108.9,Flexible,263,"{""Turista"": {""Fle...",2020-09-28 16:11:...,3.0
37404034,renfe,BARCELONA,MADRID,2020-11-26 06:05:00,2020-11-26 09:20:00,3.25,AVE,Turista,108.9,Flexible,263,"{""Turista"": {""Fle...",2020-09-28 16:11:...,3.25
37404067,renfe,BARCELONA,MADRID,2020-11-15 19:00:00,2020-11-15 22:00:00,3.0,AVE,Turista,108.9,Flexible,331,"{""Turista"": {""Fle...",2020-09-28 16:11:...,3.0
37404135,renfe,BARCELONA,MADRID,2020-10-06 17:00:00,2020-10-06 19:45:00,2.75,AVE,Turista,108.9,Flexible,295,"{""Turista"": {""Fle...",2020-09-28 16:11:...,2.75
37404388,renfe,MADRID,CÓRDOBA,2020-11-11 14:35:00,2020-11-11 16:17:00,1.7,AVE,Turista,27.25,Promo +,294,"{""Turista"": {""Pro...",2020-09-28 16:12:...,1.7
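
The arithmetic behind `duration_computed` can be checked by hand with plain Python. Casting a Spark timestamp to an integer yields seconds since the epoch, so the difference of the two casts is the trip length in seconds. The sketch below reproduces the second row of the output above.

```python
from datetime import datetime

departure = datetime(2020, 11, 2, 7, 30, 0)
arrival = datetime(2020, 11, 2, 10, 40, 12)

# difference in seconds, equivalent to subtracting the two epoch-second casts
elapsed_seconds = (arrival - departure).total_seconds()

# 3600 seconds per hour
duration_hours = elapsed_seconds / 3600

print(round(duration_hours, 2))
```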


#### make aggregations

In [12]:
sdf_filtered.groupby(['origin', 'destination']).agg({'price': 'mean'})

origin,destination,avg(price)
BARCELONA,VALENCIA,28.215748663101472
ZARAGOZA,MADRID,44.62390364422449
MADRID,CÓRDOBA,48.557590132827656
MADRID,ZARAGOZA,44.26541476159339
MÁLAGA,MADRID,56.82871759890891
MADRID,LEON,35.610778985507366
VALLADOLID,MADRID,32.214733201581424
MADRID,BARCELONA,81.89261602034317
MADRID,VALLADOLID,32.4869668680218
CORDOBA,MADRID,45.82846938775479


In [13]:
sdf_filtered.count()

55951

#### apply custom functions

In [14]:
sdf_filtered.select(['meta']).show(truncate=False)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|meta                                                                                                                                                                                                             |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{"Turista": {"Flexible": {"price": 128.5, "seats": 259}}, "Turista Plus": {"Flexible": {"price": 154.2, "seats": 118}}}                                                                                          |
|{"Turista": {"Promo +": {"price": 91.5, "seats": 258}, "Flexible": {"price": 108.9, "seats": 258}}, "Turista Plus": {"Promo +": {"price": 109.8, "seats

In [15]:
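import json

@sf.udf('integer')
def get_first_class_first_fare_seats(meta):
    # meta is a JSON string shaped like {class: {fare: {'price': ..., 'seats': ...}}}
    try:
        # json.loads parses the string without executing it, unlike eval
        meta_dict = json.loads(meta)
        first_available_class = [*meta_dict][0]
        first_available_fare = [*meta_dict[first_available_class]][0]
        seats = meta_dict[first_available_class][first_available_fare]['seats']
        return seats

    except (TypeError, ValueError, KeyError, IndexError):
        # missing, malformed or empty meta: fall back to 0 seats
        return 0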
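
Because a UDF wraps an ordinary Python function, its logic can be checked without Spark. The following is a minimal sketch that assumes `meta` holds valid JSON; `first_class_first_fare_seats` is a hypothetical stand-alone variant of the function above, and the sample string is the first `meta` value shown in the previous output.

```python
import json


def first_class_first_fare_seats(meta):
    """Plain-Python version of the UDF body, parsing meta with json.loads."""
    try:
        meta_dict = json.loads(meta)
        first_class = next(iter(meta_dict))             # first key keeps JSON order
        first_fare = next(iter(meta_dict[first_class]))
        return int(meta_dict[first_class][first_fare]['seats'])
    except (TypeError, ValueError, KeyError, StopIteration):
        # None, malformed JSON, empty dict or missing 'seats'
        return 0


sample_meta = ('{"Turista": {"Flexible": {"price": 128.5, "seats": 259}}, '
               '"Turista Plus": {"Flexible": {"price": 154.2, "seats": 118}}}')

print(first_class_first_fare_seats(sample_meta))
print(first_class_first_fare_seats('{}'))
```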

In [16]:
sdf_filtered = sdf_filtered.withColumn('seats_first_class_first_fare', get_first_class_first_fare_seats(sf.col('meta')))

sdf_filtered

id,company,origin,destination,departure,arrival,duration,vehicle_type,vehicle_class,price,fare,seats,meta,insert_date,seats_first_class_first_fare
37402874,renfe,MADRID,BARCELONA,2020-10-18 19:00:00,2020-10-18 21:30:00,2.5,AVE,Turista,128.5,Flexible,259,"{""Turista"": {""Fle...",2020-09-28 16:10:...,259
37403016,renfe,MADRID,BARCELONA,2020-11-02 07:30:00,2020-11-02 10:40:12,3.17,AVE,Turista,91.5,Promo +,258,"{""Turista"": {""Pro...",2020-09-28 16:10:...,258
37403170,renfe,MADRID,BARCELONA,2020-10-27 07:00:00,2020-10-27 09:30:00,2.5,AVE,Turista,86.1,Promo +,334,"{""Turista"": {""Pro...",2020-09-28 16:10:...,334
37403380,renfe,MADRID,BARCELONA,2020-11-22 14:30:00,2020-11-22 17:25:12,2.92,AVE,Turista,108.9,Flexible,246,"{""Turista"": {""Fle...",2020-09-28 16:10:...,246
37403501,renfe,MADRID,BARCELONA,2020-11-17 09:30:00,2020-11-17 12:34:12,3.07,AVE,Turista Plus,87.55,Promo +,50,"{""Turista"": {""Fle...",2020-09-28 16:10:...,298
37403561,renfe,BARCELONA,MADRID,2020-11-10 19:00:00,2020-11-10 22:00:00,3.0,AVE,Turista,108.9,Flexible,263,"{""Turista"": {""Fle...",2020-09-28 16:11:...,263
37404034,renfe,BARCELONA,MADRID,2020-11-26 06:05:00,2020-11-26 09:20:00,3.25,AVE,Turista,108.9,Flexible,263,"{""Turista"": {""Fle...",2020-09-28 16:11:...,263
37404067,renfe,BARCELONA,MADRID,2020-11-15 19:00:00,2020-11-15 22:00:00,3.0,AVE,Turista,108.9,Flexible,331,"{""Turista"": {""Fle...",2020-09-28 16:11:...,331
37404135,renfe,BARCELONA,MADRID,2020-10-06 17:00:00,2020-10-06 19:45:00,2.75,AVE,Turista,108.9,Flexible,295,"{""Turista"": {""Fle...",2020-09-28 16:11:...,295
37404388,renfe,MADRID,CÓRDOBA,2020-11-11 14:35:00,2020-11-11 16:17:00,1.7,AVE,Turista,27.25,Promo +,294,"{""Turista"": {""Pro...",2020-09-28 16:12:...,294


#### rename columns

In [17]:
sdf_filtered = sdf_filtered.withColumnRenamed('seats', 'seats_cheapest_class_cheapest_fare')

sdf_filtered

id,company,origin,destination,departure,arrival,duration,vehicle_type,vehicle_class,price,fare,seats_cheapest_class_cheapest_fare,meta,insert_date,seats_first_class_first_fare
37402874,renfe,MADRID,BARCELONA,2020-10-18 19:00:00,2020-10-18 21:30:00,2.5,AVE,Turista,128.5,Flexible,259,"{""Turista"": {""Fle...",2020-09-28 16:10:...,259
37403016,renfe,MADRID,BARCELONA,2020-11-02 07:30:00,2020-11-02 10:40:12,3.17,AVE,Turista,91.5,Promo +,258,"{""Turista"": {""Pro...",2020-09-28 16:10:...,258
37403170,renfe,MADRID,BARCELONA,2020-10-27 07:00:00,2020-10-27 09:30:00,2.5,AVE,Turista,86.1,Promo +,334,"{""Turista"": {""Pro...",2020-09-28 16:10:...,334
37403380,renfe,MADRID,BARCELONA,2020-11-22 14:30:00,2020-11-22 17:25:12,2.92,AVE,Turista,108.9,Flexible,246,"{""Turista"": {""Fle...",2020-09-28 16:10:...,246
37403501,renfe,MADRID,BARCELONA,2020-11-17 09:30:00,2020-11-17 12:34:12,3.07,AVE,Turista Plus,87.55,Promo +,50,"{""Turista"": {""Fle...",2020-09-28 16:10:...,298
37403561,renfe,BARCELONA,MADRID,2020-11-10 19:00:00,2020-11-10 22:00:00,3.0,AVE,Turista,108.9,Flexible,263,"{""Turista"": {""Fle...",2020-09-28 16:11:...,263
37404034,renfe,BARCELONA,MADRID,2020-11-26 06:05:00,2020-11-26 09:20:00,3.25,AVE,Turista,108.9,Flexible,263,"{""Turista"": {""Fle...",2020-09-28 16:11:...,263
37404067,renfe,BARCELONA,MADRID,2020-11-15 19:00:00,2020-11-15 22:00:00,3.0,AVE,Turista,108.9,Flexible,331,"{""Turista"": {""Fle...",2020-09-28 16:11:...,331
37404135,renfe,BARCELONA,MADRID,2020-10-06 17:00:00,2020-10-06 19:45:00,2.75,AVE,Turista,108.9,Flexible,295,"{""Turista"": {""Fle...",2020-09-28 16:11:...,295
37404388,renfe,MADRID,CÓRDOBA,2020-11-11 14:35:00,2020-11-11 16:17:00,1.7,AVE,Turista,27.25,Promo +,294,"{""Turista"": {""Pro...",2020-09-28 16:12:...,294


#### create virtual sql tables and query them

In [18]:
sdf_filtered.createOrReplaceTempView('renfe_big_data_class')

In [19]:
SQL_QUERY = """
select
origin,
destination,
avg(price) as mean_price, 
avg(seats_cheapest_class_cheapest_fare) as mean_seats
from renfe_big_data_class
group by origin, destination
order by mean_price desc
"""

In [20]:
routes_prices_sdf = spark.sql(SQL_QUERY)

routes_prices_sdf

origin,destination,mean_price,mean_seats
MADRID,BARCELONA,81.89261602034317,262.15956770502225
BARCELONA,MADRID,81.57739938080479,268.4984520123839
MÁLAGA,MADRID,56.82871759890891,240.4556616643929
MADRID,MÁLAGA,56.49381250000039,242.92375
MADRID,SEVILLA,53.02640109890138,223.46813186813188
SEVILLA,MADRID,51.40903648802761,219.3568985176739
MADRID,MALAGA,51.020959264126745,232.5203679369251
MALAGA,MADRID,49.42875968992303,235.34883720930236
CÓRDOBA,MADRID,48.94294510005174,219.66854797331965
MADRID,CÓRDOBA,48.557590132827656,220.14753320683116


#### transform Spark DataFrame into pandas DataFrame

In [21]:
routes_prices_df = routes_prices_sdf.toPandas()

routes_prices_df

Unnamed: 0,origin,destination,mean_price,mean_seats
0,MADRID,BARCELONA,81.892616,262.159568
1,BARCELONA,MADRID,81.577399,268.498452
2,MÁLAGA,MADRID,56.828718,240.455662
3,MADRID,MÁLAGA,56.493813,242.92375
4,MADRID,SEVILLA,53.026401,223.468132
5,SEVILLA,MADRID,51.409036,219.356899
6,MADRID,MALAGA,51.020959,232.520368
7,MALAGA,MADRID,49.42876,235.348837
8,CÓRDOBA,MADRID,48.942945,219.668548
9,MADRID,CÓRDOBA,48.55759,220.147533


### 5.2 Supermarket products

In [22]:
DATA_PATH = '/home/ubuntu/Desktop/supermarkets.csv'

sdf = spark.read.csv(DATA_PATH, 
                     header=True, 
                     inferSchema=True)

sdf

id,supermarket,category,name,description,price,reference_price,reference_unit,insert_date
120682,carrefour-es,el_mercado_pescad...,Bonito en rodaja ...,,4.25,8.5,kg,2020-07-21 17:30:00
120904,carrefour-es,el_mercado_carnic...,Estofado de añojo...,Envase De 500.0 ...,5.95,11.9,kg,2020-07-21 17:30:00
120912,carrefour-es,el_mercado_carnic...,Jamoncito de poll...,,3.49,10.9,kg,2020-07-21 17:30:00
120923,carrefour-es,el_mercado_carnic...,Contramuslo de po...,Bandeja De 1000.0...,2.59,2.59,kg,2020-07-21 17:30:00
120924,carrefour-es,el_mercado_carnic...,Chuleta de pavo m...,Bandeja De 600.0 ...,3.48,5.8,kg,2020-07-21 17:30:00
120929,carrefour-es,el_mercado_carnic...,Pollo campero lim...,Bandeja De 1400.0...,7.7,5.5,kg,2020-07-21 17:30:00
120937,carrefour-es,el_mercado_carnic...,Traseros de pollo...,Bandeja De 880.0 ...,3.74,4.25,kg,2020-07-21 17:30:00
120949,carrefour-es,el_mercado_carnic...,Chuletón de vacun...,650 G Aprox 650.0...,9.07,13.95,kg,2020-07-21 17:30:00
120959,carrefour-es,el_mercado_carnic...,Entrecot de terne...,,10.75,21.5,kg,2020-07-21 17:30:00
120973,carrefour-es,el_mercado_carnic...,Jamón de cerdo tr...,Bandeja De 500.0...,2.65,5.3,kg,2020-07-21 17:30:00


#### write partitioned

In [23]:
date_col = 'insert_date'

sdf = sdf.withColumn('year', sf.year(date_col))
sdf = sdf.withColumn('month', sf.month(date_col))
sdf = sdf.withColumn('day', sf.dayofmonth(date_col))

sdf

id,supermarket,category,name,description,price,reference_price,reference_unit,insert_date,year,month,day
120682,carrefour-es,el_mercado_pescad...,Bonito en rodaja ...,,4.25,8.5,kg,2020-07-21 17:30:00,2020,7,21
120904,carrefour-es,el_mercado_carnic...,Estofado de añojo...,Envase De 500.0 ...,5.95,11.9,kg,2020-07-21 17:30:00,2020,7,21
120912,carrefour-es,el_mercado_carnic...,Jamoncito de poll...,,3.49,10.9,kg,2020-07-21 17:30:00,2020,7,21
120923,carrefour-es,el_mercado_carnic...,Contramuslo de po...,Bandeja De 1000.0...,2.59,2.59,kg,2020-07-21 17:30:00,2020,7,21
120924,carrefour-es,el_mercado_carnic...,Chuleta de pavo m...,Bandeja De 600.0 ...,3.48,5.8,kg,2020-07-21 17:30:00,2020,7,21
120929,carrefour-es,el_mercado_carnic...,Pollo campero lim...,Bandeja De 1400.0...,7.7,5.5,kg,2020-07-21 17:30:00,2020,7,21
120937,carrefour-es,el_mercado_carnic...,Traseros de pollo...,Bandeja De 880.0 ...,3.74,4.25,kg,2020-07-21 17:30:00,2020,7,21
120949,carrefour-es,el_mercado_carnic...,Chuletón de vacun...,650 G Aprox 650.0...,9.07,13.95,kg,2020-07-21 17:30:00,2020,7,21
120959,carrefour-es,el_mercado_carnic...,Entrecot de terne...,,10.75,21.5,kg,2020-07-21 17:30:00,2020,7,21
120973,carrefour-es,el_mercado_carnic...,Jamón de cerdo tr...,Bandeja De 500.0...,2.65,5.3,kg,2020-07-21 17:30:00,2020,7,21


In [24]:
OUTPUT_PATH = '/home/ubuntu/Desktop/supermarkets_partitioned'

sdf.write.partitionBy('year', 'month', 'day').mode('overwrite').parquet(OUTPUT_PATH)
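
With `partitionBy`, Spark writes one directory per combination of partition values, using Hive-style `column=value` names. The sketch below only builds the expected directory name for a given row in plain Python; `partition_dir` is a hypothetical helper, not part of PySpark.

```python
from datetime import datetime
from pathlib import PurePosixPath


def partition_dir(base_path: str, insert_date: datetime) -> str:
    """Directory a row lands in under partitionBy('year', 'month', 'day')."""
    return str(PurePosixPath(base_path)
               / f'year={insert_date.year}'
               / f'month={insert_date.month}'
               / f'day={insert_date.day}')


print(partition_dir('/home/ubuntu/Desktop/supermarkets_partitioned',
                    datetime(2020, 7, 21, 17, 30)))
```

Queries that filter on `year`, `month` or `day` can then skip the directories that do not match, instead of scanning every file.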

#### eda

In [25]:
sdf.select('supermarket', 'name').distinct().groupby('supermarket').count()

supermarket,count
carrefour-es,29658
dia-es,8934
mercadona-es,6690


In [26]:
sdf.select(sf.min(date_col))

min(insert_date)
2020-07-21 16:06:00


In [27]:
sdf.select(sf.max(date_col))

max(insert_date)
2021-01-13 00:00:00


In [28]:
max_insert_date = sdf.select(sf.max(date_col).alias('max_insert_date')).collect()[0]['max_insert_date']

max_insert_date

datetime.datetime(2021, 1, 13, 1, 0)
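
The collected value reads 01:00 while the previous output shows 00:00:00. A likely explanation is that the session renders timestamps in UTC (see `spark.sql.session.timeZone` in the setup), whereas `collect()` returns naive `datetime` objects in the local time zone of the driver. The sketch below reproduces the shift in plain Python, assuming the driver runs at UTC+1 (for example Spanish winter time); that offset is an assumption, not something stated in this notebook.

```python
from datetime import datetime, timedelta, timezone

# the instant as rendered by Spark with the session time zone set to UTC
utc_value = datetime(2021, 1, 13, 0, 0, tzinfo=timezone.utc)

# assumption: the driver's local time zone is UTC+1
assumed_local_tz = timezone(timedelta(hours=1))

# same instant, expressed as a naive local datetime
local_value = utc_value.astimezone(assumed_local_tz).replace(tzinfo=None)
print(repr(local_value))
```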

In [29]:
sdf.filter(sf.col(date_col) == max_insert_date) \
  .groupby('supermarket', 'category') \
  .agg(sf.min('price').alias('min_price')) \
  .sort('min_price')

supermarket,category,min_price
dia-es,especial_navidad_...,0.1
mercadona-es,panaderia_y_paste...,0.1
dia-es,especial_navidad_...,0.1
dia-es,especial_navidad_...,0.13
carrefour-es,bebidas_aguas_y_z...,0.15
dia-es,especial_navidad_...,0.17
mercadona-es,agua_y_refrescos_...,0.18
dia-es,especial_navidad_...,0.19
mercadona-es,panaderia_y_paste...,0.22
carrefour-es,la_despensa_alime...,0.22


In [30]:
supermarket_filter = sf.col('supermarket') == 'mercadona-es'
category_filter = sf.col('category') == 'panaderia_y_pasteleria_pan_de_horno'
date_filter = sf.col(date_col) == max_insert_date

sdf.filter(supermarket_filter & category_filter & date_filter).sort('price', sf.col('reference_price').desc())

id,supermarket,category,name,description,price,reference_price,reference_unit,insert_date,year,month,day
11033123,mercadona-es,panaderia_y_paste...,Panecillo,,0.1,0.22,100 g,2021-01-13 00:00:00,2021,1,13
11033113,mercadona-es,panaderia_y_paste...,Pan mini semillas,,0.25,2.78,kg,2021-01-13 00:00:00,2021,1,13
11033110,mercadona-es,panaderia_y_paste...,Pan de cristal re...,,0.35,4.49,kg,2021-01-13 00:00:00,2021,1,13
11033098,mercadona-es,panaderia_y_paste...,Barra de pan camp...,,0.55,2.2,kg,2021-01-13 00:00:00,2021,1,13
11033112,mercadona-es,panaderia_y_paste...,4 Panes chapata d...,Pack-4,1.0,3.51,kg,2021-01-13 00:00:00,2021,1,13
11033089,mercadona-es,panaderia_y_paste...,3 Barras de pan,,1.1,1.47,kg,2021-01-13 00:00:00,2021,1,13
11033132,mercadona-es,panaderia_y_paste...,Barra pan de pueb...,,1.15,1.44,kg,2021-01-13 00:00:00,2021,1,13
11033135,mercadona-es,panaderia_y_paste...,3 Barras de pan a...,,1.3,2.17,kg,2021-01-13 00:00:00,2021,1,13


<div style="padding-top: 25px; float: right">
    <div>    
        <i>&nbsp;&nbsp;© Copyright by</i>
    </div>
    <div>
        <a href="https://whiteboxml.com">
            <img src="https://whiteboxml.com/static/img/logo/black_bg_white.svg" width="125">
        </a>
    </div>
</div>