This script demonstrates how to:

1. Download a CSV file (the Titanic dataset) from an Amazon S3 bucket.
2. Perform basic calculations per passenger class (survival rate and average fare).
3. Store the results in a local PostgreSQL database.
4. Retrieve and verify the inserted data.


In [None]:
import boto3
import botocore
import csv
from io import StringIO
import psycopg2
import pandas as pd
from dotenv import load_dotenv
import os

# Load variables from a .env file into the process environment so that the
# ANALYTICAL_DB_* settings read with os.getenv() further down are available.
load_dotenv()

# Use boto3 library to extract CSV data from AWS S3 bucket and convert it to Python dictionaries

def download_csv_from_s3(bucket_name, object_key):
    """Download a CSV object from S3 and return its rows as a ``csv.DictReader``.

    The request is unsigned (no AWS credentials), so this is meant for
    publicly readable objects. Column names in the header row are stripped of
    surrounding whitespace so lookups such as ``row['Pclass']`` are not broken
    by padded headers.

    Args:
        bucket_name: Name of the S3 bucket.
        object_key: Key of the CSV object within the bucket.

    Returns:
        A ``csv.DictReader`` over the data rows (values are not type-converted).
        An empty object yields no rows instead of raising.
    """
    s3 = boto3.client('s3', config=botocore.config.Config(signature_version=botocore.UNSIGNED))
    response = s3.get_object(Bucket=bucket_name, Key=object_key)
    # 'utf-8-sig' also drops a leading byte-order mark, which str.strip() would
    # otherwise leave glued to the first column name.
    content = response['Body'].read().decode('utf-8-sig')
    reader = csv.DictReader(StringIO(content))
    # Reading ``fieldnames`` consumes the header row through the csv parser, so
    # quoted names containing commas are handled; it is None for empty input.
    header = reader.fieldnames
    if header is not None:
        reader.fieldnames = [name.strip() for name in header]
    return reader

# Extract: stream the Titanic rows (one dict per CSV line) from the public S3
# bucket, then materialise them into a DataFrame for inspection. Values stay
# as text because csv.DictReader does no type conversion.
titanic_csv_content = download_csv_from_s3(
    bucket_name='data-eng-makers-public-datasets-404544469985',
    object_key='etl_bites_04_titanic_dataset.csv',
)
df = pd.DataFrame(titanic_csv_content)

# Transform data: calculate survival rate for people in each passenger class

def calculate_survival_rate(pclass, data=None):
    """Return the percentage of passengers in ``pclass`` who survived.

    Args:
        pclass: Passenger class to filter on. It is compared as text against
            the ``Pclass`` column, so ``1`` and ``"1"`` are equivalent.
        data: DataFrame with string ``Pclass`` and ``Survived`` columns (as
            produced from ``csv.DictReader`` rows). Defaults to the
            module-level ``df`` holding the Titanic dataset.

    Returns:
        Survival rate as a percentage (0-100), or 0 when the class has no
        passengers (instead of raising ZeroDivisionError).
    """
    frame = df if data is None else data
    passengers = frame[frame['Pclass'] == str(pclass)]
    if passengers.empty:
        return 0
    # Survived is text ("0"/"1") because the CSV was read without type conversion.
    survivors = passengers[passengers['Survived'] == "1"]
    return (len(survivors) * 100) / len(passengers)

# One survival rate per passenger class, evaluated in class order 1, 2, 3.
first_class_survival_rate, second_class_survival_rate, third_class_survival_rate = (
    calculate_survival_rate(class_label) for class_label in ("1", "2", "3")
)
                        
#  Transform data: calculate average fare for travellers in each passenger class

def calculate_average_fare(pclass_filter, data=None):
    """Return the mean ``Fare`` paid by passengers in ``pclass_filter``.

    Works on the DataFrame already built from the S3 download rather than
    fetching the whole CSV from S3 again on every call.

    Args:
        pclass_filter: Passenger class. It is compared as text against the
            ``Pclass`` column, so ``1`` and ``"1"`` are equivalent.
        data: DataFrame with string ``Pclass`` and ``Fare`` columns. Defaults
            to the module-level ``df`` holding the Titanic dataset.

    Returns:
        The average fare as a float, or 0 when the class has no passengers.
    """
    frame = df if data is None else data
    # Fare is text in the DataFrame; convert only the selected rows.
    fares = frame.loc[frame['Pclass'] == str(pclass_filter), 'Fare'].astype(float)
    if fares.empty:
        return 0
    return float(fares.sum()) / len(fares)


# Average fare per passenger class, evaluated in class order 1, 2, 3.
average_fare_class_1, average_fare_class_2, average_fare_class_3 = map(
    calculate_average_fare, (1, 2, 3)
)

# Create new table in local Postgres database to store transformed data

# libpq connection string built from the ANALYTICAL_DB_* environment variables.
# make_dsn escapes/quotes each value (e.g. a password containing spaces or
# quotes) and omits unset (None) variables so libpq falls back to its own
# defaults, rather than embedding the literal text "None" in the DSN.
conn_string = psycopg2.extensions.make_dsn(
    dbname=os.getenv('ANALYTICAL_DB_NAME'),
    user=os.getenv('ANALYTICAL_DB_USERNAME'),
    password=os.getenv('ANALYTICAL_DB_PASSWORD'),
    host=os.getenv('ANALYTICAL_DB_HOST'),
    port=os.getenv('ANALYTICAL_DB_PORT'),
)

# DDL for the results table. It is dropped and recreated on every run, so
# re-running the notebook starts from an empty table.
drop_table_query = "DROP TABLE IF EXISTS class_average_fares;"
create_table_query = (
    "\n"
    "CREATE TABLE IF NOT EXISTS class_average_fares (\n"
    "    id SERIAL PRIMARY KEY,\n"
    "    pclass INTEGER NOT NULL,\n"
    "    average_fare NUMERIC(10, 2) NOT NULL,\n"
    "    survival_rate NUMERIC(10, 2) NOT NULL\n"
    ");\n"
)

# Load data into Postgres database

def execute_query_postgresql(conn_string, query, params=None):
    """Run one SQL statement in its own connection and transaction.

    psycopg2 errors are printed rather than raised, so a failing statement
    does not stop the rest of the notebook.

    Args:
        conn_string: libpq connection string.
        query: SQL text; may contain ``%s`` placeholders when ``params`` is given.
        params: Optional sequence/mapping of values bound to the placeholders.
    """
    try:
        conn = psycopg2.connect(conn_string)
        try:
            # ``with conn`` commits on success and rolls back on error, but in
            # psycopg2 it does NOT close the connection -- hence the finally.
            with conn, conn.cursor() as cur:
                cur.execute(query, params)
        finally:
            conn.close()
    except psycopg2.Error as e:
        print("Error executing Postgres query:", e)
    
# Reset the results table: drop any previous copy, then create it empty.
for ddl_statement in (drop_table_query, create_table_query):
    execute_query_postgresql(conn_string, ddl_statement)


def insert_data_to_postgresql(data, connection):
    """Insert one row into ``class_average_fares`` and commit it.

    Args:
        data: 3-tuple ``(pclass, average_fare, survival_rate)`` bound to the
            INSERT placeholders in that order.
        connection: An open psycopg2 connection; it is left open so the
            caller can reuse it for further inserts.

    The cursor is always closed, and the transaction is rolled back if the
    INSERT fails so the connection is not left in an aborted state.
    """
    query = """
        INSERT INTO class_average_fares (pclass, average_fare, survival_rate) 
        VALUES (%s, %s, %s);
    """
    # ``with connection`` commits on success / rolls back on error without
    # closing the connection; ``with cursor`` releases the cursor.
    with connection, connection.cursor() as cur:
        cur.execute(query, data)


# One shared connection for the three inserts; it is always closed afterwards,
# even if an insert raises.
conn = psycopg2.connect(conn_string)
try:
    insert_data_to_postgresql((1, average_fare_class_1, first_class_survival_rate), conn)
    insert_data_to_postgresql((2, average_fare_class_2, second_class_survival_rate), conn)
    insert_data_to_postgresql((3, average_fare_class_3, third_class_survival_rate), conn)
finally:
    conn.close()

# TO CHECK IT WORKED: 

query = 'SELECT * FROM class_average_fares;'

def read_data_from_postgresql(conn_string, query):
    """Run a query and return its column metadata together with all rows.

    Args:
        conn_string: libpq connection string.
        query: SQL SELECT statement to execute.

    Returns:
        A ``(description, rows)`` tuple: ``cursor.description`` (one entry per
        column, whose item ``[0]`` is the column name) and the list of rows
        returned by ``fetchall()``.
    """
    conn = psycopg2.connect(conn_string)
    try:
        # ``with conn`` ends the transaction but does not close the connection
        # in psycopg2, so it is closed explicitly in the finally clause.
        with conn, conn.cursor() as cur:
            cur.execute(query)
            rows = cur.fetchall()
            return cur.description, rows
    finally:
        conn.close()

# Verify the load: read the table back and show it as a DataFrame.
result = read_data_from_postgresql(conn_string, query)
column_meta, data = result
columns = [meta[0] for meta in column_meta]

# NOTE: this rebinds ``df`` (until now the Titanic data) to the results table.
# display() is the Jupyter/IPython rich-output helper.
df = pd.DataFrame(data, columns=columns)
display(df)

# DEEP DIVE: What is AWS Glue doing in this process?
# (This refers to an S3 -> Athena -> S3 version of the pipeline; the code above does not use Glue or Athena.)

# Glue automates schema discovery and metadata management: it scans new S3 files, extracts their schema, and updates the Glue Data Catalog.
# The Glue Data Catalog stores metadata (table schema, column types, file locations) so that Athena can query the data.
# Without Glue, the table's metadata and schema would not be updated automatically, so Athena could run SQL queries
# against the data in S3 using outdated schema information. Those queries could then fail, so the data would not be extracted
# or loaded into its destination S3 bucket. In short, the ETL pipeline could easily break down without the automation that Glue provides.




Unnamed: 0,id,pclass,average_fare,survival_rate
0,1,1,84.15,62.96
1,2,2,20.66,47.28
2,3,3,13.68,24.24
