# `vaex` @ PyData Budapest 2020

## Machine Learning Example - Predict the duration of taxi trips

For more details, check out
[ML impossible: Train 1 billion samples in 5 minutes on your laptop using Vaex and Scikit-Learn](https://towardsdatascience.com/ml-impossible-train-a-1-billion-sample-model-in-20-minutes-with-vaex-and-scikit-learn-on-your-9e2968e6f385).

Running this notebook requires `vaex==3.0.0`.

In [None]:
import vaex
vaex.multithreading.thread_count_default = 8

import matplotlib.pyplot as plt
import numpy as np

import warnings; warnings.simplefilter('ignore')

### Initial step: read the data and do a train/test split immediately

In [None]:
!du -h /data/taxi/*

In [None]:
df = vaex.open('/data/taxi/yellow_taxi_2012.hdf5')

# Train / test split: sequential, no shuffling (a split by date if the rows are in chronological order)
df_train, df_test = df.ml.train_test_split(test_size=0.15)

print(f'Number of samples in the full dataset: {len(df):,}')
print(f'Number of samples in the training set: {len(df_train):,}')
print(f'Number of samples in the test set:       {len(df_test):,}')

# Check if the lengths of the datasets match
assert len(df) == len(df_test) + len(df_train)
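
A sequential split simply cuts the rows at one index, without shuffling. The sketch below illustrates the idea in plain Python; `sequential_split` is a hypothetical helper, and which end of the data is held out is an assumption of this sketch rather than a statement about the `vaex` implementation.

```python
def sequential_split(rows, test_size):
    """Cut `rows` at a single index; no shuffling is involved."""
    n_test = int(len(rows) * test_size)
    # Assumption of this sketch: the leading rows form the test set
    test = rows[:n_test]
    train = rows[n_test:]
    return train, test


rows = list(range(8))  # hypothetical, already ordered by date
train, test = sequential_split(rows, test_size=0.25)

# Every row ends up in exactly one of the two sets
assert len(train) + len(test) == len(rows)
```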

### Define the label

In [None]:
# Time in transit (minutes) - This is the target variable
df_train['trip_duration_min'] = (df_train.dropoff_datetime - df_train.pickup_datetime) / \
                                   np.timedelta64(1, 'm')
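
Dividing a time difference by a one-minute interval yields the duration in minutes as a plain number. A minimal sketch of the same arithmetic for a single trip, using the standard library instead of NumPy; the timestamps are made up for illustration.

```python
from datetime import datetime, timedelta

# Hypothetical pickup and dropoff timestamps for a single trip
pickup = datetime(2012, 1, 1, 8, 15, 0)
dropoff = datetime(2012, 1, 1, 8, 27, 30)

# timedelta / timedelta gives a float: here, the number of minutes in transit
trip_duration_min = (dropoff - pickup) / timedelta(minutes=1)
print(trip_duration_min)  # 12.5
```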

### Feature engineering

In [None]:
# Speed (miles per hour) - Used for cleaning of the training data
df_train['trip_speed_mph'] = df_train.trip_distance / \
                                ((df_train.dropoff_datetime - df_train.pickup_datetime) / \
                                np.timedelta64(1, 'h'))

In [None]:
# Pickup datetime features
df_train['pickup_time'] = df_train.pickup_datetime.dt.hour + df_train.pickup_datetime.dt.minute / 60.
df_train['pickup_day'] = df_train.pickup_datetime.dt.dayofweek
df_train['pickup_is_weekend'] = (df_train.pickup_day>=5).astype('int')

In [None]:
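# Arc distance in miles (haversine formula); theta is longitude, phi is latitude, both in degrees
def arc_distance(theta_1, phi_1, theta_2, phi_2):
    # The cosine terms of the haversine formula must use the latitudes (phi)
    temp = (np.sin((phi_2-phi_1)/2*np.pi/180)**2
           + np.cos(phi_1*np.pi/180)*np.cos(phi_2*np.pi/180) * np.sin((theta_2-theta_1)/2*np.pi/180)**2)
    distance = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    return distance * 3958.8  # mean Earth radius in miles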

# Create the feature
df_train['arc_distance'] = arc_distance(df_train.pickup_longitude, 
                                        df_train.pickup_latitude, 
                                        df_train.dropoff_longitude, 
                                        df_train.dropoff_latitude).jit_numba()
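
The arc distance is a great-circle (haversine) distance. The following scalar sketch in plain Python may help when sanity-checking the feature; `haversine_miles` and its argument names are chosen here for illustration, and the coordinates are hypothetical.

```python
import math

EARTH_RADIUS_MILES = 3958.8  # same constant as in the cell above


def haversine_miles(lon_1, lat_1, lon_2, lat_2):
    """Great-circle distance in miles between two points given in degrees."""
    lon_1, lat_1, lon_2, lat_2 = map(math.radians, (lon_1, lat_1, lon_2, lat_2))
    a = (math.sin((lat_2 - lat_1) / 2) ** 2
         + math.cos(lat_1) * math.cos(lat_2) * math.sin((lon_2 - lon_1) / 2) ** 2)
    return 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) * EARTH_RADIUS_MILES


# One degree of latitude is about 69 miles anywhere on the globe
one_degree_north = haversine_miles(-73.99, 40.0, -73.99, 41.0)

# One degree of longitude is shorter away from the equator
one_degree_east = haversine_miles(-73.99, 40.76, -72.99, 40.76)
```

A quick check like `one_degree_east < one_degree_north` is a cheap way to catch swapped latitude and longitude arguments.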

In [None]:
# Direction of travel in degrees, measured from north (theta is longitude, phi is latitude)
def direction_angle(theta_1, phi_1, theta_2, phi_2):
    dtheta = theta_2 - theta_1
    dphi = phi_2 - phi_1
    radians = np.arctan2(dtheta, dphi)
    return np.rad2deg(radians)

# Create the feature
df_train['direction_angle'] = direction_angle(df_train.pickup_longitude, 
                                              df_train.pickup_latitude, 
                                              df_train.dropoff_longitude, 
                                              df_train.dropoff_latitude).jit_numba()
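
To see which angles the feature produces, here is a scalar sketch of the same `arctan2` computation in plain Python. The function name and coordinates are hypothetical; the argument order (longitude first) mirrors the call above.

```python
import math


def direction_angle_deg(lon_1, lat_1, lon_2, lat_2):
    """Angle of travel in degrees: 0 is due north, 90 is due east."""
    return math.degrees(math.atan2(lon_2 - lon_1, lat_2 - lat_1))


north = direction_angle_deg(-73.99, 40.75, -73.99, 40.76)
east = direction_angle_deg(-73.99, 40.75, -73.98, 40.75)
south = direction_angle_deg(-73.99, 40.76, -73.99, 40.75)
west = direction_angle_deg(-73.98, 40.75, -73.99, 40.75)
```

The result lies in the range (-180, 180], so due west comes out as -90 rather than 270.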

### Data cleaning

In [None]:
# Filter out trips with an abnormal number of passengers
df_train = df_train[(df_train.passenger_count>0) & (df_train.passenger_count<7)]

# Keep taxi trips that travelled less than 7 miles (and a non-zero distance)
df_train = df_train[(df_train.trip_distance > 0) & (df_train.trip_distance < 7)]

# Keep taxi trips that lasted longer than 3 minutes and shorter than 25 minutes
df_train = df_train[(df_train.trip_duration_min > 3) & (df_train.trip_duration_min < 25)]

# Filter out erroneous average trip speeds
df_train = df_train[(df_train.trip_speed_mph > 1) & (df_train.trip_speed_mph < 60)]

# Define the NYC boundaries
long_min = -74.05
long_max = -73.75
lat_min = 40.58
lat_max = 40.90

# Make a selection based on the boundaries
df_train = df_train[(df_train.pickup_longitude > long_min)  & (df_train.pickup_longitude < long_max) & \
                    (df_train.pickup_latitude > lat_min)    & (df_train.pickup_latitude < lat_max) & \
                    (df_train.dropoff_longitude > long_min) & (df_train.dropoff_longitude < long_max) & \
                    (df_train.dropoff_latitude > lat_min)   & (df_train.dropoff_latitude < lat_max)]

# If there are unknown (N/A) pick-up or drop-off locations, choose a representative value. 
df_train['dropoff_latitude'] = df_train.dropoff_latitude.fillna(value=40.76)
df_train['pickup_latitude'] = df_train.pickup_latitude.fillna(value=40.76)

df_train['dropoff_longitude'] = df_train.dropoff_longitude.fillna(value=-73.99)
df_train['pickup_longitude'] = df_train.pickup_longitude.fillna(value=-73.99)

## `vaex-ml`

A `vaex` package for machine learning. 

Implements various data transformers:
 - numerical scalers
 - categorical encoders
 - PCA transformer
 - GroupBy transformers
 - more coming soon
 
Wrappers around other popular model libraries:
 - xgboost / lightgbm / catboost
 - scikit-learn
 - tensorflow / keras (coming soon!)
 
[We are working on better integration between scikit-learn and vaex](https://github.com/scikit-learn/scikit-learn/pull/14963).


In [None]:
import vaex.ml

### Transform features: PCA of the pick-up and drop-off locations

In [None]:
# PCA of the pickup and dropoff locations - helps to "straighten out" the coordinates

# pickup transformations
pca_pu = vaex.ml.PCA(features=['pickup_longitude', 'pickup_latitude'], n_components=2, progress=True)
df_train = pca_pu.fit_transform(df_train)

# dropoff transformations
pca_do = vaex.ml.PCA(features=['dropoff_longitude', 'dropoff_latitude'], n_components=2, progress=True)
df_train = pca_do.fit_transform(df_train)
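
For two features, PCA amounts to centering the points and rotating them onto the axes of largest variance. The sketch below shows this in plain Python using the closed-form angle for a 2x2 covariance matrix. `pca_2d` is a hypothetical helper written for illustration; the sign and ordering of the components produced by `vaex.ml.PCA` may differ.

```python
import math


def pca_2d(points):
    """Center 2D points and rotate them onto their principal axes."""
    n = len(points)
    mean_x = sum(x for x, _ in points) / n
    mean_y = sum(y for _, y in points) / n
    centered = [(x - mean_x, y - mean_y) for x, y in points]

    # Entries of the 2x2 covariance matrix
    var_x = sum(x * x for x, _ in centered) / n
    var_y = sum(y * y for _, y in centered) / n
    cov_xy = sum(x * y for x, y in centered) / n

    # Angle of the first principal axis relative to the x axis
    angle = 0.5 * math.atan2(2 * cov_xy, var_x - var_y)
    c, s = math.cos(angle), math.sin(angle)

    # Coordinates along the first and second principal axes
    return [(c * x + s * y, -s * x + c * y) for x, y in centered]


# Hypothetical points lying on a diagonal line
projected = pca_2d([(0, 0), (1, 1), (2, 2), (3, 3)])
```

For points on a diagonal, all the variance ends up in the first component and the second is zero, which is the "straightening out" effect mentioned above.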

Preview the new columns (PCA transformations)

In [None]:
df_train

In [None]:
plt.figure(figsize=(14, 5))

plt.subplot(121)
plt.title('pickup - original')
df_train.plot(df_train.pickup_longitude, df_train.pickup_latitude,
           colormap='plasma', f='log1p', shape=256, colorbar=False)

plt.subplot(122)
plt.title('pickup - PCA transformed')
df_train.plot(df_train.PCA_0, df_train.PCA_1,
           colormap='plasma', f='log1p', shape=256, colorbar=False)

plt.tight_layout()
plt.show()

### Handling temporal (cyclical) features

- Assume the temporal feature is the θ coordinate of a unit circle in polar coordinates. Convert it to Cartesian (x, y) coordinates. This preserves the continuity (12 o'clock is close to 1 o'clock).
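
A minimal sketch of this mapping in plain Python, assuming the feature is scaled to an angle of 2π·value/n and then converted to (cos, sin). The helper names are hypothetical and the code is an illustration, not the `vaex.ml` implementation.

```python
import math


def cycle_transform(value, n):
    """Map a cyclical value with period n onto the unit circle."""
    angle = 2 * math.pi * value / n
    return math.cos(angle), math.sin(angle)


def distance(a, b):
    """Euclidean distance between two (x, y) points."""
    return math.hypot(a[0] - b[0], a[1] - b[1])


late_night = cycle_transform(23.5, n=24)     # 23:30
after_midnight = cycle_transform(0.5, n=24)  # 00:30
noon = cycle_transform(12.5, n=24)           # 12:30

# 23:30 and 00:30 are far apart as raw numbers but close on the circle
print(distance(late_night, after_midnight))
# 00:30 and 12:30 sit on opposite sides of the circle
print(distance(after_midnight, noon))
```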

In [None]:
# Time
cycl_transform_time = vaex.ml.CycleTransformer(features=['pickup_time'], n=24)
df_train = cycl_transform_time.fit_transform(df_train)

# Day
cycl_transform_day = vaex.ml.CycleTransformer(features=['pickup_day'], n=7)
df_train = cycl_transform_day.fit_transform(df_train)

# Direction angle
cycl_transform_angle = vaex.ml.CycleTransformer(features=['direction_angle'], n=360)
df_train = cycl_transform_angle.fit_transform(df_train)

In [None]:
# Let's see what the transformed pickup time looks like
df_train.plot(x='pickup_time_x', y='pickup_time_y',
              shape=128, limits=[-1, 1],
              figsize=(5, 5),
              colorbar=False)
plt.show()

### Scaling of numerical features

In [None]:
# Standard scaling of numerical features
standard_scaler = vaex.ml.StandardScaler(features=['arc_distance'])
df_train = standard_scaler.fit_transform(df_train)
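
Standard scaling subtracts the mean and divides by the standard deviation, so the scaled feature has zero mean and unit variance. A plain-Python sketch on made-up values, assuming the population standard deviation is used:

```python
import statistics


def standard_scale(values):
    """Return (value - mean) / std for every value."""
    mean = statistics.mean(values)
    std = statistics.pstdev(values)  # population standard deviation (assumption)
    return [(v - mean) / std for v in values]


arc_distances = [0.5, 1.2, 2.0, 3.1, 6.4]  # hypothetical distances in miles
scaled = standard_scale(arc_distances)
```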

### Preview the training features

In [None]:
# Select all the features to be used for training the model
features = df_train.get_column_names(regex='PCA_.*') + \
           df_train.get_column_names(regex='standard_scaled_.*') + \
           df_train.get_column_names(regex='.*_x$') + \
           df_train.get_column_names(regex='.*_y$') + \
           ['pickup_is_weekend']

# Preview the features
df_train.head(10)[features]
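
When selecting columns by regular expression, keep in mind that `*` is a quantifier ("zero or more of the preceding character"), not a shell-style wildcard. The sketch below assumes names are filtered with `re.match`, which anchors the pattern at the start of the name. The column list is hypothetical, and `PC_other` is included only to show the difference.

```python
import re

columns = ['PCA_0', 'PCA_1', 'PC_other',
           'pickup_time_x', 'pickup_time_y', 'pickup_is_weekend']


def select(pattern, names):
    """Keep the names that match the pattern from their first character."""
    return [name for name in names if re.match(pattern, name)]


loose = select('PCA*', columns)     # 'PC' followed by zero or more 'A'
strict = select('PCA_.*', columns)  # 'PCA_' followed by anything
cyclic_x = select('.*_x$', columns)  # names ending in '_x'
```

Here `loose` also picks up `PC_other`, while `strict` keeps only the two PCA columns.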

### Set the target variable

In [None]:
target = 'trip_duration_min'

### Train a machine learning model

In [None]:
from sklearn.linear_model import SGDRegressor
from vaex.ml.sklearn import IncrementalPredictor

# Define the base model
model = SGDRegressor(learning_rate='constant', eta0=0.0001)

# The Vaex incremental model wrapper
vaex_model = IncrementalPredictor(features=features,
                                  target=target,
                                  model=model,
                                  batch_size=11_000_000, 
                                  num_epochs=1, 
                                  shuffle=False, 
                                  prediction_name='predicted_duration_min')

# Fit the model
vaex_model.fit(df=df_train, progress='widget')
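
Conceptually, incremental training feeds the model one batch at a time, so the full dataset never has to fit in memory. The toy sketch below shows the idea in plain Python with a one-feature linear model and a constant learning rate. `TinySGDRegressor` and `batches` are hypothetical names; this is not the scikit-learn or `vaex` implementation. The synthetic data is shuffled once up front so that each batch covers the whole feature range.

```python
import random


def batches(rows, batch_size):
    """Yield consecutive slices of `rows`, one batch at a time."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


class TinySGDRegressor:
    """One-feature linear model trained by SGD with a constant learning rate."""

    def __init__(self, eta0):
        self.eta0 = eta0
        self.weight = 0.0
        self.bias = 0.0

    def partial_fit(self, batch):
        # Update the parameters using only the samples in this batch
        for x, y in batch:
            error = self.weight * x + self.bias - y
            self.weight -= self.eta0 * error * x
            self.bias -= self.eta0 * error

    def predict(self, x):
        return self.weight * x + self.bias


# Synthetic, noise-free data following y = 2x + 1
rows = [(i / 1000, 2.0 * (i / 1000) + 1.0) for i in range(1000)]
random.Random(0).shuffle(rows)

model = TinySGDRegressor(eta0=0.1)
for epoch in range(20):
    for batch in batches(rows, batch_size=100):
        model.partial_fit(batch)
```

After training, `model.weight` and `model.bias` should be close to 2 and 1 respectively.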

### Let's see the predictions!

In [None]:
# Standard in-memory predict
vaex_model.predict(df_train)

Vaex turns models into transformers!

In [None]:
df_train = vaex_model.transform(df_train)
# See a portion of the predictions
df_train.head(5)['trip_duration_min', 'predicted_duration_min']

The prediction is an expression! This opens up opportunities for post-processing, ensembles, etc.

Values lower than 3 minutes are set to 3; values higher than 25 minutes are set to 25.

In [None]:
df_train['pred_final'] = df_train.predicted_duration_min.clip(3, 25)
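
Clipping keeps the predictions inside the duration range that the training data was filtered to. For a single value it reduces to a `min`/`max` pair, as in this small sketch with made-up predictions:

```python
def clip(value, lower, upper):
    """Limit `value` to the closed interval [lower, upper]."""
    return max(lower, min(upper, value))


predictions = [1.7, 12.4, 31.0]  # hypothetical predicted durations in minutes
clipped = [clip(p, 3, 25) for p in predictions]
print(clipped)  # [3, 12.4, 25]
```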

## But... what about the test set?!

### State transfer (a.k.a the `vaex` automatic pipeline)

The operations done on the data are recorded in the `state` of the DataFrame.

In [None]:
df_train.state_get()

Write the state to disk (serializes the operations and model).

In [None]:
df_train.state_write('./taxi_ml_state.json')