### Fetching Data from a CSV File (For Testing)

In [None]:
import pandas as pd

def read_csv_to_dataframe(file_path):
    """
    Read a CSV file and return its contents as a pandas DataFrame.

    Errors are reported on stdout instead of being raised, so callers must
    check for a ``None`` result.

    Parameters:
    - file_path (str): The path to the CSV file.

    Returns:
    - DataFrame: The data from the CSV file, or None if the file could not be
      read (missing, empty, malformed, or any other error).
    """
    try:
        df = pd.read_csv(file_path)
    except FileNotFoundError:
        # Report the real path rather than a literal "<file_path>" placeholder.
        print(f"Error: File not found at {file_path}.")
    except pd.errors.EmptyDataError:
        print("Error: The CSV file is empty.")
    except pd.errors.ParserError:
        print("Error: The CSV file is malformed.")
    except Exception as e:
        print("An unexpected error occurred:", e)
    else:
        print("CSV file successfully read.")
        return df
    # Every error path above falls through to here.
    return None


### Fetching Data from an API

In [None]:
def throughAPI(api_url):
    """
    Connect to the API and retrieve its JSON payload.

    The request uses HTTP Basic authentication and a 600-second timeout. Any
    HTTP, connection, timeout, other request, or JSON-decoding failure is
    logged and terminates the process with exit status 1.

    Args:
        api_url (str): URL of the API endpoint.

    Returns:
        The decoded JSON data, or None when the API returned an empty (falsy)
        payload.
    """

    # ***API Credentials WARNING
    # NOTE(review): these are hard-coded placeholders; load real credentials
    # from the environment or a secrets store rather than from source code.
    username = "<USERNAME>"
    password = "<PASSWORD>"

    try:
        logger.info("Connecting to API.... and pulling Data")
        response = requests.get(api_url, auth=HTTPBasicAuth(username, password), timeout=600)
        response.raise_for_status()
    except requests.exceptions.HTTPError as http_err:
        logger.error(f"HTTP error occurred: {http_err}")
        sys.exit(1)
    except requests.exceptions.ConnectionError:
        logger.error("Connection Error Could not connect to the API.")
        sys.exit(1)
    except requests.exceptions.Timeout:
        logger.error("Timeout The API request timed out.")
        sys.exit(1)
    except requests.exceptions.RequestException as e:
        logger.error(f"API Error, An error occurred: {e}")
        sys.exit(1)

    # Decode in a separate try. Since requests 2.27, response.json() raises
    # requests.exceptions.JSONDecodeError, which subclasses both ValueError
    # and RequestException. In a single try block the generic RequestException
    # handler would catch it first, and the decode error would never be
    # reported as such.
    try:
        data = response.json()
    except ValueError:
        logger.error("Decode Error, Failed to decode JSON from the API.")
        sys.exit(1)

    if not data:
        logger.warning("No Data, API returned no data.")
        return None

    logger.info("API connection successful and Data is pulled")
    return data


### Fetching Data from SQL Server

In [None]:
def connect_to_sql_server(server, database, user, password, port=1433):
    """
    Open a pyodbc connection to a SQL Server database.

    Args:
        server (str): Server host name or address.
        database (str): Database name.
        user (str): Login user name.
        password (str): Login password.
        port (int): TCP port of the SQL Server instance (default 1433).

    Returns:
        The pyodbc connection object on success, or None if connecting failed.
    """
    # Build the connection string from the arguments. Previously the
    # server/database/user/password parameters were ignored in favour of
    # literal placeholders.
    # NOTE(review): ODBC values containing ';' or '{' would need to be wrapped
    # in braces; plain names and passwords are assumed here.
    conn_str = (
        "DRIVER={ODBC Driver 17 for SQL Server};"
        f"SERVER={server},{port};"
        f"DATABASE={database};"
        f"UID={user};"
        f"PWD={password}"
    )
    try:
        connection = pyodbc.connect(conn_str)
    except pyodbc.Error as e:
        # Include the driver's message; previously the caught error was discarded.
        print("Error connecting to SQL Server.", e)
        return None
    print("Connected to SQL Server")
    return connection

def fetch_sqlserver_table_as_df(connection, table_name):
    """
    Read an entire SQL Server table into a DataFrame.

    Args:
        connection: Open database connection (e.g. from connect_to_sql_server).
        table_name (str): Name of the table to read, inside the ``<Schema>``
            schema (a placeholder to be replaced with the real schema name).

    Returns:
        DataFrame with the table contents, or None if the query failed.
    """
    # Interpolate the requested table; previously the query read a literal
    # "<table_name>" and ignored the argument.
    # NOTE(review): table_name is spliced into SQL, so only pass trusted,
    # known table names.
    query = f"SELECT * FROM <Schema>.{table_name}"
    try:
        return pd.read_sql(query, con=connection)
    except Exception as e:
        print("Error fetching SQL Server table.", e)
        return None


### Bulk Load and Incremental Load

In [None]:
def bulk_load(engine, table_name, df):
    """
    Empty the table and bulk load new data, checking for schema compatibility.

    ``df`` is modified in place. Two columns are added: ``Query_status``, set
    to 'Bulk load', and ``Load_Timestamp``, set to the current local time.
    If the table already exists, its column names must exactly match
    ``df``'s, including the two tracking columns. After loading, the table is
    copied to Stage 2 with ``move_stage1_to_stage2(..., bulk_load=True)``.

    Args:
        engine: SQLAlchemy engine connected to the database.
        table_name (str): The name of the table to load data into.
        df (DataFrame): The new data to be loaded.

    Raises:
        SystemExit: with status 1 on any failure. A schema mismatch raises a
            ValueError internally, which is logged and converted to this exit.

    Returns:
        None
    """
    try:
        inspector = inspect(engine)

        # Tag every row with the load type and load time.
        df['Query_status'] = 'Bulk load'
        df['Load_Timestamp'] = datetime.now()

        if inspector.has_table(table_name):
            existing_columns = {col['name'] for col in inspector.get_columns(table_name)}

            # Only column names are compared, not column types.
            if existing_columns != set(df.columns):
                logger.error(f"Schema mismatch for table '{table_name}'.")
                raise ValueError(
                    f"Schema mismatch for table '{table_name}'. "
                )

            # 2.0-style Connection.execute rejects plain strings, so wrap the
            # SQL in text(). engine.begin() commits when the block exits.
            with engine.begin() as connection:
                connection.execute(text(f"TRUNCATE TABLE <Schema>.{table_name}"))
            logger.info(f"Table '{table_name}' truncated.")

        # NOTE(review): if_exists='replace' drops and recreates the table. That
        # makes the TRUNCATE above redundant and discards the existing column
        # types and indexes; 'append' would load into the truncated table
        # instead. Confirm which is intended.
        df.to_sql(table_name, con=engine, if_exists='replace', index=False)
        logger.info(f"Table '{table_name}' loaded with {len(df)} records.")

        # Copy the freshly loaded table to Stage 2.
        move_stage1_to_stage2(engine, table_name, table_name, incremental=False, bulk_load=True)

    except Exception as e:
        logger.error(f"Error in bulk_load for table '{table_name}': {str(e)}")
        sys.exit(1)
    


def IncrementalLoadWithTracking(engine, table_name, df):
    """
    Load new records, update status changes, and tag each row with Query_status and Load_Timestamp.

    The unique-key and status columns are chosen from the table name.
    Incoming rows are left-joined to the existing table on the unique key and
    tagged as follows:

    - 'newly inserted': key not yet in the table; the row is appended.
    - 'updated': key present and the status value changed; the status,
      Query_status and Load_Timestamp are updated in place. Other columns are
      not updated.
    - 'same': key present and the status unchanged; nothing is written.

    If the table does not exist, or exists but is empty, every row is loaded
    with Query_status 'Bulk load' instead. After a successful load the table
    is passed to move_stage1_to_stage2. ``df`` is modified in place.

    Args:
        engine: SQLAlchemy engine connected to the database.
        table_name (str): The name of the table to load data into.
        df (DataFrame): The new data to be loaded.

    Returns:
        None. On a schema mismatch, logs and returns early. Exits the process
        with status 1 when no unique key is configured for the table or when
        loading fails.
    """
    lowerTableName = table_name.lower()

    # Identify the unique key and status column for each table.
    # NOTE(review): the '<placeholder>' fragments must be replaced with real
    # table-name fragments; as written only the first branch can ever match.
    if '<placeholder>' in lowerTableName:
        unique_key = '<TABLE_ID1>'
        status = '<TABLE_STATUS1>'
    elif '<placeholder>' in lowerTableName:
        unique_key = '<TABLE_ID2>'
        status = '<TABLE_STATUS2>'
    elif '<placeholder>' in lowerTableName:
        unique_key = '<TABLE_ID3>'
        status = '<TABLE_STATUS3>'
    else:
        logger.error("Incremental Load failed. No unique key found.")
        # Exit with a failure code; a bare sys.exit() reports success (0).
        sys.exit(1)

    is_incremental = False

    try:
        inspector = inspect(engine)

        # TODO: deduplicate incoming data; needs to be revised before enabling:
        # df = df.drop_duplicates(subset=unique_key, keep='last')

        # Add the tracking columns (Query_status is filled in below).
        df['Query_status'] = ''
        df['Load_Timestamp'] = datetime.now()

        # First load: the table does not exist yet, so create it from df.
        if not inspector.has_table(table_name):
            df['Query_status'] = 'Bulk load'
            df.to_sql(table_name, con=engine, if_exists='replace', index=False)
            logger.info(f"Table '{table_name}' created and all records inserted.")
            move_stage1_to_stage2(engine, table_name, table_name, incremental=False, bulk_load=True)
            return

        with engine.connect() as conn:
            existing_df = pd.read_sql(f"SELECT * FROM <Schema>.{table_name}", conn)
            filtered_df = pd.read_sql(f"SELECT {unique_key}, {status}, Query_status, Load_Timestamp FROM <Schema>.{table_name}", conn)

        # Normalise to datetime64[ns] so the dtype comparison below is like-for-like.
        df['Load_Timestamp'] = df['Load_Timestamp'].astype('datetime64[ns]')
        new_schema = list(zip(df.columns, df.dtypes))
        existing_schema = list(zip(existing_df.columns, existing_df.dtypes))

        if existing_df.empty:
            # read_sql cannot infer column types from zero rows (the columns
            # come back as object dtype), so only names can be compared here.
            # Comparing dtypes would make the initial-load path unreachable.
            schema_matches = list(existing_df.columns) == list(df.columns)
        else:
            schema_matches = existing_schema == new_schema

        if not schema_matches:
            logger.error(f"Schema mismatch for table '{table_name}'.")
            logger.error(f"Existing schema: {existing_schema}")
            logger.error(f"New schema: {new_schema}")
            return  # Or use sys.exit(1) if this is at the script level
        logger.info("Schema match confirmed. Proceeding with incremental load.")

        # Table exists but is empty: treat as an initial bulk load.
        if existing_df.empty:
            df['Query_status'] = 'Bulk load'
            df.to_sql(table_name, con=engine, if_exists='replace', index=False)
            logger.info(f"Initial Load: Loaded {len(df)} records into '{table_name}'.")
            move_stage1_to_stage2(engine, table_name, table_name, incremental=False, bulk_load=True)
            return

        merged_df = df.merge(filtered_df, on=unique_key, suffixes=('_new', '_old'), how='left')

        # A missing old status means the key was not in the table yet.
        old_status = merged_df[status + '_old']
        new_status = merged_df[status + '_new']
        is_known = old_status.notna()
        new_rows = merged_df[~is_known]
        updated_rows = merged_df[is_known & (new_status != old_status)]
        same_rows = merged_df[is_known & (new_status == old_status)]

        timestamp = datetime.now()

        # Label Query_status and timestamp on the incoming frame.
        df.loc[df[unique_key].isin(new_rows[unique_key]), 'Query_status'] = 'newly inserted'
        df.loc[df[unique_key].isin(updated_rows[unique_key]), 'Query_status'] = 'updated'
        df.loc[df[unique_key].isin(same_rows[unique_key]), 'Query_status'] = 'same'
        df['Load_Timestamp'] = timestamp

        with engine.begin() as conn:
            # Insert new records
            if not new_rows.empty:
                is_incremental = True
                insert_data = df[df[unique_key].isin(new_rows[unique_key])]
                insert_data.to_sql(table_name, con=conn, if_exists='append', index=False)
                logger.info(f"Inserted {len(insert_data)} new records into '{table_name}'.")

            # Update changed records
            if not updated_rows.empty:
                is_incremental = True
                # Bind the values as parameters instead of splicing them into
                # the SQL text. This avoids quoting and injection problems with
                # key values, and runs as one executemany instead of one
                # statement per row.
                update_stmt = text(
                    f"UPDATE <Schema>.{table_name} "
                    f"SET {status} = :status_value, "
                    "Query_status = 'updated', "
                    "Load_Timestamp = :load_ts "
                    f"WHERE {unique_key} = :key_value"
                )
                load_ts = timestamp.strftime('%Y-%m-%d %H:%M:%S')
                update_params = [
                    {
                        "status_value": int(row[status]),
                        "load_ts": load_ts,
                        "key_value": str(row[unique_key]),
                    }
                    for _, row in df[df[unique_key].isin(updated_rows[unique_key])].iterrows()
                ]
                conn.execute(update_stmt, update_params)
                logger.info(f"Updated {len(updated_rows)} records in '{table_name}' due to status changes.")

            if new_rows.empty and updated_rows.empty:
                logger.info(f"No new or updated records for '{table_name}'.")

        move_stage1_to_stage2(engine, table_name, table_name, incremental=is_incremental)

    except Exception as e:
        logger.error("Insert/Update Failed:\n" + str(e))
        sys.exit(1)


### Moving Data from Stage 1 to Stage 2

In [None]:
def move_stage1_to_stage2(engine, stage1_table, stage2_table, incremental=False, bulk_load=False):
    """
    Rebuild the Stage2 table from the Stage1 table.

    The Stage2 table is always dropped and recreated with ``SELECT ... INTO``.
    What it is filled with depends on the flags:

    - incremental=True: only rows whose Query_status is 'updated' or
      'newly inserted' and whose load_Timestamp equals the latest such
      timestamp, i.e. the most recent incremental batch.
    - bulk_load=True (incremental False): the entire Stage1 table.
    - neither: no rows; only Stage1's column structure is copied.

    Args:
        engine: SQLAlchemy engine connected to SQL Server.
        stage1_table (str): Source table (Stage1).
        stage2_table (str): Destination table (Stage2).
        incremental (bool): Move only the latest batch of updated/new rows.
        bulk_load (bool): Move the whole Stage1 table (ignored when
            ``incremental`` is True).

    Returns:
        None. Errors are logged, not raised.
    """
    # The ';' matters: T-SQL requires the statement before a WITH clause to be terminated.
    drop_sql = f"DROP TABLE IF EXISTS <Schema>.{stage2_table};"

    if incremental:
        copy_sql = f"""
            WITH MaxTimestampCTE AS (
                SELECT MAX(load_Timestamp) AS MaxTimestamp
                FROM <Schema>.{stage1_table}
                WHERE Query_status IN ('updated', 'newly inserted')
            )
            SELECT *
            INTO <Schema>.{stage2_table}
            FROM <Schema>.{stage1_table}
            WHERE Query_status IN ('updated', 'newly inserted')
              AND load_Timestamp = (SELECT MaxTimestamp FROM MaxTimestampCTE);
        """
        success_msg = f"Incremental Stage 2 Load: Moved updated/new data from {stage1_table} to {stage2_table}."
    elif bulk_load:
        copy_sql = f"""
            SELECT *
            INTO <Schema>.{stage2_table}
            FROM <Schema>.{stage1_table};
        """
        success_msg = f"Bulk Stage 2 Load: Moved full data from {stage1_table} to {stage2_table}."
    else:
        # Only copy the schema of stage 1 to stage 2.
        copy_sql = f"""
            SELECT TOP 0 *
            INTO <Schema>.{stage2_table}
            FROM <Schema>.{stage1_table};
        """
        success_msg = "No new row added to stage 2"

    try:
        with engine.begin() as conn:
            conn.execute(text(drop_sql + copy_sql))
    except Exception as e:
        # f-string so the table names are actually interpolated.
        logger.error(f"Stage 2 Load Failed: Error moving data from {stage1_table} to {stage2_table}:\n{e}")
        return

    # Log success only after the transaction has committed.
    logger.info(success_msg)


### Mapping Validator Function

In [None]:
def check_unexpected_values(engine, validations):
    """
    Run validation queries and report any values they return as unexpected.

    Each query should return the offending values in its first column; an
    empty result means the check passed.

    Args:
        engine: SQLAlchemy engine used to run the queries.
        validations (list[dict]): One dict per check, with keys:
            - 'name': a label for what is being checked (used in log messages
              and in the log-file name)
            - 'query': the SQL query to run

    Returns:
        None. Unexpected values are logged as a warning and appended, one per
        line, to '<name>_unexpected.log' in the current working directory.
    """
    with engine.connect() as connection:
        for validation in validations:
            name = validation['name']
            result = connection.execute(text(validation['query']))
            unexpected_values = [row[0] for row in result]

            if not unexpected_values:
                logger.info(f"No unexpected values found for {name}.")
                continue

            logger.warning(f"Unexpected values found for {name}: {unexpected_values}")

            # Use the check's own name in the file name. The previous literal
            # '<Validation_name>' made every check write to one shared file and
            # contains characters that are invalid in Windows file names.
            with open(f'{name}_unexpected.log', 'a', encoding='utf-8') as f:
                f.writelines(f"{value}\n" for value in unexpected_values)


### Transformation Scripts

In [None]:
def _create_tables_if_populated(conn, source_table, label, select_queries):
    """
    Run a group of table-creation queries only when ``source_table`` has rows.

    Args:
        conn: Open SQLAlchemy connection (inside the caller's transaction).
        source_table (str): Fully qualified table whose row count gates the group.
        label (str): Short name used in the error message for the group.
        select_queries (callable): Zero-argument callable returning a list of
            ``(query, table_name)`` pairs. It is called only after the row
            count is known to be positive, so the query objects are looked up
            lazily.

    SQLAlchemy errors are logged, not raised. A failing query does not stop
    the rest of the group, and a failing row-count check skips the group.
    """
    try:
        row_count = conn.execute(text(f"SELECT COUNT(*) FROM {source_table}")).scalar()
        if row_count > 0:
            for query, name in select_queries():
                try:
                    conn.execute(query)
                    logger.info(f"{name} Table Created")
                except SQLAlchemyError as e:
                    logger.error(f"Failed to create {name} table: {e}")
    except SQLAlchemyError as e:
        logger.error(f"Error checking/processing {label} data.")
        logger.error(str(e))


def transform(engine, aggregated_table):
    """
    Transform the Stage 2 data into the shape required by the Sapphire API.

    Detail mode (``aggregated_table`` falsy):
        - runs the mapping validations;
        - rebuilds the lookup tables;
        - creates the detail tables (Table01, Table10, Table17).
    Aggregated mode: creates only the aggregated tables derived from each
    source.

    A source group is skipped when its Schema01 table has no rows. All
    statements share one transaction. Errors from the mapping, row-count and
    table-creation statements are logged rather than raised.

    Args:
        engine: SQL Server connector (SQLAlchemy engine).
        aggregated_table (bool): True to build the aggregated tables, False to
            build the mapping and detail tables.

    Returns:
        None
    """
    # Mapping checks: each query returns the values that fall outside the allowed set.
    validations = [
        {
            "name": "Column1 For Table1",
            "query": """
                SELECT DISTINCT Column1
                FROM Table1
                WHERE Column1 NOT IN ('value1','value2','value3','value4','value5');
            """
        },
        {
            "name": "Column2 For Table1",
            "query": """
                SELECT DISTINCT Column2
                FROM Table1
                WHERE Column2 NOT IN ('value1','value2','value3','value4','value5','value6','value7','value8','value9','value10','value11','value12');
            """
        },
        {
            "name": "Column3 For Table1",
            "query": """
                SELECT DISTINCT Column3
                FROM Table1
                WHERE Column3 NOT IN ('value1','value2','value3','value4','value5','value6','value7','value8','value9','value10','value11','value12','value13','value14');
            """
        },
        {
            "name": "Column4 For Table1",
            "query": """
            SELECT DISTINCT Column4
            FROM Table1
            WHERE Column4 NOT IN ('value1','value2','value3','value4','value5','value6','value7','value8','value9','value10','value11','value12','value13','value14');
            """
        },
        {
            "name": "Column1 For Table2",
            "query": """
                SELECT DISTINCT Column1
                FROM Table2
                WHERE Column1 NOT IN ('value1','value2','value3','value4','value5','value6','value7','value8','value9','value10','value11','value12');
            """
        },
        {
            "name": "Column2 For Table2",
            "query": """
                SELECT DISTINCT Column2
                FROM Table2
                WHERE Column2 NOT IN ('value1','value2','value3','value4','value5');
            """
        },
        {
            "name": "Column1 For Table3",
            "query": """
                SELECT DISTINCT Column1
                FROM Table3
                WHERE Column1 NOT IN ('value1','value2','value3','value4','value5','value6','value7','value8','value9','value10','value11','value12');
            """
        },
        {
            "name": "Column2 For Table3",
            "query": """
                SELECT DISTINCT Column2
                FROM Table3
                WHERE Column2 NOT IN ('value1','value2','value3','value4','value5','value6','value7','value8');
            """
        },
        {
            "name": "Column3 For Table3",
            "query": """
            SELECT DISTINCT Column3
            FROM Table3
            WHERE Column3 NOT IN ('value1','value2','value3','value4','value5','value6','value7','value8','value9','value10','value11','value12');
            """
        },
        {
            "name": "Column4 For Table3",
            "query": """
            SELECT DISTINCT Column4
            FROM Table3
            WHERE Column4 NOT IN ('value1','value2','value3','value4','value5','value6','value7','value8');
            """
        },
        {
            "name": "Column5 For Table3",
            "query": """
            SELECT DISTINCT Column5
            FROM Table3
            WHERE Column5 NOT IN ('value1','value2','value3','value4','value5','value6','value7','value8','value9','value10','value11','value12','value13','value14');
            """
        },
        {
            "name": "Column6 For Table3",
            "query": """
            SELECT DISTINCT Column6
            FROM Table3
            WHERE Column6 NOT IN ('value1','value2','value3','value4','value5','value6','value7','value8','value9','value10','value11','value12','value13','value14');
            """
        },
        {
            "name": "Column7 For Table3",
            "query": """
            SELECT DISTINCT Column7
            FROM Table3
            WHERE Column7 NOT IN ('value1','value2','value3','value4','value5','value6','value7','value8','value9','value10','value11','value12','value13','value14','value15','value16','value17','value18','value19','value20');
            """
        }
    ]

    # Lookup tables, dropped and recreated on every detail-mode run.
    mapping_query = text("""
        ----Making lookups
        ----=================================================================================
        DROP TABLE IF EXISTS Table_01;
        CREATE TABLE Table_01 (
            Col_01 INT,
            Col_02 INT,
            Col_03 INT
        );
        INSERT INTO Table_01 (Col_01, Col_02, Col_03) VALUES
            (1, 1, 1), (1, 2, 2), (1, 3, 3), (1, 4, 4), (1, 5, NULL);

        DROP TABLE IF EXISTS Table_02;
        CREATE TABLE Table_02 (
            Col_01 INT,
            Col_02 INT,
            Col_03 INT,
            Col_04 INT
        );
        INSERT INTO Table_02 (Col_01, Col_02, Col_03, Col_04) VALUES
            (1, 1, 1, 1), (1, 1, 2, 2), (1, 2, 3, 3), (1, 2, 4, 4), (1, 2, 5, NULL);

        DROP TABLE IF EXISTS Table_03;
        CREATE TABLE Table_03 (
            Col_01 INT,
            Col_02 INT
        );
        INSERT INTO Table_03 (Col_01, Col_02) VALUES
            (1, 1), (2, 2), (3, 3), (4, 4), (5, NULL);

        DROP TABLE IF EXISTS Table_04;
        CREATE TABLE Table_04 (
            Col_01 INT,
            Col_02 INT
        );
        INSERT INTO Table_04 (Col_01, Col_02) VALUES
            (1, NULL), (2, NULL), (3, 1), (3, 2), (4, 3);

        DROP TABLE IF EXISTS Table_05;
        CREATE TABLE Table_05 (
            Col_01 INT,
            Col_02 INT
        );
        INSERT INTO Table_05 (Col_01, Col_02) VALUES
            (0, NULL), (1, 1), (2, 2), (3, 3), (3, 4);

        DROP TABLE IF EXISTS Table_06;
        CREATE TABLE Table_06 (
            Col_01 INT,
            Col_02 INT
        );
        INSERT INTO Table_06 (Col_01, Col_02) VALUES
            (0, NULL), (1, 1), (2, 2), (3, NULL), (4, 3);

        DROP TABLE IF EXISTS Table_07;
        CREATE TABLE Table_07 (
            Col_01 INT,
            Col_02 INT
        );
        INSERT INTO Table_07 (Col_01, Col_02) VALUES
            (0, 1), (0, 2), (0, 3), (1, 4), (2, 5);

        DROP TABLE IF EXISTS Table_08;
        CREATE TABLE Table_08 (
            Col_01 INT,
            Col_02 INT
        );
        INSERT INTO Table_08 (Col_01, Col_02) VALUES
            (1, NULL), (2, NULL), (3, NULL), (4, NULL);

        DROP TABLE IF EXISTS Table_09;
        CREATE TABLE Table_09 (
            Col_01 INT,
            Col_02 INT
        );
        INSERT INTO Table_09 (Col_01, Col_02) VALUES
            (0, 0), (1, 1), (2, 2), (3, 3);

        DROP TABLE IF EXISTS Table_10;
        CREATE TABLE Table_10 (
            Col_01 INT,
            Col_02 INT
        );
        INSERT INTO Table_10 (Col_01, Col_02) VALUES
            (0, 0), (1, 1), (2, 2), (3, 3), (4, NULL);

        DROP TABLE IF EXISTS Table_11;
        CREATE TABLE Table_11 (
            Col_01 INT,
            Col_02 INT
        );
        INSERT INTO Table_11 (Col_01, Col_02) VALUES
            (0, 0), (1, 1), (2, 2), (3, 3), (4, NULL);

        DROP TABLE IF EXISTS Table_12;
        CREATE TABLE Table_12 (
            Col_01 INT,
            Col_02 INT
        );
        INSERT INTO Table_12 (Col_01, Col_02) VALUES
            (3, 1), (4, 2), (5, 3), (6, 4), (7, 5);

        DROP TABLE IF EXISTS Table_13;
        CREATE TABLE Table_13 (
            Col_01 INT,
            Col_02 INT
        );
        INSERT INTO Table_13 (Col_01, Col_02) VALUES
            (101, NULL), (102, NULL), (103, NULL), (104, 1), (105, NULL);

        DROP TABLE IF EXISTS Table_14;
        CREATE TABLE Table_14 (
            Col_01 INT,
            Col_02 INT
        );
        INSERT INTO Table_14 (Col_01, Col_02) VALUES
            (0, 1), (1, 2), (2, 3);

    """)

    try:
        with engine.begin() as conn:
            # Step 1 (detail mode only): validate the mappings, then rebuild the lookups.
            if not aggregated_table:
                check_unexpected_values(engine, validations)
                try:
                    conn.execute(mapping_query)
                    logger.info("Mapping Tables Created")
                except SQLAlchemyError as e:
                    logger.error("Failed to execute mapping_query.")
                    logger.error(str(e))

            # Steps 2-4: one group of derived tables per Schema01 source table.
            # The lambdas defer the query_TableNN lookups until a group runs.
            _create_tables_if_populated(
                conn,
                "Schema01.Table01",
                "Table01",
                lambda: (
                    [(query_Table01, "Table01")]
                    if not aggregated_table
                    else [
                        (query_Table02, "Table02"),
                        (query_Table03, "Table03"),
                        (query_Table04, "Table04"),
                        (query_Table05, "Table05"),
                        (query_Table06, "Table06"),
                        (query_Table07, "Table07"),
                        (query_Table08, "Table08"),
                        (query_Table09, "Table09"),
                    ]
                ),
            )
            _create_tables_if_populated(
                conn,
                "Schema01.Table02",
                "Table02",
                lambda: (
                    [(query_Table10, "Table10")]
                    if not aggregated_table
                    else [
                        (query_Table11, "Table11"),
                        (query_Table12, "Table12"),
                        (query_Table13, "Table13"),
                        (query_Table14, "Table14"),
                        (query_Table15, "Table15"),
                        (query_Table16, "Table16"),
                    ]
                ),
            )
            _create_tables_if_populated(
                conn,
                "Schema01.Table03",
                "Table03",
                lambda: (
                    [(query_Table17, "Table17")]
                    if not aggregated_table
                    else [
                        (query_Table18, "Table18"),
                        (query_Table19, "Table19"),
                        (query_Table20, "Table20"),
                        (query_Table21, "Table21"),
                        (query_Table22, "Table22"),
                        (query_Table23, "Table23"),
                    ]
                ),
            )

        logger.info("All possible queries executed (errors logged if any).")
    except SQLAlchemyError as e:
        logger.critical("Critical DB error — transaction not even started.")
        logger.critical(str(e))



### Moving Data from Stage 2 to Stage 3

In [None]:
def move_stage2_to_stage3(engine, table2_name, table3_name, aggregated_table=False):
    """
    Copy data from a Stage2 table into a Stage3 table.

    If the Stage3 table does not exist, it is first created with the Stage2
    table's column structure (``SELECT * INTO ... WHERE 1 = 0``). Detail
    tables are appended to. Aggregated tables are truncated first, so Stage3
    holds only the latest copy. All steps run in one transaction.

    Args:
        engine: SQLAlchemy engine connected to SQL Server.
        table2_name (str): Source table name (in Stage2.dbo).
        table3_name (str): Target table name (in Stage3.dbo).
        aggregated_table (bool): True to replace the Stage3 contents
            (TRUNCATE, then INSERT); False to append.

    Returns:
        None. Errors are logged, not raised.
    """
    try:
        with engine.begin() as conn:
            # Step 1: check whether the table already exists in Stage3.
            table_exists = conn.execute(
                text(
                    "SELECT 1 "
                    "FROM Stage3.INFORMATION_SCHEMA.TABLES "
                    "WHERE TABLE_NAME = :table_name AND TABLE_SCHEMA = 'dbo'"
                ),
                {'table_name': table3_name},
            ).scalar()

            # Step 2: create an empty copy of the Stage2 structure if needed.
            if not table_exists:
                conn.execute(text(
                    f"SELECT * INTO Stage3.dbo.{table3_name} "
                    f"FROM Stage2.dbo.{table2_name} "
                    "WHERE 1 = 0"
                ))

            # Step 3: aggregated tables are rebuilt on each run, so clear
            # them first; detail tables accumulate rows.
            if aggregated_table:
                conn.execute(text(f"TRUNCATE TABLE Stage3.dbo.{table3_name}"))
            conn.execute(text(
                f"INSERT INTO Stage3.dbo.{table3_name} "
                f"SELECT * FROM Stage2.dbo.{table2_name}"
            ))

        logger.info(f"Stage 3 Load: Data from {table2_name} inserted into {table3_name} successfully.")
    except Exception as e:
        logger.error(f"Stage 3 Load Failed: Error processing tables:\n{e}")

#### Data Summary 

In [None]:
def data_summary(engine, table_name):
    """
    Append a data summary of one of the main Stage 1 tables to a text file.

    The summary is appended to ``<table_name>_Data_Summary.txt`` (current
    working directory). Each query's result is written as a grid table; a
    failing query writes its error text instead of aborting the summary.

    Args:
        engine: SQLAlchemy engine connected to SQL Server
        table_name (str): table for which the summary is generated; matched
            case-insensitively against "school", "teachers" and "learners".
            Any other name is logged as a warning and skipped.

    Returns:
        None
    """
    output_path = table_name + "_Data_Summary.txt"
    logger.info("Generating the Data Summary")

    key = table_name.lower()
    if key == "school":
        queries = {
            "query_01": "SELECT COUNT(*) AS Col_01 FROM Stage1.dbo.Tbl_01",
            "query_02": "SELECT COUNT(DISTINCT Col_02) AS Col_03 FROM Stage1.dbo.Tbl_01",
            "query_03": """
                SELECT COALESCE(SUM(Col_04), 0) AS Col_05 FROM (
                    SELECT COUNT(*) as Col_04
                    FROM Stage1.dbo.Tbl_01
                    GROUP BY Col_02
                    HAVING COUNT(*) > 1
                ) AS TblX
            """,
            "query_04": """
                SELECT Col_06, COUNT(Col_02) as Col_07
                FROM Stage1.dbo.Tbl_01
                GROUP BY Col_06
            """,
            "query_05": """
                SELECT Col_08, COUNT(Col_02) as Col_09
                FROM Stage1.dbo.Tbl_01
                GROUP BY Col_08
            """,
            "query_06": """
                SELECT Col_10, COUNT(Col_02) as Col_11
                FROM Stage1.dbo.Tbl_01
                GROUP BY Col_10
            """,
            # query_07..09 previously selected Col_08 while grouping by another
            # column, which SQL Server rejects; they now select the grouped
            # column like the teachers/learners summaries do.
            "query_07": """
                SELECT Col_12, COUNT(Col_02) as Col_13
                FROM Stage1.dbo.Tbl_01
                GROUP BY Col_12
            """,
            "query_08": """
                SELECT Col_14, COUNT(Col_02) as Col_15
                FROM Stage1.dbo.Tbl_01
                GROUP BY Col_14
            """,
            "query_09": """
                SELECT Col_16, COUNT(Col_02) as Col_17
                FROM Stage1.dbo.Tbl_01
                GROUP BY Col_16
            """
        }
    elif key == "teachers":
        queries = {
            "query_01": """
                SELECT COUNT(*) AS Col_01
                FROM Stage1.dbo.Tbl_02;
            """,
            "query_02": """
                SELECT COUNT(DISTINCT Col_02) AS Col_03
                FROM Stage1.dbo.Tbl_02;
            """,
            "query_03": """
                SELECT COALESCE(SUM(Col_04), 0) AS Col_05
                FROM (
                    SELECT COUNT(*) AS Col_04
                    FROM Stage1.dbo.Tbl_02
                    GROUP BY Col_02
                    HAVING COUNT(*) > 1
                ) AS TblY;
            """,
            "query_04": """
                SELECT Col_06, COUNT(Col_02) AS Col_07
                FROM Stage1.dbo.Tbl_02
                GROUP BY Col_06;
            """,
            "query_05": """
                SELECT Col_08, COUNT(Col_02) AS Col_09
                FROM Stage1.dbo.Tbl_02
                GROUP BY Col_08;
            """,
            "query_06": """
                SELECT Col_10, COUNT(Col_02) AS Col_11
                FROM Stage1.dbo.Tbl_02
                GROUP BY Col_10;
            """,
            "query_07": """
                SELECT Col_12, COUNT(Col_02) AS Col_13
                FROM Stage1.dbo.Tbl_02
                GROUP BY Col_12;
            """,
            "query_08": """
                SELECT Col_14, COUNT(Col_02) AS Col_15
                FROM Stage1.dbo.Tbl_02
                GROUP BY Col_14;
            """,
            "query_09": """
                SELECT Col_16, COUNT(Col_02) AS Col_17
                FROM Stage1.dbo.Tbl_02
                GROUP BY Col_16;
            """,
            "query_10": """
                SELECT Col_06, ROUND(AVG(CAST(Col_18 AS FLOAT)), 2) AS Col_19
                FROM Stage1.dbo.Tbl_02
                GROUP BY Col_06;
            """
        }
    elif key == "learners":
        queries = {
            "query_01": "SELECT COUNT(*) AS Col_01 FROM Stage1.dbo.Tbl_03;",
            "query_02": "SELECT COUNT(DISTINCT Col_02) AS Col_03 FROM Stage1.dbo.Tbl_03;",
            "query_03": "SELECT COALESCE(SUM(Col_04), 0) AS Col_05 FROM (SELECT COUNT(*) AS Col_04 FROM Stage1.dbo.Tbl_03 GROUP BY Col_02 HAVING COUNT(*) > 1) AS TblZ;",
            "query_04": "SELECT Col_06, COUNT(Col_02) AS Col_07 FROM Stage1.dbo.Tbl_03 GROUP BY Col_06;",
            "query_05": "SELECT Col_08, COUNT(Col_02) AS Col_09 FROM Stage1.dbo.Tbl_03 GROUP BY Col_08;",
            "query_06": "SELECT Col_10, COUNT(Col_02) AS Col_11 FROM Stage1.dbo.Tbl_03 GROUP BY Col_10;",
            "query_07": "SELECT Col_12, COUNT(Col_02) AS Col_13 FROM Stage1.dbo.Tbl_03 GROUP BY Col_12;",
            "query_08": "SELECT Col_14, COUNT(Col_02) AS Col_15 FROM Stage1.dbo.Tbl_03 GROUP BY Col_14;",
            "query_09": "SELECT Col_16, COUNT(Col_02) AS Col_17 FROM Stage1.dbo.Tbl_03 GROUP BY Col_16;",
            "query_10": "SELECT Col_18, COUNT(Col_02) AS Col_19 FROM Stage1.dbo.Tbl_03 GROUP BY Col_18;",
            "query_11": "SELECT Col_20, COUNT(Col_02) AS Col_21 FROM Stage1.dbo.Tbl_03 GROUP BY Col_20;",
            "query_12": "SELECT Col_22, COUNT(Col_02) AS Col_23 FROM Stage1.dbo.Tbl_03 GROUP BY Col_22;",
            "query_13": "SELECT Col_24, COUNT(Col_02) AS Col_25 FROM Stage1.dbo.Tbl_03 GROUP BY Col_24;",
            "query_14": "SELECT Col_26, COUNT(Col_02) AS Col_27 FROM Stage1.dbo.Tbl_03 GROUP BY Col_26;",
            "query_15": "SELECT Col_28, COUNT(Col_02) AS Col_29 FROM Stage1.dbo.Tbl_03 GROUP BY Col_28;",
            "query_16": "SELECT Col_30, COUNT(Col_02) AS Col_31 FROM Stage1.dbo.Tbl_03 GROUP BY Col_30;",
            "query_17": "SELECT Col_32, COUNT(Col_02) AS Col_33 FROM Stage1.dbo.Tbl_03 GROUP BY Col_32;",
            "query_18": "SELECT YEAR(Col_34) AS Col_35, COUNT(Col_02) AS Col_36 FROM Stage1.dbo.Tbl_03 WHERE Col_34 IS NOT NULL GROUP BY YEAR(Col_34) ORDER BY Col_35;",
            "query_19": "SELECT Col_37, COUNT(Col_02) AS Col_38 FROM Stage1.dbo.Tbl_03 GROUP BY Col_37;"
        }
    else:
        # Previously `queries` stayed unbound here and the function crashed with
        # UnboundLocalError after already writing a summary header to the file.
        logger.warning(f"No data summary queries defined for table {table_name}; skipping summary.")
        return

    with engine.connect() as conn, open(output_path, "a", encoding="utf-8") as f:
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        f.write(f"\n========= Summary for {table_name} at {timestamp} =========\n")

        for title, query in queries.items():
            f.write(f"\n-- {title} --\n")
            try:
                result = conn.execute(text(query))
                rows = result.fetchall()
                columns = result.keys()

                if rows:
                    table = tabulate(rows, headers=columns, tablefmt="grid")
                    f.write(f"{table}\n")
                else:
                    f.write("No data returned.\n")
            except Exception as e:
                # Record the failure in the summary and keep going with the rest.
                f.write(f"Query failed: {e}\n")

        # Real newlines (the original wrote the literal characters "\n\n").
        f.write(f"\n========== End of {table_name} Summary ==========\n\n")

### Test Cases

In [None]:
def _pass_fail(passed):
    """Return the status label written to the results file for a boolean outcome."""
    return "PASS" if passed else "FAIL"


def _read_scalar(engine, query, column):
    """Run *query* through pandas and return *column* from the first result row."""
    return pd.read_sql(query, engine).iloc[0][column]


def _is_yyyy_mm_dd_format(date_val):
    """Return True when ``str(date_val)`` already equals its ``%Y-%m-%d`` rendering.

    Any value that pandas cannot parse as a date counts as badly formatted.
    """
    try:
        return str(date_val) == datetime.strftime(pd.to_datetime(date_val), '%Y-%m-%d')
    except Exception:
        return False


def _check_schema(engine, table_name, expected_schema):
    """Compare the columns of Stage3.dbo.<table_name> with *expected_schema*.

    Returns ``(names_match, mismatches)``:
      * *names_match* is True when the column names agree case-insensitively
        and in column order;
      * *mismatches* lists ``(column, expected_type, actual_type)`` for every
        expected column whose type differs (case-insensitively) or that is
        missing from the table (``actual_type`` is then None).
    """
    # ORDER BY column_id makes the ordered name comparison deterministic; the
    # schema filter mirrors the 'dbo' checks used when loading Stage 3.
    query = f"""
    SELECT column_name, data_type FROM (
        SELECT c.name AS column_name, ty.name AS data_type, c.column_id
        FROM Stage3.sys.columns c
        JOIN Stage3.sys.tables t ON c.object_id = t.object_id
        JOIN Stage3.sys.schemas s ON t.schema_id = s.schema_id
        JOIN Stage3.sys.types ty ON c.user_type_id = ty.user_type_id
        JOIN sys.databases db ON db.name = 'Stage3'
        WHERE t.name = '{table_name}' AND s.name = 'dbo'
    ) inner_query
    ORDER BY column_id
    """
    df = pd.read_sql(query, engine)
    actual_schema = dict(zip(df['column_name'], df['data_type']))

    names_match = [k.lower() for k in expected_schema] == [k.lower() for k in actual_schema]

    # Case-insensitive lookup, matching the "(case-insensitive)" test label; a
    # missing column used to return None and crash on None.lower().
    actual_by_lower = {name.lower(): dtype for name, dtype in actual_schema.items()}
    mismatches = []
    for col, expected_type in expected_schema.items():
        actual_type = actual_by_lower.get(col.lower())
        if actual_type is None or actual_type.lower() != expected_type.lower():
            mismatches.append((col, expected_type, actual_type))
    return names_match, mismatches


def _mapping_is_valid(engine, lookup_table, final_col, source_col, lookup_key):
    """Check one lookup-mapped Stage 3 column of Table_01 against its Stage 2 lookup.

    Joins Stage3 Table_01 back to Stage1 Table_01 on col_01, looks the Stage 1
    *source_col* up in ``Stage2.dbo.<lookup_table>`` through *lookup_key*, and
    compares the lookup's *final_col* with the Stage 3 *final_col*.

    NOTE(review): returns True if ANY distinct pair maps correctly, so a
    partially wrong mapping still passes; requiring no 'FALSE' rows may have
    been the intent - confirm before tightening.
    """
    query = f"""
        SELECT DISTINCT CASE WHEN Inner_query.Final_{final_col} = lk.{final_col} THEN 'TRUE' ELSE 'FALSE' END AS [check]
        FROM Stage2.dbo.{lookup_table} lk
        RIGHT JOIN (
            SELECT DISTINCT t1.{final_col} as Final_{final_col}, t2.{source_col} as Initial_{source_col}
            FROM Stage3.dbo.Table_01 t1
            LEFT JOIN Stage1.dbo.Table_01 t2 ON t1.col_01 = t2.col_01
        ) Inner_query ON Inner_query.Initial_{source_col} = lk.{lookup_key}
    """
    df = pd.read_sql(query, engine)
    return bool(df['check'].astype(str).str.lower().eq('true').any())


def _run_table_01_tests(engine, table_name, results):
    """Append the Table_01-specific Stage 1 / Stage 3 checks (tests 1-7) to *results*."""
    # Stage 1 rows the pipeline is expected to carry through to Stage 3.
    stage1_filter = (
        "where col_04 != 'val_01' AND col_04 != 'val_02' "
        "AND col_04 != 'val_03' AND col_07 = 1"
    )

    # Test no 1: Check whether the stage 3 table is empty or not
    row_count = _read_scalar(engine, f"SELECT COUNT(*) AS row_count FROM Stage3.dbo.{table_name};", 'row_count')
    results.append((f"Test no 1 : Stage 3 {table_name} is not empty", _pass_fail(row_count != 0)))

    # Test no 2: Count of distinct primary key in Stage 1 vs Stage 3
    stage1_count = _read_scalar(engine, f"SELECT COUNT(DISTINCT col_01) AS count FROM Stage1.dbo.{table_name} {stage1_filter}", 'count')
    stage3_count = _read_scalar(engine, f"SELECT COUNT(DISTINCT col_05) AS count FROM Stage3.dbo.{table_name}", 'count')
    results.append((f"Test no 2 : Stage1 vs Stage 3 primary key Count for {table_name}", _pass_fail(stage1_count == stage3_count)))

    # Test no 3: Exactly one Province in the final table
    province_count = _read_scalar(engine, f"SELECT COUNT(DISTINCT col_02) AS province_count FROM Stage3.dbo.{table_name}", 'province_count')
    results.append((f"Test no 3: Stage 3 Province Count is 1 in {table_name}", _pass_fail(province_count == 1)))

    # Test no 4: Check the column names and data types of the output table
    expected_schema = {
        "col_01": "NVARCHAR",
        "col_03": "SMALLINT",
        "col_02": "TINYINT",
        "col_05": "NVARCHAR",
        "col_06": "NVARCHAR",
        "col_08": "NVARCHAR",
        "col_09": "NVARCHAR",
        "col_10": "INT",
        "col_11": "NVARCHAR",
        "col_12": "NVARCHAR",
        "col_13": "INT",
        "col_14": "INT",
        "col_15": "DATE",
        "col_16": "TINYINT",
        "col_17": "SMALLINT",
        "col_18": "TINYINT",
        "col_19": "NVARCHAR",
        "col_20": "TINYINT",
        "col_21": "TINYINT",
        "col_22": "TINYINT",
        "col_23": "DATE",
        "col_24": "TINYINT",
        "col_25": "SMALLINT",
        "col_26": "TINYINT",
        "col_27": "TINYINT",
        "col_28": "TINYINT",
        "col_29": "TINYINT",
        "col_30": "NVARCHAR",
        "col_31": "NVARCHAR",
        "col_32": "NVARCHAR",
        "col_33": "NVARCHAR"
    }
    names_match, mismatches = _check_schema(engine, table_name, expected_schema)
    results.append((f"Test no 4 PART 1: Stage 3 Column Name matched (case-insensitive) for {table_name}", _pass_fail(names_match)))
    if mismatches:
        logger.warning(f"Stage 3 schema mismatches for {table_name}: {mismatches}")
    results.append((f"Test no 4 PART 2: Stage 3 Column Data Type matched (case-insensitive) for {table_name}", _pass_fail(not mismatches)))

    # Test no 5: District / Tehsil unique counts, Stage 1 vs Stage 3
    unique_count_checks = [
        ("Test no 5 Part 1 : Stage1 vs Stage 3 District Unique Count", "col_34", "col_13"),
        ("Test no 5 Part 2 : Stage1 vs Stage 3 Tehsil Unique Count", "col_35", "col_14"),
    ]
    for label, stage1_col, stage3_col in unique_count_checks:
        stage1_count = _read_scalar(engine, f"SELECT COUNT(DISTINCT {stage1_col}) AS count FROM Stage1.dbo.{table_name} {stage1_filter}", 'count')
        stage3_count = _read_scalar(engine, f"SELECT COUNT(DISTINCT {stage3_col}) AS count FROM Stage3.dbo.{table_name}", 'count')
        results.append((f"{label} for {table_name}", _pass_fail(stage1_count == stage3_count)))

    # Test no 6: Mapping validity checks (label, lookup table, final column,
    # Stage 1 source column, lookup key column)
    mapping_checks = [
        ("Test no 6 Part 1 : Province Mapping Validity Check", "Lookup_01", "col_02", "col_36", "col_37"),
        ("Test no 6 Part 2 : School Level Mapping Validity Check", "Lookup_02", "col_16", "col_38", "col_39"),
        ("Test no 6 Part 3 : Gender Mapping Validity Check", "Lookup_03", "col_27", "col_40", "col_41"),
        ("Test no 6 Part 4 : Medium Mapping Validity Check", "Lookup_04", "col_29", "col_42", "col_43"),
    ]
    for label, lookup_table, final_col, source_col, lookup_key in mapping_checks:
        passed = _mapping_is_valid(engine, lookup_table, final_col, source_col, lookup_key)
        results.append((f"{label} for {table_name}", _pass_fail(passed)))

    # Test no 7: Date format check (every non-null value renders as YYYY-MM-DD)
    column_name = 'col_15'
    df = pd.read_sql(f"SELECT {column_name} FROM Stage3.dbo.Table_01 WHERE {column_name} IS NOT NULL", engine)
    all_valid = bool(df[column_name].apply(_is_yyyy_mm_dd_format).all())
    results.append((f"Test no 7 Part 1 : Date format for column {column_name}, Check for {table_name}", _pass_fail(all_valid)))


def _run_join_tests(engine, results):
    """Append the Stage 3 orphan-key checks of Table_02/Table_03 against Table_01."""
    for child in ("Table_02", "Table_03"):
        # Counts distinct child keys with no matching Table_01 row.
        query = f"""SELECT COUNT (DISTINCT {child}_PID) as row_count FROM (
            SELECT t1.col_01 as {child}_PID, t2.col_01 as Table_01_PID
            FROM Stage3.dbo.{child} t1
            LEFT JOIN Stage3.dbo.Table_01 t2 ON t1.col_01 = t2.col_01
) X where Table_01_PID IS NULL"""
        orphan_count = _read_scalar(engine, query, 'row_count')
        results.append((f"General Test: {child}-Table_01 join validity in Stage 3", _pass_fail(orphan_count == 0)))


def test_cases(engine, table_name):
    """
    Run the test cases for the ETL pipeline (anonymized) and append the results to a file.

    Each outcome is appended to ``test_cases_results.txt`` as
    ``<description> : PASS|FAIL``, after a header line carrying the run time and
    table name. Table-specific checks run only when *table_name* is Table_01
    (case-insensitive); the Stage 3 join checks of Table_02/Table_03 against
    Table_01 run for every call.

    Args:
        engine: SQLAlchemy engine connected to SQL Server
        table_name (str): table whose test cases are run

    Returns:
        None

    Raises:
        Any exception from a test query is re-raised after the results collected
        so far (plus an "ERROR" line) have been written to the results file.
    """
    # List for storing test case results as (description, status) pairs
    results = [(
        "-------------- Running Tests at " + datetime.now().strftime("%Y-%m-%d %H:%M:%S") + " --------------",
        "For Table " + table_name,
    )]

    try:
        if table_name.lower() == 'table_01':
            _run_table_01_tests(engine, table_name, results)

        # ...repeat similar anonymization for Table_02 and Table_03 blocks...
        # ...existing code for teachers and learners, anonymized in the same way...

        _run_join_tests(engine, results)
    except Exception as exc:
        results.append(("Test run aborted", f"ERROR: {exc}"))
        raise
    finally:
        # Persist whatever was collected, even when a query failed part-way.
        output_path = "test_cases_results.txt"
        with open(output_path, "a", encoding="utf-8") as f:
            for description, status in results:
                f.write(f"{description} : {status}\n")

# Main Function 

In [None]:
# Importing Libraries
import requests
import pandas as pd
from sqlalchemy import create_engine
from requests.auth import HTTPBasicAuth
from tkinter import Tk, messagebox
from sqlalchemy import inspect
import logging
import sys
from sqlalchemy import text
from sqlalchemy.exc import SQLAlchemyError
from datetime import datetime
import mysql.connector
from mysql.connector import Error
import pyodbc
from tabulate import tabulate
from sqlalchemy.exc import SQLAlchemyError

# === Logging Setup === #
logger = logging.getLogger("data_pipeline_logger")
logger.setLevel(logging.DEBUG)  # Capture everything, including debug

# getLogger() returns the same object every time this cell runs, so attach the
# handlers only once; otherwise each record is emitted once per re-run.
if not logger.handlers:
    # Log format
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')

    # File handler (logs everything)
    file_handler = logging.FileHandler("pipeline_logs.log")
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(formatter)

    # Console handler (logs info and above)
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(logging.INFO)
    console_handler.setFormatter(formatter)

    # Add handlers to logger
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)

# ***Data Server Credentials WARNING 
server = 'SERVER_NAME'
database = 'DB_NAME'
driver = 'ODBC Driver 17 for SQL Server'

logger.info("======================================================--Starting Main Script--======================================================")
logger.info("Starting DB Connection...")
try:
    conn_str = f"mssql+pyodbc://@{server}/{database}?driver={driver}&trusted_connection=yes"
    engine = create_engine(conn_str)
    # Open (and immediately close) one connection to fail fast on bad settings.
    with engine.connect():
        logger.info("DB Connection SQL Server connection successful.")
except Exception as e:
    logger.error(f"DB Connection Failed, Could not connect to SQL Server:\n{e}")
    sys.exit(1)

# Your API links
apis = [
    "http://API_BASE_URL/Entity01/GetData?Param01={Param01}&Param02={Param02}&Param03=4&Param04={Param04}",
    "http://API_BASE_URL/Entity02/GetData?Param01={Param01}&Param02={Param02}&Param03=4&Param04={Param04}",
    "http://API_BASE_URL/Entity03/GetData?Param01={Param01}&Param02={Param02}&Param03=4&Param04={Param04}",
    # ,'http://API_BASE_URL/Entity03/GetAttendance?Param01={Param01}&Param02={Param02}&Param03={Param03}&Param04={Param04}&Param05={Param05}'
]

# Corresponding table names (same order as `apis`)
table_names = [
    'Table_01',
    'Table_02',
    'Table_03'
]

logger.info("Starting data pipeline...")

# Loop through the links and table names
for link, table_name in zip(apis, table_names):
    # ---==================    Getting Data From Source   ==================---
    data = throughAPI(link)  # Call your function with the current link

    # data = read_csv_to_dataframe("Table_01_data.csv")
    if not data:
        continue

    df = pd.DataFrame(data)

    # ---==================    Saving Data From Source   ==================---
    bulk_load = False
    savingData(engine, table_name, df, bulk_load)

    # ---==================    Generating the data summary   ==================---
    try:
        data_summary(engine, table_name)
    except Exception as e:
        # The exception goes into the message itself: passing it as a separate
        # argument with no %-placeholder made logging fail and drop the text.
        logger.error(f"Data summaries Failed for {table_name} : {e}")

transform(engine, aggregated_table=False)

for _link, table_name in zip(apis, table_names):
    # ---==================    Moving Data   ==================---
    try:
        move_stage2_to_stage3(engine, table_name + '_Transform_step_2', table_name)
    except Exception as e:
        logger.error(f"Stage 2 to Stage 3 Failed for {table_name} : {e}")

transform(engine, aggregated_table=True)

# For the Aggregated columns 
aggregated_tables = [
    'AggTable_01',
    'AggTable_02',
    'AggTable_03',
    'AggTable_04',
    'AggTable_05',
    'AggTable_06',
    'AggTable_07',
    'AggTable_08',
    'AggTable_09',
    'AggTable_10',
    'AggTable_11',
    'AggTable_12',
    'AggTable_13',
    'AggTable_14',
    'AggTable_15',
    'AggTable_16',
    'AggTable_17',
    'AggTable_18',
    'AggTable_19',
    'AggTable_20',
    'AggTable_21',
    'AggTable_22'
]
for table_name in aggregated_tables:
    # ---==================    Moving Data   ==================---
    move_stage2_to_stage3(engine, table_name + '_Transform_step_2', table_name, aggregated_table=True)

for _link, table_name in zip(apis, table_names):
    # Run the test Cases
    logger.info(f"Running Test Cases for {table_name}.")

    try:
        test_cases(engine, table_name)
    except Exception as e:
        logger.error(f"One of the test cases Failed for {table_name} : {e}")

logger.info("Pipeline Completed...")
sys.exit(0)