<div style="float:right; padding-top: 15px; padding-right: 15px">
    <div>
        <a href="https://whiteboxml.com">
            <img src="https://whiteboxml.com/static/img/logo/black_bg_white.svg" width="250">
        </a>
    </div>
</div>

# dataframe calculations, extended with big data tools...

## 1. introduction

* calculations with pandas are 'easy'
* calculations are usually fast, depending on your machine and the size of the dataset
* when the dataset no longer fits in your computer's memory, big data tools can break it into chunks and process it step by step
* big data tools automate this process and take care of everything under the hood with little extra code
* Spark is the most popular Big Data framework so far
* Spark syntax resembles pandas with some differences

## 2. pure pandas approach

In [1]:
import pandas as pd

### 2.1 data loading

let's use a 'huge' dataset of Renfe (the Spanish national railway operator) ticket prices...

https://www.kaggle.com/thegurusteam/spanish-high-speed-rail-system-ticket-pricing/data

In [2]:
renfe = pd.read_csv('./data/renfe_sample.csv', low_memory=True)

with this cell we can inspect column types, non-null counts and memory usage (it can be slow on big frames)...

In [3]:
renfe.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100000 entries, 0 to 99999
Data columns (total 13 columns):
 #   Column       Non-Null Count   Dtype  
---  ------       --------------   -----  
 0   Unnamed: 0   100000 non-null  int64  
 1   insert_date  100000 non-null  object 
 2   origin       100000 non-null  object 
 3   destination  100000 non-null  object 
 4   start_date   100000 non-null  object 
 5   end_date     100000 non-null  object 
 6   train_type   100000 non-null  object 
 7   price        68978 non-null   float64
 8   train_class  71104 non-null   object 
 9   fare         71104 non-null   object 
 10  price_tree   70016 non-null   object 
 11  batch        58099 non-null   object 
 12  id           71561 non-null   float64
dtypes: float64(2), int64(1), object(10)
memory usage: 9.9+ MB
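By default, the memory reported for `object` (string) columns only counts the pointers, which is what the `+` after `9.9` flags. A minimal sketch on hypothetical toy data of how to measure the strings too with `memory_usage(deep=True)` (a slower scan), and of how converting a low-cardinality string column to the `category` dtype can shrink it:

```python
import pandas as pd

# hypothetical toy data: a low-cardinality string column, like 'origin' in the Renfe sample
toy = pd.DataFrame({'origin': ['MADRID', 'BARCELONA', 'SEVILLA'] * 10_000})

# shallow: only the 8-byte object pointers are counted
shallow_bytes = toy.memory_usage(index=False).sum()
# deep: the Python string objects themselves are measured too (slower to compute)
deep_bytes = toy.memory_usage(index=False, deep=True).sum()

# category dtype stores each distinct string once plus small integer codes
toy_cat = toy.astype({'origin': 'category'})
category_bytes = toy_cat.memory_usage(index=False, deep=True).sum()

print(shallow_bytes, deep_bytes, category_bytes)
```

On the real frame, `renfe.info(memory_usage='deep')` reports the same deep figure in the summary.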


### 2.2 some calculations with pandas

#### combining string columns

first, let's concatenate two string columns in pure pandas, a relatively expensive operation...

In [4]:
renfe['destination_origin'] = renfe['origin'] + '_' + renfe['destination']

In [5]:
renfe['destination_origin']

0         MADRID_BARCELONA
1         BARCELONA_MADRID
2        MADRID_PONFERRADA
3          MADRID_PALENCIA
4            GIRONA_MADRID
               ...        
99995       SEVILLA_MADRID
99996          LEON_MADRID
99997      VALENCIA_MADRID
99998      VALENCIA_MADRID
99999     MADRID_TARRAGONA
Name: destination_origin, Length: 100000, dtype: object
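The same concatenation can also be written with the `.str.cat` accessor method, which takes an explicit separator (and an `na_rep` argument if missing values should be replaced rather than propagated). A small sketch on hypothetical rows, checking that both forms agree:

```python
import pandas as pd

# hypothetical rows shaped like the Renfe 'origin' / 'destination' columns
trips = pd.DataFrame({'origin': ['MADRID', 'BARCELONA'],
                      'destination': ['BARCELONA', 'MADRID']})

# operator-based concatenation, as in the cell above
with_plus = trips['origin'] + '_' + trips['destination']
# accessor-based concatenation with an explicit separator
with_cat = trips['origin'].str.cat(trips['destination'], sep='_')

print(with_cat.tolist())  # ['MADRID_BARCELONA', 'BARCELONA_MADRID']
```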

#### working with dates

let's check the data type of our departure and arrival date columns...

In [6]:
renfe['start_date']

0        2019-04-19 11:30:00
1        2020-05-29 18:00:00
2        2019-05-20 12:20:00
3        2020-04-11 08:00:00
4        2020-05-30 08:11:00
                ...         
99995    2019-07-18 20:15:00
99996    2020-04-29 06:40:00
99997    2020-03-18 09:50:00
99998    2019-06-29 14:50:00
99999    2020-03-31 20:30:00
Name: start_date, Length: 100000, dtype: object

let's convert those columns into proper datetime columns using the `to_datetime` function...

In [7]:
renfe['start_date'] = pd.to_datetime(renfe['start_date'])
renfe['end_date'] = pd.to_datetime(renfe['end_date'])

now we have a `datetime64[ns]` column... cool, right?

In [8]:
renfe['start_date']

0       2019-04-19 11:30:00
1       2020-05-29 18:00:00
2       2019-05-20 12:20:00
3       2020-04-11 08:00:00
4       2020-05-30 08:11:00
                ...        
99995   2019-07-18 20:15:00
99996   2020-04-29 06:40:00
99997   2020-03-18 09:50:00
99998   2019-06-29 14:50:00
99999   2020-03-31 20:30:00
Name: start_date, Length: 100000, dtype: datetime64[ns]
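When the layout of the strings is known, passing an explicit `format` to `to_datetime` skips format inference and makes malformed values raise an error instead of being guessed; alternatively, `read_csv` can convert the columns at load time with `parse_dates`. A sketch using two `start_date` values from the output above and a hypothetical in-memory CSV:

```python
import io
import pandas as pd

# two start_date values copied from the output above
raw = pd.Series(['2019-04-19 11:30:00', '2020-05-29 18:00:00'])
# explicit format: no inference, and a non-matching value raises ValueError
parsed = pd.to_datetime(raw, format='%Y-%m-%d %H:%M:%S')

# parse_dates converts the columns while reading (hypothetical one-row CSV)
csv_text = 'start_date,end_date\n2019-04-19 11:30:00,2019-04-19 14:40:00\n'
trips = pd.read_csv(io.StringIO(csv_text), parse_dates=['start_date', 'end_date'])

print(parsed.dtype, trips.dtypes.tolist())
```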

let's make all those trains depart an hour later!

In [9]:
renfe['start_date'] + pd.to_timedelta('1 hour')

0       2019-04-19 12:30:00
1       2020-05-29 19:00:00
2       2019-05-20 13:20:00
3       2020-04-11 09:00:00
4       2020-05-30 09:11:00
                ...        
99995   2019-07-18 21:15:00
99996   2020-04-29 07:40:00
99997   2020-03-18 10:50:00
99998   2019-06-29 15:50:00
99999   2020-03-31 21:30:00
Name: start_date, Length: 100000, dtype: datetime64[ns]
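`pd.to_timedelta('1 hour')` parses a string; the same offset can be built from keyword arguments with `pd.Timedelta`, which is handy when the amount comes from a variable. A quick sketch on two departure times from the output above, with a hypothetical 1.5-hour delay:

```python
import pandas as pd

# two departure times copied from the output above
departures = pd.Series(pd.to_datetime(['2019-04-19 11:30:00', '2020-05-29 18:00:00']))

# keyword form, equivalent to pd.to_timedelta('1 hour')
one_hour = pd.Timedelta(hours=1)
# the amount can come from a variable, e.g. a hypothetical delay of 1.5 hours
delay_hours = 1.5
delayed = departures + pd.Timedelta(hours=delay_hours)

print(delayed.tolist())
```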

let's compute the duration of each trip: just subtract the departure date from the arrival date...

In [10]:
renfe['trip_duration'] = renfe['end_date'] - renfe['start_date']

the new column looks like this:

In [11]:
renfe['trip_duration']

0       03:10:00
1       03:10:00
2       06:32:00
3       02:37:00
4       04:59:00
          ...   
99995   02:21:00
99996   04:46:00
99997   07:38:00
99998   07:27:00
99999   02:33:00
Name: trip_duration, Length: 100000, dtype: timedelta64[ns]

look at the column type: it is a `timedelta64[ns]`, the dtype pandas uses for time intervals...

in case we need it in hours, we can use the [`dt`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.dt.html) accessor...

In [12]:
# total_seconds() includes whole days; .dt.seconds would silently drop them
renfe['trip_duration'].dt.total_seconds() / 3600

0        3.166667
1        3.166667
2        6.533333
3        2.616667
4        4.983333
           ...   
99995    2.350000
99996    4.766667
99997    7.633333
99998    7.450000
99999    2.550000
Name: trip_duration, Length: 100000, dtype: float64
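`.dt.seconds` returns only the seconds *component* of each interval (0 to 86399), dropping whole days, whereas `.dt.total_seconds()` returns the full length. The two agree for trips shorter than a day, like the ones shown above, but diverge as soon as one lasts longer. A minimal check with a hypothetical 26-hour interval, plus dividing by a `Timedelta` as a direct way to get hours:

```python
import pandas as pd

# a 3h10m trip (as in the first row above) and a hypothetical 26-hour interval
durations = pd.Series(pd.to_timedelta(['03:10:00', '1 days 02:00:00']))

seconds_component = durations.dt.seconds        # drops whole days
total_seconds = durations.dt.total_seconds()    # full length in seconds
hours = durations / pd.Timedelta(hours=1)       # full length in hours, as float

print(seconds_component.tolist(), total_seconds.tolist(), hours.tolist())
```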

`timedelta64[ns]` columns support arithmetic such as multiplication by a number:

In [13]:
renfe['trip_duration'] * 10

0       1 days 07:40:00
1       1 days 07:40:00
2       2 days 17:20:00
3       1 days 02:10:00
4       2 days 01:50:00
              ...      
99995   0 days 23:30:00
99996   1 days 23:40:00
99997   3 days 04:20:00
99998   3 days 02:30:00
99999   1 days 01:30:00
Name: trip_duration, Length: 100000, dtype: timedelta64[ns]

timedeltas can also be added to `timedelta64[ns]` columns...

In [14]:
renfe['trip_duration'] + pd.to_timedelta('01:00:00')

0       04:10:00
1       04:10:00
2       07:32:00
3       03:37:00
4       05:59:00
          ...   
99995   03:21:00
99996   05:46:00
99997   08:38:00
99998   08:27:00
99999   03:33:00
Name: trip_duration, Length: 100000, dtype: timedelta64[ns]

#### conditional calculations

let's use numpy to create a new column based on conditions...

In [15]:
import numpy as np

In [16]:
np.where(renfe['trip_duration'].dt.total_seconds() > 3600 * 2.5, 'long_trip', 'short_trip')

array(['long_trip', 'long_trip', 'long_trip', ..., 'long_trip',
       'long_trip', 'long_trip'], dtype='<U10')

let's assign the array to a new column...

In [17]:
# trips longer than 2.5 hours are labelled 'long_trip', the rest 'short_trip'
renfe['duration_description'] = np.where(renfe['trip_duration'].dt.total_seconds() > 3600 * 2.5, 
                                         'long_trip', 
                                         'short_trip')
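When there are more than two categories, `np.select` generalizes `np.where`: it takes a list of conditions and a matching list of labels, and for each row the first condition that holds wins. Comparing directly against `pd.Timedelta` thresholds also avoids converting to seconds. A sketch with hypothetical cut-offs (1.5 and 2.5 hours), hypothetical labels, and three sample durations (the last one made up):

```python
import numpy as np
import pandas as pd

# two durations from the output above plus a made-up short one
durations = pd.Series(pd.to_timedelta(['03:10:00', '02:21:00', '01:15:00']))

conditions = [
    durations > pd.Timedelta(hours=2.5),   # checked first
    durations > pd.Timedelta(hours=1.5),   # only decides rows where the first is False
]
labels = ['long_trip', 'medium_trip']

# rows matching no condition get the default label
description = np.select(conditions, labels, default='short_trip')
print(description.tolist())  # ['long_trip', 'medium_trip', 'short_trip']
```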

there is also a [`.where`](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.where.html) method for `DataFrame` and `Series`, but it works differently from `np.where` and is much slower for this kind of labelling...
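To see the difference in semantics, here is `Series.where` next to `np.where` on a tiny made-up series: pandas keeps the original value where the condition holds and substitutes `other` elsewhere, so it answers a different question than the two-label `np.where` above:

```python
import numpy as np
import pandas as pd

hours = pd.Series([3.2, 2.0, 4.8])  # made-up trip durations in hours

# pandas: keep the value where the condition is True, replace it otherwise
kept_or_replaced = hours.where(hours > 2.5, other=0.0)
# numpy: pick one of two values per element
labelled = np.where(hours > 2.5, 'long_trip', 'short_trip')

print(kept_or_replaced.tolist())  # [3.2, 0.0, 4.8]
print(labelled.tolist())          # ['long_trip', 'short_trip', 'long_trip']
```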

## 3. using pandas multicore with [modin](https://github.com/modin-project/modin)

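to get modin, install it with pip together with an execution engine, e.g. `pip install "modin[ray]"` or `pip install "modin[dask]"`...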

let's import the modin pandas wrapper as `mp` (modin pandas)...

In [18]:
import modin.pandas as mp

let's load some data... (**you may have to restart your kernel to free memory**)...

In [19]:
renfe = mp.read_csv('./data/renfe_sample.csv')

from here on, usage is similar to a standard pandas `DataFrame`...

let's create a column to identify route...

In [20]:
renfe['route'] = renfe['origin'] + '_' + renfe['destination']

let's compute the mean price per route...

In [21]:
renfe.groupby('route')['price'].mean()

route
ALBACETE_MADRID       29.442580
ALICANTE_MADRID       46.498631
BARCELONA_MADRID      82.726995
CADIZ_MADRID          62.529825
CASTELLON_MADRID      43.772038
CIUDAD REAL_MADRID    30.904651
CORDOBA_MADRID        54.736619
CUENCA_MADRID         24.257658
GIRONA_MADRID         88.319275
GRANADA_MADRID        55.968454
GUADALAJARA_MADRID    14.360976
HUESCA_MADRID         48.702941
LEON_MADRID           37.079920
LLEIDA_MADRID         62.201745
MADRID_ALBACETE       29.726063
MADRID_ALICANTE       45.366523
MADRID_BARCELONA      82.709074
MADRID_CADIZ          66.286170
MADRID_CASTELLON      44.025358
MADRID_CIUDAD REAL    31.568367
MADRID_CORDOBA        56.247775
MADRID_CUENCA         24.398421
MADRID_GIRONA         89.032615
MADRID_GRANADA        54.736787
MADRID_GUADALAJARA    13.096296
MADRID_HUESCA         49.210526
MADRID_LEON           34.085885
MADRID_LLEIDA         61.685412
MADRID_MALAGA         66.755578
MADRID_PALENCIA       29.857619
MADRID_PONFERRADA     43.845738
...
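Because modin mirrors the pandas API, logic written against a generic `pd` name can usually run on either library by changing only the import. A sketch of that pattern, run here with plain pandas on made-up prices (the function name `mean_price_by_route` is hypothetical); swapping in `import modin.pandas as pd` is the assumed one-line change:

```python
import pandas as pd  # assumed swap for multicore: import modin.pandas as pd

def mean_price_by_route(df):
    """Build an origin_destination route key and average the price per route."""
    route = df['origin'] + '_' + df['destination']
    return df.assign(route=route).groupby('route')['price'].mean()

# made-up prices, only to exercise the function
trips = pd.DataFrame({
    'origin': ['MADRID', 'MADRID', 'BARCELONA'],
    'destination': ['BARCELONA', 'BARCELONA', 'MADRID'],
    'price': [80.0, 90.0, 70.0],
})
result = mean_price_by_route(trips)
print(result.to_dict())  # {'BARCELONA_MADRID': 70.0, 'MADRID_BARCELONA': 85.0}
```

`mean` skips missing values, which matters here because `price` has nulls in the Renfe sample.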

let's make some filters...

In [22]:
filter_with_price_tree = renfe['price_tree'] != '{}'
filter_not_null = renfe['price_tree'].notnull()

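let's look up a single value by position: row 10000 of the filtered frame, column 9 (which is the `fare` column, not `price_tree`)...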

In [23]:
renfe[filter_not_null & filter_with_price_tree].iat[10000, 9]

'Flexible'
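Because `iat` is purely positional, the result depends on column order. A sketch of resolving the column by name instead, on a tiny made-up frame whose non-default index labels mimic a filtered frame:

```python
import pandas as pd

# made-up frame with two of the Renfe column names
trips = pd.DataFrame({
    'fare': ['Promo', 'Flexible'],
    'price_tree': ['{}', '{"Turista": {"Flexible": {"seats": 30}}}'],
}, index=[7, 42])  # non-default labels, as after filtering

# row by position, column by name: robust to column reordering
col = trips.columns.get_loc('price_tree')
by_position = trips.iat[1, col]
# row and column by label (42 is an index label, not a position)
by_label = trips.at[42, 'price_tree']

print(by_position == by_label)  # True
```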

## 4. big data tools: Spark

[Spark](https://spark.apache.org/) is probably the most renowned big data tool on the market. It started as a research project at UC Berkeley's AMPLab and quickly evolved into a standard.

### 4.1 installation

install Spark, for example with `pip install pyspark` (the PyPI package ships Spark itself)...

yeah... you also need Java to run Spark. It is written in [Scala](https://www.scala-lang.org/), a JVM-based (and functional-style) language. Scala is the language of Big Data (although Python is catching up quickly)...

### 4.2 setup (yeah... not as easy as pandas)

In [24]:
from pyspark.sql import SparkSession

In [25]:
spark = (
    SparkSession.builder
    .appName('big_data_tools')
    .master('local[*]')                                  # run locally on all cores
    .config('spark.sql.repl.eagerEval.enabled', True)    # render DataFrames as tables in the notebook
    .config('spark.sql.session.timeZone', 'UTC')
    .config('spark.driver.memory', '16G')
    .config('spark.driver.maxResultSize', '2G')          # cap on results sent back to the driver
    .getOrCreate()
)

In [26]:
spark

let's load some data...

### 4.3 load data

In [27]:
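# '"' as both quote and escape character, so doubled quotes inside quoted fields
# (the JSON stored in price_tree) are read as literal quotes
renfe = (spark.read
         .option("quote", "\"")
         .option("escape", "\"")
         .csv('./data/renfe_sample.csv', header=True, inferSchema=True))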

let's check the inferred data types (the unnamed first column gets the default name `_c0`)...

In [28]:
renfe.dtypes

[('_c0', 'int'),
 ('insert_date', 'timestamp'),
 ('origin', 'string'),
 ('destination', 'string'),
 ('start_date', 'timestamp'),
 ('end_date', 'timestamp'),
 ('train_type', 'string'),
 ('price', 'double'),
 ('train_class', 'string'),
 ('fare', 'string'),
 ('price_tree', 'string'),
 ('batch', 'timestamp'),
 ('id', 'double')]

In [29]:
renfe.show()

+--------+-------------------+----------+-----------+-------------------+-------------------+----------+-----+-------------------+--------+--------------------+-------------------+-----------+
|     _c0|        insert_date|    origin|destination|         start_date|           end_date|train_type|price|        train_class|    fare|          price_tree|              batch|         id|
+--------+-------------------+----------+-----------+-------------------+-------------------+----------+-----+-------------------+--------+--------------------+-------------------+-----------+
| 4569288|2019-04-18 11:47:08|    MADRID|  BARCELONA|2019-04-19 09:30:00|2019-04-19 12:40:00|       AVE|107.7|            Turista|Flexible|                null|               null|       null|
|22860054|2020-03-18 19:42:00| BARCELONA|     MADRID|2020-05-29 16:00:00|2020-05-29 19:10:00|       AVE| null|               null|    null|                  {}|2020-03-18 17:15:00|1.2103742E7|
...

### 4.4 basic operations

#### select a column

In [30]:
renfe.select('origin').show()

+----------+
|    origin|
+----------+
|    MADRID|
| BARCELONA|
|    MADRID|
|    MADRID|
|    GIRONA|
|   SEVILLA|
|  VALENCIA|
|   CORDOBA|
|    MADRID|
|  VALENCIA|
|  VALENCIA|
| BARCELONA|
|    MADRID|
|    MADRID|
|    MADRID|
|    GIRONA|
|    MADRID|
|VALLADOLID|
|    MADRID|
|   SEVILLA|
+----------+
only showing top 20 rows



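#### get the data (note the lazy evaluation vs instant computation difference)

unlike pandas, which computes every step immediately, Spark transformations such as `select` or `filter` are lazy: they only extend an execution plan. Nothing runs until an action such as `show()`, `count()` or `collect()` is called, and `collect()` also brings every resulting row back to the driver as a list of `Row` objects, so use it only when the result fits in memory...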

In [31]:
origin_destination = renfe.select(['origin', 'destination']).collect()

In [32]:
origin_destination[0].asDict()

{'origin': 'MADRID', 'destination': 'BARCELONA'}

#### filter data

In [33]:
from pyspark.sql import functions as sf

In [34]:
renfe_filtered = renfe.filter(renfe.price_tree.isNotNull())

In [35]:
renfe_filtered = renfe_filtered.filter((sf.col('price_tree') != "{}") &
                                       (sf.col('price') < 60))
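For comparison with section 2, the same filter in pandas needs the explicit not-null check for a different reason: in Spark SQL, comparing a null with `!=` yields null and `filter` drops those rows, while in pandas `NaN != '{}'` evaluates to `True`, so missing price trees would slip through without `notna()`. A sketch on made-up rows:

```python
import numpy as np
import pandas as pd

# made-up rows covering the price_tree cases: missing, empty, populated
trips = pd.DataFrame({
    'price_tree': [np.nan, '{}', '{"Turista": {}}', '{"Turista": {}}'],
    'price': [30.0, 40.0, 55.9, 107.7],
})

# NaN != '{}' is True in pandas, so this alone keeps the missing row
without_null_check = trips[(trips['price_tree'] != '{}') & (trips['price'] < 60)]
# adding notna() keeps only populated trees, like the Spark filter above
with_null_check = trips[trips['price_tree'].notna()
                        & (trips['price_tree'] != '{}')
                        & (trips['price'] < 60)]

print(len(without_null_check), len(with_null_check))  # 2 1
```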

In [36]:
renfe_filtered.show()

+--------+-------------------+--------+-----------+-------------------+-------------------+----------+-----+------------------+--------+--------------------+--------------------+-----------+
|     _c0|        insert_date|  origin|destination|         start_date|           end_date|train_type|price|       train_class|    fare|          price_tree|               batch|         id|
+--------+-------------------+--------+-----------+-------------------+-------------------+----------+-----+------------------+--------+--------------------+--------------------+-----------+
|13856339|2020-02-22 01:04:41|  MADRID|   PALENCIA|2020-04-11 06:00:00|2020-04-11 08:37:00|     LD-MD| 38.9|Turista con enlace|Flexible|{"Turista con enl...| 2020-02-21 23:00:00|  3110678.0|
|16109477|2020-02-29 14:04:45| CORDOBA|     MADRID|2020-04-28 08:42:00|2020-04-28 10:36:00|     ALVIA| 55.9|           Turista|Flexible|{"Turista": {"Fle...| 2020-02-29 06:00:00|  5363816.0|
...

#### create new columns

In [37]:
from pyspark.sql.types import IntegerType

In [38]:
renfe.withColumn('duration', (sf.col('end_date').cast(IntegerType()) - sf.col('start_date').cast(IntegerType())) / 3600)

| _c0 | insert_date | origin | destination | start_date | end_date | train_type | price | train_class | fare | price_tree | batch | id | duration |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 4569288 | 2019-04-18 11:47:08 | MADRID | BARCELONA | 2019-04-19 09:30:00 | 2019-04-19 12:40:00 | AVE | 107.7 | Turista | Flexible |  |  |  | 3.1666666666666665 |
| 22860054 | 2020-03-18 19:42:00 | BARCELONA | MADRID | 2020-05-29 16:00:00 | 2020-05-29 19:10:00 | AVE |  |  |  | {} | 2020-03-18 17:15:00 | 12103742.0 | 3.1666666666666665 |
| 5687485 | 2019-04-29 19:08:10 | MADRID | PONFERRADA | 2019-05-20 10:20:00 | 2019-05-20 16:52:00 | LD-MD | 34.35 | Turista con enlace | Promo + |  |  |  | 6.533333333333333 |
| 13856339 | 2020-02-22 01:04:41 | MADRID | PALENCIA | 2020-04-11 06:00:00 | 2020-04-11 08:37:00 | LD-MD | 38.9 | Turista con enlace | Flexible | {"Turista con enl... | 2020-02-21 23:00:00 | 3110678.0 | 2.6166666666666667 |
| 19660069 | 2020-03-08 20:05:07 | GIRONA | MADRID | 2020-05-30 06:11:00 | 2020-05-30 11:10:00 | AVANT-AVE | 78.2 | Turista con enlace | Promo | {"Turista con enl... | 2020-03-08 16:00:00 | 8871715.0 | 4.983333333333333 |
| 5301099 | 2019-04-23 22:22:55 | SEVILLA | MADRID | 2019-05-29 15:45:00 | 2019-05-29 18:17:00 | AVE | 76.3 | Turista | Flexible |  |  |  | 2.533333333333333 |
| 10926000 | 2019-11-10 02:04:44 | VALENCIA | MADRID | 2019-12-14 07:10:00 | 2019-12-14 11:22:00 | INTERCITY |  | Turista | Flexible |  |  | 180339.0 | 4.2 |
| 16109477 | 2020-02-29 14:04:45 | CORDOBA | MADRID | 2020-04-28 08:42:00 | 2020-04-28 10:36:00 | ALVIA | 55.9 | Turista | Flexible | {"Turista": {"Fle... | 2020-02-29 06:00:00 | 5363816.0 | 1.9 |
| 15481446 | 2019-10-28 10:19:12 | MADRID | SEVILLA | 2019-12-05 07:30:00 | 2019-12-05 10:14:00 | ALVIA | 67.2 | Turista | Flexible |  |  | 4735785.0 | 2.7333333333333334 |
| 6503835 | 2019-05-12 21:35:06 | VALENCIA | MADRID | 2019-05-18 06:10:00 | 2019-05-18 10:28:00 | INTERCITY | 30.9 | Turista Plus | Promo |  |  |  | 4.3 |


#### make aggregations

In [39]:
renfe.groupby(['origin', 'destination']).agg({'price': 'mean'})

| origin | destination | avg(price) |
|---|---|---|
| PONFERRADA | MADRID | 42.49064153439151 |
| ZARAGOZA | MADRID | 44.09832723470992 |
| MADRID | ZARAGOZA | 44.94266409266416 |
| GIRONA | MADRID | 88.3192748091603 |
| MADRID | HUESCA | 49.21052631578947 |
| ALBACETE | MADRID | 29.442579505300337 |
| HUESCA | MADRID | 48.70294117647059 |
| MADRID | LEON | 34.085885167464085 |
| MADRID | ALBACETE | 29.726062499999983 |
| MADRID | TARRAGONA | 63.61884848484853 |


In [40]:
renfe.count()

100000

#### apply custom functions

In [41]:
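import json

@sf.udf('integer')
def get_seats(price_tree):
    """Seats available for the first class and fare listed in a price_tree JSON string."""
    # null or empty strings carry no seat information
    if not price_tree:
        return 0

    try:
        # json.loads parses the string safely; eval would execute arbitrary code
        price_tree_dict = json.loads(price_tree)
        first_available_class = next(iter(price_tree_dict))
        first_available_fare = next(iter(price_tree_dict[first_available_class]))
        seats = price_tree_dict[first_available_class][first_available_fare]['seats']
        # the UDF is declared as 'integer', so return a Python int
        return int(seats)

    except (ValueError, TypeError, KeyError, StopIteration):
        # malformed JSON, unexpected nesting or an empty tree such as '{}'
        return 0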
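Since the parsing logic is ordinary Python, it can be checked without starting Spark. A minimal sketch: the same logic as a plain function (the name `seats_from_price_tree` is hypothetical, and the sample string's `{class: {fare: {..., "seats": n}}}` layout is an assumption inferred from the truncated `price_tree` values above and the keys the UDF reads), exercised on a few inputs:

```python
import json

def seats_from_price_tree(price_tree):
    """Return the seats of the first class/fare in a price_tree JSON string, or 0."""
    if not price_tree:
        return 0
    try:
        tree = json.loads(price_tree)
        first_class = next(iter(tree))
        first_fare = next(iter(tree[first_class]))
        return int(tree[first_class][first_fare]['seats'])
    except (ValueError, TypeError, KeyError, StopIteration):
        return 0

# assumed layout: {class: {fare: {..., "seats": n}}}
sample = '{"Turista": {"Flexible": {"price": 55.9, "seats": 30}}}'
print(seats_from_price_tree(sample))  # 30
print(seats_from_price_tree('{}'))    # 0, no classes at all
print(seats_from_price_tree(None))    # 0, missing value
```

Once tested, the plain function could be registered for Spark with `sf.udf(seats_from_price_tree, 'integer')` instead of using the decorator.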

In [42]:
renfe_filtered = renfe_filtered.withColumn('seats', get_seats(sf.col('price_tree')))

In [43]:
renfe_filtered.show()

+--------+-------------------+--------+-----------+-------------------+-------------------+----------+-----+------------------+--------+--------------------+--------------------+-----------+-----+
|     _c0|        insert_date|  origin|destination|         start_date|           end_date|train_type|price|       train_class|    fare|          price_tree|               batch|         id|seats|
+--------+-------------------+--------+-----------+-------------------+-------------------+----------+-----+------------------+--------+--------------------+--------------------+-----------+-----+
|13856339|2020-02-22 01:04:41|  MADRID|   PALENCIA|2020-04-11 06:00:00|2020-04-11 08:37:00|     LD-MD| 38.9|Turista con enlace|Flexible|{"Turista con enl...| 2020-02-21 23:00:00|  3110678.0|  143|
|16109477|2020-02-29 14:04:45| CORDOBA|     MADRID|2020-04-28 08:42:00|2020-04-28 10:36:00|     ALVIA| 55.9|           Turista|Flexible|{"Turista": {"Fle...| 2020-02-29 06:00:00|  5363816.0|   30|
...

#### create virtual sql tables and query them

In [44]:
# createOrReplaceTempView can be re-run; createTempView fails if 'renfe' already exists
renfe_filtered.createOrReplaceTempView('renfe')

In [45]:
sql_query = """
select
origin,
destination,
avg(price) as mean_price, 
avg(seats) as mean_seats
from renfe
group by origin, destination
order by mean_price desc
"""

In [46]:
routes_prices = spark.sql(sql_query)

In [47]:
routes_prices.show()

+---------+-----------+------------------+------------------+
|   origin|destination|        mean_price|        mean_seats|
+---------+-----------+------------------+------------------+
|   GIRONA|     MADRID|  52.1551724137931| 99.56896551724138|
|BARCELONA|     MADRID| 50.22027595269386|  140.215505913272|
|   MADRID|     MALAGA|49.575160349854194|214.51895043731778|
|   MADRID|     HUESCA| 49.21052631578947| 91.89473684210526|
|   LLEIDA|     MADRID|49.205967078189296| 136.3909465020576|
|   HUESCA|     MADRID| 48.70294117647059|  94.3529411764706|
|   MALAGA|     MADRID| 48.19140672782871| 207.5993883792049|
|   MADRID|  BARCELONA| 47.82461128860492|143.47390841320555|
|   MADRID|    CORDOBA| 47.61044394281416| 166.6561324303988|
|   MADRID|    SEVILLA| 47.19120300751877|139.20193340494092|
|TARRAGONA|     MADRID| 47.13607038123168|154.20527859237538|
|   MADRID|      CADIZ| 46.98571428571429|201.28571428571428|
|   MADRID|     LLEIDA|  46.7782222222222| 152.2711111111111|
...
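For reference, a sketch of the same aggregation written with the pandas API, on made-up rows standing in for `renfe_filtered`: group by origin and destination, average `price` and `seats`, and sort by mean price descending, mirroring the SQL query above:

```python
import pandas as pd

# made-up rows standing in for renfe_filtered
trips = pd.DataFrame({
    'origin': ['MADRID', 'MADRID', 'GIRONA'],
    'destination': ['BARCELONA', 'BARCELONA', 'MADRID'],
    'price': [40.0, 50.0, 52.0],
    'seats': [100, 200, 99],
})

routes_prices_sketch = (
    trips.groupby(['origin', 'destination'], as_index=False)
         .agg(mean_price=('price', 'mean'), mean_seats=('seats', 'mean'))  # avg(...) as ...
         .sort_values('mean_price', ascending=False)                        # order by mean_price desc
         .reset_index(drop=True)
)
print(routes_prices_sketch)
```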

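#### transform Spark DataFrame into pandas DataFrame

`toPandas()` collects the whole result into the driver's memory, so it is only a good idea for small results such as this aggregated table...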

In [48]:
routes_prices_pandas_df = routes_prices.toPandas()

In [49]:
routes_prices_pandas_df

|  | origin | destination | mean_price | mean_seats |
|---|---|---|---|---|
| 0 | GIRONA | MADRID | 52.155172 | 99.568966 |
| 1 | BARCELONA | MADRID | 50.220276 | 140.215506 |
| 2 | MADRID | MALAGA | 49.57516 | 214.51895 |
| 3 | MADRID | HUESCA | 49.210526 | 91.894737 |
| 4 | LLEIDA | MADRID | 49.205967 | 136.390947 |
| 5 | HUESCA | MADRID | 48.702941 | 94.352941 |
| 6 | MALAGA | MADRID | 48.191407 | 207.599388 |
| 7 | MADRID | BARCELONA | 47.824611 | 143.473908 |
| 8 | MADRID | CORDOBA | 47.610444 | 166.656132 |
| 9 | MADRID | SEVILLA | 47.191203 | 139.201933 |


<div style="padding-top: 25px; float: right">
    <div>    
        <i>&nbsp;&nbsp;© Copyright by</i>
    </div>
    <div>
        <a href="https://whiteboxml.com">
            <img src="https://whiteboxml.com/static/img/logo/black_bg_white.svg" width="125">
        </a>
    </div>
</div>