<a href="https://colab.research.google.com/github/kathirvel-G/pyspark/blob/main/fraud_detection.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Fraud Detection in Orders

This project focuses on detecting fraudulent orders based on unusual patterns in the data by using anomaly detection, decision trees, and random forests to classify fraud(fake).

## Goal:

To prevent financial losses due to fraudulent transactions.

## Libraries:

We'll use PySpark for distributed processing and MLlib for machine learning.

## Environment Setup

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.3.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.3-py2.py3-none-any.whl size=317840625 sha256=8a98ebff93a4a53614d8e9f5e3e14555ed5150fc12568e89d3f7944d30d409f7
  Stored in directory: /root/.cache/pip/wheels/1b/3a/92/28b93e2fbfdbb07509ca4d6f50c5e407f48dce4ddbda69a4ab
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.3


# Importing necessary libraries

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, isnan, count, avg, stddev, abs, dayofweek, hour, countDistinct, sum as spark_sum, count as spark_count
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.feature import StringIndexer

# Spark session

In [None]:
spark = SparkSession.builder \
    .appName("Fraud Detection in Orders") \
    .getOrCreate()

#Data Loading and Exploration

In [None]:

data_path = "/Copy of Sales - JAN 2015 TO JUNE 2015- ALL CATEGORIES -EB^JEO_ER (version 1)_CAP.csv"

df = spark.read.csv(data_path, header=True, inferSchema=True)

df.show(5)

+----+----------+-----------+-------+------------+----------+--------------------+-----------+----------+---------+--------------+---------+-------------+----------+----+------+----------+-----------+------+------+--------------+-------------+-------+---+--------+----+--------+-----------------+-----------------+--------------------+-----------+--------+-------+-------+
|SORG|    REGION|      STATE|CHANNEL|BILLING TYPE|STORE CODE|                NAME|INVOICE NOS|      DATE|      DAY|         BRAND|SUB BRAND|MATERIAL TYPE|STYLE CODE|SIZE|SLEEVE|BASIC/CORE|     COLOUR|SEASON|GENDER|SUPPLIER STYLE|SUPPLIER SIZE|BARCODE|QTY|UNIT MRP| MRP|DISCOUNT|MANUALDISC REASON|MANUALDISC REMARK|       SCHEME REASON|TAXABLE AMT|TAX RATE|TAX AMT|NET AMT|
+----+----------+-----------+-------+------------+----------+--------------------+-----------+----------+---------+--------------+---------+-------------+----------+----+------+----------+-----------+------+------+--------------+-------------+-------+---

In [None]:

df.printSchema()

root
 |-- SORG: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- CHANNEL: string (nullable = true)
 |-- BILLING TYPE: string (nullable = true)
 |-- STORE CODE: integer (nullable = true)
 |-- NAME: string (nullable = true)
 |-- INVOICE NOS: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- SUB BRAND: string (nullable = true)
 |-- MATERIAL TYPE: string (nullable = true)
 |-- STYLE CODE: string (nullable = true)
 |-- SIZE: string (nullable = true)
 |-- SLEEVE: string (nullable = true)
 |-- BASIC/CORE: string (nullable = true)
 |-- COLOUR: string (nullable = true)
 |-- SEASON: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- SUPPLIER STYLE: string (nullable = true)
 |-- SUPPLIER SIZE: string (nullable = true)
 |-- BARCODE: double (nullable = true)
 |-- QTY: integer (nullable = true)
 |-- UNIT MRP: integer (nullable = true)
 |-- M

In [None]:

df.describe().show()

+-------+------+----------+--------------+-------+------------+------------------+--------------------+-----------+------------------+---------+------------------+------------+-------------+----------+------------------+------+----------+------------------+------+------+--------------+-----------------+--------------------+------------------+-----------------+------------------+------------------+--------------------+-------------------+--------------------+------------------+------------------+-----------------+------------------+
|summary|  SORG|    REGION|         STATE|CHANNEL|BILLING TYPE|        STORE CODE|                NAME|INVOICE NOS|              DATE|      DAY|             BRAND|   SUB BRAND|MATERIAL TYPE|STYLE CODE|              SIZE|SLEEVE|BASIC/CORE|            COLOUR|SEASON|GENDER|SUPPLIER STYLE|    SUPPLIER SIZE|             BARCODE|               QTY|         UNIT MRP|               MRP|          DISCOUNT|   MANUALDISC REASON|  MANUALDISC REMARK|       SCHEME REASON|

In [None]:
# Counting missing values in each column
missing_values = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])
missing_values.show()

+----+------+-----+-------+------------+----------+----+-----------+----+---+-----+---------+-------------+----------+----+------+----------+------+------+------+--------------+-------------+-------+---+--------+---+--------+-----------------+-----------------+-------------+-----------+--------+-------+-------+
|SORG|REGION|STATE|CHANNEL|BILLING TYPE|STORE CODE|NAME|INVOICE NOS|DATE|DAY|BRAND|SUB BRAND|MATERIAL TYPE|STYLE CODE|SIZE|SLEEVE|BASIC/CORE|COLOUR|SEASON|GENDER|SUPPLIER STYLE|SUPPLIER SIZE|BARCODE|QTY|UNIT MRP|MRP|DISCOUNT|MANUALDISC REASON|MANUALDISC REMARK|SCHEME REASON|TAXABLE AMT|TAX RATE|TAX AMT|NET AMT|
+----+------+-----+-------+------------+----------+----+-----------+----+---+-----+---------+-------------+----------+----+------+----------+------+------+------+--------------+-------------+-------+---+--------+---+--------+-----------------+-----------------+-------------+-----------+--------+-------+-------+
|   2| 96151|92060|      1|           1|         1|   1|     

In [None]:
df.head(1)

[Row(SORG='ABL', REGION='INDIA EAST', STATE='WEST BENGAL', CHANNEL='BF1', BILLING TYPE='SALES', STORE CODE=107450, NAME='US - SHAILIJA SHOPPERS KOLKATA', INVOICE NOS='U32IN406750', DATE='01/01/2015', DAY='Thursday ', BRAND='US Polo', SUB BRAND=None, MATERIAL TYPE='Shirts', STYLE CODE='USSH3311', SIZE='L', SLEEVE='F', BASIC/CORE='Fashion', COLOUR='NAVY', SEASON='A14', GENDER='Mens', SUPPLIER STYLE='USSH3311', SUPPLIER SIZE='L  FS', BARCODE=8910000000000.0, QTY=1, UNIT MRP=2399, MRP=2399, DISCOUNT=0.0, MANUALDISC REASON='NA', MANUALDISC REMARK=None, SCHEME REASON='NA', TAXABLE AMT=2284.76, TAX RATE=5.0, TAX AMT=114.24, NET AMT=2399.0)]

##Data Preprocessing

1. Handling Missing Values

In [None]:
# Drop rows with missing values in critical columns or fill them as appropriate
# For example, let's fill missing 'DISCOUNT' with 0
df = df.na.fill({'DISCOUNT': 0, 'MANUALDISC REASON': 'No Reason', 'MANUALDISC REMARK': 'No Remark'})

# Alternatively, drop rows with any remaining missing values
#df = df.dropna()

#Feature Engineering

Key Features:
QTY: Unusually high quantities could indicate fraud.
DISCOUNT: Large discounts, especially manual ones, could raise flags.
NET AMT: Abnormally low or high net amounts might signal fraudulent activity.

1. Order Amount Features
Calculate the total order amount and related metrics.

In [None]:
# Assuming 'QTY' is quantity and 'UNIT MRP' is unit price
df = df.withColumn("TOTAL_AMOUNT", col("QTY") * col("UNIT MRP"))

# High-value order flag (e.g., orders above a certain threshold)
df = df.withColumn("HIGH_VALUE_ORDER", when(col("TOTAL_AMOUNT") > 1000, 1).otherwise(0))

In [None]:

df = df.withColumn("high_quantity_order", when(col("QTY") > 100, 1).otherwise(0))

df = df.withColumn("high_discount", when(col("DISCOUNT") > 50, 1).otherwise(0))

df = df.withColumn("high_net_amount", when(col("NET AMT") > 10000, 1).otherwise(0))

df = df.withColumn("qty_zscore",
                   (col("QTY") - df.select(avg(col("QTY"))).first()[0]) / df.select(stddev(col("QTY"))).first()[0])

df = df.withColumn("net_amt_zscore",
                   (col("NET AMT") - df.select(avg(col("NET AMT"))).first()[0]) / df.select(stddev(col("NET AMT"))).first()[0])



Formula for Z-Score
The Z-score formula is:

𝑍
=
𝑋
−
𝜇
/
𝜎


Where:


X: The value of the data point

μ: The mean of the column

σ: The standard deviation of the column

A Z-score tells you how many standard deviations a value is away from the mean.

In [None]:
df.printSchema()

root
 |-- SORG: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- CHANNEL: string (nullable = true)
 |-- BILLING TYPE: string (nullable = true)
 |-- STORE CODE: integer (nullable = true)
 |-- NAME: string (nullable = true)
 |-- INVOICE NOS: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- SUB BRAND: string (nullable = true)
 |-- MATERIAL TYPE: string (nullable = true)
 |-- STYLE CODE: string (nullable = true)
 |-- SIZE: string (nullable = true)
 |-- SLEEVE: string (nullable = true)
 |-- BASIC/CORE: string (nullable = true)
 |-- COLOUR: string (nullable = true)
 |-- SEASON: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- SUPPLIER STYLE: string (nullable = true)
 |-- SUPPLIER SIZE: string (nullable = true)
 |-- BARCODE: double (nullable = true)
 |-- QTY: integer (nullable = true)
 |-- UNIT MRP: integer (nullable = true)
 |-- M

In [None]:
from pyspark.sql.functions import abs, when, col, avg, stddev


# Flagging anomalous orders
df = df.withColumn("anomalous_order", when((abs(col("qty_zscore")) > 3) | (abs(col("net_amt_zscore")) > 3), 1).otherwise(0))

df.select("high_quantity_order", "high_discount", "high_net_amount", "anomalous_order").show(5)

+-------------------+-------------+---------------+---------------+
|high_quantity_order|high_discount|high_net_amount|anomalous_order|
+-------------------+-------------+---------------+---------------+
|                  0|            0|              0|              0|
|                  0|            0|              0|              0|
|                  0|            0|              0|              0|
|                  0|            1|              0|              0|
|                  0|            0|              0|              0|
+-------------------+-------------+---------------+---------------+
only showing top 5 rows



In [None]:
df.head()

Row(SORG='ABL', REGION='INDIA EAST', STATE='WEST BENGAL', CHANNEL='BF1', BILLING TYPE='SALES', STORE CODE=107450, NAME='US - SHAILIJA SHOPPERS KOLKATA', INVOICE NOS='U32IN406750', DATE='01/01/2015', DAY='Thursday ', BRAND='US Polo', SUB BRAND=None, MATERIAL TYPE='Shirts', STYLE CODE='USSH3311', SIZE='L', SLEEVE='F', BASIC/CORE='Fashion', COLOUR='NAVY', SEASON='A14', GENDER='Mens', SUPPLIER STYLE='USSH3311', SUPPLIER SIZE='L  FS', BARCODE=8910000000000.0, QTY=1, UNIT MRP=2399, MRP=2399, DISCOUNT=0.0, MANUALDISC REASON='NA', MANUALDISC REMARK='No Remark', SCHEME REASON='NA', TAXABLE AMT=2284.76, TAX RATE=5.0, TAX AMT=114.24, NET AMT=2399.0, TOTAL_AMOUNT=2399, HIGH_VALUE_ORDER=1, high_quantity_order=0, high_discount=0, high_net_amount=0, qty_zscore=0.02577477277361803, net_amt_zscore=1.0407056218706243, anomalous_order=0)

2. Discount Features
Analyze the impact of discounts on orders.

In [None]:
# Percentage Discount
df = df.withColumn("PERCENT_DISCOUNT", (col("DISCOUNT") / col("MRP")) * 100)

# High Discount Flag
df = df.withColumn("HIGH_DISCOUNT", when(col("PERCENT_DISCOUNT") > 70, 1).otherwise(0))

In [None]:

normal_orders = df.filter(col("HIGH_DISCOUNT") == 0)

normal_orders.show(truncate=False)


In [None]:
df.head(2)

[Row(SORG='ABL', REGION='INDIA EAST', STATE='WEST BENGAL', CHANNEL='BF1', BILLING TYPE='SALES', STORE CODE=107450, NAME='US - SHAILIJA SHOPPERS KOLKATA', INVOICE NOS='U32IN406750', DATE='01/01/2015', DAY='Thursday ', BRAND='US Polo', SUB BRAND=None, MATERIAL TYPE='Shirts', STYLE CODE='USSH3311', SIZE='L', SLEEVE='F', BASIC/CORE='Fashion', COLOUR='NAVY', SEASON='A14', GENDER='Mens', SUPPLIER STYLE='USSH3311', SUPPLIER SIZE='L  FS', BARCODE=8910000000000.0, QTY=1, UNIT MRP=2399, MRP=2399, DISCOUNT=0.0, MANUALDISC REASON='NA', MANUALDISC REMARK=None, SCHEME REASON='NA', TAXABLE AMT=2284.76, TAX RATE=5.0, TAX AMT=114.24, NET AMT=2399.0, TOTAL_AMOUNT=2399, HIGH_VALUE_ORDER=1, PERCENT_DISCOUNT=0.0, HIGH_DISCOUNT=0),
 Row(SORG='ABL', REGION='INDIA EAST', STATE='WEST BENGAL', CHANNEL='BF1', BILLING TYPE='SALES', STORE CODE=107450, NAME='US - SHAILIJA SHOPPERS KOLKATA', INVOICE NOS='U32IN406750', DATE='01/01/2015', DAY='Thursday ', BRAND='US Polo', SUB BRAND=None, MATERIAL TYPE='Shirts', STYLE 

3. Temporal Features\
Extracting features based on the order date.

In [None]:
from pyspark.sql.functions import dayofweek, hour

# Day of the week (1 = Monday, 7 = Sunday)
df = df.withColumn("DAY_OF_WEEK", dayofweek(col("DATE")))

In [None]:
df.describe().show()

+-------+------+----------+--------------+-------+------------+------------------+--------------------+-----------+------------------+---------+------------------+------------+-------------+----------+------------------+------+----------+------------------+------+------+--------------+-----------------+--------------------+------------------+-----------------+------------------+-----------------+--------------------+-------------------+--------------------+------------------+------------------+-----------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+------------------+------------------+
|summary|  SORG|    REGION|         STATE|CHANNEL|BILLING TYPE|        STORE CODE|                NAME|INVOICE NOS|              DATE|      DAY|             BRAND|   SUB BRAND|MATERIAL TYPE|STYLE CODE|              SIZE|SLEEVE|BASIC/CORE|            COLOUR|SEAS

4. Address and Channel Features
Identify mismatches and channel-specific patterns

In [None]:
# Addressing Mismatch Flag (if applicable)
df = df.withColumn("ADDRESS_MISMATCH", when(col("BILLING TYPE") != col("CHANNEL"), 1).otherwise(0))

# Sales Channel Indexing
indexer = StringIndexer(inputCol="CHANNEL", outputCol="CHANNEL_INDEX")
df = indexer.fit(df).transform(df)

5. Tax Features
Analyze tax-related irregularities.

In [None]:
# Tax Percentage Verification
df = df.withColumn("CALCULATED_TAX", col("NET AMT") - (col("TAXABLE AMT") + col("DISCOUNT")))
df = df.withColumn("TAX_INCONSISTENCY", when(col("CALCULATED_TAX") != col("TAX AMT"), 1).otherwise(0))

In [None]:
df.select("QTY", "DISCOUNT", "NET AMT", "CALCULATED_TAX", "HIGH_VALUE_ORDER", "ADDRESS_MISMATCH").show(2)

+---+--------+-------+------------------+----------------+----------------+
|QTY|DISCOUNT|NET AMT|    CALCULATED_TAX|HIGH_VALUE_ORDER|ADDRESS_MISMATCH|
+---+--------+-------+------------------+----------------+----------------+
|  1|     0.0| 2399.0|114.23999999999978|               1|               1|
|  1|     0.0| 2199.0|104.71000000000004|               1|               1|
+---+--------+-------+------------------+----------------+----------------+
only showing top 2 rows



Anomaly Detection
Detecting anomalies helps in identifying outliers that may indicate fraudulent activities.

Using Z-Score for Anomaly Detection

In [None]:
# Calculating mean and standard deviation for 'TOTAL_AMOUNT'
stats = df.select(avg(col("TOTAL_AMOUNT")).alias("mean"), stddev(col("TOTAL_AMOUNT")).alias("stddev")).collect()
mean_total = stats[0]['mean']
stddev_total = stats[0]['stddev']

# Calculating Z-Score
df = df.withColumn("TOTAL_AMOUNT_ZSCORE", (col("TOTAL_AMOUNT") - mean_total) / stddev_total)

# Flag anomalies
df = df.withColumn("ANOMALY_FLAG", when((col("TOTAL_AMOUNT_ZSCORE") > 3) | (col("TOTAL_AMOUNT_ZSCORE") < -3), 1).otherwise(0))


In [None]:
df.printSchema()

root
 |-- SORG: string (nullable = true)
 |-- REGION: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- CHANNEL: string (nullable = true)
 |-- BILLING TYPE: string (nullable = true)
 |-- STORE CODE: integer (nullable = true)
 |-- NAME: string (nullable = true)
 |-- INVOICE NOS: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- BRAND: string (nullable = true)
 |-- SUB BRAND: string (nullable = true)
 |-- MATERIAL TYPE: string (nullable = true)
 |-- STYLE CODE: string (nullable = true)
 |-- SIZE: string (nullable = true)
 |-- SLEEVE: string (nullable = true)
 |-- BASIC/CORE: string (nullable = true)
 |-- COLOUR: string (nullable = true)
 |-- SEASON: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- SUPPLIER STYLE: string (nullable = true)
 |-- SUPPLIER SIZE: string (nullable = true)
 |-- BARCODE: double (nullable = true)
 |-- QTY: integer (nullable = true)
 |-- UNIT MRP: integer (nullable = true)
 |-- M

Prepare Data for Modeling
We need to convert features into a format that PySpark's MLlib can use for classification. We'll use VectorAssembler to combine relevant features.

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, when, lit

# List of features to be used in modeling
feature_cols = ["QTY", "DISCOUNT", "NET AMT", "high_quantity_order",
                "HIGH_DISCOUNT", "high_net_amount", "anomalous_order"]

# Impute nulls with 0 or mean for numerical columns
for column in feature_cols:
    df = df.withColumn(column, when(col(column).isNull(), lit(0)).otherwise(col(column)))

    # mean_val = df.select(mean(column)).collect()[0][0]
    # df = df.withColumn(column, when(col(column).isNull(), lit(mean_val)).otherwise(col(column)))

# Use VectorAssembler to combine feature columns into a single feature vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="_newfeatures")

# Apply the assembler to create 'new_features'
df = assembler.transform(df)

df.select("_newfeatures").show(5, truncate=False)



+-------------------------------+
|_newfeatures                   |
+-------------------------------+
|(7,[0,2],[1.0,2399.0])         |
|(7,[0,2],[1.0,2199.0])         |
|(7,[0,2],[1.0,2599.0])         |
|(7,[0,1,2],[1.0,300.0,1888.95])|
|(7,[0,2],[1.0,1799.0])         |
+-------------------------------+
only showing top 5 rows



In [None]:
# df = df.drop(newfeatures_)
# Assign the transformed DataFrame with features to 'model_data'
model_data = df.select("_newfeatures", "anomalous_order")

# Check the first few rows to ensure 'model_data' is correctly defined
model_data.show(10, truncate=False)


+-------------------------------+---------------+
|_newfeatures                   |anomalous_order|
+-------------------------------+---------------+
|(7,[0,2],[1.0,2399.0])         |0              |
|(7,[0,2],[1.0,2199.0])         |0              |
|(7,[0,2],[1.0,2599.0])         |0              |
|(7,[0,1,2],[1.0,300.0,1888.95])|0              |
|(7,[0,2],[1.0,1799.0])         |0              |
|(7,[0,1,2],[1.0,339.8,1427.16])|0              |
|(7,[0,2],[1.0,1899.0])         |0              |
|(7,[0,2],[1.0,2799.0])         |0              |
|(7,[0,2,6],[1.0,7999.0,1.0])   |1              |
|(7,[0,1,2],[1.0,399.8,1679.16])|0              |
+-------------------------------+---------------+
only showing top 10 rows



In [None]:
# Assign the transformed DataFrame with features to 'model_data'
model_data = df.select("_newfeatures", "ANOMALY_FLAG")

model_data.show(5, truncate=False)

+-------------------------------+------------+
|_newfeatures                   |ANOMALY_FLAG|
+-------------------------------+------------+
|(7,[0,2],[1.0,2399.0])         |0           |
|(7,[0,2],[1.0,2199.0])         |0           |
|(7,[0,2],[1.0,2599.0])         |0           |
|(7,[0,1,2],[1.0,300.0,1888.95])|0           |
|(7,[0,2],[1.0,1799.0])         |0           |
+-------------------------------+------------+
only showing top 5 rows



##Train-Test Split
We'll split the dataset into training and testing sets.

In [None]:
# Split the data into training and testing sets (80% training, 20% testing)
train_data, test_data = model_data.randomSplit([0.8, 0.2], seed=12345)

train_data.show(5)


+--------------------+------------+
|        _newfeatures|ANOMALY_FLAG|
+--------------------+------------+
|           (7,[],[])|           0|
|(7,[0,1,2],[-1.0,...|           1|
|(7,[0,1,2],[-1.0,...|           1|
|(7,[0,1,2],[-1.0,...|           1|
|(7,[0,1,2],[-1.0,...|           1|
+--------------------+------------+
only showing top 5 rows



In [None]:
model_data = df.select("_newfeatures", col("ANOMALY_FLAG").alias("label"))

In [None]:


#Creating a binary label based on 'ANOMALY_FLAG'
df = df.withColumn("label", col("ANOMALY_FLAG"))

# Selecting final dataset for modeling
model_data = df.select("_newfeatures", "label")


In [None]:
# Initializing Decision Tree Classifier
dt_classifier = DecisionTreeClassifier(labelCol="label", featuresCol="_newfeatures")

# Spliting the data into training and testing sets
train_data, test_data = model_data.randomSplit([0.8, 0.2], seed=42)

# Training the model
dt_model = dt_classifier.fit(train_data)

# Making predictions on the test data
dt_predictions = dt_model.transform(test_data)

dt_predictions.select("_newfeatures", "label", "prediction").show(10)


+--------------------+-----+----------+
|        _newfeatures|label|prediction|
+--------------------+-----+----------+
|(7,[0,1,2],[-1.0,...|    1|       0.0|
|(7,[0,1,2],[-1.0,...|    1|       0.0|
|(7,[0,1,2],[-1.0,...|    1|       0.0|
|(7,[0,1,2],[-1.0,...|    1|       0.0|
|(7,[0,1,2],[-1.0,...|    1|       0.0|
|(7,[0,1,2],[-1.0,...|    1|       0.0|
|(7,[0,1,2],[-1.0,...|    1|       0.0|
|(7,[0,1,2],[-1.0,...|    1|       0.0|
|(7,[0,1,2],[-1.0,...|    1|       0.0|
|(7,[0,1,2],[-1.0,...|    1|       0.0|
+--------------------+-----+----------+
only showing top 10 rows



In [None]:

feature_cols = ["QTY", "DISCOUNT", "NET AMT", "HIGH_VALUE_ORDER", "HIGH_DISCOUNT", "NET AMT", "ANOMALY_FLAG"]

# Using VectorAssembler to combine feature columns into a single feature vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="_features__")
df = assembler.transform(df)
#(1 for fraud, 0 for non-fraud)
model_data = df.select("_features__", col("ANOMALY_FLAG").alias("label"))

In [None]:
# Feature 1: Flagging high quantity orders
df = df.withColumn("high_quantity_order", when(col("QTY") > 100, 1).otherwise(0))  # Threshold can be adjusted

# Feature 2: Flagging high discounts
df = df.withColumn("high_discount", when(col("DISCOUNT") > 50, 1).otherwise(0))  # Threshold as needed

# Feature 3: Flagging high-value transactions based on NET AMT
df = df.withColumn("high_net_amount", when(col("NET AMT") > 10000, 1).otherwise(0))  # Adjust threshold

# Fill missing values in numeric columns with 0
df = df.na.fill(0, subset=["DISCOUNT", "NET AMT", "QTY"])

# Show the updated dataset with new features
df.select("QTY", "DISCOUNT", "NET AMT", "high_quantity_order", "high_discount", "high_net_amount").show(5)


+---+--------+-------+-------------------+-------------+---------------+
|QTY|DISCOUNT|NET AMT|high_quantity_order|high_discount|high_net_amount|
+---+--------+-------+-------------------+-------------+---------------+
|  1|     0.0| 2399.0|                  0|            0|              0|
|  1|     0.0| 2199.0|                  0|            0|              0|
|  1|     0.0| 2599.0|                  0|            0|              0|
|  1|   300.0|1888.95|                  0|            1|              0|
|  1|     0.0| 1799.0|                  0|            0|              0|
+---+--------+-------+-------------------+-------------+---------------+
only showing top 5 rows



##Train-Test Split

In [None]:
# Split the data into training and testing sets (80% training, 20% testing)
train_data, test_data = model_data.randomSplit([0.8, 0.2], seed=42)

# Show some training data samples
train_data.show(5)


+--------------------+-----+
|         _features__|label|
+--------------------+-----+
|           (7,[],[])|    0|
|(7,[0,1,4],[-1.0,...|    0|
|(7,[0,1,4],[-1.0,...|    0|
|(7,[0,1,4],[-1.0,...|    0|
|(7,[0,1,4],[-1.0,...|    0|
+--------------------+-----+
only showing top 5 rows



##Decision Tree Classifier

In [None]:
# Creating a Decision Tree Classifier
dt = DecisionTreeClassifier(labelCol="label", featuresCol="_features__")

# Training the Decision Tree model
dt_model = dt.fit(train_data)

# Making predictions on the test data
dt_predictions = dt_model.transform(test_data)

dt_predictions.select("label", "prediction", "_features__").show(5)


+-----+----------+--------------------+
|label|prediction|         _features__|
+-----+----------+--------------------+
|    0|       0.0|(7,[0,1,4],[-1.0,...|
|    0|       0.0|(7,[0,1,4],[-1.0,...|
|    0|       0.0|(7,[0,1,4],[-1.0,...|
|    0|       0.0|(7,[0,1,4],[1.0,3...|
|    0|       0.0|(7,[0,1,4],[1.0,4...|
+-----+----------+--------------------+
only showing top 5 rows



##Random Forest Classifier

In [None]:
# Creating a Random Forest Classifier
rf = RandomForestClassifier(labelCol="label", featuresCol="_features__", numTrees=100)

# Training the Random Forest model
rf_model = rf.fit(train_data)

# Making predictions on the test data
rf_predictions = rf_model.transform(test_data)

rf_predictions.select("label", "prediction", "_features__").show(5)


+-----+----------+--------------------+
|label|prediction|         _features__|
+-----+----------+--------------------+
|    0|       0.0|(7,[0,1,4],[-1.0,...|
|    0|       0.0|(7,[0,1,4],[-1.0,...|
|    0|       0.0|(7,[0,1,4],[-1.0,...|
|    0|       0.0|(7,[0,1,4],[1.0,3...|
|    0|       0.0|(7,[0,1,4],[1.0,4...|
+-----+----------+--------------------+
only showing top 5 rows



In [None]:
# Evaluator for Decision Tree
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
dt_accuracy = evaluator.evaluate(dt_predictions)
print(f"Decision Tree Accuracy: {dt_accuracy}")

# Evaluator for Random Forest
rf_accuracy = evaluator.evaluate(rf_predictions)
print(f"Random Forest Accuracy: {rf_accuracy}")


Decision Tree Accuracy: 1.0
Random Forest Accuracy: 1.0


In [None]:
# ParamGrid for Random Forest
paramGrid = (ParamGridBuilder()
             .addGrid(rf.numTrees, [50, 100, 200])
             .addGrid(rf.maxDepth, [5, 10, 20])
             .build())

# CrossValidator for tuning
crossval = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid,
                          evaluator=evaluator, numFolds=3)

# Training the model with cross-validation
cv_model = crossval.fit(train_data)

# Predictions with tuned model
cv_predictions = cv_model.transform(test_data)
cv_accuracy = evaluator.evaluate(cv_predictions)
print(f"Tuned Random Forest Accuracy: {cv_accuracy}")


Tuned Random Forest Accuracy: 1.0


******************************

In [None]:
df.columns

Index(['SORG', 'REGION', 'STATE', 'CHANNEL', 'BILLING TYPE', 'STORE CODE',
       'NAME', 'INVOICE NOS', 'DATE', 'DAY', 'BRAND', 'SUB BRAND',
       'MATERIAL TYPE', 'STYLE CODE', 'SIZE', 'SLEEVE', 'BASIC/CORE', 'COLOUR',
       'SEASON', 'GENDER', 'SUPPLIER STYLE', 'SUPPLIER SIZE', 'BARCODE', 'QTY',
       'UNIT MRP', 'MRP', 'DISCOUNT', 'MANUALDISC REASON', 'MANUALDISC REMARK',
       'SCHEME REASON', 'TAXABLE AMT', 'TAX RATE', 'TAX AMT', 'NET AMT'],
      dtype='object')

In [None]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 859032 entries, 0 to 859031
Data columns (total 34 columns):
 #   Column             Non-Null Count   Dtype  
---  ------             --------------   -----  
 0   SORG               859030 non-null  object 
 1   REGION             762881 non-null  object 
 2   STATE              766972 non-null  object 
 3   CHANNEL            859031 non-null  object 
 4   BILLING TYPE       859031 non-null  object 
 5   STORE CODE         859031 non-null  float64
 6   NAME               859031 non-null  object 
 7   INVOICE NOS        859031 non-null  object 
 8   DATE               859031 non-null  object 
 9   DAY                859031 non-null  object 
 10  BRAND              859031 non-null  object 
 11  SUB BRAND          9040 non-null    object 
 12  MATERIAL TYPE      859031 non-null  object 
 13  STYLE CODE         859031 non-null  object 
 14  SIZE               859031 non-null  object 
 15  SLEEVE             549268 non-null  object 
 16  BA

In [None]:
df['DATE'].unique()

array(['01/01/2015', '01/02/2015', '01/03/2015', '01/04/2015',
       '01/05/2015', '01/06/2015', '01/07/2015', '01/08/2015',
       '01/09/2015', '01/10/2015', '01/11/2015', '01/12/2015',
       '01/13/2015', '01/14/2015', '01/15/2015', '01/16/2015',
       '01/17/2015', '01/18/2015', '01/19/2015', '01/20/2015',
       '01/21/2015', '01/22/2015', '01/23/2015', '01/24/2015',
       '01/25/2015', '01/26/2015', '01/27/2015', '01/28/2015',
       '01/29/2015', '01/30/2015', '01/31/2015', '02/01/2015',
       '02/02/2015', '02/03/2015', '02/04/2015', '02/05/2015',
       '02/06/2015', '02/07/2015', '02/08/2015', '02/09/2015',
       '02/10/2015', '02/11/2015', '02/12/2015', '02/13/2015',
       '02/14/2015', '02/15/2015', '02/16/2015', '02/17/2015',
       '02/18/2015', '02/19/2015', '02/20/2015', '02/21/2015',
       '02/22/2015', '02/23/2015', '02/24/2015', '02/25/2015',
       '02/26/2015', '02/27/2015', '02/28/2015', '03/01/2015',
       '03/02/2015', '03/03/2015', '03/04/2015', '03/05

In [None]:
df['NAME'].unique()

array(['US - SHAILIJA SHOPPERS KOLKATA',
       'US - DHUPAR AGENCIES BHUBANESHWAR',
       'US - SHAILIJA SHOPPERS KOLKATTA',
       'US - SHAILJA SHOPPERS PVT LTD KOLKATTA',
       'US - SARITA SUMEDHA PATNA', 'US - INDERA TEXTILES ROURKELA',
       'US - ALBL SALT LAKE KOLKATA', 'US - ABK COMMERCIAL KOLKATA',
       ' US - MENS ABK COMMERCIAL KOLKATA',
       'US - ALBL QUEST MALL KOLKATA', 'US - SHAILAJA SHOPPERS KOLKATA',
       'FM - CRYSTAL COURT', 'FM - LAJPATNAGAR', 'FM - GLAMOURFASHION',
       'US - MAHAGUN  METRO MALL VISHALI', 'US - SOUTH EXTENSION DELHI',
       'US - Z SQUARE KANPUR', 'CA - SAMARTH LIFE STYLE PACIFIC MALL',
       'US - H G Retail Solution NewDelhi',
       'US - HG RETAILJAWAHAR NAGAR DELHI', 'US - H.G.Retail Noida',
       'US-GULATI SECTOR-17 CHANDIGARH', 'US-HG RETAIL DWARAKA DELHI',
       'US-FRONTLINE RETAIL CROWN MALL FARIDABAD',
       'US-FRONTLINE RETAIL USPA AGRA',
       'US-FRONTLINE V3S EAST CENTRE MALL', 'US-SHIVAS SPORTS GHAZIABAD',
    

In [None]:
df.isnull().sum()

Unnamed: 0,0
SORG,2
REGION,96151
STATE,92060
CHANNEL,1
BILLING TYPE,1
STORE CODE,1
NAME,1
INVOICE NOS,1
DATE,1
DAY,1


In [None]:
df.shape

(859032, 34)

In [None]:
df['GENDER'].unique()

array(['Mens', 'Women', 'Unisex', 'Boys', 'Girls', 'Promotional', nan],
      dtype=object)

In [None]:
df['BARCODE']