In [1]:
import os
# Find the latest version of spark 3.x  from https://www.mongodb.com/try/download/database-tools and enter as the spark version
# For example:
# spark_version = 'spark-3.5.1'
spark_version = 'spark-3.5.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:2 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:7 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Get:8 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:9 http://security.ubuntu.com/ubuntu jammy-security/universe amd64 Packages [1,225 kB]
Hit:10 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:11 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:12 http://security.ubuntu.com/ubuntu jammy-security/main amd64 Packages [2,454 kB]
Get:13 https://developer.download.nvidia.com

In [2]:
 # Import packages
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType, DateType, IntegerType

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

In [3]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("kartik2112/fraud-detection")

print("Path to dataset files:", path)


# List all files in the downloaded directory
files = os.listdir(path)
print("Files in the dataset:", files)


Downloading from https://www.kaggle.com/api/v1/datasets/download/kartik2112/fraud-detection?dataset_version_number=1...


100%|██████████| 202M/202M [00:02<00:00, 83.5MB/s]

Extracting files...





Path to dataset files: /root/.cache/kagglehub/datasets/kartik2112/fraud-detection/versions/1
Files in the dataset: ['fraudTrain.csv', 'fraudTest.csv']


In [4]:
# File location and type
file_location = path + "/fraudTrain.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

df.show()

+---+---------------------+-------------------+--------------------+-------------+------+-----------+---------+------+--------------------+--------------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+------------------+--------+
|_c0|trans_date_trans_time|             cc_num|            merchant|     category|   amt|      first|     last|gender|              street|                city|state|  zip|    lat|              long|city_pop|                 job|       dob|           trans_num| unix_time|         merch_lat|        merch_long|is_fraud|
+---+---------------------+-------------------+--------------------+-------------+------+-----------+---------+------+--------------------+--------------------+-----+-----+-------+------------------+--------+--------------------+----------+--------------------+----------+------------------+------------------+--------+
|  0|  2019-01-01 00:00:18|   2703186189

In [5]:
# Create a view or table

temp_table_name = "fraudTrain"

df.createOrReplaceTempView(temp_table_name)

In [17]:
spark.sql("""
SELECT
    cc_num,
    unix_time,
    trans_date_trans_time,
    from_unixtime(unix_time + 220924800) unix_convert,
    is_fraud,
    CASE
        WHEN from_unixtime(unix_time + 220924800) = trans_date_trans_time THEN 'Match'
        ELSE 'Mismatch'
    END as comparison_result
FROM fraudTrain
order by cc_num,trans_date_trans_time, is_fraud
""").show()

+-----------+----------+---------------------+-------------------+--------+-----------------+
|     cc_num| unix_time|trans_date_trans_time|       unix_convert|is_fraud|comparison_result|
+-----------+----------+---------------------+-------------------+--------+-----------------+
|60416207185|1325422035|  2019-01-01 12:47:15|2019-01-01 12:47:15|       0|            Match|
|60416207185|1325493897|  2019-01-02 08:44:57|2019-01-02 08:44:57|       0|            Match|
|60416207185|1325494056|  2019-01-02 08:47:36|2019-01-02 08:47:36|       0|            Match|
|60416207185|1325507894|  2019-01-02 12:38:14|2019-01-02 12:38:14|       0|            Match|
|60416207185|1325509846|  2019-01-02 13:10:46|2019-01-02 13:10:46|       0|            Match|
|60416207185|1325598995|  2019-01-03 13:56:35|2019-01-03 13:56:35|       0|            Match|
|60416207185|1325610310|  2019-01-03 17:05:10|2019-01-03 17:05:10|       0|            Match|
|60416207185|1325685595|  2019-01-04 13:59:55|2019-01-04 13:

In [6]:
spark.sql("""select *
            from fraudTrain""").show(truncate=False)

+---+---------------------+-------------------+----------------------------------------+-------------+------+-----------+---------+------+------------------------------+------------------------+-----+-----+-------+------------------+--------+---------------------------------------------+----------+--------------------------------+----------+------------------+------------------+--------+
|_c0|trans_date_trans_time|cc_num             |merchant                                |category     |amt   |first      |last     |gender|street                        |city                    |state|zip  |lat    |long              |city_pop|job                                          |dob       |trans_num                       |unix_time |merch_lat         |merch_long        |is_fraud|
+---+---------------------+-------------------+----------------------------------------+-------------+------+-----------+---------+------+------------------------------+------------------------+-----+-----+-------+----

In [7]:
# Write DataFrame to Parquet with partitioning by a column (e.g., 'Class')
df.write.mode("overwrite").partitionBy("is_fraud").parquet("fraud_train")

In [8]:
# Read in our new parquet formatted data
p_df=spark.read.parquet('fraud_train')
p_df.createOrReplaceTempView('p_fraudTrain')

In [9]:
import time

start_time = time.time()
spark.sql("""select is_fraud,
                    round(avg(amt),2),
                    round(min(amt),2),
                    round(max(amt),2),
                    round(count(amt),2)
            from p_fraudTrain
            group by is_fraud""").show(truncate=False)
print("--- %s seconds ---" % (time.time() - start_time))

+--------+------------------+------------------+------------------+--------------------+
|is_fraud|round(avg(amt), 2)|round(min(amt), 2)|round(max(amt), 2)|round(count(amt), 2)|
+--------+------------------+------------------+------------------+--------------------+
|0       |67.67             |1.0               |28948.9           |1289169             |
|1       |531.32            |1.06              |1376.04           |7506                |
+--------+------------------+------------------+------------------+--------------------+

--- 4.030420780181885 seconds ---


In [10]:
 # Import our dependencies
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
import pandas as pd
import tensorflow as tf

In [19]:
query = "select cc_num, amt, zip, lat, long, city_pop, unix_time, merch_lat, merch_long, is_fraud from p_fraudTrain"
spark_df = spark.sql(query)

# Convert Spark DataFrame to Pandas DataFrame
pandas_df = spark_df.toPandas()

# Display the Pandas DataFrame
pandas_df.head()

Unnamed: 0,cc_num,amt,zip,lat,long,city_pop,unix_time,merch_lat,merch_long,is_fraud
0,2703186189652095,4.97,28654,36.0788,-81.1781,3495,1325376018,36.011293,-82.048315,0
1,630423337322,107.23,99160,48.8878,-118.2105,149,1325376044,49.159047,-118.186462,0
2,38859492057661,220.11,83252,42.1808,-112.262,4154,1325376051,43.150704,-112.154481,0
3,3534093764340240,45.0,59632,46.2306,-112.1138,1939,1325376076,47.034331,-112.561071,0
4,375534208663984,41.96,24433,38.4207,-79.4629,99,1325376186,38.674999,-78.632459,0


In [25]:
import folium
from geopy.distance import geodesic  # To calculate distance
import ipywidgets as widgets
from IPython.display import display


In [32]:
query = "select cc_num, lat, long, merchant, merch_lat, merch_long from p_fraudTrain where is_fraud == 1"
spark_df = spark.sql(query)

# Convert Spark DataFrame to Pandas DataFrame
fraudulent_transactions_df = spark_df.toPandas()

# Display the Pandas DataFrame
fraudulent_transactions_df.head()

Unnamed: 0,cc_num,lat,long,merchant,merch_lat,merch_long
0,4613314721966,35.9946,-81.7266,fraud_Rutherford-Mertz,36.430124,-81.179483
1,340187018810220,29.44,-98.459,"fraud_Jenkins, Hauck and Friesen",29.819364,-99.142791
2,340187018810220,29.44,-98.459,fraud_Goodwin-Nitzsche,29.273085,-98.83636
3,4613314721966,35.9946,-81.7266,fraud_Erdman-Kertzmann,35.909292,-82.09101
4,340187018810220,29.44,-98.459,fraud_Koepp-Parker,29.786426,-98.68341


In [33]:
def calculate_distance(row):
    transaction_coords = (row["lat"], row["long"])
    merchant_coords = (row["merch_lat"], row["merch_long"])
    return geodesic(transaction_coords, merchant_coords).km

# Add a distance column
fraudulent_transactions_df["distance_km"] = fraudulent_transactions_df.apply(calculate_distance, axis=1)

fraudulent_transactions_df.head()


Unnamed: 0,cc_num,lat,long,merchant,merch_lat,merch_long,distance_km
0,4613314721966,35.9946,-81.7266,fraud_Rutherford-Mertz,36.430124,-81.179483,68.962726
1,340187018810220,29.44,-98.459,"fraud_Jenkins, Hauck and Friesen",29.819364,-99.142791,78.443229
2,340187018810220,29.44,-98.459,fraud_Goodwin-Nitzsche,29.273085,-98.83636,41.048425
3,4613314721966,35.9946,-81.7266,fraud_Erdman-Kertzmann,35.909292,-82.09101,34.211963
4,340187018810220,29.44,-98.459,fraud_Koepp-Parker,29.786426,-98.68341,44.124854


In [35]:
# Unique credit card numbers for the dropdown
cc_nums = fraudulent_transactions_df["cc_num"].unique()
dropdown = widgets.Dropdown(
    options=cc_nums,
    description="Credit Card:",
    value=cc_nums[0],
)

# Function to create the map for a selected credit card number
def create_map(cc_num):
    filtered_df = fraudulent_transactions_df[fraudulent_transactions_df["cc_num"] == cc_num]

    # Initialize the map
    if not filtered_df.empty:
        map_center = [filtered_df.iloc[0]["lat"], filtered_df.iloc[0]["long"]]
        fraud_map = folium.Map(location=map_center, zoom_start=10)

        # Add markers and lines
        for _, row in filtered_df.iterrows():
            # Customer home location
            folium.Marker(
                location=[row["lat"], row["long"]],
                popup=f"Customer",
                icon=folium.Icon(color="blue"),
            ).add_to(fraud_map)

            # Merchant location
            folium.Marker(
                location=[row["merch_lat"], row["merch_long"]],
                popup=f"Merchant: {row['merchant']}<br>Distance: {row['distance_km']:.2f} km",
                icon=folium.Icon(color="green"),
            ).add_to(fraud_map)

            # Draw a line between customer home and merchant
            folium.PolyLine(
                locations=[(row["lat"], row["long"]), (row["merch_lat"], row["merch_long"])],
                color="red",
                weight=2,
            ).add_to(fraud_map)

        # Display the map
        return fraud_map
    else:
        return folium.Map(location=[0, 0], zoom_start=2)

# Function to update the map when the dropdown changes
def update_map(change):
    selected_cc_num = change["new"]
    map_display.clear_output()
    with map_display:
        fraud_map = create_map(selected_cc_num)
        fraud_map.save("fraud_map.html")
        display(fraud_map)

# Display the map with the initial credit card number
map_display = widgets.Output()
with map_display:
    display(create_map(cc_nums[0]))

# Update the map when a new credit card number is selected
dropdown.observe(update_map, names="value")

# Display the dropdown and map
display(dropdown, map_display)


Dropdown(description='Credit Card:', options=(4613314721966, 340187018810220, 4922710831011201, 34154619900653…

Output()

In [None]:
# Extract the 'is_fraud' column as a list
y = p_df.select("is_fraud").rdd.flatMap(lambda x: x).collect()

# Drop the "is_fraud" column and convert the features to a Pandas DataFrame or NumPy array
X = pandas_df.drop("is_fraud", axis=1)

# Use train_test_split from Scikit-learn
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42, stratify=y)

In [None]:
# Preprocess numerical data for neural network

# Create a StandardScaler instances
scaler = StandardScaler()

# Fit the StandardScaler
X_scaler = scaler.fit(X_train)

# Scale the data
X_train_scaled = X_scaler.transform(X_train)
X_test_scaled = X_scaler.transform(X_test)

In [None]:
import numpy as np

y_train = np.array(y_train)
y_test = np.array(y_test)

print(type(X_train_scaled))  # Should be <class 'numpy.ndarray'>
print(type(y_train))         # Should be <class 'numpy.ndarray'>


In [None]:
 # Define the deep learning model
n_features = X_train_scaled.shape[1]

nn_model = tf.keras.models.Sequential()
nn_model.add(tf.keras.layers.Input(shape=(n_features,)))
nn_model.add(tf.keras.layers.Dense(units=18, activation="relu"))
nn_model.add(tf.keras.layers.Dense(units=9, activation="relu"))
nn_model.add(tf.keras.layers.Dense(units=1, activation="sigmoid"))

# Compile the Sequential model together and customize metrics
nn_model.compile(loss="binary_crossentropy", optimizer="adam", metrics=["accuracy"])

# Train the model
fit_model = nn_model.fit(X_train_scaled, y_train, epochs=5)

In [None]:
# Evaluate the model using the test data
model_loss, model_accuracy = nn_model.evaluate(X_test_scaled,y_test,verbose=2)
print(f"Loss: {model_loss}, Accuracy: {model_accuracy}")