In [7]:
# Import from the public IPython.display module: importing `display` from
# IPython.core.display has been deprecated since IPython 7.14.
from IPython.display import display, HTML
# Inject a CSS rule that widens the notebook's `.container` element to 80%.
display(HTML("<style>.container { width:80% !important; }</style>"))

import os

# Python ≥3.5 is required
import sys
assert sys.version_info >= (3, 5)

# Scikit-Learn ≥0.20 is required
import re
import sklearn
# Compare (major, minor) numerically: comparing the raw version strings is
# lexicographic, so e.g. "0.9" >= "0.20" would wrongly evaluate to True.
_sklearn_version_match = re.match(r"(\d+)\.(\d+)", sklearn.__version__)
assert _sklearn_version_match is not None, f"Unrecognised scikit-learn version: {sklearn.__version__!r}"
assert tuple(int(part) for part in _sklearn_version_match.groups()) >= (0, 20)

from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import cross_val_score
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.metrics import mean_absolute_error

import pandas as pd
import numpy as np
pd.set_option('display.max_columns', None)

# To plot pretty figures
%matplotlib inline
import matplotlib as mpl
import matplotlib.pyplot as plt

import scipy.stats as stats

import seaborn as sns

from datetime import datetime

from typing import List, Dict

import re

from tqdm.notebook import tqdm

sys.path.insert(0, '../tools/')

from tools import * 

In [8]:
# Month names in calendar order; the loader functions interpolate them into
# the monthly data file names.
month_names = [
    'Gener', 'Febrer', 'Marc', 'Abril', 'Maig', 'Juny',
    'Juliol', 'Agost', 'Setembre', 'Octubre', 'Novembre', 'Desembre',
]
# Month numbers 1..12.
months = range(1, 13)
# List of (month number, month name) pairs, numbered from 1.
i2m = list(enumerate(month_names, start=1))

In [9]:
import dask.dataframe as dd
from dask.distributed import Client

In [10]:
# Create a Dask distributed client with default settings.
# NOTE(review): the recorded output warns "Perhaps you already have a cluster
# running?" — presumably this cell was executed more than once in the same
# session; confirm and reuse the existing client if so.
client = Client()
# Bare expression so the notebook renders the client/cluster summary.
client

Perhaps you already have a cluster running?
Hosting the HTTP server on port 38411 instead
  http_address["port"], self.http_server.port


0,1
Client  Scheduler: tcp://127.0.0.1:46605  Dashboard: http://127.0.0.1:38411/status,Cluster  Workers: 4  Cores: 16  Memory: 16.44 GB


In [35]:
def print_partitions(ddf:dd.core.DataFrame) -> None:
    """Print a short preview of every partition of ``ddf``.

    For each partition index from 0 to ``ddf.npartitions - 1`` this prints a
    label line followed by the result of calling ``.head()`` on that
    partition.

    Parameters
    ----------
    ddf : dd.core.DataFrame
        Collection exposing ``npartitions`` and an indexable ``partitions``.

    Returns
    -------
    None
    """
    partition_count = ddf.npartitions
    for part_no in range(partition_count):
        print('Partion:', part_no)
        current_partition = ddf.partitions[part_no]
        print(current_partition.head())

def read_dask_dataframe(folder_path:str, folder_type:str, config:dict, add_meta:bool=False) -> dd.core.DataFrame:
    """Build a Dask DataFrame for one monthly data file.

    The file is read from
    ``{folder_path}/{year}/{dataset}/{year}_{month:02d}_{monthname}_{dataset}.{folder_type}``
    where ``year``, ``dataset``, ``month`` and ``monthname`` are taken from
    ``config``.

    Parameters
    ----------
    folder_path : str
        Root folder of the data files; must not be empty.
    folder_type : str
        File format, also used as the file extension. Only ``'csv'`` is
        supported.
    config : dict or pandas.Series
        Must provide ``year``, ``dataset``, ``month`` (formatted with
        ``:02d``) and ``monthname``. A plain dict is wrapped in a Series so
        that both kinds of input behave the same.
    add_meta : bool, default False
        When true, the returned collection's ``_name`` is set to
        ``'{year}-{month}'``.

    Returns
    -------
    dd.core.DataFrame
        The result of ``dd.read_csv`` for the resolved path.

    Raises
    ------
    AssertionError
        If ``folder_path`` or ``folder_type`` is empty, or ``config`` has no
        entries.
    NotImplementedError
        If ``folder_type`` is anything other than ``'csv'``.
    """
    assert folder_path != ''
    assert folder_type != ''

    # The annotation promises a dict, but `.empty` and attribute access
    # (`config.year`) only exist on a pandas Series, so normalise dicts here.
    if isinstance(config, dict):
        config = pd.Series(config, dtype=object)
    assert not config.empty

    if folder_type != 'csv':
        # Raising a bare string is itself a TypeError in Python 3; raise a
        # real exception that states what is unsupported.
        raise NotImplementedError(f'Not supported yet: {folder_type!r}')

    ddf = dd.read_csv(
        urlpath=f'{folder_path}/{config.year}/{config.dataset}/{config.year}_{config.month:02d}_{config.monthname}_{config.dataset}.{folder_type}',
        blocksize='default',
        lineterminator=None,
        compression='infer',
        sample=256000,
        enforce=False,
        assume_missing=False,
        storage_options=None,
        include_path_column=False,
        header=0
    )

    if add_meta:
        # NOTE(review): `_name` is a private attribute of the Dask collection;
        # confirm that overwriting it leaves the collection computable before
        # relying on add_meta=True.
        ddf._name = f'{config.year}-{config.month}'
        # we have one partition
        # TODO
        # ddf.divisions = (0, ddf.shape[0].compute()-1)

    return ddf

def read_dask_dataframes(folder_path:str, folder_type:str, input_dataset:str, years:List[int]) -> Dict[int, dd.core.DataFrame]:
    """Load all monthly files of each requested year into one Dask DataFrame per year.

    For every year, one frame per ``(month, month_name)`` pair in the
    module-level ``i2m`` list is read with ``read_dask_dataframe`` and the
    frames are concatenated with ``dd.concat(..., interleave_partitions=False)``.

    Parameters
    ----------
    folder_path : str
        Root folder of the data files; must not be empty.
    folder_type : str
        File format passed through to ``read_dask_dataframe``; must not be
        empty.
    input_dataset : str
        Dataset name used in the folder and file names; must not be empty.
    years : List[int]
        Years to load; each must lie in the range 2018..2023 inclusive.

    Returns
    -------
    Dict[int, dd.core.DataFrame]
        Mapping from each year (the integer itself is the key) to the
        concatenated frame for that year.

    Raises
    ------
    AssertionError
        If a string argument is empty or a year is outside 2018..2023.
    """
    assert folder_path != ''
    assert folder_type != ''
    assert input_dataset != ""

    data = dict()

    for year in tqdm(years):
        assert 2018 <= year <= 2023

        # Build a fresh config for every month instead of mutating one shared
        # Series that starts out with NaN placeholders.
        ddf_year_list = [
            read_dask_dataframe(
                folder_path,
                folder_type,
                pd.Series({
                    'year': year,
                    'dataset': input_dataset,
                    'month': month,
                    'monthname': month_name
                })
            )
            for month, month_name in tqdm(i2m)
        ]

        data[year] = dd.concat(ddf_year_list, interleave_partitions=False)

    return data

def get_ddf_shape(ddf:dd.core.DataFrame):
    """Return ``(rows, columns)`` for ``ddf`` with the row count materialised.

    The first element of ``ddf.shape`` is resolved by calling ``.compute()``
    on it; the second element is returned as it is.
    """
    n_rows = ddf.shape[0].compute()
    n_cols = ddf.shape[1]
    return n_rows, n_cols


In [17]:
# Testing how dd.concat combines the partitions of two Dask DataFrames

In [18]:
# Toy frame with six rows, split into two partitions.
df = pd.DataFrame({"nums": [1, 2, 3, 4, 5, 6], "letters": ["a", "b", "c", "d", "e", "f"]})
ddf1 = dd.from_pandas(df, npartitions=2)
ddf1.divisions # (0, 3, 5)

(0, 3, 5)

In [19]:
print_partitions(ddf1)

Partion: 0
   nums letters
0     1       a
1     2       b
2     3       c
Partion: 1
   nums letters
3     4       d
4     5       e
5     6       f


In [20]:
# Second toy frame with two rows in a single partition; its index (0, 1)
# overlaps the index of ddf1.
df = pd.DataFrame({"nums": [88, 99], "letters": ["xx", "yy"]})
ddf3 = dd.from_pandas(df, npartitions=1)
ddf3.divisions # (0, 1)

(0, 1)

In [21]:
print_partitions(ddf3)

Partion: 0
   nums letters
0    88      xx
1    99      yy


In [22]:
# With interleave_partitions=True the recorded result keeps known divisions
# and rows from both inputs are mixed inside the partitions.
ddf4 = dd.concat([ddf1, ddf3], interleave_partitions=True)
ddf4.divisions # (0, 1, 3, 5)

(0, 1, 3, 5)

In [23]:
print_partitions(ddf4)

Partion: 0
   nums letters
0     1       a
0    88      xx
Partion: 1
   nums letters
1     2       b
2     3       c
1    99      yy
Partion: 2
   nums letters
3     4       d
4     5       e
5     6       f


In [24]:
# With interleave_partitions=False the input partitions are kept as they are
# and appended one after another; the resulting divisions are all None.
ddf4 = dd.concat([ddf1, ddf3], interleave_partitions=False)
ddf4.divisions # (None, None, None, None)

(None, None, None, None)

In [25]:
print_partitions(ddf4)

Partion: 0
   nums letters
0     1       a
1     2       b
2     3       c
Partion: 1
   nums letters
3     4       d
4     5       e
5     6       f
Partion: 2
   nums letters
0    88      xx
1    99      yy


In [26]:
# Load the twelve monthly 2021 CSV files of the BicingNou_ESTACIONS_CLEAN
# dataset from ../dades; the result is a dict keyed by year.
data = read_dask_dataframes('../dades', 'csv', 'BicingNou_ESTACIONS_CLEAN', [2021])

  0%|          | 0/1 [00:00<?, ?it/s]

-->  2021 BicingNou_ESTACIONS_CLEAN


  0%|          | 0/12 [00:00<?, ?it/s]

---->  2021 1 Gener BicingNou_ESTACIONS_CLEAN
---->  Done -------- ----------
---->  2021 2 Febrer BicingNou_ESTACIONS_CLEAN
---->  Done -------- ----------
---->  2021 3 Marc BicingNou_ESTACIONS_CLEAN
---->  Done -------- ----------
---->  2021 4 Abril BicingNou_ESTACIONS_CLEAN
---->  Done -------- ----------
---->  2021 5 Maig BicingNou_ESTACIONS_CLEAN
---->  Done -------- ----------
---->  2021 6 Juny BicingNou_ESTACIONS_CLEAN
---->  Done -------- ----------
---->  2021 7 Juliol BicingNou_ESTACIONS_CLEAN
---->  Done -------- ----------
---->  2021 8 Agost BicingNou_ESTACIONS_CLEAN
---->  Done -------- ----------
---->  2021 9 Setembre BicingNou_ESTACIONS_CLEAN
---->  Done -------- ----------
---->  2021 10 Octubre BicingNou_ESTACIONS_CLEAN
---->  Done -------- ----------
---->  2021 11 Novembre BicingNou_ESTACIONS_CLEAN
---->  Done -------- ----------
---->  2021 12 Desembre BicingNou_ESTACIONS_CLEAN
---->  Done -------- ----------
-->  Done -------- ----------


In [28]:
data[2021]

Unnamed: 0_level_0,station_id,num_bikes_available,num_bikes_available_types.mechanical,num_bikes_available_types.ebike,num_docks_available,last_reported,is_charging_station,status,is_installed,is_renting,is_returning,last_updated,year_last_updated_date,month_last_updated_date,week_last_updated_date,dayofweek_last_updated_date,dayofmonth_last_updated_date,dayofyear_last_updated_date,hour_last_updated_date,traffic
npartitions=12,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1
,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [31]:
get_ddf_shape(data[2021])

(4411605, 20)

In [38]:
len(data[2021].divisions) # divisions mark the start and end of each partition
# in our case the divisions are None, but that is okay

13

In [36]:
print_partitions(data[2021])

Partion: 0
   station_id  num_bikes_available  num_bikes_available_types.mechanical  \
0         1.0            43.000000                             43.000000   
1         1.0            40.666667                             40.666667   
2         1.0            41.000000                             41.000000   
3         1.0            41.000000                             41.000000   
4         1.0            41.000000                             41.000000   

   num_bikes_available_types.ebike  num_docks_available  last_reported  \
0                              0.0             3.000000   1.609457e+09   
1                              0.0             5.333333   1.609461e+09   
2                              0.0             5.000000   1.609464e+09   
3                              0.0             5.000000   1.609468e+09   
4                              0.0             5.000000   1.609472e+09   

   is_charging_station  status  is_installed  is_renting  is_returning  \
0            

   station_id  num_bikes_available  num_bikes_available_types.mechanical  \
0         1.0                 27.0                                  27.0   
1         1.0                 27.0                                  27.0   
2         1.0                 27.0                                  27.0   
3         1.0                 27.0                                  27.0   
4         1.0                 27.0                                  27.0   

   num_bikes_available_types.ebike  num_docks_available  last_reported  \
0                              0.0                 19.0   1.619820e+09   
1                              0.0                 19.0   1.619822e+09   
2                              0.0                 19.0   1.619825e+09   
3                              0.0                 19.0   1.619829e+09   
4                              0.0                 19.0   1.619832e+09   

   is_charging_station  status  is_installed  is_renting  is_returning  \
0                  1.0  

   station_id  num_bikes_available  num_bikes_available_types.mechanical  \
0         1.0            39.000000                             35.285714   
1         1.0            31.857143                             28.857143   
2         1.0            27.400000                             24.000000   
3         1.0            27.727273                             24.363636   
4         1.0            30.000000                             26.000000   

   num_bikes_available_types.ebike  num_docks_available  last_reported  \
0                         3.714286             2.857143   1.630448e+09   
1                         3.000000            10.285714   1.630452e+09   
2                         3.400000            14.600000   1.630456e+09   
3                         3.363636            15.000000   1.630460e+09   
4                         4.000000            14.000000   1.630463e+09   

   is_charging_station  status  is_installed  is_renting  is_returning  \
0                  1.0  

In [39]:
data[2021]

Unnamed: 0,station_id,num_bikes_available,num_bikes_available_types.mechanical,num_bikes_available_types.ebike,num_docks_available,last_reported,is_charging_station,status,is_installed,is_renting,is_returning,last_updated,year_last_updated_date,month_last_updated_date,week_last_updated_date,dayofweek_last_updated_date,dayofmonth_last_updated_date,dayofyear_last_updated_date,hour_last_updated_date,traffic
0,1.0,43.0,43.0,0.0,3.0,1609457000.0,1.0,0.0,1.0,1.0,1.0,1609456000.0,2020.0,12.0,53.0,3.0,31.0,366.0,23.0,
1,1.0,40.666667,40.666667,0.0,5.333333,1609461000.0,1.0,0.0,1.0,1.0,1.0,1609459000.0,2021.0,1.0,53.0,4.0,1.0,1.0,0.0,
2,1.0,41.0,41.0,0.0,5.0,1609464000.0,1.0,0.0,1.0,1.0,1.0,1609463000.0,2021.0,1.0,53.0,4.0,1.0,1.0,1.0,
3,1.0,41.0,41.0,0.0,5.0,1609468000.0,1.0,0.0,1.0,1.0,1.0,1609466000.0,2021.0,1.0,53.0,4.0,1.0,1.0,2.0,
4,1.0,41.0,41.0,0.0,5.0,1609472000.0,1.0,0.0,1.0,1.0,1.0,1609470000.0,2021.0,1.0,53.0,4.0,1.0,1.0,3.0,
