In [74]:
import glob
import pyarrow as pa
import pyarrow.parquet as pq
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
import os
import shutil
import seaborn as sns
import matplotlib
import numpy as np
import matplotlib.pyplot as plt
import random
from functools import reduce
import string
%matplotlib inline  

pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', -1)

# Dataset Completion

In [94]:
generate = True
number_of_people = 636262
number_of_transactions = 6362620
street_points_percentage = 0.15
square_points_percentage = 0.2
training_path = "/Users/joaoneves/Documents/demo-iot-transactions/data/transactions_data/transactions_with_gps.csv"
test_path = "/Users/joaoneves/Documents/demo-iot-transactions/data/transactions"

In [95]:
london_coordinates = (51.5073509, -0.12775829999998223)

street_points_def = [
    [(51.507306, -0.140064),(51.509734, -0.132921), "Jermyn Street"],
    [(51.513534, -0.157751),(51.516433, -0.130584), "Oxford Street"], 
    [(51.509980, -0.137049), (51.516591, -0.142516), "Regent Street"], 
    [(51.513695, -0.139419), (51.512300, -0.137901), "Carnaby Street"]
]

square_points_def = [
    [(51.507311, -0.221633), 0.0001, "Westfield London"],
    [london_coordinates, 0.1, "Other"]
]

In [96]:
def generate_street_points(p1, p2, name, n=1000):
    m = np.subtract(p2, p1)
    m = m[1] / m[0]
    b = p2[1] - m*p2[0]
    points = []
    for i in range(n):
        x = random.uniform(p1[0], p2[0])
        points.append([x, x * m + b, name])
    return points

radius_dists = (random.uniform, random.uniform)

def generate_square_points(center, name, radius=0.1, n=1000):
    result = []
    for i in range(n):
        point = np.add(center, np.array([dist(-radius,radius) for dist in radius_dists]))
        result.append(list(point) + [name])
    return result

def convert_to_color(string):
    hash_ = str(hash(string))
    if len(hash_) >= 9:
        r,g,b= int(hash_[-9:-6]) % 255, int(hash_[-6:-3]) % 255, int(hash_[-3:]) % 255
        return "rgb({}, {}, {})".format(r,b,g)
    else:
        return "rgb({}, {}, {})".format(random.randint(0,255), random.randint(0,255), random.randint(0,255))
    
def generate_random_string(l):
    return ''.join(random.choice(string.ascii_letters + string.digits) for i in range(l))

def generate_people(n,p):
    return pd.Series([str(random.randrange(p)) for i in range(n)])

def convert_to_category(df, columns):
    for column in columns:
        df[column] = df[column].astype("category")
    return df

In [97]:
random.seed(0)
square_points_list = [generate_square_points(p[0], p[2], p[1],int(number_of_transactions*square_points_percentage)) \
                      for p in square_points_def]
square_points = reduce(lambda a,b: list(a)+list(b), square_points_list)
len(square_points)

2545048

In [98]:
random.seed(0)
street_points = [generate_street_points(p[0], p[1], p[2], int(number_of_transactions*street_points_percentage)) \
                 for p in street_points_def]
street_points = reduce(lambda a,b: list(a)+list(b), street_points)
len(street_points)

3817572

In [99]:
random.seed(0)
gps_points = street_points + square_points
random.shuffle(gps_points)
gps_points = np.array(gps_points)
len(gps_points) == number_of_transactions

True

In [102]:
def complete_df(df, seed=0):
    np.random.seed(seed)
    df["gps_latitude"] = pd.Series(gps_points[:,0])
    df["gps_longitude"] = pd.Series(gps_points[:,1])
    df["location"] = pd.Series(gps_points[:,2])
    df["id"] = transactions_df.index
    df["entity_id"] = generate_people(len(df), number_of_people)
    convert_to_category(df, ["id", "isFraud", "isFlaggedFraud"])
    return df

In [103]:
if os.path.isfile(training_path):
    # load dataset
    transactions_df = pd.read_csv(training_path)
    transactions_df["id"] = transactions_df.index
    convert_to_category(transactions_df, ["id", "entity_id", "isFraud", "isFlaggedFraud"])
else:
    # generate dataset and save
    data_dir = "/Users/joaoneves/Documents/demo-iot-transactions/data/transactions_data/PS_20174392719_1491204439457_log.csv"
    transactions_df = pd.read_csv(data_dir)
    complete_df(transactions_df)
    transactions_df.to_csv(training_path, index=False)

In [104]:
transactions_df.head()

Unnamed: 0,step,type,amount,nameOrig,oldbalanceOrg,newbalanceOrig,nameDest,oldbalanceDest,newbalanceDest,isFraud,isFlaggedFraud,gps_latitude,gps_longitude,location,id,entity_id
0,1,PAYMENT,9839.64,C1231006815,170136.0,160296.36,M1979787155,0.0,0.0,0,0,51.5132,-0.138924,Carnaby Street,0,242771
1,1,PAYMENT,1864.28,C1666544295,21249.0,19384.72,M2044282225,0.0,0.0,0,0,51.5074,-0.221652,Westfield London,1,543211
2,1,TRANSFER,181.0,C1305486145,181.0,0.0,C553264065,0.0,0.0,1,0,51.5078,-0.138742,Jermyn Street,2,605382
3,1,CASH_OUT,181.0,C840083671,181.0,0.0,C38997010,21182.0,0.0,1,0,51.5123,-0.137944,Carnaby Street,3,574098
4,1,PAYMENT,11668.14,C2048537720,41554.0,29885.86,M1230701703,0.0,0.0,0,0,51.5136,-0.139334,Carnaby Street,4,454080


# Streaming Split

In [105]:
transactions_df["entity_id"].unique()

array(['242771', '543211', '605382', ..., '408170', '511895', '471177'], dtype=object)

In [106]:
test_files_path = os.path.join(test_path, "transactions*.csv")
test_files = glob.glob(test_files_path)

for file in test_files:
    os.remove(file)

for i, entity in enumerate(transactions_df["entity_id"].unique()[:100]):
    transactions_df[transactions_df["entity_id"]==entity]\
    .to_csv(os.path.join(test_path, "transactions_{}.csv".format(i)), index=False)

# Model Evaluation

In [102]:
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.ml.feature import VectorAssembler 

spark = SparkSession.builder.getOrCreate()

sc = spark.sparkContext

training_dir = "/Users/joaoneves/Documents/demo-iot-transactions/data/models"
model_path = os.path.join(training_dir, "pythonLogisticRegression")

In [103]:
data = spark.read.csv(training_path, header=True)

# data.printSchema()
columns = data.columns
float_columns = ['step', 'amount', 'oldbalanceOrg', 'newbalanceOrig', 'oldbalanceDest', 'newbalanceDest', 'gps_latitude', 'gps_longitude']
target_column = 'isFraud'
print(columns)
data.take(4)

['step', 'type', 'amount', 'nameOrig', 'oldbalanceOrg', 'newbalanceOrig', 'nameDest', 'oldbalanceDest', 'newbalanceDest', 'isFraud', 'isFlaggedFraud', 'gps_latitude', 'gps_longitude', 'id']


[Row(step='1', type='PAYMENT', amount='9839.64', nameOrig='C1231006815', oldbalanceOrg='170136.0', newbalanceOrig='160296.36', nameDest='M1979787155', oldbalanceDest='0.0', newbalanceDest='0.0', isFraud='0', isFlaggedFraud='0', gps_latitude='51.50256769719365', gps_longitude='-0.12312710200156707', id='0'),
 Row(step='1', type='PAYMENT', amount='1864.28', nameOrig='C1666544295', oldbalanceOrg='21249.0', newbalanceOrig='19384.72', nameDest='M2044282225', oldbalanceDest='0.0', newbalanceDest='0.0', isFraud='0', isFlaggedFraud='0', gps_latitude='51.457455017283436', gps_longitude='-0.10674969663788776', id='1'),
 Row(step='1', type='TRANSFER', amount='181.0', nameOrig='C1305486145', oldbalanceOrg='181.0', newbalanceOrig='0.0', nameDest='C553264065', oldbalanceDest='0.0', newbalanceDest='0.0', isFraud='1', isFlaggedFraud='0', gps_latitude='51.467931861733966', gps_longitude='-0.11230149709138582', id='2'),
 Row(step='1', type='CASH_OUT', amount='181.0', nameOrig='C840083671', oldbalanceOrg

In [104]:
def parse_row(row):
    new_row = {}
    for column in columns:
        if column in float_columns:
            new_row[column] = float(row[column])
        elif column == target_column:
            new_row["label"] = int(round(float(row[column])))
        else:
            new_row[column] = row[column]
    return Row(**new_row)

data_parsed = data.rdd.map(parse_row).toDF()

data_parsed.head()

Row(amount=9839.64, gps_latitude=51.50256769719365, gps_longitude=-0.12312710200156707, id='0', isFlaggedFraud='0', label=0, nameDest='M1979787155', nameOrig='C1231006815', newbalanceDest=0.0, newbalanceOrig=160296.36, oldbalanceDest=0.0, oldbalanceOrg=170136.0, step=1.0, type='PAYMENT')

In [105]:
vecAssembler = VectorAssembler(inputCols=['step','amount', \
                                          'oldbalanceOrg', 'newbalanceOrig', \
                                          'oldbalanceDest', 'newbalanceDest', \
                                          'gps_latitude', 'gps_longitude'], \
                               outputCol="features")

transformed_data = vecAssembler.transform(data_parsed)

transformed_data.head()

Row(amount=9839.64, gps_latitude=51.50256769719365, gps_longitude=-0.12312710200156707, id='0', isFlaggedFraud='0', label=0, nameDest='M1979787155', nameOrig='C1231006815', newbalanceDest=0.0, newbalanceOrig=160296.36, oldbalanceDest=0.0, oldbalanceOrg=170136.0, step=1.0, type='PAYMENT', features=DenseVector([1.0, 9839.64, 170136.0, 160296.36, 0.0, 0.0, 51.5026, -0.1231]))

In [106]:
lr = LogisticRegression(maxIter=1000, threshold=0.01, tol=1e-5)

model = lr.fit(transformed_data)

In [107]:
model.evaluate(transformed_data).pr.show()
print(model.evaluate(transformed_data).areaUnderROC)

+------------------+--------------------+
|            recall|           precision|
+------------------+--------------------+
|               0.0|                 1.0|
| 0.774869109947644| 0.09813415574402468|
| 0.816997443078047|0.052232938667164865|
| 0.843784244490442| 0.03607946854370145|
|0.8671618166321685|0.027854150941182453|
|0.8818945574089857|0.022683854156879693|
|0.8892000487032753|0.019072117832939425|
|0.8929745525386582|0.016424501934929052|
|0.8988189455740898|0.014470509133749298|
|0.9025934494094728|0.012920171362639737|
|0.9075855351272373|0.011694988915334494|
|0.9109947643979057|0.010673628348882993|
|0.9137952027273835|0.009967050961445243|
|0.9178132229392426| 0.00923102545450983|
| 0.920370144892244|0.008587752653637155|
|0.9237793741629124| 0.00803860905681168|
|0.9262145379276756|0.007550874350949393|
|0.9290149762571533|0.007123877032359021|
|0.9325459637160599|0.006750045608416273|
|0.9347376111043467| 0.00640672568498638|
+------------------+--------------

In [108]:
model.evaluate(transformed_data).recallByThreshold.show()

+--------------------+------------------+
|           threshold|            recall|
+--------------------+------------------+
|                 1.0| 0.774869109947644|
|0.006561491282644194| 0.816997443078047|
|0.003949357813573477| 0.843784244490442|
|0.002835492888867661|0.8671618166321685|
|0.002107414662537...|0.8818945574089857|
|0.001733748409879...|0.8892000487032753|
|0.001544655807152...|0.8929745525386582|
|0.001444811147117...|0.8988189455740898|
|0.001352713724098...|0.9025934494094728|
|0.001277444178936144|0.9075855351272373|
|0.001201056567940...|0.9109947643979057|
|0.001135738789277...|0.9117253135273348|
|0.001115631623257...|0.9163521246803847|
|0.001053026804150195|0.9192743211981006|
|9.946928734003158E-4|0.9218312431511019|
| 9.42524646181952E-4|0.9245099232923414|
|8.910491138542175E-4|0.9277973943747717|
|8.474195713039216E-4|0.9312066236454402|
|8.056726289204571E-4|0.9331547546572507|
|7.664274069040154E-4|0.9359551929867284|
+--------------------+------------

In [109]:
# Save and load model
model.write().overwrite().save(model_path)

In [110]:
sameModel = LogisticRegressionModel.load(model_path)

In [111]:
pred = sameModel.transform(transformed_data)

In [113]:
anomalies = pred.rdd.filter(lambda row: row["prediction"]==1.0)

In [116]:
len(tests[2])

64

In [165]:
transactions_ids = anomalies.map(lambda row: (get_file_with_id(int(row["id"])), row["id"])).groupByKey().mapValues(lambda l: list(l)).cache()

In [167]:
transactions_ids.take(10)

[(11112, ['146', '165']),
 (12224, ['1362']),
 (22224, ['1898', '1904']),
 (31112, ['2417']),
 (51112, ['3569']),
 (4224, ['8706']),
 (5224, ['9284']),
 (7224, ['10395', '10396']),
 (11336, ['12787']),
 (12336, ['13327', '13333', '13336'])]

# Use Cases Study

In [24]:
transactions_df.describe()

Unnamed: 0,step,amount,oldbalanceOrg,newbalanceOrig,oldbalanceDest,newbalanceDest,gps_latitude,gps_longitude
count,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0,6362620.0
mean,243.3972,179861.9,833883.1,855113.7,1100702.0,1224996.0,51.50732,-0.1277554
std,142.332,603858.2,2888243.0,2924049.0,3399180.0,3674129.0,0.05772837,0.0577709
min,1.0,0.0,0.0,0.0,0.0,0.0,51.40735,-0.2277581
25%,156.0,13389.57,0.0,0.0,0.0,0.0,51.45732,-0.1777979
50%,239.0,74871.94,14208.0,0.0,132705.7,214661.4,51.50732,-0.1277567
75%,335.0,208721.5,107315.2,144258.4,943036.7,1111909.0,51.55729,-0.07772319
max,743.0,92445520.0,59585040.0,49585040.0,356015900.0,356179300.0,51.60735,-0.02775831


In [35]:
6362620 / 28921

220.0

In [27]:
transactions_orig = transactions_df["nameOrig"].value_counts()
transactions_dest = transactions_df["nameDest"].value_counts()

In [10]:
transactions_grouped_by_person = transactions_df.groupby("nameOrig").agg(sum)
len(transactions_grouped_by_person)

6353307

In [16]:
transactions_grouped_by_destination = transactions_df.groupby("nameDest").agg(sum)

In [71]:
p = np.transpose(np.array([('Jermyn Street', 11446.18),('Carnaby Street', 9839.64)])).tolist()

In [73]:
p[1]

['11446.18', '9839.64']