In [15]:
%restart_ipy

Successfully restarted IPyParallel cluster. Please restart the kernels on your notebooks before using IPyParallel again.


In [1]:
%%px

import snowflake.connector
#import bodo
import time
from sqlalchemy.dialects import registry
registry.register('snowflake', 'snowflake.sqlalchemy', 'dialect')

In [2]:
%%px

import os
with open("./snowflake-secrets.txt","r") as f:
    for line in f:
        args=line.split("=")
        os.environ[args[0]]=args[1].strip()
username=os.environ["username"]
password=os.environ["password"]
account=os.environ["account"]
warehouse="BODO_VW"
database="BODO_DB"
role="BODO_R1"

In [3]:
%%px

import bodo
import pandas as pd

@bodo.jit
def general_merge(trip_df, weather):
    return trip_df.merge(weather, left_on='trip_date', right_on='weather_date', how='left')


@bodo.jit()
def tc5():
    trips=pd.read_sql('select * from PUBLIC.TRIPS', f"snowflake://{username}:{password}@{account}/{database}/public?role={role}&warehouse={warehouse}",)
    weather=pd.read_sql('select * from PUBLIC.WEATHER', f"snowflake://{username}:{password}@{account}/{database}/public?role={role}&warehouse={warehouse}",)
    
    # 1
    #Remove trips having trip duration(TRIPDURATION) more than  3 hours (180 minutes)
    
    trips = trips[trips["tripduration"]<181]
    
    
    #2
    #Calculate the age of the person by taking the difference between birth year(BIRTH_YEAR) 
    #and year of the trip( extract year from the STARTTIME)
    
    trips["age"] = pd.DatetimeIndex(trips["starttime"]).year - trips["birth_year"]
    
    
    #3
    #Remove records where age is more than 90 and make sure that we do not eliminate 
    #the records where the birth year(BIRTH_YEAR) is missing.
    
    trips = trips.loc[(trips["age"]<90) | (trips["birth_year"].isnull() == True)]
    
    
    #4
    #Replace the missing birth year(BIRTH_YEAR) with the median value of birth year.
    
    trips["birth_year"].fillna(int(trips["birth_year"].median()), inplace=True)
    
    #5
    #Aggregate the data at day level by using the USER_TYPE as the group by variable.
    #Create the following features at the aggregated level:
    #-count of number of trips on that day(no_of_trips_day)
    
    trips["trip_date"] = trips["starttime"].dt.date
    trip_df = trips.groupby(["trip_date", "usertype"], as_index = False).size().rename(columns = {'size': 'no_of_trips_day'}, inplace = True)
    
    
    #6
    #Left join the Trips and WEATHER data using key TRIP_DATE= WEATHER_DATE
    
    df = general_merge(trip_df, weather)
    
    
    #7
    #Select records which fall on or before 31st Dec 2017 (TRIP_DATE) as 
    #training data (Create a table Trips_Training).
    
    trips_training = df[df["trip_date"] < pd.Timestamp(2018,1,1)]
    
    
    #8
    #Select records which fall after 31st Dec 2017(TRIP_DATE) as testing data 
    #(Create a table Trips_Testing)
    
    trips_testing = df[df["trip_date"] >= pd.Timestamp(2018,1,1)]
    
    
    
    return trips_training, trips_testing
trips_training, trips_testing=tc5()

%px: 100%|██████████| 32/32 [00:26<00:00,  1.23tasks/s]


In [4]:
%%px

from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import MinMaxScaler
import pickle
import joblib
import numpy as np

trips_training = bodo.rebalance(trips_training, parallel=True)

@bodo.jit()
def tc6(trips_training):
    
    #1
    #Build linear regression model OR any other suitable model on the Training dataset(Trips_Training)
    #Target : count of number of trips on that day(no_of_trips_day)
    #Input/Independent variables : AWND,PRCP,SNOW,SNWD,TMAX,TMIN,WDF2,WDF5,WSF2,WSF5
    
    trips_training = trips_training.dropna(how='any')
    
    X_train = trips_training[['awnd','prcp','snow','snwd','tmax','tmin','wdf2','wdf5','wsf2','wsf5']]
    y_train = trips_training['no_of_trips_day']
    
    with bodo.objmode():
        
        #scaling
        scaler = MinMaxScaler()
        X_train_normalized = scaler.fit_transform(X_train)
        
        #training
        reg = LinearRegression()
        model = reg.fit(X_train_normalized, y_train)
    
        #2
        #Save the model object , parameter estimate and R-square statistics on the training data. 
        
        #saving the model with pickle
        filename = 'model.pkl'
        pickle.dump(model, open(filename, 'wb'))
        
        #saving the model with joblib (recommended for Sklrean models)
        filename = 'model.joblib'
        joblib.dump(model, open(filename, 'wb'))
        
        print(f'r2 score on learn dataset: {reg.score(X_train_normalized, y_train)}')
        


tc6(trips_training)

%px:   0%|          | 0/32 [00:04<?, ?tasks/s]

[stdout:19] r2 score on learn dataset: 0.09086532620908516


[stdout:15] r2 score on learn dataset: 0.1065490614855168


[stdout:11] r2 score on learn dataset: 0.08793640700233352


[stdout:7] r2 score on learn dataset: 0.08771192220891988


[stdout:26] r2 score on learn dataset: 0.09164625957852779


[stdout:2] r2 score on learn dataset: 0.0882659169736264


[stdout:22] r2 score on learn dataset: 0.10182589098522465


[stdout:31] r2 score on learn dataset: 0.11800751788030517


[stdout:23] r2 score on learn dataset: 0.0596102301075383


[stdout:18] r2 score on learn dataset: 0.07589767820243554


[stdout:30] r2 score on learn dataset: 0.05900687033403951


[stdout:9] r2 score on learn dataset: 0.08145908174957117


[stdout:17] r2 score on learn dataset: 0.085643896471573


%px:  41%|████      | 13/32 [00:04<00:00, 128.41tasks/s]

[stdout:25] r2 score on learn dataset: 0.04888219507193414


[stdout:21] r2 score on learn dataset: 0.08582880592053599


[stdout:13] r2 score on learn dataset: 0.07777300448053337


[stdout:10] r2 score on learn dataset: 0.08360811969877202


[stdout:6] r2 score on learn dataset: 0.03262601901859152


[stdout:29] r2 score on learn dataset: 0.017570332314969517


[stdout:14] r2 score on learn dataset: 0.11254834741145092


[stdout:5] r2 score on learn dataset: 0.12169756632813755


[stdout:3] r2 score on learn dataset: 0.09512015494561643


[stdout:1] r2 score on learn dataset: 0.03411455352546133


[stdout:27] r2 score on learn dataset: 0.08043881473741099


%px:  75%|███████▌  | 24/32 [00:04<00:00, 117.05tasks/s]

[stdout:16] r2 score on learn dataset: 0.15007705080938682


[stdout:24] r2 score on learn dataset: 0.12149882862598249


[stdout:4] r2 score on learn dataset: 0.07031518775441237


[stdout:20] r2 score on learn dataset: 0.05731666309930472


[stdout:12] r2 score on learn dataset: 0.06383694168125509


[stdout:28] r2 score on learn dataset: 0.12791855949901632


[stdout:8] r2 score on learn dataset: 0.08473101995452736


%px:  97%|█████████▋| 31/32 [00:05<00:00, 65.62tasks/s] 

[stdout:0] r2 score on learn dataset: 0.058593193678348254


%px: 100%|██████████| 32/32 [00:05<00:00,  6.06tasks/s]


In [5]:
%%px

from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
import math

trips_testing = bodo.rebalance(trips_testing, parallel=True)

tc7_output_type = bodo.typeof(pd.DataFrame({"yhat": [0.0]}))

@bodo.jit()
def tc7_a(trips_testing, trips_training):
    
    #1
    #Build linear regression model OR any other suitable model on the Training dataset(Trips_Training)
    #Target : count of number of trips on that day(no_of_trips_day)
    #Input/Independent variables : AWND,PRCP,SNOW,SNWD,TMAX,TMIN,WDF2,WDF5,WSF2,WSF5
    
    trips_testing = trips_testing.dropna(how='any')
    trips_training = trips_training.dropna(how='any')
    
    X_train = trips_training[['awnd','prcp','snow','snwd','tmax','tmin','wdf2','wdf5','wsf2','wsf5']]
    y_train = trips_training['no_of_trips_day']
    
    X_test = trips_testing[['awnd','prcp','snow','snwd','tmax','tmin','wdf2','wdf5','wsf2','wsf5']]
    y_test = trips_testing['no_of_trips_day']
    
    #2-a
    #Use the test data(Trips_Testing) , score the data using the model object and store 
    #the prediction in the test data. 
    
    with bodo.objmode(y_preds_data = tc7_output_type):
        reg = LinearRegression()
        model = reg.fit(X_train, y_train)
        y_preds_numpy = model.predict(X_test)
        
        print("MSE: ", mean_squared_error(y_test, y_preds_numpy))
        print("RMSE: ", math.sqrt(mean_squared_error(y_test, y_preds_numpy)))
        print("MAE: ", mean_absolute_error(y_test, y_preds_numpy))
        
        y_preds_data = pd.DataFrame(y_preds_numpy, columns = ["yhat"])
        
    return y_preds_data
    
    
@bodo.jit()
def tc7_b(trips_testing, trips_training):
    
    #2-b
    # Store the prediction in the test data. Save the Test data.
    
    y_preds = tc7_a(trips_testing, trips_training)
    
    trips_testing_total = pd.concat([trips_testing, y_preds], axis=1)
    
    trips_testing_total.to_parquet("Test_data.pq")
    return trips_testing_total
    
trips_testing_total=tc7_b(trips_testing, trips_training)


%px:   0%|          | 0/32 [00:07<?, ?tasks/s]

[stdout:18] MSE:  2303523.2843269254
RMSE:  1517.736236744358
MAE:  1303.5153878739222


[stdout:22] MSE:  1964237.0228452706
RMSE:  1401.512405526712
MAE:  1277.0663650360148


[stdout:28] MSE:  1284077.9422793873
RMSE:  1133.1716296657746
MAE:  996.5316493541835


[stdout:24] MSE:  2664045.781627454
RMSE:  1632.1904857054687
MAE:  1391.4683817900816


[stdout:31] MSE:  1329621.8969323419
RMSE:  1153.0923193449612
MAE:  954.090709042393


[stdout:19] MSE:  2348901.4052643483
RMSE:  1532.6126076945695
MAE:  1189.589612705093


[stdout:29] MSE:  3684656.552356999
RMSE:  1919.5459234821653
MAE:  1742.8738714816436


[stdout:25] MSE:  2594793.9245708254
RMSE:  1610.8364052785823
MAE:  1390.894456021349


[stdout:5] MSE:  1409287.8515552233
RMSE:  1187.1343022401566
MAE:  993.871568490356


[stdout:0] MSE:  1265421.362792574
RMSE:  1124.9094909336368
MAE:  972.2888247129694


[stdout:23] MSE:  2061738.3072126145
RMSE:  1435.8754497562156
MAE:  1231.740686772589


[stdout:14] MSE:  3329853.270802231
RMSE:  1824.78855509405
MAE:  1661.1187004693902


[stdout:7] MSE:  1656880.0857615976
RMSE:  1287.1985417027156
MAE:  1131.4177479650946


[stdout:30] MSE:  2057227.2329031753
RMSE:  1434.3037449937776
MAE:  1211.3531698313784


[stdout:21] MSE:  2497550.488265617
RMSE:  1580.364036627516
MAE:  1323.0718387991633


[stdout:12] MSE:  3100499.3100216286
RMSE:  1760.823474974601
MAE:  1393.6276114735576


[stdout:26] MSE:  3249536.1288073272
RMSE:  1802.6469784201583
MAE:  1391.7830724902449


[stdout:16] MSE:  1887475.1222440826
RMSE:  1373.8541124311862
MAE:  1051.870314894426


[stdout:15] MSE:  1385827.787876214
RMSE:  1177.21187042784
MAE:  957.0925500514226


[stdout:8] MSE:  2294980.440142595
RMSE:  1514.9192850256397
MAE:  1313.113849831983


[stdout:6] MSE:  1893131.5134638557
RMSE:  1375.911157547556
MAE:  1062.441383009875


[stdout:17] MSE:  2186392.0092410813
RMSE:  1478.6453290904758
MAE:  1250.75756025117


[stdout:1] MSE:  1089975.513393703
RMSE:  1044.0189238676198
MAE:  856.680536750324


[stdout:11] MSE:  1129263.8304134626
RMSE:  1062.668259812752
MAE:  1001.5543746620503


[stdout:27] MSE:  1574064.8008828273
RMSE:  1254.6173922287333
MAE:  1053.9240079031322


[stdout:20] MSE:  1729310.858441473
RMSE:  1315.0326453900195
MAE:  918.0336243821606


[stdout:4] MSE:  2332492.718368402
RMSE:  1527.2500510291045
MAE:  1248.6018088722742


[stdout:9] MSE:  1412127.9481044242
RMSE:  1188.3298986831999
MAE:  931.4999106848704


[stdout:10] MSE:  1692036.4303795656
RMSE:  1300.7830066462145
MAE:  1147.5398569560887


[stdout:13] MSE:  2451447.927025966
RMSE:  1565.7100392556617
MAE:  1365.3795413634086


[stdout:2] MSE:  1770589.8851535553
RMSE:  1330.6351435136362
MAE:  1109.0505383264813


[stdout:3] MSE:  2312826.083920431
RMSE:  1520.7978445278093
MAE:  1321.011829930355


%px: 100%|██████████| 32/32 [00:07<00:00,  4.47tasks/s]


In [6]:
%%px
@bodo.jit(cache=True)
def write_to_sf(df,tablename):
    df.to_sql(tablename,f"snowflake://{username}:{password}@{account}/{database}/public?role={role}&warehouse={warehouse}",if_exists="replace",index=False)
write_to_sf(trips_testing_total,"trips_testing_total")
write_to_sf(trips_testing_total,"trips_testing")

%px:   0%|          | 0/32 [00:02<?, ?tasks/s]

  results = connection.execute(


%px:   0%|          | 0/32 [00:05<?, ?tasks/s]

  results = connection.execute(
  results = connection.execute(


  results = connection.execute(
  results = connection.execute(


%px:   0%|          | 0/32 [00:05<?, ?tasks/s]

  results = connection.execute(
  results = connection.execute(


%px:   0%|          | 0/32 [00:06<?, ?tasks/s]

  results = connection.execute(
  results = connection.execute(


  results = connection.execute(
  results = connection.execute(


  results = connection.execute(
  results = connection.execute(


  results = connection.execute(
  results = connection.execute(


  results = connection.execute(


  results = connection.execute(
  results = connection.execute(


  results = connection.execute(


%px:   0%|          | 0/32 [00:06<?, ?tasks/s]

  results = connection.execute(
  results = connection.execute(


  results = connection.execute(


  results = connection.execute(


  results = connection.execute(


  results = connection.execute(


  results = connection.execute(
  results = connection.execute(


  results = connection.execute(
  results = connection.execute(


  results = connection.execute(


  results = connection.execute(
  results = connection.execute(


%px:   0%|          | 0/32 [00:06<?, ?tasks/s]

  results = connection.execute(
  results = connection.execute(


  results = connection.execute(


  results = connection.execute(
  results = connection.execute(


  results = connection.execute(
  results = connection.execute(


  results = connection.execute(


  results = connection.execute(
  results = connection.execute(


  results = connection.execute(
  results = connection.execute(


  results = connection.execute(


  results = connection.execute(
  results = connection.execute(


  results = connection.execute(


  results = connection.execute(
  results = connection.execute(


%px:   0%|          | 0/32 [00:06<?, ?tasks/s]

  results = connection.execute(
  results = connection.execute(


%px: 100%|██████████| 32/32 [00:29<00:00,  1.07tasks/s]
