In [1]:


config.py
-----
# import needed library
import os
from dotenv import load_dotenv

# Load variables from a .env file into the process environment so that the
# os.getenv() lookups below can see them.
load_dotenv()

# Google Drive share links for the raw source files, keyed by dataset name.
urls = {
    "movies": "https://drive.google.com/file/d/188tIKLJKek62rGmzj1Ylc03fe4Pgb5co/view?usp=drive_link",
    "ratings": "https://drive.google.com/file/d/1-3S-XOgZyo9D3sVoXtjPvmFdsihjfQhN/view?usp=drive_link",
    "users": "https://drive.google.com/file/d/1_wAww5beF2K7dpx-SU_gUUddNWeaeZqv/view?usp=drive_link"
}

# Column names each dataset is expected to contain, keyed by the same dataset
# names used in `urls`.
expected_columns = {
    "movies": ['item_id', 'movie_title', 'release_date', 'IMDb_URL', 'primary_genre'],
    "ratings": ['user_id', 'item_id', 'rating', 'timestamp'],
    "users": ['user_id', 'age', 'gender', 'occupation', 'zip_code']
}

# Name of the date/time column per dataset ("users" has no entry).
date_columns = {
    "movies": "release_date",
    "ratings": "timestamp"
}

# AWS credentials and region, read from the environment.
# The credentials are None when unset; the region falls back to "us-east-1".
AWS_ACCESS_KEY_ID = os.getenv("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = os.getenv("AWS_SECRET_ACCESS_KEY")
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")

# S3 bucket names for each layer, read from the environment (None when unset).
S3_BUCKET_BRONZE = os.getenv("S3_BUCKET_BRONZE")
S3_BUCKET_SILVER = os.getenv("S3_BUCKET_SILVER")
S3_BUCKET_GOLD = os.getenv("S3_BUCKET_GOLD")
WATERMARKS_PATH = "watermarks/watermarks.parquet"  # NOTE(review): key ends in ".parquet", but the watermark helpers in this file read/write it with pd.read_csv / DataFrame.to_csv - confirm the intended format

# Postgres connection settings, read from the environment.
# host, port and database have defaults; user and password are None when unset.
POSTGRES_CONFIG = {
    "host": os.getenv("POSTGRES_HOST", "localhost"),
    "port": os.getenv("POSTGRES_PORT", "5432"),
    "database": os.getenv("POSTGRES_DB", "movie_rating_database"),
    "user": os.getenv("POSTGRES_USER"),
    "password": os.getenv("POSTGRES_PASSWORD")
}

# Desired pipeline schedule. No code in this file reads it.
pipeline_schedule = {
    "frequency": "weekly",  # can be "daily", "weekly", etc.
    "day_of_week": "sunday"  # if weekly
}


utils/utils.py
-----
import logging
import os
from datetime import datetime

def get_logger(name: str, log_level=logging.INFO) -> logging.Logger:
    """
    Return the named logger, wired to the console and to a dated log file.

    The "logs" directory is created on demand and the file is named after the
    current date (logs/YYYY-MM-DD.log). The level is applied on every call;
    handlers are attached only the first time a given logger name is seen.

    Args:
        name (str): Logger name, normally the caller's __name__.
        log_level (int): Level set on the logger, INFO by default.

    Returns:
        logging.Logger: The configured logger.
    """
    os.makedirs("logs", exist_ok=True)

    today = datetime.now().strftime('%Y-%m-%d')
    log_filename = f"logs/{today}.log"

    logger = logging.getLogger(name)
    logger.setLevel(log_level)

    # Handlers already attached: adding more would duplicate every message.
    if logger.handlers:
        return logger

    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )

    # Console first, then file; each handler is built right before it is added.
    handler_factories = (
        logging.StreamHandler,
        lambda: logging.FileHandler(log_filename),
    )
    for make_handler in handler_factories:
        handler = make_handler()
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    return logger


s3_client.py
----------
import boto3
import pandas as pd
from io import BytesIO
from botocore.exceptions import ClientError
from config import AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION
from utils.utils import get_logger

# Module-level logger used by every S3 helper defined below.
logger = get_logger(__name__)


def initialize_s3_client():
    """
    Build a boto3 S3 client from the configured AWS credentials and region.

    Returns:
        The boto3 S3 client.

    Raises:
        Exception: Any error raised while creating the client is logged and
            re-raised unchanged.
    """
    connection_settings = {
        "aws_access_key_id": AWS_ACCESS_KEY_ID,
        "aws_secret_access_key": AWS_SECRET_ACCESS_KEY,
        "region_name": AWS_REGION,
    }

    try:
        s3 = boto3.client("s3", **connection_settings)
    except Exception as err:
        logger.error(f"Failed to initialize S3 client: {str(err)}")
        raise
    else:
        logger.info("Successfully initialized S3 client.")
        return s3


def create_bucket(client, bucket_name):
    """
    Create an S3 bucket unless the caller already owns one with that name.

    Args:
        client: boto3 S3 client.
        bucket_name: Name of the bucket to create.

    Raises:
        ClientError: Any S3 error while listing or creating buckets is logged
            and re-raised.
    """
    try:
        # list_buckets only returns buckets owned by the calling account.
        existing_buckets = client.list_buckets()
        if any(b["Name"] == bucket_name for b in existing_buckets.get("Buckets", [])):
            logger.info(f"Bucket '{bucket_name}' already exists.")
            return

        create_kwargs = {"Bucket": bucket_name}
        # us-east-1 is the implicit default location: S3 rejects a
        # CreateBucket request that names it as LocationConstraint, so the
        # configuration block is only sent for other regions.
        if AWS_REGION and AWS_REGION != "us-east-1":
            create_kwargs["CreateBucketConfiguration"] = {
                "LocationConstraint": AWS_REGION
            }

        client.create_bucket(**create_kwargs)
        logger.info(f"Bucket '{bucket_name}' created successfully.")

    except ClientError as e:
        logger.error(f"Bucket creation error: {str(e)}")
        raise


def upload_file(client, bucket_name, object_name, file_path):
    """
    Upload a local file to an S3 bucket.

    Args:
        client: boto3 S3 client.
        bucket_name: Destination bucket.
        object_name: Key to store the file under.
        file_path: Path of the local file to upload.

    Raises:
        ClientError, boto3.exceptions.S3UploadFailedError: Logged and
            re-raised when the upload fails.
    """
    try:
        client.upload_file(file_path, bucket_name, object_name)
        logger.info(
            f"File '{file_path}' uploaded to bucket '{bucket_name}' as '{object_name}'."
        )

    # boto3's managed upload wraps S3 errors in S3UploadFailedError, so
    # catching ClientError alone would leave failed uploads unlogged.
    except (ClientError, boto3.exceptions.S3UploadFailedError) as e:
        logger.error(f"Upload error: {str(e)}")
        raise


def download_file(client, bucket_name, object_name, file_path):
    """
    Fetch an object from an S3 bucket and save it at a local path.

    Args:
        client: boto3 S3 client.
        bucket_name: Bucket holding the object.
        object_name: Key of the object to fetch.
        file_path: Local destination path.

    Raises:
        ClientError: Logged and re-raised when the download fails.
    """
    try:
        client.download_file(bucket_name, object_name, file_path)
    except ClientError as err:
        logger.error(f"Download error: {str(err)}")
        raise
    else:
        logger.info(
            f"File '{object_name}' downloaded from bucket '{bucket_name}' to '{file_path}'."
        )


def read_file(client, bucket_name, object_name):
    """
    Load an S3 object into a pandas DataFrame.

    Keys ending in ".parquet" are parsed as Parquet; every other key is
    parsed as CSV.

    Args:
        client: boto3 S3 client.
        bucket_name: Bucket holding the object.
        object_name: Key of the object to read.

    Returns:
        pandas.DataFrame with the object's contents.

    Raises:
        ClientError: Logged and re-raised when S3 reports an error.
    """
    try:
        s3_object = client.get_object(Bucket=bucket_name, Key=object_name)
        logger.info(f"Reading file '{object_name}' from bucket '{bucket_name}'.")

        # Pick the parser from the key's extension, then parse from memory.
        is_parquet = object_name.endswith(".parquet")
        parse = pd.read_parquet if is_parquet else pd.read_csv
        frame = parse(BytesIO(s3_object["Body"].read()))

        logger.info(f"File '{object_name}' successfully read into DataFrame.")
        return frame

    except ClientError as err:
        logger.error(f"Read error: {str(err)}")
        raise


bronze/ingest.py
-----------
# importing libraries/modules
import pandas as pd
from utils.utils import get_logger

# Module-level logger used by read_files().
logger = get_logger(__name__)

# function to read files from source (drive)
def read_files(urls: dict) -> dict:
    """
    Download each source CSV from Google Drive into a DataFrame.

    The Drive file id is taken from the second-to-last path segment of each
    share link and used to build a direct-download URL.

    Parameters:
        urls: dictionary mapping dataset names to Google Drive share links

    Returns:
        dict: dataset name -> DataFrame, containing only the datasets that
        were read successfully. A failed dataset is logged and omitted, so
        callers should check that every expected key is present.
    """
    dataframes = {}

    for file, url in urls.items():
        try:
            file_id = url.split('/')[-2]
            direct_url = f"https://drive.usercontent.google.com/download?id={file_id}&export=download&authuser=0&confirm=t"

            logger.info(f"Reading {file}...")

            df = pd.read_csv(direct_url)
            dataframes[file] = df

            logger.info(f"Successfully read {file}.")

        except Exception as e:
            # Best effort per dataset: keep reading the others, but report
            # the failure at ERROR level with the traceback.
            logger.error(f"Failed to read {file}. Error: {e}", exc_info=True)

    return dataframes

# Running this module as a script is a no-op (the guard body is just `pass`).
if __name__ == "__main__":
    pass


bronze/validation.py
-----------
# importing libraries/modules
from utils.utils import get_logger

# Module-level logger used by the validation helpers below.
logger = get_logger(__name__)

# function to check columns
def validate_columns(dataframes: dict, expected_cols: dict):
    """
    Check every dataframe against its list of required columns.

    A missing required column is logged and raised as an error; columns that
    are present but not expected only produce a warning.

    Args:
        dataframes (dict): {name: dataframe}
        expected_cols (dict): {name: list of expected columns}

    Raises:
        ValueError: If a dataframe lacks any of its expected columns.
    """
    for dataset, frame in dataframes.items():
        logger.info(f"Validating columns for {dataset}..")

        required = set(expected_cols[dataset])
        present = set(frame.columns)

        # Required columns that are absent are fatal.
        missing = required.difference(present)
        if missing:
            message = f"Missing columns in {dataset}: {missing}"
            logger.error(message)
            raise ValueError(message)

        # Unexpected columns are tolerated but reported.
        extra = present.difference(required)
        if extra:
            logger.warning(f"Extra columns found in {dataset}: {extra}")

        logger.info(f"successfully validated {dataset}'s columns.")
# function to check null values
def validate_nulls(dataframes: dict):
    """
    Report missing values per dataframe without modifying any data.

    Logs a warning with the per-column counts when a dataframe contains
    nulls, and an info message otherwise.

    Args:
        dataframes (dict): {name: dataframe}
    """
    for name, frame in dataframes.items():
        per_column = frame.isnull().sum()
        total = per_column.sum()

        if not total > 0:
            logger.info(f"No missing values found in {name}.")
            continue

        # Only show the columns that actually contain nulls.
        logger.warning(f"{name} has {total} missing values:\n{per_column[per_column > 0]}")

# Running this module as a script is a no-op (the guard body is just `pass`).
if __name__ == "__main__":
    pass


bronze/upload.py
-----------
# importing libraries/modules
import io
import boto3
from utils.utils import get_logger
from pipeline.s3_client import initialize_s3_client
from config import S3_BUCKET_BRONZE

# Module-level logger used by upload_to_bronze().
logger = get_logger(__name__)

def upload_to_bronze(dataframes: dict):
    """
    Upload each dataframe as "<name>.csv" to the Bronze S3 bucket.

    Every dataset is attempted even if an earlier one fails; failures are
    logged as they happen and reported together once all uploads were tried.

    Parameters:
        dataframes: dictionary of file names and their dataframes.

    Raises:
        RuntimeError: If one or more datasets could not be uploaded.
    """
    s3_client = initialize_s3_client()
    failed = []

    for file, df in dataframes.items():
        try:
            logger.info(f"Starting {file} upload to bronze bucket")

            # Serialize the DataFrame to CSV in memory (no temp file).
            csv_body = df.to_csv(index=False).encode("utf-8")

            # Objects are stored at the bucket root, one CSV per dataset.
            object_name = f"{file}.csv"
            logger.info(f"Uploading {object_name} to S3 bucket {S3_BUCKET_BRONZE}")

            s3_client.put_object(
                Bucket=S3_BUCKET_BRONZE,
                Key=object_name,
                Body=csv_body
            )

            logger.info(f"Successfully uploaded {object_name} to S3 bucket {S3_BUCKET_BRONZE}.")

        except Exception as e:
            logger.error(f"Failed to upload {file}. Error: {e}", exc_info=True)
            failed.append(file)

    # Surface failures to the caller: otherwise it would report a successful
    # upload while the bucket holds missing or stale data.
    if failed:
        raise RuntimeError(f"Failed to upload dataset(s) to bronze bucket: {failed}")


bronze/orchestration.py
-------------
# Importing required modules
from pipeline.a_bronze.ingest import read_files
from pipeline.a_bronze.validation import validate_columns, validate_nulls
from pipeline.a_bronze.upload import upload_to_bronze
from utils.utils import get_logger
from config import urls, expected_columns, S3_BUCKET_BRONZE
from pipeline.s3_client import initialize_s3_client, create_bucket

# Module-level logger used by source_to_bronze().
logger = get_logger(__name__)

# S3 client created once at import time; source_to_bronze() passes it to
# create_bucket().
client = initialize_s3_client()

def source_to_bronze():
    """
    Run the Bronze pipeline: ingest, validate, ensure the bucket, upload.

    Raises:
        Exception: Any failure in a step is logged with its traceback and
            re-raised. This includes a RuntimeError when one or more
            configured source files could not be ingested.
    """

    logger.info("Starting Bronze layer data pipeline...")

    try:
        # Step 1: Read raw files
        dataframes = read_files(urls)

        # read_files omits datasets it failed to read, so a partial result
        # must be detected here instead of being reported as a success.
        missing_sources = sorted(set(urls) - set(dataframes))
        if missing_sources:
            raise RuntimeError(f"Failed to ingest source file(s): {missing_sources}")
        logger.info("Successfully ingested raw data.")

        # Step 2: Validate schema & nulls
        validate_columns(dataframes, expected_columns)
        validate_nulls(dataframes)
        logger.info("Data validation completed successfully.")

        # Step 3: Ensure Bronze bucket exists
        create_bucket(client, S3_BUCKET_BRONZE)

        # Step 4: Upload to Bronze
        upload_to_bronze(dataframes)
        logger.info("Data successfully uploaded to Bronze S3 bucket.")

        logger.info("Bronze pipeline execution completed successfully!")

    except Exception as e:
        logger.error(f"Pipeline execution failed. Error: {str(e)}", exc_info=True)
        raise

# Script entry point: run the Bronze pipeline when executed directly.
if __name__ == "__main__":
    source_to_bronze()



b_silver/read_write_buckets.py
---------------
import pandas as pd
from io import BytesIO
from pipeline.s3_client import initialize_s3_client
from utils.utils import get_logger
from config import S3_BUCKET_BRONZE, S3_BUCKET_SILVER

# Module-level logger used by the bucket helpers below.
logger = get_logger(__name__)

# S3 client created once at import time and shared by read_bronze_file()
# and write_to_silver().
client = initialize_s3_client()


def read_bronze_file(object_name: str) -> pd.DataFrame:
    """
    Load a CSV object from the Bronze bucket into a DataFrame.

    Args:
        object_name: Key of the CSV object in the Bronze bucket.

    Returns:
        pd.DataFrame parsed from the object's bytes.

    Raises:
        Exception: Any read or parse failure is logged with its traceback
            and re-raised.
    """
    logger.info(f"Reading {object_name} from {S3_BUCKET_BRONZE} bucket.")
    try:
        s3_object = client.get_object(Bucket=S3_BUCKET_BRONZE, Key=object_name)
        raw_bytes = s3_object["Body"].read()

        # Wrapping the bytes in BytesIO gives pandas a file-like object, so
        # the CSV is parsed straight from memory without a local download.
        frame = pd.read_csv(BytesIO(raw_bytes))
    except Exception as err:
        logger.error(f"Error reading {object_name} from S3 Bronze bucket: {str(err)}", exc_info=True)
        raise
    else:
        logger.info(f"Successfully read {object_name} into DataFrame. Shape: {frame.shape}")
        return frame


def write_to_silver(df: pd.DataFrame, object_name: str):
    """
    Store a DataFrame as a CSV object in the Silver bucket.

    Args:
        df: Data to write (the index is not included).
        object_name: Key to store the CSV under.

    Raises:
        Exception: Any serialization or upload failure is logged with its
            traceback and re-raised.
    """
    logger.info(f"Writing {object_name} to {S3_BUCKET_SILVER} bucket.")
    try:
        # Serialize into an in-memory buffer; getvalue() returns the whole
        # content regardless of the buffer's current position.
        payload = BytesIO()
        df.to_csv(payload, index=False)

        request = {
            "Bucket": S3_BUCKET_SILVER,
            "Key": object_name,
            "Body": payload.getvalue(),
            "ContentType": "application/csv",
        }
        client.put_object(**request)
        logger.info(f"Successfully wrote {object_name} to Silver bucket.")

    except Exception as err:
        logger.error(f"Error writing {object_name} to Silver bucket: {str(err)}", exc_info=True)
        raise


# Running this module as a script is a no-op (the guard body is just `pass`).
if __name__ == "__main__":
    pass


b_silver/watermarks.py
------------
import pandas as pd
import io
from pipeline.s3_client import initialize_s3_client
from config import S3_BUCKET_BRONZE, WATERMARKS_PATH
from utils.utils import get_logger
from datetime import datetime
from botocore.exceptions import ClientError

# Module-level logger used by the watermark helpers below.
logger = get_logger(__name__)

# S3 client created once at import time and shared by the watermark helpers.
client = initialize_s3_client()

# function to read watermarks
def read_watermarks():
    """
    Read the watermark table stored as CSV in the Bronze bucket.

    Returns:
        pd.DataFrame: The stored watermarks, or an empty DataFrame with the
        columns dataset_name, max_value, records_loaded and processing_time
        when the watermark object does not exist yet.

    Raises:
        ClientError: For any S3 error other than "object not found" (for
            example access denied or throttling). Treating those as "no
            watermark" would silently turn an incremental load into a full
            reload.
    """
    try:
        logger.info("Reading watermarks from S3 Bronze bucket...")
        response = client.get_object(Bucket=S3_BUCKET_BRONZE, Key=WATERMARKS_PATH)

        # The watermark object is written with DataFrame.to_csv, so it is
        # parsed as CSV here.
        watermarks_df = pd.read_csv(io.BytesIO(response["Body"].read()))
        logger.info("Successfully read existing watermarks.")
        return watermarks_df

    except ClientError as e:
        error_code = e.response.get("Error", {}).get("Code")
        if error_code not in ("NoSuchKey", "404"):
            logger.error(f"Failed to read watermarks from S3: {e}")
            raise

        # The object does not exist yet: start from an empty table.
        logger.warning(f"Watermarks not found in S3. Initializing new one. Details: {e}")
        columns = ["dataset_name", "max_value", "records_loaded", "processing_time"]
        return pd.DataFrame(columns=columns)

# function to initialize watermarks
def initialize_watermarks(initial_watermarks: pd.DataFrame):
    """
    Write the given watermark table to S3, replacing any existing object.

    Args:
        initial_watermarks: Watermark rows to store (written as CSV without
            the index).

    Raises:
        Exception: Any serialization or upload failure is logged and
            re-raised.
    """
    logger.info("Initializing watermarks table in S3...")
    try:
        # Serialize in memory; getvalue() returns the full content
        # regardless of the buffer position.
        payload = io.BytesIO()
        initial_watermarks.to_csv(payload, index=False)

        request = {
            "Bucket": S3_BUCKET_BRONZE,
            "Key": WATERMARKS_PATH,
            "Body": payload.getvalue(),
            "ContentType": "application/csv",
        }
        client.put_object(**request)
        logger.info("Successfully initialized watermark in S3.")

    except Exception as err:
        logger.error(f"Failed to initialize watermarks: {err}")
        raise

# function to update watermarks
def update_watermarks(new_watermark: dict):
    """
    Append one watermark row to the table stored in S3.

    The existing table is read, the new row is appended, and the whole table
    is written back as CSV under the same key.

    Args:
        new_watermark: Mapping of column name to value for the new row.

    Raises:
        Exception: Any failure while reading, building or writing the table
            is logged and re-raised. A read failure aborts the update:
            carrying on with an empty table would overwrite the stored
            watermark history with just the new row.
    """
    logger.info("Updating watermarks in S3...")
    try:
        # read_watermarks() already returns an empty table when no watermark
        # object exists, so any exception here is a genuine failure.
        old_watermarks = read_watermarks()

        # add the new row
        new_row = pd.DataFrame([new_watermark])
        updated_watermark = pd.concat([old_watermarks, new_row], ignore_index=True)

        # write back to S3
        buffer = io.BytesIO()
        updated_watermark.to_csv(buffer, index=False)

        client.put_object(
            Bucket=S3_BUCKET_BRONZE,
            Key=WATERMARKS_PATH,
            Body=buffer.getvalue(),
            ContentType="application/csv"
        )
        logger.info("Watermarks updated successfully in S3.")

    except Exception as e:
        logger.error(f"Failed to update watermarks: {e}")
        raise

# Running this module as a script is a no-op (the guard body is just `pass`).
if __name__ == "__main__":
    pass


b_silver/transform.py
-------------
import pandas as pd
from io import BytesIO
from pipeline.s3_client import initialize_s3_client
from utils.utils import get_logger
from pipeline.b_silver.watermarks import read_watermarks, update_watermarks
from pipeline.b_silver.read_write_buckets import read_bronze_file, write_to_silver

# Module-level logger used by the transformation functions below.
logger = get_logger(__name__)

# S3 client created at import time.
# NOTE(review): none of the functions in this module reference `client`;
# they go through read_bronze_file / write_to_silver instead - confirm
# whether this is still needed.
client = initialize_s3_client()


# function to tranform movie df
def prepare_movie_df():
    """
    Clean the movie data and load the new records into Silver.

    Steps: read movies.csv from Bronze, normalize column names, drop
    duplicates and rows without item_id/movie_title, parse release_date,
    tidy the string columns, keep only rows whose release_date is later than
    the stored "movies" watermark (when one exists), write the result to
    Silver as movies.csv and, if any rows remain, append a new watermark row.
    """
    # read from bronze
    logger.info("Starting transformations for movie data..")
    df = read_bronze_file('movies.csv')

    # standardize column names
    df.columns = df.columns.str.strip().str.lower()

    # drop duplicates
    df = df.drop_duplicates()

    # ensure release_date is datetime (unparseable values become NaT)
    df['release_date'] = pd.to_datetime(df['release_date'], errors='coerce')

    # drop rows with missing item_id or movie_title
    df = df.dropna(subset=['item_id', 'movie_title'])

    # trim and clean string columns
    df['movie_title'] = df['movie_title'].str.strip()
    df['primary_genre'] = df['primary_genre'].str.strip().str.title()
    df['imdb_url'] = df['imdb_url'].str.strip()

    # final shape logging
    logger.info(f"Movies data cleaned with {df.shape[0]} records")

    # incremental filter based on watermark
    watermarks = read_watermarks()

    if not watermarks.empty:
        logger.info("Applying watermark filter for movies..")

        # Rows recorded for the 'movies' dataset
        movie_watermark = watermarks[watermarks["dataset_name"] == "movies"]

        if not movie_watermark.empty:
            # Parse before taking the max so values are compared as
            # datetimes rather than as the raw strings read from the CSV.
            latest_watermark = pd.to_datetime(movie_watermark['max_value']).max()
            df = df[df['release_date'] > latest_watermark]
            logger.info(f"Filtered movie_df to {df.shape[0]} new records after watermark {latest_watermark}.")
        else:
            logger.warning("No existing watermark for movies. Proceeding without filter.")
    else:
        logger.warning("No watermark file found. Proceeding without filter.")

    # warning if no new data
    if df.empty:
        logger.warning("No new movie data to process after watermark filtering.")
    else:
        logger.info(f"Movie data cleaned and filtered with {df.shape[0]} records.")

    # upload to silver (written even when empty, replacing the previous file)
    write_to_silver(df, 'movies.csv')

    # update the watermark with the latest processing record
    if not df.empty:
        latest_max_value = df['release_date'].max()
        data = {'dataset_name': 'movies',
                'max_value': latest_max_value,
                'records_loaded': df.shape[0],
                'processing_time': pd.Timestamp.now()
                                }

        update_watermarks(data)


# function to transform ratings df
def prepare_ratings_df():
    """
    Clean the ratings data and load the new records into Silver.

    Steps: read ratings.csv from Bronze, normalize column names, drop
    duplicates and rows without user_id/item_id/rating, convert the epoch
    `timestamp` column to datetime, keep only rows later than the stored
    "ratings" watermark (when one exists), write the result to Silver as
    ratings.csv and, if any rows remain, append a new watermark row.
    """
    # read from bronze
    df = read_bronze_file('ratings.csv')

    logger.info("Starting transformations for ratings..")

    # standardize column names
    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_')

    # Drop exact duplicates
    df = df.drop_duplicates()

    # Drop rows with missing user_id, item_id or rating
    df = df.dropna(subset=['user_id', 'item_id', 'rating'])

    # Convert timestamp (epoch seconds) into proper datetime
    df['timestamp'] = pd.to_datetime(df['timestamp'], unit='s')

    logger.info(f"Ratings data cleaned with {df.shape[0]} records")

    # incremental filter based on watermark
    watermarks = read_watermarks()

    if not watermarks.empty:
        logger.info("Applying watermark filter for ratings..")

        # Rows recorded for the 'ratings' dataset
        ratings_watermark = watermarks[watermarks["dataset_name"] == "ratings"]

        if not ratings_watermark.empty:
            # This function stores the watermark as a datetime (see below),
            # which comes back from the CSV as a date string - parsing it
            # with unit='s' fails. Numeric values are still interpreted as
            # epoch seconds so older entries in that form keep working.
            raw_values = ratings_watermark['max_value']
            epoch_seconds = pd.to_numeric(raw_values, errors='coerce')
            parsed = pd.to_datetime(raw_values.where(epoch_seconds.isna()), errors='coerce')
            parsed = parsed.fillna(pd.to_datetime(epoch_seconds, unit='s'))
            latest_watermark = parsed.max()

            df = df[df['timestamp'] > latest_watermark]
            logger.info(f"Filtered ratings to {df.shape[0]} new records after watermark {latest_watermark}.")
        else:
            logger.warning("No existing watermark for ratings. Proceeding without filter.")
    else:
        logger.warning("No watermark file found. Proceeding without filter.")

    if df.empty:
        logger.warning("No new ratings data to process after watermark filtering.")
    else:
        logger.info(f"Ratings data cleaned and filtered with {df.shape[0]} records.")

    # upload to silver (written even when empty, replacing the previous file)
    write_to_silver(df, 'ratings.csv')

    # update the watermark with the new details
    if not df.empty:
        latest_max_value = df['timestamp'].max()
        data = {'dataset_name': 'ratings',
                'max_value': latest_max_value,
                'records_loaded': df.shape[0],
                'processing_time': pd.Timestamp.now()
                                }

        update_watermarks(data)


# function to transform user df
def prepare_users_df():
    """
    Clean the user data and load the new records into Silver.

    Steps: read users.csv from Bronze, normalize column names, drop rows
    without user_id and duplicate rows, tidy the string columns, keep only
    rows whose user_id is greater than the stored "users" watermark (when
    one exists), write the result to Silver as users.csv and, if any rows
    remain, append a new watermark row.
    """

    # read from bronze
    df = read_bronze_file('users.csv')

    logger.info("Starting transformations for users..")

    # standardize column names
    df.columns = df.columns.str.strip().str.lower().str.replace(' ', '_')

    # drop users without user_id
    df = df.dropna(subset=['user_id'])

    # drop duplicates
    df = df.drop_duplicates()

    # clean string fields: whitespaces and case
    df['gender'] = df['gender'].str.strip().str.upper()
    df['occupation'] = df['occupation'].str.strip().str.title()
    df['zip_code'] = df['zip_code'].astype(str).str.strip()

    logger.info(f"Users data cleaned with {df.shape[0]} records")

    # incremental filter based on watermark
    watermarks = read_watermarks()

    if not watermarks.empty:
        logger.info("Applying watermark filter for users..")

        # Rows recorded for the 'users' dataset
        user_watermark = watermarks[watermarks["dataset_name"] == "users"]

        if not user_watermark.empty:
            # max_value shares a CSV column with date strings from other
            # datasets, so it is read back as text. Convert to numbers
            # before taking the max; a string max is lexicographic
            # ("99" > "943") and would pick the wrong watermark.
            latest_watermark = int(pd.to_numeric(user_watermark["max_value"]).max())
            df = df[df["user_id"].astype(int) > latest_watermark]
            logger.info(f"Filtered user data to {df.shape[0]} new records after user_id {latest_watermark}.")

        else:
            logger.warning("No existing watermark for users. Proceeding without filter.")
    else:
        logger.warning("No watermark file found. Proceeding without filter.")

    if df.empty:
        logger.warning("No new user data to process after watermark filtering.")
    else:
        logger.info(f"User data cleaned and filtered with {df.shape[0]} records.")

    # upload to silver (written even when empty, replacing the previous file)
    write_to_silver(df, 'users.csv')

    # update the watermark with the new details
    if not df.empty:
        latest_max_value = df['user_id'].max()
        data = {'dataset_name': 'users',
                'max_value': latest_max_value,
                'records_loaded': df.shape[0],
                'processing_time': pd.Timestamp.now()
                                }

        update_watermarks(data)


# Running this module as a script is a no-op (the guard body is just `pass`).
if __name__ == "__main__":
    pass


b_silver/orchestrator_B-S.py
---------------
from utils.utils import get_logger
from pipeline.b_silver.transform import prepare_movie_df, prepare_ratings_df, prepare_users_df

# need

# Module-level logger used by orchestrate().
logger = get_logger(__name__)

def orchestrate():
    """
    Run the Bronze-to-Silver transformation for movies, ratings and users.

    Each dataset runs independently, so one failure does not stop the
    others. Failures are logged with their traceback and reported together
    after all three were attempted.

    Raises:
        RuntimeError: If one or more dataset pipelines failed.
    """
    logger.info("Starting bronze-silver orchestration...")

    steps = (
        ("Movie", prepare_movie_df),
        ("Ratings", prepare_ratings_df),
        ("Users", prepare_users_df),
    )

    failed = []
    for label, run_step in steps:
        try:
            run_step()
        except Exception as e:
            logger.error(f"{label} pipeline failed: {e}", exc_info=True)
            failed.append(label)

    # Do not report completion (or exit cleanly) when a dataset failed.
    if failed:
        logger.error(f"MovieLens ETL Orchestration finished with failures: {failed}")
        raise RuntimeError(f"Bronze-silver pipeline(s) failed: {failed}")

    logger.info("MovieLens ETL Orchestration Complete.")

# Script entry point: run the Bronze-to-Silver orchestration when executed directly.
if __name__ == "__main__":
    orchestrate()


s_gold/read_bucket.py
----------------
import pandas as pd
from io import BytesIO
from pipeline.s3_client import initialize_s3_client
from utils.utils import get_logger
from config import S3_BUCKET_SILVER


# Module-level logger used by read_silver_file().
logger = get_logger(__name__)

# S3 client created once at import time and used by read_silver_file().
client = initialize_s3_client()


# function to read from silver bucket
def read_silver_file(object_name: str) -> pd.DataFrame:
    """
    Reads a CSV file from the Silver bucket into a DataFrame.

    Args:
        object_name: Key of the CSV object in the Silver bucket.

    Returns:
        pd.DataFrame parsed from the object's bytes.

    Raises:
        Exception: Any read or parse failure is logged and re-raised.
    """
    logger.info(f"Reading {object_name} from {S3_BUCKET_SILVER} bucket.")
    try:
        # boto3 parameter and response keys are case-sensitive: the request
        # takes "Key" and the payload stream is under "Body".
        response = client.get_object(Bucket=S3_BUCKET_SILVER, Key=object_name)
        data = response["Body"].read()
        df = pd.read_csv(BytesIO(data))
        logger.info(f"Successfully read {object_name} into DataFrame. Shape: {df.shape}")
        return df

    except Exception as e:
        logger.error(f"Error reading {object_name} from {S3_BUCKET_SILVER}: {str(e)}")
        raise


# Running this module as a script is a no-op (the guard body is just `pass`).
if __name__ == "__main__":
    pass



s_gold/connection.py
--------------
import logging
import pandas as pd
from config import POSTGRES_CONFIG
from sqlalchemy import create_engine

# Module-level logger. This uses logging.getLogger directly, so unlike the
# modules that call get_logger() it attaches no console/file handlers here.
logger = logging.getLogger(__name__)

#function to connect to database
def get_db_connection():
    """
    Build a SQLAlchemy engine for the Postgres database in POSTGRES_CONFIG.

    Returns:
        The SQLAlchemy engine.

    Raises:
        Exception: Any error while building the engine is logged and
            re-raised.
    """
    # Imported locally so this function carries its own dependency.
    from urllib.parse import quote

    try:
        # Percent-encode the credentials: characters such as "@", ":" or "/"
        # in a password would otherwise be parsed as URL delimiters.
        user = quote(str(POSTGRES_CONFIG['user']), safe="")
        password = quote(str(POSTGRES_CONFIG['password']), safe="")

        engine = create_engine(
            f"postgresql://{user}:{password}@"
            f"{POSTGRES_CONFIG['host']}:{POSTGRES_CONFIG['port']}/{POSTGRES_CONFIG['database']}"
        )
        # NOTE: create_engine is lazy - no connection is opened until the
        # engine is first used, so this only confirms the engine was built.
        logger.info("Successfully connected to Postgres.")
        return engine

    except Exception as e:
        logger.error(f"PostgreSQL connection error: {str(e)}")
        raise


# function to create table
def create_table_if_not_exists(sql: str, table: str):
    """
    Execute a CREATE TABLE statement against Postgres.

    Args:
        sql: The DDL statement to run.
        table: Table name, used only in log messages.

    Raises:
        Exception: Any connection or execution error is logged and re-raised.
    """
    # Imported locally so this function carries its own dependency.
    from sqlalchemy import text

    try:
        engine = get_db_connection()
        # engine.begin() commits on success and rolls back on error, and
        # text() wraps the raw SQL string: SQLAlchemy 2.x rejects plain
        # strings and no longer autocommits DDL run via engine.connect().
        with engine.begin() as conn:
            conn.execute(text(sql))
        logger.info(f"Table '{table}' in Postgres.")

    except Exception as e:
        logger.error(f"Error creating table '{table}': {str(e)}")
        raise


# function to upload to postgres
def write_to_postgres(df: pd.DataFrame, table_name: str):
    """
    Append the rows of a DataFrame to a Postgres table.

    Args:
        df: Rows to insert (the index is not written).
        table_name: Destination table.

    Raises:
        Exception: Any connection or insert error is logged and re-raised.
    """
    logger.info(f"Writing {table_name} to Postgres..")

    try:
        engine = get_db_connection()

        # Append to the existing table using multi-row INSERT statements.
        insert_options = {
            "name": table_name,
            "con": engine,
            "if_exists": "append",
            "index": False,
            "method": 'multi',
        }
        df.to_sql(**insert_options)

        row_count = len(df)
        logger.info(f"Successfully inserted {row_count} records into {table_name}.")

    except Exception as err:
        logger.error(f"Error writing {table_name} df to PostgreSQL: {str(err)}")
        raise

# Running this module as a script is a no-op (the guard body is just `pass`).
if __name__ == "__main__":
    pass



s_gold/schema.py
-------------

# python
# DDL for the movies table: one row per movie, keyed by item_id.
movies_table_sql = """
CREATE TABLE IF NOT EXISTS movies (
    item_id INT PRIMARY KEY,
    movie_title TEXT NOT NULL,
    release_date TIMESTAMP,
    primary_genre TEXT,
    imdb_url TEXT
);
"""

# DDL for the users table: one row per user, keyed by user_id.
users_table_sql = """
CREATE TABLE IF NOT EXISTS users (
    user_id INT PRIMARY KEY,
    gender CHAR(1),
    age INT,
    occupation TEXT,
    zip_code TEXT
);
"""

# DDL for the ratings table. It has foreign keys to users(user_id) and
# movies(item_id), so those two tables must exist before this statement runs.
# A row is uniquely identified by (user_id, item_id, timestamp).
ratings_table_sql = """
CREATE TABLE IF NOT EXISTS ratings (
    user_id INT REFERENCES users(user_id),
    item_id INT REFERENCES movies(item_id),
    rating DOUBLE PRECISION,
    timestamp TIMESTAMP,
    PRIMARY KEY (user_id, item_id, timestamp)
);
"""








SyntaxError: invalid syntax (37823289.py, line 2)