In [111]:
import pandas as pd
import requests
import os
import json

def fetch_data(source, is_local=True):
    if is_local:
        # Read from a local file
        if os.path.isfile(source):
            if source.endswith('.csv'):
                return pd.read_csv(source)
            elif source.endswith('.json'):
                try:
                    with open(source, 'r') as f:
                        data = json.load(f)
                    df = pd.json_normalize(data['users'])
                    return df
                except ValueError as e:
                    raise ValueError(f"Failed to parse JSON. Error: {e}")
            else:
                raise ValueError("Unsupported file format. Only CSV and JSON are supported.")
        else:
            raise FileNotFoundError("File not found at the specified path.")
    else:
    # Download from a URL
        response = requests.get(source)
    
    if response.status_code == 200:
        content_type = response.headers.get('Content-Type', '')

        if 'zip' in content_type:
            import zipfile
            import io
            with zipfile.ZipFile(io.BytesIO(response.content)) as z:

                extracted_files = z.namelist()
                
                for filename in extracted_files:
                    if filename.endswith('.csv'):
                        with z.open(filename) as f:
                            return pd.read_csv(f)
                    elif filename.endswith('.json'):
                        with z.open(filename) as f:
                            return pd.read_json(f)
                
                raise ValueError("No CSV or JSON files found in the ZIP archive.")

        elif 'application/json' in content_type:
            data = response.json()
            return pd.json_normalize(data)

        elif source.endswith('.csv'):
            return pd.read_csv(io.StringIO(response.text))
        
        else:
            raise ValueError("Unsupported URL format. Only CSV, JSON, and ZIP are supported.")
    else:
        raise ConnectionError(f"Failed to fetch data from URL. Status code: {response.status_code}")


In [89]:
def modify_dataframe(df, action, column_name=None, data=None):   
    if action == 'add':
        if column_name is None or data is None:
            raise ValueError("To add a column, you must provide both a column name and data.")
        if len(data) != len(df):
            raise ValueError("The length of the data must match the number of rows in the DataFrame.")
        
        # Add the column
        df[column_name] = data
        print(f"Column '{column_name}' added.")
    
    elif action == 'remove':
        if column_name is None:
            raise ValueError("To remove a column, you must provide the column name.")
        if column_name not in df.columns:
            raise ValueError(f"Column '{column_name}' does not exist in the DataFrame.")
        
        # Remove the column
        df.drop(columns=[column_name], inplace=True)
        print(f"Column '{column_name}' removed.")
    
    else:
        raise ValueError("Action must be either 'add' or 'remove'.")
    
    return df

# Part 4

In [93]:
def export_dataframe(df, filename):
    if filename.endswith('.csv'):
        df.to_csv(filename, index=False)  
        print(f"DataFrame exported as CSV to {filename}")
    elif filename.endswith('.json'):
        df.to_json(filename, orient='records', lines=True) 
        print(f"DataFrame exported as JSON to {filename}")
    else:
        print("Unsupported file format. Please use '.csv' or '.json'.")

def write_dataframe_to_db(df, conn, cursor, table_name):
    quoted_columns = ', '.join([f'"{col}"' for col in df.columns]) 

    columns_with_types = ', '.join([f'"{col}" TEXT' if df[col].dtype == 'object' else f'"{col}" REAL' for col in df.columns])

    create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_with_types});"
    cursor.execute(create_table_sql)

    insert_sql = f"INSERT INTO {table_name} ({quoted_columns}) VALUES ({', '.join(['?' for _ in df.columns])});"

    for row in df.itertuples(index=False, name=None):
        cursor.execute(insert_sql, row)

    conn.commit()
    print(f"DataFrame successfully written to table '{table_name}' in the database.")

# Controller Code

In [99]:
import json
import pandas as pd
import sqlite3
import csv
from io import StringIO

df = None

conn = sqlite3.connect('./database.db')
cursor = conn.cursor()


In [None]:
def run():
    option = get_option()
    print("You selected: ", option)
    if option == '6':
        print("Exiting program...")
        return False

    do(option)

    return True

def get_option():
    print("Validating dataframe...")
    if df is None:
        print("There is currently no dataframe")
        print("ETL Pipeline: What would you like to do?")
        valid_options = ['1','2','6']
        print("""Available Options:
            1: Fetch or Upload Data (CSV, JSON, SQL DB via upload or API call)
            2: Convert file types (CSV, JSON, SQL DB). THIS ACTION IS INDEPENDENT OF EXISTING DATAFRAME.
            6: Exit program
        """)

        while True:
            print("Waiting for input...")
            option = input("Select an option: ")

            if option not in valid_options:
                print("Invalid option! Valid options are: ", valid_options)
                continue
            
            break
    
    else:
        print("Existing data detected!")
        print("ETL Pipeline: What would you like to do?")
        while True:
            print("""Available Options:
            1: Fetch or Upload Data (Overwrite existing data) 
            2: Convert file types (CSV, JSON, SQL DB). THIS ACTION IS INDEPENDENT OF EXISTING DATAFRAME.
            3: Modify dataframe (Add or remove column)
            4: Export/Store Data to disk or database
            5: Summarize current dataframe
            6: Exit program
            """)
            valid_options = ['1','2','3','4','5','6']
            print("Waiting for input...")
            option = input("Select an option: ")
            if option not in valid_options:
                print("Invalid option! Valid options are: ", valid_options)
                continue
            if option == '1':
                print("WARNING: If existing data has not been exported, it will be lost. Do you wish to continue (Y/N)")
                print("Waiting for input...")
                y_n = input("Do you wish to continue (Y/N)")
                if y_n.upper() == 'Y': 
                    break 
                else: 
                    continue
            
            break

    return option
    

def do(option):
    global df
    option = int(option)
    if option == 1:
        df = option_1()
        print("SUCESSFULLY UPLOADED DATAFRAME. Printing head...")
        print("---------------------------------------------------------------------------------------------------------")
        print(df.head())
        print("---------------------------------------------------------------------------------------------------------")
    
    elif option == 2:
        option_2()

    elif option == 3:
        df = option_3()
        print("SUCESSFULLY MODIFIED DATAFRAME. Printing head...")
        print("---------------------------------------------------------------------------------------------------------")
        print(df.head())
        print("---------------------------------------------------------------------------------------------------------")

    elif option == 4:
        option_4()
        print("SUCESSFULLY EXPORTED DATAFRAME")

    elif option == 5:
        option_5()

    else:
        print("Invalid option")
    
    return

def option_1():
    print("Would you like to get your data locally or via API \n 1: Locally \n 2: API")
    
    while True:
        print("Waiting for input...")
        option = input("Select an option: ")

        valid_options = ['1', '2']
        if option not in valid_options:
            print("Invalid option! Valid options are: ", valid_options)
            continue

        break

    option = int(option)

    if option == 1:
        print("You selected: Locally. Please put the file into the input_data folder and provide the file name, incuding its extension (e.g. data.csv, data.json, etc)")
        print("Waiting for input...")
        file_name = input("Enter file name: ")
        source = f"input_data/{file_name}"

        return fetch_data(source)
    
    if option == 2:
        print("You selected: via API. Please provide the URL for the API to retrieve the data. Note, the URL must link diirectly to JSON data, or a CSV or JSON file or zip archive containing the CSV/JSON file")
        print("Waiting for input...")
        source = input("input URL: ")

        return fetch_data(source, False)


def option_2():
    print("What file type would you like to convert FROM? Options include: CSV, JSON")
    while True:
        print("Waiting for input...")
        file_from = input("Select an option: ").upper()

        valid_options = ['CSV', 'JSON']
        if file_from.upper() not in valid_options:
            print("Invalid option! Valid options are: ", valid_options)
            continue

        break

    print("What file type would you like to convert TO? Options include: CSV, JSON, SQL")
    while True:
        print("Waiting for input...")
        file_to = input("Select an option: ").upper()

        valid_options = ['CSV', 'JSON', 'SQL']
        if file_to.upper() not in valid_options:
            print("Invalid option! Valid options are: ", valid_options)
            continue

        break

    print(f"You are converting a file from: {file_from} to {file_to}. Please put the input file into the input_data folder and provide the file name, incuding its extension (e.g. sample_data.csv, random_data.json, etc)")
    print("Waiting for input...")
    file_name = input("Enter file name: ")
    source = f"input_data/{file_name}"

    local_df = fetch_data(source)

    print(f"The file {file_name} will be converted to {file_to}")

    if file_to.upper() in ['CSV', 'JSON']:
        print("Please provide what you want the file to be named, EXCLUDING the extension. (e.g. 'output_file', 'my_file', etc.) The file will be written to the folder output_data")        
        print("Waiting for input...")
        file_name = input("Enter name the file") + f".{file_to.lower()}"
        print("file name:", file_name)
        export_dataframe(local_df, f"output_data/{file_name}")
        print(f"SUCESSFULLY COVERTED TO {file_to}. Path to file: output_data/{file_name}")

    else:
        print("Please provide what you want the table name in the database to be.")
        print("Waiting for input...")
        table_name = input("Enter name the table")        
        print("Writing dataframe to local database, 'database.db'...")

        write_dataframe_to_db(local_df, conn, cursor, table_name)
        print("SUCESSFULLY WROTE TO database.db AS TABLE: ", table_name)

def option_3():
    global df
    print("How would you like to modify the dataframe? \n 1: Add column \n 2: Remove column")
    while True:
        print("Waiting for input...")
        option = input("Select an option: ")

        valid_options = ['1', '2']
        if option not in valid_options:
            print("Invalid option! Valid options are: ", valid_options)
            continue

        break
    
    if option == '1':
        print("You selected: Add column. Please provide the name of the column you'd like to add")
        print("Waiting for input...")
        name = input("Enter name of new column")
        print("Please provide the data you'd like to add, separated by commas")
        data = input("Enter data separated by commas").split(',')
        return modify_dataframe(df, 'add', name, data)

    
    if option == '2':
        print("You selected: Remove column. Please provide the name of the column you'd like to remove")        
        print("Waiting for input...")
        name = input("Enter name of column to remove")
        return modify_dataframe(df, 'remove', name)

def option_4():
    print("How would you like the dataframe to be stored? Options: \n 1: Write to local disk as CSV or JSON \n 2: Write to SQL database")
    print("Waiting for input...")
    while True:
        print("Waiting for input...")
        option = input("Select an option: ")

        valid_options = ['1', '2']
        if option not in valid_options:
            print("Invalid option! Valid options are: ", valid_options)
            continue

        break

    global df
    if option == '1':
        print("You selected: Write to local disk. Please provide what you want the file to be named, including the extension. (e.g. 'output_file.csv', 'my_file.json', etc.) Please note only CSV and JSON is supported. The file will be written to the folder output_data")        
        print("Waiting for input...")
        file_name = input("Enter name the file")
        export_dataframe(df, f"output_data/{file_name}")
        print(f"SUCESSFULLY EXPORTED DATAFRAME. Path to file: output_data/{file_name}")

    if option == '2':
        global cursor
        global conn

        print("You selected: Write to SQL database. Please provide what you want the table name in the database to be.")
        print("Waiting for input...")
        table_name = input("Enter name the table")        
        print("Writing dataframe to local database, 'database.db'...")

        write_dataframe_to_db(df, conn, cursor, table_name)
        print("SUCESSFULLY WROTE DATAFRAME TO database.db AS TABLE: ", table_name)

def option_5():
    global df
    print("Generating summary of dataframe...")
    print(df.info())
    print(df.describe())


In [125]:
def main():
    while True:
        res = run()
        if not res:
            return