In [38]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "3"

In [39]:
print(os.getcwd())

/home/notebook


In [40]:
from ipynb.fs.full.pyspark_setup import get_spark_session
from ipynb.fs.full.conf_template import Struct as Section

In [41]:
spark = get_spark_session("data_bc", swan_spark_conf)

In [42]:
from pyspark import StorageLevel

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import *
from pyspark.sql import Row
from pyspark.sql import DataFrame

from pyspark.ml.classification import LinearSVC
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.classification import DecisionTreeClassifier

from pyspark.ml.fpm import FPGrowth

from pyspark.ml.feature import Normalizer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import RobustScaler
from pyspark.ml.feature import MaxAbsScaler

from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

from pyspark.ml.linalg import Vectors,VectorUDT

import time
import datetime
import numpy as np
from dateutil.relativedelta import relativedelta
import random
from functools import reduce

# Postgres Data Brand_Commerce

In [14]:
user_bc_1 = spark.read.format("jdbc").\
options(
         url='jdbc:postgresql://postgres-db-lb.naufalhilmi.svc.cluster.local:5432/postgres',
         dbtable='brand_commerce.users',
         user='postgres',
         password='postgres',
         driver='org.postgresql.Driver').\
load()

In [15]:
user_bc_2 = spark.read.format("jdbc").\
options(
         url='jdbc:postgresql://postgres-db-lb.naufalhilmi.svc.cluster.local:5432/postgres',
         dbtable='public.users',
         user='postgres',
         password='postgres',
         driver='org.postgresql.Driver').\
load()

In [16]:
order_shipping_bc = spark.read.format("jdbc").\
options(
         url='jdbc:postgresql://postgres-db-lb.naufalhilmi.svc.cluster.local:5432/postgres',
         dbtable='brand_commerce.order_shipping',
         user='postgres',
         password='postgres',
         driver='org.postgresql.Driver').\
load()

In [17]:
order_product_bc = spark.read.format("jdbc").\
options(
         url='jdbc:postgresql://postgres-db-lb.naufalhilmi.svc.cluster.local:5432/postgres',
         dbtable='brand_commerce.order_product',
         user='postgres',
         password='postgres',
         driver='org.postgresql.Driver').\
load()

In [18]:
order_bc = spark.read.format("jdbc").\
options(
         url='jdbc:postgresql://postgres-db-lb.naufalhilmi.svc.cluster.local:5432/postgres',
         dbtable='brand_commerce.order_transaction_bc_naufal',
         user='postgres',
         password='postgres',
         driver='org.postgresql.Driver').\
load()

# Postgres Data Shopee

In [19]:
order_shopee = spark.read.format("jdbc").\
options(
         url='jdbc:postgresql://postgres-db-lb.naufalhilmi.svc.cluster.local:5432/postgres',
         dbtable='shopee.order_transaction_shopee_naufal',
         user='postgres',
         password='postgres',
         driver='org.postgresql.Driver').\
load()

In [20]:
order_product_shopee = spark.read.format("jdbc").\
options(
         url='jdbc:postgresql://postgres-db-lb.naufalhilmi.svc.cluster.local:5432/postgres',
         dbtable='shopee.income_item_detail',
         user='postgres',
         password='postgres',
         driver='org.postgresql.Driver').\
load()

In [21]:
lst_bc = ['wc-closed', 'accepted', 'Completed', 'completed', 'wc-completed', 'Waiting Payment', 'Under Shipment']
lst_shopee = ['READY_TO_SHIP', 'COMPLETED', 'SHIPPED', 'TO_CONFIRM_RECEIVE']

In [22]:
lst_brand_bc = ['Emina', 'Wardah', 'Kahf']
lst_brand_shopee = [59763733, 63983008, 326487418]

In [23]:
order_bc = order_bc.filter(col('order_status').isin(lst_bc))
order_bc = order_bc.filter(col("date_paid").isNotNull())
order_bc = order_bc.filter(col('brand').isin(lst_brand_bc))

In [24]:
order_shopee = order_shopee.filter(col('order_status').isin(lst_shopee))
order_shopee = order_shopee.filter(col("pay_time").isNotNull())
order_shopee = order_shopee.filter(col('shopid').isin(lst_brand_shopee))

# Preprocessing raw data

In [27]:
# Update order_id in order_billing data to match the brand
user_bc_1 = user_bc_1.select('brand','user_id','nickname')
user_bc_2 = user_bc_2.select('brand','user_id','nickname')
user_bc = user_bc_1.union(user_bc_2)

user_bc = user_bc\
      .withColumn("user_id", when(user_bc.brand == "Emina",concat(col('user_id'), lit("E"))) \
      .when(user_bc.brand == "Wardah",concat(col('user_id'), lit("W"))) \
      .when(user_bc.brand == "Kahf",concat(col('user_id'), lit("K"))) \
      .otherwise(user_bc_1.user_id))

order_shipping_bc_2 = order_shipping_bc\
      .withColumn("order_id", when(order_shipping_bc.brand == "Emina",concat(col('order_id'), lit("E"))) \
      .when(order_shipping_bc.brand == "Wardah",concat(col('order_id'), lit("W"))) \
      .when(order_shipping_bc.brand == "Kahf",concat(col('order_id'), lit("K"))) \
      .otherwise(order_shipping_bc.order_id))

order_shipping_bc = order_shipping_bc_2\
      .withColumn("shipping_province", when(upper(col('shipping_province')).contains('JAWA BARAT'),'Jawa Barat') \
      .when(upper(col('shipping_province')).contains('DKI JAKARTA'),'DKI Jakarta') \
      .when(upper(col('shipping_province')).contains('BANTEN'),'Banten') \
      .when(upper(col('shipping_province')).contains('JAWA TENGAH'),'Jawa Tengah') \
      .when(upper(col('shipping_province')).contains('JAWA TIMUR'),'Jawa Timur') \
      .when(upper(col('shipping_province')).contains('SUMATERA UTARA'),'Sumatera Utara') \
      .otherwise('Other'))
    
order_product_bc = order_product_bc\
      .withColumn("order_id", when(order_product_bc.brand == "Emina",concat(col('order_id'), lit("E"))) \
      .when(order_product_bc.brand == "Wardah",concat(col('order_id'), lit("W"))) \
      .when(order_product_bc.brand == "Kahf",concat(col('order_id'), lit("K"))) \
      .otherwise(order_product_bc.order_id))\
      .withColumn("order_item_id", when(order_product_bc.brand == "Emina",concat(col('order_item_id'), lit("E"))) \
      .when(order_product_bc.brand == "Wardah",concat(col('order_item_id'), lit("W"))) \
      .when(order_product_bc.brand == "Kahf",concat(col('order_item_id'), lit("K"))) \
      .otherwise(order_product_bc.order_item_id))\
      .withColumn("product_id", when(order_product_bc.brand == "Emina",concat(col('product_id'), lit("E"))) \
      .when(order_product_bc.brand == "Wardah",concat(col('product_id'), lit("W"))) \
      .when(order_product_bc.brand == "Kahf",concat(col('product_id'), lit("K"))) \
      .otherwise(order_product_bc.product_id))

order_bc = order_bc\
      .withColumn("customer_id", when(order_bc.brand == "Emina",concat(col('customer_id'), lit("E"))) \
      .when(order_bc.brand == "Wardah",concat(col('customer_id'), lit("W"))) \
      .when(order_bc.brand == "Kahf",concat(col('customer_id'), lit("K"))) \
      .otherwise(order_bc.customer_id))\
      .withColumn("order_id", when(order_bc.brand == "Emina",concat(col('order_id'), lit("E"))) \
      .when(order_bc.brand == "Wardah",concat(col('order_id'), lit("W"))) \
      .when(order_bc.brand == "Kahf",concat(col('order_id'), lit("K"))) \
      .otherwise(order_bc.order_id))\

In [28]:
order_shopee_2 = order_shopee

order_shopee = order_shopee_2\
      .withColumn("recipient_province", when(upper(col('recipient_province')).contains('JAWA BARAT'),'Jawa Barat') \
      .when(upper(col('recipient_province')).contains('DKI JAKARTA'),'DKI Jakarta') \
      .when(upper(col('recipient_province')).contains('BANTEN'),'Banten') \
      .when(upper(col('recipient_province')).contains('JAWA TENGAH'),'Jawa Tengah') \
      .when(upper(col('recipient_province')).contains('JAWA TIMUR'),'Jawa Timur') \
      .when(upper(col('recipient_province')).contains('SUMATERA UTARA'),'Sumatera Utara') \
      .otherwise('Other'))

order_shopee = order_shopee\
      .withColumn("shopid", when(order_shopee.shopid == 59763733, 'Wardah') \
      .when(order_shopee.shopid == 63983008, 'Emina') \
      .when(order_shopee.shopid == 326487418, 'Kahf') \
      .otherwise(order_shopee.shopid))

# Extract feature for ML for Task 1

In [29]:
def transform_data_shopee():
    order_shopee_tmp = order_shopee.select('ordersn', 'create_time', 'buyer_username', 'recipient_province','shopid')
    
    predict_month = order_shopee_tmp.agg(trunc(max(col('create_time')),'month')).collect()[0][0]
    
    min_month = predict_month + relativedelta(months=-6)
    
    max_extract_month = predict_month + relativedelta(months=-1)
    
    order_shopee_extract = order_shopee_tmp.filter((trunc(col('create_time'),'month') < predict_month) & 
                                                   (trunc(col('create_time'),'month') >= min_month))
    
    qty_order = order_product_shopee.groupBy('ordersn').agg(sum('original_price').alias('revenue'), sum('qty_purchased').alias('qty'))\
    .select('ordersn','revenue','qty')
    
    days_diff = order_shopee_extract.select('create_time', 'buyer_username')
    days_diff = days_diff.withColumn('date_only', to_date(col('create_time')))
    w = Window().partitionBy('buyer_username').orderBy(['buyer_username', 'create_time'])
    days_diff = days_diff.dropDuplicates(["buyer_username","date_only"])
    days_diff = days_diff.select("*", lag("date_only").over(w).alias("new_col_1"))\
    .withColumn('days_diff', datediff(col('date_only'),col("new_col_1"))).fillna(999,subset=['days_diff'])
    days_diff = days_diff.groupby("buyer_username").agg(min('days_diff').alias('days_diff'))
    
    order_shopee_extract = order_shopee_extract.join(qty_order, ['ordersn'])
    order_shopee_extract = order_shopee_extract.withColumn('count_order', lit(1))
    order_shopee_extract = order_shopee_extract.withColumnRenamed('buyer_username','customer_id')
    order_shopee_extract = order_shopee_extract.withColumn('max_extract_month', lit(max_extract_month))
    
    final_data = order_shopee_extract.groupBy('customer_id')\
    .agg(sum('count_order').alias('frequency'),max('recipient_province').alias('dc_code'),sum('revenue').alias('revenue'),
         max('create_time').alias('max_tgl'),max('max_extract_month').alias('max_extract_month'),
         size(collect_set('shopid')).alias('brand'),sum('qty').alias('qty'))\
    .withColumn('recency', datediff(last_day(col('max_extract_month')), col('max_tgl')))\
    .select('customer_id','frequency','revenue','recency','dc_code','qty','brand')
    
    days_diff = days_diff.withColumnRenamed('buyer_username','customer_id')
    final_data = final_data.join(days_diff, ['customer_id'])
    
    return final_data

In [30]:
def transform_data_bc():
    # Slicing data agar 6 bulan saja
    order_bc_tmp = order_bc.select('order_id', 'date_order', 'customer_id','brand')
    predict_month = order_bc_tmp.agg(trunc(max(col('date_order')),'month')).collect()[0][0]
    min_month = predict_month + relativedelta(months=-6)
    max_extract_month = predict_month + relativedelta(months=-1)
    order_bc_extract = order_bc_tmp.filter((trunc(col('date_order'),'month') < predict_month) & 
                                           (trunc(col('date_order'),'month') >= min_month))
    
    # Ekstrak fitur quantity, days_diff, dc_code, dan count_order
    qty_order = order_product_bc.groupBy('order_id').agg(sum('line_subtotal').alias('revenue'), sum('qty').alias('qty'))\
    .select('order_id', 'revenue','qty')

    dc_user = order_shipping_bc.select('order_id', 'shipping_province')
    
    days_diff = order_bc_extract.select('date_order', 'customer_id')
    days_diff = days_diff.withColumn('date_only', to_date(col('date_order')))
    w = Window().partitionBy('customer_id').orderBy(['customer_id', 'date_order'])
    days_diff = days_diff.dropDuplicates(["customer_id","date_only"])
    days_diff = days_diff.select("*", lag("date_only").over(w).alias("new_col_1"))\
    .withColumn('days_diff', datediff(col('date_only'),col("new_col_1"))).fillna(999,subset=['days_diff'])
    days_diff = days_diff.groupby("customer_id").agg(min('days_diff').alias('days_diff'))
    
    order_bc_extract = order_bc_extract.join(qty_order, ['order_id'])
    order_bc_extract = order_bc_extract.join(dc_user, ['order_id'])
    order_bc_extract = order_bc_extract.withColumn('count_order', lit(1))
    order_bc_extract = order_bc_extract.withColumn('max_extract_month', lit(max_extract_month))
    
    final_data = order_bc_extract.groupBy('customer_id')\
    .agg(sum('count_order').alias('frequency'),sum('revenue').alias('revenue'), max('shipping_province').alias('dc_code'),
         max('date_order').alias('max_tgl'),max('max_extract_month').alias('max_extract_month'),
         size(collect_set('brand')).alias('brand'), sum('qty').alias('qty'))\
    .withColumn('recency', datediff(last_day(col('max_extract_month')), col('max_tgl')))\
    .select('customer_id','frequency','revenue','recency','dc_code','qty','brand')
    
    final_data = final_data.join(days_diff, ['customer_id'])
    
    return final_data

In [31]:
final_data_bc = transform_data_bc()

In [32]:
final_data_shopee = transform_data_shopee()

In [33]:
final_data = final_data_shopee.union(final_data_bc)

In [34]:
frequency_median = final_data.select(percentile_approx('frequency',0.5)).take(1)[0][0]
recency_median = final_data.select(percentile_approx('recency',0.5)).take(1)[0][0]
revenue_median = final_data.select(percentile_approx('revenue',0.5)).take(1)[0][0]

final_data = final_data.withColumn('frequency_median',lit(frequency_median))
final_data = final_data.withColumn('recency_median',lit(recency_median))
final_data = final_data.withColumn('revenue_median',lit(revenue_median))

final_data = final_data.withColumn('label', when((col('frequency')>col('frequency_median'))&
                                                (col('recency')<col('recency_median'))&
                                                (col('revenue')>col('revenue_median')),1).otherwise(0))

final_data = final_data.drop('frequency_median','recency_median','revenue_median','revenue','recency','frequency')

In [43]:
final_data.cache()

DataFrame[customer_id: string, dc_code: string, qty: bigint, brand: int, days_diff: int, label: int]

##  ML Proses Task 1

In [44]:
evaluator=MulticlassClassificationEvaluator()

In [45]:
def combine_feature(data):
    indexer_dc_code = StringIndexer(inputCol='dc_code', outputCol='dc_code_num')
    index_data_dc_code = indexer_dc_code.fit(data).transform(data)
    encoder = OneHotEncoder(inputCol='dc_code_num', outputCol = 'dc_code_vec')
    onehotdata = encoder.fit(index_data_dc_code).transform(index_data_dc_code)
    
#     encoder = OneHotEncoder(inputCol='cluster_frequency', outputCol = 'cluster_frequency_vec')
#     onehotdata = encoder.fit(onehotdata).transform(onehotdata)
#     encoder = OneHotEncoder(inputCol='cluster_revenue', outputCol = 'cluster_revenue_vec')
#     onehotdata = encoder.fit(onehotdata).transform(onehotdata)
#     encoder = OneHotEncoder(inputCol='cluster_recency', outputCol = 'cluster_recency_vec')
#     onehotdata = encoder.fit(onehotdata).transform(onehotdata)
    
    va = VectorAssembler(outputCol='features')
    va.setInputCols(['days_diff','dc_code_vec','qty','brand'])
    combine_data = va.transform(onehotdata).select(['features','label'])
    return combine_data

In [46]:
def make_oversampling_data(train_data, potentialDf_count, nonPotentialDf_count):
    potentialDf = train_data.filter("label=1.0")
    nonPotentialDf = train_data.filter("label=0.0")
    sampleRatio = nonPotentialDf_count / potentialDf_count
    a = range(int(sampleRatio))
    oversampled_df = potentialDf.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')
    train_df = nonPotentialDf.unionAll(oversampled_df)
    return train_df

def make_undersampling_data(train_data, potentialDf_count, nonPotentialDf_count):
    potentialDf = train_data.filter("label=1.0")
    nonPotentialDf= train_data.filter("label=0.0")
    sampleRatio = potentialDf_count / nonPotentialDf_count
    nonPotentialSampleDf = nonPotentialDf.sample(False, sampleRatio)
    train_df = potentialDf.unionAll(nonPotentialSampleDf)
    return train_df

In [47]:
def transform_data_norm_1(data):
    normalizer = Normalizer(inputCol="features")
    data = normalizer.transform(data, {normalizer.p: 1, normalizer.outputCol:"norm_1"})
    return data.select('label',"norm_1")

def transform_data_norm_inf(data):
    normalizer = Normalizer(inputCol="features")
    data = normalizer.transform(data, {normalizer.p: float("inf"), normalizer.outputCol:"norm_inf"})
    return data.select('label',"norm_inf")

def transform_data_standard_scaler(data):
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
    scalerModel = scaler.fit(data)
    data = scalerModel.transform(data)
    return data.select('label',"scaled_features")

def transform_data_minmax_scaler(data):
    mmscaler = MinMaxScaler(inputCol="features", outputCol="mmscaled_features")
    mmscalerModel = mmscaler.fit(data)
    data = mmscalerModel.transform(data)
    return data.select('label',"mmscaled_features")

def transform_data_maxabs_scaler(data):
    mascaler = MaxAbsScaler(inputCol="features", outputCol="mascaled_features")
    mascalerModel = mascaler.fit(data)
    data = mascalerModel.transform(data)
    return data.select('label',"mascaled_features")

In [48]:
def lsvc_train_tuning_model(train_data, features):
    model = LinearSVC(labelCol="label", maxIter=10, featuresCol=features)
    trained_model = model.fit(train_data)
    return trained_model

def lr_train_tuning_model(train_data, features):
    model = LogisticRegression(labelCol="label", maxIter=10, featuresCol=features)
    trained_model = model.fit(train_data)
    return trained_model

def rf_train_tuning_model(train_data, features):
    model = RandomForestClassifier(labelCol="label", featuresCol=features)
    trained_model = model.fit(train_data)
    return trained_model

def gbt_train_tuning_model(train_data, features):
    model = GBTClassifier(labelCol="label", maxIter=10, featuresCol=features)
    trained_model = model.fit(train_data)
    return trained_model

def dt_train_tuning_model(train_data, features):
    model = DecisionTreeClassifier(labelCol="label", featuresCol=features)
    trained_model = model.fit(train_data)
    return trained_model

In [49]:
def metric_model_f1(pred):
    f1 = evaluator.evaluate(pred,{evaluator.metricName: "fMeasureByLabel", evaluator.metricLabel: 1.0})  
    return f1

In [50]:
def metric_model_precision(pred):
    precision = evaluator.evaluate(pred,{evaluator.metricName: "precisionByLabel", evaluator.metricLabel: 1.0}) 
    return precision

In [51]:
def metric_model_recall(pred):
    recall = evaluator.evaluate(pred,{evaluator.metricName: "recallByLabel", evaluator.metricLabel: 1.0})
    return recall

In [52]:
def metric_model_accuracy(pred):
    accuracy = evaluator.evaluate(pred,{evaluator.metricName: "accuracy"})
    return accuracy

In [53]:
from sklearn.metrics import f1_score
def metric_model_f1_multi(y_true, y_pred):
    f1_multi = f1_score(y_true, y_pred, average='macro')
    return f1_multi

## Prepare data for ML Task 1

In [54]:
combine_data = combine_feature(final_data)
raw_data_train, raw_data_test = combine_data.randomSplit([0.7, 0.3], seed = 42)

In [55]:
potential_count = raw_data_train.filter("label=1.0").count()

In [56]:
nonPotential_count = raw_data_train.filter("label=0.0").count()

In [58]:
data_train_under = make_undersampling_data(raw_data_train,potential_count,nonPotential_count)

data_original_train = data_train_under
data_original_test = raw_data_test

data_norm_1_train = transform_data_norm_1(data_train_under)
data_norm_1_test = transform_data_norm_1(raw_data_test)

data_norm_inf_train = transform_data_norm_inf(data_train_under)
data_norm_inf_test = transform_data_norm_inf(raw_data_test)

data_standard_scaler_train = transform_data_standard_scaler(data_train_under)
data_standard_scaler_test = transform_data_standard_scaler(raw_data_test)

data_minmax_scaler_train = transform_data_minmax_scaler(data_train_under)
data_minmax_scaler_test = transform_data_minmax_scaler(raw_data_test)

data_maxabs_scaler_train = transform_data_maxabs_scaler(data_train_under)
data_maxabs_scaler_test = transform_data_maxabs_scaler(raw_data_test)

In [59]:
final_data.unpersist()

DataFrame[customer_id: string, dc_code: string, qty: bigint, brand: int, days_diff: int, label: int]

## ML proses Task 1

In [60]:
dict_model_task_1 = {}
metric_task_1 = []

## SVM

In [92]:
data_original_train.cache()
data_original_test.cache()

DataFrame[features: vector, label: int]

In [93]:
start = time.time()
lsvc_tuning_original = lsvc_train_tuning_model(data_original_train,'features')
end = time.time()

In [None]:
pred = lsvc_tuning_original.transform(data_original_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['svm_original'] = lsvc_tuning_original

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

In [None]:
metric_task_1.append(('svm', 'original', f1, precision, recall))

In [None]:
data_original_train.unpersist()
data_original_test.unpersist()

In [91]:
data_norm_inf_train.cache()
data_norm_inf_test.cache()

DataFrame[label: int, norm_inf: vector]

In [92]:
start = time.time()
lsvc_tuning_norm_inf = lsvc_train_tuning_model(data_norm_inf_train,'norm_inf')
end = time.time()

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [93]:
pred = lsvc_tuning_norm_inf.transform(data_norm_inf_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['svm_norm_inf'] = lsvc_tuning_norm_inf

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [94]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

[[308782  53081]
 [  9275   6260]]


In [95]:
metric_task_1.append(('svm', 'norm_inf', f1, precision, recall))

In [96]:
data_norm_inf_train.unpersist()
data_norm_inf_test.unpersist()

DataFrame[label: int, norm_inf: vector]

In [97]:
data_norm_1_train.cache()
data_norm_1_test.cache()

DataFrame[label: int, norm_1: vector]

In [98]:
start = time.time()
lsvc_tuning_norm_1 = lsvc_train_tuning_model(data_norm_1_train,'norm_1')
end = time.time()

In [99]:
pred = lsvc_tuning_norm_1.transform(data_norm_1_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['svm_norm_1'] = lsvc_tuning_norm_1

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [100]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

[[361863      0]
 [ 15535      0]]


In [101]:
metric_task_1.append(('svm', 'norm_1', f1, precision, recall))

In [102]:
data_norm_1_train.unpersist()
data_norm_1_test.unpersist()

DataFrame[label: int, norm_1: vector]

In [103]:
data_standard_scaler_train.cache()
data_standard_scaler_test.cache()

DataFrame[label: int, scaled_features: vector]

In [104]:
start = time.time()
lsvc_tuning_scaled_features = lsvc_train_tuning_model(data_standard_scaler_train,'scaled_features')
end = time.time()

In [105]:
pred = lsvc_tuning_scaled_features.transform(data_standard_scaler_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['svm_scaled_features'] = lsvc_tuning_scaled_features

In [106]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

[[306972  54891]
 [  9107   6428]]


In [107]:
metric_task_1.append(('svm', 'scaled_features', f1, precision, recall))

In [108]:
data_standard_scaler_train.unpersist()
data_standard_scaler_test.unpersist()

DataFrame[label: int, scaled_features: vector]

In [109]:
data_minmax_scaler_train.cache()
data_minmax_scaler_test.cache()

DataFrame[label: int, mmscaled_features: vector]

In [110]:
start = time.time()
lsvc_tuning_mmscaled_features = lsvc_train_tuning_model(data_minmax_scaler_train,'mmscaled_features')
end = time.time()

In [111]:
pred = lsvc_tuning_mmscaled_features.transform(data_minmax_scaler_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['svm_mmscaled_features'] = lsvc_tuning_mmscaled_features

In [112]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

[[306973  54890]
 [  9107   6428]]


IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [113]:
metric_task_1.append(('svm', 'mmscaled_features', f1, precision, recall))

In [114]:
data_minmax_scaler_train.unpersist()
data_minmax_scaler_test.unpersist()

DataFrame[label: int, mmscaled_features: vector]

In [115]:
data_maxabs_scaler_train.cache()
data_maxabs_scaler_test.cache()

DataFrame[label: int, mascaled_features: vector]

In [116]:
start = time.time()
lsvc_tuning_mascaled_features = lsvc_train_tuning_model(data_maxabs_scaler_train,'mascaled_features')
end = time.time()

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [117]:
pred = lsvc_tuning_mascaled_features.transform(data_maxabs_scaler_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['svm_mascaled_features'] = lsvc_tuning_mascaled_features

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [118]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

[[306972  54891]
 [  9107   6428]]


In [119]:
metric_task_1.append(('svm', 'mascaled_features', f1, precision, recall))

In [120]:
data_maxabs_scaler_train.unpersist()
data_maxabs_scaler_test.unpersist()

DataFrame[label: int, mascaled_features: vector]

## LR

In [61]:
data_original_train.cache()
data_original_test.cache()

DataFrame[features: vector, label: int]

In [62]:
start = time.time()
lr_tuning_original = lr_train_tuning_model(data_original_train,'features')
end = time.time()

In [63]:
pred = lr_tuning_original.transform(data_original_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['lr_original'] = lr_tuning_original

In [64]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

[[233115  20245]
 [  1378  34997]]
              precision    recall  f1-score   support

           0       0.99      0.92      0.96    253360
           1       0.63      0.96      0.76     36375

    accuracy                           0.93    289735
   macro avg       0.81      0.94      0.86    289735
weighted avg       0.95      0.93      0.93    289735



In [65]:
metric_task_1.append(('lr', 'original', f1, precision, recall))

In [66]:
data_original_train.unpersist()
data_original_test.unpersist()

DataFrame[features: vector, label: int]

In [127]:
data_norm_inf_train.cache()
data_norm_inf_test.cache()

DataFrame[label: int, norm_inf: vector]

In [128]:
start = time.time()
lr_tuning_norm_inf = lr_train_tuning_model(data_norm_inf_train,'norm_inf')
end = time.time()

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [129]:
pred = lr_tuning_norm_inf.transform(data_norm_inf_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['lr_norm_inf'] = lr_tuning_norm_inf

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [130]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

[[284986  76877]
 [  8235   7300]]


In [131]:
metric_task_1.append(('lr', 'norm_inf', f1, precision, recall))

In [132]:
data_norm_inf_train.unpersist()
data_norm_inf_test.unpersist()

DataFrame[label: int, norm_inf: vector]

In [133]:
data_norm_1_train.cache()
data_norm_1_test.cache()

DataFrame[label: int, norm_1: vector]

In [134]:
start = time.time()
lr_tuning_norm_1 = lr_train_tuning_model(data_norm_1_train,'norm_1')
end = time.time()

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [135]:
pred = lr_tuning_norm_1.transform(data_norm_1_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['lr_norm_1'] = lr_tuning_norm_1

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [136]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

[[226827 135036]
 [  5916   9619]]


In [137]:
metric_task_1.append(('lr', 'norm_1', f1, precision, recall))

In [138]:
data_norm_1_train.unpersist()
data_norm_1_test.unpersist()

DataFrame[label: int, norm_1: vector]

In [139]:
data_standard_scaler_train.cache()
data_standard_scaler_test.cache()

DataFrame[label: int, scaled_features: vector]

In [140]:
start = time.time()
lr_tuning_scaled_features = lr_train_tuning_model(data_standard_scaler_train,'scaled_features')
end = time.time()

IOPub message rate exceeded.
The notebook server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--NotebookApp.iopub_msg_rate_limit`.

Current values:
NotebookApp.iopub_msg_rate_limit=1000.0 (msgs/sec)
NotebookApp.rate_limit_window=3.0 (secs)



In [141]:
pred = lr_tuning_scaled_features.transform(data_standard_scaler_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['lr_scaled_features'] = lr_tuning_scaled_features

In [142]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

[[233197 128666]
 [  6064   9471]]


In [143]:
metric_task_1.append(('lr', 'scaled_features', f1, precision, recall))

In [144]:
data_standard_scaler_train.unpersist()
data_standard_scaler_test.unpersist()

DataFrame[label: int, scaled_features: vector]

In [145]:
data_minmax_scaler_train.cache()
data_minmax_scaler_test.cache()

DataFrame[label: int, mmscaled_features: vector]

In [146]:
start = time.time()
lr_tuning_mmscaled_features = lr_train_tuning_model(data_minmax_scaler_train,'mmscaled_features')
end = time.time()

In [147]:
pred = lr_tuning_mmscaled_features.transform(data_minmax_scaler_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['lr_mmscaled_features'] = lr_tuning_mmscaled_features

In [148]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

[[277408  84455]
 [  7772   7763]]


In [149]:
metric_task_1.append(('lr', 'mmscaled_features', f1, precision, recall))

In [150]:
data_minmax_scaler_train.unpersist()
data_minmax_scaler_test.unpersist()

DataFrame[label: int, mmscaled_features: vector]

In [151]:
data_maxabs_scaler_train.cache()
data_maxabs_scaler_test.cache()

DataFrame[label: int, mascaled_features: vector]

In [152]:
start = time.time()
lr_tuning_mascaled_features = lr_train_tuning_model(data_maxabs_scaler_train,'mascaled_features')
end = time.time()

In [153]:
pred = lr_tuning_mascaled_features.transform(data_maxabs_scaler_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['lr_mascaled_features'] = lr_tuning_mascaled_features

In [154]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

[[235530 126333]
 [  6157   9378]]


In [155]:
metric_task_1.append(('lr', 'mascaled_features', f1, precision, recall))

In [156]:
data_maxabs_scaler_train.unpersist()
data_maxabs_scaler_test.unpersist()

DataFrame[label: int, mascaled_features: vector]

## RF

In [None]:
data_original_train.cache()
data_original_test.cache()

In [None]:
start = time.time()
rf_tuning_original = rf_train_tuning_model(data_original_train,'features')
end = time.time()

In [None]:
pred = rf_tuning_original.transform(data_original_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['rf_original'] = rf_tuning_original

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

In [None]:
metric_task_1.append(('rf', 'original', f1, precision, recall))

In [None]:
data_original_train.unpersist()
data_original_test.unpersist()

## GBT

In [None]:
data_original_train.cache()
data_original_test.cache()

In [None]:
start = time.time()
gbt_tuning_original = gbt_train_tuning_model(data_original_train,'features')
end = time.time()

In [None]:
pred = gbt_tuning_original.transform(data_original_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['gbt_original'] = gbt_tuning_original

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

In [None]:
metric_task_1.append(('gbt', 'original', f1, precision, recall))

In [None]:
data_original_train.unpersist()
data_original_test.unpersist()

## DT

In [None]:
data_original_train.cache()
data_original_test.cache()

In [None]:
start = time.time()
dt_tuning_original = dt_train_tuning_model(data_original_train,'features')
end = time.time()

In [None]:
pred = dt_tuning_original.transform(data_original_test)
f1 = metric_model_f1(pred)
precision = metric_model_precision(pred)
recall = metric_model_recall(pred)
dict_model_task_1['dt_original'] = dt_tuning_original

In [None]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))

In [None]:
metric_task_1.append(('dt', 'original', f1, precision, recall))

In [None]:
data_original_train.unpersist()
data_original_test.unpersist()

## Metric task 1

In [67]:
metric_columns_task_1 = ["name_model","normalize","f1","precision","recall"]
metric_df_task_1 = spark.createDataFrame(data=metric_task_1, schema = metric_columns_task_1)

In [68]:
metric_df_task_1 = metric_df_task_1.sort(col('f1').desc())

In [69]:
metric_df_task_1.show()

+----------+---------+------------------+------------------+------------------+
|name_model|normalize|                f1|         precision|            recall|
+----------+---------+------------------+------------------+------------------+
|        lr| original|0.7639848499732582|0.6335215958871873|0.9621168384879725|
+----------+---------+------------------+------------------+------------------+



## Extract best model task 1

In [70]:
name_best = metric_df_task_1.select('name_model').collect()[0][0]
normalize_best = metric_df_task_1.select('normalize').collect()[0][0]
name_best_model = name_best + '_' + normalize_best
best_model = dict_model_task_1[name_best_model]

In [71]:
best_model

LogisticRegressionModel: uid=LogisticRegression_205e72dc0a21, numClasses=2, numFeatures=9

In [76]:
best_model.write().overwrite().save('k8://saved_model')

Py4JJavaError: An error occurred while calling o1924.save.
: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "k8"
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3281)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3301)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:124)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3352)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3320)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:479)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:361)
	at org.apache.spark.ml.util.FileSystemOverwrite.handleOverwrite(ReadWrite.scala:673)
	at org.apache.spark.ml.util.MLWriter.save(ReadWrite.scala:167)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Unknown Source)


In [83]:
a = LogisticRegressionModel.load("saved_model")

Py4JJavaError: An error occurred while calling o1835.load.
: java.lang.UnsupportedOperationException: empty collection
	at org.apache.spark.rdd.RDD.$anonfun$first$1(RDD.scala:1465)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.first(RDD.scala:1463)
	at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:587)
	at org.apache.spark.ml.classification.LogisticRegressionModel$LogisticRegressionModelReader.load(LogisticRegression.scala:1298)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Unknown Source)


## Doing predict Task 1

In [73]:
def transform_data_bc_predict():
    # Slicing data agar 6 bulan saja
    order_bc_tmp = order_bc.select('order_id', 'date_order', 'customer_id','brand')
    predict_month = order_bc_tmp.agg(trunc(max(col('date_order')),'month')).collect()[0][0]
    min_month = predict_month + relativedelta(months=-5)
    order_bc_extract = order_bc_tmp.filter((trunc(col('date_order'),'month') >= min_month))
    
    # Ekstrak fitur quantity, days_diff, dc_code, dan count_order
    qty_order = order_product_bc.groupBy('order_id').agg(sum('line_subtotal').alias('revenue'), sum('qty').alias('qty'))\
    .select('order_id', 'revenue','qty')

    dc_user = order_shipping_bc.select('order_id', 'shipping_province')
    
    days_diff = order_bc_extract.select('date_order', 'customer_id')
    days_diff = days_diff.withColumn('date_only', to_date(col('date_order')))
    w = Window().partitionBy('customer_id').orderBy(['customer_id', 'date_order'])
    days_diff = days_diff.dropDuplicates(["customer_id","date_only"])
    days_diff = days_diff.select("*", lag("date_only").over(w).alias("new_col_1"))\
    .withColumn('days_diff', datediff(col('date_only'),col("new_col_1"))).fillna(999,subset=['days_diff'])
    days_diff = days_diff.groupby("customer_id").agg(min('days_diff').alias('days_diff'))
    
    order_bc_extract = order_bc_extract.join(qty_order, ['order_id'])
    order_bc_extract = order_bc_extract.join(dc_user, ['order_id'])
    order_bc_extract = order_bc_extract.withColumn('count_order', lit(1))
    order_bc_extract = order_bc_extract.withColumn('max_extract_month', lit(predict_month))
    
    final_data = order_bc_extract.groupBy('customer_id')\
    .agg(sum('count_order').alias('frequency'),sum('revenue').alias('revenue'), max('shipping_province').alias('dc_code'),
         max('date_order').alias('max_tgl'),max('max_extract_month').alias('max_extract_month'),
         size(collect_set('brand')).alias('brand'), sum('qty').alias('qty'))\
    .withColumn('recency', datediff(last_day(col('max_extract_month')), col('max_tgl')))\
    .select('customer_id','frequency','revenue','recency','dc_code','qty','brand')
    
    final_data = final_data.join(days_diff, ['customer_id'])
    
    return final_data

In [74]:
def transform_data_shopee_predict():
    order_shopee_tmp = order_shopee.select('ordersn', 'create_time', 'buyer_username', 'recipient_province','shopid')
    
    predict_month = order_shopee_tmp.agg(trunc(max(col('create_time')),'month')).collect()[0][0]
    min_month = predict_month + relativedelta(months=-5)
    
    order_shopee_extract = order_shopee_tmp.filter((trunc(col('create_time'),'month') >= min_month))
    
    qty_order = order_product_shopee.groupBy('ordersn').agg(sum('original_price').alias('revenue'), sum('qty_purchased').alias('qty'))\
    .select('ordersn','revenue','qty')
    
    days_diff = order_shopee_extract.select('create_time', 'buyer_username')
    days_diff = days_diff.withColumn('date_only', to_date(col('create_time')))
    w = Window().partitionBy('buyer_username').orderBy(['buyer_username', 'create_time'])
    days_diff = days_diff.dropDuplicates(["buyer_username","date_only"])
    days_diff = days_diff.select("*", lag("date_only").over(w).alias("new_col_1"))\
    .withColumn('days_diff', datediff(col('date_only'),col("new_col_1"))).fillna(999,subset=['days_diff'])
    days_diff = days_diff.groupby("buyer_username").agg(min('days_diff').alias('days_diff'))
    
    order_shopee_extract = order_shopee_extract.join(qty_order, ['ordersn'])
    order_shopee_extract = order_shopee_extract.withColumn('count_order', lit(1))
    order_shopee_extract = order_shopee_extract.withColumnRenamed('buyer_username','customer_id')
    order_shopee_extract = order_shopee_extract.withColumn('max_extract_month', lit(predict_month))
    
    final_data = order_shopee_extract.groupBy('customer_id')\
    .agg(sum('count_order').alias('frequency'),max('recipient_province').alias('dc_code'),sum('revenue').alias('revenue'),
         max('create_time').alias('max_tgl'),max('max_extract_month').alias('max_extract_month'),
         size(collect_set('shopid')).alias('brand'),sum('qty').alias('qty'))\
    .withColumn('recency', datediff(last_day(col('max_extract_month')), col('max_tgl')))\
    .select('customer_id','frequency','revenue','recency','dc_code','qty','brand')
    
    days_diff = days_diff.withColumnRenamed('buyer_username','customer_id')
    final_data = final_data.join(days_diff, ['customer_id'])
    
    return final_data

In [75]:
def combine_feature_predict(data):
    indexer = StringIndexer(inputCol='dc_code', outputCol='dc_code_num')
    indexd_data=indexer.fit(data).transform(data)
    encoder = OneHotEncoder(inputCol='dc_code_num', outputCol = 'dc_code_vec')
    onehotdata = encoder.fit(indexd_data).transform(indexd_data)
    va = VectorAssembler(outputCol='features')
    va.setInputCols(['days_diff','dc_code_vec','qty','brand'])
    combine_data = va.transform(onehotdata).select(['customer_id', 'features'])
    return combine_data

In [76]:
def transform_predict_data_norm_1(data):
    normalizer = Normalizer(inputCol="features")
    data = normalizer.transform(data, {normalizer.p: 1, normalizer.outputCol:"norm_1"})
    return data.select('customer_id',"norm_1")

def transform_predict_data_norm_inf(data):
    normalizer = Normalizer(inputCol="features")
    data = normalizer.transform(data, {normalizer.p: float("inf"), normalizer.outputCol:"norm_inf"})
    return data.select('customer_id',"norm_inf")

def transform_predict_data_standard_scaler(data):
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
    scalerModel = scaler.fit(data)
    data = scalerModel.transform(data)
    return data.select('customer_id',"scaled_features")

def transform_predict_data_minmax_scaler(data):
    mmscaler = MinMaxScaler(inputCol="features", outputCol="mmscaled_features")
    mmscalerModel = mmscaler.fit(data)
    data = mmscalerModel.transform(data)
    return data.select('customer_id',"mmscaled_features")

def transform_predict_data_maxabs_scaler(data):
    mascaler = MaxAbsScaler(inputCol="features", outputCol="mascaled_features")
    mascalerModel = mascaler.fit(data)
    data = mascalerModel.transform(data)
    return data.select('customer_id',"mascaled_features")

In [77]:
predict_bc = transform_data_bc_predict()

In [78]:
predict_shopee = transform_data_shopee_predict()

In [79]:
final_data_pred = predict_bc.union(predict_shopee)

In [80]:
predict_total_data = combine_feature_predict(final_data_pred)

In [81]:
if normalize_best == 'original':
    predict_total_data = predict_total_data
elif normalize_best == 'norm_1':
    predict_total_data = transform_predict_data_norm_1(predict_total_data)
elif normalize_best == 'norm_inf':
    predict_total_data = transform_predict_data_norm_inf(predict_total_data)
elif normalize_best == 'standard_scaler':
    predict_total_data = transform_predict_data_standard_scaler(predict_total_data)
elif normalize_best == 'minmax_scaler':
    predict_total_data = transform_predict_data_minmax_scaler(predict_total_data)
elif normalize_best == 'maxabs_scaler':
    predict_total_data = transform_predict_data_maxabs_scaler(predict_total_data)

In [82]:
prediction = best_model.transform(predict_total_data)

In [83]:
hasil_pred_1 = prediction.select('customer_id', 'prediction').withColumnRenamed('prediction','prediksi')
hasil_pred_1 = hasil_pred_1.withColumn('potensial_status', when(col('prediksi') == 1, 'potensial').when(col('prediksi') == 0, 'tidak potensial'))
hasil_pred_1 = hasil_pred_1.drop('prediksi')

# Extract potential user for task 2 & 3

In [84]:
user_potential_test = final_data.filter("label == 1").select('customer_id')

In [85]:
user_potential_pred = prediction.filter("prediction == 1").select('customer_id')

# Extract feature for Pattern Mining for Task 2

In [86]:
def fp_train_model(train_data, item_col):
    fpGrowth = FPGrowth(itemsCol=item_col, minSupport=0.001, minConfidence=0.001)
    model_fp = fpGrowth.fit(train_data)
    return model_fp

## Extract metric task 2

In [87]:
def bc_task_2():
    order_bc_tmp = order_bc.select('order_id', 'date_order', 'customer_id')
    
    predict_month = order_bc_tmp.agg(trunc(max(col('date_order')),'month')).collect()[0][0]
    
    min_month = predict_month + relativedelta(months=-6)
    
    order_bc_extract = order_bc_tmp.filter((trunc(col('date_order'),'month') < predict_month) & 
                                           (trunc(col('date_order'),'month') >= min_month))
    
    items_purchase = order_product_bc.groupBy('order_id').agg(collect_set('product_name').alias('items'))\
    .select('order_id', 'items')
    
    order_bc_extract = order_bc_extract.join(items_purchase, ['order_id'])

    final_data = order_bc_extract.withColumn("explode", explode(col("items")))
    final_data = final_data.groupBy('customer_id')\
    .agg(collect_set('explode').alias('lst_items_pred'))\
    .select('customer_id','lst_items_pred')
    
    order_bc_extract_2 = order_bc_tmp.filter((trunc(col('date_order'),'month') == predict_month))
    
    order_bc_extract_2 = order_bc_extract_2.join(items_purchase, ['order_id'])
    
    final_data_2 = order_bc_extract_2.withColumn("explode", explode(col("items")))
    final_data_2 = final_data_2.groupBy('customer_id')\
    .agg(collect_set('explode').alias('lst_items_true'))\
    .select('customer_id','lst_items_true')
    
    final_data = final_data.join(final_data_2,['customer_id'],'left')
    
    return final_data

In [88]:
def shopee_task_2():
    order_shopee_tmp = order_shopee.select('ordersn', 'create_time', 'buyer_username')
    
    predict_month = order_shopee_tmp.agg(trunc(max(col('create_time')),'month')).collect()[0][0]
    
    min_month = predict_month + relativedelta(months=-6)
    
    order_shopee_extract = order_shopee_tmp.filter((trunc(col('create_time'),'month') < predict_month) & 
                                                   (trunc(col('create_time'),'month') >= min_month))
    
    items_purchase = order_product_shopee.groupBy('ordersn').agg(collect_set('item_name').alias('items'))\
    .select('ordersn', 'items')
    
    order_shopee_extract = order_shopee_extract.join(items_purchase, ['ordersn'])
    order_shopee_extract = order_shopee_extract.withColumnRenamed('buyer_username','customer_id')
    
    final_data = order_shopee_extract.withColumn("explode", explode(col("items")))
    final_data = final_data.groupBy('customer_id')\
    .agg(collect_set('explode').alias('lst_items_pred'))\
    .select('customer_id','lst_items_pred')
    
    order_shopee_extract_2 = order_shopee_tmp.filter((trunc(col('create_time'),'month') == predict_month))
    
    order_shopee_extract_2 = order_shopee_extract_2.join(items_purchase, ['ordersn'])
    order_shopee_extract_2 = order_shopee_extract_2.withColumnRenamed('buyer_username','customer_id')
    order_shopee_extract_2 = order_shopee_extract_2.withColumnRenamed('shopid','brand')
    
    final_data_2 = order_shopee_extract_2.withColumn("explode", explode(col("items")))
    final_data_2 = final_data_2.groupBy('customer_id')\
    .agg(collect_set('explode').alias('lst_items_true'))\
    .select('customer_id','lst_items_true')
    
    final_data = final_data.join(final_data_2,['customer_id'],'left')
    
    return final_data

In [89]:
data_bc_task_2 = bc_task_2()

In [90]:
data_shopee_task_2 = shopee_task_2()

In [91]:
data_total_task_2 = data_bc_task_2.union(data_shopee_task_2)

In [92]:
data_total_task_2 = data_total_task_2.withColumn("lst_items_true", coalesce('lst_items_true', array()))

In [93]:
data_total_task_2.cache()

DataFrame[customer_id: string, lst_items_pred: array<string>, lst_items_true: array<string>]

In [94]:
model_fp = fp_train_model(data_total_task_2,'lst_items_pred')

In [95]:
# data_user_potential_task_2 = data_total_task_2.join(user_potential_test, ['customer_id'])

In [96]:
# result_task_2 = model_fp.transform(data_user_potential_task_2)

In [97]:
# result_task_2 = result_task_2.withColumn("intersect", size(array_intersect("lst_items_true", "prediction")))\
# .withColumn("union", size(array_union("lst_items_true", "prediction"))).withColumn("score", col('intersect')/col('union')).fillna(value=0)

In [98]:
# score = result_task_2.agg(avg(col('score'))).collect()[0][0]

In [99]:
# score * 100

## Doing predict task 2

In [100]:
def transform_data_bc_2():
    order_bc_tmp = order_bc.select('order_id', 'date_order', 'customer_id')
    
    predict_month = order_bc_tmp.agg(trunc(max(col('date_order')),'month')).collect()[0][0]
    
    min_month = predict_month + relativedelta(months=-5)
    
    order_bc_extract = order_bc_tmp.filter((trunc(col('date_order'),'month') >= min_month))
    
    order_bc_extract = order_bc_extract.join(user_potential_pred,['customer_id'])
    
    items_purchase = order_product_bc.groupBy('order_id').agg(collect_set('product_name').alias('items'))\
    .select('order_id', 'items')
    
    order_bc_extract = order_bc_extract.join(items_purchase, ['order_id'])

    final_data = order_bc_extract.withColumn("explode", explode(col("items")))
    final_data = final_data.groupBy('customer_id')\
    .agg(collect_set('explode').alias('lst_items_pred'))\
    .select('customer_id','lst_items_pred')
    
    return final_data

In [101]:
def transform_data_shopee_2():
    order_shopee_tmp = order_shopee.select('ordersn', 'create_time', 'buyer_username')
    
    predict_month = order_shopee_tmp.agg(trunc(max(col('create_time')),'month')).collect()[0][0]
    
    min_month = predict_month + relativedelta(months=-5)
    
    order_shopee_extract = order_shopee_tmp.filter((trunc(col('create_time'),'month') >= min_month))
    
    order_shopee_extract = order_shopee_extract.withColumnRenamed('buyer_username','customer_id')
    order_shopee_extract = order_shopee_extract.join(user_potential_pred,['customer_id'])
    
    items_purchase = order_product_shopee.groupBy('ordersn').agg(collect_set('item_name').alias('items'))\
    .select('ordersn', 'items')
    
    order_shopee_extract = order_shopee_extract.join(items_purchase, ['ordersn'])
    
    final_data = order_shopee_extract.withColumn("explode", explode(col("items")))
    final_data = final_data.groupBy('customer_id')\
    .agg(collect_set('explode').alias('lst_items_pred'))\
    .select('customer_id','lst_items_pred')
    
    return final_data

In [102]:
data_item_bc = transform_data_bc_2()
data_item_shopee = transform_data_shopee_2()
data_item_total = data_item_bc.union(data_item_shopee)

In [103]:
data_item_total.cache()

DataFrame[customer_id: string, lst_items_pred: array<string>]

In [104]:
result = model_fp.transform(data_item_total)

In [105]:
hasil_pred_2 = result.select('customer_id','prediction').withColumn("item_prediksi", explode(col("prediction"))).drop('prediction')

# Task 3

## Extract feature task 3

In [106]:
def transform_data_shopee_task_3():
    order_shopee_tmp = order_shopee.select('buyer_username','create_time')
    
    predict_month = order_shopee_tmp.agg(trunc(max(col('create_time')),'month')).collect()[0][0]
    
    min_month = predict_month + relativedelta(months=-6)
    
    max_extract_month = predict_month + relativedelta(months=-1)
    
    order_shopee_extract = order_shopee_tmp.filter((trunc(col('create_time'),'month') < predict_month) & 
                                                   (trunc(col('create_time'),'month') >= min_month))
#     order_shopee_extract_label = order_shopee_tmp.filter((trunc(col('create_time'),'month') == predict_month))
    
    order_shopee_extract = order_shopee_extract.withColumnRenamed('buyer_username','customer_id')
#     order_shopee_extract_label = order_shopee_extract_label.withColumnRenamed('buyer_username','customer_id')
#     order_shopee_extract_label = order_shopee_extract_label.join(user_potential_test,['customer_id'])
    order_shopee_extract = order_shopee_extract.join(user_potential_test,['customer_id'])
    
    mean_day_diff = order_shopee_extract.withColumn('date_only', to_date(col('create_time'))).drop('create_time')
    w_mean = Window().partitionBy('customer_id').orderBy(['customer_id', 'date_only'])
    mean_day_diff = mean_day_diff.dropDuplicates(["customer_id","date_only"])
    mean_day_diff = mean_day_diff.select("*", lag("date_only",1).over(w_mean).alias("t1_trans_date"))
    mean_day_diff = mean_day_diff.withColumn('t1_day_diff', datediff(col('date_only'),col("t1_trans_date")))
    mean_day_diff = mean_day_diff.groupby('customer_id').agg(mean('t1_day_diff').alias('mean_day_diff'))
    mean_day_diff = mean_day_diff.fillna(999)
    
#     max_purchase = order_shopee_extract.groupby('customer_id').agg(to_date(max('create_time')).alias('max_purchase'))
#     min_purchase = order_shopee_extract_label.groupby('customer_id').agg(to_date(min('create_time')).alias('min_purchase'))
    
#     tx_purchase_dates =  max_purchase.join(min_purchase, ['customer_id'], 'left')\
#                                      .withColumn('next_purchase', datediff(col('min_purchase'), col('max_purchase')))\
#                                      .drop('min_purchase', 'max_purchase')
    
#     tx_purchase_dates = tx_purchase_dates.withColumn('label', lit(2))
#     tx_purchase_dates = tx_purchase_dates.withColumn('label', when(col('next_purchase') > 20, 1).otherwise(col('label')))
#     tx_purchase_dates = tx_purchase_dates.withColumn('label', when(col('next_purchase') > 50, 0).otherwise(col('label')))
    
#     max_all_tgl = order_shopee_extract.agg(max('create_time')).collect()[0][0]
#     order_shopee_extract = order_shopee_extract.withColumn('max_all_tgl', lit(max_all_tgl))
    
    order_shopee_extract = order_shopee_extract.withColumn('max_extract_month', lit(max_extract_month))
    order_shopee_extract = order_shopee_extract.withColumn('date_only', to_date(col('create_time'))).drop('create_time')
    
    label = order_shopee_extract.dropDuplicates(["customer_id","date_only"])
    label = label.withColumn('days', date_format(col("date_only"), "d"))
    label = label.withColumn('week', when(col('days')>21,4).when(col('days')>14,3)\
                                                          .when(col('days')>7,2).otherwise(1))
    
    label = label.groupby('customer_id','week').agg(count('week').alias('count'))
    w_label = Window().partitionBy('customer_id').orderBy(col('count').desc(),col('week').desc())
    label = label.withColumn('Rank',row_number().over(w_label)).filter("Rank == 1").drop('Rank','count')
    label = label.withColumnRenamed('week','label')
    
#     order_shopee_extract = order_shopee_extract.groupby('customer_id').agg(round(mean(col('week'))).alias('label'))
    
    order_shopee_extract = order_shopee_extract.withColumn('recency', datediff(last_day(col('max_extract_month')), col('date_only')))
    w = Window().partitionBy('customer_id').orderBy(['customer_id', 'date_only'])
    order_shopee_extract = order_shopee_extract.dropDuplicates(["customer_id","date_only"])
    order_shopee_extract = order_shopee_extract.select("*", lag("date_only",1).over(w).alias("t1_trans_date"))
    order_shopee_extract = order_shopee_extract.select("*", lag("date_only",2).over(w).alias("t2_trans_date"))
    order_shopee_extract = order_shopee_extract.select("*", lag("date_only",3).over(w).alias("t3_trans_date"))
    
#     order_shopee_extract = order_shopee_extract.withColumn('t1_day_diff', datediff(col('date_only'),col("t1_trans_date")))\
#                                                .withColumn('t2_day_diff', datediff(col('date_only'),col("t2_trans_date")))\
#                                                .withColumn('t3_day_diff', datediff(col('date_only'),col("t3_trans_date")))

    order_shopee_extract = order_shopee_extract.withColumn('t1_day', date_format(col("date_only"), "d").cast(IntegerType()))\
                                       .withColumn('t2_day', date_format(col("t1_trans_date"), "d").cast(IntegerType()))\
                                       .withColumn('t3_day', date_format(col("t3_trans_date"), "d").cast(IntegerType()))
    
#     order_shopee_extract = order_shopee_extract.withColumn('last_1_day', date_format(col("date_only"), "d").cast(IntegerType()))
#     order_shopee_extract = order_shopee_extract.withColumn('last_1_week', when(col('last_1_day')>21,4).when(col('last_1_day')>14,3)\
#                                                           .when(col('last_1_day')>7,2).otherwise(1))

    order_shopee_extract = order_shopee_extract.fillna(999)

    order_shopee_extract = order_shopee_extract.withColumn('last_1_week', when(col('t1_day')>21,4).when(col('t1_day')>14,3)\
                                                          .when(col('t1_day')>7,2).otherwise(1))
    order_shopee_extract = order_shopee_extract.withColumn('last_2_week', when(col('t2_day')>21,4).when(col('t2_day')>14,3)\
                                                          .when(col('t2_day')>7,2).otherwise(1))
    order_shopee_extract = order_shopee_extract.withColumn('last_3_week', when(col('t3_day')>21,4).when(col('t3_day')>14,3)\
                                                          .when(col('t3_day')>7,2).otherwise(1))
    
#     date_diff_std_mean = order_shopee_extract.groupby('customer_id').agg(mean('t1_day_diff').alias('mean_day_diff'),
#                                                                          stddev('t1_day_diff').alias('std_day_diff'))
    
    w_2 = Window().partitionBy('customer_id').orderBy(col('date_only').desc())
    order_shopee_extract = order_shopee_extract.withColumn('Rank',row_number().over(w_2)).filter("Rank == 1").drop('Rank')
    
#     order_shopee_extract = order_shopee_extract.join(date_diff_std_mean, ['customer_id'])
#     order_shopee_extract = order_shopee_extract.fillna(0,subset=['std_day_diff'])
#     order_shopee_extract = order_shopee_extract.join(tx_purchase_dates, ['customer_id'])
    order_shopee_extract = order_shopee_extract.join(mean_day_diff, ['customer_id'])
    order_shopee_extract = order_shopee_extract.join(label, ['customer_id'])
    order_shopee_extract = order_shopee_extract.drop('date_only', 't1_trans_date', 't2_trans_date', 't3_trans_date','max_extract_month','days','week')
    
    return order_shopee_extract

In [107]:
def transform_data_bc_task_3():
    order_bc_tmp = order_bc.select('date_order', 'customer_id')
    
    predict_month = order_bc_tmp.agg(trunc(max(col('date_order')),'month')).collect()[0][0]
    
    min_month = predict_month + relativedelta(months=-6)
    
    max_extract_month = predict_month + relativedelta(months=-1)
    
    order_bc_extract = order_bc_tmp.filter((trunc(col('date_order'),'month') < predict_month) & 
                                           (trunc(col('date_order'),'month') >= min_month))
#     order_bc_extract_label = order_bc_tmp.filter((trunc(col('date_order'),'month') == predict_month))
    
    order_bc_extract = order_bc_extract.join(user_potential_test,['customer_id'])
    
    mean_day_diff = order_bc_extract.withColumn('date_only', to_date(col('date_order'))).drop('date_order')
    w_mean = Window().partitionBy('customer_id').orderBy(['customer_id', 'date_only'])
    mean_day_diff = mean_day_diff.dropDuplicates(["customer_id","date_only"])
    mean_day_diff = mean_day_diff.select("*", lag("date_only",1).over(w_mean).alias("t1_trans_date"))
    mean_day_diff = mean_day_diff.withColumn('t1_day_diff', datediff(col('date_only'),col("t1_trans_date")))
    mean_day_diff = mean_day_diff.groupby('customer_id').agg(mean('t1_day_diff').alias('mean_day_diff'))
    mean_day_diff = mean_day_diff.fillna(999)
    
#     order_bc_extract_label = order_bc_extract_label.join(user_potential_test,['customer_id'])
    
#     max_purchase = order_bc_extract.groupby('customer_id').agg(to_date(max('date_order')).alias('max_purchase'))
#     min_purchase = order_bc_extract_label.groupby('customer_id').agg(to_date(min('date_order')).alias('min_purchase'))
    
#     tx_purchase_dates =  max_purchase.join(min_purchase, ['customer_id'], 'left')\
#                                      .withColumn('next_purchase', datediff(col('min_purchase'), col('max_purchase')))\
#                                      .drop('min_purchase', 'max_purchase')
    
#     tx_purchase_dates = tx_purchase_dates.withColumn('label', lit(2))
#     tx_purchase_dates = tx_purchase_dates.withColumn('label', when(col('next_purchase') > 20, 1).otherwise(col('label')))
#     tx_purchase_dates = tx_purchase_dates.withColumn('label', when(col('next_purchase') > 50, 0).otherwise(col('label')))
    
#     max_all_tgl = order_bc_extract.agg(max('date_order')).collect()[0][0]
#     order_bc_extract = order_bc_extract.withColumn('max_all_tgl', lit(max_all_tgl))
                                                      
    order_bc_extract = order_bc_extract.withColumn('max_extract_month', lit(max_extract_month))
    order_bc_extract = order_bc_extract.withColumn('date_only', to_date(col('date_order'))).drop('date_order')
    
    label = order_bc_extract.dropDuplicates(["customer_id","date_only"])
    label = label.withColumn('days', date_format(col("date_only"), "d"))
    label = label.withColumn('week', when(col('days')>21,4).when(col('days')>14,3)\
                                                          .when(col('days')>7,2).otherwise(1))
    label = label.groupby('customer_id','week').agg(count('week').alias('count'))
    w_label = Window().partitionBy('customer_id').orderBy(col('count').desc(),col('week').desc())
    label = label.withColumn('Rank',row_number().over(w_label)).filter("Rank == 1").drop('Rank','count')
    
    label = label.withColumnRenamed('week','label')
    
#     order_bc_extract = order_bc_extract.groupby('customer_id').agg(round(mean(col('week'))).alias('label'))
                                                      
    order_bc_extract = order_bc_extract.withColumn('recency', datediff(last_day(col('max_extract_month')), col('date_only')))
    w = Window().partitionBy('customer_id').orderBy(['customer_id', 'date_only'])
    order_bc_extract = order_bc_extract.dropDuplicates(["customer_id","date_only"])
    order_bc_extract = order_bc_extract.select("*", lag("date_only",1).over(w).alias("t1_trans_date"))
    order_bc_extract = order_bc_extract.select("*", lag("date_only",2).over(w).alias("t2_trans_date"))
    order_bc_extract = order_bc_extract.select("*", lag("date_only",3).over(w).alias("t3_trans_date"))
    
#     order_bc_extract = order_bc_extract.withColumn('t1_day_diff', datediff(col('date_only'),col("t1_trans_date")))\
#                                        .withColumn('t2_day_diff', datediff(col('date_only'),col("t2_trans_date")))\
#                                        .withColumn('t3_day_diff', datediff(col('date_only'),col("t3_trans_date")))

    order_bc_extract = order_bc_extract.withColumn('t1_day', date_format(col("date_only"), "d").cast(IntegerType()))\
                                       .withColumn('t2_day', date_format(col("t1_trans_date"), "d").cast(IntegerType()))\
                                       .withColumn('t3_day', date_format(col("t3_trans_date"), "d").cast(IntegerType()))
    
#     order_bc_extract = order_bc_extract.withColumn('last_1_day', date_format(col("date_only"), "d").cast(IntegerType()))

    order_bc_extract = order_bc_extract.fillna(999)
    
    order_bc_extract = order_bc_extract.withColumn('last_1_week', when(col('t1_day')>21,4).when(col('t1_day')>14,3)\
                                                          .when(col('t1_day')>7,2).otherwise(1))
    order_bc_extract = order_bc_extract.withColumn('last_2_week', when(col('t2_day')>21,4).when(col('t2_day')>14,3)\
                                                          .when(col('t2_day')>7,2).otherwise(1))
    order_bc_extract = order_bc_extract.withColumn('last_3_week', when(col('t3_day')>21,4).when(col('t3_day')>14,3)\
                                                          .when(col('t3_day')>7,2).otherwise(1))
    
#     date_diff_std_mean = order_bc_extract.groupby('customer_id').agg(mean('t1_day_diff').alias('mean_day_diff'),
#                                                                      stddev('t1_day_diff').alias('std_day_diff'))
    
    w_2 = Window().partitionBy('customer_id').orderBy(col('date_only').desc())
    order_bc_extract = order_bc_extract.withColumn('Rank',row_number().over(w_2)).filter("Rank == 1").drop('Rank')
    
#     order_bc_extract = order_bc_extract.join(date_diff_std_mean, ['customer_id'])
#     order_bc_extract = order_bc_extract.fillna(0,subset=['std_day_diff'])
#     order_bc_extract = order_bc_extract.join(tx_purchase_dates, ['customer_id'])
    order_bc_extract = order_bc_extract.join(mean_day_diff, ['customer_id'])
    order_bc_extract = order_bc_extract.join(label, ['customer_id'])
    order_bc_extract = order_bc_extract.drop('date_only', 't1_trans_date', 't2_trans_date', 't3_trans_date','max_extract_month','days','week')
    
    return order_bc_extract

In [108]:
data_shopee_task_3 = transform_data_shopee_task_3()
data_bc_task_3 = transform_data_bc_task_3()

In [109]:
data_final_task_3 = data_shopee_task_3.union(data_bc_task_3)

## prepare data for predict task 3

In [110]:
data_final_task_3.cache()

DataFrame[customer_id: string, recency: int, t1_day: int, t2_day: int, t3_day: int, last_1_week: int, last_2_week: int, last_3_week: int, mean_day_diff: double, label: int]

In [111]:
va = VectorAssembler(outputCol='features')
va.setInputCols(['recency','t1_day','t2_day','mean_day_diff','last_1_week','last_2_week'])
data_final_task_3 = va.transform(data_final_task_3).select(['features','label'])

In [112]:
train_task_3, test_task_3 = data_final_task_3.randomSplit([0.7, 0.3], seed = 42)

In [113]:
data_original_train_3 = train_task_3
data_original_test_3 = test_task_3

data_norm_1_train_3 = transform_data_norm_1(train_task_3)
data_norm_1_test_3 = transform_data_norm_1(test_task_3)

data_norm_inf_train_3 = transform_data_norm_inf(train_task_3)
data_norm_inf_test_3 = transform_data_norm_inf(test_task_3)

data_standard_scaler_train_3 = transform_data_standard_scaler(train_task_3)
data_standard_scaler_test_3 = transform_data_standard_scaler(test_task_3)

data_minmax_scaler_train_3 = transform_data_minmax_scaler(train_task_3)
data_minmax_scaler_test_3 = transform_data_minmax_scaler(test_task_3)

data_maxabs_scaler_train_3 = transform_data_maxabs_scaler(train_task_3)
data_maxabs_scaler_test_3 = transform_data_maxabs_scaler(test_task_3)

In [114]:
data_final_task_3.unpersist()

DataFrame[features: vector, label: int]

## predict task 3

In [115]:
dict_model_task_3 = {}
metric_task_3 = []

### LR

In [124]:
data_original_train_3.cache()
data_original_test_3.cache()

DataFrame[features: vector, label: int]

In [125]:
lr_task_3 = lr_train_tuning_model(data_original_train_3,'features')

In [126]:
pred = lr_task_3.transform(data_original_test_3)
accuracy = metric_model_accuracy(pred)
dict_model_task_3['lr_original'] = lr_task_3

In [127]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
f1 = metric_model_f1_multi(y_true,y_pred)
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))
metric_task_3.append(('lr', 'original', f1.item(), accuracy))

[[  463  1429     0   388]
 [  168 10620     0  1996]
 [  173  2324     0  2282]
 [   23  1670     0 14546]]
              precision    recall  f1-score   support

           1       0.56      0.20      0.30      2280
           2       0.66      0.83      0.74     12784
           3       0.00      0.00      0.00      4779
           4       0.76      0.90      0.82     16239

    accuracy                           0.71     36082
   macro avg       0.49      0.48      0.46     36082
weighted avg       0.61      0.71      0.65     36082



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [128]:
data_original_train_3.unpersist()
data_original_test_3.unpersist()

DataFrame[features: vector, label: int]

In [129]:
data_norm_inf_train_3.cache()
data_norm_inf_test_3.cache()

DataFrame[label: int, norm_inf: vector]

In [130]:
lr_task_3 = lr_train_tuning_model(data_norm_inf_train_3,'norm_inf')

In [131]:
pred = lr_task_3.transform(data_norm_inf_test_3)
accuracy = metric_model_accuracy(pred)
dict_model_task_3['lr_norm_inf'] = lr_task_3

In [132]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
f1 = metric_model_f1_multi(y_true,y_pred)
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))
metric_task_3.append(('lr', 'norm_inf', f1.item(), accuracy))

[[    0  1680     0   600]
 [    0  7876     0  4908]
 [    0  2685     0  2094]
 [    0  4504     0 11735]]
              precision    recall  f1-score   support

           1       0.00      0.00      0.00      2280
           2       0.47      0.62      0.53     12784
           3       0.00      0.00      0.00      4779
           4       0.61      0.72      0.66     16239

    accuracy                           0.54     36082
   macro avg       0.27      0.33      0.30     36082
weighted avg       0.44      0.54      0.49     36082



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [133]:
data_norm_inf_train_3.unpersist()
data_norm_inf_test_3.unpersist()

DataFrame[label: int, norm_inf: vector]

In [134]:
data_norm_1_train_3.cache()
data_norm_1_test_3.cache()

DataFrame[label: int, norm_1: vector]

In [135]:
lr_task_3 = lr_train_tuning_model(data_norm_1_train_3,'norm_1')

In [136]:
pred = lr_task_3.transform(data_norm_1_test_3)
accuracy = metric_model_accuracy(pred)
dict_model_task_3['lr_norm_1'] = lr_task_3

In [137]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
f1 = metric_model_f1_multi(y_true,y_pred)
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))
metric_task_3.append(('lr', 'norm_1', f1.item(), accuracy))

[[   58  1741     0   481]
 [    2  8477     0  4305]
 [    0  2754     0  2025]
 [    3  4634     0 11602]]
              precision    recall  f1-score   support

           1       0.92      0.03      0.05      2280
           2       0.48      0.66      0.56     12784
           3       0.00      0.00      0.00      4779
           4       0.63      0.71      0.67     16239

    accuracy                           0.56     36082
   macro avg       0.51      0.35      0.32     36082
weighted avg       0.51      0.56      0.50     36082



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [138]:
data_norm_1_train_3.unpersist()
data_norm_1_test_3.unpersist()

DataFrame[label: int, norm_1: vector]

In [139]:
data_standard_scaler_train_3.cache()
data_standard_scaler_test_3.cache()

DataFrame[label: int, scaled_features: vector]

In [140]:
lr_task_3 = lr_train_tuning_model(data_standard_scaler_train_3,'scaled_features')

In [141]:
pred = lr_task_3.transform(data_standard_scaler_test_3)
accuracy = metric_model_accuracy(pred)
dict_model_task_3['lr_scaled_features'] = lr_task_3

In [142]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
f1 = metric_model_f1_multi(y_true,y_pred)
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))
metric_task_3.append(('lr','scaled_features',f1.item(), accuracy))

[[  463  1429     0   388]
 [  166 10626     0  1992]
 [  173  2334     0  2272]
 [   23  1671     0 14545]]
              precision    recall  f1-score   support

           1       0.56      0.20      0.30      2280
           2       0.66      0.83      0.74     12784
           3       0.00      0.00      0.00      4779
           4       0.76      0.90      0.82     16239

    accuracy                           0.71     36082
   macro avg       0.50      0.48      0.46     36082
weighted avg       0.61      0.71      0.65     36082



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [143]:
data_standard_scaler_train_3.unpersist()
data_standard_scaler_test_3.unpersist()

DataFrame[label: int, scaled_features: vector]

In [144]:
data_minmax_scaler_train_3.cache()
data_minmax_scaler_test_3.cache()

DataFrame[label: int, mmscaled_features: vector]

In [145]:
lr_task_3 = lr_train_tuning_model(data_minmax_scaler_train_3,'mmscaled_features')

In [146]:
pred = lr_task_3.transform(data_minmax_scaler_test_3)
accuracy = metric_model_accuracy(pred)
dict_model_task_3['lr_mmscaled_features'] = lr_task_3

In [147]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
f1 = metric_model_f1_multi(y_true,y_pred)
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))
metric_task_3.append(('lr', 'mmscaled_features', f1.item(), accuracy))

[[  419  1460     0   401]
 [   73 10663     0  2048]
 [   43  2155     0  2581]
 [    7  1553     0 14679]]
              precision    recall  f1-score   support

           1       0.77      0.18      0.30      2280
           2       0.67      0.83      0.75     12784
           3       0.00      0.00      0.00      4779
           4       0.74      0.90      0.82     16239

    accuracy                           0.71     36082
   macro avg       0.55      0.48      0.46     36082
weighted avg       0.62      0.71      0.65     36082



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [148]:
data_minmax_scaler_train_3.unpersist()
data_minmax_scaler_test_3.unpersist()

DataFrame[label: int, mmscaled_features: vector]

In [149]:
data_maxabs_scaler_train_3.cache()
data_maxabs_scaler_test_3.cache()

DataFrame[label: int, mascaled_features: vector]

In [150]:
lr_task_3 = lr_train_tuning_model(data_maxabs_scaler_train_3,'mascaled_features')

In [151]:
pred = lr_task_3.transform(data_maxabs_scaler_test_3)
accuracy = metric_model_accuracy(pred)
dict_model_task_3['lr_mascaled_features'] = lr_task_3

In [152]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
f1 = metric_model_f1_multi(y_true,y_pred)
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))
metric_task_3.append(('lr','mascaled_features', f1.item(), accuracy))

[[  463  1429     0   388]
 [  168 10620     0  1996]
 [  173  2324     0  2282]
 [   23  1670     0 14546]]
              precision    recall  f1-score   support

           1       0.56      0.20      0.30      2280
           2       0.66      0.83      0.74     12784
           3       0.00      0.00      0.00      4779
           4       0.76      0.90      0.82     16239

    accuracy                           0.71     36082
   macro avg       0.49      0.48      0.46     36082
weighted avg       0.61      0.71      0.65     36082



  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [153]:
data_maxabs_scaler_train_3.unpersist()
data_maxabs_scaler_test_3.unpersist()

DataFrame[label: int, mascaled_features: vector]

### DT

In [154]:
data_original_train_3.cache()
data_original_test_3.cache()

DataFrame[features: vector, label: int]

In [155]:
dt_task_3 = dt_train_tuning_model(data_original_train_3,'features')

In [156]:
pred = dt_task_3.transform(data_original_test_3)
accuracy = metric_model_accuracy(pred)
dict_model_task_3['dt_original'] = dt_task_3

In [157]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
f1 = metric_model_f1_multi(y_true,y_pred)
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))
metric_task_3.append(('dt', 'original', f1.item(), accuracy))

[[ 1319   276   168   517]
 [  977  9604   586  1617]
 [   47   114  4355   263]
 [  106   238   380 15515]]
              precision    recall  f1-score   support

           1       0.54      0.58      0.56      2280
           2       0.94      0.75      0.83     12784
           3       0.79      0.91      0.85      4779
           4       0.87      0.96      0.91     16239

    accuracy                           0.85     36082
   macro avg       0.78      0.80      0.79     36082
weighted avg       0.86      0.85      0.85     36082



In [158]:
data_original_train_3.unpersist()
data_original_test_3.unpersist()

DataFrame[features: vector, label: int]

### RF

In [116]:
data_original_train_3.cache()
data_original_test_3.cache()

DataFrame[features: vector, label: int]

In [117]:
rf_task_3 = rf_train_tuning_model(data_original_train_3,'features')

In [118]:
pred = rf_task_3.transform(data_original_test_3)
accuracy = metric_model_accuracy(pred)
dict_model_task_3['rf_original'] = rf_task_3

In [119]:
from sklearn.metrics import classification_report, confusion_matrix
y_true = pred.select(['label']).collect()
y_pred = pred.select(['prediction']).collect()
f1 = metric_model_f1_multi(y_true,y_pred)
print(confusion_matrix(y_true,y_pred))
print(classification_report(y_true,y_pred))
metric_task_3.append(('rf', 'original', f1.item(), accuracy))

[[ 1148   445   168   518]
 [   17 10575   586  1617]
 [    7   154  4361   262]
 [   16   328   380 15509]]
              precision    recall  f1-score   support

           1       0.97      0.50      0.66      2279
           2       0.92      0.83      0.87     12795
           3       0.79      0.91      0.85      4784
           4       0.87      0.96      0.91     16233

    accuracy                           0.88     36091
   macro avg       0.89      0.80      0.82     36091
weighted avg       0.88      0.88      0.87     36091



In [120]:
data_original_train_3.unpersist()
data_original_test_3.unpersist()

DataFrame[features: vector, label: int]

## Export metric

In [121]:
metric_columns_task_3 = ["name_model","normalize","f1","accuracy"]
metric_df_task_3 = spark.createDataFrame(data=metric_task_3, schema = metric_columns_task_3)
metric_df_task_3 = metric_df_task_3.sort(['f1','accuracy'],ascending=False)

In [122]:
metric_df_task_3.show()

+----------+---------+------------------+-----------------+
|name_model|normalize|                f1|         accuracy|
+----------+---------+------------------+-----------------+
|        rf| original|0.8224569008303385|0.875370591006068|
+----------+---------+------------------+-----------------+



In [123]:
name_best_3 = metric_df_task_3.select('name_model').collect()[0][0]
normalize_best_3 = metric_df_task_3.select('normalize').collect()[0][0]
name_best_model_3 = name_best_3 + '_' + normalize_best_3
best_model_3 = dict_model_task_3[name_best_model_3]

In [124]:
best_model_3

RandomForestClassificationModel: uid=RandomForestClassifier_1ddb3a118396, numTrees=20, numClasses=5, numFeatures=6

In [None]:
best_model_3.write().overwrite().save("/home/notebook/model_3")

## Predict Task 3

In [125]:
def transform_data_shopee_task_3_pred():
    order_shopee_tmp = order_shopee.select('buyer_username','create_time')
    
    predict_month = order_shopee_tmp.agg(trunc(max(col('create_time')),'month')).collect()[0][0]
    
    min_month = predict_month + relativedelta(months=-5)
    
    order_shopee_extract = order_shopee_tmp.filter((trunc(col('create_time'),'month') >= min_month))
    
    order_shopee_extract = order_shopee_extract.withColumnRenamed('buyer_username','customer_id')
    
    order_shopee_extract = order_shopee_extract.join(user_potential_pred,['customer_id'])
    
    mean_day_diff = order_shopee_extract.withColumn('date_only', to_date(col('create_time'))).drop('create_time')
    w_mean = Window().partitionBy('customer_id').orderBy(['customer_id', 'date_only'])
    mean_day_diff = mean_day_diff.dropDuplicates(["customer_id","date_only"])
    mean_day_diff = mean_day_diff.select("*", lag("date_only",1).over(w_mean).alias("t1_trans_date"))
    mean_day_diff = mean_day_diff.withColumn('t1_day_diff', datediff(col('date_only'),col("t1_trans_date")))
    mean_day_diff = mean_day_diff.groupby('customer_id').agg(mean('t1_day_diff').alias('mean_day_diff'))
    mean_day_diff = mean_day_diff.fillna(999)
    
    order_shopee_extract = order_shopee_extract.withColumn('max_extract_month', lit(predict_month))
    order_shopee_extract = order_shopee_extract.withColumn('date_only', to_date(col('create_time'))).drop('create_time')
    
    order_shopee_extract = order_shopee_extract.withColumn('recency', datediff(last_day(col('max_extract_month')), col('date_only')))
    w = Window().partitionBy('customer_id').orderBy(['customer_id', 'date_only'])
    order_shopee_extract = order_shopee_extract.dropDuplicates(["customer_id","date_only"])
    order_shopee_extract = order_shopee_extract.select("*", lag("date_only",1).over(w).alias("t1_trans_date"))

    order_shopee_extract = order_shopee_extract.withColumn('t1_day', date_format(col("date_only"), "d").cast(IntegerType()))\
                                       .withColumn('t2_day', date_format(col("t1_trans_date"), "d").cast(IntegerType()))

    order_shopee_extract = order_shopee_extract.fillna(999)

    order_shopee_extract = order_shopee_extract.withColumn('last_1_week', when(col('t1_day')>21,4).when(col('t1_day')>14,3)\
                                                          .when(col('t1_day')>7,2).otherwise(1))
    order_shopee_extract = order_shopee_extract.withColumn('last_2_week', when(col('t2_day')>21,4).when(col('t2_day')>14,3)\
                                                          .when(col('t2_day')>7,2).otherwise(1))
    
    w_2 = Window().partitionBy('customer_id').orderBy(col('date_only').desc())
    order_shopee_extract = order_shopee_extract.withColumn('Rank',row_number().over(w_2)).filter("Rank == 1").drop('Rank')
    
    order_shopee_extract = order_shopee_extract.join(mean_day_diff, ['customer_id'])
    order_shopee_extract = order_shopee_extract.drop('date_only','t1_trans_date','max_extract_month','days','week')
    
    return order_shopee_extract

In [126]:
def test_bc_3():
    order_shopee_tmp = order_shopee.select('buyer_username','create_time')
    predict_month = order_shopee_tmp.agg(trunc(max(col('create_time')),'month')).collect()[0][0]
    min_month = predict_month + relativedelta(months=-5)
    order_shopee_extract = order_shopee_tmp.filter((trunc(col('create_time'),'month') >= min_month))
    order_shopee_extract = order_shopee_extract.withColumnRenamed('buyer_username','customer_id')
    order_shopee_extract = order_shopee_extract.join(user_potential_pred,['customer_id'])
    
    mean_day_diff = order_shopee_extract.withColumn('date_only', to_date(col('create_time'))).drop('create_time')
    w_mean = Window().partitionBy('customer_id').orderBy(['customer_id', 'date_only'])
    mean_day_diff = mean_day_diff.dropDuplicates(["customer_id","date_only"])
    mean_day_diff = mean_day_diff.select("*", lag("date_only",1).over(w_mean).alias("t1_trans_date"))
    mean_day_diff = mean_day_diff.withColumn('t1_day_diff', datediff(col('date_only'),col("t1_trans_date")))
    mean_day_diff = mean_day_diff.groupby('customer_id').agg(mean('t1_day_diff').alias('mean_day_diff'))
    mean_day_diff = mean_day_diff.fillna(999)
    
    order_shopee_extract = order_shopee_extract.withColumn('max_extract_month', lit(predict_month))
    order_shopee_extract = order_shopee_extract.withColumn('date_only', to_date(col('create_time'))).drop('create_time')
    
    order_shopee_extract = order_shopee_extract.withColumn('recency', datediff(last_day(col('max_extract_month')), col('date_only')))
    w = Window().partitionBy('customer_id').orderBy(['customer_id', 'date_only'])
    order_shopee_extract = order_shopee_extract.dropDuplicates(["customer_id","date_only"])
    order_shopee_extract = order_shopee_extract.select("*", lag("date_only",1).over(w).alias("t1_trans_date"))

    order_shopee_extract = order_shopee_extract.withColumn('t1_day', date_format(col("date_only"), "d").cast(IntegerType()))\
                                       .withColumn('t2_day', date_format(col("t1_trans_date"), "d").cast(IntegerType()))

    order_shopee_extract = order_shopee_extract.fillna(999)

    order_shopee_extract = order_shopee_extract.withColumn('last_1_week', when(col('t1_day')>21,4).when(col('t1_day')>14,3)\
                                                          .when(col('t1_day')>7,2).otherwise(1))
    order_shopee_extract = order_shopee_extract.withColumn('last_2_week', when(col('t2_day')>21,4).when(col('t2_day')>14,3)\
                                                          .when(col('t2_day')>7,2).otherwise(1))
    
    w_2 = Window().partitionBy('customer_id').orderBy(col('date_only').desc())
    order_shopee_extract = order_shopee_extract.withColumn('Rank',row_number().over(w_2)).filter("Rank == 1").drop('Rank')
    
    order_shopee_extract = order_shopee_extract.join(mean_day_diff, ['customer_id'])
    order_shopee_extract = order_shopee_extract.drop('date_only','t1_trans_date','max_extract_month','days','week')
    
    return end-start

In [127]:
test_bc_3

<function __main__.test_bc_3()>

In [128]:
def transform_data_bc_task_3_pred():
    order_bc_tmp = order_bc.select('date_order', 'customer_id')
    
    predict_month = order_bc_tmp.agg(trunc(max(col('date_order')),'month')).collect()[0][0]
    
    min_month = predict_month + relativedelta(months=-5)
    
    order_bc_extract = order_bc_tmp.filter((trunc(col('date_order'),'month') >= min_month))
    
    order_bc_extract = order_bc_extract.join(user_potential_pred,['customer_id'])
    
    mean_day_diff = order_bc_extract.withColumn('date_only', to_date(col('date_order'))).drop('date_order')
    w_mean = Window().partitionBy('customer_id').orderBy(['customer_id', 'date_only'])
    mean_day_diff = mean_day_diff.dropDuplicates(["customer_id","date_only"])
    mean_day_diff = mean_day_diff.select("*", lag("date_only",1).over(w_mean).alias("t1_trans_date"))
    mean_day_diff = mean_day_diff.withColumn('t1_day_diff', datediff(col('date_only'),col("t1_trans_date")))
    mean_day_diff = mean_day_diff.groupby('customer_id').agg(mean('t1_day_diff').alias('mean_day_diff'))
    mean_day_diff = mean_day_diff.fillna(999)
                                                      
    order_bc_extract = order_bc_extract.withColumn('max_extract_month', lit(predict_month))
    order_bc_extract = order_bc_extract.withColumn('date_only', to_date(col('date_order'))).drop('date_order')
                                                      
    order_bc_extract = order_bc_extract.withColumn('recency', datediff(last_day(col('max_extract_month')), col('date_only')))
    w = Window().partitionBy('customer_id').orderBy(['customer_id', 'date_only'])
    order_bc_extract = order_bc_extract.dropDuplicates(["customer_id","date_only"])
    order_bc_extract = order_bc_extract.select("*", lag("date_only",1).over(w).alias("t1_trans_date"))

    order_bc_extract = order_bc_extract.withColumn('t1_day', date_format(col("date_only"), "d").cast(IntegerType()))\
                                       .withColumn('t2_day', date_format(col("t1_trans_date"), "d").cast(IntegerType()))

    order_bc_extract = order_bc_extract.fillna(999)
    
    order_bc_extract = order_bc_extract.withColumn('last_1_week', when(col('t1_day')>21,4).when(col('t1_day')>14,3)\
                                                          .when(col('t1_day')>7,2).otherwise(1))
    order_bc_extract = order_bc_extract.withColumn('last_2_week', when(col('t2_day')>21,4).when(col('t2_day')>14,3)\
                                                          .when(col('t2_day')>7,2).otherwise(1))
    
    w_2 = Window().partitionBy('customer_id').orderBy(col('date_only').desc())
    order_bc_extract = order_bc_extract.withColumn('Rank',row_number().over(w_2)).filter("Rank == 1").drop('Rank')
    
    order_bc_extract = order_bc_extract.join(mean_day_diff, ['customer_id'])
    order_bc_extract = order_bc_extract.drop('date_only','t1_trans_date','max_extract_month','days','week')
    
    return order_bc_extract

In [129]:
data_shopee_task_3_pred = transform_data_shopee_task_3_pred()
data_bc_task_3_pred = transform_data_bc_task_3_pred()
data_final_task_3_pred = data_shopee_task_3_pred.union(data_bc_task_3_pred)

In [130]:
va = VectorAssembler(outputCol='features')
va.setInputCols(['recency','t1_day','t2_day','mean_day_diff','last_1_week','last_2_week'])
data_final_task_3_pred = va.transform(data_final_task_3_pred).select(['customer_id','features'])

In [131]:
if normalize_best_3 == 'original':
    predict_total_data_3 = data_final_task_3_pred
elif normalize_best_3 == 'norm_1':
    predict_total_data_3 = transform_predict_data_norm_1(data_final_task_3_pred)
elif normalize_best_3 == 'norm_inf':
    predict_total_data_3 = transform_predict_data_norm_inf(data_final_task_3_pred)
elif normalize_best_3 == 'standard_scaler':
    predict_total_data_3 = transform_predict_data_standard_scaler(data_final_task_3_pred)
elif normalize_best_3 == 'minmax_scaler':
    predict_total_data_3 = transform_predict_data_minmax_scaler(data_final_task_3_pred)
elif normalize_best_3 == 'maxabs_scaler':
    predict_total_data_3 = transform_predict_data_maxabs_scaler(data_final_task_3_pred)

In [132]:
prediction_3 = best_model_3.transform(predict_total_data_3)

In [133]:
hasil_pred_3 = prediction_3.select('customer_id','prediction').withColumnRenamed('prediction','next_time_buy_code')
hasil_pred_3 = hasil_pred_3.withColumn('next_time_buy', when(col('next_time_buy_code') == 1, 'Minggu pertama (tanggal 1-17)')\
                                      .when(col('next_time_buy_code') == 2, 'Minggu kedua (tanggal 8-14)')\
                                      .when(col('next_time_buy_code') == 3, 'Minggu ketiga (tanggal 15-21)')\
                                      .when(col('next_time_buy_code') == 4, 'Minggu keempat (tanggal 22 keatas)'))

# Adding all result to database

## Extract Province

In [134]:
data_province_shopee = order_shopee_2.select('buyer_username','recipient_province').groupby('buyer_username')\
.agg(max('recipient_province').alias('provinsi')).withColumnRenamed('buyer_username','customer_id')

data_province_bc = order_bc.join(order_shipping_bc_2,['order_id']).select('customer_id','shipping_province')\
.groupby('customer_id').agg(max('shipping_province').alias('provinsi'))

data_province = data_province_shopee.union(data_province_bc)

## Extract Name

In [135]:
data_username_shopee = order_shopee_2.select('buyer_username','recipient_name')\
.withColumnRenamed('buyer_username','customer_id').withColumnRenamed('recipient_name','name')

data_username_bc = user_bc.select('user_id','nickname').withColumnRenamed('user_id','customer_id')\
.withColumnRenamed('nickname','name')

data_username = data_username_shopee.union(data_username_bc)
data_username = data_username.groupby('customer_id').agg(max('name').alias('nama'))

## Extract revenue

In [136]:
data_revenue = final_data_pred.select('customer_id','revenue')

## Combine and insert task 1 to db

In [137]:
combined_task_1 = hasil_pred_1.join(data_revenue,['customer_id']).join(data_province,['customer_id'],'left')\
.join(data_username,['customer_id'],'left').select('customer_id','nama','revenue','provinsi','potensial_status')

combined_task_1 = combined_task_1.na.drop(subset=['nama'])

In [139]:
combined_task_1.write.format("jdbc")\
.option("url",'jdbc:postgresql://postgres-db-lb.naufalhilmi.svc.cluster.local:5432/postgres')\
.option("dbtable",'public.hasil_task_1')\
.option("user",'postgres')\
.option("password",'postgres')\
.option("driver","org.postgresql.Driver")\
.option("truncate","true")\
.mode("overwrite").save()

## Combine and insert task 2 to db

In [140]:
combined_task_2 = hasil_pred_2.join(data_username,['customer_id'],'left').select('customer_id','nama','item_prediksi')
combined_task_2 = combined_task_2.na.drop(subset=['nama'])

In [141]:
combined_task_2.write.format("jdbc")\
.option("url",'jdbc:postgresql://postgres-db-lb.naufalhilmi.svc.cluster.local:5432/postgres')\
.option("dbtable",'public.hasil_task_2')\
.option("user",'postgres')\
.option("password",'postgres')\
.option("driver","org.postgresql.Driver")\
.option("truncate","true")\
.mode("overwrite").save()

## Combine and insert task 3 to db

In [142]:
combined_task_3 = hasil_pred_3.join(data_username,['customer_id'],'left').select('customer_id','nama','next_time_buy_code','next_time_buy')
combined_task_2 = combined_task_2.na.drop(subset=['nama'])

In [143]:
combined_task_3.write.format("jdbc")\
.option("url",'jdbc:postgresql://postgres-db-lb.naufalhilmi.svc.cluster.local:5432/postgres')\
.option("dbtable",'public.hasil_task_3')\
.option("user",'postgres')\
.option("password",'postgres')\
.option("driver","org.postgresql.Driver")\
.option("truncate","true")\
.mode("overwrite").save()

# Spark Stop

In [46]:
spark.stop()