### Setup

In [137]:
# import all necessary libraries
import os
import json
import pprint
import requests
import requests.exceptions
import pandas as pd
from sqlalchemy import create_engine
import numpy as np

# sql database connection set up
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "password"

### Data Retrieval

In [138]:
# retrieve remote data by URL, or ingest local file USER INPUT

def get_data(url=None, local_file=None, sql_db=None):
    if url != None:
        return get_api_response(url)
    elif local_file != None:
        return get_local_file(local_file)
    elif sql_db != None:
        try:
            return get_dataframe(user_id, pwd, host_name, sql_db)
        except Exception as e:
            return (f"Error fetching data from SQL table: {e}")
    else:
        return "Error: No valid data source was provided. Must either be url, local file, or sql database."
        
def get_api_response(url):
    try:
        response = requests.get(url)
        response.raise_for_status()

    except requests.exceptions.HTTPError as errh:
        return "An Http Error occurred: " + repr(errh)
    except requests.exceptions.ConnectionError as errc:
        return "An Error Connecting to the API occurred: " + repr(errc)
    except requests.exceptions.Timeout as errt:
        return "A Timeout Error occurred: " + repr(errt)
    except requests.exceptions.RequestException as err:
        return "An Unknown Error occurred: " + repr(err)
    result = pd.json_normalize(response.json())
    return result
        
def get_local_file(local_file):
    try:
        with open(local_file,'r') as file:
            if local_file.endswith('.json'):
                return pd.read_json(file)
            elif local_file.endswith('.csv'):
                return pd.read_csv(file)
            else:
                return "Error: Unsupported file type. Must be either json or csv."
    except Exception as e:
        return (f"Error reading local file: {e}")
    
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe

### Data Conversion & Storage

In [147]:
# csv, json, sql database table conversions USER CHOOSES

def convert_data(data, output_file=None, output_format=None, sql_db=None):
    try:
        if sql_db != None:
            set_dataframe(user_id, pwd, host_name, sql_db, data, input("Enter table name: "), input("Enter primary key: "))
        elif output_file != None:
            if output_format != None:
                if output_format == 'csv':
                    data.to_csv(output_file, index=False,)
                elif output_format == 'json':
                    data.to_json(output_file, orient='records', lines=True)
                else:
                    return "Error: Invalid output format."
            else:
                return "Error: output format not specified."
        return "Data converted and saved."
    except Exception as e:
        return f"Error: Unable to convert or save data {e}"
    
def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    df.to_sql(table_name, con=connection, index=False, if_exists='replace')
    connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
    
    connection.close()

### Data Modification

In [148]:
# transformations: column selection, column creation (any useful information you want eg. differences)

def modify_data(data, operation=None):
    if operation != None:
        if operation == "reduce":
            selection = input("List columns you want to keep separated by spaces:").split()
            data = data[selection]
        elif operation == "add":
            data = new_columns(data, str(input("Enter name of new column: ")), input(f"Enter conditional variable name from {data.columns}: "))
        elif operation == "none":
            data = data
        else:
            return "Error: invalid operation, choose either 'reduce', 'add', or 'none'."
    else:
        return "Error: No modification specified."
    return data

def new_columns(data, name, basis):
    cutoff = int(input("Enter cut-off value: "))
    conditions = [
        (data[basis]>=cutoff),(data[basis]<cutoff)
    ]
    results = [input("Value if higher than or equal to cutoff: "),input("Value if lower than cutoff: ")]
    data[name] = np.select(conditions,results)
    return data
    
    

### Data Summary

In [149]:
def data_summary(data):
    return (f"Number of records: {data.shape[0]} \n Number of columns: {data.shape[1]}")

### Execution

In [159]:
def ETLProcessor(ingest):
    dtype = input("State your input type, 'url', 'sqldatabase', or 'localfile': ")
    if dtype == "url":
        data=get_data(url=ingest)
    elif dtype == "localfile":
        data=get_data(local_file=ingest)
    elif dtype == "sqldatabase":
        data=get_data(sql_db=ingest)
    else:
        print("Error: Invalid input type.")
    print(data)
    print("PRE PROCESSING: " + data_summary(data))
    data=modify_data(data, input("Operation desired, 'add' columns,'reduce' columns, or 'none': "))
    print(data)
    print("POST PROCESSING: " + data_summary(data))
    otype = input("State desired output format, 'localfile' or 'sqldatabase': ")
    if otype == "localfile":
        data=convert_data(data, output_file=input("Output file name: "), output_format=input("Output format: "))
    elif otype == "sqldatabase":
        data=convert_data(data, sql_db=input("Database Name: "))
    else:
        print("Error: Invalid output type.")
    return data

### Example with CSV and API JSON Data Sources

In [160]:
examples=["http://universities.hipolabs.com/search?name=middle","survey.csv"]

In [161]:
cont = 1
while cont == 1:
    ETLProcessor(input("Enter your source: "))
    cont = int(input("Do you want to use the ETLProcessor? '1' for yes, '0' for no: "))

Enter your source: survey.csv
State your input type, 'url', 'sqldatabase', or 'localfile': localfile
           Timestamp         Year  Gender  Job  GuessAmountSpentThisMonth  \
0   10/18/2023 17:14  Fourth Year    Male  Yes                     2500.0   
1   10/18/2023 17:18   Third Year  Female   No                     2000.0   
2   10/18/2023 17:21  Fourth Year  Female  Yes                     1500.0   
3   10/18/2023 17:24  Fourth Year    Male   No                     3000.0   
4   10/18/2023 17:25  Fourth Year  Female   No                      150.0   
..               ...          ...     ...  ...                        ...   
70   10/25/2023 0:11  Second Year    Male   No                      300.0   
71   10/25/2023 1:28   First Year    Male   No                      400.0   
72   10/25/2023 9:09   Third Year  Female  Yes                     2000.0   
73  10/25/2023 11:08   Third Year  Female  Yes                      250.0   
74  10/25/2023 11:53  Second Year    Male  Yes      

State desired output format, 'localfile' or 'sqldatabase': sqldatabase
Database Name: localhost
Enter table name: uvaundergradspending
Enter primary key: TimeStamp
Do you want to use the ETLProcessor? '1' for yes, '0' for no: 0


### Note: I tested the to SQL table in second example but for some reason I can't see it in my SQL when i write to it, but I don't get any errors so not sure what's going on