# Data Pipeline with Flux


In [1]:
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split


# For demonstration purposes only, we manually download some data.
titanic = fetch_openml('Titanic', version=1)
data, target = titanic['data'], titanic['target']

# Hold out 10% of the rows as a test split; the fixed seed keeps the split stable.
data_train, data_test, target_train, target_test = train_test_split(
    data,
    target,
    test_size=0.1,
    shuffle=True,
    random_state=1234,
)

# Write the train files first, then the test files, all without the index column.
for split_frame, csv_path in (
    (data_train, '../data/titanic/titanic.csv'),
    (target_train, '../data/titanic/titanic_target.csv'),
    (data_test, '../data/titanic/titanic_test.csv'),
    (target_test, '../data/titanic/titanic_target_test.csv'),
):
    split_frame.to_csv(csv_path, index=False)

## Imports

In [2]:
from flux import Flux
from flux import CSVDataset, PickleDataset

### Initialize Flux


In [3]:
from fileinput import filename


flux = Flux()

# Datasets used by the fit pipeline: three CSV files (written without the
# index column) and four pickle files.
fit_datasets = {
    "titanic": CSVDataset(filename='../data/titanic/titanic.csv', save_params={"index":False}),
    "titanic_target": CSVDataset(filename='../data/titanic/titanic_target.csv', save_params={"index":False}),
    "titanic_pro": CSVDataset(filename='../data/titanic/titanic_pro.csv', save_params={"index":False}),
    "imputers": PickleDataset(filename="../data/titanic/imputers_titanic.pkl"),
    "encoders": PickleDataset(filename="../data/titanic/encoders_titanic.pkl"),
    "model": PickleDataset(filename="../data/titanic/model_titanic.pkl"),
    "name_prefixes": PickleDataset(filename="../data/titanic/name_prefixes.pkl"),
}
flux.add_datasets(fit_datasets)

### Fit - Transform Data


In [4]:
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import re
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import make_pipeline

def get_name_prefix(titanic:pd.DataFrame):
    """Replace every full name with the title (prefix) found inside it.

    The title is the first run of word characters/dots that ends in a
    period, with that trailing period removed, e.g. ``"Miss"`` for
    ``"Allen, Miss. Elisabeth Walton"``.

    Args:
        titanic: Frame with a string ``name`` column.

    Returns:
        A copy of ``titanic`` whose ``name`` column holds the extracted
        titles. The frame passed in is left unmodified.

    Raises:
        IndexError: If a name contains no period-terminated token.
    """
    # Raw string: "\w" and "\." are invalid escape sequences in a plain
    # literal and trigger a warning on recent Python versions. The pattern
    # itself is unchanged. Compiling once avoids re-parsing it per row.
    title_pattern = re.compile(r"[\w.]+\.")

    # Work on a copy so the caller's frame is not rewritten in place.
    with_titles = titanic.copy()
    with_titles['name'] = with_titles['name'].apply(
        lambda full_name: title_pattern.findall(full_name)[0][:-1]
    )
    return with_titles

# Node: 'titanic' -> get_name_prefix -> 'titanic_int'.
flux.add_node(func=get_name_prefix, inputs='titanic', outputs='titanic_int')


In [5]:

def save_common_name_prefix(titanic_int:pd.DataFrame):
    """Return the (at most) ten most frequent values of the ``name`` column.

    Args:
        titanic_int: Frame with a ``name`` column.

    Returns:
        A list of the most common ``name`` values, most frequent first.
    """
    prefix_counts = titanic_int['name'].value_counts()
    # value_counts() is sorted by descending frequency, so the head is the top 10.
    return prefix_counts.head(10).index.tolist()


# Node: 'titanic_int' -> save_common_name_prefix -> 'name_prefixes'.
flux.add_node(func=save_common_name_prefix, inputs='titanic_int', outputs=['name_prefixes'])



In [6]:
def reduce_name_prefix_cardinality(titanic_int, name_prefixes):
    """Collapse every ``name`` value outside ``name_prefixes`` into ``"other"``.

    Args:
        titanic_int: Frame with a ``name`` column.
        name_prefixes: Collection of ``name`` values to keep as they are.

    Returns:
        A copy of ``titanic_int`` in which each ``name`` that is not in
        ``name_prefixes`` has been replaced by ``"other"``. The frame passed
        in is left unmodified.
    """
    # Work on a copy: the input frame is shared with other pipeline nodes, so
    # rewriting it in place would change what they see.
    reduced = titanic_int.copy()

    # Vectorised membership test instead of a per-row Python lookup in a list.
    is_known_prefix = reduced['name'].isin(list(name_prefixes))
    reduced['name'] = reduced['name'].where(is_known_prefix, "other")
    return reduced

# Node: ('titanic_int', 'name_prefixes') -> reduce_name_prefix_cardinality -> 'titanic_int_1'.
flux.add_node(func=reduce_name_prefix_cardinality, inputs=['titanic_int', 'name_prefixes'], outputs=['titanic_int_1'])

# flux.run()

In [7]:
def fit_transform_data_impute(titanic_int:pd.DataFrame):
    """Fit imputers on the frame and return the imputed frame plus the fitted imputers.

    Numeric-dtype columns are imputed with the median, all other columns with
    the most frequent value. Both column lists are printed.

    Args:
        titanic_int: Frame to fit on and impute.

    Returns:
        A ``(imputed_frame, imputers)`` tuple. ``imputed_frame`` keeps the
        input's index and uses the transformer's output feature names as
        column names. ``imputers`` is the fitted ``ColumnTransformer``.

    NOTE(review): the frame is rebuilt from the array returned by
    ``fit_transform``. With mixed numeric/string columns that array is
    presumably object-typed, so every column here would come out as object
    dtype and later dtype-based column selection would see no numeric
    columns. Confirm before relying on dtypes downstream.
    """
    num_cols = titanic_int.select_dtypes(include='number').columns.to_list()
    cat_cols = titanic_int.select_dtypes(exclude='number').columns.to_list()

    print("num features",num_cols)
    print("cat features",cat_cols)

    imputers = ColumnTransformer([
        ('num_imputer', SimpleImputer(strategy='median'), num_cols),
        ('cat_imputer', SimpleImputer(strategy='most_frequent'), cat_cols),
    ])

    imputed_values = imputers.fit_transform(titanic_int)
    imputed_frame = pd.DataFrame(
        imputed_values,
        columns=imputers.get_feature_names_out(),
        index=titanic_int.index,
    )
    return imputed_frame, imputers

# Node: 'titanic_int_1' -> fit_transform_data_impute -> ('titanic_int_2', 'imputers').
flux.add_node(func=fit_transform_data_impute, inputs='titanic_int_1', outputs=['titanic_int_2', 'imputers'])

In [9]:
from sklearn.model_selection import cross_validate
import numpy as np


def fit_transform_encode_categorical_features(titanic_int:pd.DataFrame):
    """Fit encoders on the non-numeric columns and return the encoded frame plus encoders.

    Non-numeric columns with fewer than 10 distinct values are ordinal-encoded
    (unknown categories map to -1). The remaining non-numeric columns are
    one-hot encoded (unknown categories are ignored). Numeric-dtype columns
    are appended unchanged after the encoded columns.

    Args:
        titanic_int: Frame to fit on and encode.

    Returns:
        A ``(encoded_frame, encoders)`` tuple, where ``encoders`` is the
        fitted ``ColumnTransformer``.

    NOTE(review): sending the high-cardinality columns to the one-hot encoder
    is the reverse of the usual convention. Confirm this is intended.
    NOTE(review): ``sparse=False`` is believed to have been renamed to
    ``sparse_output`` in newer scikit-learn releases. Confirm against the
    pinned version.
    """
    numeric_part = titanic_int.select_dtypes(include='number')
    categorical_part = titanic_int.select_dtypes(exclude='number')

    # Split the categorical columns by cardinality, preserving column order.
    ord_features, oh_features = [], []
    for column in categorical_part.columns.to_list():
        bucket = ord_features if categorical_part[column].nunique()<10 else oh_features
        bucket.append(column)

    encoders = ColumnTransformer([
        ('ordinal_features', OrdinalEncoder(handle_unknown="use_encoded_value",unknown_value=-1), ord_features),
        ('one_hot_features', OneHotEncoder(sparse=False, handle_unknown='ignore'), oh_features),
    ])

    encoded_values = encoders.fit_transform(categorical_part)
    encoded_frame = pd.DataFrame(
        encoded_values,
        columns=encoders.get_feature_names_out(),
        index=categorical_part.index,
    )

    return pd.concat([encoded_frame, numeric_part], axis=1), encoders


# Node: 'titanic_int_2' -> fit_transform_encode_categorical_features -> ('titanic_int_3', 'encoders').
flux.add_node(func=fit_transform_encode_categorical_features, inputs='titanic_int_2', outputs=['titanic_int_3', 'encoders'])


def train_classification_model(titanic_pro:pd.DataFrame, target:pd.Series):
    """Cross-validate and then fit a random forest on the processed features.

    Prints the mean of every 3-fold cross-validation result (timings plus
    train/test accuracy, precision and recall), then fits the model on all
    rows.

    Args:
        titanic_pro: Processed feature frame.
        target: Labels aligned with ``titanic_pro``. They are flattened with
            ``np.ravel`` before use.

    Returns:
        The ``RandomForestClassifier`` fitted on the full input.
    """
    labels = np.ravel(target)

    # Fixed seed (the same value the train/test split uses) so that the CV
    # scores and the persisted model are reproducible from run to run.
    model = RandomForestClassifier(random_state=1234)

    results = cross_validate(
        model,
        titanic_pro,
        labels,
        cv=3,
        return_train_score=True,
        scoring=['accuracy','precision','recall'],
    )
    print("CV RESULTS")
    for k,v in results.items():
        print(k,": ",round(np.mean(v),3))

    # cross_validate works on clones, so the returned model is fitted here.
    model.fit(titanic_pro, labels)

    return model

# Node: ('titanic_int_3', 'titanic_target') -> train_classification_model -> 'model'.
flux.add_node(func=train_classification_model, inputs=['titanic_int_3', 'titanic_target'], outputs=['model'])

# Run the pipeline with the nodes registered so far.
flux.run()

Replacing Existing Node: fit_transform_encode_categorical_features
Replacing Existing Node: train_classification_model


num features ['pclass', 'age', 'sibsp', 'parch', 'fare', 'body']
cat features ['name', 'sex', 'ticket', 'cabin', 'embarked', 'boat', 'home.dest']
CV RESULTS
fit_time :  0.347
score_time :  0.038
test_accuracy :  0.952
train_accuracy :  0.999
test_precision :  0.963
train_precision :  0.997
test_recall :  0.911
train_recall :  1.0


In [10]:
# Save the fit-time Flux object to a pickle file next to the data.
flux.save('../data/titanic/fit_flux_titanic.pkl')

### Transforming Data

In [19]:
# Fresh Flux object for inference. 'titanic' and 'titanic_target' now point at
# the held-out test files, while the pickle datasets use the same file paths
# as in the fit section.
flux = Flux()

inference_datasets = {
    "titanic": CSVDataset(filename='../data/titanic/titanic_test.csv', save_params={"index":False}),
    "titanic_target": CSVDataset(filename='../data/titanic/titanic_target_test.csv', save_params={"index":False}),
    "imputers": PickleDataset(filename="../data/titanic/imputers_titanic.pkl"),
    "encoders": PickleDataset(filename="../data/titanic/encoders_titanic.pkl"),
    "model": PickleDataset(filename="../data/titanic/model_titanic.pkl"),
    "name_prefixes": PickleDataset(filename="../data/titanic/name_prefixes.pkl"),
}
flux.add_datasets(inference_datasets)



In [20]:

# Node: 'titanic' -> get_name_prefix -> 'titanic_int'.
flux.add_node(func=get_name_prefix, inputs='titanic', outputs='titanic_int')

# Node: ('titanic_int', 'name_prefixes') -> reduce_name_prefix_cardinality -> 'titanic_int_1'.
flux.add_node(func=reduce_name_prefix_cardinality, inputs=['titanic_int', 'name_prefixes'], outputs=['titanic_int_1'])

In [21]:
def transform_data(titanic_int, imputers ,encoders):
    """Apply already-fitted imputers and encoders to a frame.

    Args:
        titanic_int: Frame to transform.
        imputers: Fitted object exposing ``transform`` and
            ``get_feature_names_out``.
        encoders: Fitted object exposing ``transform`` and
            ``get_feature_names_out``.

    Returns:
        A frame with the encoded columns first, followed by whatever columns
        of the imputed frame have a numeric dtype, on the input's index.

    NOTE(review): the imputed frame is rebuilt from the array returned by
    ``imputers.transform``. If that array is object-typed, no column will
    count as numeric here and all of them go to the encoders. Confirm this
    matches how the encoders were fitted.
    """
    imputed_frame = pd.DataFrame(
        imputers.transform(titanic_int),
        columns=imputers.get_feature_names_out(),
        index=titanic_int.index,
    )

    categorical_part = imputed_frame.select_dtypes(exclude='number')
    numeric_part = imputed_frame.select_dtypes(include='number')

    encoded_frame = pd.DataFrame(
        encoders.transform(categorical_part),
        columns=encoders.get_feature_names_out(),
        index=categorical_part.index,
    )
    return pd.concat([encoded_frame, numeric_part], axis=1)

# Node: ('titanic_int_1', 'imputers', 'encoders') -> transform_data -> 'titanic_int_2'.
flux.add_node(func=transform_data, inputs=['titanic_int_1', 'imputers', 'encoders'], outputs=['titanic_int_2'])

In [27]:
def predictions(titanic_int, model):
    """Predict with ``model`` and wrap the result in a one-column frame.

    Args:
        titanic_int: Feature frame handed to ``model.predict``.
        model: Object exposing ``predict``.

    Returns:
        A frame with a single ``y_pred`` column on ``titanic_int``'s index.
    """
    predicted_labels = model.predict(titanic_int)
    return pd.DataFrame(
        data=predicted_labels,
        index=titanic_int.index,
        columns=['y_pred'],
    )

# Node: ('titanic_int_2', 'model') -> predictions -> 'y_pred'.
flux.add_node(func=predictions, inputs=['titanic_int_2', 'model'], outputs=['y_pred'])

Replacing Existing Node: predictions


In [30]:
from sklearn.metrics import accuracy_score, recall_score, precision_score

def evaluate_predictions(titanic_target_test, y_pred):
    """Score predictions against the true labels.

    Args:
        titanic_target_test: True labels.
        y_pred: Predicted labels.

    Returns:
        A dict with keys ``'acc'`` (accuracy), ``'rec'`` (recall) and
        ``'pre'`` (precision).
    """
    scorers = {'acc': accuracy_score, "rec": recall_score, "pre": precision_score}
    return {
        label: scorer(titanic_target_test, y_pred)
        for label, scorer in scorers.items()
    }

# Node: ('titanic_target', 'y_pred') -> evaluate_predictions -> 'metrics'.
flux.add_node(func=evaluate_predictions, inputs=['titanic_target', 'y_pred'], outputs=['metrics'])

In [31]:
# Call the Flux object directly.
# NOTE(review): the fit section used flux.run(); presumably this is equivalent, confirm.
flux()

In [33]:
# Fetch 'metrics', the output name registered for the evaluate_predictions node.
flux.load_dataset('metrics')

{'acc': 0.9465648854961832, 'rec': 0.9375, 'pre': 0.9183673469387755}