# Experiments

### imports

In [26]:
import duckdb 
import pandas as pd
import pyarrow.parquet as pq
from datetime import datetime
import os
import sys
from pathlib import Path

import os
import psycopg2
import pandas as pd
os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib"
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.dates as mdates

import psycopg2


# Test Visualization

In [None]:
def get_postgres_conn():
    """
    Open a psycopg2 connection to the Postgres service.

    Each setting is read from an environment variable and falls back to the
    local default when that variable is unset:

        POSTGRES_HOST      (default "localhost")
        POSTGRES_PORT      (default "5434")
        POSTGRES_USER      (default "airflow")
        POSTGRES_PASSWORD  (default "airflow")
        POSTGRES_DB        (default "airflow")

    Returns:
        The connection object returned by ``psycopg2.connect``. This function
        does not close it; the caller must.

    Raises:
        Whatever ``psycopg2.connect`` raises when the connection cannot be made.
    """
    host = os.environ.get("POSTGRES_HOST", "localhost")
    port = os.environ.get("POSTGRES_PORT", "5434")
    user = os.environ.get("POSTGRES_USER", "airflow")
    password = os.environ.get("POSTGRES_PASSWORD", "airflow")
    db = os.environ.get("POSTGRES_DB", "airflow")
    # Only host and port are echoed; the password is deliberately not printed.
    print(f"Connecting to {host}:{port}...")
    return psycopg2.connect(
        host=host,
        port=port,
        user=user,
        password=password,
        dbname=db,
    )


Connect to Postgres and load the fraud alert data

In [20]:
# Load every row of the fraud_alerts table into a DataFrame.
conn = get_postgres_conn()
try:
    df = pd.read_sql_query("SELECT * FROM fraud_alerts;", conn)
finally:
    # Release the connection even when the query raises, so a failed run of
    # this cell does not leave an open session behind.
    conn.close()
df.head()

Connecting to localhost:5434...


  df = pd.read_sql_query("SELECT * FROM fraud_alerts;", conn)


Unnamed: 0,id,transaction_id,customer_id,transaction_timestamp,risk_score,flags,inserted_at
0,1,2419,1499,2022-08-03 09:17:27.714768,6,"GEO_MISMATCH,PAST_FRAUD,WEEKEND_LOGIN,NIGHT_LOGIN",2025-06-16 19:15:59.174734
1,2,4078,1830,2022-09-12 16:00:27.045707,5,"GEO_MISMATCH,PAST_FRAUD,NIGHT_LOGIN",2025-06-16 19:15:59.176181
2,3,1364,1288,2022-07-08 22:24:04.648546,5,"GEO_MISMATCH,PAST_FRAUD,NIGHT_LOGIN",2025-06-16 19:15:59.176402
3,4,4831,1987,2022-02-19 12:14:11.087538,6,"FAILED_LOGIN,GEO_MISMATCH,HIGH_RISK_MERCHANT,W...",2025-06-16 19:15:59.176553
4,5,4720,1967,2022-07-16 14:05:12.853759,6,"HIGH_RISK_MERCHANT,PAST_FRAUD,WEEKEND_LOGIN,NI...",2025-06-16 19:15:59.176682


Process df

In [None]:
# Parse both timestamp columns into pandas datetime values.
for ts_column in ('transaction_timestamp', 'inserted_at'):
    df[ts_column] = pd.to_datetime(df[ts_column])

# Break the comma-separated 'flags' string into one list of flag names per row;
# the list form is what the per-flag analysis later explodes.
df['flags_list'] = df['flags'].str.split(',')

df.head()

Unnamed: 0,id,transaction_id,customer_id,transaction_timestamp,risk_score,flags,inserted_at,flags_list
0,1,2419,1499,2022-08-03 09:17:27.714768,6,"GEO_MISMATCH,PAST_FRAUD,WEEKEND_LOGIN,NIGHT_LOGIN",2025-06-16 19:15:59.174734,"[GEO_MISMATCH, PAST_FRAUD, WEEKEND_LOGIN, NIGH..."
1,2,4078,1830,2022-09-12 16:00:27.045707,5,"GEO_MISMATCH,PAST_FRAUD,NIGHT_LOGIN",2025-06-16 19:15:59.176181,"[GEO_MISMATCH, PAST_FRAUD, NIGHT_LOGIN]"
2,3,1364,1288,2022-07-08 22:24:04.648546,5,"GEO_MISMATCH,PAST_FRAUD,NIGHT_LOGIN",2025-06-16 19:15:59.176402,"[GEO_MISMATCH, PAST_FRAUD, NIGHT_LOGIN]"
3,4,4831,1987,2022-02-19 12:14:11.087538,6,"FAILED_LOGIN,GEO_MISMATCH,HIGH_RISK_MERCHANT,W...",2025-06-16 19:15:59.176553,"[FAILED_LOGIN, GEO_MISMATCH, HIGH_RISK_MERCHAN..."
4,5,4720,1967,2022-07-16 14:05:12.853759,6,"HIGH_RISK_MERCHANT,PAST_FRAUD,WEEKEND_LOGIN,NI...",2025-06-16 19:15:59.176682,"[HIGH_RISK_MERCHANT, PAST_FRAUD, WEEKEND_LOGIN..."


Top risky customers

In [None]:
# Sum risk_score per customer and keep the ten largest totals, highest first.
top_customers = (
    df.groupby('customer_id')['risk_score']
    .sum()
    .sort_values(ascending=False)
    .head(10)
)

# Plot the top 10 as horizontal bars, one per customer.
# The customer IDs are cast to str on purpose: when x and y are both numeric,
# seaborn infers a vertical orientation and would draw the IDs as bar heights.
# String labels make y the categorical axis and keep the bars in rank order.
fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(x=top_customers.values, y=top_customers.index.astype(str), ax=ax)
ax.set_title("Top 10 Risky Customers (by Total Risk Score)")
ax.set_xlabel("Total Risk Score")
ax.set_ylabel("Customer ID")
fig.tight_layout()

# Write the chart to disk, then close the figure to free it.
os.makedirs("visualizations", exist_ok=True)
fig.savefig("visualizations/top_risky_customers.png")
plt.close(fig)

Most common fraud alert flags

In [29]:
# Flatten the per-row flag lists to one flag per row, then tally each flag.
all_flags = df['flags_list'].explode()
flag_counts = all_flags.value_counts()
print("Most Common Fraud Alert Flags:", flag_counts)

# Horizontal bar chart: one bar per flag, bar length = number of occurrences.
fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(x=flag_counts.values, y=flag_counts.index, ax=ax)
ax.set_title("Most Common Fraud Alert Flags")
ax.set_xlabel("Count")
ax.set_ylabel("Flag")
fig.tight_layout()

# Save the chart under visualizations/ and close the figure.
os.makedirs("visualizations", exist_ok=True)
fig.savefig("visualizations/alert_flag_frequencies.png")
plt.close(fig)

Most Common Fraud Alert Flags: flags_list
GEO_MISMATCH          48305
NIGHT_LOGIN           41849
HIGH_RISK_MERCHANT    38094
WEEKEND_LOGIN         25919
ODD_HOURS             24780
PAST_FRAUD            13255
FAILED_LOGIN           5227
Name: count, dtype: int64


### DuckDB Partitioning


In [None]:
# Raw (input) and processed (output) directories, resolved relative to the
# directory the kernel is running in.
cwd = Path.cwd()
processed_dir = cwd / 'data' / 'processed'
raw_dir = cwd / 'data' / 'raw'

print(f'processed_dir: {processed_dir}')
print(f'raw_dir: {raw_dir}')

# Connect to duckdb in memory
con = duckdb.connect(database=':memory:')

# Column each known dataset is partitioned by when it is written out.
partition_columns = {
    'customers': 'AccountCreationMonth',
    'merchants': 'ingestion_date',
    'transactions': 'TimestampMonth',
    'login_attempts': 'LoginTimestampMonth',
}

# Dataset names are the stems of the parquet files in the raw directory.
# Sorted so the processing order does not depend on filesystem listing order.
file_types = sorted(raw_file.stem for raw_file in raw_dir.glob('*.parquet'))

print(f'Found files: {file_types}')

for file_type in file_types:
    # Load the raw parquet file into an in-memory table named after the file.
    con.execute(f"""
    CREATE TABLE IF NOT EXISTS {file_type} AS
    SELECT
        *
    FROM read_parquet('{raw_dir}/{file_type}.parquet')
    """)

    # Look the partition column up per dataset. A dataset without an entry is
    # skipped explicitly rather than reusing the previous dataset's column or
    # producing an empty option in the COPY statement.
    partition_column = partition_columns.get(file_type)
    if partition_column is None:
        print(f'No partition column configured for {file_type}; skipping export')
        continue
    partition_statement = f"PARTITION_BY ({partition_column})"

    # Make sure the output root exists before partitions are written under it.
    processed_dir.mkdir(parents=True, exist_ok=True)

    # Write partitions
    con.execute(f"""
        COPY {file_type} TO '{processed_dir}/{file_type}'
        (FORMAT parquet, {partition_statement}, OVERWRITE_OR_IGNORE)
    """)

### Test MinIO Upload Paths

In [None]:
# Root of the processed, partitioned data whose files are mapped to MinIO
# object names. Set PROCESSED_DATA_DIR to override; the default is the original
# local checkout path.
data_path = os.environ.get(
    'PROCESSED_DATA_DIR',
    '/Users/tro/Desktop/fraud-pipeline-patrol/data/processed',
)
data_path_obj = Path(data_path)

# Check that the data directory exists. It must be a directory, not merely an
# existing path, because it is iterated below.
if not data_path_obj.is_dir():
    print(f"Data directory not found: {data_path}")
    raise FileNotFoundError(f"Data directory not found: {data_path}")

# Each immediate subdirectory is treated as one table; sorted for stable output.
subdirs = sorted(d for d in data_path_obj.iterdir() if d.is_dir())
print(f"Found {len(subdirs)} data subdirectories: {[d.name for d in subdirs]}")

# Process each table directory
for subdir in subdirs:
    table_name = subdir.name
    print(f"Processing {table_name} directory")

    # Use recursive glob to find all parquet files in subdirectories
    # This will find files in partition subdirectories like AccountCreationMonth=1/
    parquet_files = sorted(subdir.glob('**/*.parquet'))
    print(f"Found {len(parquet_files)} parquet files in {table_name}")

    for file_path in parquet_files:
        # The object name is the path relative to the data root. as_posix()
        # keeps it '/'-separated on every platform, whereas str() would use
        # backslashes on Windows.
        rel_path = file_path.relative_to(data_path_obj)
        object_name = rel_path.as_posix()

        print(f"Uploading {file_path} to MinIO as {object_name}")