# FIT5202 Assignment 2A : Building Models for eCommerce Fraud Detection

## Table of Contents
* [Part 1 : Data Loading, Transformation and Exploration](#part-1)
* [Part 2 : Feature extraction and ML training](#part-2)
* [Part 3 : Customer Segmentation and Knowledge sharing with K-Means](#part-3)
* [Part 4 : Data Ethics, Privacy, and Security](#part-4)
 
Please add code/markdown cells if needed.

# Part 1: Data Loading, Transformation and Exploration <a class="anchor" name="part-1"></a>
## 1.1 Data Loading
In this section, you must load the given datasets into PySpark DataFrames and use DataFrame functions to process the data. Spark SQL usage is discouraged, and you can only use pandas to format results. For plotting, various visualisation packages can be used, but please ensure that you have included instructions to install the additional packages and that the installation will be successful in the provided docker container (in case your marker needs to clear the notebook and rerun it).

### 1.1.1 Data Loading <a class="anchor" name="1.1"></a>
1.1.1 Write the code to create a SparkSession. For creating the SparkSession, you need to use a SparkConf object to configure the Spark app with a proper application name, to ensure the maximum partition size does not exceed 16MB, and to run locally with all CPU cores on your machine (note: if you have insufficient RAM, reducing the number of cores is acceptable.)  (2%)

In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

master = "local[*]"  # run locally on all available CPU cores
app_name = "FIT5202 A2A"
spark_conf = (
    SparkConf()
    .setMaster(master)
    .setAppName(app_name)
    # cap each file-read partition at 16 MB, as the task requires
    .set("spark.sql.files.maxPartitionBytes", str(16 * 1024 * 1024))
)
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('ERROR')

1.1.2 Write code to define the schemas for the category, customer, product, browsing behaviour and transaction datasets, following the data types suggested in the metadata file. (3%)

In [2]:
from pyspark.sql.types import *

category_schema = StructType([ 
    StructField("category_id", IntegerType(), True), 
    StructField("cat_level1", StringType(), True), 
    StructField("cat_level2", StringType(), True),
    StructField("cat_level3", StringType(), True)
])

customer_schema = StructType([ 
    StructField("customer_id", IntegerType(), True), 
    StructField("first_name", StringType(), True), 
    StructField("last_name", StringType(), True),
    StructField("username", StringType(), True),
    StructField("email", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("birthdate", DateType(), True),
    StructField("first_join_date", DateType(), True)
])

product_schema = StructType([ 
    StructField("id", IntegerType(), True), 
    StructField("gender", StringType(), True), 
    StructField("baseColour", StringType(), True),
    StructField("season", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("usage", StringType(), True),
    StructField("productDisplayName", StringType(), True),
    StructField("category_id", IntegerType(), True)
])

browsing_behaviour_schema = StructType([ 
    StructField("session_id", StringType(), True), 
    StructField("event_type", StringType(), True), 
    StructField("event_time", TimestampType(), True),
    StructField("traffic_source", StringType(), True),
    StructField("device_type", StringType(), True)
])

transaction_schema = StructType([
    StructField("created_at", TimestampType(), True),
    StructField("customer_id", IntegerType(), True),  
    StructField("transaction_id", StringType(), True),  
    StructField("session_id", StringType(), True),  
    StructField("product_metadata", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("payment_status", StringType(), True),
    StructField("promo_amount", DoubleType(), True),
    StructField("promo_code", StringType(), True),
    StructField("shipment_fee", DoubleType(), True),
    StructField("shipment_location_lat", DoubleType(), True),
    StructField("shipment_location_long", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("clear_payment", IntegerType(), True)
])

1.1.3 Using predefined schemas, write code to load the CSV files into separate data frames. Print the schemas of all data frames. (2%)

In [25]:
df_category = spark.read.format('csv')\
    .option('header', True).option('escape', '"')\
    .schema(category_schema)\
    .load('category.csv')

df_customers = spark.read.format('csv')\
    .option('header', True).option('escape', '"')\
    .schema(customer_schema)\
    .load('customer.csv')

df_transactions = spark.read.format('csv')\
    .option('header', True).option('escape', '"')\
    .schema(transaction_schema)\
    .load('transactions.csv')

df_product = spark.read.format('csv')\
    .option('header', True).option('escape', '"')\
    .schema(product_schema)\
    .load('product.csv')

df_browsing_behaviour = spark.read.format('csv')\
    .option('header', True).option('escape', '"')\
    .schema(browsing_behaviour_schema)\
    .load('browsing_behaviour.csv')

df_customer_session = spark.read.format('csv')\
    .option('header', True).option('escape', '"')\
    .load('customer_session.csv')

df_fraud_transactions = spark.read.format('csv')\
    .option('header', True).option('escape', '"')\
    .load('fraud_transaction.csv')


In [26]:
df_category.printSchema()
df_customers.printSchema()
df_transactions.printSchema()
df_product.printSchema()
df_browsing_behaviour.printSchema()
df_customer_session.printSchema()
df_fraud_transactions.printSchema()

root
 |-- category_id: integer (nullable = true)
 |-- cat_level1: string (nullable = true)
 |-- cat_level2: string (nullable = true)
 |-- cat_level3: string (nullable = true)

root
 |-- customer_id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- username: string (nullable = true)
 |-- email: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- birthdate: date (nullable = true)
 |-- first_join_date: date (nullable = true)

root
 |-- created_at: timestamp (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- transaction_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- product_metadata: string (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- payment_status: string (nullable = true)
 |-- promo_amount: double (nullable = true)
 |-- promo_code: string (nullable = true)
 |-- shipment_fee: double (nullable = true)
 |-- shipment_location_lat: double (null

### 1.2 Data Transformation to Create Features <a class="anchor" name="1.2"></a>
In the browsing behaviour dataset, there are 10 types of events:  
VC (Viewing Category), VI (Viewing Item), VP (Viewing Promotion), AP (Add Promotion), CL (Click on a product/category), ATC (Add a product to Shopping Cart), CO (CheckOut), HP (View HomePage), SCR (Mouse Scrolling), SER (Search for a product/category)  
We categorise them into three different levels:  
L1 (actions that are highly likely to lead to a purchase): AP, ATC, CO  
L2 (actions that may lead to a purchase): VC, VP, VI, SER  
L3 (not very important, just browsing): SCR, HP, CL  
Perform the following tasks based on the loaded data frames and create a new data frame.  

1.2.1 For each transaction (linked to a browsing session), count the number of actions in each level and create 3 columns(L1_count, L2_count, L3_count).

In [None]:
from pyspark.sql import functions as F

# first join transaction and browsing session with 'session_id'. 
df_transbrows_session = df_transactions.join(df_browsing_behaviour, on="session_id", how="left")
# df_transbrows_session.select("transaction_id", "session_id", "event_type").show()

L1_events = ['AP', 'ATC', 'CO']
L2_events = ['VC', 'VP', 'VI', 'SER']
L3_events = ['SCR', 'HP', 'CL']

df_transbrows_session = df_transbrows_session.withColumn(
    "event_level", 
    F.when(F.col("event_type").isin(L1_events), "L1")
     .when(F.col("event_type").isin(L2_events), "L2")
     .when(F.col("event_type").isin(L3_events), "L3")
     .otherwise("Unknown")
)
df_transbrows_session_agg = df_transbrows_session.groupBy("transaction_id").agg(
    F.sum(F.when(F.col("event_level") == "L1", 1).otherwise(0)).alias("L1_count"),
    F.sum(F.when(F.col("event_level") == "L2", 1).otherwise(0)).alias("L2_count"),
    F.sum(F.when(F.col("event_level") == "L3", 1).otherwise(0)).alias("L3_count")
)
df_transbrows_session_agg.show(truncate=False)

In [None]:
# checking if aggregation done right by using sample transaction id
df_test = df_transbrows_session.filter(F.col("transaction_id") == "6211f29a-8435-4e22-a56c-0d8e5a114e48")
df_test.select("transaction_id", "event_type", "event_level").show(truncate=False)

1.2.2 Create two columns with a percentage ratio of L1 and L2 actions. (i.e. L1 ratio = L1/(L1+L2+L3) * 100%)

L1 ratio = L1/(L1+L2+L3) * 100% \
L2 ratio = L2/(L1+L2+L3) * 100%
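
As a worked example of the two formulas above, the sketch below computes both ratios in plain Python for a single transaction. The function name `level_ratios` and the sample counts are illustrative assumptions, and returning `None` when a transaction has no recorded actions is one possible convention rather than a requirement of the task.

```python
def level_ratios(l1_count, l2_count, l3_count):
    """Return (L1 ratio, L2 ratio) as percentages of all actions."""
    total = l1_count + l2_count + l3_count
    if total == 0:
        # No browsing actions recorded: the ratio is undefined
        return None, None
    return 100 * l1_count / total, 100 * l2_count / total


# Hypothetical transaction with 3 L1, 5 L2 and 2 L3 actions
l1_ratio, l2_ratio = level_ratios(3, 5, 2)
print(l1_ratio, l2_ratio)
```

The zero-total case is worth deciding explicitly, because a transaction whose session has no matching browsing events would otherwise divide by zero.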

In [None]:
# total number of actions per transaction, shared by both ratios
total_actions = F.col("L1_count") + F.col("L2_count") + F.col("L3_count")

df_event_ratio = (
    df_transbrows_session_agg
    .withColumn("L1_ratio", F.col("L1_count") / total_actions * 100)
    .withColumn("L2_ratio", F.col("L2_count") / total_actions * 100)
)

df_event_ratio.select("transaction_id", "L1_count", "L2_count", "L3_count", "L1_ratio", "L2_ratio").show()

1.2.3 For each unique browsing session, based on event_time, extract the time of day as one of 4 groups: morning (6am-11:59am), afternoon (12pm-5:59pm), evening (6pm-11:59pm), night (12am-5:59am), and add it as a column. (Note: use the midpoint time if a browsing session spans different groups. For example, if a session starts at 10 am and ends at 1 pm, use 11:30 => (10+13)/2.)
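
The midpoint rule in this task can be made concrete with a small plain-Python sketch. The function name `time_of_day` and the sample timestamps are assumptions for illustration; the sketch only shows the bucketing logic for one session, given its first and last event times, and is not the DataFrame implementation the task asks for.

```python
from datetime import datetime


def time_of_day(session_start, session_end):
    """Classify a session by the midpoint between its first and last event."""
    midpoint = session_start + (session_end - session_start) / 2
    hour = midpoint.hour
    if 6 <= hour < 12:
        return "morning"
    if 12 <= hour < 18:
        return "afternoon"
    if hour >= 18:
        return "evening"
    return "night"


# The example from the task: 10 am to 1 pm has its midpoint at 11:30
print(time_of_day(datetime(2023, 5, 1, 10, 0), datetime(2023, 5, 1, 13, 0)))
```

Working on full timestamps rather than hours alone means a session that crosses midnight (for example 11 pm to 1 am) still gets a sensible midpoint.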

In [None]:
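# Hour of the session midpoint: average of first and last event_time (epoch seconds)
mid_ts = ((F.min("event_time").cast("long") + F.max("event_time").cast("long")) / 2).cast("timestamp")
df_browsing_session = df_browsing_behaviour.groupBy("session_id").agg(F.hour(mid_ts).alias("mid_hour"))

# Map the midpoint hour to one of the four time-of-day groups
df_browsing_session = df_browsing_session.withColumn(
    "time_of_day",
    F.when(F.col("mid_hour") < 6, "night").when(F.col("mid_hour") < 12, "morning")
     .when(F.col("mid_hour") < 18, "afternoon").otherwise("evening"),
)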

In [None]:
df_test = df_browsing_behaviour.filter(F.col("session_id") == "c9718135-8134-42b2-8e1e-2737fd6b49b1")
df_test.select("session_id", "event_time").orderBy("event_time").show(truncate=False)

In [None]:
from pyspark.sql import functions as F

# Step 1: Extract the date from the event_time
df_transbrows_session = df_transbrows_session.withColumn("event_date", F.to_date(F.col("event_time")))

# Step 2: Group by session_id and event_date, and calculate min and max event_time for each date
df_session_time = df_transbrows_session.groupBy("session_id", "event_date").agg(
    F.min("event_time").alias("min_time"),
    F.max("event_time").alias("max_time")
)

# Step 3: Filter for a specific session to check results
df_test = df_session_time.filter(F.col("session_id") == "d31b9d4b-126a-49c4-be4b-8e5d7f70804d")

# Show the result
df_test.show(truncate=False)


In [None]:
df_browsing_behaviour.show(truncate=False)

1.2.4 Join data frames to find customer information and add columns to feature_df: gender, age, geolocation, first join year. (note: For some columns, you need to perform transformations. For age, keep the integer only by rounding.)

In [18]:
# determine geolocation using shipment long lat
df_geolocation = df_transactions.select("customer_id", "shipment_location_lat", "shipment_location_long")
# df_geolocation.show(truncate=False)

In [19]:
# pip install geopy
# pip install ratelimit

In [20]:
# requires geopy (pip install geopy): find the customer address based on their shipment lat/long
import time
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
from pyspark.sql.functions import udf

def find_address(shipment_location_lat, shipment_location_long):
    geolocator = Nominatim(user_agent="findAddress")
    location = geolocator.reverse((shipment_location_lat, shipment_location_long),timeout=10)
    return location.address if location else None  

address_udf = udf(find_address,StringType())
df_geolocation2 = df_geolocation.withColumn('geolocation', address_udf('shipment_location_lat', 'shipment_location_long'))
#df_customers2 = df_customers.join(df_geolocation2, on="customer_id", how="left").select("customer_id", "birthdate","first_join_date","geolocation","gender")

In [21]:
df_geolocation2.write.csv("output")

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/opt/conda/lib/python3.10/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/opt/conda/lib/python3.10/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [29]:
from datetime import date
from pyspark.sql.functions import udf, year

df_customers2 = df_customers.withColumn('first_join_year', year(df_customers['first_join_date']))

def age(birthdate):
    # birthdate is already a datetime.date (DateType column); pass nulls through
    if birthdate is None:
        return None
    today = date.today()
    # subtract one year if this year's birthday has not happened yet
    return today.year - birthdate.year - ((today.month, today.day) < (birthdate.month, birthdate.day))

age_udf = udf(age, IntegerType())
df_customers_age = df_customers2.withColumn('age', age_udf('birthdate')).select('customer_id','gender','age', 'first_join_year')
df_customers_age.show()

+-----------+------+---+---------------+
|customer_id|gender|age|first_join_year|
+-----------+------+---+---------------+
|       2870|     F| 28|           2019|
|       8193|     F| 31|           2017|
|       7279|     M| 35|           2020|
|      88813|     M| 33|           2021|
|      82542|     M| 24|           2021|
|       5440|     F| 35|           2021|
|      90319|     M| 34|           2019|
|      96453|     F| 19|           2022|
|       8031|     F| 28|           2019|
|      61533|     M| 37|           2020|
|      72203|     M| 42|           2017|
|      74362|     F| 28|           2022|
|       9152|     F| 28|           2019|
|      22199|     M| 38|           2019|
|      94370|     F| 26|           2018|
|      73093|     F| 32|           2021|
|      72106|     F| 23|           2021|
|      97883|     F| 25|           2017|
|       3434|     M| 26|           2017|
|      31163|     F| 21|           2020|
+-----------+------+---+---------------+
only showing top

In [31]:
df_geolocation = df_transactions.select("customer_id", "shipment_location_lat", "shipment_location_long")
df_custom_geo = df_customers_age.join(df_geolocation, on="customer_id", how="inner")
df_custom_geo.show()

+-----------+------+---+---------------+---------------------+----------------------+
|customer_id|gender|age|first_join_year|shipment_location_lat|shipment_location_long|
+-----------+------+---+---------------+---------------------+----------------------+
|      14159|     F| 30|           2019|    -4.26351275671241|      105.489401701251|
|      22576|     F| 32|           2020|    -7.91707661186231|       110.13187555325|
|      18696|     F| 28|           2020|    -7.39661418330981|      109.511262594032|
|      90136|     F| 24|           2017|   -0.637290541399757|      109.492521253314|
|      18960|     F| 24|           2018|    -7.32004136393024|      111.225797135699|
|      60646|     F| 25|           2018|    -4.52328589944563|      105.385799510518|
|       5901|     F| 26|           2018|    -7.43210236666926|      111.096960686913|
|      69072|     F| 22|           2017|    -6.26355191799179|      106.859716713089|
|      92076|     F| 46|           2017|   -0.42055695

In [35]:
# requires geopy (pip install geopy): find the customer address based on their shipment lat/long
import time
from geopy.geocoders import Nominatim
#from geopy.extra.rate_limiter import RateLimiter
from pyspark.sql.functions import udf

def find_address(shipment_location_lat, shipment_location_long):
    geolocator = Nominatim(user_agent="abcd")
    location = geolocator.reverse((shipment_location_lat, shipment_location_long),timeout=None)
    return location.address if location else None  

address_udf = udf(find_address,StringType())
feature_df = df_custom_geo.withColumn('geolocation', address_udf('shipment_location_lat', 'shipment_location_long'))
#feature_df = feature_df.select('gender', 'age', 'geolocation', 'first_join_year')
# df_customers2 = df_customers.join(df_geolocation2, on="customer_id", how="left").select("customer_id", "birthdate","first_join_date","geolocation","gender")

In [None]:
feature_df.show()
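
The reverse-geocoding UDF above makes one network request to Nominatim per row, which is slow on a large dataset and fails when the container has no network access. One offline alternative, sketched here under the assumption that a coarse location is enough for the model, is to bucket coordinates into a fixed grid. This is a different technique from reverse geocoding: the function name `grid_cell` and the 1-degree cell size are hypothetical choices, and the result is an opaque cell label, not an address.

```python
import math


def grid_cell(lat, lon, cell_size_deg=1.0):
    """Map a coordinate to a coarse grid-cell label, or None if a value is missing."""
    if lat is None or lon is None:
        return None
    row = math.floor(lat / cell_size_deg)
    col = math.floor(lon / cell_size_deg)
    return f"{row}_{col}"


# Coordinates taken from the shipment data shown earlier in this notebook
print(grid_cell(-6.12349294556289, 106.752524532902))
```

Because the label depends only on the two coordinates, the same logic needs no external service and gives the same result on every rerun.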

1.2.5 Join data frames to find out the number of purchases the customer has made, add a column.
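
The count-then-join logic for this step can be illustrated in plain Python before expressing it with DataFrame operations. The sample rows and variable names below are hypothetical, and the sketch counts every transaction row as a purchase; whether failed payments should be excluded (for example via `payment_status`) is a decision the task leaves open.

```python
from collections import Counter

# Hypothetical sample rows: (transaction_id, customer_id)
transactions = [("t1", 101), ("t2", 102), ("t3", 101), ("t4", 101)]
customers = [101, 102, 103]

# Count transactions per customer
purchase_counts = Counter(customer_id for _, customer_id in transactions)

# Left-join semantics: every customer keeps a row, with 0 when no purchase exists
customer_purchases = {cid: purchase_counts.get(cid, 0) for cid in customers}
print(customer_purchases)
```

In a DataFrame version, the same left-join semantics would keep customers without transactions, and their missing count could then be filled with 0.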

1.2.6 Attach the transaction labels for fraud/non-fraud.

### 1.3 Exploring the Data <a class="anchor" name="1.3"></a>
**1.3.1 With the feature_df, write code to show the basic statistics: a) For each numeric column, show count, mean, stddev, min, max, 25 percentile, 50 percentile, 75 percentile; b) For each non-numeric column, display the top-5 values and the corresponding counts; c) For each boolean column, display the value and count. (3%)**

In [38]:
# include mean, stddev and the median, which the task also asks for
feature_df.summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max").show()

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/ipykernel_9705/4098316154.py", line 11, in find_address
  File "/opt/conda/lib/python3.10/site-packages/geopy/geocoders/nominatim.py", line 372, in reverse
    return self._call_geocoder(url, callback, timeout=timeout)
  File "/opt/conda/lib/python3.10/site-packages/geopy/geocoders/base.py", line 391, in _call_geocoder
    raise
  File "/opt/conda/lib/python3.10/site-packages/geopy/adapters.py", line 472, in get_json
    resp = self._request(url, timeout=timeout, headers=headers)
  File "/opt/conda/lib/python3.10/site-packages/geopy/adapters.py", line 494, in _request
    raise GeocoderUnavailable(message)
geopy.exc.GeocoderUnavailable: HTTPSConnectionPool(host='nominatim.openstreetmap.org', port=443): Max retries exceeded with url: /reverse?lat=-6.12349294556289&lon=106.752524532902&format=json&addressdetails=1 (Caused by NewConnectionError('<urllib3.connection.HTTPSConnection object at 0x7f1a1c5a8f40>: Failed to establish a new connection: [Errno 101] Network is unreachable'))


**1.3.2 Explore the dataframe and write code to present two plots worthy of presentation to the company, describe your plots and discuss the findings from the plots. (8%)**
One of the plots needs to be based on feature_df in regard to fraudulent behaviour; you’re free to choose the other one.  
Hint 1: You can use basic plots (e.g., histograms, line charts, scatter plots) to show the relationship between a column and the label or more advanced plots like correlation plots.  
Hint 2: If your data is too large for plotting, consider using sampling before plotting.  
150 words max for each plot’s description and discussion  
Feel free to use any plotting libraries: matplotlib, seaborn, plotly, etc.  



# Part 2: Feature extraction and ML training <a class="anchor" name="part-2"></a>
In this section, you must use PySpark DataFrame functions and ML packages for data preparation, model building, and evaluation. Work that uses other ML packages, such as scikit-learn, will receive zero marks.
### 2.1 Discuss the feature selection and prepare the feature columns

2.1.1 Based on the data exploration from 1.2 and considering the use case, discuss the importance of the features: for example, which features may be useless and should be removed, which have a significant impact on the label column, and which should be transformed. State which features you plan to use, why you selected them, and how you create/transform them.
* 300 words max for the discussion
* Please only use the provided data for model building
* You can create/add additional features based on the dataset
* Hint: Use the insights from the data exploration/domain knowledge/statistical models to consider whether to create more feature columns or remove some columns

2.1.2 Write code to create/transform the columns based on your discussion above
Hint: You can use one data frame for both use cases (classification now and k-means later in Part 3), since you can select your desired columns as the input and output for each use case. 

### 2.2 Preparing Spark ML Transformers/Estimators for features, labels, and models  <a class="anchor" name="2.2"></a>

**2.2.1 Write code to create Transformers/Estimators for transforming/assembling the columns you selected above in 2.1 and create ML model Estimators for Random Forest (RF) and Gradient-boosted tree (GBT) model.
Please DO NOT fit/transform the data yet.**

**2.2.2. Write code to include the above Transformers/Estimators into two pipelines.
Please DO NOT fit/transform the data yet.**

### 2.3 Preparing the training data and testing data  
Write code to split the data for training and testing purposes.
Note: Due to the large dataset size, you can use random sampling (say 20% of the dataset) and do a train/test split or use one year of data for training and another year for testing. 

### 2.4 Training and evaluating models  
2.4.1 Write code to use the corresponding ML Pipelines to train the models on the training data from 2.3. Then use the trained models to make predictions on the testing data from 2.3.

2.4.2 For both models (RF and GBT) and testing data, write code to display the count of TP/TN/FP/FN. Compute the AUC, accuracy, recall, and precision for the above-threshold/below-threshold label from each model testing result using PySpark MLlib/ML APIs.
Draw a ROC plot.
Discuss which one is the better model (no word limit; please keep it concise)
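
To make the definitions of TP/TN/FP/FN and the derived metrics concrete, here is a small plain-Python sketch. It is only an illustration of the arithmetic: the function name `confusion_metrics`, the sample labels and the convention that 1 means fraud are assumptions, and the submission itself must compute these values with PySpark MLlib/ML APIs as the task states.

```python
def confusion_metrics(labels, predictions):
    """Count TP/TN/FP/FN for binary labels (1 = fraud) and derive basic metrics."""
    pairs = list(zip(labels, predictions))
    tp = sum(1 for y, p in pairs if y == 1 and p == 1)
    tn = sum(1 for y, p in pairs if y == 0 and p == 0)
    fp = sum(1 for y, p in pairs if y == 0 and p == 1)
    fn = sum(1 for y, p in pairs if y == 1 and p == 0)
    return {
        "TP": tp, "TN": tn, "FP": fp, "FN": fn,
        # Each metric is None when its denominator would be zero
        "accuracy": (tp + tn) / len(pairs) if pairs else None,
        "precision": tp / (tp + fp) if (tp + fp) else None,
        "recall": tp / (tp + fn) if (tp + fn) else None,
    }


# Hypothetical test-set labels and model predictions
labels = [1, 0, 1, 1, 0, 0, 0, 1]
predictions = [1, 0, 0, 1, 0, 1, 0, 1]
metrics = confusion_metrics(labels, predictions)
print(metrics)
```

For fraud detection the classes are usually imbalanced, so precision and recall on the fraud class tend to say more about a model than accuracy alone.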

2.4.3 Save the better model (you need it for Part B of Assignment 2).
(Note: You may need to go through a few training loops or use more data to create a better-performing model.)

# Part 3: Customer Clustering and Knowledge Sharing with K-Means <a class="anchor" name="part-3"></a>  
Please see the specification for this task and add code/markdown cells.

# Part 4: Data Ethics, Privacy, and Security <a class="anchor" name="part-4"></a>  
Please see the specification for this task and add markdown cells (word limit: 500).

## References:
Please add your references below: