# Bike Sharing analysis using Dask Structures


Course: Advanced Python 
Date: May/19/2019

Author: Marcos Bergés



# Loading Libraries

In [26]:
import dask.array as da
import dask_ml
import dask_ml.model_selection as dcv
import pandas as pd
from dask import dataframe as dd
from dask_ml.linear_model import LinearRegression
from dask_ml.metrics import mean_squared_error
from dask_ml.preprocessing import Categorizer, DummyEncoder, MinMaxScaler, StandardScaler
from distributed import Client, progress
from scipy import stats
from sklearn.metrics import r2_score, explained_variance_score
from sklearn.pipeline import Pipeline, make_pipeline

# Loading Data

In [5]:
# Read the hourly bike-sharing CSV lazily into a dask dataframe; the URL
# carries a versionId query parameter, and "dteday" is parsed as a date.
ddf = dd.read_csv(
    "https://s3.amazonaws.com/dask-bike/hour.csv?versionId=6G.uFUQD_PJbnvTr4EEbPjYOMQqiEvul",
    parse_dates=["dteday"],
    sep=",",
)

# Pre-processing

Renaming Columns

In [6]:
# Give the abbreviated columns readable names, then discard the date column
# in the same chained expression.
ddf = ddf.rename(
    columns={
        "weathersit": "weather",
        "mnth": "month",
        "hr": "hour",
        "hum": "humidity",
        "cnt": "count",
        "yr": "year",
    }
).drop(["dteday"], axis=1)

Categorizing and One-hot Encoding

In [7]:
# Columns to be treated as categorical and expanded into dummy variables.
cats = ["month", "hour", "holiday", "weekday", "workingday", "weather"]

# Explicit two-step pipeline; the step names are the lowercase class names,
# which is what make_pipeline would generate.
pipeline = Pipeline(
    [
        ("categorizer", Categorizer(columns=cats)),
        ("dummyencoder", DummyEncoder()),
    ]
)

ddf_onehot = pipeline.fit_transform(ddf)

Normalizing

In [8]:
# Select the columns to rescale by position (indices 2 through 5).
# NOTE(review): judging by the schema displayed further down, these positions
# are year, temp, atemp and humidity, so windspeed (index 6) is not rescaled.
# Confirm whether [3:7] was intended.
cols = ddf_onehot.columns[2:6].tolist()

scaler = MinMaxScaler()

# Overwrite the selected columns with their min-max scaled values.
ddf_onehot[cols] = scaler.fit_transform(ddf_onehot[cols])

Fixing Skewness

In [9]:
def correct_skewness(columns=None, max_skewness=0.5):
    """Log-transform the skewed, unscaled columns of the global ``ddf_onehot``.

    A column is replaced in place by ``log(delta + x)`` only when the absolute
    value of its skewness exceeds ``max_skewness`` AND its values fall outside
    the [0, 1] range (max > 1 or min < 0).

    Parameters
    ----------
    columns : list of str
        Names of the ``ddf_onehot`` columns to inspect. Required.
    max_skewness : float, default 0.5
        Absolute-skewness threshold above which a column is transformed.

    Returns
    -------
    The module-level ``ddf_onehot`` dataframe, after any in-place transforms.

    Raises
    ------
    ValueError
        If ``columns`` is None.
    """
    if columns is None:
        raise ValueError(
            "columns argument is None. Please set columns argument to a list of columns"
        )

    for col in columns:
        # Materialise the column once and derive all three statistics from it,
        # instead of evaluating the dask graph separately for each statistic.
        values = ddf_onehot[col].compute()
        skewness = stats.skew(values)
        max_val = values.max()
        min_val = values.min()

        if abs(skewness) > max_skewness and (max_val > 1 or min_val < 0):
            # Shift so the smallest value maps to at least 1 before the log.
            delta = 1.0
            if min_val < 0:
                delta = max(1, -min_val + 1)
            # `da` is dask.array (imported in the notebook's import cell); the
            # transform stays lazy and is assigned back onto the dataframe.
            ddf_onehot[col] = da.log(delta + ddf_onehot[col])
    return ddf_onehot


In [10]:
# Check the listed columns for skewness; the returned dataframe is the cell's
# displayed output.
correct_skewness(
    columns=["season", "instant", "year", "temp", "atemp", "humidity", "windspeed"]
)

Unnamed: 0_level_0,instant,season,year,temp,atemp,humidity,windspeed,casual,registered,count,month_1,month_2,month_3,month_4,month_5,month_6,month_7,month_8,month_9,month_10,month_11,month_12,hour_0,hour_1,hour_2,hour_3,hour_4,hour_5,hour_6,hour_7,hour_8,hour_9,hour_10,hour_11,hour_12,hour_13,hour_14,hour_15,hour_16,hour_17,hour_18,hour_19,hour_20,hour_21,hour_22,hour_23,holiday_0,holiday_1,weekday_6,weekday_0,weekday_1,weekday_2,weekday_3,weekday_4,weekday_5,workingday_0,workingday_1,weather_1,weather_2,weather_3,weather_4
npartitions=1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1,Unnamed: 23_level_1,Unnamed: 24_level_1,Unnamed: 25_level_1,Unnamed: 26_level_1,Unnamed: 27_level_1,Unnamed: 28_level_1,Unnamed: 29_level_1,Unnamed: 30_level_1,Unnamed: 31_level_1,Unnamed: 32_level_1,Unnamed: 33_level_1,Unnamed: 34_level_1,Unnamed: 35_level_1,Unnamed: 36_level_1,Unnamed: 37_level_1,Unnamed: 38_level_1,Unnamed: 39_level_1,Unnamed: 40_level_1,Unnamed: 41_level_1,Unnamed: 42_level_1,Unnamed: 43_level_1,Unnamed: 44_level_1,Unnamed: 45_level_1,Unnamed: 46_level_1,Unnamed: 47_level_1,Unnamed: 48_level_1,Unnamed: 49_level_1,Unnamed: 50_level_1,Unnamed: 51_level_1,Unnamed: 52_level_1,Unnamed: 53_level_1,Unnamed: 54_level_1,Unnamed: 55_level_1,Unnamed: 56_level_1,Unnamed: 57_level_1,Unnamed: 58_level_1,Unnamed: 59_level_1,Unnamed: 60_level_1,Unnamed: 61_level_1
,int64,int64,float64,float64,float64,float64,float64,int64,int64,int64,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8,uint8
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


# Modeling

In [11]:
# Create a distributed client with default arguments (the output below shows
# a scheduler on 127.0.0.1 with 4 workers).
client = Client()

# Last expression of the cell: display the client/cluster summary.
client

Dask needs bokeh >= 0.13.0 for the dashboard.
Continuing without the dashboard.
  "\nDask needs bokeh >= 0.13.0 for the dashboard."


0,1
Client  Scheduler: tcp://127.0.0.1:58165,Cluster  Workers: 4  Cores: 4  Memory: 8.59 GB


In [12]:
def score_lin(X_train, X_test, y_train, y_test):
    """Fit a linear regression, report it, and return the test predictions.

    Fits ``LinearRegression`` on the training data, prints the fitted
    intercept and coefficients, then prints the R2 score of the predictions
    on the test data (both ``y_test`` and the predictions are computed to
    evaluate the score).

    Returns the (uncomputed) predictions for ``X_test``.
    """
    model = LinearRegression()
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)

    print("Intercept:", model.intercept_)
    print("Coefficients:", model.coef_)

    r2 = r2_score(y_test.compute(), predictions.compute())
    print("Variance score (R2): {:.2f}".format(r2))
    return predictions

### Data Split
Splitting the data to predict three different targets: registered, casual, and count.
    

In [13]:
def split_data(dataset, Target, train_fraction=0.875):
    """Split a dataframe into ordered train/test features and target.

    The first ``train_fraction`` of the rows (by index label) becomes the
    training set and the remaining rows the test set; no shuffling is done.

    Parameters
    ----------
    dataset : dataframe
        Data containing both the features and the target column.
        Assumes the index holds the integer labels 0..len(dataset)-1 in
        order, since rows are selected with label-based ``.loc`` slices.
    Target : str
        Name of the column to predict; every other column is a feature.
    train_fraction : float, default 0.875
        Fraction of rows assigned to the training set.

    Returns
    -------
    tuple
        ``(X_train, X_test, y_train, y_test)``.

    Raises
    ------
    ValueError
        If ``train_fraction`` is not between 0 and 1.
    """
    if not 0 <= train_fraction <= 1:
        raise ValueError("train_fraction must be between 0 and 1")

    X = dataset.loc[:, dataset.columns != Target]
    y = dataset.loc[:, Target]

    # len() on a dask dataframe triggers a computation, so evaluate it once.
    n_rows = len(dataset)
    train_size = int(n_rows * train_fraction)

    # dask uses loc instead of iloc; .loc slices include both end labels, so
    # the training slice stops at train_size - 1 to keep that row out of the
    # test set as well.
    X_train = X.loc[0:train_size - 1]
    X_test = X.loc[train_size:n_rows]
    y_train = y.loc[0:train_size - 1]
    y_test = y.loc[train_size:n_rows]
    return X_train, X_test, y_train, y_test

In [14]:
# One ordered train/test split per prediction target.
(
    x_train_registered,
    x_test_registered,
    y_train_registered,
    y_test_registered,
) = split_data(ddf_onehot, "registered")

(
    x_train_casual,
    x_test_casual,
    y_train_casual,
    y_test_casual,
) = split_data(ddf_onehot, "casual")

(
    x_train_count,
    x_test_count,
    y_train_count,
    y_test_count,
) = split_data(ddf_onehot, "count")

### Converting to arrays


The model does not support dask dataframes, so every dask dataframe is converted to a dask array through its `.values` attribute.


## Count

In [15]:
# Remove the other two rider-count columns from the features of the
# "count" model.
X_train_count = x_train_count.drop(["casual", "registered"], axis=1)
X_test_count = x_test_count.drop(["casual", "registered"], axis=1)

In [16]:
# Swap each dask dataframe/series for its underlying dask array.
X_train_count = X_train_count.values
X_test_count = X_test_count.values
y_train_count = y_train_count.values
y_test_count = y_test_count.values

In [17]:
# Fit and score the baseline linear model for the "count" target.
baseline_count_pred = score_lin(
    X_train_count,
    X_test_count,
    y_train_count,
    y_test_count,
)

  contains = index in indices
  sub[blockwise_token(i)] = blockwise_token(indices.index(index))


Intercept: -47.35351350631875
Coefficients: [ 1.38770424e+01  4.83504384e-03  1.37112925e+01  4.63215356e+01
  1.05299656e+02  9.74763100e+01 -8.15091170e+01 -3.03264063e+01
 -9.60728938e+00 -7.13914631e+00  1.22248355e+01  1.88822063e+01
  3.37224410e+01  1.27029331e+01 -2.11388494e+01 -5.43995335e+00
  1.80206389e+01  3.66165337e+00 -1.45306728e+01 -1.78563421e+01
 -1.16832963e+02 -1.34274964e+02 -1.42449442e+02 -1.53570743e+02
 -1.56875302e+02 -1.40770816e+02 -8.36827228e+01  4.60149336e+01
  1.78507650e+02  3.88181429e+01 -1.18192452e+01  1.24847300e+01
  4.96157372e+01  4.57376870e+01  3.06565555e+01  3.83727313e+01
  9.84041908e+01  2.51534414e+02  2.24069489e+02  1.19991129e+02
  4.09480260e+01 -7.68843956e+00 -4.48346695e+01 -8.45646780e+01
  1.53897658e+01 -3.12029045e+00  8.71178880e+00 -3.34227311e+00
 -9.47135048e-01  1.58549260e+00  3.38288745e+00  3.37531072e+00
  6.94517828e+00  8.88646625e-01  3.36421047e+00  1.56148486e+01
  6.24158284e+00 -4.52188991e+01]


  contains = index in indices
  sub[blockwise_token(i)] = blockwise_token(indices.index(index))


Variance score (R2): 0.63


## Registered

In [18]:
# Remove the other two rider-count columns from the features of the
# "registered" model.
X_train_registered = x_train_registered.drop(["casual", "count"], axis=1)
X_test_registered = x_test_registered.drop(["casual", "count"], axis=1)

In [19]:
# Swap each dask dataframe/series for its underlying dask array.
X_train_registered = X_train_registered.values
X_test_registered = X_test_registered.values
y_train_registered = y_train_registered.values
y_test_registered = y_test_registered.values

In [20]:
# Fit and score the baseline linear model for the "registered" target.
baseline_reg_pred = score_lin(
    X_train_registered,
    X_test_registered,
    y_train_registered,
    y_test_registered,
)

  contains = index in indices
  sub[blockwise_token(i)] = blockwise_token(indices.index(index))


Intercept: -45.39694045446746
Coefficients: [-4.38989127e-01  5.88621132e-03  1.30119505e+01  2.44811991e+01
  3.30730437e+01  8.76533390e+01 -5.45731420e+01 -1.20379494e+01
  1.80605767e+00  4.51562527e+00  9.78816052e+00  1.19689686e+01
  2.50426020e+01  1.36278059e+01 -1.43845957e+01 -3.74713664e+00
  1.11262087e+01 -6.22355144e+00 -1.89133594e+01 -1.83450844e+01
 -9.51486762e+01 -1.09609590e+02 -1.16706452e+02 -1.25574753e+02
 -1.28339575e+02 -1.14119863e+02 -6.03897752e+01  6.28416889e+01
  1.86818217e+02  4.09270509e+01 -2.22735528e+01 -7.55486613e+00
  2.35961315e+01  1.74561143e+01  3.90388059e-01  8.95281241e+00
  6.94134184e+01  2.19376121e+02  2.02059009e+02  1.07871683e+02
  4.02564054e+01 -1.32247241e+00 -3.36733734e+01 -6.74279958e+01
  1.53503534e+01 -3.48146665e+00 -2.85750016e+00 -1.08792947e+01
  1.71599243e+00  7.46834086e+00  9.06061862e+00  9.30448808e+00
  5.50479961e+00 -1.00279673e+01  1.41419720e+01  1.25810381e+01
  6.42869134e+00 -3.73423666e+01]


  contains = index in indices
  sub[blockwise_token(i)] = blockwise_token(indices.index(index))


Variance score (R2): 0.63


## Casual

In [21]:
# Remove the other two rider-count columns from the features of the
# "casual" model.
X_train_casual = x_train_casual.drop(["count", "registered"], axis=1)
X_test_casual = x_test_casual.drop(["count", "registered"], axis=1)

In [22]:
# Swap each dask dataframe/series for its underlying dask array.
X_train_casual = X_train_casual.values
X_test_casual = X_test_casual.values
y_train_casual = y_train_casual.values
y_test_casual = y_test_casual.values

In [23]:
# Fit and score the baseline linear model for the "casual" target.
baseline_casual_pred = score_lin(
    X_train_casual,
    X_test_casual,
    y_train_casual,
    y_test_casual,
)

  contains = index in indices
  sub[blockwise_token(i)] = blockwise_token(indices.index(index))


Intercept: 0.3261807292656939
Coefficients: [-5.72695687e+00 -1.20653180e-03  7.12406474e-01  2.32010051e+01
  7.24745824e+01  9.58873335e+00 -2.69230826e+01 -1.82988425e+01
 -8.61018516e+00 -8.74520148e+00  5.44703591e+00  1.00260824e+01
  1.18983271e+01  2.39784633e+00 -3.32951688e+00  1.84668493e+00
  1.05501398e+01  1.36508015e+01  8.26607702e+00  4.50306729e+00
 -1.70627697e+01 -2.00435768e+01 -2.11208489e+01 -2.33730194e+01
 -2.39132146e+01 -2.20277703e+01 -1.86693772e+01 -1.22029904e+01
 -3.68694586e+00  2.51461954e+00  1.50771103e+01  2.46618365e+01
  3.06414876e+01  3.29030506e+01  3.48874475e+01  3.40410928e+01
  3.36118311e+01  3.67795945e+01  2.66325166e+01  1.67423149e+01
  5.31526645e+00 -1.74173321e+00 -6.53671436e+00 -1.25116841e+01
  5.78276370e+00  6.09208030e+00  1.42044393e+01  1.01701541e+01
 -1.85955093e-02 -3.24112542e+00 -3.03631226e+00 -3.28770277e+00
  4.08182889e+00  1.29095962e+01 -8.79352658e+00  5.31037912e+00
  2.08974784e+00 -5.60055774e+00]


  contains = index in indices
  sub[blockwise_token(i)] = blockwise_token(indices.index(index))


Variance score (R2): 0.54


### Adding predictions: Casual + Registered

In [24]:
# Element-wise sum of the casual and registered predictions.
added_prediction = baseline_casual_pred + baseline_reg_pred

In [28]:
# Score the summed predictions against the true "count" test values.
explained_variance_score(
    y_test_count.compute(),
    added_prediction.compute(),
)

  contains = index in indices


0.6372334260082841