In [1]:
import apache_beam as beam
from apache_beam.runners.interactive import interactive_runner
import apache_beam.runners.interactive.interactive_beam as ib
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth
import numpy as np

In [2]:
# Set up the Apache Beam pipeline options.
options = pipeline_options.PipelineOptions()

# Streaming mode lets the pipeline read continuously from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Use the default project of the current Google Cloud environment; it is needed
# to create a subscription to the Pub/Sub topic. google.auth.default() returns
# (credentials, project_id), and project_id is None when no default project can
# be determined -- fail here with a clear message instead of much later.
_, default_project = google.auth.default()
if default_project is None:
    raise RuntimeError(
        "No default Google Cloud project found; set one with "
        "`gcloud config set project <PROJECT_ID>` or the GOOGLE_CLOUD_PROJECT "
        "environment variable.")
options.view_as(GoogleCloudOptions).project = default_project

In [3]:
# Fully qualified Google Cloud Pub/Sub topic that the pipeline reads payment
# transactions from (format: projects/<project>/topics/<topic>).
topic = "projects/fraud-detection-data245/topics/payments_dev"

In [4]:
# Limit how long Interactive Beam records the unbounded Pub/Sub source
# before ib.show() renders the captured elements.
ib.options.recording_duration = '1m'

In [5]:
# Interactive pipeline: intermediate PCollections can be inspected with ib.show().
p = beam.Pipeline(runner=interactive_runner.InteractiveRunner(), options=options)

In [6]:
# Source PCollection: ReadFromPubSub (without with_attributes) yields each
# message's payload as bytes; they are decoded with json.loads further down.
payment_transactions = p | "read" >> beam.io.ReadFromPubSub(topic=topic)

In [7]:
# Slice the unbounded stream into fixed, non-overlapping 10-second windows.
windowed_payments = payment_transactions | "window" >> beam.WindowInto(
    beam.window.FixedWindows(10))

In [8]:
# Render the recorded raw messages together with the window each one fell into.
ib.show(windowed_payments, include_window_info=True)

<IPython.core.display.Javascript object>

In [None]:
pip install --upgrade faiss-cpu google-cloud-bigquery-storage google-cloud-storage


In [9]:



class FaissKNeighbors:
    """k-nearest-neighbours classifier backed by a FAISS exact L2 index.

    Exposes a scikit-learn style ``fit``/``predict`` interface. ``predict``
    takes a majority vote with ``np.bincount``, so labels passed to ``fit``
    must be non-negative integers.
    """

    def __init__(self, k=5):
        self.index = None  # FAISS index built by fit()
        self.y = None      # training labels, aligned with the indexed rows
        self.k = k         # number of neighbours consulted per query

    def fit(self, X, y):
        """Index the rows of the 2-D array-like ``X`` with labels ``y``.

        IndexFlatL2 performs exact (brute-force) Euclidean-distance search.

        Returns:
            self, following the scikit-learn convention.
        """
        # Imported here, not in the class body: a class-body import only creates
        # the class attribute FaissKNeighbors.faiss, so a bare ``faiss`` name in
        # this method would be an unbound global.
        import faiss

        # FAISS requires C-contiguous float32 input; this also accepts DataFrames.
        X = np.ascontiguousarray(X, dtype=np.float32)
        self.index = faiss.IndexFlatL2(X.shape[1])
        self.index.add(X)
        # A plain ndarray so predict() can fancy-index it with a 2-D id matrix
        # (lists and pandas Series cannot be indexed that way).
        self.y = np.asarray(y)
        return self

    def predict(self, X):
        """Return the majority label among the ``k`` nearest indexed rows, per row of ``X``."""
        queries = np.ascontiguousarray(X, dtype=np.float32)
        _distances, neighbour_ids = self.index.search(queries, k=self.k)
        # np.asarray also covers instances pickled before fit() stored an ndarray.
        labels = np.asarray(self.y)

        predictions = []
        for row_ids in neighbour_ids:
            # FAISS pads results with id -1 when fewer than k vectors are indexed;
            # a raw -1 would silently vote with the *last* training label.
            valid_ids = row_ids[row_ids >= 0]
            predictions.append(np.argmax(np.bincount(labels[valid_ids])))
        return np.array(predictions)


class ApplyDoFn(beam.DoFn):
    """Score each payment transaction with a k-fold ensemble of pickled models.

    At construction time the DoFn:
      1. reads per-fold confusion matrices for 2021-11-29 from the BigQuery table
         ``fraud-detection-data245.ml_project.metric``;
      2. averages the F1 score per ``modelName`` and drops the model type(s) with
         the lowest mean F1 (unless that would drop every model type);
      3. loads the matching fold pickles from the ``fraud-detection-ml-models``
         bucket, under the DDMMYYYY folder of the metrics date.

    ``process`` takes a majority vote over the folds of each model type, then a
    majority vote over the model types. It emits a copy of the input dict with
    ``fraud_prediction`` and ``model_used`` added. The 0 = valid / 1 = fraud
    convention is the one used for LOF; it presumably holds for the other models too.

    NOTE(review): loading happens in ``__init__`` (pipeline-construction time),
    so the loaded models are serialized together with the DoFn. Beam's
    ``setup()`` is the usual place for per-worker model loading.
    """

    _METRICS_QUERY = """
            SELECT timestamp,modelName,confusionMatrix from `fraud-detection-data245.ml_project.metric` where timestamp>'2021-11-29' and timestamp<'2021-11-30' order by modelName,foldNumber"""
    _MODEL_BUCKET = "fraud-detection-ml-models"
    _VALID_TYPES = ('CASH_OUT', 'PAYMENT', 'CASH_IN', 'TRANSFER', 'DEBIT')
    # Model types, in the order their votes are reported and combined.
    _MODEL_TYPES = ('rf', 'svm', 'knn', 'lof')

    def __init__(self):
        import pandas as pd
        import pickle as pkl
        import warnings
        # Suppresses *all* warnings process-wide (kept from the original notebook).
        warnings.filterwarnings("ignore")
        self._pkl = pkl
        self._pd = pd
        self.winningmodel = None  # never reassigned; emitted as 'model_used'
        self.pickled_models = []  # [model, model_name] pairs, one per loaded pickle

        fold_metrics = self._load_fold_metrics()
        # Midnight of the latest metrics day (microsecond cleared too, so rows
        # logged at exactly midnight are not filtered out).
        date = fold_metrics['timestamp'].max().replace(
            hour=0, minute=0, second=0, microsecond=0)

        mean_df = self._mean_f1_by_model(fold_metrics.loc[fold_metrics['timestamp'] >= date])
        print("F1 Scores for all Models Created:")
        print(mean_df)

        mean_df = self._drop_worst_model(mean_df)
        print(" ")
        print("List of Model Considered after F1 Evaluation:")
        print(mean_df)

        models_considered = mean_df['modelName'].tolist()
        print(models_considered)

        self.modelDirectory = str(date)[0:10].replace("-", "")
        # Pickle folders in the bucket are named DDMMYYYY (e.g. 29112021).
        self._load_pickled_models(models_considered, prefix=date.strftime("%d%m%Y"))

    def _load_fold_metrics(self):
        """Return one row per (model, fold): timestamp, modelName, confusionMatrix."""
        from google.cloud import bigquery

        rows = bigquery.Client().query(self._METRICS_QUERY).result()
        fold_metrics = self._pd.DataFrame(
            [[row['timestamp'], row['modelName'], row['confusionMatrix']] for row in rows],
            columns=["timestamp", "modelName", "confusionMatrix"])
        if fold_metrics.empty:
            raise RuntimeError("No model metrics returned by the BigQuery metrics query")
        return fold_metrics

    @staticmethod
    def _f1_score(confusion_matrix):
        """F1 = TP / (TP + (FP + FN) / 2); 0.0 when the denominator is zero."""
        # Convert each count on its own: adding first and converting afterwards
        # would concatenate instead of add if the counts are strings.
        true_pos = int(confusion_matrix['truePositive'])
        false_pos = int(confusion_matrix['falsePositive'])
        false_neg = int(confusion_matrix['falseNegative'])
        denominator = true_pos + 0.5 * (false_pos + false_neg)
        return true_pos / denominator if denominator else 0.0

    def _mean_f1_by_model(self, fold_metrics):
        """Average the per-fold F1 score for each modelName (columns: modelName, F1)."""
        with_f1 = fold_metrics.assign(F1=fold_metrics['confusionMatrix'].map(self._f1_score))
        # Average only F1: the timestamp and dict columns cannot be averaged
        # (pandas >= 2.0 raises instead of silently dropping them).
        return with_f1.groupby("modelName", as_index=False)['F1'].mean()

    @staticmethod
    def _drop_worst_model(mean_df):
        """Drop the model type(s) with the lowest mean F1 unless that leaves none."""
        kept = mean_df.loc[mean_df['F1'] > mean_df['F1'].min()]
        return mean_df if kept.empty else kept

    def _load_pickled_models(self, models_considered, prefix):
        """Append [model, model_name] to self.pickled_models for each matching pickle."""
        from google.cloud import storage
        from apache_beam.io.gcp import gcsio

        gcs = gcsio.GcsIO()
        for blob in storage.Client().list_blobs(self._MODEL_BUCKET, prefix=prefix):
            for model_name in models_considered:
                if model_name not in blob.name:
                    continue
                print(blob.name)
                try:
                    with gcs.open("gs://" + self._MODEL_BUCKET + "/" + blob.name) as handle:
                        payload = handle.read()
                    # SECURITY: unpickling runs arbitrary code; the bucket must be trusted.
                    model = self._pkl.loads(payload)
                except Exception as exc:  # best effort: skip unreadable pickles
                    print("Failed to load pkl file:" + blob.name)
                    print("  reason: {!r}".format(exc))
                    continue
                self.pickled_models.append([model, model_name])

    def _prepare_features(self, element):
        """Turn one raw transaction dict into the single-row feature frame the models expect."""
        from sklearn.preprocessing import LabelEncoder

        # One row, one column per transaction field. The transpose leaves every
        # column as object dtype; infer_objects() restores numeric dtypes so the
        # arithmetic and .round() below operate on numbers.
        features = (self._pd.DataFrame.from_dict(element, orient="index")
                    .transpose()
                    .fillna(0)
                    .infer_objects())

        type_encoder = LabelEncoder().fit(self._VALID_TYPES)
        # NOTE(review): this encoder is re-fitted on a single value per call, so
        # both name columns always encode to 0. Confirm this matches training.
        name_encoder = LabelEncoder()
        features['transaction_type'] = type_encoder.transform(features['type'])
        features['nameorig_enc'] = name_encoder.fit_transform(features['nameOrig'])
        features['namedest_enc'] = name_encoder.fit_transform(features['nameDest'])

        features = features.drop(columns=['type', 'nameOrig', 'nameDest', 'step'])
        features = features.rename(columns={'oldbalanceOrg': 'oldbalanceOrig'})

        # 1 when the origin balance change (rounded to 2 decimals) differs from the amount.
        balance_change = (features['oldbalanceOrig'] - features['newbalanceOrig']).round(2)
        features['balance_difference'] = balance_change.ne(features['amount']).astype(int)
        return features

    @staticmethod
    def _predict_one(model, model_name, features):
        """Return one fold model's 0/1 verdict for the single-row ``features`` frame."""
        import numpy as np

        if model_name == 'knn':
            # The knn pickles are scored on a contiguous ndarray, as in the original.
            return model.predict(np.ascontiguousarray(features))[0]
        if model_name == 'lof':
            # decision_function >= 0 -> valid (0); < 0 -> fraudulent (1).
            return 0 if model.decision_function(features)[0] >= 0 else 1
        return model.predict(features)[0]  # rf, svm

    @staticmethod
    def _majority_vote(votes):
        """Most frequent value in ``votes``; ties go to the smallest value (0 = valid)."""
        return max(sorted(set(votes)), key=votes.count)

    def process(self, element):
        """Score one decoded transaction dict and emit it with the ensemble verdict.

        Args:
            element: dict of raw transaction fields as produced by ``json.loads``
                (``step``, ``type``, ``amount``, ``nameOrig``, ``oldbalanceOrg``, ...).

        Returns:
            A one-element list with a copy of ``element`` plus ``fraud_prediction``
            and ``model_used``.

        Raises:
            ValueError: if no usable model was loaded.
        """
        import warnings
        warnings.filterwarnings("ignore")

        print(" ")
        print("Payment Transaction: {} ".format(element))

        features = self._prepare_features(element)

        # Per-fold predictions, grouped by model type.
        votes = {model_type: [] for model_type in self._MODEL_TYPES}
        for model, model_name in self.pickled_models:
            if model_name in votes:
                votes[model_name].append(self._predict_one(model, model_name, features))

        all_model_predictions = []
        for model_type in self._MODEL_TYPES:
            fold_votes = votes[model_type]
            if not fold_votes:
                continue  # type dropped by the F1 filter or no pickles loaded
            ensemble = self._majority_vote(fold_votes)
            label = model_type.upper()
            print("{} K-Fold Predictions: {}  : {} Ensemble: {} ".format(
                label, fold_votes, label, ensemble))
            all_model_predictions.append(ensemble)

        if not all_model_predictions:
            raise ValueError("ApplyDoFn has no loaded models to score the transaction with")
        final_ensemble_prediction = self._majority_vote(all_model_predictions)
        print("All Model Predictions: {}  : Final Ensemble: {} ".format(
            all_model_predictions, final_ensemble_prediction))

        # Beam forbids mutating input elements, so emit an annotated copy.
        scored = dict(element)
        scored['fraud_prediction'] = final_ensemble_prediction
        scored['model_used'] = self.winningmodel
        return [scored]
        

In [10]:
import json

# Decode each Pub/Sub payload (JSON bytes) into a dict, then score it with the
# fold-model ensemble loaded by ApplyDoFn.
detection = (
    windowed_payments
    | "Convert to dict" >> beam.Map(json.loads)
    | 'Apply Model' >> beam.ParDo(ApplyDoFn())
)

F1 Scores for all Models Created:
  modelName        F1
0       knn  0.412763
1       lof  0.024334
2        rf  0.997742
3       svm  0.571560
 
List of Model Considered after F1 Evaluation:
  modelName        F1
0       knn  0.412763
2        rf  0.997742
3       svm  0.571560
['svm', 'knn', 'rf']
29112021/2021-11-29_07:36_knn_fold_1_model.pkl
29112021/2021-11-29_07:36_knn_fold_2_model.pkl
29112021/2021-11-29_07:37_knn_fold_3_model.pkl
29112021/2021-11-29_07:38_knn_fold_4_model.pkl
29112021/2021-11-29_07:38_knn_fold_5_model.pkl
29112021/2021-11-29_08:22_rf_fold_1_model.pkl
29112021/2021-11-29_08:48_rf_fold_2_model.pkl
29112021/2021-11-29_09:15_rf_fold_3_model.pkl
29112021/2021-11-29_09:43_rf_fold_4_model.pkl
29112021/2021-11-29_10:12_rf_fold_5_model.pkl
29112021/2021-11-29_18:45_svm_fold_1_model.pkl
29112021/2021-11-29_18:58_svm_fold_2_model.pkl
29112021/2021-11-29_19:13_svm_fold_3_model.pkl
29112021/2021-11-29_19:25_svm_fold_4_model.pkl
29112021/2021-11-29_19:38_svm_fold_5_model.pkl

In [11]:
# Render the scored transactions (with 'fraud_prediction' and 'model_used' added)
# together with their window information.
ib.show(detection, include_window_info=True)

 
Payment Transaction: {'step': 0, 'type': 'TRANSFER', 'amount': 51448381, 'nameOrig': 'M64871850410', 'oldbalanceOrg': 242691737, 'newbalanceOrig': 191243356, 'nameDest': 'M71407547892', 'oldbalanceDest': 319737137, 'newbalanceDest': 371185518} 
RF K-Fold Predictions: [0, 0, 0, 0, 0]  : RF Ensemble: 0 
SVM K-Fold Predictions: [0, 0, 0, 0, 0]  : SVM Ensemble: 0 
KNN K-Fold Predictions: [0, 0, 0, 0, 0]  : KNN Ensemble: 0 
All Model Predictions: [0, 0, 0]  : Final Ensemble: 0 
 
Payment Transaction: {'step': 0, 'type': 'DEBIT', 'amount': 922646899, 'nameOrig': 'M43981225250', 'oldbalanceOrg': 268465152, 'newbalanceOrig': 654181747, 'nameDest': 'C94854758464', 'oldbalanceDest': 139805544, 'newbalanceDest': 1062452443} 
RF K-Fold Predictions: [0, 0, 0, 0, 0]  : RF Ensemble: 0 
SVM K-Fold Predictions: [0, 0, 0, 0, 0]  : SVM Ensemble: 0 
KNN K-Fold Predictions: [0, 0, 0, 0, 0]  : KNN Ensemble: 0 
All Model Predictions: [0, 0, 0]  : Final Ensemble: 0 
 
Payment Transaction: {'step': 0, 'type'

<IPython.core.display.Javascript object>