In [0]:
%pyspark
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Session-level Parquet encryption settings: no key provider, no column keys, no footer key.
# NOTE(review): presumably this turns Parquet modular encryption off for the reads/writes
# in this notebook — confirm against the cluster's default Spark configuration.
aftm_parquet_enc_conf = {
    "parquet.encryption.key.provider": "null",
    "parquet.encryption.column.keys": "",
    "parquet.encryption.footer.key": "",
}
for aftm_conf_key, aftm_conf_val in aftm_parquet_enc_conf.items():
    spark.conf.set(aftm_conf_key, aftm_conf_val)

# Base directories for this project's stored datasets/models and for uploads.
aftm_store_path = "/data/zeus/store/ais_fraud_prediction"
aftm_upload_path = "/data/zeus/upload/ais_fraud_prediction"

# Version tag embedded in saved artifact names (appears to be a YYYYMMDDHHMM timestamp).
time_ver = "202312071741"

In [1]:
%pyspark
# Load the two raw datasets from the project store and register them as Spark SQL temp views.
aftm_df_orig = spark.read.parquet(f"{aftm_store_path}/dataset/purchase_history")
aftm_df_prof = spark.read.parquet(f"{aftm_store_path}/dataset/customer_profile")

aftm_df_orig.createOrReplaceTempView("purchase_history")
aftm_df_prof.createOrReplaceTempView("customer_profile")

# Training frame: every customer_profile column (a2.*) plus the is_fraud label from purchase_history.
# FIXME(review): this LEFT JOIN has no ON/USING clause, so every purchase row is paired with every
# profile row (a cartesian product). Spark 3.x runs it silently because spark.sql.crossJoin.enabled
# defaults to true; Spark 2.x rejects it as an implicit cartesian product. As written, the is_fraud
# labels are NOT attached to the customer they belong to.
# Add the join condition, e.g. `... as a2 on a1.<key> = a2.<key>`. The shared key column is not
# visible here; confirm it against both schemas.
aftm_df = spark.sql("select a2.*, a1.is_fraud from purchase_history as a1 left join customer_profile as a2")

In [2]:
%pyspark
# Down-sample the Spark frame to roughly `aftm_num_samp_rows` rows and collect it to pandas.
# DataFrame.sample is a per-row Bernoulli draw, so the resulting size is approximate.
aftm_num_samp_rows = 1000
# Fixed seed so the sample, and everything trained on it, is reproducible across re-runs.
aftm_samp_seed = 42

aftm_total_rows = aftm_df.count()
if aftm_total_rows == 0:
    raise ValueError("aftm_df is empty; nothing to sample for training")

# Sampling without replacement only accepts a fraction in [0, 1]; clamp so tables smaller
# than the target size are kept whole instead of raising.
aftm_samp_fr = min(1.0, aftm_num_samp_rows / aftm_total_rows)
aftm_df_sample = aftm_df.sample(fraction=aftm_samp_fr, seed=aftm_samp_seed)
aftm_df_pd = aftm_df_sample.toPandas()

In [3]:
%pyspark
# Postal codes of the 50 U.S. states (DC and territories are not included), in the same
# order as before. Used to validate the state token parsed from each address.
aftm_us_states = (
    "AL AK AZ AR CA CO CT DE FL GA "
    "HI ID IL IN IA KS KY LA ME MD "
    "MA MI MN MS MO MT NE NV NH NJ "
    "NM NY NC ND OH OK OR PA RI SC "
    "SD TN TX UT VT VA WA WV WI WY"
).split()

def aftm_format_phone_no(phone_no):
    """Reduce a raw phone-number string to its local digit body.

    Processing order:
      1. If the input contains "x", keep only the text before the first "x"
         (drops an extension such as "x123").
      2. If the input contains "+", discard everything up to and including the
         first "-" (treated as an international prefix such as "+1-").
      3. Remove the separator characters "-", ".", "(" and ")". Any other
         characters, including spaces, are left in place.

    Args:
        phone_no: raw phone number as a string.

    Returns:
        The cleaned string. This notebook counts a 10-character result as valid.

    NOTE(review): a "+" number with no "-" (e.g. "+15551234567") collapses to "",
    and a "001-" style prefix is kept (13 characters) — confirm these formats do
    not occur in the data.
    """
    has_extension = "x" in phone_no
    is_international = "+" in phone_no

    core = phone_no.partition("x")[0] if has_extension else phone_no
    if is_international:
        _country_prefix, *local_parts = core.split("-")
        core = "".join(local_parts)

    # Strip all four separator characters in a single pass.
    return core.translate(str.maketrans("", "", "-.()"))

# Feature engineering on the sampled pandas frame. Work on a copy so aftm_df_pd keeps the
# raw sampled values (it is also where the training labels are read from).
aftm_df_pd_trf = aftm_df_pd.copy()

# Email domain: the text after the first "@" (raises IndexError if an address has no "@").
aftm_df_pd_trf["mailing_platform"] = aftm_df_pd_trf["email"].apply(lambda x: x.split("@")[1])

# State code: assumed to be the second-to-last space-separated token of the address.
# NOTE(review): confirm this against the actual address format.
aftm_df_pd_trf["state"] = aftm_df_pd_trf["address"].apply(lambda x: x.split(" ")[-2])
aftm_df_pd_trf["state_valid"] = aftm_df_pd_trf["state"].isin(aftm_us_states).astype(int)
# Collapse every non-state token into a single "IV" (invalid) bucket. The vectorised `where`
# replaces the old row-wise `x[0]`/`x[1]` lookup, which used positional indexing on a
# label-indexed row Series (deprecated in pandas).
aftm_df_pd_trf["state"] = aftm_df_pd_trf["state"].where(aftm_df_pd_trf["state_valid"].astype(bool), "IV")

# Contact phone: normalise, then flag numbers whose cleaned form has exactly 10 characters.
aftm_df_pd_trf["contact_phone"] = aftm_df_pd_trf["contact_phone"].apply(aftm_format_phone_no)
aftm_df_pd_trf["contact_phone_valid"] = (aftm_df_pd_trf["contact_phone"].str.len() == 10).astype(int)

# NOTE(review): the card-number check reuses the phone's 10-character length. Payment card
# numbers commonly have more digits than that, so confirm this threshold against the data.
aftm_df_pd_trf["credit_card_no_valid"] = (aftm_df_pd_trf["credit_card_no"].apply(len) == 10).astype(int)

# Dates are split on "-" and read as [year, month, ...]; presumably ISO "YYYY-MM-DD" strings.
aftm_dob_parts = aftm_df_pd_trf["date_of_birth"].str.split("-")
aftm_df_pd_trf["birth_month"] = aftm_dob_parts.str[1].astype(int)
aftm_df_pd_trf["birth_year"] = aftm_dob_parts.str[0].astype(int)
aftm_member_parts = aftm_df_pd_trf["member_since"].str.split("-")
aftm_df_pd_trf["membership_month"] = aftm_member_parts.str[1].astype(int)
aftm_df_pd_trf["membership_year"] = aftm_member_parts.str[0].astype(int)

In [4]:
%pyspark
from sklearn.pipeline import Pipeline
from sklearn.pipeline import FeatureUnion
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OrdinalEncoder
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import MinMaxScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier

# Input columns for each preprocessing branch. ColumnTransformer's `remainder` defaults to
# "drop", so every other column of aftm_df_pd_trf (including is_fraud) is ignored as a feature.
aftm_oe_inp_cols = ["mailing_platform", "state", "sex", "status"]
aftm_ohe_inp_cols = ["state_valid", "contact_phone_valid", "credit_card_no_valid", "birth_month", "membership_month"]
aftm_mms_inp_cols = ["birth_year", "membership_year"]

# Fixed seed for the randomised estimators so re-running the notebook reproduces the same model.
aftm_model_seed = 42


def aftm_dense_ohe():
    """Return a OneHotEncoder that emits a dense array and ignores unseen categories.

    scikit-learn 1.2 renamed ``sparse`` to ``sparse_output`` and 1.4 removed ``sparse``,
    so try the new keyword first and fall back for older installations.
    ``handle_unknown="ignore"`` makes a category not seen in the training sample encode
    as all zeros at scoring time instead of raising.
    """
    try:
        return OneHotEncoder(sparse_output=False, handle_unknown="ignore")
    except TypeError:
        return OneHotEncoder(sparse=False, handle_unknown="ignore")


aftm_pl = Pipeline(steps=[
    ("fu",
        FeatureUnion([
            # Categorical strings: ordinal-encode, then one-hot the integer codes. Unseen
            # categories map to -1 and then to an all-zero one-hot row rather than an error.
            ("pl",
                Pipeline(steps=[
                    ("ctoe", ColumnTransformer(transformers=[
                        ("oe", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1), aftm_oe_inp_cols)
                    ])),
                    ("ohe0", aftm_dense_ohe())
                ])
            ),
            # Binary flags and month numbers: one-hot encode directly.
            ("ctohe", ColumnTransformer(transformers=[("ohe1", aftm_dense_ohe(), aftm_ohe_inp_cols)])),
            # Years: min-max scale to the training range.
            ("ctmms", ColumnTransformer(transformers=[("mms", MinMaxScaler(), aftm_mms_inp_cols)]))
        ])
    ),
    ("pca", PCA(random_state=aftm_model_seed)),
    ("rfc", RandomForestClassifier(random_state=aftm_model_seed))
])

# Labels come from the untouched sample; it has the same rows and order as aftm_df_pd_trf.
aftm_pl_md = aftm_pl.fit(aftm_df_pd_trf, aftm_df_pd["is_fraud"].values)

In [5]:
%pyspark
import pickle

# Persist the fitted pipeline. The `with` block closes (and flushes) the file even if pickling
# fails; the previous inline open() left the handle open.
# NOTE(review): Python's open() writes to the driver's local filesystem, while the same base
# path is read through Spark above — confirm the mvp1/ directory exists locally.
# Only unpickle this artifact from trusted locations: pickle.load can execute arbitrary code.
with open(f"{aftm_store_path}/mvp1/model_{time_ver}.pkl", "wb") as aftm_model_fh:
    pickle.dump(aftm_pl_md, aftm_model_fh)

In [6]:
%pyspark