In [1]:
import findspark
findspark.init() 

In [2]:
from datetime import datetime
import pyspark
import mlflow
from mlflow.tracking import MlflowClient
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import hour, minute, second, year, month, dayofmonth, dayofweek, count, to_timestamp, when, isnan
from pyspark.sql.functions import col, isnan, when, count
from pyspark.sql.functions import countDistinct, udf
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

# Check unprocessed data

In [3]:
import os
import boto3
import hdfs
import subprocess

def run_cmd(args_list):
    """
    run linux commands
    """
    # import subprocess
    print('Running system command: {0}'.format(' '.join(args_list)))
    proc = subprocess.Popen(args_list, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    s_output, s_err = proc.communicate()
    s_return =  proc.returncode
    return s_return, s_output, s_err 

def get_s3_hdfs_idff():
    # Get file list from s3
    session = boto3.session.Session()

    s3 = session.client(
        service_name='s3',
        endpoint_url='https://storage.yandexcloud.net'
    )

    response = s3.list_objects_v2(Bucket='fraud-data')
    s3_files = list(map(lambda item: item.get('Key'), response.get('Contents', [])))
    
    # Get file list from hdfs
    (ret, out, err)= run_cmd(['hdfs', 'dfs', '-ls', '/user/fraud-data'])
    hdfs_files = [os.path.basename(line.rsplit(None,1)[-1]) for line in out.decode().split('\n') if len(line.rsplit(None,1))]
    
    new_files = set(s3_files) - set(hdfs_files)
    return new_files

def get_new_files_list():
    processed_files = []
    processed_store_file = 'processed.txt'
    if os.path.exists(processed_store_file):
        with open('processed.txt', 'r') as pr_file:
            processed_files = pr_file.readlines()
        
    # Get file list from hdfs
    (ret, out, err)= run_cmd(['hdfs', 'dfs', '-ls', '-C', '/user/fraud-data'])
    hdfs_files = [os.path.basename(line) for line in out.decode().split('\n') if len(os.path.basename(line))]
    
    new_files = set(hdfs_files).difference(set(processed_files))
    new_files = list(map(lambda f: "/user/fraud-data/" + f, new_files))
    new_files.sort()
    return new_files

def get_files_list():        
    # Get file list from hdfs
    (ret, out, err)= run_cmd(['hdfs', 'dfs', '-ls', '-C', '/user/fraud-data'])
    hdfs_files = [line for line in out.decode().split('\n') if len(line)]
    hdfs_files.sort()
    return hdfs_files

# Read and preprocess new files

In [4]:
# Define the schema for the DataFrame

def read_files(file_list):
    schema = StructType([
        StructField("transaction_id", IntegerType(), True),
        StructField("tx_datetime", StringType(), True),
        StructField("customer_id", IntegerType(), True),
        StructField("terminal_id", IntegerType(), True),
        StructField("tx_amount", DoubleType(), True),
        StructField("tx_time_seconds", IntegerType(), True),
        StructField("tx_time_days", IntegerType(), True),
        StructField("tx_fraud", IntegerType(), True),
        StructField("tx_fraud_scenario", IntegerType(), True)
    ])

    # Load the CSV file into a DataFrame
    df = (spark.read
        .format("csv")
        .schema(schema)
        .option("header", False)
        .option("sep", ',')
        .option("comment", '#')
        .load(file_list)
    )
    return df


def explore(df):
    # Show the DataFrame
    df.show()
    
    row_count = df.count()
    print(f"Row count: {row_count}")
    
    # Count the number of NaN values in each column
    #print("Count of NaN values in each column:")
    #nan_count = df.select([count(when(isnan(c), c)).alias(c) for c in df.columns])
    #nan_count.show()
    
    #df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()
    
    #for column in ["customer_id", "terminal_id", "tx_fraud_scenario"]:
    #    distinct_count = df.agg(countDistinct(column)).collect()[0][0]
    #    print(f"{column}: {distinct_count} unique values")
    
    df.groupBy('tx_fraud').count().show()


# Define a UDF to handle the special case
def convert_timestamp(s):
    if s[11:13] == '24':
        return s[:11] + '00' + s[13:]
    return s


def preprocess(df):
    # Convert the tx_datetime column to a timestamp type
    df = df.limit(df.count() // 10)
    convert_timestamp_udf = udf(convert_timestamp)
    df = df.withColumn("tx_datetime", convert_timestamp_udf(df["tx_datetime"]))
    df = df.withColumn("ts", to_timestamp(df["tx_datetime"], "yyyy-MM-dd HH:mm:ss"))
    
    df = df.fillna({'terminal_id': 0})
    
    # Extract new features from the tx_datetime column
    df = df.withColumn("is_weekend", dayofweek("ts").isin([1,7]).cast("int"))
    df = df.withColumn("year", year(df["ts"]))
    df = df.withColumn("month", month(df["ts"]))
    df = df.withColumn("day_of_month", dayofmonth(df["ts"]))
    df = df.withColumn("day_of_week", dayofweek(df["ts"]))
    df = df.withColumn("hour", hour(df["ts"]))
    df = df.withColumn("minute", minute(df["ts"]))
    df = df.withColumn("second", second(df["ts"]))
        
    return df


def build_train_pipeline():
    # Define the pipeline for feature extraction and transformation
    # Here, we'll use StringIndexer and OneHotEncoder for categorical features.
    # For numerical features, we'll use StandardScaler to scale them.
    # Define the pipeline stages
    stages = []

    # Terminal_id, hour_of_day, and day_of_week should be treated as categorical features
    #categorical_columns = ["hour", "day_of_week", "month"]
    #for column in categorical_columns:
    #    string_indexer = StringIndexer(inputCol=column, outputCol=f"{column}_index", handleInvalid="keep")
    #    encoder = OneHotEncoder(inputCols=[string_indexer.getOutputCol()], outputCols=[f"{column}_ohe"])
    #    stages += [string_indexer, encoder]

    # Define numerical columns and apply StandardScaler
    numerical_columns = [
         "tx_amount",
         "is_weekend",
         "is_night",
         "customer_id_nb_tx_1day_window",
         "customer_id_avg_amount_1day_window",
         "customer_id_nb_tx_7day_window",
         "customer_id_avg_amount_7day_window",
         "customer_id_nb_tx_30day_window",
         "customer_id_avg_amount_30day_window",
         "terminal_id_nb_tx_1day_window",
         "terminal_id_risk_1day_window",
         "terminal_id_nb_tx_7day_window",
         "terminal_id_risk_7day_window",
         "terminal_id_nb_tx_30day_window",
         "terminal_id_risk_30day_window"
    ]
    
    #for column in numerical_columns:
        #vector_assembler = VectorAssembler(inputCols=[column], outputCol=f"{column}_vec")
        #scaler = StandardScaler(inputCol=vector_assembler.getOutputCol(), outputCol=f"{column}_scaled", withStd=True, withMean=True)
        #stages += [vector_assembler] #, scaler]

    # Combine all the transformed features into a single "features" column
    #assembler_input = [f"{column}_ohe" for column in categorical_columns] + [f"{column}_scaled" for column in numerical_columns] 
    assembler_input = [column for column in numerical_columns] 
    vector_assembler = VectorAssembler(inputCols=assembler_input, outputCol="features")
    stages += [vector_assembler]
    
    # Add model
    #classification = RandomForestClassifier(featuresCol='features', labelCol='tx_fraud')
    classification = LogisticRegression(featuresCol='features', labelCol='tx_fraud')
    stages += [classification]

    # Create the pipeline
    pipeline = Pipeline(stages=stages)
    
#     # Fit the pipeline to the transactions DataFrame
#     pipeline_model = pipeline.fit(df)

#     # Transform the data
#     df_transformed = pipeline_model.transform(df)
    
#     #df_transformed.show(truncate=False)

#     # Show the transformed data
#     df_transformed = df_transformed.select("year", "month", "features", "tx_fraud")

#     df_transformed.show(truncate=False)
    
#     return df_transformed
    return pipeline

def calculate_accuracy(predictions):
    predictions = predictions.withColumn(
        "fraudPrediction",
        when((predictions.tx_fraud==1) & (predictions.prediction==1), 1).otherwise(0)
    )

    accurateFraud = predictions.groupBy("fraudPrediction").count().where(predictions.fraudPrediction==1).head()[1]
    totalFraud = predictions.groupBy("tx_fraud").count().where(predictions.tx_fraud==1).head()[1]
    accuracy = (accurateFraud/totalFraud)*100
    return accuracy

# Train and log with mlflow

In [5]:
if __name__ == "__main__":
    spark = (
        pyspark.sql.SparkSession.builder
            #.config('spark.executor.instances', 8)
            .config("spark.executor.cores", 4)
            .appName("fraud_data_train")
            .getOrCreate()
    )
    spark.conf.set('spark.sql.repl.eagerEval.enabled', True)  # to pretty print pyspark.DataFrame in jupyter
    
    df = spark.read.parquet("/user/transformed_full/")
    df_train = df.filter(col('ts').between("2019-09-21", "2019-10-13"))
    df_test = df.filter(col('ts').between("2019-10-21", "2019-10-27"))

    #df.show()
    #print(df.dtypes)
    # Prepare MLFlow experiment for logging
    client = MlflowClient()
    experiment = client.get_experiment_by_name("Fraud_Data")
    experiment_id = experiment.experiment_id

    run_name = 'Fraud_data_pipeline' + ' ' + str(datetime.now())

    with mlflow.start_run(run_name=run_name, experiment_id=experiment_id):
        inf_pipeline = build_train_pipeline()

        print("Fitting new model / inference pipeline ...")
        model = inf_pipeline.fit(df_train)

        print("Scoring the model ...")
        evaluator = BinaryClassificationEvaluator(labelCol='tx_fraud', rawPredictionCol='prediction')

        predictions_train = model.transform(df_train)
        predictions_train.head()
        areaUnderROC_train = evaluator.evaluate(predictions_train)

        predictions_test = model.transform(df_test)
        areaUnderROC_test = evaluator.evaluate(predictions_test)

        run_id = mlflow.active_run().info.run_id
        print(f"Logging metrics to MLflow run {run_id} ...")
        mlflow.log_metric("ROC-train", areaUnderROC_train)
        print(f"Model ROC-train: {areaUnderROC_train}")
        mlflow.log_metric("ROC-test", areaUnderROC_test)
        print(f"Model ROC-test: {areaUnderROC_test}")

        #print("Saving model ...")
        #mlflow.spark.save_model(model, 'fraud_classifier')
        predictions_test = predictions_test.withColumn(
            "fraudPrediction", when((predictions_test.tx_fraud==1) & (predictions_test.prediction==1), 1).otherwise(0))
        predictions_test.groupBy("fraudPrediction").count().show()

        accurateFraud = predictions_test.groupBy("fraudPrediction").count().where(predictions_test.fraudPrediction==1).head()[1]
        totalFraud = predictions_test.groupBy("tx_fraud").count().where(predictions_test.tx_fraud==1).head()[1]
        FraudPredictionAccuracy = (accurateFraud/totalFraud)*100
        print("FraudPredictionAccuracy:", FraudPredictionAccuracy)

        print("Exporting/logging model ...")
        mlflow.spark.log_model(model, 'fraud_classifier', registered_model_name='fraud_classifier')
        print("Done")
    
    spark.stop()

Fitting new model / inference pipeline ...
Scoring the model ...
Logging metrics to MLflow run 3837f9ac5093472ebcc16699699354ad ...
Model ROC-train: 0.8141328175899902
Model ROC-test: 0.872197153901282
+---------------+-------+
|fraudPrediction|  count|
+---------------+-------+
|              1| 473090|
|              0|8927455|
+---------------+-------+

FraudPredictionAccuracy: 74.98224859573712
Exporting/logging model ...


Registered model 'fraud_classifier' already exists. Creating a new version of this model...
2023/06/19 16:33:21 INFO mlflow.tracking._model_registry.client: Waiting up to 300 seconds for model version to finish creation. Model name: fraud_classifier, version 14
Created version '14' of model 'fraud_classifier'.


Done


In [6]:
# new_files = get_new_files_list()
# new_files

In [7]:
# df = read_files(new_files[0])

In [8]:
# explore(df)

In [9]:
# df = preprocess(df)

In [10]:
# # Prepare MLFlow experiment for logging
# client = MlflowClient()
# experiment = client.get_experiment_by_name("Fraud_Data")
# experiment_id = experiment.experiment_id

# run_name = 'Fraud_data_pipeline' + ' ' + str(datetime.now())

# with mlflow.start_run(run_name=run_name, experiment_id=experiment_id):
#     inf_pipeline = build_train_pipeline()
    
#     # Balance classes
#     majority_class = df.filter(df['tx_fraud'] == 0)
#     minority_class = df.filter(df['tx_fraud'] == 1)
#     #sampled_majority_class = majority_class.sample(False, 1/16, seed=42)  # set the fraction based on your needs
#     balanced_df = majority_class.limit(minority_class.count()).union(minority_class)
#     balanced_df = balanced_df.sample(withReplacement=False, fraction=1.0, seed=42)
#     print("Number of items of each class after rebalancing:")
#     balanced_df.groupBy('tx_fraud').count().show()
    
#     train, test = balanced_df.randomSplit([0.9, 0.1], seed=12345)

#     print("Fitting new model / inference pipeline ...")
#     model = inf_pipeline.fit(train)

#     print("Scoring the model ...")
#     evaluator = BinaryClassificationEvaluator(labelCol='tx_fraud', rawPredictionCol='prediction')
    
#     predictions_train = model.transform(train)
#     predictions_train.head()
#     areaUnderROC_train = evaluator.evaluate(predictions_train)
    
#     predictions_test = model.transform(test)
#     areaUnderROC_test = evaluator.evaluate(predictions_test)

#     run_id = mlflow.active_run().info.run_id
#     print(f"Logging metrics to MLflow run {run_id} ...")
#     mlflow.log_metric("ROC-train", areaUnderROC_train)
#     print(f"Model ROC-train: {areaUnderROC_train}")
#     mlflow.log_metric("ROC-test", areaUnderROC_test)
#     print(f"Model ROC-test: {areaUnderROC_test}")

#     #print("Saving model ...")
#     #mlflow.spark.save_model(model, 'fraud_classifier')
#     predictions_test = predictions_test.withColumn(
#         "fraudPrediction", when((predictions_test.tx_fraud==1) & (predictions_test.prediction==1), 1).otherwise(0))
#     predictions_test.groupBy("fraudPrediction").count().show()
    
#     accurateFraud = predictions_test.groupBy("fraudPrediction").count().where(predictions_test.fraudPrediction==1).head()[1]
#     totalFraud = predictions_test.groupBy("tx_fraud").count().where(predictions_test.tx_fraud==1).head()[1]
#     FraudPredictionAccuracy = (accurateFraud/totalFraud)*100
#     print("FraudPredictionAccuracy:", FraudPredictionAccuracy)

#     print("Exporting/logging model ...")
#     mlflow.spark.log_model(model, 'fraud_classifier', registered_model_name='fraud_classifier')
#     print("Done")

In [11]:
# spark.stop()