In [2]:
# Import necessary libraries
import snowflake.connector
import glob
from tqdm import tqdm  # Import tqdm for progress bar

In [7]:
# Snowflake connection parameters
ACCOUNT = "voucimo-atb07951"
USER = "DNIERMAN"  # Update with your Snowflake username!
PASSWORD = "OnyxTheGreat2!"  # update with your Snowflake password!
WAREHOUSE = "purchase_orders_wh"
DATABASE = "purchase_orders_db"
SCHEMA = "purchase_orders_schema"
TABLE = "purchase_orders"
CSV_DIR = "./Data/Monthly_PO_Data"
STAGE = "monthly_po_stage"
FILE_FMT = "MONTHLY_PO_CSV_FMT"


In [8]:
# Snowflake connection
conn = snowflake.connector.connect(user=USER, password=PASSWORD, account=ACCOUNT)
cs = conn.cursor()

In [10]:
# Create Snowflake warehouse, database, schema and staging area
try:
    # Create a warehouse
    cs.execute(f"CREATE WAREHOUSE IF NOT EXISTS {WAREHOUSE};")
    print(f"✅ Warehouse '{WAREHOUSE}' created or replaced.")
except Exception as e:
    print(f"❌ Failed to create warehouse '{WAREHOUSE}': {e}")

try:
    # Create a database
    cs.execute(f"CREATE OR REPLACE DATABASE {DATABASE};")
    print(f"✅ Database '{DATABASE}' created or replaced.")
except Exception as e:
    print(f"❌ Failed to create database '{DATABASE}': {e}")

try:
    # Create a schema
    cs.execute(f"CREATE OR REPLACE SCHEMA {DATABASE}.{SCHEMA};")
    print(f"✅ Schema '{SCHEMA}' created or replaced in database '{DATABASE}'.")
except Exception as e:
    print(f"❌ Failed to create schema '{SCHEMA}' in database '{DATABASE}': {e}")

try:
    # Create a staging area in Snowflake
    cs.execute(f"CREATE OR REPLACE STAGE {DATABASE}.{SCHEMA}.{STAGE};")
    print(
        f"Stage '{STAGE}' created or replaced in schema '{SCHEMA}' of database '{DATABASE}'."
    )
except Exception as e:
    print(
        f"❌ Failed to create stage '{STAGE}' in schema '{SCHEMA}' of database '{DATABASE}': {e}"
    )


✅ Warehouse 'purchase_orders_wh' created or replaced.
✅ Database 'purchase_orders_db' created or replaced.
✅ Schema 'purchase_orders_schema' created or replaced in database 'purchase_orders_db'.
Stage 'monthly_po_stage' created or replaced in schema 'purchase_orders_schema' of database 'purchase_orders_db'.


In [11]:
# Upload to Snowflake stage

# Path to the CSV files
csv_files = glob.glob(f"{CSV_DIR}/*.csv")

# Check if any CSV files were found
if not csv_files:
    print("No CSV files found in the specified directory.")
else:
    failed_uploads = []  # List to store files that failed to upload

    # Use tqdm to create a progress bar
    for file_path in tqdm(csv_files, desc="Uploading files", unit="file"):
        put_command = (
            f"PUT 'file://{file_path}' @{DATABASE}.{SCHEMA}.{STAGE} OVERWRITE = TRUE"
        )
        try:
            cs.execute(put_command)
        except Exception as upload_error:
            failed_uploads.append((file_path, str(upload_error)))

    # Print summary of failed uploads
    if failed_uploads:
        print("\nThe following files failed to upload:")
        for file_path, error in failed_uploads:
            print(f"  - {file_path}: {error}")
    else:
        print("\nAll files uploaded successfully.")


Uploading files: 100%|██████████| 41/41 [00:14<00:00,  2.75file/s]


All files uploaded successfully.





In [13]:
try:
    cs.execute(f"""
        CREATE OR REPLACE TABLE {DATABASE}.{SCHEMA}.{TABLE} (
            PurchaseOrderID INTEGER,
            SupplierID INTEGER,
            OrderDate DATE,
            DeliveryMethodID INTEGER,
            ContactPersonID INTEGER,
            ExpectedDeliveryDate DATE,
            SupplierReference STRING,
            IsOrderFinalized BOOLEAN,
            Comments STRING,
            InternalComments STRING,
            LastEditedBy INTEGER,
            LastEditedWhen TIMESTAMP,
            PurchaseOrderLineID INTEGER,
            StockItemID INTEGER,
            OrderedOuters INTEGER,
            Description STRING,
            ReceivedOuters INTEGER,
            PackageTypeID INTEGER,
            ExpectedUnitPricePerOuter FLOAT,
            LastReceiptDate DATE,
            IsOrderLineFinalized BOOLEAN,
            Right_LastEditedBy INTEGER,
            Right_LastEditedWhen TIMESTAMP
        )
    """)
    print("Table created successfully!")
except Exception as e:
    print(f"Error creating table: {e}")


Table created successfully!


In [27]:
try:
    cs.execute(f"""
        CREATE OR REPLACE FILE FORMAT {DATABASE}.{SCHEMA}.{FILE_FMT}
          TYPE = 'CSV'
          SKIP_HEADER = 1
          FIELD_DELIMITER = ','
          FIELD_OPTIONALLY_ENCLOSED_BY = '"'
          NULL_IF = ('NULL','null')
          EMPTY_FIELD_AS_NULL = TRUE
          COMPRESSION = 'AUTO'
          DATE_FORMAT = 'AUTO'
          TIMESTAMP_FORMAT = 'AUTO'
    """)
    print("File format created (CSV, SKIP_HEADER=1).")
except Exception as e:
    print(f"Error creating file format: {e}")


File format created (CSV, SKIP_HEADER=1).


In [None]:
from tqdm import tqdm

# Clear table first
cs.execute(f"TRUNCATE TABLE {DATABASE}.{SCHEMA}.{TABLE}")

# Count CSV files in stage
cs.execute(f"LIST @{DATABASE}.{SCHEMA}.{STAGE}")
stage_files = [
    row[0] for row in cs.fetchall() if row[0].lower().endswith((".csv", ".csv.gz"))
]
stage_count = len(stage_files)

# COPY with explicit casts via SELECT
copy_sql = f"""
COPY INTO {DATABASE}.{SCHEMA}.{TABLE}
(
  PurchaseOrderID,
  SupplierID,
  OrderDate,
  DeliveryMethodID,
  ContactPersonID,
  ExpectedDeliveryDate,
  SupplierReference,
  IsOrderFinalized,
  Comments,
  InternalComments,
  LastEditedBy,
  LastEditedWhen,
  PurchaseOrderLineID,
  StockItemID,
  OrderedOuters,
  Description,
  ReceivedOuters,
  PackageTypeID,
  ExpectedUnitPricePerOuter,
  LastReceiptDate,
  IsOrderLineFinalized,
  Right_LastEditedBy,
  Right_LastEditedWhen
)
FROM (
  SELECT
    $1::INT  AS PurchaseOrderID,                -- Example: 1
    $2::INT  AS SupplierID,                     -- Example: 2

    -- OrderDate ($3) with invalid-date fallback for leap years
    CASE
  WHEN REPLACE(TRIM(SPLIT_PART($3, ' ', 1)), '\r','') = '2/29/2022'
    THEN TO_DATE('2/28/2022', 'MM/DD/YYYY')
  ELSE TO_DATE(REPLACE(TRIM(SPLIT_PART($3, ' ', 1)), '\r',''), 'MM/DD/YYYY')
END AS OrderDate,

    $4::INT  AS DeliveryMethodID,               -- Example: 9
    $5::INT  AS ContactPersonID,                -- Example: 2

    -- ExpectedDeliveryDate ($6) with invalid-date fallback for leap years
    CASE
  WHEN REPLACE(TRIM(SPLIT_PART($6, ' ', 1)), '\r','') = '2/29/2022'
    THEN TO_DATE('2/28/2022', 'MM/DD/YYYY')
  ELSE TO_DATE(REPLACE(TRIM(SPLIT_PART($6, ' ', 1)), '\r',''), 'MM/DD/YYYY')
END AS ExpectedDeliveryDate,

    $7       AS SupplierReference,              -- Example: B2084020
    ($8 IN ('1','TRUE','true'))::BOOLEAN AS IsOrderFinalized, -- Example: 1 → TRUE
    $9       AS Comments,                       -- Example: NULL
    $10      AS InternalComments,               -- Example: NULL
    $11::INT AS LastEditedBy,                   -- Example: 6

    -- Timestamps cleaned
    TRY_TO_TIMESTAMP_NTZ(REPLACE(TRIM($12), '\r','')) AS LastEditedWhen,   -- Example: 1/2/2019 7:00

    $13::INT AS PurchaseOrderLineID,            -- Example: 1
    $14::INT AS StockItemID,                    -- Example: 150
    $15::INT AS OrderedOuters,                  -- Example: 18
    $16      AS Description,                    -- Example: Pack of 12 action figures (variety)
    $17::INT AS ReceivedOuters,                 -- Example: 18
    $18::INT AS PackageTypeID,                  -- Example: 9
    $19::FLOAT AS ExpectedUnitPricePerOuter,    -- Example: 5.5

    -- LastReceiptDate ($20) with invalid-date fallback for leap years
    CASE
  WHEN REPLACE(TRIM(SPLIT_PART($20, ' ', 1)), '\r','') = '2/29/2022'
    THEN TO_DATE('2/28/2022', 'MM/DD/YYYY')
  ELSE TO_DATE(REPLACE(TRIM(SPLIT_PART($20, ' ', 1)), '\r',''), 'MM/DD/YYYY')
END AS LastReceiptDate,

    ($21 IN ('1','TRUE','true'))::BOOLEAN AS IsOrderLineFinalized, -- Example: 1 → TRUE
    $22::INT AS Right_LastEditedBy,             -- Example: 6
    TRY_TO_TIMESTAMP_NTZ(REPLACE(TRIM($23), '\r','')) AS Right_LastEditedWhen  -- Example: 1/2/2019 7:00
  FROM @{DATABASE}.{SCHEMA}.{STAGE} (
    FILE_FORMAT => '{DATABASE}.{SCHEMA}.{FILE_FMT}',
    PATTERN => '.*\\.csv(\\.gz)?$'
  )
)

ON_ERROR = 'ABORT_STATEMENT';
"""
try:
    cs.execute(copy_sql)

    # Fetch COPY results (one row per file)
    copy_results = []
    with tqdm(total=stage_count, desc="Loading CSVs", unit="file") as pbar:
        while True:
            row = cs.fetchone()
            if not row:
                break
            copy_results.append(row)
            pbar.update(1)

    load_count = len(copy_results)
    if load_count == stage_count:
        print(
            f"\n✅ Success: {load_count} files loaded, matches {stage_count} in stage."
        )
    else:
        print(f"\n⚠️ Mismatch: {load_count} loaded vs {stage_count} files in stage.")

except Exception as e:
    print(f"❌ Error running COPY INTO: {e}")


Loading CSVs: 100%|██████████| 41/41 [00:00<00:00, 322638.77file/s]


✅ Success: 41 files loaded, matches 41 in stage.



