In [None]:
# ========== SETUP AND IMPORTS ==========
import pandas as pd
import os
import yfinance as yf
from sqlalchemy import create_engine
from dotenv import load_dotenv
from tqdm import tqdm
from pathlib import Path
import logging
import time
import re
from datetime import datetime

# ---------- Logging ----------
# Only ERROR-level records (and above) are written, to a log file in the current working directory.
logging.basicConfig(filename='data_processing_errors.log', level=logging.ERROR,
                    format='%(asctime)s %(levelname)s:%(message)s')

# ---------- Debug switches ----------
# DEBUG turns the "[DEBUG]" console messages on/off; DEBUG_LEVEL controls their verbosity.
DEBUG = True
DEBUG_LEVEL = 2  # Level 1: Basic; Level 2: Detailed

# ---------- Database credentials ----------
# Credentials are read from a .env file rather than being hard-coded in the notebook.
load_dotenv(r'C:\Users\Lane\Documents\Projects\trading_bot\programs\server_credentials.env')
db_user = os.getenv('DB_USER')
db_password = os.getenv('DB_PASSWORD')
db_host = os.getenv('DB_HOST')
db_port = os.getenv('DB_PORT')
db_name = os.getenv('DB_NAME')

# Database engine for the PostgreSQL connection.
# NOTE(review): the credentials are interpolated into the URL verbatim, so a password containing
# characters such as '@', ':' or '/' would need to be URL-escaped — confirm against the .env contents.
engine = create_engine(f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}')

# ---------- Input paths ----------
# One account-history export per year.
incoming_data_paths = [
    r'C:\Users\Lane\Documents\Projects\trading_bot\data\old data\Accounts_History_2021.csv',
    r'C:\Users\Lane\Documents\Projects\trading_bot\data\old data\Accounts_History_2022.csv',
    r'C:\Users\Lane\Documents\Projects\trading_bot\data\old data\Accounts_History_2023.csv',
    r'C:\Users\Lane\Documents\Projects\trading_bot\data\old data\Accounts_History_2024.csv'
]
input_master_data = r'C:\Users\Lane\Documents\Projects\trading_bot\programs\master_data14.csv'

# ---------- Output paths ----------
# Fixed output folder for the cleaned ledger.
cleaned_ledger_data_output_path = r'C:\Users\Lane\Documents\Projects\trading_bot\data\old data'
# Date stamp (YYYYMMDD) that makes the cleaned-ledger file name vary per day.
# These two lines were previously indented, which raised IndentationError and stopped the whole cell.
date_string = datetime.now().strftime("%Y%m%d")
consolidated_data_output_path = os.path.join(cleaned_ledger_data_output_path, f"cleaned_ledger_data_{date_string}.csv")
report_symbol_verification = r'C:\Users\Lane\Documents\Projects\trading_bot\data\old data\report-symbol_verification.csv'
output_master_data69 = r'C:\Users\Lane\Documents\Projects\trading_bot\programs\master_data69.csv'

In [None]:
# ========== DATA CLEANING FUNCTION ==========
def clean_fidelity_data(incoming_data_paths, cleaned_ledger_data_output_path):
    """
    Clean multiple Fidelity data files, combine them into one consolidated ledger,
    and save it as cleaned_ledger_data_YYYYMMDD.csv.

    Parameters
    ----------
    incoming_data_paths : iterable of str
        CSV files to read. Lines that pandas cannot parse are skipped.
    cleaned_ledger_data_output_path : str
        Folder the consolidated CSV is written to.

    Returns
    -------
    pandas.DataFrame or None
        The consolidated ledger when the user confirms the review prompt with 'y';
        None when the user declines, so the caller can stop without the kernel
        being terminated.

    Raises
    ------
    KeyError
        If an expected column (e.g. 'Commission', 'Run Date') is absent from the input.
    ValueError
        If a 'Run Date' value does not match the MM/DD/YYYY format.
    """
    frames = []
    for file_path in incoming_data_paths:
        frame = pd.read_csv(file_path, on_bad_lines='skip')
        # Trim surrounding whitespace in every text column; covers both plain object
        # columns and dedicated pandas string dtypes.
        frame = frame.apply(
            lambda col: col.str.strip()
            if (col.dtype == "object" or pd.api.types.is_string_dtype(col.dtype))
            else col
        )
        frames.append(frame)

    consolidated_data = pd.concat(frames, ignore_index=True).rename(
        columns={
            'Run Date': 'transaction_date',
            'Account': 'portfolio_name',
            'Action': 'notes',
            'Symbol': 'symbol',
            'Description': 'asset_name',
            'Quantity': 'quantity',
            'Price': 'price',
            'Amount': 'transaction_amount',
            'Commission': 'commission',
            'Fees': 'fees'
        }
    )

    # Blank commission/fee cells mean "no charge".
    consolidated_data['commission'] = consolidated_data['commission'].fillna(0)
    consolidated_data['fees'] = consolidated_data['fees'].fillna(0)

    # Keeping only the final columns also discards every unused export column
    # ('Type', 'Settlement Date', ...). The explicit copy makes the column
    # assignments below operate on an independent frame.
    final_columns_order = ['symbol', 'asset_name', 'quantity', 'price', 'transaction_amount',
                           'commission', 'fees', 'portfolio_name', 'transaction_date', 'notes']
    consolidated_data = consolidated_data[final_columns_order].copy()

    consolidated_data['transaction_date'] = pd.to_datetime(
        consolidated_data['transaction_date'], format='%m/%d/%Y'
    ).dt.strftime('%Y-%m-%d')

    # Remove the first hyphen from each symbol (e.g. 'BRK-B' -> 'BRKB'). Missing symbols
    # become an empty string instead of the literal text 'nan', which would otherwise
    # look like a valid ticker ('NAN') to later steps.
    raw_symbols = consolidated_data['symbol']
    stripped_symbols = raw_symbols.astype(str).str.replace('-', '', n=1, regex=False)
    consolidated_data['symbol'] = stripped_symbols.where(raw_symbols.notna(), '')

    # Build the dated output file inside the folder the caller passed in.
    date_string = datetime.now().strftime("%Y%m%d")
    output_file = os.path.join(cleaned_ledger_data_output_path, f"cleaned_ledger_data_{date_string}.csv")
    consolidated_data.to_csv(output_file, index=False)
    print("Cleaned Ledger data saved. Please review the data for errors.")

    # Gate: the user must confirm the cleaned file was reviewed before the pipeline continues.
    review_data = input("Have you reviewed the cleaned data for errors? (y/n): ").strip().lower()
    if review_data != 'y':
        print("Data Processing Concluded")
        return None

    print("Program moving on to update master_data69")
    return consolidated_data

In [None]:
# ========== MASTER DATA69 UPDATE FUNCTION ==========
def update_master_data(cleaned_ledger_data, input_master_data, output_path=None):
    """
    Add symbols from the cleaned ledger to the master data, keeping only ticker-like symbols.

    Parameters
    ----------
    cleaned_ledger_data : pandas.DataFrame
        Ledger containing a 'symbol' column.
    input_master_data : str
        Path of the existing master-data CSV. If the file does not exist, an empty
        master table with a single 'symbol' column is used.
    output_path : str, optional
        Where the updated master data is written. Defaults to the module-level
        ``output_master_data69`` path.

    Returns
    -------
    pandas.DataFrame or None
        The master data with the new symbols appended (in alphabetical order), or
        None if any error occurred; the error is written to the log.
    """
    try:
        if output_path is None:
            output_path = output_master_data69  # fixed default output file

        # Load existing master data if it exists, or start from an empty table.
        if os.path.exists(input_master_data):
            master_data = pd.read_csv(input_master_data)
        else:
            master_data = pd.DataFrame(columns=['symbol'])

        # Upper-cased set of known symbols for case-insensitive comparison.
        # Missing values are dropped and everything is coerced to text so that the
        # .str accessor cannot fail on blank or numeric cells.
        master_symbols_set = set(master_data['symbol'].dropna().astype(str).str.upper())

        # Candidate symbols from the ledger, normalised the same way.
        ledger_symbols = cleaned_ledger_data['symbol'].dropna().astype(str).str.upper()

        # Only 1-5 uppercase letters count as a standard stock symbol; na=False keeps
        # the mask strictly boolean.
        looks_like_ticker = ledger_symbols.str.match(r'^[A-Z]{1,5}$', na=False)

        # Sorted so the appended rows are in the same order on every run
        # (plain set iteration order is not stable between interpreter sessions).
        unique_new_symbols = sorted(set(ledger_symbols[looks_like_ticker]) - master_symbols_set)

        if unique_new_symbols:
            new_entries = pd.DataFrame({'symbol': unique_new_symbols})
            updated_data = pd.concat([master_data, new_entries], ignore_index=True)
        else:
            # Nothing to add: hand back an independent copy of the existing master data.
            updated_data = master_data.copy()

        updated_data.to_csv(output_path, index=False)
        print(f"Master data updated and saved as: {output_path}")

        return updated_data  # Return the DataFrame for the next step

    except Exception as e:
        # Best-effort step: record the failure and signal it to the caller with None.
        logging.error(f"Error during master data update: {e}")
        if DEBUG and DEBUG_LEVEL >= 2:
            print(f"[DEBUG] Error during master data update: {e}")
        return None

In [None]:
# ========== ENRICH MASTER DATA WITH YFINANCE ==========
def enrich_master_data(updated_master_data):
    """
    Enrich the master data with descriptive fields from YFinance.

    For every distinct, non-missing symbol the long name, sector, industry and first
    traded date are looked up. Any lookup failure is logged and the four fields are
    set to 'Unknown' for that symbol. Afterwards, remaining gaps in those fields and
    in 'asset_name' are filled with 'Unknown'.

    Parameters
    ----------
    updated_master_data : pandas.DataFrame
        Master data containing a 'symbol' column. It is not modified.

    Returns
    -------
    pandas.DataFrame
        A copy of the input with 'longname', 'sector', 'industry', 'first_traded'
        and 'asset_name' columns present and free of missing values.
    """
    enrichment_columns = ['longname', 'sector', 'industry', 'first_traded']
    text_columns = enrichment_columns + ['asset_name']
    unknown_profile = dict.fromkeys(enrichment_columns, 'Unknown')

    # Work on a copy of the frame that was passed in (the previous body referenced an
    # undefined name instead of the parameter).
    master_data = updated_master_data.copy()

    # Make sure every text column exists and can hold strings before values are written.
    for column in text_columns:
        if column in master_data.columns:
            master_data[column] = master_data[column].astype(object)
        else:
            master_data[column] = None

    # Missing symbols cannot be looked up, so they are skipped.
    symbols = master_data['symbol'].dropna().unique()

    for idx, symbol in enumerate(tqdm(symbols, desc="Enriching data", unit="symbol")):
        try:
            profile = _fetch_symbol_profile(symbol)

            # Sample the debug output (every 50th symbol) to keep the notebook readable.
            if DEBUG and DEBUG_LEVEL >= 2 and idx % 50 == 0:
                print(f"[DEBUG] Enriched symbol {symbol}: longname={profile['longname']}, "
                      f"sector={profile['sector']}, industry={profile['industry']}")

        except Exception as e:
            # Best-effort enrichment: one failing symbol must not abort the whole run.
            logging.error(f"Error enriching symbol {symbol}: {e}")
            if DEBUG and DEBUG_LEVEL >= 2:
                print(f"[DEBUG] Error enriching symbol {symbol}: {e}")
            profile = unknown_profile

        symbol_rows = master_data['symbol'] == symbol
        for column in enrichment_columns:
            master_data.loc[symbol_rows, column] = profile[column]

    # Fill whatever is still missing (e.g. rows without a symbol).
    for column in text_columns:
        master_data[column] = master_data[column].fillna('Unknown')

    return master_data


def _fetch_symbol_profile(symbol):
    """Look up one symbol in YFinance and return its four descriptive fields ('Unknown' when absent)."""
    stock = yf.Ticker(symbol)
    info = stock.info or {}
    history = stock.history(period="max")
    # Earliest row of the full price history, when there is any history at all.
    first_traded = history.index.min().strftime('%Y-%m-%d') if not history.empty else 'Unknown'

    # `or 'Unknown'` also covers keys that are present but hold None / an empty string.
    return {
        'longname': info.get('longName') or 'Unknown',
        'sector': info.get('sector') or 'Unknown',
        'industry': info.get('industry') or 'Unknown',
        'first_traded': first_traded,
    }

In [None]:
# ========== DATABASE UPLOAD WITH USER PROMPT ==========
def upload_to_database(data):
    """
    Ask the user whether ``data`` should be uploaded to PostgreSQL and act on the answer.

    On 'y' the frame is appended to the 'asset_ledger' table through the module-level
    ``engine``; a failure is written to the log (and echoed when detailed debugging is
    on) instead of being raised. Any other answer skips the upload.

    Parameters
    ----------
    data : object exposing ``to_sql`` (e.g. a pandas DataFrame)
        The rows to append.

    Returns
    -------
    None
    """
    answer = input("Do you want to upload the data to PostgreSQL? (y/n): ")
    wants_upload = answer.strip().lower() == 'y'

    # Guard clause: without an explicit 'y' nothing is sent to the database.
    if not wants_upload:
        print("Program run successfully, new master_data file created, not uploaded to PostgreSQL.")
        return

    try:
        data.to_sql('asset_ledger', con=engine, if_exists='append', index=False)
        print("Data successfully inserted into the database.")
        print("Program run successfully, new master_data file created, uploaded to PostgreSQL.")
    except Exception as upload_error:
        logging.error(f"Error during database upload: {upload_error}")
        if DEBUG:
            if DEBUG_LEVEL >= 2:
                print(f"[DEBUG] Error during database upload: {upload_error}")

In [None]:
# ========== MAIN SCRIPT EXECUTION ==========
# The pipeline below calls increment_filename_version(), but no cell visible in this notebook
# defines it. This fallback is only created when the name is not already defined, so an
# existing implementation (if one exists elsewhere) is never overridden.
# NOTE(review): the versioning rule here is an assumption — confirm it matches the intended scheme.
if 'increment_filename_version' not in globals():
    def increment_filename_version(file_path):
        """
        Return ``file_path`` with the trailing number of its file name increased by one.

        'master_data14.csv' -> 'master_data15.csv'. Zero padding is preserved
        ('data09.csv' -> 'data10.csv'). A name without a trailing number gets '1' appended.
        The directory part and the extension are kept as they are.
        """
        directory, filename = os.path.split(file_path)
        stem, extension = os.path.splitext(filename)
        trailing_number = re.search(r'(\d+)$', stem)
        if trailing_number is None:
            return os.path.join(directory, f"{stem}1{extension}")
        digits = trailing_number.group(1)
        bumped = str(int(digits) + 1).zfill(len(digits))
        return os.path.join(directory, f"{stem[:trailing_number.start()]}{bumped}{extension}")


if __name__ == "__main__":
    # Step 1: Clean the new data files
    cleaned_ledger_data = clean_fidelity_data(incoming_data_paths, cleaned_ledger_data_output_path)

    if cleaned_ledger_data is not None:
        if DEBUG and DEBUG_LEVEL >= 1:
            # len() of the ledger is its number of rows, not the number of input files.
            print(f"[DEBUG] Total cleaned ledger rows processed: {len(cleaned_ledger_data)}")

        # Step 2: Update master data with new symbols from the cleaned data
        updated_master_data = update_master_data(cleaned_ledger_data, input_master_data)

        if updated_master_data is None:
            # update_master_data logs the details; make the failure visible here as well.
            print("Master data update failed; see data_processing_errors.log. No enrichment or upload performed.")
        else:
            # Save the updated master data under the next version number
            new_input_master_data = increment_filename_version(input_master_data)
            updated_master_data.to_csv(new_input_master_data, index=False)
            print(f"New master data created and saved as: {new_input_master_data}")

            # Prompt user to confirm before proceeding with enrichment and database upload
            proceed = input("Do you want to proceed with enrichment and database upload? (y/n): ").strip().lower()
            if proceed != 'y':
                # Stop by falling through rather than calling exit(), so a notebook kernel keeps running.
                print("Process terminated by user after master data generation. No enrichment or upload performed.")
            else:
                # Step 3: Enrich the updated master data and overwrite the new version with it
                enriched_master_data = enrich_master_data(updated_master_data)
                enriched_master_data.to_csv(new_input_master_data, index=False)
                print(f"Enriched master data saved to: {new_input_master_data}")

                # Step 4: Prompt for database upload
                upload_to_database(enriched_master_data)