# Experiment Missing Values Strategy
Compare different strategies for handling missing values in the features.

In [1]:
import os
from working_dir import set_wd
# Switch the working directory through the local `working_dir.set_wd` helper
# (presumably to the repository root, judging by the echoed path below), then
# show the resulting path as the cell output so the reader can confirm where
# relative paths and the `src.*` imports resolve from.
set_wd()
os.getcwd()

'/home/tales/ds/kaggle/football-match-prediction'

In [2]:
from pyspark.sql import SparkSession

# Spark session used by every cell below.
#   - spark.ui.showConsoleProgress=false    keeps stage progress bars out of the cell output
#   - spark.sql.debug.maxToStringFields     raises the field limit used when plans/schemas
#                                           of wide frames are rendered as strings
#   - spark.sql.autoBroadcastJoinThreshold  -1 disables automatic broadcast joins; the key
#                                           sits directly under `spark.sql.` (a
#                                           `spark.sql.debug.` variant is not a Spark setting
#                                           and would leave the default threshold in place)
#   - spark.driver.memory                   only takes effect when this call is the one that
#                                           starts the driver JVM (i.e. on a fresh kernel)
spark = (
    SparkSession.builder
    .config("spark.ui.showConsoleProgress", "false")
    .config("spark.sql.debug.maxToStringFields", 500)
    .config("spark.sql.autoBroadcastJoinThreshold", -1)
    .config("spark.driver.memory", "12g")
    .appName("ExperimentMissingValuesStrategy")
    .getOrCreate()
)

22/05/23 16:54:20 WARN Utils: Your hostname, tales-samsung resolves to a loopback address: 127.0.1.1; using 192.168.0.107 instead (on interface wlxd03745e80dbf)
22/05/23 16:54:20 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/23 16:54:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
import pandas as pd
from datetime import datetime
import uuid
import pyspark.sql.functions as f
from pyspark.sql.functions import when
import matplotlib.pyplot as plt

from src.dao import dao, dao_processed, dao_ml
from src.utils import dflib, stats, pretties, plot, plot_domain, palette
from src.ml.transformers import DropNaTransformer

In [4]:
# Project display helper; presumably lifts the limit on how many DataFrame
# columns are rendered — TODO confirm against src.utils.pretties.
pretties.max_data_frame_columns()

In [5]:
# Columns that identify or describe a match (ids, names, date, label).
# NOTE(review): not referenced by the cells shown in this notebook.
BASIC_COLS = [
    'id',
    'target',
    'league_id',
    'league_name',
    'home_team_name',
    'away_team_name',
    'match_date',
]

# Number of folds; presumably meant for cross-validation — not referenced by
# the cells shown in this notebook.
N_FOLDS = 10

In [6]:
def remove_cols(cols, cols_to_remove):
    """Return the entries of ``cols`` that are not listed in ``cols_to_remove``.

    A new list is built, so the list passed in by the caller is left untouched
    and re-running a cell that calls this function always starts from the same
    input. Every occurrence of an excluded name is dropped, and the relative
    order of the remaining names is preserved.

    Args:
        cols: Iterable of column names to filter.
        cols_to_remove: Iterable of (hashable) column names to exclude; names
            that do not appear in ``cols`` are ignored.

    Returns:
        list: The surviving column names, in their original order.
    """
    # Set lookup keeps the filter linear instead of scanning `cols` per name.
    excluded = set(cols_to_remove)
    return [col for col in cols if col not in excluded]

# Loading Data

In [None]:
# Identifier of the data build to work on, as reported by
# dao_processed.most_recent_data_build_id(); printed so the run records
# which build the results below refer to.
id_data_build = dao_processed.most_recent_data_build_id()
print(id_data_build)

In [None]:
# First entry returned by load_feature_selection for this build
# (NOTE(review): presumably the most recent / preferred selection — confirm
# the ordering the DAO guarantees). It is read below via "cols_to_remove".
feature_selection_data = dao_ml.load_feature_selection(id_data=id_data_build)[0]
# Metadata of the processed build; read below via its "use_features" entry.
metadata_json = dao_processed.load_processed_metadata(id_data=id_data_build)

In [None]:
# Final feature list: the build's "use_features" minus the columns that the
# feature-selection step marked under "cols_to_remove".
use_features = remove_cols(cols=metadata_json["use_features"], cols_to_remove=feature_selection_data["cols_to_remove"])

In [None]:
# Load the three processed splits of the selected build.
df_ttrain = dao_processed.load_processed_data(
    which_dataset="train_train",
    id_data=id_data_build,
    spark=spark,
)
# Sample as many rows as the frame holds.
# NOTE(review): presumably this shuffles the full training split — confirm the
# semantics of dflib.sample and whether it is seeded.
df_ttrain = dflib.sample(
    df_ttrain,
    n=df_ttrain.count(),
)
df_tvalid = dao_processed.load_processed_data(
    which_dataset="train_valid",
    id_data=id_data_build,
    spark=spark,
)
df_test = dao_processed.load_processed_data(
    which_dataset="test",
    id_data=id_data_build,
    spark=spark,
)

# Report the shape of each split as loaded.
print(f"df_ttrain shape: {dflib.shape(df_ttrain)}")
print(f"df_tvalid shape: {dflib.shape(df_tvalid)}")
print(f"df_test shape: {dflib.shape(df_test)}")

# Data Pipeline

In [None]:
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.feature import VectorAssembler #A feature transformer that merges multiple columns into a vector column.
from pyspark.ml.feature import StringIndexer #A label indexer that maps a string column of labels to an ML column of label indices.
from src.ml.transformers import UndersamplingTransformer, ProbaVectorToPrediction
from src.ml.estimators import FillProbaEstimator
from pyspark.ml.feature import Imputer

from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.tuning import CrossValidator, CrossValidatorModel, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from src.ml import metrics

### Defining

In [None]:
# Project transformer configured on the "target" column.
# NOTE(review): not used by any cell shown in this notebook.
undersampling_transformer = UndersamplingTransformer(
    target_colname="target",
)

# Packs the selected feature columns into a single "features" vector column.
# VectorAssembler's default handleInvalid is "error", so rows with null
# features have to be dropped or imputed before this stage runs.
feature_assembler_transformer = VectorAssembler(
    inputCols=use_features,
    outputCol="features",
)

# Fitted right away on the training split, so this name holds the fitted
# indexer model: "target" -> "target_indexed", with label indices assigned in
# descending alphabetical order of the label strings.
target_indexer_transformer = StringIndexer(
    inputCol="target",
    outputCol="target_indexed",
    stringOrderType="alphabetDesc",
).fit(df_ttrain)

In [None]:
# Labelled data: assemble the feature vector, then index the target.
pipeline_train = PipelineModel(
    stages=[
        feature_assembler_transformer,
        target_indexer_transformer,
    ]
)

# Data handled without the target-indexing step: feature vector only.
pipeline_test = PipelineModel(
    stages=[feature_assembler_transformer]
)

### Applying

In [None]:
# Split every dataset in two: `*_na` receives the result of
# dflib.filter_any_null (presumably the rows with at least one null among the
# selected features), while the original name is rebound to the output of
# DropNaTransformer (presumably the rows without such nulls).
# NOTE: the original names are overwritten, so running this cell a second time
# would compute the `*_na` frames from already-filtered data — re-run from the
# loading cell instead.
df_ttrain_na = dflib.filter_any_null(df_ttrain, subset=use_features)
df_ttrain = DropNaTransformer(subset=use_features).transform(df_ttrain)

df_tvalid_na = dflib.filter_any_null(df_tvalid, subset=use_features)
df_tvalid = DropNaTransformer(subset=use_features).transform(df_tvalid)

# Same rule as for train/valid: only nulls in the feature columns matter.
# Without `subset`, a null in any other column would also move a test row
# out of df_test, which is inconsistent with the other two splits.
df_test_na = dflib.filter_any_null(df_test, subset=use_features)
df_test = DropNaTransformer(subset=use_features).transform(df_test)

print(f"df_ttrain shape: {dflib.shape(df_ttrain)}")
print(f"df_ttrain_na shape: {dflib.shape(df_ttrain_na)}")
print(f"df_tvalid shape: {dflib.shape(df_tvalid)}")
print(f"df_tvalid_na shape: {dflib.shape(df_tvalid_na)}")
print(f"df_test shape: {dflib.shape(df_test)}")
print(f"df_test_na shape: {dflib.shape(df_test_na)}")

In [None]:
# Add the "features" vector (and, for the labelled splits, "target_indexed")
# to the frames produced by the null-split cell above.
# NOTE: the names are rebound in place, so this cell is not safe to run twice
# on the same kernel state (the output columns would already exist).
df_ttrain = pipeline_train.transform(df_ttrain)
df_tvalid = pipeline_train.transform(df_tvalid)
df_test = pipeline_test.transform(df_test)

In [None]:
# Full labelled set: training and validation splits stacked together.
# DataFrame.union matches columns by position, so this relies on both frames
# having the same column order after pipeline_train (NOTE(review): confirm the
# two loaded splits share one schema).
df_train = df_ttrain.union(df_tvalid)

# Choosing best strategy
For the rows of the dataset that have <b>missing values</b> in their features.

#### Imputation

In [None]:
def predict_na_imputer(clf, df_train, df_valid, strategy='median'):
    """Impute the feature columns of ``df_valid`` and predict on the result.

    Relies on the notebook-level names ``use_features``, ``pipeline_test`` and
    ``target_indexer_transformer``.

    Args:
        clf: Fitted model exposing ``transform``; applied to the prepared frame.
        df_train: Frame on which the Imputer is fitted.
        df_valid: Frame whose ``use_features`` columns are imputed in place
            (input and output column names are the same).
        strategy: Value forwarded to the Imputer's ``strategy`` parameter.

    Returns:
        The frame returned by ``clf.transform`` on the imputed rows, after the
        feature vector and the indexed target have been added.
    """
    fitted_imputer = Imputer(
        strategy=strategy,
        inputCols=use_features,
        outputCols=use_features,
    ).fit(df_train)

    # impute -> assemble the feature vector -> index the target
    prepared = fitted_imputer.transform(df_valid)
    for stage in (pipeline_test, target_indexer_transformer):
        prepared = stage.transform(prepared)

    return clf.transform(prepared)

In [None]:
# Predict on the validation rows that have missing features, once per
# imputation strategy. The imputer is fitted on df_ttrain, which at this point
# has already been passed through DropNaTransformer in the null-split cell.
# NOTE(review): `clf` is not defined in any cell shown in this notebook — a
# fitted classifier must exist before this cell can run, so Restart & Run All
# would fail here as the notebook stands.
preds_tvalid_na_median_imputed = predict_na_imputer(clf, df_ttrain, df_tvalid_na, strategy='median')
preds_tvalid_na_mean_imputed = predict_na_imputer(clf, df_ttrain, df_tvalid_na, strategy='mean')

#### Filling Prediction with Global Frequency

In [None]:
def predict_na_filler(df_train, df_valid):
    """Predict on ``df_valid`` with the project's "global_frequency" filler.

    A ``FillProbaEstimator`` is fitted on ``df_train`` and applied to
    ``df_valid`` (presumably writing class frequencies observed in the training
    data into the probability column — confirm in src.ml.estimators). The
    result goes through ``ProbaVectorToPrediction`` to obtain a "prediction"
    column and finally through the fitted target indexer.

    Relies on the notebook-level names ``target_indexer_transformer`` and
    ``rfc``. NOTE(review): ``rfc`` is not defined in any cell shown in this
    notebook.

    Args:
        df_train: Frame on which the filler is fitted.
        df_valid: Frame to produce predictions for.

    Returns:
        The transformed ``df_valid`` after the filler, the probability-to-
        prediction step and the target indexer have been applied.
    """
    fitted_filler = FillProbaEstimator(
        strategy="global_frequency",
        labels=target_indexer_transformer.labels,
        proba_vector_col=rfc.getProbabilityCol(),
    ).fit(df_train)

    proba_to_prediction = ProbaVectorToPrediction(
        target_transformer=target_indexer_transformer,
        prediction_col="prediction",
    )

    filled = fitted_filler.transform(df_valid)
    predicted = proba_to_prediction.transform(filled)
    return target_indexer_transformer.transform(predicted)

# Filler-based predictions for the validation rows that have missing
# features, fitted on the training split.
preds_tvalid_na_filled = predict_na_filler(df_ttrain, df_tvalid_na)

# Comparing missing-value filling strategies

In [None]:
# Report, for every missing-value strategy, how many validation rows were
# predicted and the evaluator's score on them. Each value is labelled
# explicitly so the row count cannot be mistaken for the score.
# NOTE(review): `evaluator` is not defined in any cell shown in this notebook;
# it must exist before this cell can run.
strategy_predictions = {
    "imputer (median)": preds_tvalid_na_median_imputed,
    "imputer (mean)": preds_tvalid_na_mean_imputed,
    "filler (global frequency)": preds_tvalid_na_filled,
}

for strategy_name, strategy_preds in strategy_predictions.items():
    n_rows = strategy_preds.count()
    score = evaluator.evaluate(strategy_preds)
    print(f"{strategy_name}: n_rows={n_rows}, score={score}")