**INSTALLING PYSPARK**

In [None]:
# Record which Python version this runtime provides.
!python --version

In [None]:
# Download and install Java 8 (headless JDK) for Spark.
# Install output is silenced (-qq, stdout redirected to /dev/null).
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
#print working directory
#!pwd

# Downloading Apache spark binary
!wget https://apache.mirrors.nublue.co.uk/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

In [None]:
# Extract the downloaded Spark archive; it unpacks into spark-3.1.2-bin-hadoop3.2/
# (the directory used as SPARK_HOME further down).
!tar -xvzf spark-3.1.2-bin-hadoop3.2.tgz

In [None]:
!ls /content/spark-3.1.2-bin-hadoop3.2

#installing FindSpark
!pip install findspark

In [None]:
import os

# Point Spark at the Java 8 JDK installed above. Without JAVA_HOME, Spark's
# launcher falls back to whatever `java` is first on PATH, which may be a
# different JDK than the one this notebook installed.
os.environ["JAVA_HOME"] = '/usr/lib/jvm/java-8-openjdk-amd64'
# SPARK_HOME is the directory produced by the `tar` extraction above.
os.environ["SPARK_HOME"] = '/content/spark-3.1.2-bin-hadoop3.2'

import findspark
# Make the pyspark package under SPARK_HOME importable in this kernel.
findspark.init()

In [None]:
# creating a spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark 7082cem").getOrCreate()

In [None]:
spark

**LOADING THE DATASET**

In [None]:
Data= spark.read.csv('/content/drive/MyDrive/Colab Notebooks/7082cem/DataCoSupplyChainDataset.csv')
Data.show(5)

In [None]:
# We can set header=true as one of the options. This will read the first row as header

Data = spark.read.format('csv').options(header='true').load('/content/drive/MyDrive/Colab Notebooks/7082cem/DataCoSupplyChainDataset.csv')
Data.show(5)

In [None]:
# Print column names and types. No schema inference was requested when reading,
# so columns are expected to be strings here (hence the recasting in the next cell).
Data.printSchema()

In [None]:
# Converting continuous variables into the right format by recasting the columns

# Import all from `sql.types`
from pyspark.sql.types import *

# Custom helper that recasts a set of DataFrame columns to a new data type.
def convertColumn(Data, names, newType):
    """Return ``Data`` with every column in ``names`` cast to ``newType``.

    Columns are processed one at a time, in the order given, by replacing each
    with its cast version via ``withColumn``; unlisted columns are untouched.
    The input DataFrame itself is not modified (a new one is returned), and an
    empty ``names`` returns ``Data`` unchanged.
    """
    result = Data
    for column_name in names:
        casted = result[column_name].cast(newType)
        result = result.withColumn(column_name, casted)
    return result
# List of continuous features to cast to float.
# 'order date (DateOrders)' is deliberately NOT listed: it holds date-time text,
# and casting it to FloatType would silently turn every value into null.
Numeric_Features  = ['Days for shipping (real)', 'Days for shipment (scheduled)','Benefit per order', 'Sales per customer', 'Late_delivery_risk', 'Category Id','Customer Id', 'Customer Zipcode', 'Department Id', 'Latitude', 'Longitude', 'Order Customer Id',
                     'Order Id', 'Order Item Cardprod Id', 'Order Item Discount', 'Order Item Discount Rate', 'Order Item Id', 'Order Item Product Price', 'Order Item Profit Ratio', 'Order Item Quantity', 'Sales', 'Order Profit Per Order', 'Order Item Total', 'Product Card Id', 'Product Category Id', 'Product Price', 'Product Status']
# Convert the type
Data = convertColumn(Data, Numeric_Features, FloatType())
# Check the dataset
Data.printSchema()


In [None]:
# Number of rows in the DataFrame.
Data.count()

In [None]:
# Number of columns in the dataset.
len(Data.columns)

**EXPLORING THE DATASET**

In [None]:
# Summary statistics for one text column ('Type') and one recast numeric column.
for column_to_describe in ('Type', 'Product Category Id'):
    Data.describe(column_to_describe).show()

In [None]:
Data.select('Customer Id', 'Customer Lname').show(5)
Data.select('Department Id', 'Department name').distinct().show()

In [None]:
Data.select('Order status', ).distinct().show()

In [None]:
# NOTE(review): Row is not used anywhere in the visible notebook.
from pyspark.sql import Row
# Remove rows that are exact duplicates across all columns.
Data= Data.dropDuplicates()

In [None]:
# Row count after removing duplicates.
Data.count()

In [None]:
# Number of distinct (Customer Id, Customer Fname) pairs.
Data.select('Customer Id', 'Customer Fname').distinct().count()

**Data Cleaning**

In [None]:
# We merge the last and first name columns into a single 'Name' column ("<Lname> <Fname>").
# This enables us to tell apart different people who share a last or first name.
# concat_ws skips NULL parts; plain concat would make Name NULL whenever either part is missing.

from pyspark.sql import functions as sf

Data = Data.withColumn('Name', sf.concat_ws(' ', sf.col('Customer Lname'), sf.col('Customer Fname')))

Data.show()


In [None]:
# Count missing values in every column: when(...) without otherwise() yields a
# value only where the column is NULL, and count() counts those non-null results.
from pyspark.sql.functions import isnan, when, count, col

null_count_per_column = [
    count(when(col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in Data.columns
]
Data.select(null_count_per_column).show()


In [None]:
# Fill the 3 missing values in the Customer Zipcode column with 0.
# na.fill returns a NEW DataFrame, so the result must be assigned back to Data;
# otherwise the fill is discarded and the re-check below still shows the NULLs.
Data = Data.na.fill(value=0, subset=["Customer Zipcode"])
Data.show(3)

In [None]:
Data.select([count(when(col(d).isNull(), d)).alias(d) for d in Data.columns]).show()


In [None]:
len(Data.columns)

In [None]:
# Create a 'Fraud' flag: '1' for orders whose status is SUSPECTED_FRAUD, '0' otherwise.
# The flag is stored as a string; the StringIndexer stage further down turns it
# into Fraud_index, which the models use as their label.

from pyspark.sql.functions import when, lit, col
suspected_fraud = col('Order status') == 'SUSPECTED_FRAUD'
Data = Data.withColumn("Fraud", when(suspected_fraud, lit('1')).otherwise(lit('0')))

In [None]:
# Preview the data including the new Fraud column.
Data.show(5)

In [None]:
# Row count (adding a column does not change it).
Data.count()

In [None]:
# Drop columns not needed for modelling: customer personal details, free text and
# images, identifiers and codes, location/date/shipping fields, the helper 'Name'
# column, and 'Order Status' (the Fraud flag was derived from it).

columns_to_drop = [
    'Customer Email', 'Product Status', 'Customer Password', 'Customer Street',
    'Customer Fname', 'Customer Lname', 'Latitude', 'Longitude',
    'Product Description', 'Product Image', 'Order Zipcode', 'shipping date (DateOrders)',
    'Shipping Mode', 'Name', 'Product Category Id', 'Product Card Id',
    'Order Status', 'Order Item Id', 'Order Item Cardprod Id', 'Order Id',
    'order date (DateOrders)', 'Order Customer Id', 'Department Id', 'Customer Zipcode',
    'Customer Id', 'Customer Country', 'Customer City', 'Delivery Status',
]

Data = Data.drop(*columns_to_drop)

In [None]:
# Preview the reduced dataset.
Data.show(3)

In [None]:
# Number of columns left after dropping.
len(Data.columns)

DATA PREPROCESSING

In [None]:

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

# Index only text (string-typed) columns, i.e. the categorical features, plus the
# 'Fraud' label, which is always indexed because the models read Fraud_index.
# Numeric columns go into the assembler as-is, so indexing them would only waste
# work. sorted() keeps the stage order stable across runs (set order is not).
categorical_columns = sorted(
    name for name, dtype in Data.dtypes if dtype == 'string' or name == 'Fraud'
)

# Unfitted StringIndexers; the Pipeline fits each one exactly once.
# The label is ordered alphabetically so '0' -> 0.0 and '1' -> 1.0 regardless of
# class frequencies; features keep the default frequency-based ordering.
Encoder_Data = [
    StringIndexer(
        inputCol=column,
        outputCol=column + "_index",
        stringOrderType='alphabetAsc' if column == 'Fraud' else 'frequencyDesc',
    )
    for column in categorical_columns
]

pipeline = Pipeline(stages=Encoder_Data)
Data_r = pipeline.fit(Data).transform(Data)


In [None]:
# Preview the data with the *_index columns added.
Data_r.show(5)

In [None]:
# Column count after indexing.
len(Data_r.columns)

In [None]:
# Drop the raw text columns (now represented by their *_index versions), the string
# 'Fraud' flag (Fraud_index is the label), 'Category Id', and any *_index copies of
# numeric columns (the assembler uses the numeric originals). DataFrame.drop ignores
# names that do not exist, so this list is safe whichever *_index columns were created.

drops = ['Type', 'Category Name', 'Customer Segment', 'Customer State', 'Department Name', 'Market',
         'Order Country', 'Order Region', 'Product Name', 'Order State', 'Fraud', 'Late_delivery_risk_index', 'Order Profit Per Order_index', 'Benefit per order_index', 'Sales per customer_index', 'Order Item Discount Rate_index', 'Sales_index',
         'Order City', 'Order Item Quantity_index', 'Order Item Product Price_index', 'Order Item Total_index', 'Days for shipment (scheduled)_index', 'Order Item Profit Ratio_index', 'Days for shipping (real)_index', 'Category Id_index', 'Category Id', 'Product Price_index', 'Order Item Discount_index']

New_data = Data_r.drop(*drops)

In [None]:
# Columns remaining for feature assembly.
len(New_data.columns)

In [None]:
# Preview the model-ready data.
New_data.show(5)

Using a VectorAssembler, all features are combined into a single feature-vector column.

In [None]:
# All model inputs are packed into a single vector column ('features').

from pyspark.ml.feature import VectorAssembler, StandardScaler

# Numeric columns are used directly...
numeric_inputs = [
    'Days for shipping (real)', 'Days for shipment (scheduled)', 'Benefit per order',
    'Sales per customer', 'Late_delivery_risk', 'Order Item Discount',
    'Order Item Discount Rate', 'Order Item Product Price', 'Order Item Profit Ratio',
    'Order Item Quantity', 'Sales', 'Order Item Total', 'Order Profit Per Order',
    'Product Price',
]
# ...while categorical columns enter through their StringIndexer codes.
indexed_inputs = [
    'Customer State_index', 'Customer Segment_index', 'Product Name_index',
    'Order Region_index', 'Order City_index', 'Market_index', 'Order State_index',
    'Category Name_index', 'Type_index', 'Order Country_index', 'Department Name_index',
]
Features = numeric_inputs + indexed_inputs
Target = ['Fraud_index']

Assemble = VectorAssembler(inputCols=Features, outputCol='features')
AssembleL = VectorAssembler(inputCols=Target, outputCol='label')


In [None]:
# Apply both assemblers: 'features' feeds the scaler and models; 'label' (a vector
# wrapping Fraud_index) is only inspected further down, since the models read Fraud_index.
Assembled= Assemble.transform(New_data)
AssembledL= AssembleL.transform(New_data)

In [None]:
# Show the assembled feature vectors without truncation.
Assembled.show(5, truncate=False)

Feature Scaling

In [None]:
# Initialize standardScaler function

ss = StandardScaler(inputCol="features", outputCol="features_scaled")

In [None]:
# Fit the DataFrame to the scaler
scaled_data = ss.fit(Assembled).transform(Assembled)

In [None]:
# Compare raw and scaled feature vectors for the first rows.
scaled_data.select("features", "features_scaled").show(5, truncate=False)


In [None]:
# Sanity check: the column both models read must exist before the data is split.
assert "features_scaled" in scaled_data.columns, "run the feature-scaling cells above first"

**MACHINE LEARNING MODEL**

First, a classification model is built using Logistic Regression to classify whether or not a transaction is fraudulent.

In [None]:
# Split rows 70/30 into train and test sets; the fixed seed makes the split reproducible.

train, test = scaled_data.randomSplit(weights=[0.7, 0.3], seed=1230)

In [None]:
# Columns available to the models (they use features_scaled and Fraud_index).
train.columns

In [None]:
# Sizes of the train and test sets.
print(train.count())
print(test.count())

In [None]:
# Preview training rows.
train.show(5)

In [None]:
# Preview the full scaled dataset.
scaled_data.show(3)

In [None]:
# The assembled 'label' vector. Note: the models below read Fraud_index directly, not this column.
label= AssembledL.select('label')

In [None]:
label.show(5)

LOGISTIC REGRESSION

In [None]:
from pyspark.ml.classification import LogisticRegression

# Logistic Regression on the scaled features, predicting Fraud_index.
# regParam=0.3 with elasticNetParam=0.8 is a strong, mostly-L1 penalty.
# NOTE(review): if fraud is a rare class, a penalty this heavy may push every
# prediction to the majority class; check recall in the report below.
LR = LogisticRegression(
    featuresCol='features_scaled',
    labelCol='Fraud_index',
    maxIter=10,
    regParam=0.3,
    elasticNetParam=0.8,
)

In [None]:
# Fit the model on the training split only.
LRM = LR.fit(train)

In [None]:
# Apply the fitted model to the test set (adds rawPrediction, probability and prediction columns).
Predict= LRM.transform(test)

In [None]:
# First five test predictions. limit() runs before toPandas(), so only 5 rows are
# collected to the driver instead of the entire test set.
Predict.select("Fraud_index", 'features_scaled', 'rawPrediction', 'prediction', 'probability').limit(5).toPandas()

DECISION TREE MODEL

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Decision tree (default settings) on the same scaled features and label as the logistic regression.
DTree = DecisionTreeClassifier(featuresCol='features_scaled', labelCol='Fraud_index')

In [None]:
# Train the decision tree on the training split.
DTM = DTree.fit(train)

In [None]:
# Score the test set with the fitted tree.
Predict1= DTM.transform(test)

In [None]:
# First five decision-tree test predictions; limit() first so only 5 rows reach the driver.
Predict1.select("Fraud_index", 'features_scaled', 'rawPrediction', 'prediction', 'probability').limit(5).toPandas()

MODEL EVALUATION

In [None]:
# Logistic Regression accuracy: share of test rows whose prediction equals the true label.
# The printed label names the model so the two accuracy outputs are distinguishable.
Accuracy = Predict.filter(Predict.Fraud_index == Predict.prediction).count() / float(Predict.count())
print("Logistic Regression accuracy : ", Accuracy)


In [None]:
# Decision Tree accuracy: share of test rows whose prediction equals the true label.
# The printed label names the model so the two accuracy outputs are distinguishable.
Accuracy1 = Predict1.filter(Predict1.Fraud_index == Predict1.prediction).count() / float(Predict1.count())
print("Decision Tree accuracy : ", Accuracy1)


In [None]:
# Collect (label, prediction) pairs for the Logistic Regression in ONE job, so the
# two lists are guaranteed to be row-aligned, and unwrap Row objects into plain floats.
lr_pairs = Predict.select('Fraud_index', 'prediction').collect()
y_true = [row['Fraud_index'] for row in lr_pairs]
y_pred = [row['prediction'] for row in lr_pairs]

from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))

In [None]:
# Collect (label, prediction) pairs for the Decision Tree in ONE job, so the two
# lists are guaranteed to be row-aligned, and unwrap Row objects into plain floats.
dt_pairs = Predict1.select('Fraud_index', 'prediction').collect()
y_true1 = [row['Fraud_index'] for row in dt_pairs]
y_pred1 = [row['prediction'] for row in dt_pairs]

from sklearn.metrics import classification_report, confusion_matrix
# Report the Decision Tree's own predictions (y_pred1), not the Logistic Regression's y_pred.
print(classification_report(y_true1, y_pred1))