In [103]:
import boto3
import pandas as pd
from faker import Faker
import random
from io import StringIO
from datetime import datetime
import snowflake.connector
from dotenv import load_dotenv
import os

In [104]:
# Default boto3 S3 client. Note: `s3` is rebound to an endpoint-specific
# client in the following cell.
s3 = boto3.client('s3')

# Faker instance used to produce the synthetic order fields.
fake = Faker()

# Load settings from a .env file; kept as the last expression so the cell
# displays the call's return value.
load_dotenv()

True

In [105]:
# S3-compatible client (presumably a local MinIO, judging by the default
# credentials). Every setting can be overridden through an environment
# variable, e.g. from the .env file loaded above; the fallbacks are the values
# this notebook originally hard-coded, so behavior is unchanged when the
# variables are unset or empty.
s3 = boto3.client(
    "s3",
    endpoint_url=os.getenv("MINIO_ENDPOINT_URL") or "http://localhost:9100",
    aws_access_key_id=os.getenv("MINIO_ACCESS_KEY") or "minioadmin",
    aws_secret_access_key=os.getenv("MINIO_SECRET_KEY") or "minioadmin",
    region_name=os.getenv("MINIO_REGION") or "us-east-1",
)

In [106]:
def generate_cdc_order_data(num_rows):
    """Build a DataFrame of synthetic CDC-style order records.

    Each row gets Faker-generated UUIDs for the order, customer and product,
    an order date from Faker's ``date_this_year``, a random status, a quantity
    in 1..5, a price in 10.00..500.00 and ``total_amount = quantity * price``
    (rounded to 2 decimals). ``cdc_timestamp`` is the local wall-clock time at
    which the row was generated.

    Args:
        num_rows: Number of rows to generate. Zero (or a negative value)
            yields an empty frame that still carries the full column schema.

    Returns:
        pandas.DataFrame with one row per order and a fixed column order.
    """
    # Fixed schema: guarantees column order and keeps the header present even
    # when no rows are generated (an empty list alone gives a column-less frame).
    columns = [
        'order_id',
        'customer_id',
        'order_date',
        'status',
        'product_id',
        'quantity',
        'price',
        'total_amount',
        'cdc_timestamp',
    ]

    data = []
    for _ in range(num_rows):
        # Field evaluation order matches the original so seeded runs of
        # `random` produce the same sequence of values.
        order = {
            'order_id': fake.uuid4(),
            'customer_id': fake.uuid4(),
            'order_date': fake.date_this_year(),
            'status': random.choice(['CREATED', 'SHIPPED', 'DELIVERED', 'CANCELLED']),
            'product_id': fake.uuid4(),
            'quantity': random.randint(1, 5),
            'price': round(random.uniform(10.0, 500.0), 2),
            'cdc_timestamp': datetime.now(),  # Simulate CDC timestamp
        }
        order['total_amount'] = round(order['quantity'] * order['price'], 2)
        data.append(order)

    return pd.DataFrame(data, columns=columns)

In [107]:
# Size of the synthetic batch to generate
num_rows = 150

# Upload destination: bucket and object key.
# The uploader in this cell serializes with DataFrame.to_csv, so the object
# content is CSV regardless of the extension chosen here.
bucket_name = 'raw'
file_name = 'fake_cdc_order_data_12.csv'

def upload_to_s3(bucket_name, file_name, df, client=None):
    """Serialize ``df`` as CSV (no index column) and upload it as one object.

    Args:
        bucket_name: Target bucket.
        file_name: Object key to write.
        df: pandas.DataFrame to serialize.
        client: Optional object exposing ``put_object(Bucket=, Key=, Body=)``.
            Defaults to the notebook-level ``s3`` client, so existing
            three-argument calls behave as before.
    """
    if client is None:
        client = s3  # fall back to the module-level client

    # to_csv() without a path returns the CSV text; encode explicitly so the
    # request body is bytes rather than relying on implicit str handling.
    body = df.to_csv(index=False).encode('utf-8')
    client.put_object(Bucket=bucket_name, Key=file_name, Body=body)
    print(f"Data uploaded to s3://{bucket_name}/{file_name}")

# Build the synthetic CDC order batch (num_rows rows)
df_cdc_order_data = generate_cdc_order_data(num_rows)

# Push the batch to the configured bucket/key
upload_to_s3(
    bucket_name=bucket_name,
    file_name=file_name,
    df=df_cdc_order_data,
)

Data uploaded to s3://raw/fake_cdc_order_data_12.csv


In [108]:
# Fetch the uploaded object to local disk; the Snowflake PUT below reads it
# from /tmp. Use the shared bucket_name constant (not a repeated "raw"
# literal) so upload and download always target the same bucket.
s3.download_file(bucket_name, file_name, f"/tmp/{file_name}")

In [109]:
# Snowflake connection settings, read from the process environment.
sf = {
    "user": os.getenv("SNOWFLAKE_USER"),
    "password": os.getenv("SNOWFLAKE_PASSWORD"),
    "account": os.getenv("SNOWFLAKE_ACCOUNT"),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
    "database": os.getenv("SNOWFLAKE_DATABASE"),
    "schema": os.getenv("SNOWFLAKE_SCHEMA"),
}

# Fail fast, before connecting, if any setting is unset or empty.
missing = [k for k, v in sf.items() if not v]
if missing:
    raise ValueError(f"Missing env vars: {missing}")

# PUT uploads the local file to the internal stage.
sql = f"""
PUT file:///tmp/{file_name}
@DBT_ELT.STAGING.MY_INTERNAL_STAGE
AUTO_COMPRESS=TRUE
"""

# try/finally guarantees the cursor and connection are released even when
# the PUT statement raises.
conn = snowflake.connector.connect(**sf)
try:
    cursor = conn.cursor()
    try:
        print("Executing:", sql)
        cursor.execute(sql)
    finally:
        cursor.close()
finally:
    conn.close()

Executing: 
PUT file:///tmp/fake_cdc_order_data_12.csv
@DBT_ELT.STAGING.MY_INTERNAL_STAGE
AUTO_COMPRESS=TRUE

