In [3]:
import psycopg2
import pandas as pd
import boto3
from io import BytesIO

def connect_to_postgres(host="", database="tokyo_olympics_db", user="postgres",
                        password=None):
    """Open a psycopg2 connection to the Tokyo Olympics PostgreSQL database.

    All arguments are optional; their defaults reproduce the settings the
    notebook originally hard-coded, so ``connect_to_postgres()`` behaves as
    before.

    Args:
        host: Server host passed to ``psycopg2.connect``.
        database: Database name.
        user: Role to connect as.
        password: Password for *user*. When ``None``, the ``PGPASSWORD``
            environment variable is used if it is set, otherwise the legacy
            hard-coded password.

    Returns:
        An open psycopg2 connection, or ``None`` if connecting failed (the
        error is printed rather than raised).
    """
    import os

    if password is None:
        # NOTE(security): "test123" is a credential committed to source and is
        # kept only so existing runs keep working. Set PGPASSWORD (or pass
        # password=) and remove this fallback.
        password = os.environ.get("PGPASSWORD", "test123")

    try:
        connection = psycopg2.connect(
            host=host,
            database=database,
            user=user,
            password=password,
        )
        return connection
    except Exception as e:
        print(f"Error connecting to PostgreSQL: {e}")
        return None

# Fetch data from a PostgreSQL table and convert it to a DataFrame
def fetch_data_from_postgres(query, connection):
    """Run *query* on *connection* and return the result as a DataFrame.

    Args:
        query: SQL text handed to ``pandas.read_sql``.
        connection: Open DB-API connection the query is executed on.

    Returns:
        The query result as a ``pandas.DataFrame``, or ``None`` if reading
        failed (the error is printed, not raised).
    """
    result = None
    try:
        result = pd.read_sql(query, connection)
    except Exception as err:
        print(f"Error fetching data from PostgreSQL: {err}")
    return result

# Convert DataFrame to Parquet format and store in memory
def convert_to_parquet(df, engine="fastparquet"):
    """Serialize *df* to Parquet in an in-memory buffer.

    Args:
        df: DataFrame to serialize. Its index is not written.
        engine: Parquet backend forwarded to ``DataFrame.to_parquet``.
            Defaults to ``"fastparquet"`` as before. ``"pyarrow"`` can be
            used where that library is installed instead.

    Returns:
        A ``BytesIO`` positioned at offset 0, ready to be read or uploaded,
        or ``None`` if serialization failed (the error is printed).
    """
    buffer = BytesIO()
    try:
        df.to_parquet(buffer, index=False, engine=engine)
    except Exception as e:
        print(f"Error converting DataFrame to Parquet: {e}")
        return None
    # Rewind so consumers read the serialized bytes from the beginning.
    buffer.seek(0)
    return buffer

# Upload Parquet data to S3
def upload_to_s3(buffer, s3_bucket_name, s3_key, s3_client=None):
    """Upload *buffer* to ``s3://<s3_bucket_name>/<s3_key>``.

    Args:
        buffer: Binary file-like object holding the data to upload.
        s3_bucket_name: Destination bucket.
        s3_key: Destination object key.
        s3_client: Optional pre-built boto3 S3 client, so several uploads can
            share one client. A default ``boto3.client('s3')`` is created
            when omitted.

    Returns:
        ``True`` if ``put_object`` succeeded, ``False`` otherwise. Errors are
        printed rather than raised, so existing callers that ignore the
        return value behave exactly as before.
    """
    try:
        client = s3_client if s3_client is not None else boto3.client('s3')
        client.put_object(Bucket=s3_bucket_name, Key=s3_key, Body=buffer)
    except Exception as e:
        print(f"Error uploading file to S3: {e}")
        return False
    print(f"File uploaded successfully to s3://{s3_bucket_name}/{s3_key}")
    return True

# Main function
def process_and_upload(
    tables=("athletes", "coaches", "entries_gender", "medals", "teams"),
    s3_bucket_name="de-project-postgres",
):
    """Export each PostgreSQL table to Parquet and upload it to S3.

    For every table in *tables*, the function:

    - runs ``SELECT * FROM <table>`` via ``fetch_data_from_postgres``,
    - serializes the result with ``convert_to_parquet``,
    - uploads it as ``<table>_data.parquet`` to *s3_bucket_name*.

    A table whose fetch or conversion returns ``None`` is skipped. The
    connection is always closed, even if an unexpected exception escapes
    the loop.

    Args:
        tables: Table names to export. They are interpolated directly into
            the SQL text, so they must be trusted identifiers, never user
            input.
        s3_bucket_name: Destination bucket for every upload.
    """
    connection = connect_to_postgres()
    if connection is None:
        return

    try:
        for table in tables:
            print(f"Fetching data from {table}...")
            # Safe only because table names come from the trusted `tables`.
            df = fetch_data_from_postgres(f"SELECT * FROM {table}", connection)
            if df is None:
                continue

            print(f"Data fetched for {table}, converting to Parquet...")
            buffer = convert_to_parquet(df)
            if buffer is None:
                continue

            upload_to_s3(buffer, s3_bucket_name, f"{table}_data.parquet")
    finally:
        connection.close()

if __name__ == "__main__":
    # Guarded so importing this code as a module does not trigger the export.
    # IPython/Jupyter executes cells with __name__ == "__main__", so the
    # notebook still runs it.
    process_and_upload()


Fetching data from athletes...




Data fetched for athletes, converting to Parquet...
File uploaded successfully to s3://de-project-postgres/athletes_data.parquet
Fetching data from coaches...
Data fetched for coaches, converting to Parquet...




File uploaded successfully to s3://de-project-postgres/coaches_data.parquet
Fetching data from entries_gender...
Data fetched for entries_gender, converting to Parquet...




File uploaded successfully to s3://de-project-postgres/entries_gender_data.parquet
Fetching data from medals...
Data fetched for medals, converting to Parquet...




File uploaded successfully to s3://de-project-postgres/medals_data.parquet
Fetching data from teams...
Data fetched for teams, converting to Parquet...




File uploaded successfully to s3://de-project-postgres/teams_data.parquet
