In [None]:
# -------------------------------------------------------------------------------
# Project Overview: Automated Pipeline for API Integration
# -------------------------------------------------------------------------------
# This project automates the process of calling an API, storing data in a 
# structured SQL database, and preparing it for analysis in Power BI.
# 
# Below is an outline of the three main stages:
#
# 1. Pipeline: API Call
#    - Purpose: Retrieve data from the API, including information on users, 
#      groups, risk scores, phishing tests, and training campaigns.
#    - Description: This stage uses API endpoints to gather raw data, transforming it 
#      into a structured format for ingestion.
#
# 2. Pipeline: SQL Table and Schema Creation
#    - Purpose: Set up the SQL database structure for storing data.
#    - Description: This stage defines tables and schemas for all required entities, 
#      including users, groups, risk score histories, and phishing and training details.
#      It includes drop-and-create commands to ensure the tables are up-to-date with the 
#      most recent schema and enforce data integrity.
#
# 3. Pipeline: SQL Table Insertion
#    - Purpose: Insert the retrieved and processed data from the API into the SQL tables.
#    - Description: This stage handles the actual insertion of data into the SQL database 
#      while preserving foreign key constraints and ensuring that data types are aligned 
#      with the database schema. All data insertions are done within a transaction for 
#      atomicity, meaning that any insertion failure results in a full rollback, preserving 
#      database consistency.
#
# -------------------------------------------------------------------------------
# End of Project Overview
# -------------------------------------------------------------------------------


In [None]:
# ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
# SQL Table Setup for ETL Pipeline
# ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
# This script is responsible for the setup and teardown of SQL tables to store data fetched from various APIs in the Extract phase of our ETL pipeline.
# It is designed to manage database schemas with careful consideration of dependencies, ensuring that all operations adhere to best practices for data integrity and referential integrity.

# We utilize two dictionaries to manage SQL commands:
# 1. `create_table_sql_commands`: Stores SQL statements for creating tables, ensuring they are executed in the correct dependency order.
# 2. `drop_table_sql_commands`: Stores SQL statements for dropping tables, executed in reverse dependency order to handle foreign key constraints without errors.

# Splitting the commands into separate dictionaries allows for efficient and safe rebuilding of the database schema during development and operational phases.
create_table_sql_commands = {

    "table_1": """
        CREATE TABLE dbo.TABLE_1 (
            COLUMN_1 DECIMAL(10, 4) NOT NULL CHECK (COLUMN_1 >= 0),
            COLUMN_2 DATETIME PRIMARY KEY
        );
    """,

    "table_2": """
        CREATE TABLE dbo.TABLE_2 (
            COLUMN_1 INT UNIQUE,
            COLUMN_2 INT PRIMARY KEY NOT NULL,
            COLUMN_3 NVARCHAR(50) NULL,
            COLUMN_4 NVARCHAR(50) NULL,
            COLUMN_5 NVARCHAR(100) NULL,
            COLUMN_6 NVARCHAR(255) UNIQUE NULL,
            COLUMN_7 NVARCHAR(20) NULL,
            COLUMN_8 NVARCHAR(10) NULL,
            COLUMN_9 NVARCHAR(20) NULL,
            COLUMN_10 NVARCHAR(100) NULL,
            COLUMN_11 NVARCHAR(100) NULL,
            COLUMN_12 NVARCHAR(100) NULL,
            COLUMN_13 NVARCHAR(100) NULL,
            COLUMN_14 NVARCHAR(50) NULL,
            COLUMN_15 NVARCHAR(100) NULL,
            COLUMN_16 NVARCHAR(255) NULL,
            COLUMN_17 NVARCHAR(50) NULL,
            COLUMN_18 TEXT NULL,
            COLUMN_19 BIT NULL DEFAULT 0,
            COLUMN_20 NVARCHAR(255) NULL,
            COLUMN_21 DATETIME NULL,
            COLUMN_22 DATETIME NULL,
            COLUMN_23 DATETIME NULL,
            COLUMN_24 DATETIME NULL,
            COLUMN_25 DATETIME NULL,
            COLUMN_26 DECIMAL(10, 4) NULL CHECK (COLUMN_26 >= 0 AND COLUMN_26 <= 100),
            COLUMN_27 DECIMAL(10, 4) NULL CHECK (COLUMN_27 >= 0),
            COLUMN_28 NVARCHAR(MAX) NULL,
            COLUMN_29 NVARCHAR(MAX) NULL,
            COLUMN_30 TEXT NULL,
            COLUMN_31 TEXT NULL,
            COLUMN_32 TEXT NULL,
            COLUMN_33 TEXT NULL
        );
    """,

    "table_3": """
        CREATE TABLE dbo.TABLE_3 (
            COLUMN_1 INT NOT NULL,
            COLUMN_2 INT NOT NULL,
            COLUMN_3 DATETIME NOT NULL,
            COLUMN_4 FLOAT NOT NULL CHECK (COLUMN_4 >= 0),
            COLUMN_5 NVARCHAR(50) NULL,
            COLUMN_6 NVARCHAR(50) NULL,
            COLUMN_7 NVARCHAR(100) NULL,
            COLUMN_8 NVARCHAR(100) NULL,
            COLUMN_9 NVARCHAR(100) NULL,
            CONSTRAINT PK_TABLE_3 PRIMARY KEY (COLUMN_2, COLUMN_3),
            CONSTRAINT CHK_COLUMN_4_POSITIVE CHECK (COLUMN_4 >= 0)
        );
    """,

    "table_4": """
        CREATE TABLE dbo.TABLE_4 (
            COLUMN_1 INT PRIMARY KEY NOT NULL,
            COLUMN_2 NVARCHAR(255) NULL,
            COLUMN_3 NVARCHAR(100) NULL,
            COLUMN_4 NVARCHAR(50) NULL,
            COLUMN_5 NVARCHAR(50) NULL,
            COLUMN_6 INT NOT NULL DEFAULT 0 CHECK (COLUMN_6 >= 0),
            COLUMN_7 DECIMAL(10, 1) NULL CHECK (COLUMN_7 >= 0)
        );
    """,

    "table_5": """
        CREATE TABLE dbo.TABLE_5 (
            COLUMN_1 INT NOT NULL,
            COLUMN_2 INT NULL,
            COLUMN_3 INT NOT NULL,
            COLUMN_4 NVARCHAR(50) NULL,
            COLUMN_5 NVARCHAR(50) NULL,
            COLUMN_6 NVARCHAR(100) NULL,
            COLUMN_7 NVARCHAR(255) NULL,
            COLUMN_8 NVARCHAR(20) NULL,
            COLUMN_9 NVARCHAR(10) NULL,
            COLUMN_10 NVARCHAR(20) NULL,
            COLUMN_11 NVARCHAR(100) NULL,
            COLUMN_12 NVARCHAR(100) NULL,
            COLUMN_13 NVARCHAR(100) NULL,
            COLUMN_14 NVARCHAR(100) NULL,
            COLUMN_15 NVARCHAR(50) NULL,
            COLUMN_16 NVARCHAR(100) NULL,
            COLUMN_17 NVARCHAR(255) NULL,
            COLUMN_18 NVARCHAR(50) NULL,
            COLUMN_19 TEXT NULL,
            COLUMN_20 FLOAT NULL CHECK (COLUMN_20 >= 0 AND COLUMN_20 <= 100),
            COLUMN_21 FLOAT NULL CHECK (COLUMN_21 >= 0),
            COLUMN_22 BIT NULL DEFAULT 0,
            COLUMN_23 NVARCHAR(255) NULL,
            COLUMN_24 TEXT NULL,
            COLUMN_25 TEXT NULL,
            COLUMN_26 DATETIME NULL,
            COLUMN_27 DATETIME NULL,
            COLUMN_28 DATETIME NULL,
            COLUMN_29 DATETIME NULL,
            COLUMN_30 TEXT NULL,
            COLUMN_31 TEXT NULL,
            COLUMN_32 TEXT NULL,
            COLUMN_33 TEXT NULL,
            CONSTRAINT PK_TABLE_5 PRIMARY KEY (COLUMN_1, COLUMN_3),
            CONSTRAINT CHK_COLUMN_20 CHECK (COLUMN_20 >= 0 AND COLUMN_20 <= 100),
            CONSTRAINT CHK_COLUMN_21 CHECK (COLUMN_21 >= 0)
        );
    """,

    "table_6": """
        CREATE TABLE dbo.TABLE_6 (
            COLUMN_1 INT NOT NULL,
            COLUMN_2 DATETIME NOT NULL,
            COLUMN_3 FLOAT NOT NULL CHECK (COLUMN_3 >= 0),
            COLUMN_4 NVARCHAR(255) NULL,
            COLUMN_5 INT NOT NULL,
            CONSTRAINT PK_TABLE_6 PRIMARY KEY (COLUMN_1, COLUMN_2)
        );
    """,

    "table_7": """
        CREATE TABLE dbo.TABLE_7 (
            COLUMN_1 BIGINT NOT NULL,
            COLUMN_2 BIGINT NOT NULL,
            COLUMN_3 NVARCHAR(50) NULL,
            COLUMN_4 NVARCHAR(255) NULL,
            COLUMN_5 INT CHECK (COLUMN_5 >= 0) NULL,
            COLUMN_6 FLOAT CHECK (COLUMN_6 >= 0 AND COLUMN_6 <= 100) NULL,
            COLUMN_7 INT CHECK (COLUMN_7 >= 0) NULL,
            COLUMN_8 INT CHECK (COLUMN_8 >= 0) NULL,
            COLUMN_9 INT CHECK (COLUMN_9 >= 0) NULL,
            COLUMN_10 INT CHECK (COLUMN_10 >= 0) NULL,
            COLUMN_11 INT CHECK (COLUMN_11 >= 0) NULL,
            COLUMN_12 INT CHECK (COLUMN_12 >= 0) NULL,
            COLUMN_13 DATETIME NULL,
            COLUMN_14 TEXT NULL,
            CONSTRAINT PK_TABLE_7 PRIMARY KEY (COLUMN_1, COLUMN_2),
            CONSTRAINT CHK_COLUMN_6 CHECK (COLUMN_6 >= 0 AND COLUMN_6 <= 100),
            CONSTRAINT CHK_COLUMN_5 CHECK (COLUMN_5 >= 0)
        );
    """
}

drop_table_sql_commands = {
    "table_1": "DROP TABLE IF EXISTS dbo.TABLE_1;",
    "table_2": "DROP TABLE IF EXISTS dbo.TABLE_2;",
    "table_3": "DROP TABLE IF EXISTS dbo.TABLE_3;",
    "table_4": "DROP TABLE IF EXISTS dbo.TABLE_4;",
    "table_5": "DROP TABLE IF EXISTS dbo.TABLE_5;",
    "table_6": "DROP TABLE IF EXISTS dbo.TABLE_6;",
    "table_7": "DROP TABLE IF EXISTS dbo.TABLE_7;"
}

# Return the dictionary for review
logging.info(f"Returned back are {len(create_table_sql_commands)} SQL commands for table creation.")
logging.info(f"Returned back are {len(drop_table_sql_commands)} SQL commands for table deletion.")


In [None]:
# -------------------------------------------------------------------------------
# Database Schema Management Script
# -------------------------------------------------------------------------------
# This script is responsible for managing the schema for the ETL pipeline, ensuring 
# all tables are created or dropped in a consistent and safe manner. The approach 
# ensures:
# 1. All-or-Nothing Execution: If any operation fails, all changes are rolled back.
# 2. Referential Integrity: Tables are dropped and recreated in the correct order 
#    based on their dependencies.
# 3. Detailed Logging: Each operation is logged to provide transparency and aid debugging.
# 4. Verification: Post-creation checks ensure all tables exist as expected.

# Key Steps:
# 1. Load environment variables and establish a database connection.
# 2. Drop existing tables in reverse dependency order to handle foreign key constraints.
# 3. Create new tables in the correct dependency order using SQL commands stored in dictionaries.
# 4. Verify the existence of all tables after the transaction commits.
# -------------------------------------------------------------------------------

# -------------------------------------------------------------------------------------------------------------------------------------------------------------------------

import os
import logging
from datetime import datetime
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from dotenv import load_dotenv

# Load environment variables
load_dotenv()

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("api_script.log"),
    ]
)

# Load connection string from environment
connection_string = os.getenv("CONNECT_STR")
if not connection_string:
    logging.error("Database connection string not found in environment variables.")
    raise ValueError("Database connection string is required.")

# Start the logging process with a timestamp
start_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
logging.info(f"Starting table creation process at {start_time}.")

try:
    # Create the database engine
    engine = create_engine(connection_string)
    logging.info("Database engine created successfully.")

    # Test the connection and retrieve the database version
    with engine.connect() as conn:
        result = conn.execute(text("SELECT @@VERSION;"))
        db_version = result.fetchone()[0]
        logging.info(f"Connected to database, version: {db_version}")

    # Begin a transaction to drop and recreate tables
    with engine.begin() as conn:
        try:
            # Drop tables in reverse dependency order
            logging.info("Starting table drop process.")
            for table_name, drop_table_sql in reversed(drop_table_sql_commands.items()):
                # Check if the table exists before attempting to drop it
                result = conn.execute(text(
                    f"SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '{table_name.upper()}';"
                ))
                table_exists = result.fetchone()
                
                if table_exists:
                    conn.execute(text(drop_table_sql))
                    logging.info(f"Table '{table_name}' dropped successfully.")
                else:
                    logging.info(f"Table '{table_name}' does not exist, skipping drop.")

            # Create tables in the correct dependency order
            logging.info("Starting table creation process.")
            for table_name, create_table_sql in create_table_sql_commands.items():
                conn.execute(text(create_table_sql))
                logging.info(f"Table '{table_name}' created successfully.")

            # Commit the transaction only after all tables are dropped and recreated
            logging.info("All tables created successfully in a single transaction.")

        except SQLAlchemyError as e:
            # If any command fails, rollback the entire transaction
            logging.error(f"Operation failed, rolling back transaction: {e}")
            raise

except SQLAlchemyError as e:
    logging.error(f"An error occurred in the table creation process: {e}")


# Confirm the existence of each table if the transaction committed successfully
try:
    with engine.connect() as conn:
        for table_name in create_table_sql_commands.keys():
            try:
                result = conn.execute(text(
                    f"SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_NAME = '{table_name.upper()}';"
                ))
                table_exists = result.fetchone()
                if table_exists:
                    logging.info(f"Table '{table_name}' exists in the database.")
                else:
                    logging.warning(f"Table '{table_name}' not found in the database.")
            except SQLAlchemyError as e:
                logging.error(f"Error while checking existence of table '{table_name}': {e}")

except SQLAlchemyError as e:
    logging.error(f"Error in final table existence verification process: {e}")

# Final log entry to mark end of the script
end_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
logging.info(f"Table creation and verification process completed at {end_time}.")


In [None]:
# -------------------------------------------------------------------------------
# Insertion Order Configuration
# -------------------------------------------------------------------------------
# This script manages data insertion for tables with complex foreign key dependencies.
# To ensure smooth data insertion without violating foreign key constraints, we define
# an `insertion_order` list.

# `insertion_order`:
#    - Specifies the sequence for inserting data into tables based on dependencies.
#    - By inserting data into tables that do not depend on others first, followed by
#      tables that depend on data in previous tables, we prevent foreign key constraint
#      violations.
#    - This ordered approach preserves referential integrity across the database and 
#      ensures that each table’s dependencies are satisfied.

# Using only `insertion_order`, we set up a reliable workflow for data insertion
# that upholds data integrity without requiring truncation, as tables are dropped and 
# recreated prior to this step.

# -------------------------------------------------------------------------------

# Insertion order - correct dependency order for inserting data
insertion_order = [
    "TABLE_1",   
    "TABLE_2",                
    "TABLE_3",     
    "TABLE_4",                       
    "TABLE_5",              
    "TABLE_6",    
    "TABLE_7"   
]

In [None]:
import pandas as pd
import json

# Configure logging with DEBUG level for more detailed output during development
logging.basicConfig(
    level=logging.DEBUG,  # Change to INFO or WARNING in production
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler("api_script.log"),
    ]
)

def truncate_large_int(value, length=8):
    """
    Truncates an integer to the specified number of rightmost digits.
    
    Parameters:
        value (int): The integer to be truncated.
        length (int): The number of rightmost digits to keep (default is 8).
    
    Returns:
        int: The truncated integer.
    """
    # Ensure value is an integer and truncate if it's large
    if isinstance(value, int) and value > 10**length:
        return int(str(value)[-length:])
    return value

def prepare_for_insertion(df):
    """
    Pre-processes a DataFrame for SQL insertion by setting the 'groups', 'aliases', and 'content' columns to None if they exist.
    These columns often contain JSON-like structures that can be safely nulled because they are represented in bridge tables in our schema.
    Additionally, this function formats datetime columns, rounds float values, and replaces NaNs in float columns.
    """
    # Set 'groups' column to None if it exists
    if 'groups' in df.columns:
        df['groups'] = None
        # found in dataframes: scia_api_users_listing, scia_api_users_in_groups, scia_api_training_campaigns
    
    if 'aliases' in df.columns:
        df['aliases'] = None
        # found in dataframes: scia_api_users_listing, scia_api_users_in_groups

    if 'content' in df.columns:
        df['content'] = None
        # found in dataframes: scia_api_training_campaigns, scia_api_training_content
    
    # Format datetime columns to SQL-compatible strings
    datetime_columns = df.select_dtypes(include=['datetime64[ns]']).columns
    df[datetime_columns] = df[datetime_columns].map(lambda x: x.strftime('%Y-%m-%d %H:%M:%S') if pd.notnull(x) else None)
    
    # Round float columns and replace NaNs with a default value
    float_columns = df.select_dtypes(include=['float']).columns
    df[float_columns] = df[float_columns].fillna(0.0).round(4)
    
    # Truncate large integers in specific columns
    if 'recipient_id' in df.columns:
        df['recipient_id'] = df['recipient_id'].apply(lambda x: truncate_large_int(x, length=8))
    
    return df


def remove_test_user(df, test_id=0000000):
    """
    Removes rows where '_id' matches the given test_id.
    """
    return df[df['_id'] != test_id].copy()

def generate_insert_sql(table_name, df):
    """
    Generates a parameterized SQL INSERT statement for the given table and DataFrame.

    Parameters:
        table_name (str): The name of the target SQL table.
        df (pd.DataFrame): DataFrame containing the columns for the table.

    Returns:
        str: A SQL INSERT statement with parameter placeholders.
    """
    columns = df.columns.tolist()
    column_names = ", ".join(columns)
    placeholders = ", ".join([f":{col}" for col in columns])
    
    sql = f"""
    INSERT INTO {table_name} ({column_names})
    VALUES ({placeholders})
    """
    return sql



# Dictionary of tables with dynamically generated SQL statements
insertion_data = {
    "scia_api_account_risk_score_history": {
        "dataframe": prepare_for_insertion(api_df_account_risk_score_history),
        "insert_sql": generate_insert_sql("dbo.scia_api_account_risk_score_history", api_df_account_risk_score_history)
    },
    "scia_api_users_listing": {
        "dataframe": remove_test_user(prepare_for_insertion(api_df_users_listing)),
        "insert_sql": generate_insert_sql("dbo.scia_api_users_listing", api_df_users_listing)
    },
    "scia_api_users_risk_score_history": {
        "dataframe": remove_test_user(prepare_for_insertion(api_df_combined_risk_score_history)),
        "insert_sql": generate_insert_sql("dbo.scia_api_users_risk_score_history", api_df_combined_risk_score_history)
    },
    "scia_api_groups": {
        "dataframe": prepare_for_insertion(api_df_groups),
        "insert_sql": generate_insert_sql("dbo.scia_api_groups", api_df_groups)
    },
    "scia_api_users_in_groups": {
        "dataframe": remove_test_user(prepare_for_insertion(api_df_all_users_all_groups)),
        "insert_sql": generate_insert_sql("dbo.scia_api_users_in_groups", api_df_all_users_all_groups)
    },
    "scia_api_group_risk_score_history" : {
        "dataframe" : prepare_for_insertion(api_df_combined_group_risk_score_history),
        "insert_sql": generate_insert_sql("dbo.scia_api_group_risk_score_history", api_df_combined_group_risk_score_history)
    },
    "scia_api_phishing_tests": {
        "dataframe": prepare_for_insertion(api_df_phishing_tests),
        "insert_sql": generate_insert_sql("dbo.scia_api_phishing_tests", api_df_phishing_tests)
    },
    "scia_api_phishing_campaigns": {
        "dataframe" : prepare_for_insertion(api_df_combined_phish_campaigns),
        "insert_sql" : generate_insert_sql("dbo.scia_api_phishing_campaigns", api_df_combined_phish_campaigns)
    }, 
    "scia_api_pst_results": {
        "dataframe": prepare_for_insertion(api_df_pst_results_merged),
        "insert_sql": generate_insert_sql("dbo.scia_api_pst_results", api_df_pst_results_merged)
    },
    "scia_api_training_campaigns": {
        "dataframe": prepare_for_insertion(api_df_campaigns),
        "insert_sql": generate_insert_sql("dbo.scia_api_training_campaigns", api_df_campaigns)
    },
    "scia_api_training_enrollments": {
        "dataframe": prepare_for_insertion(api_df_enrollments),
        "insert_sql": generate_insert_sql("dbo.scia_api_training_enrollments", api_df_enrollments)
    }, 
    "scia_api_training_details": {
        "dataframe": prepare_for_insertion(api_df_training_details),
        "insert_sql": generate_insert_sql("dbo.scia_api_training_details", api_df_training_details)
    },
    "scia_api_training_content": {
        "dataframe": prepare_for_insertion(api_df_content),
        "insert_sql": generate_insert_sql("dbo.scia_api_training_content", api_df_content)
    },
    "scia_api_groups_bridge_campaigns":{
        "dataframe": prepare_for_insertion(api_df_combined_groups_bridge_campaigns),
        "insert_sql": generate_insert_sql("dbo.scia_api_groups_bridge_campaigns", api_df_combined_groups_bridge_campaigns)
    }

}

# Logging output to verify generated SQL commands for each table
for table_name, data in insertion_data.items():
    logging.info(f"INSERT SQL for table '{table_name}':\n{data['insert_sql']}\n")

logging.info(f"{len(insertion_data)} SQL Insert statements generated")