# v2.0 - Random Forest with DASK

In [37]:
import os
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx
from sklearn import preprocessing
import geopy
from geopy.distance import geodesic
import pickle
from sklearn.ensemble import RandomForestClassifier
import dask.dataframe as dd


In [38]:
import sys
sys.path.append('../references')  # Add the references folder to the system path


In [39]:
model_specs = 'Random Forest using DASK'

In [40]:
start_time_notebook = time.time()


In [41]:
# Directory to save the figures 

input_src_dir = '/Users/sadhvichandragiri/desktop/coding/ZHAW_Project/ML_BigData_Repo_1/data/raw'
output_dir_figures_train = '/Users/sadhvichandragiri/desktop/coding/ZHAW_Project/ML_BigData_Repo_1/reports/figures/train_figures'
output_dir_figures_test = '/Users/sadhvichandragiri/desktop/coding/ZHAW_Project/ML_BigData_Repo_1/reports/figures/test_figures'
#reports_output_dir = '/Users/sadhvichandragiri/desktop/coding/ZHAW_Project/ML_BigData_Repo_1/reports'

reports_output_dir_base = '/Users/sadhvichandragiri/desktop/coding/ZHAW_Project/ML_BigData_Repo_1/reports'
# reports_output_dir for DecisionTrees
reports_output_dir = f"{reports_output_dir_base}/RandomForest"
print(reports_output_dir)

/Users/sadhvichandragiri/desktop/coding/ZHAW_Project/ML_BigData_Repo_1/reports/RandomForest


In [131]:
# Define which dataset to use
use_test_data = False  # Set to True when using fraudtest.csv

# Determine dataset type based on the variable
dataset_type = 'Test' if use_test_data else 'Train'

# Load the appropriate dataset

if use_test_data:
    output_dir_figures = output_dir_figures_test
else:
    output_dir_figures = output_dir_figures_train

In [43]:
# Generate the preprocess file name dynamically
# Get the current timestamp
timestamp = time.strftime("%Y%m%d_%H%M%S")  # Format: YYYYMMDD_HHMMSS

logfile_title = 'LogFile'
logfile_name = f"{model_specs}_{dataset_type}_{logfile_title.replace(',', '').lower().split('.')[0]}_{timestamp}.txt"

logfile_path = os.path.join(reports_output_dir, logfile_name)

# Function to log times to a file
def log_time(step_name, start_time):
    end_time = time.time()
    elapsed_time = end_time - start_time
    log_message = (f"{step_name} completed at {time.ctime(end_time)}. "
                   f"Elapsed time: {elapsed_time // 60:.0f} minutes and {elapsed_time % 60:.2f} seconds\n")
    
    # Append log to file
    with open(logfile_path, 'a') as f:
        f.write(log_message)
    
    # Print the message to the console as well
    print(log_message)


In [44]:
log_time(f"{model_specs}_{dataset_type} Notebook started at... ", start_time_notebook)
start_time = time.time()

Random Forest using DASK_Train Notebook started at...  completed at Sun Nov  3 10:52:52 2024. Elapsed time: 0 minutes and 0.03 seconds



# Use Dask already from the beginning


In [132]:
from dask.distributed import Client
from sklearn.ensemble import RandomForestClassifier
import dask.dataframe as dd

log_time("------Starting the DASK Client-----", start_time_notebook)

# Start Dask client
#client = Client()

from dask.distributed import Client
client = Client(dashboard_address=':0')  # Dask will pick an available port
#to avoid warning

------Starting the DASK Client----- completed at Sun Nov  3 11:50:04 2024. Elapsed time: 57 minutes and 11.35 seconds





In [147]:
import dask.dataframe as dd

# Load the dataset directly into Dask
if use_test_data:
    df_pre = dd.read_csv(f"{input_src_dir}/fraudTest.csv", assume_missing=True, blocksize="16MB")
else:
    df_pre = dd.read_csv(f"{input_src_dir}/fraudTrain.csv", assume_missing=True, blocksize="16MB")


In [148]:

# Rename columns
df_pre = df_pre.rename(columns={
    'amt': 'TransactionAmount', 
    'cc_num': 'CreditCardNumber', 
    'dob': 'DateOfBirth', 
    'trans_date_trans_time': 'TransactionTime'
})

# Data Cleaning and Feature Engineering
df_pre['TransactionTime'] = dd.to_datetime(df_pre['TransactionTime'])
df_pre['DateOfBirth'] = dd.to_datetime(df_pre['DateOfBirth'], errors='coerce')

# Define metadata with the new 'TransactionID' column
meta = df_pre.head(0)  # Capture existing schema
meta['TransactionID'] = int()  # Add the new column to the metadata

# Define function to assign unique TransactionIDs within each partition
def assign_transaction_id(df, start_id=1):
    df = df.copy()
    df['TransactionID'] = range(start_id, start_id + len(df))
    return df

# Apply function with updated metadata
df_pre = df_pre.map_partitions(assign_transaction_id, meta=meta)

# Persist the DataFrame after transformations
df_pre = df_pre.persist()

# Optional: Display the head of the DataFrame to verify
print(df_pre.head())


   Unnamed: 0     TransactionTime  CreditCardNumber  \
0         0.0 2019-01-01 00:00:18      2.703186e+15   
1         1.0 2019-01-01 00:00:44      6.304233e+11   
2         2.0 2019-01-01 00:00:51      3.885949e+13   
3         3.0 2019-01-01 00:01:16      3.534094e+15   
4         4.0 2019-01-01 00:03:06      3.755342e+14   

                             merchant       category  TransactionAmount  \
0          fraud_Rippin, Kub and Mann       misc_net               4.97   
1     fraud_Heller, Gutmann and Zieme    grocery_pos             107.23   
2                fraud_Lind-Buckridge  entertainment             220.11   
3  fraud_Kutch, Hermiston and Farrell  gas_transport              45.00   
4                 fraud_Keeling-Crist       misc_pos              41.96   

       first     last gender                        street  ...      long  \
0   Jennifer    Banks      F                561 Perry Cove  ...  -81.1781   
1  Stephanie     Gill      F  43039 Riley Greens Suite 393  ... 

In [149]:


# Summary statistics without .compute()
print("Number of partitions:", df_pre.npartitions)
print("Columns:", df_pre.columns)
print("Head of Data:", df_pre.head())  # Displays head without full .compute()


Number of partitions: 21
Columns: Index(['Unnamed: 0', 'TransactionTime', 'CreditCardNumber', 'merchant',
       'category', 'TransactionAmount', 'first', 'last', 'gender', 'street',
       'city', 'state', 'zip', 'lat', 'long', 'city_pop', 'job', 'DateOfBirth',
       'trans_num', 'unix_time', 'merch_lat', 'merch_long', 'is_fraud',
       'TransactionID'],
      dtype='object')
Head of Data:    Unnamed: 0     TransactionTime  CreditCardNumber  \
0         0.0 2019-01-01 00:00:18      2.703186e+15   
1         1.0 2019-01-01 00:00:44      6.304233e+11   
2         2.0 2019-01-01 00:00:51      3.885949e+13   
3         3.0 2019-01-01 00:01:16      3.534094e+15   
4         4.0 2019-01-01 00:03:06      3.755342e+14   

                             merchant       category  TransactionAmount  \
0          fraud_Rippin, Kub and Mann       misc_net               4.97   
1     fraud_Heller, Gutmann and Zieme    grocery_pos             107.23   
2                fraud_Lind-Buckridge  entertain

In [150]:

# Specific aggregations or unique counts, with selective .compute()
fraud_counts = df_pre['is_fraud'].value_counts().compute()
fraud_percentage = df_pre['is_fraud'].value_counts(normalize=True).compute() * 100
unique_credit_cards = df_pre['CreditCardNumber'].nunique().compute()

print("Fraud counts:", fraud_counts)
print("Fraud percentage:", fraud_percentage)
print("Unique Credit Cards:", unique_credit_cards)

Fraud counts: is_fraud
1.0       7506
0.0    1289169
Name: count, dtype: int64
Fraud percentage: is_fraud
1.0     0.578865
0.0    99.421135
Name: proportion, dtype: float64
Unique Credit Cards: 983


In [151]:
# Check for missing values in Dask DataFrame
missing_values = df_pre.isnull().sum().compute()
print("Missing values per column:\n", missing_values)


Missing values per column:
 Unnamed: 0           0
TransactionTime      0
CreditCardNumber     0
merchant             0
category             0
TransactionAmount    0
first                0
last                 0
gender               0
street               0
city                 0
state                0
zip                  0
lat                  0
long                 0
city_pop             0
job                  0
DateOfBirth          0
trans_num            0
unix_time            0
merch_lat            0
merch_long           0
is_fraud             0
TransactionID        0
dtype: int64


In [54]:
print(df.columns)

Index(['Unnamed: 0', 'TransactionTime', 'CreditCardNumber', 'merchant',
       'category', 'TransactionAmount', 'first', 'last', 'gender', 'street',
       'city', 'state', 'zip', 'lat', 'long', 'city_pop', 'job', 'DateOfBirth',
       'trans_num', 'unix_time', 'merch_lat', 'merch_long', 'is_fraud',
       'TransactionID'],
      dtype='object')


In [152]:
# Set 'TransactionTime' as the index permanently
df_pre = df_pre.set_index('TransactionTime')

# Verify the index
print(df_pre.index)


<dask_expr.expr.Index: expr=Index(frame=SetIndex(frame=FromGraph(2897191), _other='TransactionTime', options={}))>


In [154]:
# Get the minimum and maximum transaction times, computing the results
min_time = df_pre.index.min().compute()
max_time = df_pre.index.max().compute()

print(f"Minimum Transaction Time: {min_time}")
print(f"Maximum Transaction Time: {max_time}")


Minimum Transaction Time: 2019-01-01 00:00:18
Maximum Transaction Time: 2020-06-21 12:13:37


In [155]:
df_pre.head()

Unnamed: 0_level_0,Unnamed: 0,CreditCardNumber,merchant,category,TransactionAmount,first,last,gender,street,city,...,long,city_pop,job,DateOfBirth,trans_num,unix_time,merch_lat,merch_long,is_fraud,TransactionID
TransactionTime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2019-01-01 00:00:18,0.0,2703186000000000.0,"fraud_Rippin, Kub and Mann",misc_net,4.97,Jennifer,Banks,F,561 Perry Cove,Moravian Falls,...,-81.1781,3495.0,"Psychologist, counselling",1988-03-09,0b242abb623afc578575680df30655b9,1325376000.0,36.011293,-82.048315,0.0,1
2019-01-01 00:00:44,1.0,630423300000.0,"fraud_Heller, Gutmann and Zieme",grocery_pos,107.23,Stephanie,Gill,F,43039 Riley Greens Suite 393,Orient,...,-118.2105,149.0,Special educational needs teacher,1978-06-21,1f76529f8574734946361c461b024d99,1325376000.0,49.159047,-118.186462,0.0,2
2019-01-01 00:00:51,2.0,38859490000000.0,fraud_Lind-Buckridge,entertainment,220.11,Edward,Sanchez,M,594 White Dale Suite 530,Malad City,...,-112.262,4154.0,Nature conservation officer,1962-01-19,a1a22d70485983eac12b5b88dad1cf95,1325376000.0,43.150704,-112.154481,0.0,3
2019-01-01 00:01:16,3.0,3534094000000000.0,"fraud_Kutch, Hermiston and Farrell",gas_transport,45.0,Jeremy,White,M,9443 Cynthia Court Apt. 038,Boulder,...,-112.1138,1939.0,Patent attorney,1967-01-12,6b849c168bdad6f867558c3793159a81,1325376000.0,47.034331,-112.561071,0.0,4
2019-01-01 00:03:06,4.0,375534200000000.0,fraud_Keeling-Crist,misc_pos,41.96,Tyler,Garcia,M,408 Bradley Rest,Doe Hill,...,-79.4629,99.0,Dance movement psychotherapist,1986-03-28,a41d7549acf90789359a9aa5346dcb46,1325376000.0,38.674999,-78.632459,0.0,5


In [156]:
#finally assigning to df as all follwoing code is in df
df = df_pre

##JUST for iNFO not needed
#df = df_pre.compute()  # Converts Dask DataFrame to Pandas DataFrame


In [59]:
log_time("Initial Steps Completed File Loading, Describe, Date Conversions etc..  ", start_time)


Initial Steps Completed File Loading, Describe, Date Conversions etc..   completed at Sun Nov  3 10:53:15 2024. Elapsed time: 0 minutes and 22.84 seconds



# Feature Engineering

In [157]:
# Log pre-process time at various steps
start_time = time.time()


In [158]:
log_time("START - Feature Engineering .....  ", start_time)
start_time = time.time()

START - Feature Engineering .....   completed at Sun Nov  3 12:19:39 2024. Elapsed time: 0 minutes and 0.01 seconds



In [159]:

# Clip outliers if necessary
df['TransactionAmount'] = df['TransactionAmount'].clip(upper=df['TransactionAmount'].quantile(0.99))



In [162]:

import numpy as np

# Replace infinite values with NaN in the 'TransactionAmount' column
df['TransactionAmount'] = df['TransactionAmount'].replace([np.inf, -np.inf], np.nan)


In [163]:
#To persist the change and ensure no computations are delayed, you can persist the DataFrame:
df = df.persist()

# next type of VIZ via transaction id vs transaction count


In [165]:
# Extract hour from TransactionTime
df['Hour'] = df.index.hour  # Since TransactionTime is already set as the index


In [169]:
import dask.dataframe as dd

# Reset the index to avoid any indexing issues (optional based on your data structure)
df = df.reset_index(drop=True)

# Drop NaN values in 'Hour' or 'is_fraud' columns if they exist
df = df.dropna(subset=['Hour', 'is_fraud'])

# Step 1: Calculate fraud rate by hour and retain it as a Dask DataFrame
fraud_rate_by_hour = df.groupby('Hour').is_fraud.mean().persist()

# Step 2: Define a threshold for high-risk hours based on the mean fraud rate
threshold = fraud_rate_by_hour.mean().compute()

# Step 3: Identify high-risk hours based on the threshold
high_risk_hours = fraud_rate_by_hour[fraud_rate_by_hour > threshold].index.compute().tolist()

# Print high-risk hours for reference
print("High-Risk Hours:", high_risk_hours)

# Step 4: Create the HighRiskHour flag in the main Dask DataFrame
df['HighRiskHour'] = df['Hour'].map(lambda x: 1 if x in high_risk_hours else 0, meta=('HighRiskHour', 'int64'))


AssertionError: 

1. Time-Based Analysis:
Already explored daily and hourly trends in transaction volumes, but now dive deeper into fraud patterns based on time.



In [None]:
#Weekday vs. Weekend: Is fraud more common on weekdays or weekends?
df['DayOfWeek'] = df.index.dayofweek  # 0 = Monday, 6 = Sunday
fraud_by_day = df[df['is_fraud'] == 1]['DayOfWeek'].value_counts().sort_index()
non_fraud_by_day = df[df['is_fraud'] == 0]['DayOfWeek'].value_counts().sort_index()



In [None]:

# Define the correct day order
day_order = ['Sunday', 'Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday']


df['DayName'] = df.index.day_name()
# Convert the 'DayName' column to a categorical type with the correct order
df['DayName'] = pd.Categorical(df['DayName'], categories=day_order, ordered=True)

fraud_by_day = df[df['is_fraud'] == 1]['DayName'].value_counts().sort_index()
non_fraud_by_day = df[df['is_fraud'] == 0]['DayName'].value_counts().sort_index()



In [None]:
df['IsWeekend'] = df['DayOfWeek'].apply(lambda x: 1 if x >= 5 else 0)
weekend_fraud = df[df['is_fraud'] == 1]['IsWeekend'].mean()
weekend_non_fraud = df[df['is_fraud'] == 0]['IsWeekend'].mean()

print(f"Percentage of fraud on weekends: {weekend_fraud * 100:.2f}%")
print(f"Percentage of non-fraud on weekends: {weekend_non_fraud * 100:.2f}%")


In [None]:
log_time("Part1 - TrxAmount, Hour, DayOfWeeek etc..", start_time)
start_time = time.time()


In [None]:
import os
print(os.listdir())  # List all files in the current directory


# MULTIPROCESSING : distance

In [None]:
import pandas as pd
from geopy.distance import geodesic
import multiprocessing as mp
import numpy as np
import time
import sys
from distance_calculation import calculate_distance_chunk

start_time = time.time()


# Add the current working directory to the system path
sys.path.append(os.getcwd())

# Multiprocessing function to split the dataframe and apply the distance calculation
def parallel_distance_calculation(df, num_partitions=None):
    if num_partitions is None:
        num_partitions = mp.cpu_count()  # Use all available CPU cores
    
    # Split the dataframe into chunks
    df_split = np.array_split(df, num_partitions)
    
    # Create a multiprocessing Pool
    with mp.Pool(num_partitions) as pool:
        # Apply the calculate_distance_chunk function to each chunk in parallel
        result = pool.map(calculate_distance_chunk, df_split)
    
    # Concatenate the results back into a single dataframe
    return pd.concat(result)

# Main block to ensure multiprocessing works correctly
if __name__ == "__main__":
    start_time = time.time()

    # Assuming df has the columns ['lat', 'long', 'merch_lat', 'merch_long']
    
    # Run with limited number of cores (e.g., 4 cores)
    df = parallel_distance_calculation(df, num_partitions=4)  # Use 4 cores instead of all available cores

    # Log the time taken for distance calculation with multiprocessing
    log_time("Part2 - Distance Calculation with Multiprocessing (4 cores)", start_time)

    # Check the first few rows to verify the result
    print(df[['lat', 'long', 'merch_lat', 'merch_long', 'distance']].head())


In [None]:
import os
print(os.getcwd())  # This will print the current working directory


log_time("Part2 -  Distance Calculation", start_time)
start_time = time.time()


In [None]:
# Check unique values in the 'is_fraud' column
df['is_fraud'].unique()


In [None]:
# Fraud vs Non-Fraud by Merchant Category
fraud_by_category = df[df['is_fraud'] == 1]['category'].value_counts().head(10)
non_fraud_by_category = df[df['is_fraud'] == 0]['category'].value_counts().head(10)



In [None]:
# Top 5 categories with the highest fraud counts
top_fraud_merchant_categories = df[df['is_fraud'] == 1]['category'].value_counts().head(5).index.tolist()

# Print top fraudulent categories
print("Top Fraudulent Merchant Categories:", top_fraud_merchant_categories)

# Create HighRiskMerchantCategory flag
df['HighRiskMerchantCategory'] = df['category'].apply(lambda x: 1 if x in top_fraud_merchant_categories else 0)



In [None]:
# Print the count of 1s and 0s in HighRiskMerchantCategory
print(df['HighRiskMerchantCategory'].value_counts())


# Potential Additional Features:
Transaction Frequency:
    Feature: How often a credit card has been used within a specific time frame (e.g., last hour or day).
    Why: Fraudsters often make rapid successive transactions within short periods. You could create a rolling window to calculate transaction frequency.
    How: You could calculate the number of transactions within the past X hours/days using a rolling window on the TransactionTime feature.

#age group

In [None]:
import pandas as pd

# Ensure 'DateOfBirth' is in datetime format
df['DateOfBirth'] = pd.to_datetime(df['DateOfBirth'], errors='coerce')  # Handle errors during conversion

# Step 1: Calculate Age
# Calculate age in years
df['Age'] = (pd.Timestamp.now() - df['DateOfBirth']).dt.days // 365  # Age in years

# Step 2: Create Age Groups
# Define age bins and labels
bins = [0, 18, 25, 35, 45, 55, 65, 100]  # Define your age bins, ensuring to cover all possible ages
labels = ['0-18', '19-25', '26-35', '36-45', '46-55', '56-65', '66+']  # Corresponding labels

# Create age group feature, include NaN values handling
df['AgeGroup'] = pd.cut(df['Age'], bins=bins, labels=labels, right=False, include_lowest=True)

# Verify the new features without truncating DataFrame
#print(df[['DateOfBirth', 'Age', 'AgeGroup']].head(10))  # Display the first 10 entries


In [None]:
log_time("Part3 - Merchant Categories & Age group", start_time)
start_time = time.time()


# MULTIPROCESSING : count_transactions_within_last_hour

In [161]:
import pandas as pd
import multiprocessing as mp
import numpy as np
import time
from transaction_frequency import process_chunk  # Import from the .py file

# Multiprocessing function to parallelize the transaction counting
def parallel_count_transactions(df, num_partitions=None):
    if num_partitions is None:
        num_partitions = mp.cpu_count()  # Use all available CPU cores
    
    # Ensure the index is a datetime
    df.index = pd.to_datetime(df.index)
    
    # Split the dataframe into chunks based on the number of partitions (CPU cores)
    df_split = np.array_split(df, num_partitions)
    
    # Create a multiprocessing Pool
    with mp.Pool(num_partitions) as pool:
        # Apply the processing function to each chunk in parallel
        result = pool.map(process_chunk, df_split)
    
    # Combine the results from each chunk into a single series, reset index for consistency
    return pd.concat(result).reset_index(drop=True)

# Assuming df has 'CreditCardNumber' as a column and transaction times are indexed
if __name__ == "__main__":
    start_time = time.time()

    # Apply the parallel processing for transaction frequency counting
    df['TransactionFrequency'] = parallel_count_transactions(df, num_partitions=4)  # Adjust num_partitions as needed

    # Log the time taken for transaction frequency calculation with multiprocessing
    log_time("Part4 - TransactionFrequency Multiprocessing", start_time)

    # Check the first 10 rows
    print(df[['TransactionFrequency']].head(10))


AttributeError: 'DatetimeIndex' object has no attribute 'expr'

In [None]:
df.index = pd.to_datetime(df.index)
print(df.index)

In [None]:
# Resample the data to count transactions every hour
transaction_counts_hourly = df.resample('H').size()
transaction_counts_daily = df.resample('D').size()

# Combine with CreditCardNumber if necessary
transaction_counts = df.groupby('CreditCardNumber').resample('H').size().reset_index(name='TransactionCount')
print(transaction_counts.head(10))

In [None]:
total_transactions = df.groupby('CreditCardNumber').size().reset_index(name='TotalTransactionCount')
print(total_transactions.head(10))


In [None]:
# Calculate the time difference between consecutive transactions
time_diff = df.index.to_series().diff().dt.total_seconds()
# Flag rapid transactions (within 5 minutes)
df['RapidTransactionFlag'] = time_diff < 60  # For a 1-minute threshold

# Create a temporary DataFrame for rapid transactions
rapid_transactions = df[df['RapidTransactionFlag']]

# Group by date and count the number of rapid transactions
rapid_transaction_counts = rapid_transactions.groupby(rapid_transactions.index.date).size()
print(rapid_transaction_counts)

# Get a summary of the rapid transactions
rapid_transactions_summary = rapid_transactions.describe()
print(rapid_transactions_summary)


In [None]:
print(df.columns)


In [None]:
log_time("Part5 - RapidTransactionFlag", start_time)
start_time = time.time()


Transaction Amount Features:
Log Transaction Amount: Normalize the TransactionAmount by taking its logarithm to reduce skewness.
Transaction Amount Flags: Create binary flags for high-value transactions (e.g., if TransactionAmount exceeds a certain threshold).

In [None]:

# Sample DataFrame creation
# Assume 'df' is your DataFrame and has a 'TransactionAmount' column
# df = pd.read_csv('your_data.csv')  # Load your actual data

# Step 1: Log Transaction Amount
# Calculate the log of TransactionAmount
df['LogTransactionAmount'] = np.log1p(df['TransactionAmount'])  # Use log1p for stability with 0 values

# Step 2: Create Transaction Amount Flags
# Define a threshold for high-value transactions
threshold = 100  # Adjust the threshold based on your data context

# Create a flag for high-value transactions
df['HighValueTransactionFlag'] = df['TransactionAmount'] > threshold

# Verify the new features
print(df[['TransactionAmount', 'LogTransactionAmount', 'HighValueTransactionFlag']].head(10))  # Display the first 10 entries


Behavioral Features:
Count of Transactions in Last X Days: Count how many transactions have occurred in the last 7, 14, or 30 days.
Average Transaction Amount in Last X Days: Calculate the average transaction amount over the same periods.

In [None]:
import pandas as pd

# Assuming 'TransactionTime' is already set as the index and in datetime format

# Step 1: Count of Transactions in Last X Days
for days in [7, 14, 30]:
    # Sort data by CreditCardNumber and TransactionTime to ensure rolling works properly
    df = df.sort_values(by=['CreditCardNumber', 'TransactionTime'])
    
    # Apply rolling and count the number of transactions for each card
    df[f'TransactionCountLast{days}Days'] = (
        df.groupby('CreditCardNumber')['CreditCardNumber']
        .rolling(f'{days}D')
        .count()
        .reset_index(level=0, drop=True)
    )

# Step 2: Average Transaction Amount in Last X Days
for days in [7, 14, 30]:
    # Sort data by CreditCardNumber and TransactionTime to ensure rolling works properly
    df = df.sort_values(by=['CreditCardNumber', 'TransactionTime'])
    
    # Calculate the average transaction amount for each credit card in the last X days
    df[f'AverageTransactionAmountLast{days}Days'] = (
        df.groupby('CreditCardNumber')['TransactionAmount']
        .rolling(f'{days}D')
        .mean()
        .reset_index(level=0, drop=True)
    )

# Verify the new features
print(df[['TransactionCountLast7Days', 'TransactionCountLast14Days', 'TransactionCountLast30Days',
           'AverageTransactionAmountLast7Days', 'AverageTransactionAmountLast14Days', 'AverageTransactionAmountLast30Days']].head(10))


In [None]:
print(df.columns)  # Display all columns in the DataFrame


In [None]:
log_time("Part6 - TransactionCountLast_X_Days & AverageTrxAmountLast_X_Days", start_time)
start_time = time.time()


# Graph Construction with NetworkX:

Highlight Fraudulent Nodes: Overlay of fraudulent and non-fraudulent credit cards on this degree distribution to see if there’s a difference in their degrees.

In [None]:


# Create an empty graph
G = nx.Graph()

# Add edges between credit cards and merchants, including transaction amount as an edge attribute
for idx, row in df.iterrows():
    credit_card = str(row['CreditCardNumber'])
    merchant = str(row['merchant'])
    transaction_amount = row['TransactionAmount']  # Ensure TransactionAmount exists in your dataframe
    
    # Add an edge with the transaction amount as an attribute
    G.add_edge(credit_card, merchant, transaction_amount=transaction_amount)


# Calculate degrees for all nodes in the graph
degrees = dict(G.degree())

# Filter degrees for credit cards and merchants
credit_card_nodes = df['CreditCardNumber'].astype(str).unique()
merchant_nodes = df['merchant'].astype(str).unique()

credit_card_degrees = {node: degrees[node] for node in credit_card_nodes if node in degrees}
merchant_degrees = {node: degrees[node] for node in merchant_nodes if node in degrees}

# Debugging: Print counts to ensure correctness
print(f"Number of unique credit card nodes: {len(credit_card_nodes)}")
print(f"Number of unique merchant nodes: {len(merchant_nodes)}")
print(f"Number of credit card nodes with degrees: {len(credit_card_degrees)}")
print(f"Number of merchant nodes with degrees: {len(merchant_degrees)}")

# Create a new DataFrame for easier plotting
degree_df = pd.DataFrame({
    'CreditCardDegree': pd.Series(credit_card_degrees),
    'MerchantDegree': pd.Series(merchant_degrees)
})




In [None]:
# Add degree information back to the original DataFrame
df['degree'] = df['CreditCardNumber'].astype(str).map(credit_card_degrees)



In [None]:
# Check edges and their attributes
#for edge in G.edges(data=True):
#    print(edge)

#do NOT print this, huge list


In [None]:
df['CreditCardNumber'] = df['CreditCardNumber'].astype(str)


In [None]:
fraud_mapping = df.set_index('CreditCardNumber')['is_fraud'].to_dict()


In [None]:
log_time("Part7 - NetworkX Start Step", start_time)
start_time = time.time()


In [None]:
#print(fraud_mapping.head(5))
#only testing purposes

# MULTIPROCESSING : betweenness_centrality

In [None]:
import networkx as nx
import time
from networkx_graph_betweeness_centrality import parallel_betweenness_centrality

# Assuming G is your graph
if __name__ == "__main__":
    start_time = time.time()

    # Calculate betweenness centrality using parallel processing
    betweenness_centrality = parallel_betweenness_centrality(G, num_partitions=4)  # Adjust number of cores if needed

    # Log the time taken for betweenness centrality calculation with multiprocessing
    log_time("Part8 - Betweenness Centrality Calculation with Multiprocessing", start_time)

    # Check a few centrality values
    print(list(betweenness_centrality.items())[:10])


In [None]:
df['betweenness_centrality'] = df['CreditCardNumber'].map(betweenness_centrality)


In [None]:
print(df['betweenness_centrality'].describe())
print(df['betweenness_centrality'].isna().sum())  # Check for missing values


In [None]:
# Check betweenness centrality for specific credit card numbers
sample_nodes = ['60416207185', 'fraud_Kutch-Ferry']  # Replace with actual nodes
for node in sample_nodes:
    print(f"{node}: {betweenness_centrality.get(node)}")


1. Investigate Nodes with High Betweenness Centrality:

Now that you’ve visualized nodes with high betweenness centrality, you can:

    Examine if fraudulent nodes tend to have high betweenness centrality. This might indicate that these nodes are acting as "connectors" between different parts of the network, which could be a sign of suspicious behavior.
    Compare centrality between fraud and non-fraud nodes to see if there's a pattern.



2. Visualize Communities in the Network:

You could apply community detection to uncover fraud rings or clusters of merchants targeted by fraudsters. The Louvain algorithm is great for this.

In [None]:
import community.community_louvain as community_louvain


# Apply Louvain method for community detection
partition = community_louvain.best_partition(G)



Fraud Node Highlighting:

    Fraudulent nodes (from df['is_fraud'] == 1) are colored red to make them stand out. The rest of the nodes are still colored based on their communities.
    This should help you easily spot any fraudulent nodes in the network.

Top 10 Most Central Nodes:

    We calculate betweenness centrality and extract the top 10 most central nodes.
    These nodes are visualized with their connections, which should help declutter the graph and focus on the key players in the transaction network.

In [None]:

# Apply Louvain method for community detection
partition = community_louvain.best_partition(G)

# Create positions for nodes using a spring layout
pos = nx.spring_layout(G)

# Add the community information to the DataFrame
df['community'] = df['CreditCardNumber'].map(partition)

# Highlight fraud nodes separately
fraud_nodes = df[df['is_fraud'] == 1]['CreditCardNumber'].values




In [None]:
# Add the community information to the DataFrame
df['community'] = df['CreditCardNumber'].map(partition)

# Calculate the percentage of fraud in each community
community_fraud = df.groupby('community')['is_fraud'].mean()



In [None]:

# Print fraud rate per community
print(community_fraud)


In [None]:
community_size = df.groupby('community').size()
print(community_size)


In [None]:
# Combine fraud rates and community sizes into a single DataFrame
fraud_vs_size = pd.concat([community_fraud, df.groupby('community').size()], axis=1)
fraud_vs_size.columns = ['FraudRate', 'CommunitySize']



In [None]:
top_fraud_communities = community_fraud.sort_values(ascending=False).head(5)
print(top_fraud_communities)


In [None]:
# Get the community labels of the top fraud communities
top_community_labels = top_fraud_communities.index.tolist()

# Filter the DataFrame for only the top fraud communities
top_communities_df = df[df['community'].isin(top_community_labels)]


In [None]:
# Show Only the Top Merchants by Fraud Rate:
# Instead of displaying all merchants, you can filter the plot to show only the top 10 or 20 merchants with the highest fraud rates.

# Calculate fraud rate by merchant in the top fraud communities
merchant_fraud_rate = top_communities_df.groupby('merchant')['is_fraud'].mean()

# Sort merchants by fraud rate in descending order
top_merchants = merchant_fraud_rate.sort_values(ascending=False).head(10)

# Print top 10 merchants with highest fraud rate
print(top_merchants)


In [None]:

# Assuming 'category' is a column representing merchant categories
merchantcategory_fraud = top_communities_df.groupby('category')['is_fraud'].mean()

# Sort the fraud rate by merchant category in descending order
merchantcategory_fraud_sorted = merchantcategory_fraud.sort_values(ascending=False)


In [None]:

log_time("Part9 - Community & Top Merchants", start_time)
start_time = time.time()


In [None]:
# Check the density of the graph (a measure of sparsity)
density = nx.density(G)
print(f"Graph Density: {density}")


In [None]:
# Calculate and print the average degree
degree_sequence = [degree for node, degree in G.degree()]
average_degree = sum(degree_sequence) / len(degree_sequence)
print(f"Average Degree of Nodes: {average_degree}")


In [None]:
log_time("Part10 - Density", start_time)
start_time = time.time()


The results you’ve provided show:

    Graph Density: 0.2513
        This is a moderate density value. A density of 0 would indicate a completely disconnected graph, while a value close to 1 would indicate a very tightly connected graph (like a clique). A density of 0.25 means about 25% of the possible connections between nodes are present, which suggests the graph isn’t overly sparse, but it’s not densely connected either.

    Average Degree: 406.18
        This is relatively high, meaning that, on average, each node (credit card or merchant) is connected to about 406 other nodes. This high degree could indicate that nodes, especially credit cards, are interacting with many merchants. However, these connections are likely not forming closed loops or triangles, which is why the clustering coefficient is zero for all nodes.

What This Means:

    Even though the average degree is high, suggesting that credit cards are interacting with many merchants, the interactions are likely not forming triangles (where connected nodes are also connected to each other). This results in zero clustering coefficients across the board.

    The moderate graph density indicates that the network is connected to some extent, but not densely enough to produce high clustering coefficients.

Why This Happens:

In transaction networks, it’s common for credit cards to interact with different merchants, but merchants don’t typically transact with each other, which means closed triangles (required for a non-zero clustering coefficient) are rare. In fraud detection, this is normal, as fraudsters typically transact with many distinct merchants rather than creating highly connected communities.
Next Steps:

Given the moderate density and high degree of the nodes, the clustering coefficient might not be the most insightful metric. Instead, you could focus on the following:
1. Focus on Betweenness Centrality and Degree:

These metrics are more likely to highlight key nodes (e.g., credit cards or merchants) that are crucial in the transaction network. You’ve already calculated betweenness centrality, and the average degree indicates that some credit cards or merchants might have a significant number of connections.
2. Look for Key Nodes (High Degree or Centrality):

You could identify nodes with high degree or betweenness centrality to see if they’re involved in fraudulent transactions.

In [None]:
print(df.columns)

Given that you only identified 4 nodes with the lowest betweenness centrality, and they are all disconnected, it seems that focusing on these low-centrality nodes isn’t providing much value for your analysis. This could indicate that these nodes (credit card numbers or merchants) are peripheral and not involved in significant patterns of interaction, and therefore, might not contribute meaningful insights for detecting fraud.
Should You Continue with Low Betweenness Centrality Nodes?

    Disconnected Nodes: Since the nodes with the lowest betweenness centrality are disconnected and few in number, they don't seem to play a crucial role in the transaction network.
    Low Utility: If these nodes don't show fraud or aren't involved in key transactions, they might not be useful for your model or analysis.

What to Do Next:

    Abandon the Focus on Low Betweenness Centrality:
        Since these nodes are disconnected and don't seem to offer useful insights, it might be better to abandon the focus on low betweenness centrality.
        Instead, focus on nodes with more centrality (betweenness, degree, etc.) or explore other graph features.

    Explore Other Graph Metrics:

        You can shift your focus to more meaningful metrics such as pagerank or eigenvector centrality, which may reveal more about the influence or importance of nodes in the network.

        Here's how you can calculate pagerank and analyze it:

In [None]:
selected_features = [
    'TransactionAmount', 'LogTransactionAmount', 'HighValueTransactionFlag',
    'TransactionCountLast7Days', 'TransactionCountLast14Days', 'TransactionCountLast30Days',
    'AverageTransactionAmountLast7Days', 'AverageTransactionAmountLast14Days', 'AverageTransactionAmountLast30Days',
    'Hour', 'HighRiskHour', 'DayOfWeek', 'IsWeekend', 'TransactionFrequency', 'RapidTransactionFlag',
    'lat', 'long', 'merch_lat', 'merch_long', 'distance', 'city_pop',
    'Age', 'AgeGroup', 'gender', 'state', 'city',
    'degree', 'betweenness_centrality', 'community'
]

df_selected_features = df[selected_features]


# Page rank as new feature

In [None]:
# Calculate PageRank for each node in the graph
pagerank = nx.pagerank(G)

# Map the PageRank values to the 'CreditCardNumber' in the DataFrame
df['pagerank'] = df['CreditCardNumber'].map(pagerank)


In [None]:
# Check for NaN values in the pagerank column
print(df['pagerank'].isna().sum())


In [None]:
# Check descriptive statistics of pagerank values
print(df['pagerank'].describe())



In [None]:
# Check how many nodes have a PageRank of zero
zero_pagerank_count = (df['pagerank'] == 0).sum()
print(f"Number of nodes with zero PageRank: {zero_pagerank_count}")


In [None]:
# Compare the average PageRank for fraud and non-fraud transactions
fraud_avg_pagerank = df[df['is_fraud'] == 1]['pagerank'].mean()
non_fraud_avg_pagerank = df[df['is_fraud'] == 0]['pagerank'].mean()

print(f"Average PageRank for Fraud: {fraud_avg_pagerank}")
print(f"Average PageRank for Non-Fraud: {non_fraud_avg_pagerank}")


In [None]:
selected_features.append('pagerank')
df_selected_features = df[selected_features]


In [None]:
print(df_selected_features.columns)

In [None]:
print(df.shape)

In [None]:
log_time("Part11 - PageRank", start_time)
start_time = time.time()


In [None]:
# Decision Trees start

In [None]:
log_time("END - Feature Engineering .....  ", start_time)
start_time = time.time()

In [None]:
log_time("START - Random Forest with DASK .....  ", start_time)
start_time = time.time()

Close the dASK Client

In [None]:
from dask.distributed import Client
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, classification_report
import dask.dataframe as dd
import pandas as pd
import os

# Start Dask client
client = Client()

# Define directory paths and other configurations
reports_output_dir = "/path/to/reports"  # Define the directory to save reports
dataset_type = "train" if not use_test_data else "test"  # Set based on evaluation type

# Load and preprocess function for both train and test datasets
def load_and_preprocess(data_source):
    X = df[selected_features]
    y = df['is_fraud']
    
    # Convert to Dask, categorize, one-hot encode, then back to Pandas
    X = dd.from_pandas(X, npartitions=5).categorize()
    X = dd.get_dummies(X, drop_first=True).compute()  # Convert Dask DataFrame to Pandas
    y = dd.from_pandas(y, npartitions=5).compute()  # Convert Dask Series to Pandas
    return X, y

# Model training and evaluation
if use_test_data:
    # Load and preprocess test data
    X_test, y_test = load_and_preprocess(f"{input_src_dir}/fraudTest.csv")
    
    # Evaluate on test data
    y_test_pred = model.predict(X_test)
    test_accuracy = accuracy_score(y_test, y_test_pred)
    print("Accuracy on test data:", test_accuracy)
    
    # Generate classification report for test data
    clf_report = classification_report(y_test, y_test_pred)
    print("Classification Report: " , clf_report)
    
else:
    # Load and preprocess train data
    X_train, y_train = load_and_preprocess(df)

    # Train Random Forest model on train data
    model = RandomForestClassifier(n_estimators=100, max_depth=20, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate on train data
    y_train_pred = model.predict(X_train)
    train_accuracy = accuracy_score(y_train, y_train_pred)
    print("Accuracy on train data:", train_accuracy)
    
    # Generate classification report for train data
    clf_report = classification_report(y_train, y_train_pred)
    print("Classification Report: ", clf_report)
    accuracy = train_accuracy if not use_test_data else test_accuracy



In [None]:
from sklearn.metrics import roc_auc_score, precision_recall_curve, auc

# Calculate AUC-ROC
roc_auc = roc_auc_score(y_train, y_train_pred)
print("ROC AUC Score:", roc_auc)

# Calculate Precision-Recall curve
precision, recall, thresholds = precision_recall_curve(y_train, y_train_pred)

# Calculate the area under the Precision-Recall curve
pr_auc = auc(recall, precision)
print("Precision-Recall AUC:", pr_auc)

# Display key metrics
print("Precision at various thresholds:", precision)
print("Recall at various thresholds:", recall)

# You can plot these metrics for a clearer understanding
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 6))
plt.plot(recall, precision, marker='.', label='Precision-Recall Curve')
plt.xlabel('Recall')
plt.ylabel('Precision')
# Define the title
title = 'Random Forest with DASK - ROC Curve for Test Set'
plt.title(title)
plt.legend()
plt.show()


# Use the title in the filename
filename = f"{dataset_type}_{title.replace(' ', '_').replace(',', '').lower()}.png"
plt.savefig(os.path.join(reports_output_dir, filename), dpi=300, bbox_inches='tight')


plt.show()



In [None]:

# Prepare the full report content
full_report = f"Accuracy: {accuracy:.4f}\n\n{clf_report}"
print(full_report)

# Define the report title and output filename
classification_report_title = 'Random Forest with DASK Model Classification Report'
classification_report_outputfilename = f"{model_specs}_{dataset_type}_{classification_report_title.replace(' ', '_').replace(',', '').lower()}.txt"

# Save the classification report with accuracy to a text file
with open(os.path.join(reports_output_dir, classification_report_outputfilename), 'w') as f:
    f.write(full_report)  # Writing the combined report string to the file

print(f"Classification report saved to: {os.path.join(reports_output_dir, classification_report_outputfilename)}")


In [None]:
log_time("END - Random Forest with DASK Model .....  ", start_time)
start_time = time.time()

In [None]:

# Close Dask client when done
client.close()
log_time("Closed the DASK Client", start_time)


In [None]:
import pickle
import os

# Specify the output directory
output_dir_model = '/Users/sadhvichandragiri/desktop/coding/ZHAW_Project/ML_BigData_Repo_1/models'
if not os.path.exists(output_dir_model):
    os.makedirs(output_dir_model)  # Ensure the directory exists

# Create a dynamic filename with .pkl extension
model_title = 'model'
outputfilename_model = f"{model_specs}_{model_title.replace(' ', '_').replace(',', '').lower()}.pkl"

# Save the best model obtained from GridSearchCV
with open(os.path.join(output_dir_model, outputfilename_model), 'wb') as model_file:
    pickle.dump(model, model_file)

print(f"Model saved to {os.path.join(output_dir_model, outputfilename_model)}")


In [None]:
import os
import time

# Assuming start_time is defined earlier in the notebook
end_time_notebook = time.time()
elapsed_time = end_time_notebook - start_time_notebook

# Print and format the notebook end time and total execution time
print(f"Notebook ended at: {time.ctime(end_time_notebook)}")
print(f"Total execution time: {elapsed_time // 60:.0f} minutes and {elapsed_time % 60:.2f} seconds")


log_time(f"{model_specs}_{dataset_type} Notebook Ended at... ", start_time_notebook)
