# Credit Card Transaction Analysis

### 1. Importing Libraries

The first step is to import the necessary libraries. This project uses PySpark, the Python API for Apache Spark, for data processing, together with pandas, Matplotlib, and seaborn for plotting.

In [1]:
# Import Libraries

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, udf, from_unixtime
from pyspark.sql.types import StringType, DoubleType, IntegerType, TimestampType
import pyspark.sql.functions as F
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Configure Matplotlib
%matplotlib inline

### 2. Create a new Spark session


#### Configuration Details

- **Application Name:** Sets the name of the Spark application to "CreditCardTransactionAnalysis". This name is used for identification in the Spark UI and logs.

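- **Driver Memory:** Allocates 4 GB of memory for the Spark driver process. In local mode the driver JVM also runs the tasks, so this is the memory setting that matters most here. It is applied when the driver JVM starts, so changing it later generally requires restarting the kernel.

- **Executor Memory:** Allocates 4 GB of memory for each executor process. This memory is used for executing tasks and storing data on a cluster; in `local[*]` mode there are no separate executor processes, so it has little practical effect.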

- **Python Worker Memory:** Allocates 1 GB of memory for each Python worker process. This is important for executing Python code efficiently.

- **Python Worker Timeout:** Sets the timeout for Python worker processes to 120 seconds. This helps manage unresponsive workers by restarting them if they exceed this limit.

- **Network Timeout:** Sets the network timeout to 120 seconds. This ensures that network operations between the driver and executors do not hang indefinitely.

- **Executor Heartbeat Interval:** Sets the heartbeat interval for executors to 60 seconds. This helps the driver monitor the health of executors.

- **Shuffle Partitions:** Sets the number of partitions for shuffling data during operations like joins and aggregations to 4. This helps optimize performance by controlling parallelism.

- **Master URL:** Specifies that Spark should run locally with as many worker threads as there are logical cores on the machine. This is useful for development and testing.

- **Get or Create Session:** Creates a new Spark session or retrieves an existing one. This ensures efficient resource management by reusing sessions.

In [2]:
# Stop any existing session so the configuration below can be re-applied without restarting the kernel
try:
    spark.stop()
except NameError:
    # No session has been created in this kernel yet
    pass

# Create new session with updated configuration
spark = SparkSession.builder \
    .appName("CreditCardTransactionAnalysis") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.python.worker.memory", "1g") \
    .config("spark.python.worker.timeout", "120") \
    .config("spark.network.timeout", "120") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .config("spark.sql.shuffle.partitions", "4") \
    .master("local[*]") \
    .getOrCreate()

# Set log level to ERROR to reduce verbosity
spark.sparkContext.setLogLevel("ERROR")

### 3. Load the data

In [None]:
# Read the JSON file
df = spark.read.json("cc_sample_transaction.json")

# Display schema and sample data
print("Schema:")
df.printSchema()
print("\nSample Data:")
df.show(5)

### 4. Data Cleaning

- Masking Credit Card Numbers. The first cleaning step protects sensitive information by masking credit card numbers. The function replaces everything except the last four digits with "XXXX" groups, for example `XXXX-XXXX-XXXX-1234`. Values that are missing or shorter than four characters are masked completely.

- Registering the UDF. UDFs enable custom transformations on DataFrame columns, making it possible to apply the masking logic to entire columns of credit card numbers efficiently.

- Converting Timestamp Columns. This function shifts a timestamp column to UTC+8 and stores the result in a new column named `<column>_converted`.

- Extracting Names. This function splits the comma-separated `person_name` value into `first` and `last` columns.

In [4]:
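# Mask credit card numbers
def mask_cc_num(cc_num):
    # cc_num may arrive as an integer, so convert it to a string before measuring its length
    cc_str = "" if cc_num is None else str(cc_num)
    if len(cc_str) < 4:
        return "XXXX-XXXX-XXXX-XXXX"
    return "XXXX-XXXX-XXXX-" + cc_str[-4:]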

# Register UDF
mask_udf = udf(mask_cc_num, StringType())

# Convert timestamp columns to UTC+8
def convert_timestamp(df, col_name):
    return df.withColumn(
        f"{col_name}_converted",
        (F.to_timestamp(F.col(col_name), 'yyyy-MM-dd HH:mm:ss') + F.expr('INTERVAL 8 HOURS')).cast(TimestampType())
    )

# Extract first and last names from 'person_name'
def extract_names(df):
    # Raw string so that \s reaches the regex engine instead of being read as a Python escape
    name_split = split(F.col('person_name'), r',\s*')
    df = df.withColumn('first', name_split.getItem(0)) \
           .withColumn('last', name_split.getItem(1))
    return df
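
To make the masking rule concrete, here is a minimal plain-Python sketch that runs without Spark. `mask_last_four` is a hypothetical stand-alone helper that follows the rule described above, and the sample card numbers are invented for illustration.

```python
def mask_last_four(cc_num):
    """Hypothetical stand-alone version of the masking rule (not part of the notebook)."""
    # Convert first: card numbers parsed from JSON may be integers, not strings
    cc_str = "" if cc_num is None else str(cc_num)
    if len(cc_str) < 4:
        # Too short to expose any digits safely
        return "XXXX-XXXX-XXXX-XXXX"
    return "XXXX-XXXX-XXXX-" + cc_str[-4:]


# Invented inputs covering the string, integer, short, and missing cases
samples = ["4111111111111111", 4111111111111234, "123", None]
masked = [mask_last_four(s) for s in samples]
for original, result in zip(samples, masked):
    print(f"{original!r} -> {result}")
```

Checking the integer and `None` cases matters because a column read from JSON is not guaranteed to be a string.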

### 5. Applying Data Transformations

- Start with flattening the nested JSON structure by selecting relevant fields and extracting values from the `personal_detail` JSON object.

- Extract the first and last names from the `personal_detail` JSON object.

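- Masking personally identifiable information (PII): credit card numbers are replaced with an `XXXX-XXXX-XXXX-` prefix followed by the last four digits.

- Dropping the original `person_name` and `cc_num` columns once their transformed versions exist.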

In [None]:
try:
    # First, let's check the structure of personal_detail
    print("Sample of personal_detail structure:")
    df.select('personal_detail').show(2, truncate=False)
    
    # Flatten the JSON data with proper parsing
    df_flat = df.select(
        'Unnamed: 0',
        'trans_date_trans_time',
        'cc_num',
        'merchant',
        'category',
        'amt',
        F.get_json_object('personal_detail', '$.person_name').alias('person_name'),
        F.get_json_object('personal_detail', '$.gender').alias('gender'),
        F.get_json_object('personal_detail', '$.street').alias('street'),
        F.get_json_object('personal_detail', '$.city').alias('city'),
        F.get_json_object('personal_detail', '$.state').alias('state'),
        F.get_json_object('personal_detail', '$.zip').alias('zip'),
        F.get_json_object('personal_detail', '$.lat').alias('lat'),
        F.get_json_object('personal_detail', '$.long').alias('long'),
        F.get_json_object('personal_detail', '$.city_pop').alias('city_pop'),
        F.get_json_object('personal_detail', '$.job').alias('job'),
        F.get_json_object('personal_detail', '$.dob').alias('dob'),
        'trans_num',
        'merch_lat',
        'merch_long',
        'is_fraud',
        'merch_zipcode',
        'merch_last_update_time',
        'merch_eff_time',
        'cc_bic'
    )

    # Extract first and last names from person_name
    df_flat = df_flat.withColumn(
        'first',
        F.when(
            F.col('person_name').contains(','),
            F.trim(F.split('person_name', ',').getItem(0))
        ).otherwise(F.lit('UNKNOWN'))
    ).withColumn(
        'last',
        F.when(
            F.col('person_name').contains(','),
            F.trim(F.split('person_name', ',').getItem(1))
        ).otherwise(F.lit('UNKNOWN'))
    )

    # Convert timestamp columns to UTC+8
    timestamp_cols = ['trans_date_trans_time', 'merch_last_update_time', 'merch_eff_time']
    for col_name in timestamp_cols:
        df_flat = df_flat.withColumn(
            f"{col_name}_converted",
            F.to_timestamp(F.col(col_name), 'yyyy-MM-dd HH:mm:ss')
        ).withColumn(
            f"{col_name}_converted",
            # "+08:00" is the documented zone-offset form for UTC+8
            F.from_utc_timestamp(F.col(f"{col_name}_converted"), "+08:00")
        )

    # Mask PII (credit card numbers)
    df_flat = df_flat.withColumn(
        'cc_num_masked',
        F.when(F.length('cc_num') >= 4,
              F.concat(F.lit('XXXX-XXXX-XXXX-'), F.substring('cc_num', -4, 4)))
        .otherwise(F.lit('XXXX-XXXX-XXXX-XXXX'))
    )

    # Cast numeric columns
    df_flat = df_flat \
        .withColumn('amt', F.col('amt').cast('double')) \
        .withColumn('lat', F.col('lat').cast('double')) \
        .withColumn('long', F.col('long').cast('double')) \
        .withColumn('city_pop', F.col('city_pop').cast('integer')) \
        .withColumn('merch_lat', F.col('merch_lat').cast('double')) \
        .withColumn('merch_long', F.col('merch_long').cast('double')) \
        .withColumn('is_fraud', F.col('is_fraud').cast('integer'))

    # Drop original columns that have been transformed
    df_flat = df_flat.drop('person_name', 'cc_num')

    # Rename masked credit card column
    df_flat = df_flat.withColumnRenamed('cc_num_masked', 'cc_num')

    # Show the resulting schema
    print("\nResulting Schema:")
    df_flat.printSchema()

    # Show sample of transformed data
    print("\nSample of transformed data:")
    df_flat.show(5, truncate=False)

except Exception as e:
    print(f"Error in transformation: {str(e)}")
    
    # Additional debugging information
    print("\nDebugging Information:")
    print("Original Schema:")
    df.printSchema()
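
The transformations in this cell can be hard to follow at DataFrame scale, so the sketch below applies the same three ideas (flatten, split the name, shift the time) to a single record using only the standard library. The record, its field values, and the assumption that the source timestamps are in UTC are all hypothetical and serve only as illustration.

```python
import json
from datetime import datetime, timedelta, timezone

# Hypothetical record shaped like one input row; all values are invented
record = {
    "trans_date_trans_time": "2019-01-01 00:00:18",
    "personal_detail": json.dumps({"person_name": "Jane, Doe", "gender": "F"}),
}

# 1. Flatten: parse the embedded JSON string and lift its keys to the top level
detail = json.loads(record["personal_detail"])
flat = {"trans_date_trans_time": record["trans_date_trans_time"], **detail}

# 2. Split the name on the comma, falling back to 'UNKNOWN' when there is none
if "," in flat["person_name"]:
    first, last = (part.strip() for part in flat["person_name"].split(",")[:2])
else:
    first = last = "UNKNOWN"

# 3. Treat the parsed time as UTC and express it at a fixed +08:00 offset
utc_time = datetime.strptime(flat["trans_date_trans_time"], "%Y-%m-%d %H:%M:%S")
utc_time = utc_time.replace(tzinfo=timezone.utc)
local_time = utc_time.astimezone(timezone(timedelta(hours=8)))

print(first, last, local_time.isoformat())
```

Step 3 shows the effect of the conversion: the wall-clock value moves forward by eight hours.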

### 6. Data Inspection and Cleaning

This cell focuses on inspecting and cleaning the dataset to ensure data quality before analysis. It identifies and handles non-numeric values, invalid entries, and casts columns to appropriate data types.

- Identifying Non-Numeric 'amt' Values. Identifies entries in the `amt` column that are non-numeric or null. Ensures data quality by identifying invalid entries before analysis.

- Displaying Sample Non-Numeric 'amt' Entries. Displays a sample of non-numeric entries for inspection. Helps understand the nature of invalid entries.

- Removing Rows with Non-Numeric 'amt' Values. Filters out rows with non-numeric or null values in the `amt` column. Ensures subsequent analyses are based on valid numeric data.

- Identifying Invalid 'is_fraud' Values. Identifies entries in the `is_fraud` column that are not '0' or '1' or are null. Ensures valid binary indicators for accurate fraud analysis.

- Displaying Sample Invalid 'is_fraud' Entries. Displays a sample of invalid entries for inspection. Helps understand the nature of invalid entries.

- Removing Rows with Invalid 'is_fraud' Values. Filters out rows with invalid values in the `is_fraud` column. Ensures subsequent analyses are based on valid binary indicators.

- Casting Columns to Appropriate Data Types. Casts various columns to their appropriate data types. Ensures accurate calculations and analyses.

- Dropping Duplicates and Handling Nulls. Removes duplicate rows and handles null values. Ensures data quality and avoids potential errors in subsequent analyses.

In [None]:
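# Identify and handle non-numeric 'amt' values
# Note: 'amt' was cast to double in the previous step, so values that failed the cast are typically already null
non_numeric_amt = df_flat.filter(~F.col('amt').rlike(r'^\d+\.?\d*$') | F.col('amt').isNull())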
non_numeric_amt_count = non_numeric_amt.count()
print(f"Number of non-numeric 'amt' entries: {non_numeric_amt_count}")

if non_numeric_amt_count > 0:
    print("Sample non-numeric 'amt' entries:")
    non_numeric_amt.select('amt').show(5)

# Remove rows with non-numeric 'amt' (raw string keeps the regex escapes intact)
df_flat = df_flat.filter(F.col('amt').rlike(r'^\d+\.?\d*$') & F.col('amt').isNotNull())

# Identify and handle invalid 'is_fraud' values
non_numeric_is_fraud = df_flat.filter(~F.col('is_fraud').isin('0', '1') | F.col('is_fraud').isNull())
non_numeric_is_fraud_count = non_numeric_is_fraud.count()
print(f"Number of invalid 'is_fraud' entries: {non_numeric_is_fraud_count}")

if non_numeric_is_fraud_count > 0:
    print("Sample invalid 'is_fraud' entries:")
    non_numeric_is_fraud.select('is_fraud').show(5)

# Remove rows with invalid 'is_fraud' values
df_flat = df_flat.filter(F.col('is_fraud').isin('0', '1') & F.col('is_fraud').isNotNull())

# Cast columns to appropriate data types
df_cleaned = df_flat \
    .withColumn('amt', F.col('amt').cast(DoubleType())) \
    .withColumn('is_fraud', F.col('is_fraud').cast(IntegerType())) \
    .withColumn('trans_date_trans_time_converted', F.col('trans_date_trans_time_converted').cast(TimestampType())) \
    .withColumn('merch_last_update_time_converted', F.col('merch_last_update_time_converted').cast(TimestampType())) \
    .withColumn('merch_eff_time_converted', F.col('merch_eff_time_converted').cast(TimestampType())) \
    .withColumn('lat', F.col('lat').cast(DoubleType())) \
    .withColumn('long', F.col('long').cast(DoubleType())) \
    .withColumn('city_pop', F.col('city_pop').cast(IntegerType())) \
    .withColumn('merch_lat', F.col('merch_lat').cast(DoubleType())) \
    .withColumn('merch_long', F.col('merch_long').cast(DoubleType()))

# Drop duplicates and handle nulls
df_cleaned = df_cleaned.dropDuplicates()
df_cleaned = df_cleaned.dropna(subset=['amt', 'is_fraud', 'trans_date_trans_time_converted'])
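
It helps to see exactly which values the `amt` pattern accepts before relying on it. The sketch below tests the same regular expression with Python's `re` module on a handful of invented strings. It is an illustration of the pattern only, not of Spark's behavior on a numeric column.

```python
import re

# Same pattern as the Spark filter: digits, an optional dot, then optional digits
AMT_PATTERN = re.compile(r'^\d+\.?\d*$')

# Invented candidate values, written as the strings the pattern would see
candidates = ["12.50", "7", "3.", "-4.99", "1.0E7", ".5", "abc", ""]
results = {value: bool(AMT_PATTERN.match(value)) for value in candidates}

for value, ok in results.items():
    print(f"{value!r:>8} -> {'kept' if ok else 'dropped'}")
```

Note that the pattern rejects negative amounts and scientific notation. Whether dropping such rows is desirable depends on the data, so it is worth inspecting the rejected sample before filtering.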

### 7. Basic Analysis
- Caching the DataFrame. Caching is useful when the DataFrame will be reused multiple times, as it avoids recomputation and speeds up access.

- Basic Dataframe information. Providing an overview of the DataFrame structure helps users understand the data they are working with.

- Total number of transactions. Knowing the total number of records is essential for understanding the dataset's size and for further analysis.

- Amount statistics. The count, minimum, maximum, mean, and standard deviation of `amt` summarize the distribution of transaction amounts and give a first view of spending patterns.

- Fraud Statistics. Understanding the distribution of fraud cases is essential for evaluating the effectiveness of fraud detection measures and for further analysis.

In [None]:
# Cache the DataFrame
df_cleaned = df_cleaned.cache()

# Basic DataFrame info
print("Basic DataFrame Information:")
print("Number of columns:", len(df_cleaned.columns))
print("Column names:", df_cleaned.columns)

# Total number of transactions
total_transactions = df_cleaned.count()
print(f"\nTotal number of transactions: {total_transactions}")

# Amount statistics
amount_stats = df_cleaned.agg(
    F.count('amt').alias('count'),
    F.round(F.min('amt'), 2).alias('min_amount'),
    F.round(F.max('amt'), 2).alias('max_amount'),
    F.round(F.avg('amt'), 2).alias('avg_amount'),
    F.round(F.stddev('amt'), 2).alias('stddev_amount')
)

print("\nAmount Statistics:")
amount_stats.show()

# Fraud statistics
fraud_stats = df_cleaned.groupBy('is_fraud').count()
print("\nFraud Statistics:")
fraud_stats.show()
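
For readers who want to check the aggregation by hand, the following sketch computes the same five statistics with the standard library on a few invented amounts. One detail worth knowing is that Spark's `F.stddev` returns the sample standard deviation, which corresponds to `statistics.stdev` rather than `statistics.pstdev`.

```python
import statistics

# Invented transaction amounts, for illustration only
amounts = [4.97, 29.84, 107.23, 220.11, 45.00]

summary = {
    "count": len(amounts),
    "min_amount": round(min(amounts), 2),
    "max_amount": round(max(amounts), 2),
    "avg_amount": round(statistics.mean(amounts), 2),
    # statistics.stdev divides by n - 1, like Spark's F.stddev (sample standard deviation)
    "stddev_amount": round(statistics.stdev(amounts), 2),
}
print(summary)
```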

### 8. Visualization: Transaction Amount Distribution

- Sampling Data for Plotting. Sampling is useful for large datasets to improve performance and reduce memory usage during visualization.

- Visualizing the distribution of transaction amounts using a histogram. The histogram helps us understand the distribution of transaction amounts and identify any outliers or unusual patterns.


In [None]:
# Take a sample for plotting
sample_fraction = 0.05  # Adjust the fraction as needed
pandas_df = df_cleaned.select('amt').sample(False, sample_fraction, seed=1).toPandas()

plt.figure(figsize=(10,6))
sns.histplot(data=pandas_df, x='amt', bins=50, kde=True)
plt.title('Distribution of Transaction Amounts')
plt.xlabel('Amount')
plt.ylabel('Frequency')
plt.show()
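
Sampling without replacement in Spark does not return exactly `fraction` of the rows, because each row is kept or dropped independently. The sketch below is a plain-Python analogy of that idea, not Spark's implementation. `bernoulli_sample` is a hypothetical helper and the row count is invented.

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction` (illustrative only)."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(10_000))  # stand-in for 10,000 transactions
sample = bernoulli_sample(rows, fraction=0.05, seed=1)

# The size is close to, but usually not exactly, 5% of the input
print(f"expected about {int(len(rows) * 0.05)}, got {len(sample)}")
```

Fixing the seed, as the notebook does with `seed=1`, makes the sample reproducible across runs.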

### 9. Visualization: Fraud Distribution

- Grouping by Fraud Indicator. Understanding the distribution of fraud cases is essential for evaluating the effectiveness of fraud detection measures.

- Creating a pie chart to visualize the distribution of fraud cases. A pie chart provides a clear visual representation of the proportion of fraudulent vs. legitimate transactions, making it easy to understand the overall fraud landscape.

In [None]:
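# Fraud distribution (ordered so the rows have a predictable sequence)
fraud_df = df_cleaned.groupBy('is_fraud').count().orderBy('is_fraud').toPandas()

plt.figure(figsize=(6,6))
# Derive labels from the is_fraud values so they cannot be mismatched with the counts
labels = fraud_df['is_fraud'].map({0: 'Legitimate', 1: 'Fraudulent'})
sizes = fraud_df['count']
plt.pie(sizes, labels=labels, autopct='%1.1f%%', colors=['#66b3ff','#ff6666'], startangle=140)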
plt.title('Distribution of Fraudulent vs Legitimate Transactions')
plt.axis('equal')
plt.show()
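
Because `groupBy` does not guarantee row order, it is safer to tie each pie label to its `is_fraud` value than to rely on list position. The sketch below shows the idea with invented counts. The dictionary names are hypothetical, and the percentages are what a pie chart would display for these numbers.

```python
# Invented counts keyed by the is_fraud flag; the order is deliberately reversed
counts = {1: 120, 0: 9880}
label_for = {0: "Legitimate", 1: "Fraudulent"}

total = sum(counts.values())
# Sort by flag and look up each label, so labels always match their counts
shares = {
    label_for[flag]: round(100 * n / total, 1)
    for flag, n in sorted(counts.items())
}
print(shares)
```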

### 10. Visualization: Geographic Distribution
- Sampling Data for Geographic Analysis. Takes a random sample of roughly 1% of the `merch_lat` (merchant latitude) and `merch_long` (merchant longitude) columns from the cleaned DataFrame. Sampling keeps the plot responsive and reduces memory usage when the data is collected to pandas.

- Creating Scatter Plot. Visualizing the geographic distribution helps identify patterns in transaction locations and can reveal areas with high transaction density.

In [None]:
# Sampling data for geographic analysis
geo_sample_fraction = 0.01  # Adjust the fraction as needed
geo_df = df_cleaned.select('merch_lat', 'merch_long').sample(False, geo_sample_fraction, seed=1).toPandas()

plt.figure(figsize=(10,6))
plt.scatter(geo_df['merch_long'], geo_df['merch_lat'], alpha=0.5, s=10)
plt.title('Geographic Distribution of Transactions')
plt.xlabel('Merchant Longitude')
plt.ylabel('Merchant Latitude')
plt.show()

### 11. Save Large Dataset

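- **Purpose:** Saves the cleaned dataset to a single CSV file in chunks of 50,000 rows, so the full DataFrame is never collected to the driver at once. If the export fails, a 10,000-row sample is saved instead.

- **Significance:** The exported file can be reused for later analysis without repeating the cleaning steps. The Spark session is stopped once the export finishes.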

In [None]:
try:
    print("Starting data export process...")
    
    # Get total count
    total_records = df_cleaned.count()
    print(f"Total records to process: {total_records}")
    
    # Calculate number of chunks needed (ceiling division avoids an empty trailing chunk)
    CHUNK_SIZE = 50000
    num_chunks = max(1, -(-total_records // CHUNK_SIZE))
    print(f"Will process data in {num_chunks} chunks")
    
    # Prepare output file
    output_path = "./cleaned_transactions_final.csv"
    
    # Process first chunk with headers
    print("\nProcessing chunk 1...")
    first_chunk = df_cleaned.limit(CHUNK_SIZE).toPandas()
    first_chunk.to_csv(output_path, index=False, mode='w')
    records_processed = len(first_chunk)
    print(f"Processed {records_processed:,} records")
    
    # Process remaining chunks
    for i in range(1, num_chunks):
        print(f"\nProcessing chunk {i+1} of {num_chunks}...")
        
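        # DataFrame.offset requires Spark 3.4+; without an explicit ordering, row order across chunks is not guaranteed
        next_chunk = df_cleaned.offset(i * CHUNK_SIZE).limit(CHUNK_SIZE).toPandas()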
        next_chunk.to_csv(output_path, index=False, mode='a', header=False)
        
        records_processed += len(next_chunk)
        print(f"Total processed: {records_processed:,} of {total_records:,} records")
        
        # Optional: Add progress percentage
        progress = (records_processed / total_records) * 100
        print(f"Progress: {progress:.1f}%")
    
    print("\nExport completed successfully!")
    print(f"Output file: {output_path}")
    print(f"Total records saved: {records_processed:,}")

except Exception as e:
    print(f"\nError during export: {str(e)}")
    
    try:
        print("\nAttempting to save a sample of the data...")
        sample_size = 10000
        print(f"Saving {sample_size:,} records as sample...")
        
        sample_df = df_cleaned.limit(sample_size).toPandas()
        sample_path = "./cleaned_transactions_sample.csv"
        sample_df.to_csv(sample_path, index=False)
        
        print(f"Sample saved successfully to: {sample_path}")
        print(f"Sample size: {len(sample_df):,} records")
        
    except Exception as sample_err:
        print(f"Error saving sample: {str(sample_err)}")

finally:
    # Stop Spark session
    print("\nStopping Spark session...")
    spark.stop()
    print("Spark session stopped")
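
The chunk arithmetic is easy to get wrong by one, so here is a small stand-alone sketch of it. `chunk_bounds` is a hypothetical helper and the record counts are invented. It returns the `(offset, limit)` pairs that a chunked export would need.

```python
def chunk_bounds(total_records, chunk_size):
    """Return (offset, limit) pairs that cover total_records rows exactly once."""
    # Ceiling division: a partial final chunk still counts as one chunk
    num_chunks = -(-total_records // chunk_size)
    return [
        (i * chunk_size, min(chunk_size, total_records - i * chunk_size))
        for i in range(num_chunks)
    ]


partial = chunk_bounds(120_000, 50_000)  # two full chunks plus one partial chunk
exact = chunk_bounds(100_000, 50_000)    # exact multiple: no empty trailing chunk
print(partial)
print(exact)
```

Computing the count as `total // size + 1` would add an empty third chunk in the second case, which costs an extra Spark job for no data.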