## 1. Importing the Libraries

In [1]:
import os
import boto3
import pickle
import warnings

import numpy as np
import pandas as pd
import xgboost as xgb

import sklearn
from sklearn.metrics import r2_score
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics.pairwise import rbf_kernel
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import (
    OneHotEncoder,
    OrdinalEncoder,
    StandardScaler,
    MinMaxScaler,
    PowerTransformer,
    FunctionTransformer
)

from feature_engine.outliers import Winsorizer
from feature_engine.datetime import DatetimeFeatures
from feature_engine.selection import SelectBySingleFeaturePerformance
from feature_engine.encoding import (
    RareLabelEncoder,
    MeanEncoder,
    CountFrequencyEncoder
)

import sagemaker
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
from sagemaker.tuner import (
    IntegerParameter,
    ContinuousParameter,
    HyperparameterTuner
)

sagemaker.config INFO - Not applying SDK defaults from location: /etc/xdg/sagemaker/config.yaml
sagemaker.config INFO - Not applying SDK defaults from location: /home/ec2-user/.config/sagemaker/config.yaml


## 2. Display Settings

In [2]:
pd.set_option("display.max_columns", None)

In [3]:
sklearn.set_config(transform_output="pandas")

In [4]:
warnings.filterwarnings("ignore")

## 3. Reading the Datasets

In [5]:
train = pd.read_csv("train.csv")
train

Unnamed: 0,airline,date_of_journey,source,destination,dep_time,arrival_time,duration,total_stops,additional_info,price
0,Air India,2019-03-21,Delhi,Cochin,17:15:00,19:15:00,1560,2.0,No Info,8245
1,Vistara,2019-05-15,Kolkata,Banglore,07:10:00,10:45:00,1655,1.0,No Info,8452
2,Jet Airways,2019-06-09,Banglore,Delhi,18:55:00,22:00:00,185,0.0,No Info,8016
3,Air India,2019-06-09,Kolkata,Banglore,09:50:00,23:15:00,805,3.0,No Info,14960
4,Spicejet,2019-05-09,Banglore,Delhi,05:55:00,08:35:00,160,0.0,No check-in baggage included,3573
...,...,...,...,...,...,...,...,...,...,...
635,Jet Airways,2019-03-09,Delhi,Cochin,19:15:00,04:25:00,550,1.0,In-flight meal not included,14053
636,Jet Airways,2019-03-27,Mumbai,Hyderabad,02:55:00,04:25:00,90,0.0,In-flight meal not included,1840
637,Multiple Carriers,2019-06-12,Delhi,Cochin,09:45:00,16:10:00,385,1.0,No Info,9646
638,Indigo,2019-04-15,Banglore,Delhi,22:10:00,01:00:00,170,0.0,No Info,3943


In [6]:
val = pd.read_csv("val.csv")
val

Unnamed: 0,airline,date_of_journey,source,destination,dep_time,arrival_time,duration,total_stops,additional_info,price
0,Indigo,2019-03-21,Delhi,Cochin,06:40:00,12:00:00,320,1.0,No Info,5761
1,Spicejet,2019-04-03,Kolkata,Banglore,22:20:00,00:40:00,140,0.0,No Info,3873
2,Indigo,2019-05-09,Banglore,Delhi,01:30:00,04:15:00,165,0.0,No Info,3419
3,Vistara,2019-06-09,Kolkata,Banglore,20:20:00,20:20:00,1440,1.0,No Info,8085
4,Indigo,2019-06-01,Kolkata,Banglore,04:40:00,07:15:00,155,0.0,No Info,4804
...,...,...,...,...,...,...,...,...,...,...
155,Air Asia,2019-04-12,Banglore,Delhi,11:10:00,13:55:00,165,0.0,No Info,4483
156,Jet Airways,2019-03-12,Banglore,New Delhi,08:55:00,10:25:00,1530,1.0,In-flight meal not included,11087
157,Indigo,2019-05-09,Delhi,Cochin,18:35:00,01:30:00,415,1.0,No Info,8542
158,Air India,2019-06-12,Kolkata,Banglore,16:50:00,11:10:00,1100,1.0,No Info,8366


In [7]:
test = pd.read_csv("test.csv")
test

Unnamed: 0,airline,date_of_journey,source,destination,dep_time,arrival_time,duration,total_stops,additional_info,price
0,Indigo,2019-05-06,Banglore,Delhi,07:10:00,10:05:00,175,0.0,No Info,4823
1,Spicejet,2019-04-06,Banglore,Delhi,09:30:00,12:20:00,170,0.0,No check-in baggage included,4319
2,Jet Airways,2019-06-27,Delhi,Cochin,11:30:00,19:00:00,450,1.0,In-flight meal not included,10262
3,Jet Airways,2019-03-24,Kolkata,Banglore,21:10:00,09:15:00,725,1.0,No Info,14231
4,Air India,2019-03-12,Banglore,New Delhi,11:50:00,08:55:00,1265,1.0,No Info,11707
...,...,...,...,...,...,...,...,...,...,...
195,Air Asia,2019-06-03,Kolkata,Banglore,10:20:00,12:55:00,155,0.0,No Info,4409
196,Jet Airways,2019-05-21,Kolkata,Banglore,16:30:00,22:35:00,365,1.0,No Info,14781
197,Jet Airways,2019-06-06,Delhi,Cochin,11:40:00,19:00:00,440,2.0,In-flight meal not included,15812
198,Jet Airways,2019-06-27,Delhi,Cochin,19:45:00,19:00:00,1395,1.0,In-flight meal not included,10262


## 4. Preprocessing Operations

In [8]:
# airline
air_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("grouper", RareLabelEncoder(tol=0.1, replace_with="Other", n_categories=2)),
    ("encoder", OneHotEncoder(sparse_output=False, handle_unknown="ignore"))
])

#doj
feature_to_extract = ["month", "week", "day_of_week", "day_of_year"]

doj_transformer = Pipeline(steps=[
    ("dt", DatetimeFeatures(features_to_extract=feature_to_extract, yearfirst=True, format="mixed")),
    ("scaler", MinMaxScaler())
])

# source & destination
location_pipe1 = Pipeline(steps=[
    ("grouper", RareLabelEncoder(tol=0.1, replace_with="Other", n_categories=2)),
    ("encoder", MeanEncoder()),
    ("scaler", PowerTransformer())
])

def is_north(X):
    columns = X.columns.to_list()
    north_cities = ["Delhi", "Kolkata", "Mumbai", "New Delhi"]
    return (
        X
        .assign(**{
            f"{col}_is_north": X.loc[:, col].isin(north_cities).astype(int)
            for col in columns
        })
        .drop(columns=columns)
    )

location_transformer = FeatureUnion(transformer_list=[
    ("part1", location_pipe1),
    ("part2", FunctionTransformer(func=is_north))
])

# dep_time & arrival_time
time_pipe1 = Pipeline(steps=[
    ("dt", DatetimeFeatures(features_to_extract=["hour", "minute"])),
    ("scaler", MinMaxScaler())
])

def part_of_day(X, morning=4, noon=12, eve=16, night=20):
    columns = X.columns.to_list()
    X_temp = X.assign(**{
        col: pd.to_datetime(X.loc[:, col]).dt.hour
        for col in columns
    })

    return (
        X_temp
        .assign(**{
            f"{col}_part_of_day": np.select(
                [X_temp.loc[:, col].between(morning, noon, inclusive="left"),
                 X_temp.loc[:, col].between(noon, eve, inclusive="left"),
                 X_temp.loc[:, col].between(eve, night, inclusive="left")],
                ["morning", "afternoon", "evening"],
                default="night"
            )
            for col in columns
        })
        .drop(columns=columns)
    )

time_pipe2 = Pipeline(steps=[
    ("part", FunctionTransformer(func=part_of_day)),
    ("encoder", CountFrequencyEncoder()),
    ("scaler", MinMaxScaler())
])

time_transformer = FeatureUnion(transformer_list=[
    ("part1", time_pipe1),
    ("part2", time_pipe2)
])

# duration
class RBFPercentileSimilarity(BaseEstimator, TransformerMixin):
    def __init__(self, variables=None, percentiles=[0.25, 0.5, 0.75], gamma=0.1):
        self.variables = variables
        self.percentiles = percentiles
        self.gamma = gamma


    def fit(self, X, y=None):
        if not self.variables:
            self.variables = X.select_dtypes(include="number").columns.to_list()

        self.reference_values_ = {
            col: (
                X
                .loc[:, col]
                .quantile(self.percentiles)
                .values
                .reshape(-1, 1)
            )
            for col in self.variables
        }

        return self


    def transform(self, X):
        objects = []
        for col in self.variables:
            columns = [f"{col}_rbf_{int(percentile * 100)}" for percentile in self.percentiles]
            obj = pd.DataFrame(
                data=rbf_kernel(X.loc[:, [col]], Y=self.reference_values_[col], gamma=self.gamma),
                columns=columns
            )
            objects.append(obj)
        return pd.concat(objects, axis=1)
    

def duration_category(X, short=180, med=400):
    return (
        X
        .assign(duration_cat=np.select([X.duration.lt(short),
                                    X.duration.between(short, med, inclusive="left")],
                                    ["short", "medium"],
                                    default="long"))
        .drop(columns="duration")
    )

def is_over(X, value=1000):
    return (
        X
        .assign(**{
            f"duration_over_{value}": X.duration.ge(value).astype(int)
        })
        .drop(columns="duration")
    )

duration_pipe1 = Pipeline(steps=[
    ("rbf", RBFPercentileSimilarity()),
    ("scaler", PowerTransformer())
])

duration_pipe2 = Pipeline(steps=[
    ("cat", FunctionTransformer(func=duration_category)),
    ("encoder", OrdinalEncoder(categories=[["short", "medium", "long"]]))
])

duration_union = FeatureUnion(transformer_list=[
    ("part1", duration_pipe1),
    ("part2", duration_pipe2),
    ("part3", FunctionTransformer(func=is_over)),
    ("part4", StandardScaler())
])

duration_transformer = Pipeline(steps=[
    ("outliers", Winsorizer(capping_method="iqr", fold=1.5)),
    ("imputer", SimpleImputer(strategy="median")),
    ("union", duration_union)
])

# total_stops
def is_direct(X):
    return X.assign(is_direct_flight=X.total_stops.eq(0).astype(int))


total_stops_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="most_frequent")),
    ("", FunctionTransformer(func=is_direct))
])

# additional_info
info_pipe1 = Pipeline(steps=[
    ("group", RareLabelEncoder(tol=0.1, n_categories=2, replace_with="Other")),
    ("encoder", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
])

def have_info(X):
    return X.assign(additional_info=X.additional_info.ne("No Info").astype(int))

info_union = FeatureUnion(transformer_list=[
("part1", info_pipe1),
("part2", FunctionTransformer(func=have_info))
])

info_transformer = Pipeline(steps=[
("imputer", SimpleImputer(strategy="constant", fill_value="unknown")),
("union", info_union)
])

# column transformer
column_transformer = ColumnTransformer(transformers=[
("air", air_transformer, ["airline"]),
("doj", doj_transformer, ["date_of_journey"]),
("location", location_transformer, ["source", 'destination']),
("time", time_transformer, ["dep_time", "arrival_time"]),
("dur", duration_transformer, ["duration"]),
("stops", total_stops_transformer, ["total_stops"]),
("info", info_transformer, ["additional_info"])
], remainder="passthrough")

# feature selector
estimator = RandomForestRegressor(n_estimators=10, max_depth=3, random_state=42)

selector = SelectBySingleFeaturePerformance(
estimator=estimator,
scoring="r2",
threshold=0.1
) 

# preprocessor
preprocessor = Pipeline(steps=[
("ct", column_transformer),
("selector", selector)
])

In [9]:
preprocessor.fit(
    train.drop(columns="price"),
    train.price.copy()
)

In [10]:
preprocessor.transform(train.drop(columns="price"))

Unnamed: 0,air__airline_Indigo,air__airline_Jet Airways,doj__date_of_journey_week,doj__date_of_journey_day_of_year,location__source,location__destination,time__arrival_time_hour,dur__duration_rbf_25,dur__duration_cat,dur__duration_over_1000,dur__duration,stops__total_stops,stops__is_direct_flight
0,0.0,0.0,0.176471,0.169492,1.018030,1.018411,0.826087,-0.366504,2.0,1,1.800279,2.0,0
1,0.0,0.0,0.647059,0.635593,-0.079180,-0.096198,0.434783,-0.366504,2.0,1,1.985423,1.0,0
2,0.0,1.0,0.823529,0.847458,-1.130315,-1.707797,0.956522,-0.366504,1.0,0,-0.879436,0.0,1
3,0.0,0.0,0.823529,0.847458,-0.079180,-0.096198,1.000000,-0.366504,2.0,0,0.328872,3.0,0
4,0.0,0.0,0.588235,0.584746,-1.130315,-1.707797,0.347826,-0.363551,0.0,0,-0.928158,0.0,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...
635,0.0,1.0,0.058824,0.067797,1.018030,1.018411,0.173913,-0.366504,2.0,0,-0.168093,1.0,0
636,0.0,1.0,0.235294,0.220339,-1.678967,-1.046938,0.173913,-0.366504,0.0,0,-1.064580,0.0,1
637,0.0,0.0,0.882353,0.872881,1.018030,1.018411,0.695652,-0.366504,1.0,0,-0.489659,1.0,0
638,1.0,0.0,0.411765,0.381356,-1.130315,-1.707797,0.043478,3.097943,0.0,0,-0.908669,0.0,1


## 4. Preprocessing Data and Upload to S3 Bucket

In [11]:
BUCKET_NAME = "sagemaker-flights-bucket"

DATA_PREFIX = "data"

In [12]:
def get_file_name(name):
    return f"{name}-pre.csv"

In [13]:
def export_data(data, name, pre):
    # split data into X and y subsets
    X = data.drop(columns="price")
    y = data.price.copy()
    
    # transformation
    X_pre = pre.transform(X)
    
    # exporting
    file_name = get_file_name(name)
    (
        y
        .to_frame()
        .join(X_pre)
        .to_csv(file_name, index=False, header=False)
    )

In [14]:
def upload_to_bucket(name):
    file_name = get_file_name(name)
    
    (
        boto3
        .Session()
        .resource("s3")
        .Bucket(BUCKET_NAME)
        .Object(os.path.join(DATA_PREFIX, f"{name}/{name}.csv"))
        .upload_file(file_name)
    )

In [15]:
def export_and_upload_bucket(data, name, pre):
    export_data(data, name, pre)
    upload_to_bucket(name)

In [16]:
export_and_upload_bucket(train, "train", preprocessor)

In [17]:
export_and_upload_bucket(val, "val", preprocessor)

In [18]:
export_and_upload_bucket(test, "test", preprocessor)

## 5. Model Training and Hyperparameter Tuning Setup

In [19]:
session = sagemaker.Session()
region_name = session.boto_region_name

In [20]:
output_path = f"s3://{BUCKET_NAME}/model/output"

In [21]:
model = Estimator(
    image_uri=sagemaker.image_uris.retrieve("xgboost", region_name, "1.2-1"),
    role=sagemaker.get_execution_role(),
    instance_count=1,
    instance_type="ml.m4.xlarge",
    volume_size=5,
    output_path=output_path,
    use_spot_instances=True,
    max_run=300,
    max_wait=600,
    sagemaker_session=session
)

In [22]:
model.set_hyperparameters(
    objective="reg:linear",
    num_round=10,
    eta=0.1,
    max_depth=5,
    subsample=0.8,
    colsample_bytree=0.8,
    alpha=0.1
)

In [23]:
hyperparameter_ranges = {
    'eta': ContinuousParameter(0.05, 0.2),
    'alpha': ContinuousParameter(0, 1),
    'max_depth': IntegerParameter(3, 10),
    'gamma': ContinuousParameter(0, 5),
    'lambda': ContinuousParameter(0, 5),
    'min_child_weight': IntegerParameter(1, 10),
    'subsample': ContinuousParameter(0.5, 1.0),
    'colsample_bytree': ContinuousParameter(0.5, 1.0)
}

In [24]:
tuner = HyperparameterTuner(
    estimator=model,
    objective_metric_name="validation:rmse",
    hyperparameter_ranges=hyperparameter_ranges,
    strategy="Bayesian",
    objective_type="Minimize",
    max_jobs=10,
    max_parallel_jobs=2,
)

## 6. Data Channels

In [25]:
def get_data_channel(name):
    bucket_path = f"s3://{BUCKET_NAME}/{DATA_PREFIX}/{name}"
    return TrainingInput(bucket_path, content_type="csv")

In [26]:
train_data_channel = get_data_channel("train")
train_data_channel

<sagemaker.inputs.TrainingInput at 0x7fb09149e8c0>

In [27]:
val_data_channel = get_data_channel("val")

In [28]:
data_channels = {
    "train": train_data_channel,
    "validation": val_data_channel
}

## 7. Train and Tune the Model

In [29]:
tuner.fit(data_channels)

No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config
No finished training job found associated with this estimator. Please make sure this estimator is only used for building workflow config


...................................................................................................................................................................................................!


## 8. Model Evaluation

In [30]:
with open("xgboost-model", "rb") as f:
    best_model = pickle.load(f)
    
best_model

<xgboost.core.Booster at 0x7fb091e47ac0>

In [31]:
def evaluate_model(name):
    file_name = get_file_name(name)
    data = pd.read_csv(file_name)
    
    X = xgb.DMatrix(data.iloc[:, 1:])
    y = data.iloc[:, 0].copy()
    
    pred = best_model.predict(X)
    
    return r2_score(y, pred)

In [32]:
evaluate_model("train")

0.6939330151135665

In [33]:
evaluate_model("val")

0.6195710091845281

In [34]:
evaluate_model("test")

0.6043713989940565