In [10]:
import pandas as pd
import datetime

In [None]:
import pandas as pd
import pyodbc
import datetime

current_time = datetime.datetime.now()
load_year = current_time.year
load_month = current_time.month
load_day = current_time.day

landing_zone_path = f'../../Landing-Zone/Sales/Pizza/pizza_sales_{load_year}-{load_month}-{load_day}.csv'

try:
    df = pd.read_csv(landing_zone_path)
except FileNotFoundError:
    print(f"Error: The file {landing_zone_path} does not exist.")
    raise

df['src_file_name'] = landing_zone_path.split('/')[-1]
df['ingestion_date'] = current_time 

df['order_date'] = pd.to_datetime(df['order_date'], errors='coerce').dt.date
df['order_time'] = pd.to_datetime(df['order_time'], errors='coerce').dt.time

connection_string = r"DRIVER={ODBC Driver 17 for SQL Server};SERVER=LAPTOP-UEF1N3GJ\SQLEXPRESS;DATABASE=pizza_sales;Trusted_Connection=yes;"

try:
    conn = pyodbc.connect(connection_string)
    cursor = conn.cursor()
except pyodbc.Error as e:
    print("Error connecting to SQL Server:", e)
    raise

try:
    cursor.execute("SELECT schema_id FROM sys.schemas WHERE name = 'bronze'")
    if not cursor.fetchone():
        cursor.execute("CREATE SCHEMA bronze")
    conn.commit()
except pyodbc.Error as e:
    print("Error ensuring schema exists:", e)
    conn.rollback()

# SQL for creating the table
def generate_create_table_sql(schema, table_name, df):
    type_mapping = {
        "int64": "INT",
        "float64": "FLOAT",
        "object": "VARCHAR(255)",  
        "datetime64[ns]": "DATETIME",
        "date": "DATE",
        "time": "TIME"
    }

    columns = []
    for col, dtype in df.dtypes.items():
        if col == 'ingestion_date':
            sql_type = "DATETIME"
        else:
            sql_type = type_mapping.get(str(dtype), "VARCHAR(255)")
        columns.append(f"[{col}] {sql_type}")

    columns_sql = ",\n".join(columns)

    create_table_sql = f"""
    IF NOT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{schema}' AND TABLE_NAME = '{table_name}')
    BEGIN
        CREATE TABLE {schema}.{table_name} (
            {columns_sql}
        )
    END
    """
    return create_table_sql

# Create bronze.pizza_sales
try:
    create_table_sql = generate_create_table_sql("bronze", "pizza_sales", df)
    cursor.execute(create_table_sql)
    conn.commit()
    print("Table bronze.pizza_sales created (or already exists).")
except pyodbc.Error as e:
    print("Error creating table:", e)
    conn.rollback()

try:
    for index, row in df.iterrows():
        values = tuple(row)
        placeholders = ','.join('?' * len(row))
        sql = f"INSERT INTO bronze.pizza_sales VALUES ({placeholders})"
        cursor.execute(sql, values)

    conn.commit()
    print("Data inserted into bronze.pizza_sales.")
except pyodbc.Error as e:
    print("Error inserting data:", e)
    conn.rollback()

cursor.execute("""
SELECT COLUMN_NAME, DATA_TYPE 
FROM INFORMATION_SCHEMA.COLUMNS 
WHERE TABLE_NAME = 'pizza_sales' AND TABLE_SCHEMA = 'bronze';
""")
columns_info = cursor.fetchall()
for column in columns_info:
    print(f"Column: {column[0]}, Data Type: {column[1]}")

try:
    cursor.execute("""
    UPDATE bronze.pizza_sales
    SET order_date = '1900-01-01'
    WHERE order_date IS NULL OR TRY_CAST(order_date AS DATE) IS NULL;

    UPDATE bronze.pizza_sales
    SET order_time = '00:00:00'
    WHERE order_time IS NULL OR TRY_CAST(order_time AS TIME) IS NULL;
    """)
    conn.commit()
except pyodbc.Error as e:
    print("Error cleaning data:", e)
    conn.rollback()

# table columns from VARCHAR to DATE and TIME
try:
    cursor.execute("ALTER TABLE bronze.pizza_sales ALTER COLUMN order_date DATE")
    cursor.execute("ALTER TABLE bronze.pizza_sales ALTER COLUMN order_time TIME")
    conn.commit()
    print("Table columns altered successfully.")
except pyodbc.Error as e:
    print("Error altering table columns:", e)
    conn.rollback()

# data types after alteration
cursor.execute("""
SELECT COLUMN_NAME, DATA_TYPE 
FROM INFORMATION_SCHEMA.COLUMNS 
WHERE TABLE_NAME = 'pizza_sales' AND TABLE_SCHEMA = 'bronze';
""")
columns_info = cursor.fetchall()
for column in columns_info:
    print(f"Column: {column[0]}, Data Type: {column[1]}")

# Close the connection
cursor.close()
conn.close()


  df['order_time'] = pd.to_datetime(df['order_time'], errors='coerce').dt.time


Table bronze.pizza_sales created (or already exists).
Data inserted into bronze.pizza_sales.
Column: Unnamed: 0, Data Type: int
Column: pizza_id, Data Type: int
Column: order_id, Data Type: int
Column: pizza_name_id, Data Type: varchar
Column: quantity, Data Type: int
Column: order_date, Data Type: date
Column: order_time, Data Type: time
Column: unit_price, Data Type: float
Column: total_price, Data Type: float
Column: pizza_size, Data Type: varchar
Column: pizza_category, Data Type: varchar
Column: pizza_ingredients, Data Type: varchar
Column: pizza_name, Data Type: varchar
Column: src_file_name, Data Type: varchar
Column: ingestion_date, Data Type: datetime
Table columns altered successfully.
Column: Unnamed: 0, Data Type: int
Column: pizza_id, Data Type: int
Column: order_id, Data Type: int
Column: pizza_name_id, Data Type: varchar
Column: quantity, Data Type: int
Column: order_date, Data Type: date
Column: order_time, Data Type: time
Column: unit_price, Data Type: float
Column: to

##### FOR PREVIOUS DATE

In [12]:
# import pandas as pd
# import pyodbc
# import datetime

# # Set to yesterday's date (4 March 2025)
# yesterday = datetime.datetime(2025, 3, 4)

# # Define the path to the CSV file
# landing_zone_path = f'../../Landing-Zone/Sales/Pizza/pizza_sales_2025-3-4.csv'

# # Load the CSV file
# try:
#     df = pd.read_csv(landing_zone_path)
#     print("CSV loaded successfully.")
# except FileNotFoundError:
#     print(f"Error: The file {landing_zone_path} does not exist.")
#     raise
# except Exception as e:
#     print(f"Error reading the CSV file: {e}")
#     raise

# # Check the loaded DataFrame
# print("Data loaded from CSV:")
# print(df.head())
# print("DataFrame columns:", df.columns)

# # Add source filename and ingestion date
# df['src_file_name'] = landing_zone_path.split('/')[-1]
# df['ingestion_date'] = yesterday

# # Correct connection string
# connection_string = r"DRIVER={ODBC Driver 17 for SQL Server};SERVER=LAPTOP-UEF1N3GJ\SQLEXPRESS;DATABASE=pizza_sales;Trusted_Connection=yes;"

# # Connect to SQL Server
# try:
#     conn = pyodbc.connect(connection_string)
#     cursor = conn.cursor()
# except pyodbc.Error as e:
#     print("Error connecting to SQL Server:", e)
#     raise

# # Ensure the bronze schema exists
# try:
#     cursor.execute("IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = 'bronze') BEGIN CREATE SCHEMA bronze; END")
#     conn.commit()
# except pyodbc.Error as e:
#     print("Error ensuring schema exists:", e)
#     conn.rollback()

# # Generate SQL for creating the table
# def generate_create_table_sql(schema, table_name, df):
#     type_mapping = {
#         "int64": "INT",
#         "float64": "FLOAT",
#         "object": "VARCHAR(255)",   # Most CSV columns are strings
#         "datetime64[ns]": "DATETIME",
#         "date": "DATE"              # For date columns
#     }

#     columns = []
#     for col, dtype in df.dtypes.items():
#         # Special handling for ingestion_date to ensure it's DATETIME
#         if col == 'ingestion_date':
#             sql_type = "DATETIME"
#         else:
#             sql_type = type_mapping.get(str(dtype), "VARCHAR(255)")
#         columns.append(f"[{col}] {sql_type}")

#     columns_sql = ",\n".join(columns)

#     create_table_sql = f"""
#     IF OBJECT_ID('{schema}.{table_name}', 'U') IS NULL
#     BEGIN
#         CREATE TABLE {schema}.{table_name} (
#             {columns_sql}
#         )
#     END
#     """
#     return create_table_sql

# # Create the bronze.pizza_sales table
# try:
#     create_table_sql = generate_create_table_sql("bronze", "pizza_sales", df)
#     cursor.execute(create_table_sql)
#     conn.commit()
#     print("Table bronze.pizza_sales created (or already exists).")
# except pyodbc.Error as e:
#     print("Error creating table:", e)
#     conn.rollback()

# # Insert data into the table
# try:
#     for index, row in df.iterrows():
#         values = tuple(row)
#         placeholders = ','.join('?' * len(row))
#         sql = f"INSERT INTO bronze.pizza_sales VALUES ({placeholders})"
#         cursor.execute(sql, values)

#     conn.commit()
#     print("Data inserted into bronze.pizza_sales.")
# except pyodbc.Error as e:
#     print("Error inserting data:", e)  # Print specific error message
#     conn.rollback()

# # Step 1: Check current data types
# cursor.execute("""
# SELECT COLUMN_NAME, DATA_TYPE 
# FROM INFORMATION_SCHEMA.COLUMNS 
# WHERE TABLE_NAME = 'pizza_sales' AND TABLE_SCHEMA = 'bronze';
# """)
# columns_info = cursor.fetchall()
# for column in columns_info:
#     print(f"Column: {column[0]}, Data Type: {column[1]}")

# # Step 2: Validate and clean data
# try:
#     cursor.execute("""
#     UPDATE bronze.pizza_sales
#     SET order_date = '1900-01-01' -- Default date for invalid entries
#     WHERE TRY_CAST(order_date AS DATE) IS NULL;

#     UPDATE bronze.pizza_sales
#     SET order_time = '00:00:00' -- Default time for invalid entries
#     WHERE TRY_CAST(order_time AS TIME) IS NULL;

#     -- Commit changes
#     """)
#     conn.commit()
# except pyodbc.Error as e:
#     print("Error cleaning data:", e)
#     conn.rollback()

# # Step 3: Alter the table columns from VARCHAR to DATE and TIME
# try:
#     cursor.execute("ALTER TABLE bronze.pizza_sales ALTER COLUMN order_date DATE")
#     cursor.execute("ALTER TABLE bronze.pizza_sales ALTER COLUMN order_time TIME")
#     conn.commit()
#     print("Table columns altered successfully.")
# except pyodbc.Error as e:
#     print("Error altering table columns:", e)
#     conn.rollback()

# # Step 4: Verify the data types after alteration
# cursor.execute("""
# SELECT COLUMN_NAME, DATA_TYPE 
# FROM INFORMATION_SCHEMA.COLUMNS 
# WHERE TABLE_NAME = 'pizza_sales' AND TABLE_SCHEMA = 'bronze';
# """)
# columns_info = cursor.fetchall()
# for column in columns_info:
#     print(f"Column: {column[0]}, Data Type: {column[1]}")

# # Check for data in SQL Server
# cursor.execute("SELECT COUNT(*) FROM bronze.pizza_sales WHERE ingestion_date = '2025-03-04'")
# count = cursor.fetchone()[0]
# print(f"Number of records for 4 March 2025: {count}")

# # Close the connection
# cursor.close()
# conn.close()