In [None]:
from boxsdk import JWTAuth, Client
import io
import os
import polars as pl
import psycopg2

# Authentication and Client Setup
def authenticate_box():
    """Create a Box client from the JWT settings file and announce the user.

    Reads the JWT settings from ``Token/BOX_TOKEN.json``, fetches the current
    user through the new client, and prints that user's name.

    Returns:
        The ``Client`` built from the JWT settings.
    """
    jwt_auth = JWTAuth.from_settings_file("Token/BOX_TOKEN.json")
    box_client = Client(jwt_auth)

    # Fetch the current user so its name can be reported below.
    current_user = box_client.user().get()
    print(f"Authenticated as {current_user.name}")

    return box_client

# Search for all CSV files in the folder
def find_csv_files(client, folder_id='287803137437'):
    """Collect the CSV files that sit directly inside a Box folder.

    Args:
        client: Box client exposing ``folder(folder_id).get_items(...)``.
        folder_id: ID of the Box folder to scan.

    Returns:
        A list of the folder items whose ``type`` is ``'file'`` and whose name
        ends with ``.csv`` (any letter case). The list is empty when nothing
        matches, in which case a notice is printed.
    """
    csv_files = []
    # NOTE(review): ``limit=100`` is passed straight to ``get_items``;
    # presumably a page size rather than a hard cap -- confirm against the SDK.
    for item in client.folder(folder_id).get_items(limit=100):
        # Compare case-insensitively so files named ``*.CSV`` are not skipped.
        if item.type == 'file' and item.name.lower().endswith('.csv'):
            print(f'CSV File Found: {item.name}')
            csv_files.append(item)
    if not csv_files:
        print("No CSV files found.")
    return csv_files

# Download and load file into a Polars DataFrame
def download_csv_file(file_item):
    """Download a Box file into memory and parse it as CSV with Polars.

    Args:
        file_item: Box file object exposing ``download_to(stream)``; a falsy
            value means there is nothing to download.

    Returns:
        The DataFrame produced by ``pl.read_csv``, or ``None`` when
        ``file_item`` is falsy.
    """
    if not file_item:
        return None

    buffer = io.BytesIO()
    file_item.download_to(buffer)
    # Rewind so the reader starts at the first byte that was written.
    buffer.seek(0)
    return pl.read_csv(buffer)

# Check if table exists in PostgreSQL
def check_table_exists(cursor, schema, table):
    """Report whether ``schema.table`` is listed in ``information_schema.tables``.

    Args:
        cursor: Open database cursor used to run the lookup.
        schema: Schema name to match against ``table_schema``.
        table: Table name to match against ``table_name``.

    Returns:
        The first column of the single row returned by the ``EXISTS`` query.
    """
    lookup_params = (schema, table)
    cursor.execute("""
        SELECT EXISTS (
            SELECT 1 
            FROM information_schema.tables 
            WHERE table_schema = %s AND table_name = %s
        );
    """, lookup_params)
    row = cursor.fetchone()
    return row[0]

# Append DataFrame to PostgreSQL table without using execute_values
def append_data_to_db(df, schema, table, db_params):
    """Append the rows of ``df`` that are not yet present in ``schema.table``.

    The rows are first staged in a temporary table cloned from the target and
    then copied across with a ``NOT EXISTS`` filter over every column of
    ``df``, so importing the same data twice does not add duplicates.

    Args:
        df: Polars DataFrame whose column names match columns of the target.
        schema: Schema of the target table.
        table: Name of the target table.
        db_params: Keyword arguments forwarded to ``psycopg2.connect``.

    Returns:
        True when the insert was committed. False when the target table does
        not exist or when any error occurred (the error is printed, not
        raised).
    """
    # Function-scope import so this block does not rely on imports made
    # elsewhere in the file.
    from psycopg2 import sql

    # Pre-bind both handles: if connect() itself fails, the ``finally`` clause
    # must not raise UnboundLocalError and hide the real error.
    conn = None
    cursor = None
    try:
        conn = psycopg2.connect(**db_params)
        cursor = conn.cursor()

        if not check_table_exists(cursor, schema, table):
            print(f"Table {schema}.{table} does not exist.")
            return False  # Exit early if the table doesn't exist

        # Schema/table names come from the caller (ultimately a file name), so
        # they are quoted as identifiers instead of being spliced in as text.
        target = sql.SQL('.').join([sql.Identifier(schema), sql.Identifier(table)])
        temp_table = sql.Identifier(f"{table}_temp")
        column_ids = [sql.Identifier(col) for col in df.columns]
        columns_sql = sql.SQL(', ').join(column_ids)

        # Create a temporary table with the same structure as the target.
        cursor.execute(
            sql.SQL("CREATE TEMP TABLE {temp} (LIKE {target} INCLUDING ALL);").format(
                temp=temp_table, target=target
            )
        )

        # Stage the rows. ``df.rows()`` yields tuples of native Python values
        # (``None`` for nulls), which psycopg2 can adapt; numpy scalars and
        # NaN produced by ``to_numpy()`` are not reliably adaptable.
        insert_temp_query = sql.SQL(
            "INSERT INTO {temp} ({cols}) VALUES ({vals})"
        ).format(
            temp=temp_table,
            cols=columns_sql,
            vals=sql.SQL(', ').join(sql.Placeholder() * len(column_ids)),
        )
        cursor.executemany(insert_temp_query, df.rows())

        # Copy over only the rows that have no exact match in the target.
        # ``IS NOT DISTINCT FROM`` treats two NULLs as equal; with plain ``=``
        # a row containing NULL never matches and is re-inserted on every run.
        join_conditions = sql.SQL(' AND ').join(
            sql.SQL("main.{col} IS NOT DISTINCT FROM temp.{col}").format(col=col_id)
            for col_id in column_ids
        )
        insert_main_query = sql.SQL("""
            INSERT INTO {target} ({cols})
            SELECT {cols} FROM {temp} temp
            WHERE NOT EXISTS (
                SELECT 1 FROM {target} main
                WHERE {conditions}
            );
        """).format(
            target=target,
            cols=columns_sql,
            temp=temp_table,
            conditions=join_conditions,
        )
        cursor.execute(insert_main_query)

        # Drop the temporary table
        cursor.execute(sql.SQL("DROP TABLE {temp}").format(temp=temp_table))

        # Commit the transaction
        conn.commit()
        print(f"Inserted new records into {schema}.{table}.")
        return True  # Indicate success

    except Exception as e:
        # Nothing has been committed on this path; closing the connection
        # below discards the uncommitted work.
        print(f"An error occurred while appending data to {schema}.{table}: {e}")
        return False  # Indicate failure

    finally:
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()

# Move file to 'imported' subfolder on Box
def move_file_to_imported(client, file_item, imported_folder_id='287805162509'):
    """Move a Box file into the 'imported' subfolder.

    Failures are reported on stdout and swallowed, so one file that cannot be
    moved does not abort the caller.

    Args:
        client: Box client exposing ``folder(folder_id)``.
        file_item: Box file object exposing ``move(folder)`` and ``name``.
        imported_folder_id: ID of the destination folder; defaults to the
            'imported' subfolder used by this pipeline.

    Returns:
        None.
    """
    try:
        # Move the file to the imported folder
        imported_folder = client.folder(imported_folder_id)
        file_item.move(imported_folder)
        print(f"File '{file_item.name}' moved to 'imported' folder.")
    except Exception as e:
        print(f"Failed to move file '{file_item.name}': {e}")

# Main execution
if __name__ == "__main__":
    # Pipeline: authenticate with Box, list the CSV files, load each one into
    # the table named by its file name, then archive the file on success.
    client = authenticate_box()
    csv_files = find_csv_files(client)

    if not csv_files:
        print("No CSV files to process.")
    else:
        # NOTE(review): credentials are hard-coded here; consider loading them
        # from the environment or a secrets store.
        db_params = {
            'host': 'localhost',
            'port': 5432,
            'dbname': 'mbta_dw',
            'user': 'opmi_etl',
            'password': 'postgres' 
        }

        for file_item in csv_files:  # Iterate through each found CSV file
            # Derive schema and table from the file name *before* downloading,
            # so a misnamed file is skipped without a wasted download/parse.
            file_name = os.path.basename(file_item.name)  # Get the base file name
            schema_table = file_name[:-4].split('.')  # Remove '.csv' and split
            if len(schema_table) != 2:
                print(f"File name '{file_item.name}' does not match expected format 'schema.table.csv'. Skipping.")
                continue
            schema, table = schema_table

            df = download_csv_file(file_item)  # Download each file
            if df is None:
                continue

            # Only archive the file once its rows are safely in the database.
            if append_data_to_db(df, schema, table, db_params):
                move_file_to_imported(client, file_item)  # Move the file if successful
            else:
                print(f"Failed to append data for file '{file_item.name}'; file will not be moved.")


## SQL table creation (not used; the table is already created by the Docker setup)

In [None]:
from psycopg2 import sql

# Database connection parameters
db_params = {
    'host': 'localhost',
    'port': 5432,
    'dbname': 'mbta_dw',
    'user': 'opmi_etl',
    'password': 'postgres'  # Replace with the actual password
}

# Pre-bind both handles so the ``finally`` clause is safe even when
# ``psycopg2.connect`` itself raises (otherwise it would hit a NameError).
conn = None
cursor = None

try:
    # Connect to the PostgreSQL database
    conn = psycopg2.connect(**db_params)
    cursor = conn.cursor()

    # Create 'csat' table in 'surveys' schema
    create_table_query = sql.SQL("""
        CREATE TABLE IF NOT EXISTS surveys.csat (
                survey_date DATE,
                survey_name VARCHAR(255),
                question_description TEXT,
                response_total INTEGER,
                response_1_text VARCHAR(255),
                response_1_percent FLOAT,
                response_2_text VARCHAR(255),
                response_2_percent FLOAT,
                response_3_text VARCHAR(255),
                response_3_percent FLOAT,
                response_4_text VARCHAR(255),
                response_4_percent FLOAT,
                response_5_text VARCHAR(255),
                response_5_percent FLOAT,
                response_6_text VARCHAR(255),
                response_6_percent FLOAT,
                response_7_text VARCHAR(255),
                response_7_percent FLOAT
        )
    """)

    # Execute the create table query
    cursor.execute(create_table_query)

    # Commit the changes
    conn.commit()

    print("Table 'csat' created successfully in 'surveys' schema.")

except Exception as e:
    print("Error while connecting to PostgreSQL or creating table:", e)

finally:
    # Close whatever was actually opened
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()