## Import Libralies

In [4]:
import pandas as pd
import psycopg2
from psycopg2 import extras
import ast # Required to safely parse the string list

## Read data files

In [2]:
def read_data(filename):
    """read the a datafile into dataframe."""
    df = pd.read_csv(filename)
    return df

In [3]:
# Annouce file names
cus_profile_path = 'customer_profiles_table.csv'
terminal_profile_path = 'terminal_profiles_table.csv'
transactions_path = 'transactions_df.csv'

cus_profile = read_data(cus_profile_path)
terminal_profile = read_data(terminal_profile_path)
transactions = read_data(transactions_path)

customer_profiles

In [11]:
cus_profile.sample(3)

Unnamed: 0,customer_id,mean_amount,std_amount,mean_nb_tx_per_day,network_id,bin,lat_customer,log_customer,available_terminals,nb_terminals
161,C00001161,43.688767,21.844383,2.55177,A,375570,39.8751,8.4171,"['T001040', 'T001079', 'T001059', 'T001007', '...",6
4747,C00005747,5.915223,2.957612,2.028585,D,362300,40.3959,8.9335,"['T001009', 'T001007', 'T001069', 'T001077', '...",6
2904,C00003904,10.394788,5.197394,1.525118,A,375562,40.9429,8.888,"['T001021', 'T001053', 'T001069', 'T001062', '...",8


In [35]:
at = cus_profile["available_terminals"]
at.sample(5)

4563                                          ['T001070']
1461    ['T001087', 'T001075', 'T001082', 'T001043', '...
2628                               ['T001066', 'T001008']
219     ['T001057', 'T001000', 'T001055', 'T001051', '...
4981    ['T001075', 'T001097', 'T001089', 'T001039', '...
Name: available_terminals, dtype: object

terminal_profiles

In [28]:
terminal_profile.sample(3)

Unnamed: 0,terminal_id,lat_terminal,log_terminal,mcc
6,T001006,37.8858,8.4214,5812
63,T001063,39.0981,9.0193,3504
70,T001070,41.6758,9.4532,5661


In [13]:
transactions.sample(3)

Unnamed: 0,transaction_id,post_ts,customer_id,bin,terminal_id,amt,entry_mode,fraud,fraud_scenario
1509890,QIri9_t7RjO7PwBDSjXw1w,2023-07-03 07:41:50,C00001612,512799,T001028,17.67,Contactless,0,0
356184,8J8o7jXuRMeBdPw9rsY8Fw,2023-03-08 18:11:23,C00002108,477946,T001083,95.3,Contactless,0,0
797298,RjYTQIsETliavx0NBObUAw,2023-04-22 11:30:31,C00002561,515878,T001044,83.95,Contactless,0,0


## Get overview of the dataset

In [14]:
print(cus_profile.shape)
print(terminal_profile.shape)
print(transactions.shape)

(5000, 10)
(100, 4)
(1785308, 9)


In [15]:
print(f"customer_profiles:{cus_profile.columns.tolist()}")
print(f"terminal_profiles: {terminal_profile.columns.tolist()}")
print(f"transactions: {transactions.columns.tolist()}")

customer_profiles:['customer_id', 'mean_amount', 'std_amount', 'mean_nb_tx_per_day', 'network_id', 'bin', 'lat_customer', 'log_customer', 'available_terminals', 'nb_terminals']
terminal_profiles: ['terminal_id', 'lat_terminal', 'log_terminal', 'mcc']
transactions: ['transaction_id', 'post_ts', 'customer_id', 'bin', 'terminal_id', 'amt', 'entry_mode', 'fraud', 'fraud_scenario']


In [16]:
cus_profile.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Data columns (total 10 columns):
 #   Column               Non-Null Count  Dtype  
---  ------               --------------  -----  
 0   customer_id          5000 non-null   object 
 1   mean_amount          5000 non-null   float64
 2   std_amount           5000 non-null   float64
 3   mean_nb_tx_per_day   5000 non-null   float64
 4   network_id           5000 non-null   object 
 5   bin                  5000 non-null   int64  
 6   lat_customer         5000 non-null   float64
 7   log_customer         5000 non-null   float64
 8   available_terminals  5000 non-null   object 
 9   nb_terminals         5000 non-null   int64  
dtypes: float64(5), int64(2), object(3)
memory usage: 390.8+ KB


In [17]:
terminal_profile.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 4 columns):
 #   Column        Non-Null Count  Dtype  
---  ------        --------------  -----  
 0   terminal_id   100 non-null    object 
 1   lat_terminal  100 non-null    float64
 2   log_terminal  100 non-null    float64
 3   mcc           100 non-null    int64  
dtypes: float64(2), int64(1), object(1)
memory usage: 3.3+ KB


In [18]:
transactions.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1785308 entries, 0 to 1785307
Data columns (total 9 columns):
 #   Column          Dtype  
---  ------          -----  
 0   transaction_id  object 
 1   post_ts         object 
 2   customer_id     object 
 3   bin             int64  
 4   terminal_id     object 
 5   amt             float64
 6   entry_mode      object 
 7   fraud           int64  
 8   fraud_scenario  int64  
dtypes: float64(1), int64(3), object(5)
memory usage: 122.6+ MB


In [10]:
print(cus_profile.isnull().sum())
print(terminal_profile.isnull().sum())
print(transactions.isnull().sum())

customer_id            0
mean_amount            0
std_amount             0
mean_nb_tx_per_day     0
network_id             0
bin                    0
lat_customer           0
log_customer           0
available_terminals    0
nb_terminals           0
dtype: int64
terminal_id     0
lat_terminal    0
log_terminal    0
mcc             0
dtype: int64
transaction_id    0
post_ts           0
customer_id       0
bin               0
terminal_id       0
amt               0
entry_mode        0
fraud             0
fraud_scenario    0
dtype: int64


In [19]:
print(cus_profile.duplicated().sum())
print(terminal_profile.duplicated().sum())
print(transactions.duplicated().sum())

0
0
0


## Data Transformation

In [5]:
def insert_customer_profiles(filepath, db_config):
    """
    Inserts data into both the customer_profiles and customer_terminal_map tables.
    """
    table_name_profiles = "customer_profiles"
    table_name_map = "customer_terminal_map"
    conn = None
    cur = None

    try:
        df = pd.read_csv(filepath)

        # 1. Prepare data for the customer_profiles table
        # Drop the 'available_terminals' column because it's not in this table
        df_profiles = df.drop(columns=['available_terminals'])
        
        # Ensure column order matches the SQL table schema
        profile_columns = [
            'customer_id', 'mean_amount', 'std_amount', 
            'mean_nb_tx_per_day', 'network_id', 'bin', 
            'lat_customer', 'log_customer', 'nb_terminals'
        ]
        df_profiles = df_profiles[profile_columns]

        # 2. Prepare data for the customer_terminal_map table
        # The 'available_terminals' column contains a string representation of a list
        # We use ast.literal_eval to safely convert the string to a list of strings
        df['available_terminals'] = df['available_terminals'].apply(ast.literal_eval)

        # Use explode() to create a new row for each terminal in the list
        df_map = df[['customer_id', 'available_terminals']].explode('available_terminals')
        df_map = df_map.rename(columns={'available_terminals': 'terminal_id'})

        # Establish database connection
        conn = psycopg2.connect(**db_config)
        cur = conn.cursor()

        # Insert into customer_profiles table
        print(f"Truncating table '{table_name_profiles}'...")
        cur.execute(f"TRUNCATE TABLE {table_name_profiles} RESTART IDENTITY CASCADE;")
        conn.commit()

        sql_profiles = f"INSERT INTO {table_name_profiles} ({', '.join(profile_columns)}) VALUES %s"
        data_to_insert_profiles = [tuple(row) for row in df_profiles.values]
        extras.execute_values(cur, sql_profiles, data_to_insert_profiles)
        conn.commit()
        print(f"Successfully inserted {len(data_to_insert_profiles)} rows into '{table_name_profiles}'.")
        
        # Insert into customer_terminal_map table
        print(f"Truncating table '{table_name_map}'...")
        cur.execute(f"TRUNCATE TABLE {table_name_map} RESTART IDENTITY CASCADE;")
        conn.commit()

        sql_map = f"INSERT INTO {table_name_map} (customer_id, terminal_id) VALUES %s"
        data_to_insert_map = [tuple(row) for row in df_map.values]
        extras.execute_values(cur, sql_map, data_to_insert_map)
        conn.commit()
        print(f"Successfully inserted {len(data_to_insert_map)} rows into '{table_name_map}'.")

    except psycopg2.Error as e:
        print(f"Database error: {e}")
        if conn:
            conn.rollback()
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()
            print("Database connection closed.")

In [6]:
# Database Configuration
db_connection_config = {
    "host": "localhost",
    "database": "postgres",
    "user": "postgres",
    "password": "Saipanz11_", # REMEMBER TO CHANGE THIS
    "port": "5432"
}

# run it
if __name__ == "__main__":
    cus_profile_path = 'customer_profiles_table.csv'
    insert_customer_profiles(cus_profile_path, db_connection_config)

    # You would then add calls for the other tables here after you've implemented them
    # insert_terminal_profiles('terminal_profiles_table.csv', db_connection_config)
    # insert_transactions('transactions_df.csv', db_connection_config)

Truncating table 'customer_profiles'...
Successfully inserted 5000 rows into 'customer_profiles'.
Truncating table 'customer_terminal_map'...
Database error: insert or update on table "customer_terminal_map" violates foreign key constraint "customer_terminal_map_terminal_id_fkey"
DETAIL:  Key (terminal_id)=(T001057) is not present in table "terminal_profiles".

Database connection closed.


In [7]:
def insert_terminal_profiles(filepath, db_config):
    """
    Inserts data from the terminal profiles CSV into the terminal_profiles table.
    """
    table_name = "terminal_profiles"
    conn = None
    cur = None

    try:
        df = pd.read_csv(filepath)

        # The column names already match the SQL table schema, so no renaming is needed.
        # The data types also match (float64 -> FLOAT, int64 -> INT, object -> VARCHAR)

        # Establish database connection
        conn = psycopg2.connect(**db_config)
        cur = conn.cursor()

        print(f"Truncating table '{table_name}'...")
        cur.execute(f"TRUNCATE TABLE {table_name} RESTART IDENTITY CASCADE;")
        conn.commit()

        # Get the column names from the DataFrame
        columns = df.columns.tolist()
        sql_columns = ', '.join(columns)

        # Prepare the data as a list of tuples
        data_to_insert = [tuple(row) for row in df.values]

        print(f"Attempting to insert {len(data_to_insert)} rows into '{table_name}'...")

        # Use execute_values for efficient bulk insertion
        extras.execute_values(
            cur,
            f"INSERT INTO {table_name} ({sql_columns}) VALUES %s",
            data_to_insert
        )

        conn.commit()
        print(f"Successfully inserted {len(data_to_insert)} rows.")

    except psycopg2.Error as e:
        print(f"Database error: {e}")
        if conn:
            conn.rollback()
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()
            print("Database connection closed.")

# run it
if __name__ == "__main__":
    db_connection_config = {
        "host": "localhost",
        "database": "postgres",
        "user": "postgres",
        "password": "Saipanz11_",
        "port": "5432"
    }

    terminal_profile_path = 'terminal_profiles_table.csv'
    insert_terminal_profiles(terminal_profile_path, db_connection_config)

Truncating table 'terminal_profiles'...
Attempting to insert 100 rows into 'terminal_profiles'...
Successfully inserted 100 rows.
Database connection closed.


In [9]:
def insert_customer_terminal_map(filepath, db_config):
    """
    Reads customer data, normalizes the available_terminals column, 
    and inserts the data into the customer_terminal_map table.
    """
    table_name = "customer_terminal_map"
    conn = None
    cur = None

    try:
        df = pd.read_csv(filepath)

        # 1. Parse the 'available_terminals' column from a string to a list
        # Use ast.literal_eval for safe conversion of the string representation of a list
        df['available_terminals'] = df['available_terminals'].apply(ast.literal_eval)

        # 2. Normalize the data using explode()
        # This creates a new row for each item in the list of available terminals
        df_map = df[['customer_id', 'available_terminals']].explode('available_terminals')
        
        # 3. Rename the column to match the SQL table schema
        df_map = df_map.rename(columns={'available_terminals': 'terminal_id'})

        # Establish database connection
        conn = psycopg2.connect(**db_config)
        cur = conn.cursor()

        print(f"Truncating table '{table_name}'...")
        cur.execute(f"TRUNCATE TABLE {table_name} RESTART IDENTITY CASCADE;")
        conn.commit()

        # Prepare the data as a list of tuples
        data_to_insert = [tuple(row) for row in df_map.values]

        print(f"Attempting to insert {len(data_to_insert)} rows into '{table_name}'...")

        # Use execute_values for efficient bulk insertion
        extras.execute_values(
            cur,
            f"INSERT INTO {table_name} (customer_id, terminal_id) VALUES %s",
            data_to_insert
        )

        conn.commit()
        print(f"Successfully inserted {len(data_to_insert)} rows.")

    except psycopg2.Error as e:
        print(f"Database error: {e}")
        if conn:
            conn.rollback()
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()
            print("Database connection closed.")

if __name__ == "__main__":
    db_connection_config = {
        "host": "localhost",
        "database": "postgres",
        "user": "postgres",
        "password": "Saipanz11_",
        "port": "5432"
    }

    # Call the functions in the correct order
    # You would run customer profiles first, but this is a separate call for clarity
    cus_profile_path = 'customer_profiles_table.csv'
    insert_customer_terminal_map(cus_profile_path, db_connection_config)

Truncating table 'customer_terminal_map'...
Attempting to insert 30817 rows into 'customer_terminal_map'...
Successfully inserted 30817 rows.
Database connection closed.


In [10]:
def insert_transactions(filepath, db_config):
    """
    Inserts data from the transactions CSV into the transactions table.
    """
    table_name = "transactions"
    conn = None
    cur = None

    try:
        df = pd.read_csv(filepath)

        # 1. Convert the 'post_ts' column to a datetime object
        # This is a critical step to match the TIMESTAMP data type in PostgreSQL.
        df['post_ts'] = pd.to_datetime(df['post_ts'])
        
        # 2. Get the column names from the DataFrame to ensure the order is correct
        columns = df.columns.tolist()
        sql_columns = ', '.join(columns)

        # Establish database connection
        conn = psycopg2.connect(**db_config)
        cur = conn.cursor()

        print(f"Truncating table '{table_name}'...")
        cur.execute(f"TRUNCATE TABLE {table_name} RESTART IDENTITY CASCADE;")
        conn.commit()

        # Prepare the data as a list of tuples
        data_to_insert = [tuple(row) for row in df.values]

        print(f"Attempting to insert {len(data_to_insert)} rows into '{table_name}'...")

        # Use execute_values for efficient bulk insertion
        extras.execute_values(
            cur,
            f"INSERT INTO {table_name} ({sql_columns}) VALUES %s",
            data_to_insert
        )

        conn.commit()
        print(f"Successfully inserted {len(data_to_insert)} rows.")

    except psycopg2.Error as e:
        print(f"Database error: {e}")
        if conn:
            conn.rollback()
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    finally:
        if cur:
            cur.close()
        if conn:
            conn.close()
            print("Database connection closed.")

# Database Configuration
db_connection_config = {
    "host": "localhost",
    "database": "postgres",
    "user": "postgres",
    "password": "Saipanz11_",
    "port": "5432"
}

# Define file paths
cus_profile_path = 'customer_profiles_table.csv'
terminal_profile_path = 'terminal_profiles_table.csv'
transactions_path = 'transactions_df.csv'

if __name__ == "__main__":
    # Insert data into the tables in the correct order
    # 1. Customer Profiles
    insert_customer_profiles(cus_profile_path, db_connection_config)
    
    # 2. Terminal Profiles
    insert_terminal_profiles(terminal_profile_path, db_connection_config)
    
    # 3. Customer Terminal Map (dependent on the first two tables)
    insert_customer_terminal_map(cus_profile_path, db_connection_config)
    
    # 4. Transactions (dependent on the first two tables)
    insert_transactions(transactions_path, db_connection_config)

    print("\nAll data insertion tasks completed.")

Truncating table 'customer_profiles'...
Successfully inserted 5000 rows into 'customer_profiles'.
Truncating table 'customer_terminal_map'...
Successfully inserted 30817 rows into 'customer_terminal_map'.
Database connection closed.
Truncating table 'terminal_profiles'...
Attempting to insert 100 rows into 'terminal_profiles'...
Successfully inserted 100 rows.
Database connection closed.
Truncating table 'customer_terminal_map'...
Attempting to insert 30817 rows into 'customer_terminal_map'...
Successfully inserted 30817 rows.
Database connection closed.
Truncating table 'transactions'...
Attempting to insert 1785308 rows into 'transactions'...
Successfully inserted 1785308 rows.
Database connection closed.

All data insertion tasks completed.
