In [None]:
pip install snowflake-connector-python pandas python-dotenv

In [1]:
import snowflake.connector
import pandas as pd
import os
from dotenv import load_dotenv

In [None]:
# One-time setup: store your Snowflake credentials in a local .env file first.
# The snippet below is intentionally disabled by being wrapped in a string literal.
# To use it, remove the surrounding triple quotes, fill in your own values, and run
# the cell once. Keep the resulting .env file out of version control.
'''
with open('.env', 'w') as f:
    f.write('SNOWFLAKE_USER=your_username\n') #replace with your user
    f.write('SNOWFLAKE_PASSWORD=your_password\n')
    f.write('SNOWFLAKE_ACCOUNT=your_account\n')
'''

In [2]:
# Load credentials from the local .env file. override=True makes values from .env
# take precedence over variables that are already set in the process environment.
load_dotenv(override=True)

# Snowflake connection parameters (credentials come from the environment, never from code)
SNOWFLAKE_ACCOUNT = os.environ.get('SNOWFLAKE_ACCOUNT')
SNOWFLAKE_USER = os.environ.get('SNOWFLAKE_USER')
SNOWFLAKE_PASSWORD = os.environ.get('SNOWFLAKE_PASSWORD')

# Fail fast with a clear message when a credential is missing or empty, instead of
# letting the connector fail later with a less obvious error. Only the variable
# names are reported, never their values.
_missing_credentials = [
    name
    for name, value in (
        ('SNOWFLAKE_ACCOUNT', SNOWFLAKE_ACCOUNT),
        ('SNOWFLAKE_USER', SNOWFLAKE_USER),
        ('SNOWFLAKE_PASSWORD', SNOWFLAKE_PASSWORD),
    )
    if not value
]
if _missing_credentials:
    raise EnvironmentError(
        "Missing Snowflake credential(s): "
        + ", ".join(_missing_credentials)
        + ". Define them in a .env file or in the environment before running this notebook."
    )

# Target objects in Snowflake
SNOWFLAKE_DATABASE = "TEST"
SNOWFLAKE_SCHEMA = "CTI_2025_EXTRACTS"
SNOWFLAKE_WAREHOUSE = "COMPUTE_WH"
SNOWFLAKE_STAGE = "CTI_EXTRACT_STAGE"
TABLE_NAME = "INVITED_SPEAKERS_CHAIRS"
# Scratch table that receives the new data before it is swapped with TABLE_NAME
STAGING_TABLE_NAME = f"{TABLE_NAME}_staging"

# Local CSV extract to upload
CSV_FILE_PATH = r"C:\Users\maxwell.bicking\OneDrive - American Association for Cancer Res\cti_report.csv" #change this as needed

# The file URI used in the PUT command is written with forward slashes, so convert
# the Windows-style backslashes.
snowflake_file_path = CSV_FILE_PATH.replace("\\", "/")

# File name only (no directories); used to address the uploaded file inside the stage
csv_filename = os.path.basename(snowflake_file_path)

In [3]:
# Replace the contents of TABLE_NAME with the CSV extract:
#   upload the file to the stage -> COPY it into a fresh staging table -> verify -> swap.
# Any failure is reported and re-raised so the cell (and a "Run All") stops visibly
# instead of appearing to succeed.

# Check the local file before connecting, so a wrong path does not trigger a 2FA prompt for nothing.
if not os.path.isfile(CSV_FILE_PATH):
    raise FileNotFoundError(f"CSV file not found: {CSV_FILE_PATH}")

#Connect to Snowflake, will trigger 2FA
conn = snowflake.connector.connect(
    user=SNOWFLAKE_USER,
    password=SNOWFLAKE_PASSWORD,
    account=SNOWFLAKE_ACCOUNT,
    warehouse=SNOWFLAKE_WAREHOUSE,
    database=SNOWFLAKE_DATABASE,
    schema=SNOWFLAKE_SCHEMA
)
# Created inside the try block so the connection is still closed if cursor creation fails.
cur = None

try:
    cur = conn.cursor()

    #Step 0: Create/replace staging table (same column definitions as the main table, no rows)
    print("Creating or replacing staging table...")
    cur.execute(f"CREATE OR REPLACE TABLE {STAGING_TABLE_NAME} LIKE {TABLE_NAME};")

    #Step 1: Upload CSV file to Snowflake stage
    print(f"Uploading file '{csv_filename}' to Snowflake stage...")
    cur.execute(f"PUT 'file://{snowflake_file_path}' @{SNOWFLAKE_STAGE} OVERWRITE = TRUE;")

    #Check that THIS file is in the stage. The listing is scoped to the file name so
    #that unrelated or stale files already in the stage cannot satisfy the check.
    cur.execute(f"LIST @{SNOWFLAKE_STAGE}/{csv_filename};")
    uploaded_files = cur.fetchall()
    print("Uploaded files:", uploaded_files)

    if not uploaded_files:
        raise RuntimeError(f"File '{csv_filename}' not found in stage @{SNOWFLAKE_STAGE}")

    #Step 2: Truncate the staging table and load new data
    #(the table was just recreated empty, so the truncate is only a safeguard)
    print("Truncating staging table and loading new data...")
    cur.execute(f"TRUNCATE TABLE {STAGING_TABLE_NAME};")

    #Create a file format explicitly for CSV and GZIP compression
    #(the staged file is gzip-compressed, as the '.gz' suffix in the stage listing shows)
    file_format = """
    CREATE OR REPLACE FILE FORMAT my_csv_format
    TYPE = 'CSV'
    FIELD_OPTIONALLY_ENCLOSED_BY = '"'
    SKIP_HEADER = 1
    COMPRESSION = 'GZIP'
    FIELD_DELIMITER = ','
    NULL_IF = ('NULL', 'null', '');
    """

    #Ensure that file format exists
    cur.execute(file_format)

    #Now copy the data using the correct file format
    copy_command = f"""
        COPY INTO {STAGING_TABLE_NAME}
        FROM @{SNOWFLAKE_STAGE}/{csv_filename}
        FILE_FORMAT = my_csv_format
        ON_ERROR = 'CONTINUE';
    """
    print(f"Executing COPY INTO command: {copy_command}")
    cur.execute(copy_command)

    #ON_ERROR = 'CONTINUE' skips rows that fail to load, so surface any rejected rows
    #before the swap. Per the Snowflake COPY INTO output, a per-file result row is
    #(file, status, rows_parsed, rows_loaded, error_limit, errors_seen, first_error, ...);
    #shorter rows (e.g. a "0 files processed" message) are ignored here.
    for copy_result in cur.fetchall():
        if len(copy_result) > 6 and copy_result[5]:
            print(
                f"WARNING: {copy_result[5]} error(s) while loading {copy_result[0]} "
                f"(rows parsed: {copy_result[2]}, rows loaded: {copy_result[3]}); "
                f"first error: {copy_result[6]}"
            )

    #Step 3: Verify if data is in staging table
    print(f"Verifying data in staging table {STAGING_TABLE_NAME}...")
    cur.execute(f"SELECT COUNT(*) FROM {STAGING_TABLE_NAME};")
    rows_in_staging = cur.fetchone()[0]
    print(f"Rows in staging table: {rows_in_staging}")

    if rows_in_staging == 0:
        raise RuntimeError(f"No data loaded into staging table {STAGING_TABLE_NAME}")

    #Step 4: Swap staging table with the main table (instant update)
    #NOTE(review): after the swap the staging table holds the previous contents of the
    #main table; confirm that grants on the main table are still as expected afterwards.
    print("Swapping staging table with main table...")
    cur.execute(f"ALTER TABLE {TABLE_NAME} SWAP WITH {STAGING_TABLE_NAME};")

    print("Table successfully overwritten with new CSV data!")

except Exception as e:
    print(f"Error: {e}")
    # Re-raise so the failure is not mistaken for a successful run.
    raise

finally:
    # Always release the cursor and the connection, whether or not the load succeeded.
    if cur is not None:
        cur.close()
    conn.close()

  warn(f"Bad owner or permissions on {str(filep)}{chmod_message}")


Creating or replacing staging table...
Uploading file 'cti_report.csv' to Snowflake stage...
Uploaded files: [('cti_extract_stage/cti_report.csv.gz', 113040, '4a066a6f6039435a3cf81fe92b954528', 'Tue, 25 Mar 2025 13:56:48 GMT')]
Truncating staging table and loading new data...
Executing COPY INTO command: 
        COPY INTO INVITED_SPEAKERS_CHAIRS_staging
        FROM @CTI_EXTRACT_STAGE/cti_report.csv
        FILE_FORMAT = my_csv_format
        ON_ERROR = 'CONTINUE';
    
Verifying data in staging table INVITED_SPEAKERS_CHAIRS_staging...
Rows in staging table: 1779
Swapping staging table with main table...
Table successfully overwritten with new CSV data!
