ETL Extract

In [16]:
!python run_all.py --extract

Running Extract Step...
Synthetic data has been saved to ../data/iam_policies.csv
Pipeline execution completed.


In [5]:
import pandas as pd
import numpy as np
from sklearn.datasets import make_classification

# EXTRACT data

# Extract data
def extract_data():

    # Number of rows to generate
    n_samples = 10

    # Generate data based on the specification
    policy_ids = [f'P100{i+1}' for i in range(n_samples)]
    user_ids = [f'U00{i+1}' for i in range(n_samples)]
    roles_data = np.random.choice(['Admin', 'User', 'Manager'], size=n_samples)
    plan_types_data = np.random.choice(['Enterprise', 'Standard', 'Basic'], size=n_samples)
    monthly_rates = np.random.choice([50, 120, 320, 520], size=n_samples)
    premium_values_data = np.random.choice(['Yes', 'No'], size=n_samples)
    regions_data = np.random.choice(['US', 'EU', 'APAC'], size=n_samples)
    login_counts = np.random.randint(20, 800, size=n_samples)
    last_login_days = np.random.randint(1, 400, size=n_samples)

    # Dynamically generate encrypted' and 'non-encrypted' data_type
    data_types = np.random.choice(['encrypted', 'non-encrypted'], size=n_samples, p=[0.3, 0.7])  # 30% encrypted, 70% non-encrypted

    # Create DataFrame
    df = pd.DataFrame({
        'policy_id': policy_ids,
        'user_id': user_ids,
        'role': roles_data,
        'plan_type': plan_types_data,
        'monthly_rate': monthly_rates,
        'premium': premium_values_data,
        'region': regions_data,
        'login_count': login_counts,
        'last_login_days': last_login_days,
        'data_type': data_types
    })

    # Ensure the 'premium' column is boolean (True/False)
    df['premium'] = df['premium'].map({'Yes': True, 'No': False})

    # Save the new generated data to a CSV file
    output_file = '../data/iam_policies.csv'
    df.to_csv(output_file, index=False)

    # Verfiy new data file has been saved
    print(f"Synthetic data has been saved to {output_file}")

    # Return the DataFrame for further processing
    return df

ETL Transform

In [17]:
!python run_all.py --extract --transform

Running Extract Step...
Synthetic data has been saved to ../data/iam_policies.csv
Running Transform Step...
Initial DataFrame head:
  policy_id user_id     role  ... login_count  last_login_days      data_type
0     P1001    U001  Manager  ...         666              346      encrypted
1     P1002    U002    Admin  ...          40               53  non-encrypted
2     P1003    U003  Manager  ...         186              386  non-encrypted
3     P1004    U004  Manager  ...         293              340  non-encrypted
4     P1005    U005    Admin  ...         407               92  non-encrypted

[5 rows x 10 columns]
Rows to be reshaped:
  policy_id user_id     role  ... login_count  last_login_days      data_type
0     P1001    U001  Manager  ...         666              346      encrypted
1     P1002    U002    Admin  ...          40               53  non-encrypted
2     P1003    U003  Manager  ...         186              386  non-encrypted
3     P1004    U004  Manager  ...         29

In [2]:
import pandas as pd
import base64

# TRANSFORM data

# encode data
def encode_data(data):
    """Simulate encryption by encoding the data using base64."""
    if pd.isna(data):  # Skip NaN values
        print(f"Skipping encryption for NaN value: {data}")  # Debugging
        return data
    try:
        encoded_data = base64.b64encode(str(data).encode('utf-8'))
        return encoded_data.decode('utf-8')  # Return as a string
    except Exception as e:
        # Debugging
        print(f"Error encoding data: {data} ({e})")
        return data

# Clean columns
def clean_and_convert_column(df, column_name):
    """Helper function to clean and convert a column to numeric."""
    # Skip the conversion of encrypted values
    if 'encrypted' in df['data_type'].values:
        print(f"Skipping numeric conversion for '{column_name}' due to 'encrypted' data type.")
    else:
        # Convert to numeric, forcing errors if any to NaN, and display how many invalid entries are there.
        df[column_name] = pd.to_numeric(df[column_name], errors='coerce')

    # Count how many NaN values are there due to invalid values
    invalid_count = df[column_name].isna().sum()
    if invalid_count > 0:
        print(f"Warning: {invalid_count} invalid entries in column '{column_name}' have been converted to NaN.")
    
    return df

# transform data
def transform_data(df):
    """Transform the data based on sepecification"""
    
    print(f"Initial DataFrame head:\n{df.head()}")  # Debugging: Check initial data

    # **Reshape data first** (before encryption) to ensure numeric columns for reshaping
    print(f"Rows to be reshaped:\n{df.head()}")

    # Check if there are enough data to reshape
    print(f"Unique regions before reshaping: {df['region'].nunique()}")  # Number of unique regions
    print(f"Missing values in 'region' column before reshaping: {df['region'].isna().sum()}")  # Check missing regions

    # Only reshape if there is sufficient data
    # Reshaping data is to change the structure of the dataset to make it suitable for analysis, reporting, or use by other systems
    # Ensure there's enough data left for reshaping
    # Pivoting is a reshaping operation that rotates data from rows into columns
    # Aggregation is the process of calculating summary statistics for groups of records
    if df.shape[0] > 1 and df['region'].nunique() > 1:
        # Optional: Reshape data (like: pivoting or aggregating)
        try:
            # The columns are now numeric, we can safely compute the mean
            print(f"Attempting to reshape data with {df['region'].nunique()} unique regions.")
            df_reshaped = df.pivot_table(index=['region'], values=['monthly_rate', 'login_count'], aggfunc='mean')
            print(f"Reshaped DataFrame head:\n{df_reshaped.head()}")  # Check reshaped data
        except Exception as e:
            print(f"Error during reshaping: {e}")
            df_reshaped = pd.DataFrame()  # Create an empty DataFrame in case of error
    else:
        df_reshaped = pd.DataFrame()  # No data to reshape if we don't have enough rows

    # **Encrypt the data** after reshaping
    print(f"Encrypting the data...")

    # Encrypt the fields where 'data_type' is 'encrypted' (i.e., simulate encryption)
    df['monthly_rate'] = df.apply(
        lambda row: encode_data(row['monthly_rate']) if row['data_type'] == 'encrypted' else row['monthly_rate'], axis=1)
    
    df['login_count'] = df.apply(
        lambda row: encode_data(row['login_count']) if row['data_type'] == 'encrypted' else row['login_count'], axis=1)
    
    df['last_login_days'] = df.apply(
        lambda row: encode_data(row['last_login_days']) if row['data_type'] == 'encrypted' else row['last_login_days'], axis=1)

    # Debugging: Check data after encryption
    print(f"Data after encryption:\n{df.head()}")

    # **Save the data before cleaning** and converting
    df.to_csv('../data/transform_before_cleaning.csv', index=False)
    print(f"Data before cleaning and conversion saved to '../data/transform_before_cleaning.csv'")

    # **Clean and convert** the columns to numeric (but skip `encrypted` data)
    df = clean_and_convert_column(df, 'monthly_rate')
    df = clean_and_convert_column(df, 'login_count')
    df = clean_and_convert_column(df, 'last_login_days')

    print(f"Data after cleaning and conversion to numeric:\n{df.head()}")  # Check data after conversion

    # **Save the data after cleaning** but before dropping NaNs
    df.to_csv('../data/transform_after_cleaning.csv', index=False)
    print(f"Data after cleaning and conversion saved to '../data/transform_after_cleaning.csv'")

    # **Check how many rows are being dropped** when we remove NaNs
    print(f"Data before dropping NaNs: {df.shape[0]} rows.")
    
    # Remove rows where any of the numeric columns are NaN (check for missing data)
    df = df.dropna(subset=['monthly_rate', 'login_count', 'last_login_days'])

    print(f"Data after dropping NaNs:\n{df.head()}")  # Check data after removing NaNs
    print(f"Remaining data after dropping NaNs: {df.shape[0]} rows.")  # How many rows remain?

    # **Sampling the data**
    # Sampling the data is the process of selecting a subset of data 
    # from a large amount of dataset for analysis.
    df_sampled = df.sample(frac=0.5, random_state=42)

    # Debugging: Print transformed data
    print(f"Transformed DataFrame head:\n{df.head()}")  # Check the transformation
    print(f"Sampled DataFrame head:\n{df_sampled.head()}")  # Check sampled data
    print(f"Reshaped DataFrame head:\n{df_reshaped.head()}")  # Check reshaped data

    # Save the DataFrames to CSV files
    df.to_csv('../data/transformed.csv', index=False)  # Save transformed data to 'transformed.csv'
    df_sampled.to_csv('../data/sampled_iam_policies.csv', index=False)  # Save sampled data
    df_reshaped.to_csv('../data/reshaped_iam_policies.csv', index=False)  # Save reshaped data

    # Confirmation message for saved files
    print("Data saved to '../data/transformed.csv', '../data/sampled_iam_policies.csv', and '../data/reshaped_iam_policies.csv'.")

    # Return transformed data, sampled data, and reshaped data
    return df, df_sampled, df_reshaped


ETL Load

In [9]:
!python run_all.py --load

Loaded transformed data from 'transformed.csv'.
Running Load Step...
Successfully connected to Supabase PostgreSQL.
Data to be loaded:
  policy_id user_id     role  ... login_count last_login_days      data_type
0     P1001    U001  Manager  ...        NjY2            MzQ2      encrypted
1     P1002    U002    Admin  ...          40              53  non-encrypted
2     P1003    U003  Manager  ...         186             386  non-encrypted
3     P1004    U004  Manager  ...         293             340  non-encrypted
4     P1005    U005    Admin  ...         407              92  non-encrypted

[5 rows x 10 columns]
Inserting values: ('P1001', 'U001', 'Manager', 'Basic', nan, True)
Inserting values: ('P1002', 'U002', 'Admin', 'Basic', 520.0, True)
Inserting values: ('P1003', 'U003', 'Manager', 'Enterprise', 50.0, True)
Inserting values: ('P1004', 'U004', 'Manager', 'Basic', 50.0, True)
Inserting values: ('P1005', 'U005', 'Admin', 'Standard', 520.0, False)
Inserting values: ('P1006', 'U006'

In [18]:
import psycopg2
import pandas as pd
from psycopg2 import sql
import os

# Connect to Supabase PostgreSQL database
def connect_to_supabase():
    try:
        # Use Supabase database connection string
        conn = psycopg2.connect(
            host="db.prwzydmfrcbepgevmqmu.supabase.co", # Supabase host
            port=5432,  # Default PostgreSQL port
            database="postgres",  # Database name
            user="etl_user1",  # Database username
            password="cyuqD639?TT",  # Database password
            sslmode="require"  # SSL connection for security
        )
        print("Successfully connected to Supabase PostgreSQL.")
        return conn
    except Exception as e:
        print(f"Error: Unable to connect to database - {e}")
        return None

# Load data to Supabase
def load_data_to_supabase(df):
    conn = connect_to_supabase()
    
    # If the connection is missing, the function stops immediately 
    # to prevent error.
    if conn is None:
        return
    
    try:
        # Create a cursor object
        cur = conn.cursor()

        # Supabase table schema
        insert_query = sql.SQL("""
            INSERT INTO iam_policies 
            (policy_id, user_id, role, plan_type, monthly_rate, premium)
            VALUES (%s, %s, %s, %s, %s, %s)
            ON CONFLICT (policy_id) DO NOTHING;
        """)

        # Print the DataFrame structure
        print(f"Data to be loaded:\n{df.head()}")  # Debugging: Print the first few rows of the DataFrame

        # Ensure that 'monthly_rate' is numeric and handle NaN values by converting them to None (NULL in DB)
        df['monthly_rate'] = pd.to_numeric(df['monthly_rate'], errors='coerce')  # Convert to numeric, invalid values become NaN
        df['monthly_rate'] = df['monthly_rate'].where(df['monthly_rate'].notna(), None)  # Replace NaN with None

        # Loop through each row in the DataFrame and insert it into the PostgreSQL table
        for _, row in df.iterrows():
            # Ensure the values tuple is packed with the correct number of elements (6 values in total)
            values = (
                row['policy_id'],  # policy_id
                row['user_id'],    # user_id
                row['role'],       # role
                row['plan_type'],  # plan_type
                row['monthly_rate'],  # monthly_rate (now guaranteed to be numeric or None)
                row['premium'],    # premium (True/False)
            )

            # Print the values being inserted to debug
            print(f"Inserting values: {values}")  # Debugging: Print the tuple to see if it's correctly formatted
            
            # Execute the insert query with the values
            cur.execute(insert_query, values)

        # Commit the transaction
        conn.commit()
        print(f"Data loaded successfully into the Supabase database.")
    # Catch the exception
    except Exception as e:
        print(f"Error: {e}")
    finally:
        # Close the cursor and connection
        cur.close()
        conn.close()

# Main function for execution
def main():
    # Load the new transformed CSV to DataFrame
    df = pd.read_csv('../data/transformed.csv')

    # Ensure the DataFrame has the right columns that match the table schema
    # Columns must be: policy_id, user_id, role, plan_type, monthly_rate, premium, etc
    df = df[['policy_id', 'user_id', 'role', 'plan_type', 'monthly_rate', 'premium']]

    load_data_to_supabase(df)

if __name__ == "__main__":
    main()


Successfully connected to Supabase PostgreSQL.
Data to be loaded:
  policy_id user_id     role   plan_type monthly_rate  premium
0     P1001    U001  Manager       Basic         NTIw     True
1     P1002    U002    Admin       Basic          520     True
2     P1003    U003  Manager  Enterprise           50     True
3     P1004    U004  Manager       Basic           50     True
4     P1005    U005    Admin    Standard          520    False
Inserting values: ('P1001', 'U001', 'Manager', 'Basic', nan, True)
Inserting values: ('P1002', 'U002', 'Admin', 'Basic', 520.0, True)
Inserting values: ('P1003', 'U003', 'Manager', 'Enterprise', 50.0, True)
Inserting values: ('P1004', 'U004', 'Manager', 'Basic', 50.0, True)
Inserting values: ('P1005', 'U005', 'Admin', 'Standard', 520.0, False)
Inserting values: ('P1006', 'U006', 'Admin', 'Enterprise', nan, False)
Inserting values: ('P1007', 'U007', 'Manager', 'Standard', 120.0, False)
Inserting values: ('P1008', 'U008', 'User', 'Standard', 50.0, Fals