In [3]:
import os #Standard library, used to read connection settings from environment variables
from io import StringIO #Allows treating strings as file-like objects, useful for handling CSV data in memory

import boto3 #AWS SDK for Python, used to interact with Amazon S3
import pandas as pd #Used to manipulate and load the CSV file into a DataFrame
import psycopg2 #PostgreSQL database adapter for Python

- Defines PostgreSQL database connection details including hostname, port, database name, username, and password 
- Specifies the table name where the data will be stored in PostgreSQL.

In [None]:
# PostgreSQL connection settings.
# Each value is taken from the environment variable of the same name when that
# variable is set, so real credentials never have to be saved in the notebook.
# When the variable is absent the placeholder below is used unchanged.
PG_HOST = os.environ.get("PG_HOST", "<host_name>")
PG_PORT = os.environ.get("PG_PORT", "<port_number>")
PG_DATABASE = os.environ.get("PG_DATABASE", "<db_name>")
PG_USER = os.environ.get("PG_USER", "<user_name>")
PG_PASSWORD = os.environ.get("PG_PASSWORD", "<password>")

# Name of the destination table for the load.
TABLE_NAME = "<table_name>"

- Defines AWS S3 bucket name and the file path to the CSV file

In [None]:
# AWS S3 Configuration
BUCKET_NAME = "<bucket_name>"  # bucket passed to get_object as Bucket
FILE_NAME = "<path/to_file.csv>"  # object key passed to get_object as Key

- Creates an S3 client using boto3.client("s3").
- Retrieves the CSV file from S3 using s3.get_object().
- Reads and decodes the file content into a string.
- Handles errors gracefully—if an error occurs (e.g., wrong bucket name, missing file), it prints an error message and exits the script.
- The decoded text is wrapped in a file-like object with StringIO(csv_data) in the next step, so pandas can read it without writing it to disk.


In [6]:
# Step 1: Connect to S3 (Using Default AWS Credentials)
try:
    s3 = boto3.client("s3")  # Credentials are picked up from environment or AWS CLI config
    csv_obj = s3.get_object(Bucket=BUCKET_NAME, Key=FILE_NAME)
    # "utf-8-sig" decodes exactly like "utf-8" but drops a leading byte-order
    # mark, which would otherwise become part of the first column name.
    csv_data = csv_obj["Body"].read().decode("utf-8-sig")
except Exception as e:
    print(f"Error retrieving file from S3: {e}")
    # exit() is an interactive helper; raising SystemExit stops this cell (or
    # the script, with status 1) and keeps the original error chained.
    raise SystemExit(1) from e
else:
    # Reported only after every step above succeeded.
    print("✅ Successfully retrieved file from S3.")

✅ Successfully retrieved file from S3.


- Loads the data into a Pandas DataFrame using pd.read_csv().
- Handles errors—if the CSV format is incorrect or empty, it prints an error and exits.

In [None]:
# Step 2: Load Data into Pandas DataFrame
try:
    # StringIO lets read_csv consume the in-memory text as if it were a file.
    df = pd.read_csv(StringIO(csv_data))
except Exception as e:
    print(f"Error loading data into Pandas: {e}")
    # exit() is an interactive helper; raising SystemExit stops this cell (or
    # the script, with status 1) and keeps the original error chained.
    raise SystemExit(1) from e
else:
    print("✅ Data successfully loaded into Pandas DataFrame.")

- Attempts to connect to PostgreSQL using the provided credentials.
- Creates a cursor (cur) to execute SQL commands.
- Handles connection errors—if authentication or connectivity fails, it prints an error and exits.

In [8]:
# Step 3: Connect to PostgreSQL with Try-Except
try:
    conn = psycopg2.connect(
        host=PG_HOST,
        port=PG_PORT,
        database=PG_DATABASE,
        user=PG_USER,
        password=PG_PASSWORD,
    )
    cur = conn.cursor()  # cursor reused by the later cells to run SQL
except psycopg2.Error as e:
    print(f"Error connecting to PostgreSQL: {e}")
    # exit() is an interactive helper; raising SystemExit stops this cell (or
    # the script, with status 1) and keeps the original error chained.
    raise SystemExit(1) from e
else:
    print("✅ Successfully connected to PostgreSQL.")

✅ Successfully connected to PostgreSQL.


- Dynamically generates a CREATE TABLE SQL statement based on the DataFrame's column names and data types.
- Maps Pandas data types to SQL column types: integer, floating-point, and boolean columns get the matching SQL type; everything else is stored as TEXT.
- Executes the SQL command to create the table if it doesn’t already exist.
- Handles errors—if table creation fails, the transaction is rolled back.

In [12]:
# Step 4: Create Table if Not Exists
def create_table_from_dataframe(cursor, table_name, dataframe):
    """
    Create ``table_name`` (if it does not already exist) with one column per
    DataFrame column.

    Column types are derived from the pandas dtypes:
    boolean -> BOOLEAN, integer -> BIGINT, float -> FLOAT, anything else -> TEXT.

    Args:
        cursor: Open psycopg2 cursor used to execute the statement.
        table_name: Name of the target table. It is interpolated into the SQL
            unquoted, so it must come from trusted configuration, not user input.
        dataframe: DataFrame whose column names and dtypes define the table.

    Raises:
        SystemExit: With status 1 when psycopg2 reports an error; the error is
            printed and the cursor's transaction is rolled back first.
    """
    dtype_checks = pd.api.types
    column_types = []
    for col, dtype in dataframe.dtypes.items():
        if dtype_checks.is_bool_dtype(dtype):
            sql_type = "BOOLEAN"
        elif dtype_checks.is_integer_dtype(dtype):
            # pandas integers are 64-bit by default, which a 32-bit INTEGER
            # column cannot hold.
            sql_type = "BIGINT"
        elif dtype_checks.is_float_dtype(dtype):
            sql_type = "FLOAT"
        else:
            sql_type = "TEXT"  # Default to TEXT for strings and other types
        # Double any embedded quote so the column name stays a single quoted identifier.
        quoted_col = '"' + str(col).replace('"', '""') + '"'
        column_types.append(f"{quoted_col} {sql_type}")

    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
        {', '.join(column_types)}
    );
    """
    try:
        cursor.execute(create_table_sql)
        print(f"✅ Table '{table_name}' created or already exists.")
    except psycopg2.Error as e:
        print(f"Error creating table: {e}")
        # Roll back on the connection that owns this cursor instead of relying
        # on a global named `conn` being defined.
        cursor.connection.rollback()
        # exit() is an interactive helper; SystemExit gives the same exit
        # status in a script and keeps the original error chained.
        raise SystemExit(1) from e

- Calls the function to create the table before inserting data

In [None]:
# Build the destination table from the DataFrame's columns, then persist the DDL.
create_table_from_dataframe(cursor=cur, table_name=TABLE_NAME, dataframe=df)
conn.commit()

- Converts the DataFrame into a CSV format using to_csv(), without headers
- Uses StringIO to hold the CSV in memory instead of saving it to disk
- Executes the COPY command to insert data efficiently into PostgreSQL
- Handles errors—if insertion fails, the transaction is rolled back

In [None]:


# Step 5: Insert Data into PostgreSQL using COPY
try:
    # Serialize the DataFrame to header-less CSV held in memory; rows are
    # matched to the table's columns by position.
    buffer = StringIO()
    df.to_csv(buffer, index=False, header=False)
    buffer.seek(0)  # rewind so COPY reads from the first row

    copy_sql = f"""
        COPY {TABLE_NAME} FROM STDIN WITH CSV DELIMITER ',' NULL ''
    """
    cur.copy_expert(copy_sql, buffer)
    # Commit inside the try block so a failed commit is reported and rolled
    # back like any other load error.
    conn.commit()
except psycopg2.Error as e:
    print(f"Error inserting data into PostgreSQL database '{PG_DATABASE}': {e}")
    conn.rollback()
    # exit() is an interactive helper; raising SystemExit stops this cell (or
    # the script, with status 1) and keeps the original error chained.
    raise SystemExit(1) from e
else:
    # The connection stays open here; it is closed after the verification query.
    print(f"✅ Data successfully loaded into table '{TABLE_NAME}' in database '{PG_DATABASE}'.")

- Runs a SELECT * FROM {TABLE_NAME} LIMIT 10; query to check if records were inserted
- Fetches the first 10 rows and prints them
- Handles errors—if querying fails, it prints an error message
- Closes the cursor and the database connection once the check is done

In [None]:
# Step 6: Verify Records with SELECT Query
try:
    cur.execute(f"SELECT * FROM {TABLE_NAME} LIMIT 10;")
    rows = cur.fetchall()
    print(f"✅ {len(rows)} records retrieved from '{TABLE_NAME}':")

    # Print the retrieved rows
    for row in rows:
        print(row)
except psycopg2.Error as e:
    print(f" Error executing SELECT query: {e}")
finally:
    # Close Connection
    # Runs whether the query succeeded, failed, or raised something other than
    # psycopg2.Error, so the cursor and connection are always released.
    cur.close()
    conn.close()
    print("✅ Connection closed successfully.")