## High-Level ETL (Extract, Transform, Load) Flow
**Goal**: By the end of this tutorial, you will be able to:
- Extract: Download a file from AWS S3 using Python’s boto3.
- Transform: Clean, filter, or manipulate data in Python (often using libraries like pandas).
- Load: Insert the transformed data into a relational database via SQL statements.

## Lab Assignment

1. Implement the following functions:
   - `extract_from_csv(file_to_process: str) -> pd.DataFrame`: read a .csv file and return a dataframe.
   - `extract_from_json(file_to_process: str) -> pd.DataFrame`: read a .json file and return a dataframe.
   - `extract() -> pd.DataFrame`: extract data from files of heterogeneous formats and combine them into a single dataframe.
   - `transform(df) -> pd.DataFrame`: clean and manipulate the data.
2. Clean the data:
   - Round float-type columns to two decimal places.
   - Remove duplicate samples.
   - Save the cleaned data to a Parquet file.
3. Insert the data into SQL:
   - Create a PostgreSQL database.
   - Insert the data into the database.

Submission requirements:
1. Jupyter Notebook
2. Parquet file
3. SQL file (optional)

In [None]:
# Required packages:
# psycopg2 2.9.10 (PostgreSQL database adapter)
# pandas 2.0.3 (data manipulation and analysis)
# sqlalchemy 2.0.37 (SQL toolkit and object-relational mapper)
# pyarrow 14.0.1 (efficient in-memory columnar data structures; part of the Apache Arrow project)
import pandas as pd

# required for reading .xml files
import xml.etree.ElementTree as ET

# required for navigating the machine's directories
import glob
import os.path

# required for communicating with the SQL database
#!pip install sqlalchemy
from sqlalchemy import create_engine



# E: Extracting data from multiple sources

In [None]:
import boto3
import os

# Read temporary AWS credentials from environment variables instead of hard-coding them.
# Never commit real access keys or session tokens to a notebook.
my_aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
my_aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
my_aws_session_token = os.environ.get('AWS_SESSION_TOKEN')

BUCKET_NAME = 'de300spring2025'   # Replace with your bucket name
S3_FOLDER = 'dinglin_xia/lab4_data/'             # The folder path in S3
LOCAL_DIR = './local-data/'      # Local directory to save files

In [12]:
def download_s3_folder(bucket_name, s3_folder, local_dir):
    """Download a folder from S3."""
    if not os.path.exists(local_dir):
        os.makedirs(local_dir)

    # List objects within the specified folder
    s3_resource = boto3.resource('s3',
                                aws_access_key_id=my_aws_access_key_id,
                                aws_secret_access_key=my_aws_secret_access_key,
                                aws_session_token=my_aws_session_token)
    bucket = s3_resource.Bucket(bucket_name)
    
    for obj in bucket.objects.filter(Prefix=s3_folder):
        # Define local file path
        local_file_path = os.path.join(local_dir, obj.key[len(s3_folder):])  
        
        if obj.key.endswith('/'):  # Skip folders
            continue
        
        # Create local directory if needed
        local_file_dir = os.path.dirname(local_file_path)
        if not os.path.exists(local_file_dir):
            os.makedirs(local_file_dir)
        
        # Download the file
        bucket.download_file(obj.key, local_file_path)
        print(f"Downloaded {obj.key} to {local_file_path}")
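The path mapping used inside `download_s3_folder` can be checked without touching S3. The following is a minimal sketch: `local_path_for_key` is a hypothetical helper that is not part of the lab code, and the example key is made up for illustration.

```python
import os


def local_path_for_key(key: str, s3_folder: str, local_dir: str) -> str:
    """Map an S3 object key to a local path by dropping the folder prefix."""
    relative_key = key[len(s3_folder):]  # strip the leading S3 folder
    return os.path.join(local_dir, relative_key)


# Hypothetical object key under the folder prefix used in this lab
example_path = local_path_for_key(
    "dinglin_xia/lab4_data/used_car_prices1.csv",
    "dinglin_xia/lab4_data/",
    "./local-data/",
)
print(example_path)
```

Because the prefix is removed by length, this only gives sensible paths for keys that actually start with `s3_folder`, which is what `bucket.objects.filter(Prefix=s3_folder)` returns.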

In [15]:
download_s3_folder(BUCKET_NAME, S3_FOLDER, LOCAL_DIR)

Downloaded nayeon_kim/reading_data.html to ./local-data/reading_data.html


## Extract data from the ./local-data/ folder

In [16]:
all_files = glob.glob('./local-data/*')

# Output the list of files
for file in all_files:
    print(file)

./local-data\reading_data.html
./local-data\used_car_prices1.csv
./local-data\used_car_prices1.json
./local-data\used_car_prices1.xml
./local-data\used_car_prices2.csv
./local-data\used_car_prices2.json
./local-data\used_car_prices2.xml
./local-data\used_car_prices3.csv
./local-data\used_car_prices3.json
./local-data\used_car_prices3.xml


### Function to extract data from one .csv file

In [17]:
def extract_from_csv(file_to_process: str) -> pd.DataFrame:

    # read the .csv file and return a dataframe
    df = pd.read_csv(file_to_process)
    return df

### Function to extract data from one .json file

In [32]:
def extract_from_json(file_to_process: str) -> pd.DataFrame:
    
    # read the .json file (one JSON object per line, hence lines=True) and return a dataframe
    df = pd.read_json(file_to_process, lines=True)
    return df
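The `lines=True` argument tells pandas to treat the input as JSON Lines, that is, one JSON object per line. The lab's .json files are not shown here, so the sketch below uses two made-up records in that layout (values borrowed from the preview of the extracted data) and reads them from an in-memory buffer instead of a file.

```python
import io

import pandas as pd

# Two sample records in JSON Lines form: one JSON object per line
json_lines = (
    '{"car_model": "ritz", "year_of_manufacture": 2014, "price": 5000.0, "fuel": "Petrol"}\n'
    '{"car_model": "sx4", "year_of_manufacture": 2013, "price": 7089.552239, "fuel": "Diesel"}\n'
)

# lines=True parses each line as a separate record
json_df = pd.read_json(io.StringIO(json_lines), lines=True)
print(json_df.shape)
```

If a file instead holds a single JSON array of objects, `lines=True` should be dropped.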

### Function to extract data from one .xml file

In [19]:
def extract_from_xml(file_to_process: str) -> pd.DataFrame:
    # `columns` is the global list of column names defined in the "Extract the data" cell
    tree = ET.parse(file_to_process)
    root = tree.getroot()
    rows = []
    for record in root:
        # each child element of the root holds one car record
        rows.append({
            "car_model": record.find("car_model").text,
            "year_of_manufacture": int(record.find("year_of_manufacture").text),
            "price": float(record.find("price").text),
            "fuel": record.find("fuel").text,
        })
    # build the dataframe once instead of concatenating row by row
    return pd.DataFrame(rows, columns=columns)
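The XML parsing above expects each child of the root element to contain `car_model`, `year_of_manufacture`, `price`, and `fuel` elements. The lab's .xml files are not shown, so the sketch below assumes a layout with hypothetical `<root>` and `<row>` tag names and parses it from a string.

```python
import xml.etree.ElementTree as ET

import pandas as pd

# Hypothetical XML layout; the <root> and <row> tag names are assumptions
xml_text = """
<root>
  <row><car_model>ritz</car_model><year_of_manufacture>2014</year_of_manufacture><price>5000.0</price><fuel>Petrol</fuel></row>
  <row><car_model>sx4</car_model><year_of_manufacture>2013</year_of_manufacture><price>7089.552239</price><fuel>Diesel</fuel></row>
</root>
""".strip()

xml_root = ET.fromstring(xml_text)

# Convert each record element into a dict, casting numeric fields explicitly
records = [
    {
        "car_model": row.find("car_model").text,
        "year_of_manufacture": int(row.find("year_of_manufacture").text),
        "price": float(row.find("price").text),
        "fuel": row.find("fuel").text,
    }
    for row in xml_root
]

xml_df = pd.DataFrame(records)
print(xml_df.dtypes)
```

Element text is always a string, which is why the year and price are cast before the dataframe is built.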

### Function to extract data from the ./local-data/ folder

In [33]:
def extract() -> pd.DataFrame:
    # `folder` and `columns` are globals defined in the "Extract the data" cell
    frames = []

    # csv files
    for csv_file in glob.glob(os.path.join(folder, "*.csv")):
        frames.append(extract_from_csv(csv_file))

    # json files
    for json_file in glob.glob(os.path.join(folder, "*.json")):
        frames.append(extract_from_json(json_file))

    # xml files
    for xml_file in glob.glob(os.path.join(folder, "*.xml")):
        frames.append(extract_from_xml(xml_file))

    # return an empty dataframe with the expected columns if nothing was found
    if not frames:
        return pd.DataFrame(columns=columns)

    # combine all sources in a single concat call
    return pd.concat(frames, ignore_index=True)

### Extract the data

In [34]:
columns = ['car_model','year_of_manufacture','price', 'fuel']
folder = "local-data"
#table_name = "car_data"

# run
def main():
    data = extract()
    #insert_to_table(data, "car_data")
    
    return data

data = main()

In [35]:
data.head()

Unnamed: 0,car_model,year_of_manufacture,price,fuel
0,ritz,2014,5000.0,Petrol
1,sx4,2013,7089.552239,Diesel
2,ciaz,2017,10820.895522,Petrol
3,wagon r,2011,4253.731343,Petrol
4,swift,2014,6865.671642,Diesel


# T: Transform the data and save the cleaned data to a .parquet file

In [36]:
staging_file = "cars.parquet"
staging_data_dir = "staging_data"

In [60]:
def transform(df):
    print(f"Shape of data {df.shape}")

    # round price to 2 decimal places
    df['price'] = df['price'].round(2)

    # remove samples with the same car_model, keeping the first occurrence
    df = df.drop_duplicates(subset='car_model')

    print(f"Shape of data {df.shape}")

    # Ensure the staging directory exists before writing the Parquet file
    if not os.path.exists(staging_data_dir):
        os.makedirs(staging_data_dir)
        print(f"Directory '{staging_data_dir}' created.")

    # write to parquet
    df.to_parquet(os.path.join(staging_data_dir, staging_file))
    return df

In [61]:
# print the head of your data
#!pip install pyarrow
df = transform(data)
df.head()

Shape of data (90, 4)
Shape of data (25, 4)


Unnamed: 0,car_model,year_of_manufacture,price,fuel
0,ritz,2014,5000.0,Petrol
1,sx4,2013,7089.55,Diesel
2,ciaz,2017,10820.9,Petrol
3,wagon r,2011,4253.73,Petrol
4,swift,2014,6865.67,Diesel
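The assignment asks for every float-type column to be rounded, while `transform` rounds only `price`. A minimal sketch of a more general version follows; `round_float_columns` is a hypothetical helper, and the sample rows are made up for illustration.

```python
import pandas as pd


def round_float_columns(df: pd.DataFrame, decimals: int = 2) -> pd.DataFrame:
    """Return a copy of df with every float-typed column rounded."""
    out = df.copy()  # leave the caller's dataframe untouched
    float_cols = [c for c in out.columns if pd.api.types.is_float_dtype(out[c])]
    out[float_cols] = out[float_cols].round(decimals)
    return out


# Sample data with one exact duplicate row
sample = pd.DataFrame({
    "car_model": ["ritz", "sx4", "ritz"],
    "year_of_manufacture": [2014, 2013, 2014],
    "price": [5000.0, 7089.552239, 5000.0],
    "fuel": ["Petrol", "Diesel", "Petrol"],
})

# Round first, then drop rows that are identical in every column
cleaned = round_float_columns(sample).drop_duplicates()
print(cleaned)
```

Here `drop_duplicates()` without `subset` removes only fully identical rows, which is a stricter notion of "duplicate" than the `subset='car_model'` used in `transform`; which one is appropriate depends on how the assignment defines a duplicate sample.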


# L: Loading data for further modeling

### Set Up PostgreSQL Locally
#### Step 1: Install PostgreSQL
- Windows: Download the installer from the official PostgreSQL site: https://www.postgresql.org/download/
- Mac:
  ```{bash}
  brew install postgresql
  brew services start postgresql
  ```
Then access the PostgreSQL CLI:
```{bash}
psql -U postgres
```
Note: if the default "postgres" user does not exist, create it manually with
```{bash}
createuser -s postgres
```
or
```{bash}
sudo -u $(whoami) createuser postgres -s
```

Then create a database:
```{sql}
CREATE DATABASE my_local_db;
-- List all databases
\l
```

#### Step 2: Create a User and Grant Privileges
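In the PostgreSQL CLI, create the user and grant privileges. On PostgreSQL 15 and later, a user also needs an explicit grant on the `public` schema of the database before it can create tables there:
```{sql}
CREATE USER myuser WITH ENCRYPTED PASSWORD 'mypassword';
GRANT ALL PRIVILEGES ON DATABASE my_local_db TO myuser;
\c my_local_db
GRANT ALL ON SCHEMA public TO myuser;
```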

#### Step 3: Install Required Python Libraries
```{bash}
pip install pandas sqlalchemy psycopg2 pyarrow boto3
```

### Utility function for writing data into the SQL database

In [92]:
# Database credentials (use the user and password created in Step 2)
db_host = "localhost"
db_user = "nkim"
db_password = "MyStrongPass123"
db_name = "my_local_db"

conn_string = f"postgresql+psycopg2://{db_user}:{db_password}@{db_host}/{db_name}"

engine = create_engine(conn_string)
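A password containing characters such as `@` or `/` breaks a connection string built with a plain f-string, because those characters have a meaning in URLs. One common workaround is to URL-encode the user name and password first. The sketch below uses only the standard library; `build_conn_string` is a hypothetical helper and the credentials are placeholders.

```python
from urllib.parse import quote_plus


def build_conn_string(user: str, password: str, host: str, database: str,
                      driver: str = "postgresql+psycopg2") -> str:
    """Build a SQLAlchemy-style URL with the credentials URL-encoded."""
    return f"{driver}://{quote_plus(user)}:{quote_plus(password)}@{host}/{database}"


# Placeholder credentials; the password deliberately contains '@' and '/'
safe_conn_string = build_conn_string("myuser", "p@ss/word", "localhost", "my_local_db")
print(safe_conn_string)
```

The resulting string can be passed to `create_engine` in the same way as `conn_string` above.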

In [95]:
from sqlalchemy import text
with engine.connect() as connection:
    df = pd.read_sql(text("SELECT * FROM pg_catalog.pg_tables;"), con=connection)
    print(df)


            schemaname                tablename tableowner tablespace  \
0           pg_catalog             pg_statistic   postgres       None   
1           pg_catalog                  pg_type   postgres       None   
2           pg_catalog         pg_foreign_table   postgres       None   
3           pg_catalog                pg_authid   postgres  pg_global   
4           pg_catalog    pg_statistic_ext_data   postgres       None   
..                 ...                      ...        ...        ...   
63          pg_catalog           pg_largeobject   postgres       None   
64  information_schema                sql_parts   postgres       None   
65  information_schema             sql_features   postgres       None   
66  information_schema  sql_implementation_info   postgres       None   
67  information_schema               sql_sizing   postgres       None   

    hasindexes  hasrules  hastriggers  rowsecurity  
0         True     False        False        False  
1         True   

In [98]:
# Test connection
#!pip install "SQLAlchemy<2.0"

#df = pd.read_sql("SELECT * FROM pg_catalog.pg_tables;", con=engine.connect())
#print(df)

In [None]:
import sqlalchemy

print('SQLAlchemy:', sqlalchemy.__version__)
print('pandas:', pd.__version__)
!pip install --upgrade pandas


SQLAlchemy: 2.0.40
pandas: 1.5.3
Collecting pandas
  Downloading pandas-2.2.3-cp310-cp310-win_amd64.whl (11.6 MB)

ERROR: Could not install packages due to an OSError: [WinError 5] Access is denied: 'C:\\Users\\nkim0\\AppData\\Local\\Packages\\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\\LocalCache\\local-packages\\Python310\\site-packages\\~-ndas\\_libs\\algos.cp310-win_amd64.pyd'
Check the permissions.



In [91]:
import pandas as pd

In [96]:
def insert_to_table(data: pd.DataFrame, conn_string: str, table_name: str):
    engine = create_engine(conn_string)  # engine that manages connections to the database
    # engine.begin() opens a connection, commits on success, and rolls back on error
    with engine.begin() as conn:
        # if_exists="replace" drops an existing table with this name and recreates it
        data.to_sql(table_name, conn, if_exists="replace", index=False)
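The effect of `if_exists="replace"` can be tried without a PostgreSQL server. The sketch below swaps in an in-memory SQLite database from the standard library, which pandas accepts directly as a connection; this is a stand-in for illustration, not the database used in this lab, and the rows are made up.

```python
import sqlite3

import pandas as pd

cars = pd.DataFrame({"car_model": ["ritz", "sx4"], "price": [5000.0, 7089.55]})

# In-memory SQLite database as a stand-in for PostgreSQL
sqlite_conn = sqlite3.connect(":memory:")
try:
    # First write creates the table with two rows
    cars.to_sql("ml_car_data", sqlite_conn, if_exists="replace", index=False)
    # Second write replaces the table, so only one row remains
    cars.iloc[:1].to_sql("ml_car_data", sqlite_conn, if_exists="replace", index=False)
    round_trip = pd.read_sql("SELECT * FROM ml_car_data", sqlite_conn)
finally:
    sqlite_conn.close()

print(round_trip)
```

With `if_exists="append"` the second write would add to the existing rows instead of replacing the table.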

In [97]:
# read the .parquet file(s) from the staging directory and load them into the database

def load() -> pd.DataFrame:
    data = pd.DataFrame()
    for parquet_file in glob.glob(os.path.join(staging_data_dir, "*.parquet")):
        data = pd.concat([pd.read_parquet(parquet_file), data])

    # write the combined data to the 'ml_car_data' table
    insert_to_table(data=data, conn_string=conn_string, table_name='ml_car_data')

    return data

data = load()
print(data.shape)

ProgrammingError: (psycopg2.errors.InsufficientPrivilege) permission denied for schema public
LINE 2: CREATE TABLE ml_car_data (
                     ^

[SQL: 
CREATE TABLE ml_car_data (
	car_model TEXT, 
	year_of_manufacture BIGINT, 
	price FLOAT(53), 
	fuel TEXT
)

]
(Background on this error at: https://sqlalche.me/e/20/f405)