# NYC Taxi: Predict Trip Duration

Below is a pipeline that predicts trip duration from attributes such as pickup location, distance traveled, time of day, and so on. It is a typical ML workflow written in Python using scikit-learn (a random forest regressor), with pre-processing, training, and testing steps.

In [None]:
# https://www.kaggle.com/stephaniestallworth/nyc-taxi-eda-regression-fivethirtyeight-viz/notebook
import math
import numpy as np
import pandas as pd
from sklearn import metrics
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

def manhattan_distance(x1, y1, x2, y2):
    """Return the L1 (taxicab) distance between (x1, y1) and (x2, y2).

    The result is in the same units as the inputs. Only ``abs``, ``-`` and
    ``+`` are used, so scalars or element-wise array/Series inputs both work.
    """
    dx = abs(x1 - x2)
    dy = abs(y1 - y2)
    return dx + dy

def roundtime(tstring):
    """Round an 'HH:MM:SS' time string to the nearest hour.

    Returns the hour as a string: the original 'HH' text when minutes < 30,
    otherwise the next hour without zero padding ('08:30:00' -> '9').
    Times from 23:30 onward wrap to '00'; only the hour wraps, no date is
    advanced. Seconds are ignored.
    """
    hh, mm, _ = tstring.split(':')
    if int(mm) < 30:
        return hh
    return '00' if hh == '23' else str(int(hh) + 1)

def weekday(start):
    """Return the day of week (Monday=0 ... Sunday=6) of *start*.

    *start* must be a 'YYYY-mm-dd HH:MM:SS' string; any other format raises
    ValueError from ``strptime``.
    """
    from datetime import datetime
    return datetime.strptime(start, '%Y-%m-%d %H:%M:%S').weekday()

# Load the raw trip records.
data_df = pd.read_csv('train.csv')

# L1 distance between pickup and dropoff, in raw degrees of longitude/latitude.
# manhattan_distance only uses abs/-/+, so it works element-wise on whole
# columns; no per-row Python loop is needed.
data_df['distance'] = manhattan_distance(
    data_df['pickup_longitude'], data_df['pickup_latitude'],
    data_df['dropoff_longitude'], data_df['dropoff_latitude'])

# Remove outliers in passenger_count (keep 1..8 passengers)
data_df = data_df[data_df['passenger_count']>0]
data_df = data_df[data_df['passenger_count']<9]

# Remove coordinate outliers: both endpoints must lie inside a fixed
# lon [-74.03, -73.75] / lat [40.63, 40.85] bounding box.
data_df = data_df[data_df['pickup_longitude'] <= -73.75]
data_df = data_df[data_df['pickup_longitude'] >= -74.03]
data_df = data_df[data_df['pickup_latitude'] <= 40.85]
data_df = data_df[data_df['pickup_latitude'] >= 40.63]
data_df = data_df[data_df['dropoff_longitude'] <= -73.75]
data_df = data_df[data_df['dropoff_longitude'] >= -74.03]
data_df = data_df[data_df['dropoff_latitude'] <= 40.85]
data_df = data_df[data_df['dropoff_latitude'] >= 40.63]

# Remove trip_duration outliers: more than 2 standard deviations from the mean
# (both computed on the rows kept so far), below 30, or above 60*240
# (30 s / 4 h, assuming trip_duration is in seconds -- confirm).
trip_duration_mean = np.mean(data_df['trip_duration'])
trip_duration_std = np.std(data_df['trip_duration'])
data_df = data_df[data_df['trip_duration'] <= trip_duration_mean + 2*trip_duration_std]
data_df = data_df[data_df['trip_duration'] >= trip_duration_mean - 2*trip_duration_std]
data_df = data_df[data_df['trip_duration'] >= 30]
data_df = data_df[data_df['trip_duration'] <= 60*240]

# Boolean indexing leaves data_df flagged as a possible view of its parent;
# take an explicit copy so the feature columns below are added without
# SettingWithCopyWarning.
data_df = data_df.copy()

# Time features from 'YYYY-mm-dd HH:MM:SS': hour rounded to nearest, month,
# and day of week (Monday=0).
data_df['start_hr'] = data_df['pickup_datetime'].apply(lambda x: int(roundtime(x.split(' ')[1])))
data_df['start_month'] = data_df['pickup_datetime'].apply(lambda x: int(x.split(' ')[0].split('-')[1]))
data_df['start_weekday'] = data_df['pickup_datetime'].apply(weekday)

X = data_df[['vendor_id', 'pickup_longitude',
            'pickup_latitude', 'dropoff_longitude',
            'dropoff_latitude', 'distance',
            'start_hr', 'start_month', 'start_weekday']]
y = data_df['trip_duration']

# Fixed random_state keeps the 80/20 split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=0)

clf = RandomForestRegressor(n_estimators=20, n_jobs=3)
clf.fit(X_train, y_train)

preds = clf.predict(X_test)
# The score is reported as "R2", so compute the coefficient of determination.
# explained_variance_score is a different statistic: it ignores a constant
# bias in the residuals and only equals R2 when the mean residual is zero.
score = metrics.r2_score(y_test, preds)
rmse = np.sqrt(metrics.mean_squared_error(y_test, preds))

print("R2: {}".format(score))
print("RMSE: {}".format(rmse))

## Coarsest Jarvis Pipeline: Wrap an existing pipeline

We expect many of our users to already have pipelines that they built and refined over time, and to trust these pipelines to be effective. For such users, we add value by versioning the artifacts the user wants us to track, and by recording the relationships between some objects at a coarse level. To wrap an existing pipeline in Jarvis, simply put the pipeline code inside a decorated function and tell us what the inputs and outputs of the function are. See the example below.

In [None]:
import jarvis

with jarvis.Experiment('coarsest') as ex:

    # NOTE(review): presumably records the source notebook and the Ground
    # client ('ground') that Jarvis logs to -- confirm against the Jarvis docs.
    jarvis.setNotebookName('Taxi.ipynb')
    ex.groundClient('ground')

    @jarvis.func
    def run_existing_pipeline(path_to_data):
        """Run the original end-to-end taxi pipeline on the CSV at *path_to_data*.

        Loads and cleans the trips, derives distance and time features, fits a
        20-tree random forest on an 80/20 split and evaluates it on the
        held-out 20%.

        Returns:
            tuple: (clf, score, rmse) -- the fitted RandomForestRegressor and
            the strings "R2: <value>" and "RMSE: <value>".
        """
        # The whole existing pipeline, imports included, lives inside the
        # wrapped function.
        import numpy as np
        import pandas as pd
        from sklearn import metrics
        from sklearn.ensemble import RandomForestRegressor
        from sklearn.model_selection import train_test_split

        def manhattan_distance(x1, y1, x2, y2):
            # L1 distance; element-wise when given columns.
            return abs(x1 - x2) + abs(y1 - y2)

        def roundtime(tstring):
            # Round 'HH:MM:SS' to the nearest hour; 23:30 and later wrap to '00'.
            hours, mins, secs = tstring.split(':')
            if int(mins) >= 30:
                if hours == '23':
                    return '00'
                else:
                    return str(int(hours) + 1)
            else:
                return hours

        def weekday(start):
            # Day of week (Monday=0) of a 'YYYY-mm-dd HH:MM:SS' string.
            from datetime import datetime
            fmt = '%Y-%m-%d %H:%M:%S'
            tstamp = datetime.strptime(start, fmt)
            return int(tstamp.weekday())

        data_df = pd.read_csv(path_to_data)

        # Vectorized over whole columns instead of a per-row map.
        data_df['distance'] = manhattan_distance(
            data_df['pickup_longitude'], data_df['pickup_latitude'],
            data_df['dropoff_longitude'], data_df['dropoff_latitude'])

        # Remove outliers in passenger_count
        data_df = data_df[data_df['passenger_count']>0]
        data_df = data_df[data_df['passenger_count']<9]

        # Remove coordinate outliers
        data_df = data_df[data_df['pickup_longitude'] <= -73.75]
        data_df = data_df[data_df['pickup_longitude'] >= -74.03]
        data_df = data_df[data_df['pickup_latitude'] <= 40.85]
        data_df = data_df[data_df['pickup_latitude'] >= 40.63]
        data_df = data_df[data_df['dropoff_longitude'] <= -73.75]
        data_df = data_df[data_df['dropoff_longitude'] >= -74.03]
        data_df = data_df[data_df['dropoff_latitude'] <= 40.85]
        data_df = data_df[data_df['dropoff_latitude'] >= 40.63]

        # Remove trip_duration outliers (mean/std of the rows kept so far)
        trip_duration_mean = np.mean(data_df['trip_duration'])
        trip_duration_std = np.std(data_df['trip_duration'])
        data_df = data_df[data_df['trip_duration'] <= trip_duration_mean + 2*trip_duration_std]
        data_df = data_df[data_df['trip_duration'] >= trip_duration_mean - 2*trip_duration_std]
        data_df = data_df[data_df['trip_duration'] >= 30]
        data_df = data_df[data_df['trip_duration'] <= 60*240]

        # Explicit copy so adding feature columns to the filtered frame does
        # not trigger SettingWithCopyWarning.
        data_df = data_df.copy()

        data_df['start_hr'] = data_df['pickup_datetime'].apply(lambda x: int(roundtime(x.split(' ')[1])))
        data_df['start_month'] = data_df['pickup_datetime'].apply(lambda x: int(x.split(' ')[0].split('-')[1]))
        data_df['start_weekday'] = data_df['pickup_datetime'].apply(weekday)

        X = data_df[['vendor_id', 'pickup_longitude',
                    'pickup_latitude', 'dropoff_longitude',
                    'dropoff_latitude', 'distance',
                    'start_hr', 'start_month', 'start_weekday']]
        y = data_df['trip_duration']

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=0)

        clf = RandomForestRegressor(n_estimators=20, n_jobs=3)
        clf.fit(X_train, y_train)

        preds = clf.predict(X_test)
        # Labelled "R2", so compute R2 itself; explained_variance_score is a
        # different statistic that ignores a constant residual bias.
        score = metrics.r2_score(y_test, preds)
        rmse = np.sqrt(metrics.mean_squared_error(y_test, preds))

        score = "R2: {}".format(score)
        rmse = "RMSE: {}".format(rmse)

        print(score, rmse)

        return clf, score, rmse

    data = ex.artifact('train.csv')
    do_all = ex.action(run_existing_pipeline, [data])
    # NOTE(review): these three artifacts presumably bind, in order, to the
    # (clf, score, rmse) tuple returned above -- confirm against Jarvis docs.
    model = ex.artifact('model.pkl', do_all)
    score = ex.artifact('score.txt', do_all)
    rmse = ex.artifact('rmse.txt', do_all)

In [None]:
# NOTE(review): presumably plots the recorded score artifact of the
# 'coarsest' experiment -- confirm against the Jarvis docs.
score.plot()

## Coarse Jarvis Pipeline with Hyper-parameter Sweeps

After some time, even a trusted and effective pipeline may need tuning. We enable pipeline (and model) tuning via Jarvis Literals. A user can declare a Literal whose value is an iterable or any basic Python type. When a literal is passed to an Action, the experiment may become a multi-trial experiment. Jarvis can execute every trial with some optimizations, and relate the experiment outcomes to the experiment configuration.

In [None]:
import jarvis

with jarvis.Experiment('coarse') as ex:
    import math
    import numpy as np
    import pandas as pd
    from sklearn import metrics
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.model_selection import train_test_split

    # NOTE(review): presumably records the source notebook and the Ground
    # client ('ground') that Jarvis logs to -- confirm against the Jarvis docs.
    jarvis.setNotebookName('Taxi.ipynb')
    ex.groundClient('ground')

    @jarvis.func
    def run_existing_pipeline(path_to_data, n_estimators):
        """Run the taxi pipeline with a configurable forest size.

        Same steps as the coarsest pipeline: load and clean the CSV at
        *path_to_data*, derive distance and time features, fit a random
        forest on an 80/20 split and evaluate it on the held-out 20%. Uses
        np, pd, metrics, RandomForestRegressor and train_test_split imported
        at the top of this cell.

        Args:
            path_to_data: path of the training CSV.
            n_estimators: number of trees; supplied per trial from the
                ``num_estimators`` literal below.

        Returns:
            tuple: (clf, score, rmse) -- the fitted RandomForestRegressor and
            the strings "R2: <value>" and "RMSE: <value>".
        """

        def manhattan_distance(x1, y1, x2, y2):
            # L1 distance; element-wise when given columns.
            return abs(x1 - x2) + abs(y1 - y2)

        def roundtime(tstring):
            # Round 'HH:MM:SS' to the nearest hour; 23:30 and later wrap to '00'.
            hours, mins, secs = tstring.split(':')
            if int(mins) >= 30:
                if hours == '23':
                    return '00'
                else:
                    return str(int(hours) + 1)
            else:
                return hours

        def weekday(start):
            # Day of week (Monday=0) of a 'YYYY-mm-dd HH:MM:SS' string.
            from datetime import datetime
            fmt = '%Y-%m-%d %H:%M:%S'
            tstamp = datetime.strptime(start, fmt)
            return int(tstamp.weekday())

        data_df = pd.read_csv(path_to_data)

        # Vectorized over whole columns instead of a per-row map.
        data_df['distance'] = manhattan_distance(
            data_df['pickup_longitude'], data_df['pickup_latitude'],
            data_df['dropoff_longitude'], data_df['dropoff_latitude'])

        # Remove outliers in passenger_count
        data_df = data_df[data_df['passenger_count']>0]
        data_df = data_df[data_df['passenger_count']<9]

        # Remove coordinate outliers
        data_df = data_df[data_df['pickup_longitude'] <= -73.75]
        data_df = data_df[data_df['pickup_longitude'] >= -74.03]
        data_df = data_df[data_df['pickup_latitude'] <= 40.85]
        data_df = data_df[data_df['pickup_latitude'] >= 40.63]
        data_df = data_df[data_df['dropoff_longitude'] <= -73.75]
        data_df = data_df[data_df['dropoff_longitude'] >= -74.03]
        data_df = data_df[data_df['dropoff_latitude'] <= 40.85]
        data_df = data_df[data_df['dropoff_latitude'] >= 40.63]

        # Remove trip_duration outliers (mean/std of the rows kept so far)
        trip_duration_mean = np.mean(data_df['trip_duration'])
        trip_duration_std = np.std(data_df['trip_duration'])
        data_df = data_df[data_df['trip_duration'] <= trip_duration_mean + 2*trip_duration_std]
        data_df = data_df[data_df['trip_duration'] >= trip_duration_mean - 2*trip_duration_std]
        data_df = data_df[data_df['trip_duration'] >= 30]
        data_df = data_df[data_df['trip_duration'] <= 60*240]

        # Explicit copy so adding feature columns to the filtered frame does
        # not trigger SettingWithCopyWarning.
        data_df = data_df.copy()

        data_df['start_hr'] = data_df['pickup_datetime'].apply(lambda x: int(roundtime(x.split(' ')[1])))
        data_df['start_month'] = data_df['pickup_datetime'].apply(lambda x: int(x.split(' ')[0].split('-')[1]))
        data_df['start_weekday'] = data_df['pickup_datetime'].apply(weekday)

        X = data_df[['vendor_id', 'pickup_longitude',
                    'pickup_latitude', 'dropoff_longitude',
                    'dropoff_latitude', 'distance',
                    'start_hr', 'start_month', 'start_weekday']]
        y = data_df['trip_duration']

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=0)

        clf = RandomForestRegressor(n_estimators=n_estimators, n_jobs=3)
        clf.fit(X_train, y_train)

        preds = clf.predict(X_test)
        # Labelled "R2", so compute R2 itself; explained_variance_score is a
        # different statistic that ignores a constant residual bias.
        score = metrics.r2_score(y_test, preds)
        rmse = np.sqrt(metrics.mean_squared_error(y_test, preds))

        score = "R2: {}".format(score)
        rmse = "RMSE: {}".format(rmse)

        print(score, rmse)

        return clf, score, rmse

    data = ex.artifact('train.csv')
    # Hyper-parameter sweep over the forest size.
    # NOTE(review): forEach() presumably makes Jarvis run one trial per value
    # in [15, 20, 30] -- confirm against the Jarvis docs.
    num_est = ex.literal([15, 20, 30], 'num_estimators')
    num_est.forEach()
    do_all = ex.action(run_existing_pipeline, [data, num_est])
    # NOTE(review): these artifacts presumably bind, in order, to the
    # (clf, score, rmse) tuple returned above -- confirm.
    model = ex.artifact('model.pkl', do_all)
    score = ex.artifact('score.txt', do_all)
    rmse = ex.artifact('rmse.txt', do_all)

In [None]:
# NOTE(review): presumably plots the score artifact of the 'coarse'
# experiment across its num_estimators trials -- confirm against Jarvis docs.
score.plot()

## Fine Jarvis Pipeline: Reuse Intermediate Artifacts & Compose Experiments

Perhaps our user will observe that the preprocessing step produces a result that can be shared with the entire organization; alternatively, the preprocessing step may be expensive, and the user will not want to re-run it every time the system runs the pipeline with a new configuration (caching intermediate shared results). A subgraph in the Jarvis pipeline may be its own experiment, or part of a parent experiment. When the Jarvis pipeline is shared with others (as opposed to merely sharing the end results), other users or customers of the data can understand the lineage of the results: what transformations were applied, what the source was, etc. We include an example below of how a user might separate the preprocessing step from the rest of the pipeline, so that the subpipeline can be shared more easily.

In [None]:
import math
import numpy as np
import pandas as pd
from sklearn import metrics
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

import jarvis

with jarvis.Experiment('fine') as ex:

    # NOTE(review): presumably records the source notebook and the Ground
    # client ('ground') that Jarvis logs to -- confirm against the Jarvis docs.
    jarvis.setNotebookName('Taxi.ipynb')
    ex.groundClient('ground')

    @jarvis.func
    def prepare_data(path_to_data):
        """Load the CSV at *path_to_data*, drop outliers and add features.

        Drops passenger-count, coordinate and trip-duration outliers, then
        adds 'distance' (L1 distance in raw lon/lat degrees), 'start_hr',
        'start_month' and 'start_weekday'. Uses np and pd imported at the
        top of this cell.

        Returns:
            The cleaned DataFrame (stored below as prepped_data.pkl).
        """

        def manhattan_distance(x1, y1, x2, y2):
            # L1 distance; element-wise when given columns.
            return abs(x1 - x2) + abs(y1 - y2)

        def roundtime(tstring):
            # Round 'HH:MM:SS' to the nearest hour; 23:30 and later wrap to '00'.
            hours, mins, secs = tstring.split(':')
            if int(mins) >= 30:
                if hours == '23':
                    return '00'
                else:
                    return str(int(hours) + 1)
            else:
                return hours

        def weekday(start):
            # Day of week (Monday=0) of a 'YYYY-mm-dd HH:MM:SS' string.
            from datetime import datetime
            fmt = '%Y-%m-%d %H:%M:%S'
            tstamp = datetime.strptime(start, fmt)
            return int(tstamp.weekday())

        data_df = pd.read_csv(path_to_data)

        # Vectorized over whole columns instead of a per-row map.
        data_df['distance'] = manhattan_distance(
            data_df['pickup_longitude'], data_df['pickup_latitude'],
            data_df['dropoff_longitude'], data_df['dropoff_latitude'])

        # Remove outliers in passenger_count
        data_df = data_df[data_df['passenger_count']>0]
        data_df = data_df[data_df['passenger_count']<9]

        # Remove coordinate outliers
        data_df = data_df[data_df['pickup_longitude'] <= -73.75]
        data_df = data_df[data_df['pickup_longitude'] >= -74.03]
        data_df = data_df[data_df['pickup_latitude'] <= 40.85]
        data_df = data_df[data_df['pickup_latitude'] >= 40.63]
        data_df = data_df[data_df['dropoff_longitude'] <= -73.75]
        data_df = data_df[data_df['dropoff_longitude'] >= -74.03]
        data_df = data_df[data_df['dropoff_latitude'] <= 40.85]
        data_df = data_df[data_df['dropoff_latitude'] >= 40.63]

        # Remove trip_duration outliers (mean/std of the rows kept so far)
        trip_duration_mean = np.mean(data_df['trip_duration'])
        trip_duration_std = np.std(data_df['trip_duration'])
        data_df = data_df[data_df['trip_duration'] <= trip_duration_mean + 2*trip_duration_std]
        data_df = data_df[data_df['trip_duration'] >= trip_duration_mean - 2*trip_duration_std]
        data_df = data_df[data_df['trip_duration'] >= 30]
        data_df = data_df[data_df['trip_duration'] <= 60*240]

        # Explicit copy so adding feature columns to the filtered frame does
        # not trigger SettingWithCopyWarning.
        data_df = data_df.copy()

        data_df['start_hr'] = data_df['pickup_datetime'].apply(lambda x: int(roundtime(x.split(' ')[1])))
        data_df['start_month'] = data_df['pickup_datetime'].apply(lambda x: int(x.split(' ')[0].split('-')[1]))
        data_df['start_weekday'] = data_df['pickup_datetime'].apply(weekday)

        return data_df

    @jarvis.func
    def train_test_model(data_df, n_estimators):
        """Fit and evaluate a random forest on the prepared data.

        Args:
            data_df: output of ``prepare_data``.
            n_estimators: number of trees; supplied per trial from the
                ``num_estimators`` literal below.

        Returns:
            tuple: (clf, score, rmse) -- the fitted RandomForestRegressor and
            the strings "R2: <value>" and "RMSE: <value>".
        """
        X = data_df[['vendor_id', 'pickup_longitude',
                    'pickup_latitude', 'dropoff_longitude',
                    'dropoff_latitude', 'distance',
                    'start_hr', 'start_month', 'start_weekday']]
        y = data_df['trip_duration']

        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=0)

        clf = RandomForestRegressor(n_estimators=n_estimators, n_jobs=3)
        clf.fit(X_train, y_train)

        preds = clf.predict(X_test)
        # Labelled "R2", so compute R2 itself; explained_variance_score is a
        # different statistic that ignores a constant residual bias.
        score = metrics.r2_score(y_test, preds)
        rmse = np.sqrt(metrics.mean_squared_error(y_test, preds))

        score = "R2: {}".format(score)
        rmse = "RMSE: {}".format(rmse)

        print(score, rmse)

        return clf, score, rmse

    data = ex.artifact('train.csv')
    # NOTE(review): forEach() presumably makes Jarvis run one trial per value
    # in [15, 20, 30] -- confirm against the Jarvis docs.
    num_est = ex.literal([15, 20, 30], 'num_estimators')
    num_est.forEach()

    # Preprocessing is its own action/artifact, so the prepared data can be
    # shared and reused across the num_estimators trials.
    do_prep = ex.action(prepare_data, [data])
    prepd = ex.artifact('prepped_data.pkl', do_prep)

    do_tr_te = ex.action(train_test_model, [prepd, num_est])

    # NOTE(review): these artifacts presumably bind, in order, to the
    # (clf, score, rmse) tuple returned by train_test_model -- confirm.
    model = ex.artifact('model.pkl', do_tr_te)
    score = ex.artifact('score.txt', do_tr_te)
    rmse = ex.artifact('rmse.txt', do_tr_te)

In [None]:
# NOTE(review): presumably plots the score artifact of the 'fine'
# experiment across its num_estimators trials -- confirm against Jarvis docs.
score.plot()

## Finest Pipeline: Jarvis Proper

After becoming familiar with Jarvis, our user will likely want to build her next pipeline using Jarvis from the start: this was the use case we envisioned at first. In addition to the benefits of coarser Jarvis pipelines, the user gets a set of tools for pipeline development in an interactive environment such as Jupyter. We are currently investigating techniques for detecting poor experimental methods, such as p-hacking or data dredging. Whenever a user "peeks" at a Jarvis artifact, the event is recorded. We plan to use this information to infer when information from the test data may have leaked into and contaminated the analysis, when the user has likely overfitted to the data, and when it is time to collect a fresh sample.

In [None]:
import jarvis

In [None]:
# NOTE(review): presumably tells Jarvis which notebook file these cells
# belong to -- confirm against the Jarvis docs.
jarvis.setNotebookName('Taxi.ipynb')

In [None]:
# Create the experiment without a `with` block so its steps can be built up
# across separate cells; it is closed explicitly via ex.__exit__ in a later
# cell.
ex = jarvis.Experiment('finest')

In [None]:
# NOTE(review): presumably selects the Ground client ('ground') that
# versioning/lineage is recorded to -- confirm against the Jarvis docs.
ex.groundClient('ground')

In [None]:
# Register the raw training CSV as an artifact with no producing action;
# it is the input of the whole pipeline below.
data = ex.artifact('train.csv')

In [None]:
@jarvis.func
def dataframize(csvpath):
    """Read the CSV file at *csvpath* and return it as a pandas DataFrame."""
    from pandas import read_csv
    frame = read_csv(csvpath)
    return frame

In [None]:
# Load the raw CSV into a DataFrame artifact (train_df.pkl) via dataframize.
do_dfize = ex.action(dataframize, [data])
tr_data_df = ex.artifact('train_df.pkl', do_dfize)

Below is an example of *peek*

In [None]:
# Inspect the DataFrame artifact. Per this section's introduction, Jarvis
# records every peek so it can later reason about possible data dredging.
tr_data_df.peek()

In [None]:
@jarvis.func
def calculate_distance(data_df):
    """Return a copy of *data_df* with a 'distance' column added.

    'distance' is the L1 (Manhattan) distance between the pickup and dropoff
    coordinates, in raw degrees of longitude/latitude. The input frame is
    left unmodified.
    """
    def manhattan_distance(x1, y1, x2, y2):
        return abs(x1 - x2) + abs(y1 - y2)

    # Work on a copy: the input is an upstream artifact (train_df.pkl) and
    # should not be mutated in place.
    data_df = data_df.copy()
    # The helper only uses abs/-/+, so it applies element-wise to whole
    # columns; no per-row map is needed.
    data_df['distance'] = manhattan_distance(
        data_df['pickup_longitude'], data_df['pickup_latitude'],
        data_df['dropoff_longitude'], data_df['dropoff_latitude'])
    return data_df

In [None]:
# Add the pickup-to-dropoff distance feature; result stored as train_dist_df.pkl.
do_calc_dist = ex.action(calculate_distance, [tr_data_df])
tr_data_dist_df = ex.artifact('train_dist_df.pkl', do_calc_dist)

In [None]:
@jarvis.func
def preproc(train_data):
    """Return *train_data* with implausible trips removed.

    Filters, applied in this order:
      1. passenger_count outside 1..8;
      2. pickup or dropoff outside the lon [-74.03, -73.75] /
         lat [40.63, 40.85] bounding box (inclusive);
      3. trip_duration more than 2 population standard deviations from the
         mean (both computed after steps 1-2), below 30, or above 60*240.
    """
    import numpy as np
    # https://www.kaggle.com/stephaniestallworth/nyc-taxi-eda-regression-fivethirtyeight-viz/notebook
    riders = train_data['passenger_count']
    train_data = train_data[(riders > 0) & (riders < 9)]

    # Remove coordinate outliers
    min_lon, max_lon = -74.03, -73.75
    min_lat, max_lat = 40.63, 40.85
    in_box = (
        (train_data['pickup_longitude'] <= max_lon)
        & (train_data['pickup_longitude'] >= min_lon)
        & (train_data['pickup_latitude'] <= max_lat)
        & (train_data['pickup_latitude'] >= min_lat)
        & (train_data['dropoff_longitude'] <= max_lon)
        & (train_data['dropoff_longitude'] >= min_lon)
        & (train_data['dropoff_latitude'] <= max_lat)
        & (train_data['dropoff_latitude'] >= min_lat)
    )
    train_data = train_data[in_box]

    # Remove trip_duration outliers; the statistics come from the rows kept
    # so far and stay fixed for all four conditions.
    duration = train_data['trip_duration']
    centre = np.mean(duration)
    spread = np.std(duration)
    keep = (
        (duration <= centre + 2*spread)
        & (duration >= centre - 2*spread)
        & (duration >= 30)
        & (duration <= 60*240)
    )
    return train_data[keep]

In [None]:
# Drop outlier trips; result stored as train_ready.pkl.
do_preproc = ex.action(preproc, [tr_data_dist_df])
tr_ready = ex.artifact('train_ready.pkl', do_preproc)

In [None]:
@jarvis.func
def split(data_df):
    """Split *data_df* 80/20 into training and test sets.

    Features keep 'pickup_datetime' as a raw string; the time features are
    derived later, in ``train``/``test``. The split uses random_state=0 so
    it is reproducible.

    Returns:
        tuple: (X_train, X_test, y_train, y_test), with 'trip_duration'
        as the target.
    """
    feature_cols = [
        'vendor_id', 'passenger_count', 'pickup_longitude',
        'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude',
        'store_and_fwd_flag', 'pickup_datetime', 'distance',
    ]
    features = data_df[feature_cols]
    target = data_df['trip_duration']

    from sklearn.model_selection import train_test_split
    parts = train_test_split(features, target, test_size=0.2, random_state=0)
    return tuple(parts)


In [None]:
# Split into train/test sets.
# NOTE(review): the four artifacts presumably bind, in order, to split's
# (X_train, X_test, y_train, y_test) return tuple -- confirm against Jarvis docs.
do_split = ex.action(split, [tr_ready])
xTrain = ex.artifact('xTrain.pkl', do_split)
xTest = ex.artifact('xTest.pkl', do_split)
yTrain = ex.artifact('yTrain.pkl', do_split)
yTest = ex.artifact('yTest.pkl', do_split)

In [None]:
@jarvis.func
def train(data_df, trainingy, num_estimators):
    """Fit a random forest that predicts trip duration.

    Args:
        data_df: training feature frame (the X_train split); must contain
            'pickup_datetime' plus the numeric feature columns used below.
        trainingy: matching training targets (trip durations).
        num_estimators: number of trees in the forest; this is the value
            swept by the ``num_estimators`` literal.

    Returns:
        The fitted RandomForestRegressor.
    """
    from sklearn.ensemble import RandomForestRegressor

    # Copy so the upstream xTrain artifact is not mutated by the new columns.
    data_df = data_df.copy()
    data_df['duration'] = trainingy

    def roundtime(tstring):
        # Round 'HH:MM:SS' to the nearest hour; 23:30 and later wrap to '00'.
        hours, mins, secs = tstring.split(':')
        if int(mins) >= 30:
            if hours == '23':
                return '00'
            else:
                return str(int(hours) + 1)
        else:
            return hours

    def weekday(start):
        # Day of week (Monday=0) of a 'YYYY-mm-dd HH:MM:SS' string.
        from datetime import datetime
        fmt = '%Y-%m-%d %H:%M:%S'
        tstamp = datetime.strptime(start, fmt)
        return int(tstamp.weekday())

    data_df['start_hr'] = data_df['pickup_datetime'].apply(lambda x: int(roundtime(x.split(' ')[1])))
    data_df['start_month'] = data_df['pickup_datetime'].apply(lambda x: int(x.split(' ')[0].split('-')[1]))
    data_df['start_weekday'] = data_df['pickup_datetime'].apply(weekday)

    # Use the swept hyper-parameter; a hard-coded value here would make every
    # trial of the num_estimators sweep train the identical model.
    clf = RandomForestRegressor(n_estimators=num_estimators, n_jobs=3)
    # Column order must match the one used for prediction in ``test``.
    clf.fit(data_df[['vendor_id', 'start_hr', 'start_month', 'start_weekday', 'distance',
                     'pickup_longitude', 'pickup_latitude', 'dropoff_longitude',
                     'dropoff_latitude']].values, data_df['duration'].values)
    return clf

In [None]:
# Sweep the forest size.
# NOTE(review): forEach() presumably makes Jarvis run one training trial per
# value in [15, 20, 30] -- confirm against the Jarvis docs.
num_est = ex.literal([15, 20, 30], "num_estimators")
num_est.forEach()
do_train = ex.action(train, [xTrain, yTrain, num_est])
model = ex.artifact('model.pkl', do_train)

In [None]:
@jarvis.func
def test(model, data_df, testingy):
    """Evaluate *model* on a held-out split.

    Args:
        model: fitted regressor returned by ``train``.
        data_df: test feature frame (the X_test split); must contain
            'pickup_datetime' plus the numeric feature columns used below.
        testingy: matching true trip durations.

    Returns:
        tuple: (score, rmse) as strings -- the R2 (coefficient of
        determination) and the root-mean-squared error of the predictions.
    """
    import numpy as np
    from sklearn import metrics

    # Copy so the upstream xTest artifact is not mutated by the new columns.
    data_df = data_df.copy()
    data_df['duration'] = testingy

    def roundtime(tstring):
        # Round 'HH:MM:SS' to the nearest hour; 23:30 and later wrap to '00'.
        hours, mins, secs = tstring.split(':')
        if int(mins) >= 30:
            if hours == '23':
                return '00'
            else:
                return str(int(hours) + 1)
        else:
            return hours

    def weekday(start):
        # Day of week (Monday=0) of a 'YYYY-mm-dd HH:MM:SS' string.
        from datetime import datetime
        fmt = '%Y-%m-%d %H:%M:%S'
        tstamp = datetime.strptime(start, fmt)
        return int(tstamp.weekday())

    data_df['start_hr'] = data_df['pickup_datetime'].apply(lambda x: int(roundtime(x.split(' ')[1])))
    data_df['start_month'] = data_df['pickup_datetime'].apply(lambda x: int(x.split(' ')[0].split('-')[1]))
    data_df['start_weekday'] = data_df['pickup_datetime'].apply(weekday)

    # Same feature columns, in the same order, that ``train`` fits on.
    preds = model.predict(data_df[['vendor_id', 'start_hr', 'start_month', 'start_weekday', 'distance',
                                   'pickup_longitude', 'pickup_latitude', 'dropoff_longitude',
                                   'dropoff_latitude']].values)
    # R2, matching the "R2" label the other pipelines in this notebook give
    # their score (explained_variance_score is a different statistic).
    score = metrics.r2_score(data_df['duration'].values, preds)
    rmse = np.sqrt(metrics.mean_squared_error(data_df['duration'].values, preds))
    return str(score), str(rmse)

In [None]:
# Evaluate the trained model(s) on the held-out split.
do_test = ex.action(test, [model, xTest, yTest])
score = ex.artifact('score.txt', do_test)
rmse = ex.artifact('rmse.txt', do_test)
# Close the experiment that was opened without a `with` block. The
# context-manager protocol always calls __exit__(exc_type, exc_value,
# traceback); pass the three Nones of a normal, exception-free exit instead
# of relying on the method declaring defaults for them.
ex.__exit__(None, None, None)

In [None]:
# NOTE(review): presumably plots the score artifact of the 'finest'
# experiment across its num_estimators trials -- confirm against Jarvis docs.
score.plot()