In [1]:
from spark_utils import get_spark_session
import pyspark.sql.functions as F

spark = get_spark_session()
# run transform_latest.py if this don't exist
test_data = spark.read.parquet('data_transformed/amex-default-prediction/test_data_aggregated')
train_data = spark.read.parquet('data_transformed/amex-default-prediction/train_data_aggregated')
# run format_data.py if these don't exist
train_labels = spark.read.parquet('data/amex-default-prediction/train_labels')
sample_submission = spark.read.parquet('data/amex-default-prediction/sample_submission')

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/16 23:48:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
assert train_data.count() == train_data.select('customer_ID').distinct().count()
assert train_labels.count() == train_labels.select('customer_ID').distinct().count()
assert train_data.count() == train_data.join(train_labels, on='customer_ID', how='inner').count()

assert test_data.count() == test_data.select('customer_ID').distinct().count()
assert sample_submission.count() == sample_submission.select('customer_ID').distinct().count()
assert test_data.count() == test_data.join(sample_submission, on='customer_ID', how='inner').count()

                                                                                

In [3]:
%%time
from format_data import CATEGORICAL_VARIABLES
from encoder import CategoricalToIntegerEncoders

categorical_columns = []
for c in CATEGORICAL_VARIABLES:
    categorical_columns += [
        f'{c}_first',
        f'{c}_last',
        f'{c}_mode',
    ]
encs = CategoricalToIntegerEncoders(columns=categorical_columns).fit(train_data)

CPU times: user 54.2 ms, sys: 52.7 ms, total: 107 ms
Wall time: 16.2 s


In [4]:
%%time
train_pdf = encs.transform(
    spark=spark,
    df=train_data.join(train_labels, on='customer_ID', how='inner')
).toPandas()

22/06/16 23:49:11 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

CPU times: user 1.63 s, sys: 1.26 s, total: 2.89 s
Wall time: 29.3 s


In [5]:
%%time
test_pdf = encs.transform(
    spark=spark,
    df=test_data
).toPandas()

                                                                                

CPU times: user 1.97 s, sys: 3.69 s, total: 5.66 s
Wall time: 31 s


In [6]:
from format_data import TARGET_VARIABLE, ID_VARIABLES

date_cols = ['S_2_first', 'S_2_last']
feature_columns = [
    c for c in train_pdf.columns 
    if c not in [TARGET_VARIABLE,] + ID_VARIABLES + date_cols
]
', '.join(feature_columns)

'num_statements, P_2_first, P_2_last, P_2_mean, D_39_first, D_39_last, D_39_mean, B_1_first, B_1_last, B_1_mean, B_2_first, B_2_last, B_2_mean, R_1_first, R_1_last, R_1_mean, S_3_first, S_3_last, S_3_mean, D_41_first, D_41_last, D_41_mean, B_3_first, B_3_last, B_3_mean, D_42_first, D_42_last, D_42_mean, D_43_first, D_43_last, D_43_mean, D_44_first, D_44_last, D_44_mean, B_4_first, B_4_last, B_4_mean, D_45_first, D_45_last, D_45_mean, B_5_first, B_5_last, B_5_mean, R_2_first, R_2_last, R_2_mean, D_46_first, D_46_last, D_46_mean, D_47_first, D_47_last, D_47_mean, D_48_first, D_48_last, D_48_mean, D_49_first, D_49_last, D_49_mean, B_6_first, B_6_last, B_6_mean, B_7_first, B_7_last, B_7_mean, B_8_first, B_8_last, B_8_mean, D_50_first, D_50_last, D_50_mean, D_51_first, D_51_last, D_51_mean, B_9_first, B_9_last, B_9_mean, R_3_first, R_3_last, R_3_mean, D_52_first, D_52_last, D_52_mean, P_3_first, P_3_last, P_3_mean, B_10_first, B_10_last, B_10_mean, D_53_first, D_53_last, D_53_mean, S_5_firs

In [7]:
X = train_pdf[feature_columns]
y = train_pdf[TARGET_VARIABLE]
print(y.unique())
# Note that the negative class has been subsampled for this dataset at 5%, and thus receives a 20x weighting in the scoring metric.
w = y.apply(lambda x: 20. if x == 1. else 1.)
print(w.unique())

[0. 1.]
[ 1. 20.]


In [8]:
X_test = test_pdf[feature_columns]

In [9]:
from sklearn.model_selection import train_test_split

X_train, X_valid, y_train, y_valid, w_train, w_valid = train_test_split(X, y, w)
X_train.shape, X_valid.shape, y_train.shape, y_valid.shape, w_train.shape, w_valid.shape

((344184, 565), (114729, 565), (344184,), (114729,), (344184,), (114729,))

In [10]:
%%time
import os
from tempfile import TemporaryDirectory
from format_data import PREDICTION_VARIABLE
import pandas as pd
from evaluation import evaluate
from lightgbm import LGBMClassifier
import mlflow


mlflow.lightgbm.autolog()
experiment_id = mlflow.get_experiment_by_name('use_aggregated.ipynb').experiment_id
with mlflow.start_run(experiment_id=experiment_id) as run:
    run_id = run.info.run_id
    print(f'run_id: {run_id}')

    m = LGBMClassifier().fit(
        X=X_train, y=y_train, sample_weight=w_train,
        categorical_feature=encs.columns_encoded,
    )
    # these are auto logged
    score_train = m.score(X=X_train, y=y_train, sample_weight=w_train)
    score_valid = m.score(X=X_valid, y=y_valid, sample_weight=w_valid)
    # these are not, so we'll need to log them manually
    score_amex_train = evaluate(X_train, y_train, m=m)
    score_amex_valid = evaluate(X_valid, y_valid, m=m)
    mlflow.log_metric('score_amex_train', score_amex_train)
    mlflow.log_metric('score_amex_valid', score_amex_valid)

    pred_df = pd.DataFrame({
        PREDICTION_VARIABLE: m.predict_proba(X_test)[:, 1],
        'customer_ID': test_pdf['customer_ID'],
    })
    pred_and_sample_joined_counts = (
        spark
        .createDataFrame(pred_df)
        .join(sample_submission, on='customer_ID', how='inner')
        .count()
    )
    assert pred_and_sample_joined_counts == len(pred_df), \
        f'''These should be identical:
        sample_submission has {sample_submission.count()} rows,
        pred_and_sample_joined_counts is {pred_and_sample_joined_counts},
        pred_df has {len(pred_df)} rows
        '''
    with TemporaryDirectory() as p:
        p = os.path.join(p, 'submission.csv')
        pred_df.to_csv(p, header=True, index=False)
        mlflow.log_artifact(local_path=p)


run_id: 1e0a4409d0f64b01a242d38c75df61cd


22/06/16 23:51:26 WARN TaskSetManager: Stage 164 contains a task of very large size (8582 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

CPU times: user 4min 33s, sys: 18.4 s, total: 4min 51s
Wall time: 1min 20s
