# Applied Project in Big Data on an Industrial Dataset

## MODELING
## Part I. Spark modeling

### 1. Libraries and Spark setup

In [None]:
import os
import sys
import json
import datetime
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.ml.feature import Word2Vec
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array
from pyspark.ml.feature import Imputer
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import HashingTF, IDF
# Render every column and every row of pandas DataFrames in the notebook.
# Note: with max_rows=None, large `.toPandas()` results are printed in full.
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

In [None]:
# Local project root: holds the access file, is where `clean_parquet` maps
# `s3a://{BUCKET}/...` paths to, and is where the predictions CSV is written.
PRJ_PATH = '/home/jovyan/__RAYPFP24'


def access_data(file_path):
    """Load and return the JSON document stored at *file_path*.

    The file is decoded as UTF-8, the encoding JSON text is required to use.
    Relying on the platform default encoding would make non-ASCII values
    (e.g. bucket names) fail or garble on non-UTF-8 locales.

    Args:
        file_path: path to a JSON file.

    Returns:
        The parsed JSON value (a dict for the access file used in this notebook).
    """
    with open(file_path, encoding='utf-8') as file:
        return json.load(file)


# S3 credentials and bucket name. The keys read below are 'aws_access_key_id',
# 'aws_secret_access_key' and 'bucket_name'. This file holds secrets; keep it
# out of version control.
access_s3_data = access_data(f'{PRJ_PATH}/.access_jhub_data')

In [None]:
def uiWebUrl(self):
    """Return the Spark UI jobs page URL, routed through the JupyterHub proxy.

    Takes the port of the driver's own UI URL and rewrites it as
    ``<JUPYTERHUB_SERVICE_PREFIX>proxy/<port>/jobs/``. The prefix is used
    verbatim, so it is expected to end with '/'.

    Raises:
        KeyError: if JUPYTERHUB_SERVICE_PREFIX is not set in the environment.
    """
    from urllib.parse import urlparse

    driver_ui = self._jsc.sc().uiWebUrl().get()
    ui_port = urlparse(driver_ui).port
    hub_prefix = os.environ['JUPYTERHUB_SERVICE_PREFIX']
    return f'{hub_prefix}proxy/{ui_port}/jobs/'


# Make `sc.uiWebUrl` return the JupyterHub-proxied UI link (see `uiWebUrl`).
SparkContext.uiWebUrl = property(uiWebUrl)
# Single-machine Spark using all local cores.
conf = SparkConf()
conf.set('spark.master', 'local[*]')
conf.set('spark.driver.memory', '40G')
conf.set('spark.driver.maxResultSize', '8G')
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
# S3A connector configured for Yandex Object Storage (S3-compatible endpoint),
# authenticated with the static keys loaded from the access file.
spark._jsc.hadoopConfiguration().set('fs.s3a.access.key', access_s3_data['aws_access_key_id'])
spark._jsc.hadoopConfiguration().set('fs.s3a.secret.key', access_s3_data['aws_secret_access_key'])
spark._jsc.hadoopConfiguration().set('fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
# 104857600 bytes = 100 MiB multipart part size; 33554432 bytes = 32 MiB block size.
spark._jsc.hadoopConfiguration().set('fs.s3a.multipart.size', '104857600')
spark._jsc.hadoopConfiguration().set('fs.s3a.block.size', '33554432')
spark._jsc.hadoopConfiguration().set('fs.s3a.threads.max', '256')
# NOTE(review): the endpoint uses plain http://; consider https:// so data is
# not transferred unencrypted.
spark._jsc.hadoopConfiguration().set('fs.s3a.endpoint', 'http://storage.yandexcloud.net')
spark._jsc.hadoopConfiguration().set('fs.s3a.aws.credentials.provider', 
                                     'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider')
spark

### 2. Dataset

In [None]:
# Pipeline configuration. Each PROC_* switch recomputes its stage and
# overwrites that stage's parquet output. When a switch is False, the stage is
# reloaded from the parquet written by a previous run.
VER = 'v0'  # version tag used in all work/ output paths
PROC_DS = False  # stage 1: raw CSV events -> data_raw.parquet
PROC_LAGS = True  # stage 2: lag features -> data_lags.parquet
FRAC_0 = .002  # used only if `PROC_LAGS = True`; fraction of label-0 rows kept by sampleBy
PROC_VECS = True  # stage 3: vectorized train/test -> data_vec_{train,test}.parquet
BUCKET = access_s3_data['bucket_name']
REPART = 1  # number of partitions (output files) per written parquet
W2V = True  # True: Word2Vec lag vectors; False: hashed TF-IDF lag vectors

# Raw daily CSV exports read in stage 1.
files_path = 'data/events'
files_mask = f'{files_path}/data_2023-*-*.csv'

file_path_ds = f's3a://{BUCKET}/work/{VER}/data_raw.parquet'
file_path_lags = f's3a://{BUCKET}/work/{VER}/data_lags.parquet'
file_path_trn = f's3a://{BUCKET}/work/{VER}/data_vec_train.parquet'
file_path_tst = f's3a://{BUCKET}/work/{VER}/data_vec_test.parquet'

In [None]:
def clean_parquet(path, bucket=None, prj_path=None):
    """Delete the local copy of an ``s3a://`` dataset before it is rewritten.

    The code assumes the S3 bucket is also reachable on the local filesystem
    under ``PRJ_PATH``, so ``s3a://{BUCKET}/work/...`` maps to
    ``{PRJ_PATH}/work/...``. NOTE(review): confirm this mount exists in the
    runtime environment.

    The target is removed in pure Python with ``rm -rf`` semantics:
    directories are removed recursively, files and symlinks are unlinked, and
    a missing target is not an error. This replaces an IPython ``!rm -rf``
    shell escape. That escape was not valid Python outside IPython and broke
    on paths containing spaces or shell metacharacters.

    Args:
        path: ``s3a://{bucket}/...`` URI of the dataset to remove.
        bucket: bucket name; defaults to the module-level ``BUCKET``.
        prj_path: local root of the bucket; defaults to ``PRJ_PATH``.

    Returns:
        ``'command to run: rm -rf <local path>'``, the equivalent shell
        command. The string format is kept for backward compatibility.

    Raises:
        ValueError: if *path* is not inside ``s3a://{bucket}``. Refusing here
            avoids deleting an unrelated local path.
    """
    import shutil

    bucket = BUCKET if bucket is None else bucket
    prj_path = PRJ_PATH if prj_path is None else prj_path
    prefix = f's3a://{bucket}'
    if not path.startswith(prefix):
        raise ValueError(f'path {path!r} is not inside {prefix!r}')
    local_path = prj_path + path[len(prefix):]
    # Check links first: rmtree refuses to operate on a symlink.
    if os.path.islink(local_path) or os.path.isfile(local_path):
        os.remove(local_path)
    elif os.path.isdir(local_path):
        shutil.rmtree(local_path)
    return f'command to run: rm -rf {local_path}'

### 2.1. Load or preprocess data - `raw` stage

In [None]:
%%time

# Label period: a row gets payment_event_flag = 1 when its event name matches
# one of the two payment patterns below AND it happened within this period.
flag_min_datetime = datetime.datetime(2023, 8, 1, 0, 0, 0)
flag_max_datetime = datetime.datetime(2023, 8, 7, 23, 59, 59)
print(
    'from', flag_min_datetime,
    'to', flag_max_datetime
)

if PROC_DS:
    sdf = spark.read.option('escape', '"').csv(f's3a://{BUCKET}/{files_mask}', header=True)
    sdf = sdf.withColumn('event_datetime', F.to_timestamp("event_datetime"))
    # Patterns (Russian UI paths): "My fines/Payment/Completed payment" and
    # "My fines/Payment/Payment accepted".
    # NOTE(review): `between` is inclusive, but the upper bound is 23:59:59.000,
    # so events with fractional seconds after it are excluded. Confirm whether
    # the timestamps carry sub-second precision.
    sdf = sdf.withColumn(
        'payment_event_flag',
        (
            (F.col('event_name').like('%Мои штрафы/Оплата/Завершили оплату%') |
             F.col('event_name').like('%Мои штрафы/Оплата/Платёж принят%')) &
            F.col('event_datetime').between(flag_min_datetime, flag_max_datetime)
        ).cast("int")
    )
    sdf = sdf.select(
        'profile_id',
        'event_datetime',
        'payment_event_flag',
        'event_name'
    )
    # (A dead `ls -la` command string that was built here and never used was removed.)
    clean_parquet(file_path_ds)
    sdf.repartition(REPART).write.parquet(file_path_ds)
    sdf.unpersist()

sdf = spark.read.parquet(file_path_ds)
sdf.limit(5).toPandas()

### 2.2. Load or preprocess data - `lags` stage

In [None]:
_HOUR = 60 * 60
_DAY = 24 * _HOUR

# Lag windows available to `dataset_lags`: name -> (lower, upper) offsets in
# seconds relative to the current event's timestamp. Both ends are inclusive
# (Spark `rangeBetween`). "Months" are approximated as 30 days.
LAG_WINDOWS = {
    'lag_10min_to_1hour': (-1 * _HOUR, -10 * 60),
    'lag_1_to_24hours': (-24 * _HOUR, -_HOUR),
    'lag_1day_to_3days': (-3 * _DAY, -_DAY),
    'lag_3days_to_7days': (-7 * _DAY, -3 * _DAY),
    'lag_7days_to_15days': (-15 * _DAY, -7 * _DAY),
    'lag_15days_to_30days': (-30 * _DAY, -15 * _DAY),
    'lag_1mth_to_2mth': (-2 * 30 * _DAY, -1 * 30 * _DAY),
    'lag_2mth_to_3mth': (-3 * 30 * _DAY, -2 * 30 * _DAY),
    'lag_3mth_to_4mth': (-4 * 30 * _DAY, -3 * 30 * _DAY),
    'lag_4mth_to_5mth': (-5 * 30 * _DAY, -4 * 30 * _DAY),
    'lag_5mth_to_6mth': (-6 * 30 * _DAY, -5 * 30 * _DAY),
}

# Windows computed by default (up to 7 days back).
DEFAULT_LAGS = (
    'lag_10min_to_1hour',
    'lag_1_to_24hours',
    'lag_1day_to_3days',
    'lag_3days_to_7days',
)


def dataset_lags(sdf, shift=0, lag_names=DEFAULT_LAGS):
    """Add per-profile lag columns holding the names of neighbouring events.

    For every row, each lag column ``<name>`` collects (``collect_list``) the
    ``event_name`` of all events of the same ``profile_id`` whose timestamp,
    in whole seconds, lies in ``[t + lower + shift, t + upper + shift]``.
    Here ``t`` is the row's ``event_datetime`` cast to whole seconds and
    ``(lower, upper) = LAG_WINDOWS[name]``.

    Args:
        sdf: events with ``profile_id``, ``event_datetime``,
            ``payment_event_flag`` and ``event_name`` columns.
        shift: seconds added to both window bounds. A positive value moves
            every window forward in time.
        lag_names: entries of ``LAG_WINDOWS`` to compute, in output column
            order. Defaults to ``DEFAULT_LAGS``, the four windows computed
            before this parameter existed.

    Returns:
        DataFrame with the four input columns followed by one array column per
        lag, ordered by ``event_datetime`` descending.

    Raises:
        KeyError: if a name in *lag_names* is not a key of ``LAG_WINDOWS``.
    """
    order_key = F.col('event_datetime').cast('timestamp').cast('long')
    result = sdf
    for name in lag_names:
        lower, upper = LAG_WINDOWS[name]
        window = (
            Window.partitionBy(F.col('profile_id'))
            .orderBy(order_key)
            .rangeBetween(lower + shift, upper + shift)
        )
        result = result.withColumn(name, F.collect_list('event_name').over(window))
    return (
        result
        .select(
            'profile_id',
            'event_datetime',
            'payment_event_flag',
            'event_name',
            *lag_names,
        )
        .orderBy(F.col('event_datetime'), ascending=False)
    )

In [None]:
%%time

if PROC_LAGS:
    dates = (flag_min_datetime, flag_max_datetime)
    # Lag lists must be built from each profile's FULL event history, so the
    # lags are computed first and the label-stratified sample is drawn last.
    # Sampling first would build the lag lists from a thinned history in which
    # label-1 (payment) events survive at 100% and all other events at FRAC_0.
    # The features would then encode the sampling scheme rather than real
    # behaviour.
    # To keep the full-history window computation affordable, drop events that
    # cannot fall into any lag window of a label-period row. The widest window
    # used by `dataset_lags` (shift=0) reaches 7 days back. Widen LAGS_LOOKBACK
    # if longer lags are enabled there.
    LAGS_LOOKBACK = datetime.timedelta(days=7)
    sdf = sdf.filter(
        sdf.event_datetime.between(flag_min_datetime - LAGS_LOOKBACK, flag_max_datetime)
    )
    sdf = dataset_lags(sdf)
    sdf = sdf.filter(sdf.event_datetime.between(*dates))
    # Keep only rows with at least one event in some lag window.
    sdf = sdf.filter(
        (F.size('lag_10min_to_1hour')   > 0) |
        (F.size('lag_1_to_24hours')     > 0) |
        (F.size('lag_1day_to_3days')    > 0) |
        (F.size('lag_3days_to_7days')   > 0)
        #(F.size('lag_7days_to_15days')  > 0) |
        #(F.size('lag_15days_to_30days') > 0) |
        #(F.size('lag_1mth_to_2mth')     > 0) |
        #(F.size('lag_2mth_to_3mth')     > 0) |
        #(F.size('lag_3mth_to_4mth')     > 0) |
        #(F.size('lag_4mth_to_5mth')     > 0) |
        #(F.size('lag_5mth_to_6mth')     > 0)
    )
    # Downsample the majority class (label 0) and keep every label-1 row.
    sdf = sdf.sampleBy(
        'payment_event_flag',
        fractions={0: FRAC_0, 1: 1},
        seed=2023
    )
    clean_parquet(file_path_lags)
    sdf.repartition(REPART).write.parquet(file_path_lags)
    sdf.unpersist()

sdf = spark.read.parquet(file_path_lags)
sdf.groupBy('payment_event_flag').count().toPandas()

In [None]:
def stratified_split(sdf, frac, label, seed=2023):
    """Split *sdf* into train/test while preserving the 0/1 label ratio.

    Rows with label 0 and rows with label 1 are each split randomly with
    weights ``[1 - frac, frac]`` using the same *seed*. The per-class parts are
    then recombined (positives first). Rows whose label is neither 0 nor 1 are
    dropped.

    Returns:
        (train, test) DataFrames.
    """
    weights = [1 - frac, frac]
    neg_train, neg_test = sdf.filter(sdf[label] == 0).randomSplit(weights, seed=seed)
    pos_train, pos_test = sdf.filter(sdf[label] == 1).randomSplit(weights, seed=seed)
    return pos_train.union(neg_train), pos_test.union(neg_test)

In [None]:
# 80/20 train/test split, stratified by the payment label.
sdf_train, sdf_test = stratified_split(
    sdf,
    frac=.2,
    label='payment_event_flag',
    seed=2023
)

In [None]:
# Class balance of the training split.
sdf_train.groupBy('payment_event_flag').count().toPandas()

In [None]:
# Class balance of the test split.
sdf_test.groupBy('payment_event_flag').count().toPandas()

### 2.3. Load or preprocess data - `vectorize` stage

In [None]:
# Lag columns to vectorize. They must be columns produced by `dataset_lags`;
# the commented entries are lag windows that are currently disabled there.
lags = [
    'lag_10min_to_1hour',
    'lag_1_to_24hours',
    'lag_1day_to_3days',
    'lag_3days_to_7days',
    #'lag_7days_to_15days',
    #'lag_15days_to_30days',
    #'lag_1mth_to_2mth',
    #'lag_2mth_to_3mth',
    #'lag_3mth_to_4mth',
    #'lag_4mth_to_5mth',
    #'lag_5mth_to_6mth'
]

In [None]:
def datasets_vecorized(sdf_train, sdf_test, lags, vec_size=10, seed=2023):
    """Embed every lag column with its own Word2Vec model fitted on train only.

    For each column in *lags*, a Word2Vec model is fitted on *sdf_train* and
    then applied to both frames, adding a ``<lag>_vec`` vector column of size
    *vec_size*. Fitting on the training split only keeps test data out of the
    embeddings.

    Args:
        sdf_train, sdf_test: DataFrames containing the array-of-string *lags* columns.
        lags: names of the lag columns to embed.
        vec_size: embedding dimension.
        seed: Word2Vec random seed. PySpark's default seed is derived from
            Python's ``hash()`` of the class name, which is randomized per
            interpreter process unless PYTHONHASHSEED is fixed. An explicit
            seed makes the embeddings (and the downstream metrics) reproducible.

    Returns:
        (sdf_train, sdf_test, vectorizers): the transformed frames and the
        fitted models, in the same order as *lags*.
    """
    vectorizers = []
    for lag in tqdm(lags):
        word2Vec = Word2Vec(
            vectorSize=vec_size,
            minCount=0,  # keep every event name in the vocabulary, however rare
            inputCol=lag,
            outputCol=lag + '_vec',
            seed=seed,
        )
        vectorizer = word2Vec.fit(sdf_train)
        sdf_train = vectorizer.transform(sdf_train)
        sdf_test = vectorizer.transform(sdf_test)
        vectorizers.append(vectorizer)
    return sdf_train, sdf_test, vectorizers


def datasets_tfidf(sdf_train, sdf_test, lags, min_freq=3, num_features=10):
    """Hashed TF-IDF vectors for every lag column, fitted on the training split.

    Good explanation is here:
    https://www.analyticsvidhya.com/blog/2022/09/implementing-count-vectorizer-and-tf-idf-in-nlp-using-pyspark/

    For each column in *lags*: ``HashingTF`` hashes the event names into
    *num_features* buckets (``<lag>_tf``). An ``IDF`` model fitted on
    *sdf_train* then rescales them (``<lag>_tfidf``), and the same two
    transforms are applied to *sdf_test*.

    Args:
        sdf_train, sdf_test: DataFrames containing the array-of-string *lags* columns.
        lags: names of the lag columns to vectorize.
        min_freq: ``IDF.minDocFreq``.
        num_features: number of hash buckets per lag column.

    Returns:
        (sdf_train, sdf_test, features_dict, idfmodels):

        - features_dict maps a position in the concatenation of all
          ``<lag>_tfidf`` vectors (lag index * num_features + bucket) to the
          ``'<lag>_<event>'`` names hashed into that bucket. Empty buckets map
          to ``[]``. This assumes the vectors are later assembled in *lags*
          order, as ``features_assembled`` does for the same list.
        - idfmodels maps each lag name to its fitted IDF model.
    """
    from collections import defaultdict

    idfmodels = {}
    features_dict = {}
    for lag_idx, lag in enumerate(tqdm(lags)):
        hashingTF = HashingTF(
            inputCol=lag,
            outputCol=lag + '_tf',
            numFeatures=num_features
        )
        featurizedData = hashingTF.transform(sdf_train)
        idf = IDF(
            inputCol=lag + '_tf',
            outputCol=lag + '_tfidf',
            minDocFreq=min_freq
        )
        idfModel = idf.fit(featurizedData)
        sdf_train = idfModel.transform(featurizedData)
        sdf_test = idfModel.transform(
            hashingTF.transform(sdf_test)
        )
        idfmodels[lag] = idfModel
        # Distinct event names in this lag column. Exploding before `distinct`
        # deduplicates individual events instead of whole arrays, so only the
        # unique names are shipped to the driver.
        events = [
            row[0]
            for row in sdf_train.select(F.explode(lag)).distinct().collect()
        ]
        # Invert name -> bucket into bucket -> names in a single pass.
        buckets = defaultdict(list)
        for e in events:
            buckets[hashingTF.indexOf(e)].append(lag + '_' + e)
        offset = lag_idx * num_features
        for feat_num in range(num_features):
            features_dict[offset + feat_num] = buckets.get(feat_num, [])
    return sdf_train, sdf_test, features_dict, idfmodels

In [None]:
# Stage 3 (`vectorize`): turn each lag's event-name list into a numeric vector,
# either with Word2Vec (W2V=True -> `<lag>_vec` columns) or with hashed TF-IDF
# (W2V=False -> `<lag>_tfidf` columns). Models are fitted on the training split only.
# NOTE(review): the fitted models (`vectorizers`, or `features_dict`/`idfmodels`)
# exist only when PROC_VECS is True. With PROC_VECS = False the parquet files
# are reloaded, but the interpretation and future-look cells below still use
# those models and will raise NameError.
if PROC_VECS:
    if W2V:
        sdf_train, sdf_test, vectorizers = datasets_vecorized(
            sdf_train,
            sdf_test,
            lags,
            vec_size=10
        )
    else:
        sdf_train, sdf_test, features_dict, idfmodels = datasets_tfidf(
            sdf_train,
            sdf_test,
            lags,
            min_freq=3,
            num_features=100
        )
        print('tfidf len features:', len(features_dict.items()))
    clean_parquet(file_path_trn)
    sdf_train.repartition(REPART).write.parquet(file_path_trn)
    clean_parquet(file_path_tst)
    sdf_test.repartition(REPART).write.parquet(file_path_tst)
    sdf_train.unpersist()
    sdf_test.unpersist()

sdf_train = spark.read.parquet(file_path_trn)
sdf_test = spark.read.parquet(file_path_tst)

In [None]:
# Inspect the vectorized training schema (lag arrays plus their vector columns).
sdf_train.printSchema()

## 3. Model

### 3.1. Features assembling

In [None]:
def features_assembled(sdf, feats, postfix='_vec'):
    """Concatenate the per-lag vector columns into a single ``features`` column.

    The input columns are ``<feat><postfix>`` for each name in *feats*, in
    order. The list of columns used (including the label) is printed.

    Returns:
        DataFrame with exactly two columns: ``payment_event_flag`` and ``features``.
    """
    label_col = 'payment_event_flag'
    model_cols = [feat + postfix for feat in feats] + [label_col]
    print('columns to model:', model_cols)
    assembler = VectorAssembler(
        inputCols=[col for col in model_cols if col != label_col],
        outputCol='features'
    )
    assembled = assembler.transform(sdf.select(model_cols))
    return assembled.select(label_col, 'features')


def upsampled(sdf, label, upsample='max'):
    """Oversample the positive class by repeating its rows.

    Args:
        sdf: DataFrame with a 0/1 *label* column.
        label: name of the label column.
        upsample: ``'max'`` to include the positives ``int(#zeros / #ones)``
            times in total, which roughly balances the classes. Otherwise an
            int ``k``: every positive row appears ``k`` times (``k <= 1``
            leaves them as they are).

    Returns:
        The label-0 rows followed by the (repeated) label-1 rows. Rows whose
        label is neither 0 nor 1 are dropped. With ``'max'`` and no positive
        rows, nothing is repeated; the ratio used to raise ZeroDivisionError.
    """
    zeros = sdf.filter(sdf[label] == 0)
    ones = sdf.filter(sdf[label] == 1)
    if upsample == 'max':
        n_ones = ones.count()
        copies = int(zeros.count() / n_ones) if n_ones else 1
    else:
        copies = upsample
    res = zeros.union(ones)
    for _ in range(copies - 1):
        res = res.union(ones)
    return res

In [None]:
# Positive-class oversampling passed to `upsampled`; falsy disables it.
UPSAMPLE = None  # can be None or 'max'

In [None]:
# Lag columns fed to the model; keep in sync with `lags` (the TF-IDF
# `features_dict` indices assume the same order).
feats = [
    'lag_10min_to_1hour',
    'lag_1_to_24hours',
    'lag_1day_to_3days',
    'lag_3days_to_7days',
    #'lag_7days_to_15days',
    #'lag_15days_to_30days',
    #'lag_1mth_to_2mth',
    #'lag_2mth_to_3mth',
    #'lag_3mth_to_4mth',
    #'lag_4mth_to_5mth',
    #'lag_5mth_to_6mth'
]
if W2V:
    postfix = '_vec'
else:
    postfix = '_tfidf'
features_train = features_assembled(sdf_train, feats=feats, postfix=postfix)
features_test = features_assembled(sdf_test, feats=feats, postfix=postfix)
if UPSAMPLE:
    features_train = upsampled(
        features_train,
        label='payment_event_flag',
        upsample=UPSAMPLE
    )
    # Use to upsample test set
    # NOTE(review): upsampling the TEST set changes its class prevalence.
    # ROC AUC is unaffected by replicating positives, but precision, and hence
    # the PR-curve area, is inflated. Evaluate on the un-upsampled test set
    # unless that is intended.
    features_test = upsampled(
        features_test,
        label='payment_event_flag',
        upsample=UPSAMPLE
    )

In [None]:
# Class balance actually used for training (after optional upsampling).
features_train.groupBy('payment_event_flag').count().toPandas()

In [None]:
# Class balance of the evaluation set (after optional upsampling).
features_test.groupBy('payment_event_flag').count().toPandas()

### 3.2. Training and evaluating

In [None]:
# Random forest on the assembled lag features. The explicit seed matches the
# seed used for sampling and splitting above. Without it, PySpark derives the
# default seed from Python's per-process randomized `hash()` of the class
# name, so trees and metrics change from run to run.
rf = RandomForestClassifier(
    labelCol='payment_event_flag',
    featuresCol='features',
    numTrees=100,
    maxDepth=8,
    seed=2023,
)

In [None]:
%%time
# Fit the random forest on the training features.
model = rf.fit(features_train)

In [None]:
predictions = model.transform(features_test)
# BinaryClassificationMetrics expects (score, label) pairs and sweeps
# thresholds over the score. The score is the predicted probability of class 1.
# Using the hard 0/1 `prediction` instead would collapse the ROC and PR curves
# to a single operating point and misstate both areas.
payment_event_flag_preds = predictions.select(
    vector_to_array('probability')[1].alias('score'),
    F.col('payment_event_flag').alias('label'),
)
metrics = BinaryClassificationMetrics(
    payment_event_flag_preds.rdd.map(
        lambda row: (float(row['score']), float(row['label']))
    )
)
print('ROC AUC:', metrics.areaUnderROC)
print('Area under PR-curve:', metrics.areaUnderPR)

### 3.3. Interpretation

In [None]:
TH = .01

# Features whose importance is at least TH, most important first. Ties keep
# ascending feature-index order because sorting is stable.
importances = model.featureImportances.toArray()
features_imps = dict(
    sorted(
        ((idx, imp) for idx, imp in enumerate(importances) if imp >= TH),
        key=lambda item: item[1],
        reverse=True,
    )
)
features_imps

In [None]:
# Only the TF-IDF branch produces `features_dict` (feature index -> hashed
# event names), so the important features can be named only in that case.
if not W2V:
    for feat_idx, importance in features_imps.items():
        print('-' * 100)
        print('feature number:', feat_idx, '| feature importance:', importance)
        print('features:', features_dict[feat_idx])

### 3.4. Future look

In [None]:
# Start the forward-looking scoring from the raw event table (stage 1 output).
sdf_pred = spark.read.parquet(file_path_ds)
sdf_pred.limit(5).toPandas()

In [None]:
%%time

SHIFT = 2 * 24 * 60 * 60  # 2 days ahead

# NOTE(review): the sample is drawn BEFORE the lag windows are computed, so each
# lag list only sees the ~0.01% of events that survived sampling. These
# features are therefore built differently from the training features.
# Computing the lags on the full history first would be consistent but much
# more expensive. The seed makes the sample reproducible.
sdf_pred = sdf_pred.sample(fraction=.0001, seed=2023)
sdf_pred = dataset_lags(sdf_pred, shift=SHIFT)
# Keep only rows with at least one event in some lag window. This filter
# previously reassigned `sdf` instead of `sdf_pred`, so it never applied here.
sdf_pred = sdf_pred.filter(
    (F.size('lag_10min_to_1hour')   > 0) |
    (F.size('lag_1_to_24hours')     > 0) |
    (F.size('lag_1day_to_3days')    > 0) |
    (F.size('lag_3days_to_7days')   > 0)
    #(F.size('lag_7days_to_15days')  > 0) |
    #(F.size('lag_15days_to_30days') > 0) |
    #(F.size('lag_1mth_to_2mth')     > 0) |
    #(F.size('lag_2mth_to_3mth')     > 0) |
    #(F.size('lag_3mth_to_4mth')     > 0) |
    #(F.size('lag_4mth_to_5mth')     > 0) |
    #(F.size('lag_5mth_to_6mth')     > 0)
)
sdf_pred.count()

In [None]:
# Lag columns that will be vectorized for prediction (same order as in training).
print(lags)

In [None]:
# Apply the vectorizers fitted on the training split to the prediction rows.
if W2V:
    for i, lag in enumerate(lags):
        sdf_pred = vectorizers[i].transform(sdf_pred)
        print(lag, '-> vec done')
else:
    for lag in lags:
        idf_model = idfmodels[lag]
        # Hash with exactly the bucket count used in training. The fitted IDF
        # vector has one entry per bucket, so take the count from it instead
        # of hard-coding 100. A hard-coded value silently produces mismatched
        # vectors if `num_features` in the training call changes.
        hashingTF = HashingTF(
            inputCol=lag,
            outputCol=lag + '_tf',
            numFeatures=len(idf_model.idf)
        )
        sdf_pred = idf_model.transform(
            hashingTF.transform(sdf_pred)
        )
        print(lag, '-> idf done')
        print(lag, '-> idf done')

In [None]:
# Same feature layout as training. Note that the result keeps only
# `payment_event_flag` and `features` (no `profile_id`).
features_pred = features_assembled(sdf_pred, feats=feats, postfix=postfix)

In [None]:
# Score the shifted-lag rows with the trained forest.
predictions_future = model.transform(features_pred)

In [None]:
%%time

# Profile ids of the scored rows, collected in a separate Spark job.
df_pred = sdf_pred.select(sdf_pred.profile_id).toPandas()
df_pred.head()

In [None]:
%%time

# Probability of class 1 (payment_event_flag = 1) per scored row: element 1
# of the model's `probability` vector.
df_predictions_future = predictions_future.withColumn(
    'tmp',
    vector_to_array('probability')
).select(
    F.col('tmp')[1].alias('prob_next7days')
).toPandas()
df_predictions_future.head()

In [None]:
# Number of scored rows with predicted payment probability above 0.5.
# `df_predictions_future` is a pandas DataFrame. pandas `DataFrame.filter`
# selects by *labels*, not by a boolean mask, and `.count()` counts per column,
# so the Spark-style `.filter(mask).count()` did not count matching rows.
# Sum the boolean mask instead.
int((df_predictions_future.prob_next7days > .5).sum())

In [None]:
file_path_preds = f'{PRJ_PATH}/work/{VER}/preds.csv'
# NOTE(review): `join` pairs the two pandas frames by their positional
# RangeIndex. They come from two separate Spark jobs over `sdf_pred`
# (`select(profile_id)` vs. the model pipeline, where `features_assembled`
# drops `profile_id`). `dataset_lags` sorts only by `event_datetime`, so rows
# with equal timestamps may come back in a different order in each job.
# Carrying `profile_id` through to the predictions and collecting it together
# with the probability would make the pairing reliable. The CSV also contains
# the RangeIndex as an unnamed first column.
df_pred.join(df_predictions_future).to_csv(file_path_preds, header=True)