## PySpark Setup

In [None]:
import os
import sys
from pyspark.sql import SparkSession

# Point both PySpark interpreter settings at the Python running this notebook.
for env_var in ('PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON'):
    os.environ[env_var] = sys.executable

# Get (or create) the Spark session with explicit driver/executor memory
# settings and a cap on the size of results collected to the driver.
spark = (
    SparkSession.builder
    .appName("YourAppName")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")
    .getOrCreate()
)

In [None]:
from pyspark.sql import SparkSession

# getOrCreate() hands back the already-running session when one exists
# (e.g. the one built in the setup cell above); a new session named
# 'pySparkSetup' is only created when none is active.
spark = (
    SparkSession.builder
    .appName('pySparkSetup')
    .getOrCreate()
)

In [None]:
# Load the four CSV extracts. The first row of each file is used as the
# header and column types are inferred from the data.
csv_options = {'header': True, 'inferSchema': True}
train_id = spark.read.csv('datasets/ps-2/train_identity.csv', **csv_options)
train_tr = spark.read.csv('datasets/ps-2/train_transaction.csv', **csv_options)
test_id = spark.read.csv('datasets/ps-2/test_identity.csv', **csv_options)
test_tr = spark.read.csv('datasets/ps-2/test_transaction.csv', **csv_options)

# Attach identity columns to each transaction. The left join keeps every
# transaction row, including those with no matching identity record.
train = train_tr.join(train_id, how='left', on='TransactionID')
test = test_tr.join(test_id, how='left', on='TransactionID')

In [None]:
from functools import reduce
from pyspark.sql import functions as F

# Columns removed from both feature frames.
non_feature_cols = ['TransactionDT', 'TransactionID']

# Training features exclude the label as well; the label is kept separately.
train_X = train.drop('isFraud', *non_feature_cols)
train_y = train.select('isFraud')

test_X = test.drop(*non_feature_cols)

## Feature Engineering

In [None]:
# Report column names that appear in only one of the two feature frames.
set1 = set(train_X.columns)
set2 = set(test_X.columns)

diff1 = set1.difference(set2)
diff2 = set2.difference(set1)

print("Difference between train_X and test_X columns:", diff1)
print("Difference between test_X and train_X columns:", diff2)

# Map the hyphenated names 'id-01' .. 'id-38' onto the underscore spelling
# 'id_01' .. 'id_38'.
column_mapping = {f'id-{n:02d}': f'id_{n:02d}' for n in range(1, 39)}

# withColumnRenamed is a no-op for names that are not in the schema, so
# applying the whole mapping is safe even if some columns are absent.
for hyphenated, underscored in column_mapping.items():
    test_X = test_X.withColumnRenamed(hyphenated, underscored)

In [None]:
from pyspark.sql.functions import col
from pyspark.sql import functions as F

def impute_NaN_with_mode(df):
    """Fill missing values in every column with that column's mode.

    The mode is the most frequent non-null value of the column. Ties are
    broken by taking the smallest value so repeated runs give the same result.

    Args:
        df: Spark DataFrame to impute.

    Returns:
        A new Spark DataFrame with missing values replaced per column.
        Columns that contain no non-null value at all (including every
        column of an empty frame) are left untouched, because there is no
        mode to fill with.
    """
    mode_dict = {}
    for column in df.columns:
        # Only the single top row is fetched; collecting every distinct
        # value to the driver is unnecessary just to read the first one.
        top_row = (
            df.where(F.col(column).isNotNull())
            .groupBy(column)
            .count()
            .orderBy(F.desc('count'), F.asc(column))
            .first()
        )
        # first() returns None when the column has no non-null values;
        # na.fill cannot take None as a replacement, so skip the column.
        if top_row is not None:
            mode_dict[column] = top_row[column]

    if not mode_dict:
        return df
    return df.na.fill(mode_dict)

train_X = impute_NaN_with_mode(train_X)
test_X = impute_NaN_with_mode(test_X)

In [None]:
from pyspark.ml.feature import StringIndexer

cat_cols = [
    "ProductCD", "card1", "card2", "card3", "card4", "card5", "card6", "addr1", "addr2",
    "P_emaildomain", "R_emaildomain", "M1", "M2", "M3", "M4", "M5", "M6", "M7", "M8", "M9",
    "DeviceType", "DeviceInfo", "id_12", "id_13", "id_14", "id_15", "id_16", "id_17", "id_18", "id_19",
    "id_20", "id_21", "id_22", "id_23", "id_24", "id_25", "id_26", "id_27",
    "id_28", "id_29", "id_30", "id_31", "id_32", "id_33", "id_34", "id_35", "id_36", "id_37", "id_38"
]

# Replace each categorical column with its numeric index. The loop variable
# is deliberately not called `col`, so it does not shadow the `col` function
# imported from pyspark.sql.functions in an earlier cell.
for cat_col in cat_cols:
    indexed_col = f"{cat_col}_indexed"
    indexer = StringIndexer(inputCol=cat_col, outputCol=indexed_col, stringOrderType="alphabetAsc")

    # Fit on the train and test values of this one column so both frames share
    # the same label -> index mapping. Selecting the single column first keeps
    # the positional union independent of the order of all the other columns.
    combined_df = train_X.select(cat_col).union(test_X.select(cat_col))
    indexer_model = indexer.fit(combined_df)

    # Swap the original column for its indexed version under the same name
    train_X = indexer_model.transform(train_X).drop(cat_col).withColumnRenamed(indexed_col, cat_col)
    test_X = indexer_model.transform(test_X).drop(cat_col).withColumnRenamed(indexed_col, cat_col)

## Model Training and Evaluation

In [None]:
# LightGBM training parameters
lgb_params = dict(
    objective='binary',    # two-class problem: fraud vs. not fraud
    max_depth=-1,          # non-positive value: no limit on tree depth
    boosting_type='gbdt',  # standard gradient-boosted decision trees
    metric='auc',          # evaluate with area under the ROC curve (binary_logloss is an alternative)
    verbosity=-1,          # negative value: suppress everything except fatal messages
)

In [None]:
import lightgbm as lgb
import numpy as np
from sklearn.model_selection import train_test_split

# Convert train_X and train_y Spark DataFrames to pandas DataFrames
# NOTE(review): features and labels are collected by two separate Spark
# actions; this relies on both returning rows in the same order — TODO confirm
# (carrying the label through the feature frame would remove the assumption).
train_X_pd = train_X.toPandas()
train_y_pd = train_y.toPandas()

# Split the data into train and validation sets (70/30).
# test_size must be a fraction strictly between 0 and 1; the previous value
# of 0.0 is rejected by train_test_split.
train_X, val_X, train_y, val_y = train_test_split(train_X_pd, train_y_pd, test_size=0.3, random_state=42)


In [None]:

from lightgbm.plotting import *
from matplotlib import pyplot
import lightgbm as lgb

# Wrap the train/validation splits as LightGBM datasets. The validation set
# references the training set so both are constructed consistently.
train_dataset = lgb.Dataset(train_X, label=train_y, categorical_feature=cat_cols)
eval_dataset = lgb.Dataset(
    val_X,
    label=val_y,
    reference=train_dataset,
    categorical_feature=cat_cols,
)

# Train the booster, evaluating on both the training and validation sets.
lgb_model = lgb.train(
    params=lgb_params,
    train_set=train_dataset,
    valid_sets=[train_dataset, eval_dataset],
)

# Show the 20 most important features.
ax = lgb.plot_importance(lgb_model, max_num_features=20)
pyplot.show()

In [None]:
import pandas as pd

# test_X is still a Spark DataFrame here, which LightGBM cannot score
# directly: collect it to pandas first. The columns are then put in the same
# order as the training features so each value lines up with the feature the
# model was trained on.
test_X_pd = test_X.toPandas()[list(train_X.columns)]
predictions = lgb_model.predict(test_X_pd)

# Convert the predictions to a DataFrame
predictions_df = pd.DataFrame(predictions, columns=["prediction"])

# Save the predictions DataFrame as a CSV file
predictions_df.to_csv("submission_telkomsel_ps2.csv", index=False)