In [1]:
import pandas as pd
import json
import os
import requests
import sqlite3

def fetch_data(source, source_type, file_type):
    if source_type == 'url':
        if file_type == 'csv':
            df = pd.read_csv(source)
        elif file_type == 'json':
            df = pd.read_json(source)
        else:
            raise ValueError("Unsupported file type. Please use 'csv' or 'json'.")
    elif source_type == 'local':
        if file_type == 'csv':
            df = pd.read_csv(source)
        elif file_type == 'json':
            df = pd.read_json(source)
        else:
            raise ValueError("Unsupported file type. Please use 'csv' or 'json'.")
    else:
        raise ValueError("Unsupported source type. Please use 'url' or 'local'.")
    
    return df

In [2]:
def convert_data_format(df, output_format='csv', output_file=None):
    # Convert the DataFrame to the desired format
    if output_format == 'json':
        result = df.to_json(orient='records', lines=True)
    elif output_format == 'csv':
        result = df.to_csv(index=False)
    elif output_format == 'sql':
        if output_file:
            conn = sqlite3.connect(output_file)
            df.to_sql('data_table', conn, if_exists='replace', index=False)
            conn.close()
            return
        else:
            raise ValueError("SQL format requires output_file path for the database.")
    else:
        raise ValueError("Unsupported output format")
    
    if output_file:
        with open(output_file, 'w') as f:
            f.write(result)

In [3]:
def modify_columns(df, columns_to_add=None, columns_to_remove=None):
    # Add or remove columns from the DataFrame
    if columns_to_remove:
        df = df.drop(columns=columns_to_remove)
    if columns_to_add:
        for col, val in columns_to_add.items():
            if col == 'High_low_diff':
                # Calculate difference between 'High' and 'Low' columns
                df[col] = df['High'] - df['Low']
            else:
                df[col] = val
    return df

In [4]:
def summarize_data(df, stage='Pre-Processing'):
    # Generate a summary of the DataFrame
    print(f"Summary of {stage} Data:")
    print(f"Number of records: {df.shape[0]}")
    print(f"Number of columns: {df.shape[1]}")
    print(df.head())

In [5]:
def etl_pipeline():
    try:
        # Input section for the user to specify details about the source
        source = input("Enter the data source (URL or local file path): ")
        source_type = input("Enter the source type (url/local): ").lower()
        file_type = input("Enter the file type (csv/json): ").lower()
        output_format = input("Enter the output format (csv/json/sql): ").lower()
        output_file = input("Enter the output file name with extension (e.g., output.csv or output.db): ")
        
        # Asking user whether they want to add or remove columns
        modify_choice = input("Do you want to modify columns? (yes/no): ").lower()
        
        columns_to_add = None
        columns_to_remove = None
        
        if modify_choice == 'yes':
            add_columns = input("Enter columns to add (format: {'new_column_name': 'default_value'}, or leave empty if none): ")
            remove_columns = input("Enter columns to remove (comma-separated list, or leave empty if none): ")
            
            if add_columns:
                columns_to_add = eval(add_columns)  # Convert string input to a dictionary
            if remove_columns:
                columns_to_remove = remove_columns.split(',')
        
        # Call the ETL functions with user inputs
        df = fetch_data(source, source_type, file_type)
        summarize_data(df, stage='Pre-Processing')
    
        # Modify columns based on user inputs
        df = modify_columns(df, columns_to_add, columns_to_remove)
        
        # Store the converted file
        convert_data_format(df, output_format, output_file)
        summarize_data(df, stage='Post-Processing')
        
        print("ETL Process Completed Successfully.")
    
    except Exception as e:
        print(f"Error occurred: {e}")

In [6]:
source = 'sap500.csv'  # Change to your CSV or JSON source
source_type = 'local'  # or 'local'
file_type = 'csv'  # 'csv'or 'json'
output_format = 'sql'  # 'csv' or 'json', 'sql'
output_file = 'output.db'  # 'output.csv' or 'output.db' for SQL

columns_to_add = {'High_low_diff': 'Calculated'}
columns_to_remove = ['Volume']

etl_pipeline()

Enter the data source (URL or local file path): sap500.csv
Enter the source type (url/local): local
Enter the file type (csv/json): csv
Enter the output format (csv/json/sql): sql
Enter the output file name with extension (e.g., output.csv or output.db): output.db
Do you want to modify columns? (yes/no): yes
Enter columns to add (format: {'new_column_name': 'default_value'}, or leave empty if none): {'High_low_diff': 'Calculated'}
Enter columns to remove (comma-separated list, or leave empty if none): Volume
Summary of Pre-Processing Data:
Number of records: 24315
Number of columns: 6
         Date       Open       High        Low      Close  Volume
0  1927-12-30  17.660000  17.660000  17.660000  17.660000       0
1  1928-01-03  17.760000  17.760000  17.760000  17.760000       0
2  1928-01-04  17.719999  17.719999  17.719999  17.719999       0
3  1928-01-05  17.549999  17.549999  17.549999  17.549999       0
4  1928-01-06  17.660000  17.660000  17.660000  17.660000       0
Summary of P

In [7]:
source = 'https://financialmodelingprep.com/api/v3/stock/list?apikey=2lewNDBSw4SM8weBDYlB6UEN8AcqUUKM'  # Change to your CSV or JSON source
source_type = 'url'  # or 'local'
file_type = 'json'  # 'csv'or 'json'
output_format = 'sql'  # 'csv' or 'json', 'sql'
output_file = 'output.db'  # 'output.csv' or 'output.db' for SQL
columns_to_add = None
columns_to_remove = ['exchangeShortName']

etl_pipeline()

Enter the data source (URL or local file path): https://financialmodelingprep.com/api/v3/stock/list?apikey=2lewNDBSw4SM8weBDYlB6UEN8AcqUUKM
Enter the source type (url/local): url
Enter the file type (csv/json): json
Enter the output format (csv/json/sql): sql
Enter the output file name with extension (e.g., output.csv or output.db): output.db
Do you want to modify columns? (yes/no): yes
Enter columns to add (format: {'new_column_name': 'default_value'}, or leave empty if none): 
Enter columns to remove (comma-separated list, or leave empty if none): exchangeShortName
Summary of Pre-Processing Data:
Number of records: 84178
Number of columns: 6
      symbol                                               name   price  \
0  PMGOLD.AX                                    Perth Mint Gold   17.94   
1     FEMACX  The First Trust Combined Series 447: Investmen...  943.94   
2      XSIAX                            Voya Credit Income Fund    9.70   
3      YACKX                          AMG Yacktm