# In [None]:
import logging
import os

import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError

# Configuration
# Replace the placeholder defaults below with your own settings, or (preferably)
# supply the connection strings through the POSTGRES_CONN_STR and
# SNOWFLAKE_CONN_STR environment variables so credentials are never committed
# to source control.
POSTGRES_CONN_STR = os.environ.get(
    "POSTGRES_CONN_STR",
    "postgresql+psycopg2://username:password@localhost:5432/dbname",
)
SNOWFLAKE_CONN_STR = os.environ.get(
    "SNOWFLAKE_CONN_STR",
    "snowflake://username:password@account_identifier.region.cloud_provider/database/schema"
    "?warehouse=warehouse_name&role=role_name",
)
SOURCE_TABLE = 'Games'  # The table in PostgreSQL to extract from
TARGET_TABLE = 'Games'  # The table in Snowflake to load into

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def extract_data_from_postgres():
    """Extract every row of ``SOURCE_TABLE`` from PostgreSQL into a DataFrame.

    Returns:
        pandas.DataFrame: The full contents of the source table.

    Raises:
        SQLAlchemyError: If the engine cannot be created or the query fails.
            The error is logged with its traceback and then re-raised.
    """
    logging.info("Starting data extraction from PostgreSQL.")
    # Initialise before the ``try`` so the ``finally`` clause never reads an
    # unbound name when ``create_engine`` itself raises (bad URL, missing
    # driver). Otherwise an UnboundLocalError would replace the real error.
    postgres_engine = None
    try:
        postgres_engine = create_engine(POSTGRES_CONN_STR)
        # SOURCE_TABLE is a module constant, not user input, so interpolating
        # it is acceptable. NOTE: the identifier is unquoted, so PostgreSQL
        # folds it to lowercase (``games``).
        query = f"SELECT * FROM {SOURCE_TABLE}"

        # Load data into a DataFrame
        df = pd.read_sql(query, postgres_engine)
        logging.info(f"Data extraction complete. Extracted {len(df)} rows.")

        return df
    except SQLAlchemyError:
        logging.exception("Error during data extraction from PostgreSQL")
        # A bare ``raise`` re-raises the active exception with its traceback.
        raise
    finally:
        # Release the engine's pooled connections whether or not we succeeded.
        if postgres_engine is not None:
            postgres_engine.dispose()

def validate_data(df):
    """Validate the data before loading it into Snowflake.

    The frame must contain no null values anywhere. It must also have every
    column listed in the schema below, each with the expected dtype.

    Args:
        df: DataFrame produced by the extraction step.

    Returns:
        The same ``df`` object, unchanged, so the call can be chained.

    Raises:
        ValueError: If any value is null, a required column is missing, or a
            column has an unexpected dtype.
    """
    logging.info("Validating data.")

    # Check for null values
    if df.isnull().values.any():
        raise ValueError("Data contains null values. Validation failed.")

    # Required columns, the dtype predicate each must satisfy, and the message
    # raised when it does not (checked in this order).
    schema = (
        ('year', pd.api.types.is_integer_dtype,
         "Year column contains non-integer values."),
        ('round', pd.api.types.is_string_dtype,
         "Round column contains non-string values."),
        ('winner', pd.api.types.is_string_dtype,
         "Winner column contains non-string values."),
        ('opponent', pd.api.types.is_string_dtype,
         "Opponent column contains non-string values."),
        ('winner_goals', pd.api.types.is_integer_dtype,
         "Winner_goals column contains non-integer values."),
        ('opponent_goals', pd.api.types.is_integer_dtype,
         "Opponent_goals column contains non-integer values."),
    )

    # Report all missing columns with a ValueError (the documented failure
    # type) instead of letting ``df[column]`` raise a bare KeyError.
    missing = [column for column, _, _ in schema if column not in df.columns]
    if missing:
        raise ValueError(
            f"Data is missing required columns: {missing}. Validation failed."
        )

    # Data type checks
    for column, has_expected_dtype, message in schema:
        if not has_expected_dtype(df[column]):
            raise ValueError(message)

    logging.info("Data validation passed.")
    return df

def load_data_to_snowflake(df, *, chunksize=None):
    """Append ``df`` to ``TARGET_TABLE`` in Snowflake.

    Uses ``DataFrame.to_sql`` with ``if_exists='append'``. Rows are inserted
    into the table if it exists; otherwise pandas creates the table first.

    Args:
        df: The validated DataFrame to write. Its index is not written.
        chunksize: Optional number of rows per multi-row ``INSERT``. The
            default ``None`` writes all rows in a single statement. Set it
            for large frames so each statement stays within the database's
            size limits.

    Raises:
        SQLAlchemyError: If the engine cannot be created or the write fails.
            The error is logged with its traceback and then re-raised.
    """
    logging.info("Loading data to Snowflake.")
    # Initialise before the ``try`` so the ``finally`` clause never reads an
    # unbound name when ``create_engine`` itself raises.
    snowflake_engine = None
    try:
        snowflake_engine = create_engine(SNOWFLAKE_CONN_STR)

        # NOTE(review): a mixed-case name such as 'Games' may be quoted by the
        # Snowflake dialect and match only a case-sensitive "Games" table
        # rather than GAMES. Confirm against the target schema.
        df.to_sql(
            name=TARGET_TABLE,
            con=snowflake_engine,
            index=False,
            if_exists='append',   # Insert into the table, creating it if absent
            method='multi',       # Multi-row INSERT statements
            chunksize=chunksize,  # Rows per statement; None = all at once
        )

        logging.info("Data loading to Snowflake completed successfully.")
    except SQLAlchemyError:
        logging.exception("Error during data loading to Snowflake")
        raise
    finally:
        # Release the engine's pooled connections whether or not we succeeded.
        if snowflake_engine is not None:
            snowflake_engine.dispose()

def main():
    """Orchestrate the ETL pipeline: extract, then validate, then load.

    Any exception raised by a step is logged with its traceback and is not
    propagated, so a failure does not abort the calling script or notebook.

    Returns:
        bool: ``True`` if every step completed, ``False`` if any step raised.
    """
    try:
        # Step 1: Extract data into a DataFrame
        df = extract_data_from_postgres()

        # Step 2: Validate data (raises ValueError on bad data)
        df = validate_data(df)

        # Step 3: Append the DataFrame to the Snowflake table
        load_data_to_snowflake(df)
    except Exception:
        # Top-level boundary: record the failure with its traceback and report
        # it through the return value instead of crashing the caller.
        logging.exception("Pipeline failed.")
        return False
    return True

if __name__ == "__main__":
    # Run the pipeline only when this code is executed directly (a script, or
    # a notebook cell, where __name__ is also "__main__"), not when imported.
    main()
