# Interactive analysis via jupyter notebook

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Transformer
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql import DataFrame
from pyspark.sql.types import FloatType
import math
import pyspark.sql.functions as F
from pyspark.ml.linalg import Vectors, VectorUDT
import os
from pyspark.ml.regression import GBTRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pprint import pprint

## Read Hive tables

### Connect to Hive

In [2]:
TEAM = 'team24'

# location of Hive database in HDFS
WAREHOUSE = "project/hive/warehouse"

spark = SparkSession.builder\
        .appName(f"{TEAM} - spark ML")\
        .master("yarn")\
        .config("hive.metastore.uris",
                "thrift://hadoop-02.uni.innopolis.ru:9883")\
        .config("spark.sql.warehouse.dir", WAREHOUSE)\
        .config("spark.sql.avro.compression.codec", "snappy")\
        .enableHiveSupport()\
        .getOrCreate()

### Read Hive table

In [3]:
transactions = spark.read.format("avro")\
                    .table('team24_projectdb.transactions_part_buck')

### Explore the table

In [4]:
transactions.printSchema()

root
 |-- time: timestamp (nullable = true)
 |-- hour_of_day: integer (nullable = true)
 |-- sending_address: string (nullable = true)
 |-- receiving_address: string (nullable = true)
 |-- amount: float (nullable = true)
 |-- location_region: string (nullable = true)
 |-- ip_prefix: string (nullable = true)
 |-- login_frequency: integer (nullable = true)
 |-- session_duration: integer (nullable = true)
 |-- purchase_pattern: string (nullable = true)
 |-- age_group: string (nullable = true)
 |-- risk_score: float (nullable = true)
 |-- anomaly: string (nullable = true)
 |-- transaction_type: string (nullable = true)



In [5]:
transactions.show(10)

+-------------------+-----------+--------------------+--------------------+---------+---------------+---------+---------------+----------------+----------------+-----------+----------+--------+----------------+
|               time|hour_of_day|     sending_address|   receiving_address|   amount|location_region|ip_prefix|login_frequency|session_duration|purchase_pattern|  age_group|risk_score| anomaly|transaction_type|
+-------------------+-----------+--------------------+--------------------+---------+---------------+---------+---------------+----------------+----------------+-----------+----------+--------+----------------+
|2022-06-25 11:27:37|         14|0x6ea0e02fb6ee893...|0xc28cbdb253f1217...|523.94794|  North America|    172.0|              4|              56|         focused|established|     15.75|low_risk|        transfer|
|2022-11-12 17:47:34|         20|0xd6e251c23cbf52d...|0x51e8fbe24f124e0...|440.97885|         Africa|    172.0|              4|              62|         foc

In [6]:
print("(row, colunm):", (transactions.count(), len(transactions.columns)))

(row, colunm): (78600, 14)


## Preprocessing the data

### Drop data with missing values

In [7]:
transactions = transactions.dropna()

print("row:", transactions.count())

row: 78600


### Class CustomTransformer for preprocessing timestamp data type

In [8]:
def cos_sin_time(list_time):
    output = []
    parm = [0, 12, 31, 24, 60, 60]

    for i in range(len(list_time)):
        if i == 0:
            output.append(float(list_time[i]))
        else:
            cos = math.cos((2*list_time[i]*math.pi)/parm[i])
            sin = math.sin((2*list_time[i]*math.pi)/parm[i])
            output.append(cos)
            output.append(sin)
    return Vectors.dense(output)

In [9]:
class CustomTransformer(Transformer, DefaultParamsReadable,
                        DefaultParamsWritable):

    def _transform(self, dataset: DataFrame):
        input_col = dataset['time']
        transform_udf = F.udf(lambda x: cos_sin_time(
                            list(map(int, ':'.join('-'.join(str(x).split())
                                                   .split('-'))
                                     .split(':')))
                        ), VectorUDT())
        return dataset.withColumn("sin_cos_time", transform_udf(input_col))

### Select the features and preprocessing

In [10]:
categorical_features = ['transaction_type', 'location_region',
                        'purchase_pattern']
numerical_features = ['amount', 'login_frequency', 'session_duration',
                      'ip_prefix']
time = 'time'
label = 'risk_score'

In [11]:
# Convert from string to float type column 'ip_prefix'
ip_prefix_udf = F.udf(lambda x: float(x), FloatType())
transactions = transactions.withColumn('ip_prefix', ip_prefix_udf('ip_prefix'))

In [12]:
# Create string indexer for categorical features
indexers = [StringIndexer(inputCol=col, outputCol=col+"_index",
                          handleInvalid="skip") for col in categorical_features]

# One-hot encode categorical features
onehot_encoders = [OneHotEncoder(inputCol=col+"_index",
                                 outputCol=col+"_onehot")
                   for col in categorical_features]

In [13]:
custom_transformer = CustomTransformer()

In [14]:
# Assemble all features into a single vector
assembler = VectorAssembler(
    inputCols=[col+"_onehot" for col in categorical_features]
    + numerical_features + ["sin_cos_time"] + [label],
    outputCol="features")

### Build the Pipeline

In [15]:
# Create pipeline
pipeline = Pipeline(stages=indexers + onehot_encoders + [custom_transformer]
                    + [assembler])

In [16]:
# Fit pipeline to data
pipeline_model = pipeline.fit(transactions)

In [17]:
# Transform data using the fitted pipeline
transformed_data = pipeline_model.transform(transactions)

In [18]:
transformed_data.head(3)

[Row(time=datetime.datetime(2022, 12, 12, 16, 15, 16), hour_of_day=19, sending_address='0x87cd446adc9d04f59502281dd824caf141bdff9b', receiving_address='0x1f1d8ed2ce1b2cb9718a9a5a0fbb31d6bd92130a', amount=658.8805541992188, location_region='South America', ip_prefix=172.0, login_frequency=2, session_duration=36, purchase_pattern='random', age_group='new', risk_score=94.5, anomaly='high_risk', transaction_type='scam', transaction_type_index=3.0, location_region_index=3.0, purchase_pattern_index=1.0, transaction_type_onehot=SparseVector(4, {3: 1.0}), location_region_onehot=SparseVector(4, {3: 1.0}), purchase_pattern_onehot=SparseVector(2, {1: 1.0}), sin_cos_time=DenseVector([2022.0, 1.0, -0.0, -0.7588, 0.6514, -0.5, -0.866, 0.0, 1.0, -0.1045, 0.9945]), features=DenseVector([0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 1.0, 0.0, 1.0, 658.8806, 2.0, 36.0, 172.0, 2022.0, 1.0, -0.0, -0.7588, 0.6514, -0.5, -0.866, 0.0, 1.0, -0.1045, 0.9945, 94.5])),
 Row(time=datetime.datetime(2022, 8, 28, 10, 15, 22), 

In [19]:
df = transformed_data['features', 'risk_score']

df.head(3)

[Row(features=DenseVector([0.0, 0.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 523.9479, 4.0, 56.0, 172.0, 2022.0, -1.0, 0.0, 0.3473, -0.9378, -0.9659, 0.2588, -0.9511, 0.309, -0.7431, -0.6691, 15.75]), risk_score=15.75),
 Row(features=DenseVector([0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 440.9789, 4.0, 62.0, 172.0, 2022.0, 0.866, -0.5, -0.7588, 0.6514, -0.2588, -0.9659, 0.2079, -0.9781, -0.9135, -0.4067, 15.75]), risk_score=15.75),
 Row(features=DenseVector([0.0, 0.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 246.3355, 3.0, 43.0, 172.16, 2022.0, 0.0, 1.0, 1.0, -0.0, 0.5, -0.866, -0.2079, 0.9781, -0.2079, -0.9781, 15.0]), risk_score=15.0)]

In [20]:
df.count()

78600

In [21]:
df.cache()

DataFrame[features: vector, risk_score: float]

### Split data

In [22]:
# Split the data into training and testing sets
(train_data, test_data) = df.randomSplit([0.7, 0.3], seed=24)

In [23]:
train_data.head(3)

[Row(features=SparseVector(26, {1: 1.0, 4: 1.0, 10: 655.889, 11: 3.0, 12: 61.0, 13: 192.168, 14: 2022.0, 15: 1.0, 16: -0.0, 17: 0.919, 18: -0.3944, 19: 0.2588, 20: -0.9659, 21: 1.0, 23: 1.0, 25: 26.25}), risk_score=26.25),
 Row(features=SparseVector(26, {1: 1.0, 4: 1.0, 10: 810.3839, 11: 3.0, 12: 78.0, 13: 10.0, 14: 2022.0, 15: 0.5, 16: 0.866, 17: -0.4404, 18: 0.8978, 19: 1.0, 21: -0.866, 22: 0.5, 23: 1.0, 25: 42.1875}), risk_score=42.1875),
 Row(features=SparseVector(26, {1: 1.0, 4: 1.0, 10: 1095.52, 11: 4.0, 12: 47.0, 13: 192.0, 14: 2022.0, 15: -1.0, 16: 0.0, 17: -0.6121, 18: 0.7908, 19: 1.0, 21: 0.5878, 22: 0.809, 23: 1.0, 25: 42.1875}), risk_score=42.1875)]

In [24]:
print('train_data, test_data:', (train_data.count(), test_data.count()))

train_data, test_data: (55115, 23485)


### Save train and test data to HDFS

In [25]:
def run(command):
    return os.popen(command).read()

In [26]:
# train_data.select("features", "risk_score")\
#     .coalesce(1)\
#     .write\
#     .mode("overwrite")\
#     .format("json")\
#     .save("project/data/train")

In [27]:
# test_data.select("features", "risk_score")\
#     .coalesce(1)\
#     .write\
#     .mode("overwrite")\
#     .format("json")\
#     .save("project/data/test")

In [28]:
# run("hdfs dfs -cat project/data/train/*.json >\
#     ~/project/bigdata_project_team24/data/train.json")
# run("hdfs dfs -cat project/data/test/*.json >\
#     ~/project/bigdata_project_team24/data/test.json")

## Model 1 (GBTRegressor)

### Model training

In [29]:
gbt = GBTRegressor(featuresCol="features", labelCol="risk_score")

In [30]:
model_gbt = gbt.fit(train_data)

### Prediction

In [31]:
predictions_gbt = model_gbt.transform(test_data)

In [32]:
predictions_gbt.show(10)

+--------------------+----------+-----------------+
|            features|risk_score|       prediction|
+--------------------+----------+-----------------+
|(26,[1,6,10,11,12...|   42.1875|42.18779470018738|
|(26,[1,7,10,11,12...|   35.4375|35.42826153034686|
|(26,[1,7,10,11,12...|   42.1875|42.18779470018738|
|(26,[1,7,10,11,12...|     33.75|33.75644408210856|
|(26,[1,10,11,12,1...|      25.0|24.99801837271904|
|(26,[1,10,11,12,1...|      25.0|24.99801837271904|
|(26,[1,10,11,12,1...|     33.75|33.75644408210856|
|(26,[1,10,11,12,1...|      25.0|24.99801837271904|
|(26,[1,10,11,12,1...|   35.4375|35.42826153034686|
|(26,[1,10,11,12,1...|     26.25|26.24783764372823|
+--------------------+----------+-----------------+
only showing top 10 rows



### Evaluation

In [33]:
evaluator_rmse = RegressionEvaluator(labelCol="risk_score",
                                     predictionCol="prediction",
                                     metricName="rmse")

evaluator_r2 = RegressionEvaluator(labelCol="risk_score",
                                   predictionCol="prediction", metricName="r2")

In [34]:
rmse_gbt = evaluator_rmse.evaluate(predictions_gbt)
r2_gbt = evaluator_r2.evaluate(predictions_gbt)

In [35]:
print("RMSE for GBTRegressor:", rmse_gbt)
print("R2 for GBTRegressor:", r2_gbt)

RMSE for GBTRegressor: 0.015357750672813851
R2 for GBTRegressor: 0.9999995020166124


### Hyperparameter optimization

In [36]:
grid = ParamGridBuilder()

In [37]:
param_grid_gbt = grid.addGrid(model_gbt.maxDepth, [2, 4, 6]) \
    .addGrid(model_gbt.maxBins, [16, 32, 64]) \
    .build()

In [38]:
cv_gbt = CrossValidator(estimator=gbt,
                        estimatorParamMaps=param_grid_gbt,
                        evaluator=evaluator_rmse,
                        parallelism=5,
                        numFolds=3)

In [39]:
cvModel_gbt = cv_gbt.fit(train_data)

### Select the best model

In [40]:
model_best_gbt = cvModel_gbt.bestModel
model_best_gbt

GBTRegressionModel: uid=GBTRegressor_8e7274f1a275, numTrees=20, numFeatures=26

In [41]:
pprint(model_best_gbt.extractParamMap())

{Param(parent='GBTRegressor_8e7274f1a275', name='validationTol', doc='Threshold for stopping early when fit with validation is used. If the error rate on the validation input changes by less than the validationTol, then learning will stop early (before `maxIter`). This parameter is ignored when fit without validation is used.'): 0.01,
 Param(parent='GBTRegressor_8e7274f1a275', name='lossType', doc='Loss function which GBT tries to minimize (case-insensitive). Supported options: squared, absolute'): 'squared',
 Param(parent='GBTRegressor_8e7274f1a275', name='labelCol', doc='label column name.'): 'risk_score',
 Param(parent='GBTRegressor_8e7274f1a275', name='seed', doc='random seed.'): -67477362756987781,
 Param(parent='GBTRegressor_8e7274f1a275', name='checkpointInterval', doc='set checkpoint interval (>= 1) or disable checkpoint (-1). E.g. 10 means that the cache will get checkpointed every 10 iterations. Note: this setting will be ignored if the checkpoint directory is not set in the 

### Save the model to HDFS

In [42]:
# model_best_gbt.write().overwrite().save("project/models/model1")

# # Run it from root directory of the repository
# run("hdfs dfs -get project/models/model1\
#     ~/project/bigdata_project_team24/models/model1")

### Prediction for the best model

In [43]:
predictions_best_gbt = model_best_gbt.transform(test_data)
predictions_best_gbt.show()

+--------------------+----------+----------+
|            features|risk_score|prediction|
+--------------------+----------+----------+
|(26,[1,6,10,11,12...|   42.1875|   42.1875|
|(26,[1,7,10,11,12...|   35.4375|   35.4375|
|(26,[1,7,10,11,12...|   42.1875|   42.1875|
|(26,[1,7,10,11,12...|     33.75|     33.75|
|(26,[1,10,11,12,1...|      25.0|      25.0|
|(26,[1,10,11,12,1...|      25.0|      25.0|
|(26,[1,10,11,12,1...|     33.75|     33.75|
|(26,[1,10,11,12,1...|      25.0|      25.0|
|(26,[1,10,11,12,1...|   35.4375|   35.4375|
|(26,[1,10,11,12,1...|     26.25|     26.25|
|(26,[1,10,11,12,1...|     26.25|     26.25|
|(26,[1,10,11,12,1...|     31.25|     31.25|
|(26,[1,10,11,12,1...|     33.75|     33.75|
|(26,[1,10,11,12,1...|     26.25|     26.25|
|(26,[1,10,11,12,1...|     26.25|     26.25|
|(26,[1,10,11,12,1...|     26.25|     26.25|
|(26,[1,10,11,12,1...|     26.25|     26.25|
|(26,[1,10,11,12,1...|     26.25|     26.25|
|(26,[1,10,11,12,1...|     31.25|     31.25|
|(26,[1,10

### Save prediction of the best model to HDFS

In [44]:
# predictions_best_gbt.select("risk_score", "prediction")\
#     .coalesce(1)\
#     .write\
#     .mode("overwrite")\
#     .format("csv")\
#     .option("sep", ",")\
#     .option("header", "true")\
#     .save("project/output/model1_predictions.csv")

# # Run it from root directory of the repository
# run("hdfs dfs -cat project/output/model1_predictions.csv/*.csv >\
#     ~/project/bigdata_project_team24/output/model1_predictions.csv")

### Evaluation for the best model 

In [45]:
rmse_best_gbt = evaluator_rmse.evaluate(predictions_best_gbt)
r2_best_gbt = evaluator_r2.evaluate(predictions_best_gbt)

print("RMSE for the best GBTRegressor:", rmse_best_gbt)
print("R2 for the best GBTRegressor:", r2_best_gbt)

RMSE for the best GBTRegressor: 0.0009687880561017027
R2 for the best GBTRegressor: 0.9999999980183946


## Model 2 (RandomForestRegressor)

### Model training

In [46]:
rf = RandomForestRegressor(featuresCol="features", labelCol="risk_score")

In [47]:
model_rf = rf.fit(train_data)

### Prediction

In [48]:
predictions_rf = model_rf.transform(test_data)

In [49]:
predictions_rf.show(10)

+--------------------+----------+------------------+
|            features|risk_score|        prediction|
+--------------------+----------+------------------+
|(26,[1,6,10,11,12...|   42.1875| 37.80712326073164|
|(26,[1,7,10,11,12...|   35.4375| 34.58019175022808|
|(26,[1,7,10,11,12...|   42.1875| 37.86082857042029|
|(26,[1,7,10,11,12...|     33.75|34.300774733960836|
|(26,[1,10,11,12,1...|      25.0| 26.57898599406077|
|(26,[1,10,11,12,1...|      25.0| 26.57898599406077|
|(26,[1,10,11,12,1...|     33.75|34.300774733960836|
|(26,[1,10,11,12,1...|      25.0| 26.57898599406077|
|(26,[1,10,11,12,1...|   35.4375| 34.56257266818231|
|(26,[1,10,11,12,1...|     26.25|26.954319699919314|
+--------------------+----------+------------------+
only showing top 10 rows



### Evaluation

In [50]:
rmse_rf = evaluator_rmse.evaluate(predictions_rf)
r2_rf = evaluator_r2.evaluate(predictions_rf)

In [51]:
print("RMSE for RandomForestRegressor:", rmse_rf)
print("R2 for RandomForestRegressor:", r2_rf)

RMSE for RandomForestRegressor: 1.6453683885199653
R2 for RandomForestRegressor: 0.9942840828107031


### Hyperparameter optimization

In [52]:
param_grid_rf = grid.addGrid(model_rf.maxDepth, [2, 4, 6]) \
    .addGrid(model_rf.maxBins, [16, 32, 64]) \
    .build()

In [53]:
cv_rf = CrossValidator(estimator=rf,
                       estimatorParamMaps=param_grid_rf,
                       evaluator=evaluator_rmse,
                       parallelism=5,
                       numFolds=3)

In [54]:
cvModel_rf = cv_rf.fit(train_data)

### Select the best model

In [55]:
model_best_rf = cvModel_rf.bestModel
model_best_rf

RandomForestRegressionModel: uid=RandomForestRegressor_0907e22566f2, numTrees=20, numFeatures=26

In [56]:
pprint(model_best_rf.extractParamMap())

{Param(parent='RandomForestRegressor_0907e22566f2', name='numTrees', doc='Number of trees to train (>= 1).'): 20,
 Param(parent='RandomForestRegressor_0907e22566f2', name='predictionCol', doc='prediction column name.'): 'prediction',
 Param(parent='RandomForestRegressor_0907e22566f2', name='maxMemoryInMB', doc='Maximum memory in MB allocated to histogram aggregation. If too small, then 1 node will be split per iteration, and its aggregates may exceed this size.'): 256,
 Param(parent='RandomForestRegressor_0907e22566f2', name='minInfoGain', doc='Minimum information gain for a split to be considered at a tree node.'): 0.0,
 Param(parent='RandomForestRegressor_0907e22566f2', name='minInstancesPerNode', doc='Minimum number of instances each child must have after split. If a split causes the left or right child to have fewer than minInstancesPerNode, the split will be discarded as invalid. Should be >= 1.'): 1,
 Param(parent='RandomForestRegressor_0907e22566f2', name='minWeightFractionPerNo

### Save the model to HDFS

In [57]:
# model_best_rf.write().overwrite().save("project/models/model2")

# # Run it from root directory of the repository
# run("hdfs dfs -get project/models/model2 models/model2")

### Prediction for the best model

In [58]:
predictions_best_rf = model_best_rf.transform(test_data)
predictions_best_rf.show()

+--------------------+----------+------------------+
|            features|risk_score|        prediction|
+--------------------+----------+------------------+
|(26,[1,6,10,11,12...|   42.1875| 41.80924893126178|
|(26,[1,7,10,11,12...|   35.4375|  34.5702190284223|
|(26,[1,7,10,11,12...|   42.1875|  41.4304731246282|
|(26,[1,7,10,11,12...|     33.75|34.318865019411376|
|(26,[1,10,11,12,1...|      25.0| 25.75739834806202|
|(26,[1,10,11,12,1...|      25.0|25.756682081417104|
|(26,[1,10,11,12,1...|     33.75| 34.41574781686634|
|(26,[1,10,11,12,1...|      25.0|25.756682081417104|
|(26,[1,10,11,12,1...|   35.4375| 34.59032502488314|
|(26,[1,10,11,12,1...|     26.25|26.294364491549032|
|(26,[1,10,11,12,1...|     26.25| 26.29508075819395|
|(26,[1,10,11,12,1...|     31.25|31.058805305814793|
|(26,[1,10,11,12,1...|     33.75| 34.39690777695381|
|(26,[1,10,11,12,1...|     26.25|26.299416215326584|
|(26,[1,10,11,12,1...|     26.25|  26.2915184380855|
|(26,[1,10,11,12,1...|     26.25| 26.313737991

### Save prediction of the best model to HDFS

In [59]:
# predictions_best_rf.select("risk_score", "prediction")\
#     .coalesce(1)\
#     .write\
#     .mode("overwrite")\
#     .format("csv")\
#     .option("sep", ",")\
#     .option("header","true")\
#     .save("project/output/model2_predictions.csv")

# # Run it from root directory of the repository
# run("hdfs dfs -cat project/output/model2_predictions.csv/*.csv >\
#     output/model2_predictions.csv")

### Evaluation for the best model 

In [60]:
rmse_best_rf = evaluator_rmse.evaluate(predictions_best_rf)
r2_best_rf = evaluator_r2.evaluate(predictions_best_rf)

print("RMSE for the best RandomForestRegressor:", rmse_best_rf)
print("R2 for the best RandomForestRegressor:", r2_best_rf)

RMSE for the best RandomForestRegressor: 1.1010680138340212
R2 for the best RandomForestRegressor: 0.9974403067522074


## Compare the best models

### Create data frame to compare performance of the models

In [61]:
models = [[str(model_best_gbt), rmse_best_gbt, r2_best_gbt],
          [str(model_best_rf), rmse_best_rf, r2_best_rf]]

In [62]:
comp_models = spark.createDataFrame(models, ["model", "RMSE", "R2"])
comp_models.show(truncate=False)

+------------------------------------------------------------------------------------------------+--------------------+------------------+
|model                                                                                           |RMSE                |R2                |
+------------------------------------------------------------------------------------------------+--------------------+------------------+
|GBTRegressionModel: uid=GBTRegressor_8e7274f1a275, numTrees=20, numFeatures=26                  |9.687880561017027E-4|0.9999999980183946|
|RandomForestRegressionModel: uid=RandomForestRegressor_0907e22566f2, numTrees=20, numFeatures=26|1.1010680138340212  |0.9974403067522074|
+------------------------------------------------------------------------------------------------+--------------------+------------------+



### Save comparation models to HDFS

In [63]:
# comp_models.coalesce(1)\
#     .write\
#     .mode("overwrite")\
#     .format("csv")\
#     .option("sep", ",")\
#     .option("header", "true")\
#     .save("project/output/evaluation.csv")


# run("hdfs dfs -cat project/output/evaluation.csv/*.csv >\
#     output/evaluation.csv")