# NYC Taxi: Predict Trip Duration

Below is a pipeline that predicts the trip duration from attributes such as pickup location, distance traveled, time of day, and so on. This pipeline is a typical ML workflow written in Python, using scikit-learn's RandomForestRegressor. We see pre-processing, training, and testing steps. 

In [1]:
# https://www.kaggle.com/stephaniestallworth/nyc-taxi-eda-regression-fivethirtyeight-viz/notebook
import math
import numpy as np
import pandas as pd
from sklearn import metrics
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

def manhattan_distance(x1, y1, x2, y2):
    """Return the L1 (taxicab) distance between (x1, y1) and (x2, y2).

    Works on scalars and, because it only uses abs/+/-, element-wise on
    array-like inputs such as pandas Series.
    """
    dx = abs(x1 - x2)
    dy = abs(y1 - y2)
    return dx + dy

def roundtime(tstring):
    """Round an 'HH:MM:SS' string to the nearest hour and return the hour as a string.

    Minutes >= 30 round up; 23:30 and later wrap to '00'. An hour that is not
    rounded up is returned unchanged (keeping any leading zero). A rounded-up
    hour is rebuilt via int(), so it has no leading zero.
    """
    hour_str, minute_str, _ = tstring.split(':')
    if int(minute_str) < 30:
        return hour_str
    return '00' if hour_str == '23' else str(int(hour_str) + 1)

def weekday(start):
    """Return the day of week (Monday=0 ... Sunday=6) of a 'YYYY-MM-DD HH:MM:SS' timestamp."""
    from datetime import datetime
    parsed = datetime.strptime(start, '%Y-%m-%d %H:%M:%S')
    return int(parsed.weekday())

data_df = pd.read_csv('train.csv')

# Manhattan distance in raw degrees of longitude/latitude. manhattan_distance
# only uses abs/+/-, so it runs element-wise on whole Series (no Python loop).
data_df['distance'] = manhattan_distance(
    data_df['pickup_longitude'], data_df['pickup_latitude'],
    data_df['dropoff_longitude'], data_df['dropoff_latitude'])

# Remove outliers in passenger_count (keep counts strictly between 0 and 9)
data_df = data_df[(data_df['passenger_count'] > 0) & (data_df['passenger_count'] < 9)]

# Remove coordinate outliers: pickup and dropoff must both lie inside the
# bounding box below (Series.between is inclusive on both ends).
in_box = (data_df['pickup_longitude'].between(-74.03, -73.75)
          & data_df['pickup_latitude'].between(40.63, 40.85)
          & data_df['dropoff_longitude'].between(-74.03, -73.75)
          & data_df['dropoff_latitude'].between(40.63, 40.85))
data_df = data_df[in_box]

# Remove trip_duration outliers: outside mean +/- 2 std (statistics taken
# after the filters above), shorter than 30 s, or longer than 4 h.
trip_duration_mean = np.mean(data_df['trip_duration'])
trip_duration_std = np.std(data_df['trip_duration'])
duration = data_df['trip_duration']
keep = ((duration <= trip_duration_mean + 2*trip_duration_std)
        & (duration >= trip_duration_mean - 2*trip_duration_std)
        & (duration >= 30)
        & (duration <= 60*240))
# .copy() so the feature columns below are added to an independent frame,
# not to a filtered slice (avoids pandas' SettingWithCopyWarning).
data_df = data_df[keep].copy()

# Time-of-pickup features parsed from 'YYYY-MM-DD HH:MM:SS'
data_df['start_hr'] = data_df['pickup_datetime'].apply(lambda x: int(roundtime(x.split(' ')[1])))
data_df['start_month'] = data_df['pickup_datetime'].apply(lambda x: int(x.split(' ')[0].split('-')[1]))
data_df['start_weekday'] = data_df['pickup_datetime'].apply(weekday)

X = data_df[['vendor_id', 'pickup_longitude',
            'pickup_latitude', 'dropoff_longitude',
            'dropoff_latitude', 'distance',
            'start_hr', 'start_month', 'start_weekday']]
y = data_df['trip_duration']

X_train, X_test, y_train, y_test = train_test_split(X,
    y, test_size = 0.2, random_state = 0)

clf = RandomForestRegressor(n_estimators=20, n_jobs=3)
clf.fit(X_train, y_train)

preds = clf.predict(X_test)
# NOTE: this is explained_variance_score, printed under the label "R2";
# it equals R^2 only when the residuals have zero mean.
score = metrics.explained_variance_score(y_test, preds)
rmse = np.sqrt(metrics.mean_squared_error(y_test, preds))

print("R2: {}".format(score))
print("RMSE: {}".format(rmse))

R2: 0.7793251793702737
RMSE: 303.39793188582206


## Coarsest Flor Pipeline: Wrap an existing pipeline

We expect many of our users to already have pipelines that they built and refined over time. We expect our users to trust these pipelines, and these pipelines to be effective. For such users, we add value by versioning the artifacts the user wants us to track, as well as recording the relationships between some objects at a coarse level. To wrap an existing pipeline in Flor, simply put the pipeline code inside a decorated function, and tell us what the inputs and outputs of the function are. Please see the example below.

In [1]:
import flor

# Create a context manager for the experiment named 'coarsest'
with flor.Experiment('coarsest') as ex:

    #If the notebook name has not already been set, you are able to set the name in code.
    flor.setNotebookName('Taxi.ipynb')

    #Initialize the ground client for a particular experiment
    ex.groundClient('ground')

    #Wrap existing functions with the @flor.func decorator so they are able to be referenced by Flor actions
    @flor.func
    def run_existing_pipeline(path_to_data):
        """Run the whole taxi-duration pipeline end to end on the CSV at `path_to_data`.

        Loads, filters and featurizes the data, then fits a 20-tree
        RandomForestRegressor on an 80/20 split (seed 0) and evaluates it.
        Returns (fitted model, "R2: ..." string, "RMSE: ..." string).
        """
        # Imports are local so the function is self-contained.
        import math
        import numpy as np
        import pandas as pd
        from sklearn import metrics
        from sklearn.ensemble import RandomForestRegressor
        from sklearn.model_selection import train_test_split

        def manhattan_distance(x1, y1, x2, y2):
            return abs(x1 - x2) + abs(y1 - y2)

        def roundtime(tstring):
            # 'HH:MM:SS' -> hour rounded to nearest (string); 23:30+ wraps to '00'
            hours, mins, secs = tstring.split(':')
            if int(mins) >= 30:
                if hours == '23':
                    return '00'
                else:
                    return str(int(hours) + 1)
            else:
                return hours

        def weekday(start):
            # Monday=0 ... Sunday=6
            from datetime import datetime
            fmt = '%Y-%m-%d %H:%M:%S'
            tstamp = datetime.strptime(start, fmt)
            return int(tstamp.weekday())

        data_df = pd.read_csv(path_to_data)

        data_df['distance'] = [i for i in map(manhattan_distance,
            data_df['pickup_longitude'], data_df['pickup_latitude'],
            data_df['dropoff_longitude'], data_df['dropoff_latitude'])]

        # Remove outliers in passenger_count
        data_df = data_df[data_df['passenger_count']>0]
        data_df = data_df[data_df['passenger_count']<9]

        # Remove coordinate outliers
        data_df = data_df[data_df['pickup_longitude'] <= -73.75]
        data_df = data_df[data_df['pickup_longitude'] >= -74.03]
        data_df = data_df[data_df['pickup_latitude'] <= 40.85]
        data_df = data_df[data_df['pickup_latitude'] >= 40.63]
        data_df = data_df[data_df['dropoff_longitude'] <= -73.75]
        data_df = data_df[data_df['dropoff_longitude'] >= -74.03]
        data_df = data_df[data_df['dropoff_latitude'] <= 40.85]
        data_df = data_df[data_df['dropoff_latitude'] >= 40.63]

        # Remove trip_duration outliers (mean/std are taken after the filters above)
        trip_duration_mean = np.mean(data_df['trip_duration'])
        trip_duration_std = np.std(data_df['trip_duration'])
        data_df = data_df[data_df['trip_duration'] <= trip_duration_mean + 2*trip_duration_std]
        data_df = data_df[data_df['trip_duration'] >= trip_duration_mean - 2*trip_duration_std]
        data_df = data_df[data_df['trip_duration'] >= 30]
        data_df = data_df[data_df['trip_duration'] <= 60*240]

        # NOTE(review): assigning columns to a boolean-filtered frame may emit
        # pandas' SettingWithCopyWarning; a .copy() after filtering would silence it.
        data_df['start_hr'] = data_df['pickup_datetime'].apply(lambda x: int(roundtime(x.split(' ')[1])))
        data_df['start_month'] = data_df['pickup_datetime'].apply(lambda x: int(x.split(' ')[0].split('-')[1]))
        data_df['start_weekday']= data_df['pickup_datetime'].apply(lambda x: weekday(x))

        X = data_df[['vendor_id', 'pickup_longitude',
                    'pickup_latitude', 'dropoff_longitude',
                    'dropoff_latitude', 'distance',
                    'start_hr', 'start_month', 'start_weekday']]
        y = data_df['trip_duration']

        X_train, X_test, y_train, y_test = train_test_split(X,
            y, test_size = 0.2, random_state = 0)

        # No random_state on the forest, so scores vary slightly between runs.
        clf = RandomForestRegressor(n_estimators=20, n_jobs=3)
        clf.fit(X_train, y_train)

        preds = clf.predict(X_test)
        # NOTE: explained_variance_score is reported under the label "R2".
        score = metrics.explained_variance_score(y_test, preds)
        rmse = np.sqrt(metrics.mean_squared_error(y_test, preds))

        score = "R2: {}".format(score)
        rmse = "RMSE: {}".format(rmse)

        print(score, rmse)

        return clf, score, rmse

    #Create a Flor artifact that corresponds to the input 'train.csv'
    data = ex.artifact('train.csv')

    #Create a Flor action that maps the input parameters of 'data' to the function 'run_existing_pipeline'
    do_all = ex.action(run_existing_pipeline, [data])

    #Create multiple Flor artifacts that correspond to the outputs of the function specified in 'do_all'.
    #They are declared in the same order as the function's return tuple (clf, score, rmse);
    #presumably Flor maps them positionally.
    model = ex.artifact('model.pkl', do_all)
    score = ex.artifact('score.txt', do_all)
    rmse = ex.artifact('rmse.txt', do_all)

In [2]:
#plot() creates a graph representation of the lineage to a particular artifact;
#here 'score' is the 'score.txt' output of the do_all action
score.plot()

[<flor.viz.VizNode object at 0x7f97bc1f5588>, <flor.viz.VizNode object at 0x7f97bc1f5a90>, <flor.viz.VizNode object at 0x7f97bc1f5860>]


In [None]:
#pull() materializes the artifact. Per the recorded output, this ran run_existing_pipeline
#(its printed R2/RMSE line) and produced score.txt, model.pkl and rmse.txt
score.pull()


R2: 0.7798383654104799 RMSE: 303.04352147995013
{<flor.object_model.artifact.Artifact object at 0x7fc85fc8bc18>, <flor.object_model.artifact.Artifact object at 0x7fc85fc7f438>, <flor.object_model.artifact.Artifact object at 0x7fc85fc8bc50>, <flor.object_model.action.Action object at 0x7fc85fc7f320>}
score.txt
model.pkl
rmse.txt


## Coarse Flor Pipeline with Hyper-parameter Sweeps

Maybe after some time, a trusted and effective pipeline may need some tuning. We enable pipeline (and model) tuning via Flor Literals. A user can declare a Literal, with some iterable value or any Python basic type. When a literal is passed to an Action, the experiment may become a multi-trial experiment. Flor is able to execute every trial with some optimizations, and relate the experiment outcomes to the experiment configuration.

In [4]:
import flor

# Create a context manager for the experiment named 'coarse'
with flor.Experiment('coarse') as ex:
    import math
    import numpy as np
    import pandas as pd
    from sklearn import metrics
    from sklearn.ensemble import RandomForestRegressor
    from sklearn.model_selection import train_test_split

    #If the notebook name has not already been set, you are able to set the name in code.
    flor.setNotebookName('Taxi.ipynb')

    #Initialize the ground client for a particular experiment
    ex.groundClient('ground')

    #Wrap existing functions with the @flor.func decorator so they are able to be referenced by Flor actions
    #The decorated function now has an additional parameter, 'n_estimators', which is
    #bound to the 'num_estimators' literal declared below
    @flor.func
    def run_existing_pipeline(path_to_data, n_estimators):
        """Run the whole taxi-duration pipeline with an `n_estimators`-tree forest.

        Loads, filters and featurizes the CSV at `path_to_data`, fits a
        RandomForestRegressor on an 80/20 split (seed 0) and evaluates it.
        Returns (fitted model, "R2: ..." string, "RMSE: ..." string).
        Relies on the imports made at the top of this `with` block.
        """

        def manhattan_distance(x1, y1, x2, y2):
            return abs(x1 - x2) + abs(y1 - y2)

        def roundtime(tstring):
            # 'HH:MM:SS' -> hour rounded to nearest (string); 23:30+ wraps to '00'
            hours, mins, secs = tstring.split(':')
            if int(mins) >= 30:
                if hours == '23':
                    return '00'
                else:
                    return str(int(hours) + 1)
            else:
                return hours

        def weekday(start):
            # Monday=0 ... Sunday=6
            from datetime import datetime
            fmt = '%Y-%m-%d %H:%M:%S'
            tstamp = datetime.strptime(start, fmt)
            return int(tstamp.weekday())

        data_df = pd.read_csv(path_to_data)

        data_df['distance'] = [i for i in map(manhattan_distance,
            data_df['pickup_longitude'], data_df['pickup_latitude'],
            data_df['dropoff_longitude'], data_df['dropoff_latitude'])]

        # Remove outliers in passenger_count
        data_df = data_df[data_df['passenger_count']>0]
        data_df = data_df[data_df['passenger_count']<9]

        # Remove coordinate outliers
        data_df = data_df[data_df['pickup_longitude'] <= -73.75]
        data_df = data_df[data_df['pickup_longitude'] >= -74.03]
        data_df = data_df[data_df['pickup_latitude'] <= 40.85]
        data_df = data_df[data_df['pickup_latitude'] >= 40.63]
        data_df = data_df[data_df['dropoff_longitude'] <= -73.75]
        data_df = data_df[data_df['dropoff_longitude'] >= -74.03]
        data_df = data_df[data_df['dropoff_latitude'] <= 40.85]
        data_df = data_df[data_df['dropoff_latitude'] >= 40.63]

        # Remove trip_duration outliers (mean/std are taken after the filters above)
        trip_duration_mean = np.mean(data_df['trip_duration'])
        trip_duration_std = np.std(data_df['trip_duration'])
        data_df = data_df[data_df['trip_duration'] <= trip_duration_mean + 2*trip_duration_std]
        data_df = data_df[data_df['trip_duration'] >= trip_duration_mean - 2*trip_duration_std]
        data_df = data_df[data_df['trip_duration'] >= 30]
        data_df = data_df[data_df['trip_duration'] <= 60*240]

        data_df['start_hr'] = data_df['pickup_datetime'].apply(lambda x: int(roundtime(x.split(' ')[1])))
        data_df['start_month'] = data_df['pickup_datetime'].apply(lambda x: int(x.split(' ')[0].split('-')[1]))
        data_df['start_weekday']= data_df['pickup_datetime'].apply(lambda x: weekday(x))

        X = data_df[['vendor_id', 'pickup_longitude',
                    'pickup_latitude', 'dropoff_longitude',
                    'dropoff_latitude', 'distance',
                    'start_hr', 'start_month', 'start_weekday']]
        y = data_df['trip_duration']

        X_train, X_test, y_train, y_test = train_test_split(X,
            y, test_size = 0.2, random_state = 0)

        clf = RandomForestRegressor(n_estimators=n_estimators, n_jobs=3)
        clf.fit(X_train, y_train)

        preds = clf.predict(X_test)
        # NOTE: explained_variance_score is reported under the label "R2".
        score = metrics.explained_variance_score(y_test, preds)
        rmse = np.sqrt(metrics.mean_squared_error(y_test, preds))

        score = "R2: {}".format(score)
        rmse = "RMSE: {}".format(rmse)

        print(score, rmse)

        return clf, score, rmse

    #Create a Flor artifact that corresponds to the input 'train.csv'
    data = ex.artifact('train.csv')

    #Create a Flor literal that corresponds to the array specified by the first parameter
    num_est = ex.literal([15, 20, 30], 'num_estimators')

    #Changes the configuration of the Flor literal to be evaluated element-wise of the array specified above.
    #Without this, n_estimators would receive the whole list instead of one int per trial
    #(the 'fine' experiment makes the same call).
    num_est.forEach()

    #Create a Flor action that maps the input parameters of 'data' and 'num_est'
    #to the function 'run_existing_pipeline'
    do_all = ex.action(run_existing_pipeline, [data, num_est])

    #Create multiple Flor artifacts that correspond to the outputs of the function specified in 'do_all'
    model = ex.artifact('model.pkl', do_all)
    score = ex.artifact('score.txt', do_all)
    rmse = ex.artifact('rmse.txt', do_all)

In [5]:
#plot() creates a graph representation of the lineage to a particular artifact;
#here 'score' is the 'score.txt' output of the 'coarse' experiment's do_all action
score.plot()

## Fine Flor Pipeline: Reuse Intermediate Artifacts & Compose Experiments

Perhaps our user will observe that the preprocessing step produces a result that can be shared with the entire organization; alternatively, it's possible that the pre-processing step is expensive, and the user will not want to re-run this step every time that the system runs the pipeline with a new configuration (caching intermediate shared results). A subgraph in the Flor pipeline may be its own experiment, or part of a parent experiment. When the Flor pipeline is shared with others (as opposed to merely sharing the end results), it's possible for other users or customers of the data to understand the lineage of the results: what transformations were applied, what the source was, etc. We include an example below of how a user might separate the preprocessing step from the remaining pipeline, so that the subpipeline may be more easily shared. 

In [6]:
import math
import numpy as np
import pandas as pd
from sklearn import metrics
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

import flor

# Create a context manager for the experiment and is named 'fine'
with flor.Experiment('fine') as ex:

    #If the notebook name has not already been set, you are able to set the name in code.
    flor.setNotebookName('Taxi.ipynb')

    #Initialize the ground client for a particular experiment
    ex.groundClient('ground')

    #Wrap existing functions with the @flor.func decorator so they are able to be referenced by Flor actions
    @flor.func
    def prepare_data(path_to_data):
        """Load the CSV at `path_to_data` and return the cleaned, featurized DataFrame.

        Adds 'distance', drops passenger/coordinate/trip_duration outliers, then
        adds 'start_hr', 'start_month' and 'start_weekday'. Uses the pandas/numpy
        imports made at the top of this cell.
        """

        def manhattan_distance(x1, y1, x2, y2):
            return abs(x1 - x2) + abs(y1 - y2)

        def roundtime(tstring):
            # 'HH:MM:SS' -> hour rounded to nearest (string); 23:30+ wraps to '00'
            hours, mins, secs = tstring.split(':')
            if int(mins) >= 30:
                if hours == '23':
                    return '00'
                else:
                    return str(int(hours) + 1)
            else:
                return hours

        def weekday(start):
            # Monday=0 ... Sunday=6
            from datetime import datetime
            fmt = '%Y-%m-%d %H:%M:%S'
            tstamp = datetime.strptime(start, fmt)
            return int(tstamp.weekday())

        data_df = pd.read_csv(path_to_data)

        data_df['distance'] = [i for i in map(manhattan_distance,
            data_df['pickup_longitude'], data_df['pickup_latitude'],
            data_df['dropoff_longitude'], data_df['dropoff_latitude'])]

        # Remove outliers in passenger_count
        data_df = data_df[data_df['passenger_count']>0]
        data_df = data_df[data_df['passenger_count']<9]

        # Remove coordinate outliers
        data_df = data_df[data_df['pickup_longitude'] <= -73.75]
        data_df = data_df[data_df['pickup_longitude'] >= -74.03]
        data_df = data_df[data_df['pickup_latitude'] <= 40.85]
        data_df = data_df[data_df['pickup_latitude'] >= 40.63]
        data_df = data_df[data_df['dropoff_longitude'] <= -73.75]
        data_df = data_df[data_df['dropoff_longitude'] >= -74.03]
        data_df = data_df[data_df['dropoff_latitude'] <= 40.85]
        data_df = data_df[data_df['dropoff_latitude'] >= 40.63]

        # Remove trip_duration outliers (mean/std are taken after the filters above)
        trip_duration_mean = np.mean(data_df['trip_duration'])
        trip_duration_std = np.std(data_df['trip_duration'])
        data_df = data_df[data_df['trip_duration'] <= trip_duration_mean + 2*trip_duration_std]
        data_df = data_df[data_df['trip_duration'] >= trip_duration_mean - 2*trip_duration_std]
        data_df = data_df[data_df['trip_duration'] >= 30]
        data_df = data_df[data_df['trip_duration'] <= 60*240]

        data_df['start_hr'] = data_df['pickup_datetime'].apply(lambda x: int(roundtime(x.split(' ')[1])))
        data_df['start_month'] = data_df['pickup_datetime'].apply(lambda x: int(x.split(' ')[0].split('-')[1]))
        data_df['start_weekday']= data_df['pickup_datetime'].apply(lambda x: weekday(x))

        return data_df

    #Wrap existing functions with the @flor.func decorator so they are able to be referenced by Flor actions
    @flor.func
    def train_test_model(data_df, n_estimators):
        """Split the prepared frame 80/20 (seed 0), fit and evaluate a forest.

        `n_estimators` is the number of trees. Returns (fitted model,
        "R2: ..." string, "RMSE: ..." string). Uses the sklearn/numpy imports
        made at the top of this cell.
        """
        X = data_df[['vendor_id', 'pickup_longitude',
                    'pickup_latitude', 'dropoff_longitude',
                    'dropoff_latitude', 'distance',
                    'start_hr', 'start_month', 'start_weekday']]
        y = data_df['trip_duration']

        X_train, X_test, y_train, y_test = train_test_split(X,
            y, test_size = 0.2, random_state = 0)

        clf = RandomForestRegressor(n_estimators=n_estimators, n_jobs=3)
        clf.fit(X_train, y_train)

        preds = clf.predict(X_test)
        # NOTE: explained_variance_score is reported under the label "R2".
        score = metrics.explained_variance_score(y_test, preds)
        rmse = np.sqrt(metrics.mean_squared_error(y_test, preds))

        score = "R2: {}".format(score)
        rmse = "RMSE: {}".format(rmse)

        print(score, rmse)

        return clf, score, rmse


    #Create a Flor artifact that corresponds to the input 'train.csv'
    data = ex.artifact('train.csv')

    #Create a Flor literal that corresponds to the array specified by the first parameter
    num_est = ex.literal([15, 20, 30], 'num_estimators')

    #Changes the configuration of the Flor literal to be evaluated element-wise of the array specified above
    num_est.forEach()

    #Create a Flor action that maps the input parameters of 'data' to the function 'prepare_data'
    do_prep = ex.action(prepare_data, [data])

    #Creates a Flor artifact that corresponds to the output of 'do_prep'
    #and is stored in the file 'prepped_data.pkl'. The preprocessing does not depend
    #on num_est, so this intermediate result can be shared across the trials.
    prepd = ex.artifact('prepped_data.pkl', do_prep)

    #Create a Flor action that maps the input parameters of 'prepd' and 'num_est'
    #to the function 'train_test_model'
    do_tr_te = ex.action(train_test_model, [prepd, num_est])

    #Create multiple Flor artifacts that correspond to the outputs of the function specified in 'do_tr_te'
    model = ex.artifact('model.pkl', do_tr_te)
    score = ex.artifact('score.txt', do_tr_te)
    rmse = ex.artifact('rmse.txt', do_tr_te)

In [7]:
#plot() creates a graph representation of the lineage to a particular artifact;
#here 'score' is the 'score.txt' output of the 'fine' experiment's do_tr_te action
score.plot()

## Finest Pipeline: Flor Proper

After becoming familiar with Flor, it is likely that our user will want to build her next pipeline using Flor from the start: this was the use-case we envisioned at first. In addition to gaining the benefits of coarser Flor pipelines, the user will be given a set of tools for pipeline development in an interactive environment such as Jupyter. We are currently investigating techniques for detecting poor experimental methods, such as p-hacking or data dredging. Whenever a user "peeks" at a Flor artifact, the event is recorded. We plan to use this information to infer when information from the test data may have leaked into and contaminated the analysis, and when it's likely that the user has overfitted to the data and it's time to collect a fresh sample.

In [8]:
import flor

In [9]:
#If the notebook name has not already been set, you are able to set the name in code.
#This cell registers the name 'Taxi.ipynb' with Flor.
flor.setNotebookName('Taxi.ipynb')

In [10]:
# Create the experiment named 'finest'. Unlike the earlier cells it is not entered with
# a `with` block, so the graph can be built up across several notebook cells; the
# experiment is closed explicitly by the ex.__exit__ call in a later cell.
ex = flor.Experiment('finest')

In [11]:
#Initialize the ground client for a particular experiment (here: 'finest')
ex.groundClient('ground')

In [12]:
#Create a Flor artifact that corresponds to the input 'train.csv'.
#NOTE(review): this is a relative path; it must resolve from the current working directory.
data = ex.artifact('train.csv')

In [13]:
#Wrap existing functions with the @flor.func decorator so they are able to be referenced by Flor actions
@flor.func
def dataframize(csvpath):
    """Read the CSV file at `csvpath` into a pandas DataFrame using read_csv defaults."""
    from pandas import read_csv
    frame = read_csv(csvpath)
    return frame

In [14]:
#Create a Flor action that maps the input parameters of 'data' to the function 'dataframize'
do_dfize = ex.action(dataframize, [data])

#Creates a Flor artifact that corresponds to the output of 'do_dfize'
#(the DataFrame returned by dataframize) and is stored in the file 'train_df.pkl'
tr_data_df = ex.artifact('train_df.pkl', do_dfize)

Below is an example of *peek*

In [15]:
#Peek executes the subgraph necessary to produce the desired artifact, and previews it,
#recording the event that the user viewed the Artifact.
tr_data_df.peek()

Unnamed: 0,id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
0,id2875421,2,2016-03-14 17:24:55,2016-03-14 17:32:30,1,-73.982155,40.767937,-73.964630,40.765602,N,455
1,id2377394,1,2016-06-12 00:43:35,2016-06-12 00:54:38,1,-73.980415,40.738564,-73.999481,40.731152,N,663
2,id3858529,2,2016-01-19 11:35:24,2016-01-19 12:10:48,1,-73.979027,40.763939,-74.005333,40.710087,N,2124
3,id3504673,2,2016-04-06 19:32:31,2016-04-06 19:39:40,1,-74.010040,40.719971,-74.012268,40.706718,N,429
4,id2181028,2,2016-03-26 13:30:55,2016-03-26 13:38:10,1,-73.973053,40.793209,-73.972923,40.782520,N,435
5,id0801584,2,2016-01-30 22:01:40,2016-01-30 22:09:03,6,-73.982857,40.742195,-73.992081,40.749184,N,443
6,id1813257,1,2016-06-17 22:34:59,2016-06-17 22:40:40,4,-73.969017,40.757839,-73.957405,40.765896,N,341
7,id1324603,2,2016-05-21 07:54:58,2016-05-21 08:20:49,1,-73.969276,40.797779,-73.922470,40.760559,N,1551
8,id1301050,1,2016-05-27 23:12:23,2016-05-27 23:16:38,1,-73.999481,40.738400,-73.985786,40.732815,N,255
9,id0012891,2,2016-03-10 21:45:01,2016-03-10 22:05:26,1,-73.981049,40.744339,-73.973000,40.789989,N,1225


In [16]:
#Wrap existing functions with the @flor.func decorator so they are able to be referenced by Flor actions
@flor.func
def calculate_distance(data_df):
    """Return a copy of `data_df` with an added 'distance' column.

    'distance' is the Manhattan distance, in raw degrees of longitude/latitude,
    between the pickup and dropoff coordinates. The input frame is not modified.
    """
    def manhattan_distance(x1, y1, x2, y2):
        return abs(x1 - x2) + abs(y1 - y2)

    # abs/-/+ are element-wise on Series, so the helper runs vectorized;
    # assign() returns a new frame instead of mutating the caller's.
    return data_df.assign(distance=manhattan_distance(
        data_df['pickup_longitude'], data_df['pickup_latitude'],
        data_df['dropoff_longitude'], data_df['dropoff_latitude']))

In [17]:
#Create a Flor action that maps the input parameters of 'tr_data_df' to the function 'calculate_distance'
do_calc_dist = ex.action(calculate_distance, [tr_data_df])

#Creates a Flor artifact that corresponds to the output of 'do_calc_dist'
#(the frame with the added 'distance' column) and is stored in the file 'train_dist_df.pkl'
tr_data_dist_df = ex.artifact('train_dist_df.pkl', do_calc_dist)

In [18]:
#Wrap existing functions with the @flor.func decorator so they are able to be referenced by Flor actions
@flor.func
def preproc(train_data):
    """Return the rows of `train_data` that survive the outlier filters.

    Filters, in order:
      1. passenger_count strictly between 0 and 9;
      2. pickup and dropoff inside the box lon [-74.03, -73.75], lat [40.63, 40.85];
      3. trip_duration within mean +/- 2 std (computed on the rows left after 1-2),
         at least 30 s and at most 4 h.
    """
    import numpy as np
    # https://www.kaggle.com/stephaniestallworth/nyc-taxi-eda-regression-fivethirtyeight-viz/notebook
    riders = train_data['passenger_count']
    train_data = train_data[(riders > 0) & (riders < 9)]

    # Coordinate bounds are inclusive on both ends.
    inside_nyc = (
        train_data['pickup_longitude'].between(-74.03, -73.75)
        & train_data['pickup_latitude'].between(40.63, 40.85)
        & train_data['dropoff_longitude'].between(-74.03, -73.75)
        & train_data['dropoff_latitude'].between(40.63, 40.85)
    )
    train_data = train_data[inside_nyc]

    # Statistics come from the already-filtered rows, as in the original ordering.
    secs = train_data['trip_duration']
    centre = np.mean(secs)
    spread = np.std(secs)
    plausible = (
        (secs <= centre + 2*spread)
        & (secs >= centre - 2*spread)
        & (secs >= 30)
        & (secs <= 60*240)
    )
    return train_data[plausible]

In [19]:
#Create a Flor action that maps the input parameters of 'tr_data_dist_df' to the function 'preproc'
do_preproc = ex.action(preproc, [tr_data_dist_df])

#Creates a Flor artifact that corresponds to the output of 'do_preproc'
#(the outlier-filtered frame) and is stored in the file 'train_ready.pkl'
tr_ready = ex.artifact('train_ready.pkl', do_preproc)

In [20]:
#Wrap existing functions with the @flor.func decorator so they are able to be referenced by Flor actions
@flor.func
def split(data_df):
    """Hold out 20% of `data_df` for testing, with a fixed seed of 0.

    Returns (X_train, X_test, y_train, y_test). X keeps the raw columns that
    the train/test steps later turn into features; y is 'trip_duration'.
    """
    from sklearn.model_selection import train_test_split

    feature_cols = ['vendor_id', 'passenger_count', 'pickup_longitude',
                    'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude',
                    'store_and_fwd_flag', 'pickup_datetime', 'distance']
    features = data_df[feature_cols]
    target = data_df['trip_duration']

    train_x, test_x, train_y, test_y = train_test_split(
        features, target, test_size=0.2, random_state=0)
    return train_x, test_x, train_y, test_y


In [21]:
#Create a Flor action that maps the input parameters of 'tr_ready' to the function 'split'
do_split = ex.action(split, [tr_ready])

#Create multiple Flor artifacts that correspond to the outputs of the function specified in 'do_split'.
#They are declared in the same order as split's return tuple (X_train, X_test, y_train, y_test).
xTrain = ex.artifact('xTrain.pkl', do_split)
xTest = ex.artifact('xTest.pkl', do_split)
yTrain = ex.artifact('yTrain.pkl', do_split)
yTest = ex.artifact('yTest.pkl', do_split)

In [22]:
#Wrap existing functions with the @flor.func decorator so they are able to be referenced by Flor actions
@flor.func
def train(data_df, trainingy, num_estimators):
    """Fit a RandomForestRegressor on the training split and return it.

    data_df        -- training features (the xTrain artifact); not modified.
    trainingy      -- training target (trip_duration), index-aligned with data_df.
    num_estimators -- number of trees; supplied per trial by the 'num_estimators' literal.
    """
    from sklearn.ensemble import RandomForestRegressor

    # Work on a copy so the caller's frame is not mutated (and to avoid
    # SettingWithCopyWarning on the train_test_split output).
    data_df = data_df.copy()
    # Attach y as a column so it is index-aligned with the feature rows.
    data_df['duration'] = trainingy

    def roundtime(tstring):
        # 'HH:MM:SS' -> hour rounded to nearest (string); 23:30+ wraps to '00'
        hours, mins, secs = tstring.split(':')
        if int(mins) >= 30:
            if hours == '23':
                return '00'
            else:
                return str(int(hours) + 1)
        else:
            return hours

    def weekday(start):
        # Monday=0 ... Sunday=6
        from datetime import datetime
        fmt = '%Y-%m-%d %H:%M:%S'
        tstamp = datetime.strptime(start, fmt)
        return int(tstamp.weekday())

    data_df['start_hr'] = data_df['pickup_datetime'].apply(lambda x: int(roundtime(x.split(' ')[1])))
    data_df['start_month'] = data_df['pickup_datetime'].apply(lambda x: int(x.split(' ')[0].split('-')[1]))
    data_df['start_weekday'] = data_df['pickup_datetime'].apply(weekday)

    # Previously hard-coded to 20, which made the num_estimators sweep a no-op.
    clf = RandomForestRegressor(n_estimators=num_estimators, n_jobs=3)
    # Fitted on a plain array in this column order; `test` must use the same order.
    clf.fit(data_df[['vendor_id', 'start_hr', 'start_month', 'start_weekday', 'distance',
                     'pickup_longitude', 'pickup_latitude', 'dropoff_longitude',
                     'dropoff_latitude']].values, data_df['duration'].values)
    return clf

In [23]:
#Create a Flor literal that corresponds to the array specified by the first parameter
num_est = ex.literal([15, 20, 30], 'num_estimators')

#Changes the configuration of the Flor literal to be evaluated element-wise of the array specified above
num_est.forEach()

#Create a Flor action that maps the input parameters of 'xTrain', 'yTrain'
# and 'num_est' to the function 'train' (positionally: data_df, trainingy, num_estimators)
do_train = ex.action(train, [xTrain, yTrain, num_est])

#Creates a Flor artifact that corresponds to the output of 'do_train'
#and is stored in the file 'model.pkl'
model = ex.artifact('model.pkl', do_train)

In [24]:
#Wrap existing functions with the @flor.func decorator so they are able to be referenced by Flor actions
@flor.func
def test(model, data_df, testingy):
    """Evaluate a fitted regressor on the held-out split.

    model    -- regressor returned by `train`.
    data_df  -- test features (the xTest artifact); not modified.
    testingy -- test target (trip_duration), index-aligned with data_df.

    Returns (explained variance score, RMSE), both as strings.
    """
    import numpy as np
    from sklearn import metrics

    # Work on a copy so the caller's frame is not mutated (and to avoid
    # SettingWithCopyWarning on the train_test_split output).
    data_df = data_df.copy()
    # Attach y as a column so it is index-aligned with the feature rows.
    data_df['duration'] = testingy

    def roundtime(tstring):
        # 'HH:MM:SS' -> hour rounded to nearest (string); 23:30+ wraps to '00'
        hours, mins, secs = tstring.split(':')
        if int(mins) >= 30:
            if hours == '23':
                return '00'
            else:
                return str(int(hours) + 1)
        else:
            return hours

    def weekday(start):
        # Monday=0 ... Sunday=6
        from datetime import datetime
        fmt = '%Y-%m-%d %H:%M:%S'
        tstamp = datetime.strptime(start, fmt)
        return int(tstamp.weekday())

    data_df['start_hr'] = data_df['pickup_datetime'].apply(lambda x: int(roundtime(x.split(' ')[1])))
    data_df['start_month'] = data_df['pickup_datetime'].apply(lambda x: int(x.split(' ')[0].split('-')[1]))
    data_df['start_weekday'] = data_df['pickup_datetime'].apply(weekday)

    # Same column order as the array the model was fitted on in `train`.
    preds = model.predict(data_df[['vendor_id', 'start_hr', 'start_month', 'start_weekday', 'distance',
                                   'pickup_longitude', 'pickup_latitude', 'dropoff_longitude',
                                   'dropoff_latitude']].values)
    score = metrics.explained_variance_score(data_df['duration'].values, preds)
    rmse = np.sqrt(metrics.mean_squared_error(data_df['duration'].values, preds))
    return str(score), str(rmse)

In [25]:
# Print the versioning directory recorded in the experiment state (output below)
print(ex.xp_state.versioningDirectory)

/home/eric/flor.d


In [33]:
#Create a Flor action that maps the input parameters of 'model', 'xTest'
# and 'yTest' to the function 'test' (positionally: model, data_df, testingy)
do_test = ex.action(test, [model, xTest, yTest])

#Create multiple Flor artifacts that correspond to the outputs of the function specified in 'do_test'
score = ex.artifact('score.txt', do_test)
rmse = ex.artifact('rmse.txt', do_test)

#Exit the experiment which commits any changes to Git and Ground.
#The experiment was not opened with `with`, so __exit__ is called by hand. Pass the
#standard (exc_type, exc_value, traceback) triple, which the `with` blocks in the
#earlier experiments already show Experiment.__exit__ accepts.
#NOTE(review): 'train.csv' is a relative path resolved against the current working
#directory. The recorded FileNotFoundError below occurred after the os.chdir(...)
#cell (In [30]) had already run, before this cell (In [33]).
ex.__exit__(None, None, None)

FileNotFoundError: [Errno 2] No such file or directory: 'train.csv'

In [28]:
import os


In [30]:
# Switch the working directory to the experiment's versioning directory to inspect it.
# NOTE(review): relative paths such as 'train.csv' resolve against the new cwd afterwards,
# so pipeline cells executed after this one can no longer find the input file.
os.chdir(ex.xp_state.versioningDirectory)

In [32]:
# List the current working directory (the versioning directory, if the chdir cell has run)
print(os.listdir())

['plate_demo']


In [None]:
#plot() creates a graph representation of the lineage to a particular artifact;
#here 'score' is the 'score.txt' output of the 'finest' experiment's do_test action
score.plot()