# Pipeline Notebook
- This notebook was used to create the script pipeline.py to run in the main notebook
- Calling modular functions from other files in this repository
- Logging is added. 

In [1]:
import logging
from load_data import load_csv
from transformation import split_and_clean
from currency_converter import convert_historical_prices
from to_postgre import pipe_to_sql
from con_checker import test_postgres_connection

In [2]:
# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[logging.StreamHandler()]
)

### Checking for Server Connection
Using script that has connection information, and rules. 

In [3]:
test_postgres_connection()

✅ PostgreSQL connection successful!


True

## Pipeline Steps 
- Step 0: Checking Connection to SQL Server
- Step 1: This will be used to convert raw CSV file to a DataFrame. 
- Step 2: To Clean and to Denormalize all element for easier query. 
- Step 3: Using ExhchangeRateAPI to convert to USD (making another column) from original currency at the time of purchased for later analysis. 
- Step 4: Lastly transporting it to my PostgreSQL Server on my network

In [None]:
#Cleaning & Transporting
def main():
    source = '2024-2019_raw.csv'
    logging.info("Starting Hermes data pipeline...")
    
    # Step 0 - Testing Connection
    if not test_postgres_connection():
        logging.error('FAILED: PostgreSQL connection failed. Pipeline stopped.')
        return
    
    # Step 1 - Loading Data
    try:
        data = load_csv(source)
        logging.info(f"SUCCESS: Loaded data from '{source}' — {len(data)} rows.")
    except Exception as e:
        logging.exception(f"FAILED: Failed to Load Data from '{source}' ")

    #Step 2 - Cleaning Data
    try: 
        cleaned = split_and_clean(data)
        logging.info('SUCCESS: Data Cleaning and Transformation Completed')
    except Exception as e:
        logging.exception('FAILED: Failed to be Converted & Transformed')
        
    # Step 3 - Rates conversion to USD using ExchangerateAPI.io 
    try:
        ready = convert_historical_prices(cleaned)
        logging.info('SUCCESS: Currency Conversion Completed')
    except Exception as e:
        logging.exception('FAILED: Currency Conversion not Complete. Please Check API Settings/Subscriptions')
        
    
    # Step 4 - Send to Database
    try:
        transported = pipe_to_sql(ready, table_name='handbag', schema='hermes')
        logging.info('SUCCESS: Data loaded into PostgreSQL.')
    except Exception as e:
        logging.exception('FAILED: Data was not able to be loaded into PostgreSQL. Please Check Connection.')
        
    logging.info("SUCCESS: Pipeline complete! Check your Database now!")
    
    return ready


In [6]:
main()

✅ Successfully saved DataFrame to 'handbag' table.


Unnamed: 0,Bag_Type,Date_Purchased,Material_Style,Price,Currency,Bag_Size,Leather,Exotic_Skin,Stitching,Limited_Edition,Price_USD
0,Kelly,1/2/24,Togo Retourne,8600.0,EUR,25,Togo,Regular Leather,Retourne,Regular,9409.60
1,Kelly,1/2/24,Epsom Sellier,9500.0,EUR,25,Epsom,Regular Leather,Sellier,Regular,10394.33
2,Kelly,1/2/24,Swift,9000.0,EUR,25,Swift,Regular Leather,Sellier,Regular,9847.26
3,Kelly,1/2/24,Mysore Chèvre Sellier,9600.0,EUR,25,Mysore Chèvre,Regular Leather,Sellier,Regular,10503.74
4,Kelly,1/2/24,Shiny Crocodile Sellier,42400.0,EUR,25,Exotic,Croc,Sellier,Regular,46391.54
...,...,...,...,...,...,...,...,...,...,...,...
1554,Evelyne,1/1/19,Sellier Epsom,3330.0,GBP,33,Epsom,Regular Leather,Sellier,Regular,4246.91
1555,Evelyne,1/22/19,Sellier Epsom,5450.0,CAD,33,Epsom,Regular Leather,Sellier,Regular,4079.80
1556,Evelyne,1/22/19,Sellier Hunter Cowhide,5350.0,USD,33,Hunter,Regular Leather,Sellier,Regular,5350.00
1557,Evelyne,2/4/19,Sellier Hunter,6000.0,USD,33,Hunter,Regular Leather,Sellier,Regular,6000.00


### I also made this into a .py file to run on main notebook. 