In [4]:
import pandas as pd
import requests
from sqlite3 import connect

print('Executing ETL Pipeline' + '\n')


source_type = input("Do you want to pull data from an API or load a local file? (Enter 'api' or 'local'): ").lower()

if source_type == 'api':

    url = input("Enter the URL of the API to pull from: ")
    response = requests.get(url)
    if response.status_code == 200:
        data = response.json()
        print(f"Successfully fetched data from {url}.")

        if isinstance(data, dict) and 'data' in data:
            df = pd.DataFrame(data['data'])
            print(f"API returned {len(df)} records and {len(df.columns)} columns.")
        else:
            df = pd.json_normalize(data)
            print(f"API normalized into {len(df)} records and {len(df.columns)} columns.")
    else:
        print(f"Error fetching API data: {response.status_code} - {response.text}")
        df = None
elif source_type == 'local':
    # Local file ingestion
    file_path = input("Enter the path to your local CSV or JSON file: ")
    try:
        if file_path.endswith('.csv'):
            df = pd.read_csv(file_path)
        elif file_path.endswith('.json'):
            df = pd.read_json(file_path)
        else:
            raise ValueError("Unsupported file format. Use CSV or JSON.")
        print(f"Successfully loaded local file '{file_path}' with {len(df)} records and {len(df.columns)} columns.")
    except Exception as e:
        print(f"Error loading file: {e}")
        df = None
else:
    print("Invalid input. Please enter 'api' or 'local'.")
    df = None

# (add/remove columns)
if df is not None:
    modify_choice = input("Do you want to add or remove columns? (Enter 'add', 'remove', or 'none'): ").lower()
    if modify_choice == 'add':
        new_col_name = input("Enter the name of the new column: ")
        default_value = input(f"Enter the default value for the new column '{new_col_name}': ")
        df[new_col_name] = default_value
        print(f"Added column '{new_col_name}' with default value '{default_value}'.")
    elif modify_choice == 'remove':
        col_to_remove = input("Enter the name of the column to remove: ")
        if col_to_remove in df.columns:
            df.drop(columns=[col_to_remove], inplace=True)
            print(f"Removed column '{col_to_remove}'.")
        else:
            print(f"Column '{col_to_remove}' does not exist in the dataset.")

# Save the dataset as CSV or SQL
if df is not None:
    save_choice = input("Do you want to save the dataset as CSV or SQL? (Enter 'csv' or 'sql'): ").lower()
    if save_choice == 'csv':
        output_csv = 'output_data.csv'
        df.to_csv(output_csv, index=False)
        print(f"Data saved to {output_csv} with {len(df)} records and {len(df.columns)} columns.")
    elif save_choice == 'sql':
        output_db = 'output_data.db'
        conn = connect(output_db)
        df.to_sql('data', conn, if_exists='replace', index=False)
        conn.close()
        print(f"Data saved to SQL database '{output_db}' with {len(df)} records and {len(df.columns)} columns.")
    else:
        print("Invalid input. Please enter 'csv' or 'sql'.")

print("ETL Pipeline execution completed.")


Executing ETL Pipeline

Do you want to pull data from an API or load a local file? (Enter 'api' or 'local'): local
Enter the path to your local CSV or JSON file: Global_Map_Full_Data_data.csv
Successfully loaded local file 'Global_Map_Full_Data_data.csv' with 593 records and 8 columns.
Do you want to add or remove columns? (Enter 'add', 'remove', or 'none'): none
Do you want to save the dataset as CSV or SQL? (Enter 'csv' or 'sql'): sql
Data saved to SQL database 'output_data.db' with 593 records and 8 columns.
ETL Pipeline execution completed.
