In [2]:
import pyspark.sql.functions as F
import os
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkContext, SparkConf

from pyspark.sql.functions import explode
from pyspark.sql.functions import countDistinct, avg
from pyspark.sql.functions import dayofmonth,dayofyear,year,month,hour,weekofyear,date_format
from pyspark.sql.functions import col as func_col
from pyspark.sql.functions import lit
from pyspark.sql.functions import *
from pyspark.ml import Pipeline

In [None]:
#Spark session is being created
#Configuration of the Spark Session
app_name="financial_transaction"
conf = SparkConf()  # create the configuration
conf.set('spark.driver.extraClassPath', "/usr/share/cmf/common_jars/mysql-connector-java-5.1.15.jar")  # set the spark.jars
conf.set('spark.executor.extraClassPath', "/usr/share/cmf/common_jars/mysql-connector-java-5.1.15.jar")

#Spark Session object
spark = SparkSession.builder.config(conf=conf).appName(app_name).getOrCreate()



In [None]:
#Reading the file from the hdfs wherein the file has the data not comma separated but the tab separated
data = spark.read.csv("/user/edureka_960126/payment_txns.csv",inferSchema=True,header=True)
data.show(20)


In [None]:
#Feature Extraction for developing the Machine Learning Model
#Although isFraud is always set when isFlaggedFraud is set, 
#since isFlaggedFraud is set just 16 times in a seemingly meaningless way, 
#we can treat this feature as insignificant and discard it in the dataset without loosing information.


#From the exploratory data analysis (EDA) , we know that fraud only occurs in 'TRANSFER's and 'CASH_OUT's

#Two new Features may be added which can serve as the good features for fraud detection

#X['errorBalanceOrig'] = X.newBalanceOrig + X.amount - X.oldBalanceOrig
#X['errorBalanceDest'] = X.oldBalanceDest + X.amount - X.newBalanceDest


#hence we need to do the Data Transformation in order to convert the Data into the format which we can use then for model building

#Gradient Boosted Algorithm can be used as ML algorithm for the model geenration


#clf = XGBClassifier(max_depth = 3, scale_pos_weight = weights, n_jobs = 4)


In [None]:
#Dropping of the isFlaggedFraud

data=data.drop("isFlaggedFraud")

#filtering out the transfers and cashout transaction in the 
data=data.filter((data["type"]=="CASH_OUT")  | (data["type"]=="TRANSFER"))

#adding new column for the feature of error of origin and destination

data=data.withColumn("error_origin",data["amount"] - data["newbalanceOrig"]+ data["oldbalanceOrg"])

data=data.withColumn("error_dest",data["amount"] - data["newbalanceDest"]+ data["oldbalanceDest"])



In [None]:
data.show(1000)

In [None]:
#Dropping of the other non useful columns

data=data.drop("nameDest","nameOrig")

data.show(100)

In [None]:
#Encoding of type data
from pyspark.ml.feature import StringIndexer, OneHotEncoder , VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier

si = StringIndexer(inputCol = "type", outputCol = 'Type_' + 'Index')

#No one hot encoding is needed because the type column has 2 labels only
inputs=['amount','oldbalanceOrg','newbalanceOrig','oldbalanceDest','newbalanceDest','error_origin','error_dest','Type_Index']

### Vector Creator with column features which will be used in the training of the model
va = VectorAssembler(inputCols=inputs , outputCol='features')

# Train a GBT model.
gbt = GBTClassifier(labelCol='isFraud', featuresCol='features' , maxIter=10)


In [None]:
#Let us set up the pipeline for the ML Model

#Used the pipelines to stremline the whole process
stages = []

#Firstly we add the string indexer
stages+=[si]
stages+=[va]
#added to the pipeline
stages+=[gbt]







In [None]:
#Splitting of Data
splits = data.randomSplit([0.7, 0.3])
train= splits[0]
test= splits[1]

In [None]:
# Chain indexer and tree in a Pipeline
pipeline = Pipeline(stages=stages)

### Pipeline fitting with train_df data 
pipelineModel = pipeline.fit(data)


# Train model.  This also runs the indexers.
model = pipeline.fit(train)

# Make predictions.
predictions = model.transform(test)


#showing the predictions
predictions.select('features','isFraud','prediction').show()


#Saving the model
model.write().overwrite().save("/user/edureka_960126/model_fraud_detection")




In [None]:
predictions.select('features','isFraud','prediction').filter(predictions['prediction']== 1).show()

In [None]:
#Pipeline Model has already been created will be used to work on the the streaming data sent to the flume
#streaming data will be retreived by the Spark Streaming to predict the result

from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml import PipelineModel

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

from pyspark.streaming.flume import FlumeUtils

import json

from pyspark.sql import Row

from pyspark.sql.functions import lit


#Model stored in the hdfs to be used for the machine learning application
#is saved as variable pipeline


#Schema is being set for the dataframe creation out of the records
cSchema = StructType([StructField("type", StringType(),nullable=True),StructField("amount", DoubleType(),nullable=True),

StructField("oldbalanceOrg", DoubleType(),nullable=True),StructField("newbalanceOrig", DoubleType(),nullable=True),
StructField("oldbalanceDest", DoubleType(),nullable=True),StructField("newbalanceDest", DoubleType(),nullable=True),                    
          StructField("error_origin", DoubleType(),nullable=True),StructField("error_dest", DoubleType(),nullable=True) ])

model = PipelineModel.load("/user/edureka_960126/model_fraud_detection")


#Process for working out the prediction
def process(rdd):
    
            # Get the singleton instance of SparkSession
            #spark = getSparkSessionInstance(rdd.context.getConf())

            #Conversion to the Data Frame to work on the dataframe for working out the predictions
            df = spark.createDataFrame(rdd,schema=cSchema)

            #As only the ones which are of type CASH out and Transfers can be fraud so no doubt about others
            df_pred = df.filter((df["type"]== "CASH_OUT" )   | (df["type"]=="TRANSFER"))

            #All other records in the Dataframe which need not to be predicted
            df_not_pred=df.filter((df["type"] != "CASH_OUT" )   & (df["type"] !="TRANSFER"))

            #Getting the output after passing the data frame through vector assembler to produce the dataframe with vectorised features
            df_pred = model.transform(df_pred)

            #df not to be predicted has to be added the column prediction with value 0
            df_not_pred=df_not_pred.withColumn("prediction",lit(0))
            df_not_pred=df_not_pred.select('type','amount','prediction')

            #showing the predictions with the union of the above two type of dataframe
            df_pred=df_pred.select('type','amount','prediction')

            #finally the Data frame with the union of above two types of Dataframes with Union even with duplicates of type and amount
            df_final = df_not_pred.unionAll(df_pred)

            #Note that type and amount are just shown to reflect the key of the transactions which actually they are not
            df_final.show()

        
#starting of the Spark streaming Context
ssc= StreamingContext(spark.sparkContext, 15)

#Flume stream is generated after the spark streaming receiver is connected to custom spark streaming sink created at the host with a given port
flumeStream = FlumeUtils.createPollingStream(ssc, [('ip-20-0-41-164.ec2.internal' , 9090)])

#Flume_microbatches count
flumeStream.count().pprint()


#RDD[Strings]

#Here we get the strings of the json format input data 
lines = flumeStream.map(lambda x: x[1])

#RDD of Dicts or JSON objects by extracting the json objects from the string 
records_dict=lines.map(lambda x: json.loads(x))

#Rows RDD rows rdd is created here
rows_rdd=records_dict.map(lambda res: Row(res['type'],res['amount'],res['oldbalanceOrg'],res['newbalanceOrig'] , res['oldbalanceDest'],res['newbalanceDest'],res["amount"] - res["newbalanceOrig"]+ res["oldbalanceOrg"],res["amount"] - res["newbalanceDest"]+ res["oldbalanceDest"]))

rows_rdd.foreachRDD(process)

ssc.start()             # Start the computation
ssc.awaitTermination()  # Wait for the computation to terminate







-------------------------------------------
Time: 2020-07-21 22:50:15
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 22:50:30
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 22:50:45
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 22:51:00
-------------------------------------------
1

+--------+------+----------+
|    type|amount|prediction|
+--------+------+----------+
|TRANSFER| 181.0|       1.0|
+--------+------+----------+

-------------------------------------------
Time: 2020-07-21 22:51:15
-------------------

-------------------------------------------
Time: 2020-07-21 22:59:45
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 23:00:00
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 23:00:15
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 23:00:30
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 23:00:45
-------------------------------------------

+----+------+--------

-------------------------------------------
Time: 2020-07-21 23:09:15
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 23:09:30
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 23:09:45
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 23:10:00
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 23:10:15
-------------------------------------------

+----+------+--------

-------------------------------------------
Time: 2020-07-21 23:18:45
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 23:19:00
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 23:19:15
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 23:19:30
-------------------------------------------

+----+------+----------+
|type|amount|prediction|
+----+------+----------+
+----+------+----------+

-------------------------------------------
Time: 2020-07-21 23:19:45
-------------------------------------------

+----+------+--------

In [None]:
data.printSchema()