In [None]:
import requests
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
from datetime import datetime, timedelta
from dotenv import load_dotenv
import os

# Pull variables from a local .env file into the process environment, then
# read the Snowflake credentials and the API key from it.
# Any variable that is not set resolves to None.
load_dotenv()
SNF_USER = os.environ.get('snf_user')
SNF_PASSWORD = os.environ.get('snf_password')
SNF_ACCOUNT = os.environ.get('snf_account_name')
SNF_WAREHOUSE = os.environ.get('snf_warehouse')
SNF_DATABASE = os.environ.get('snf_db')
API_KEY = os.environ.get('API_KEY')

In [None]:
def fetch_technical_indicator(symbol, indicator_type, period, start_date, end_date, timeout=30):
    """Fetch one daily technical indicator series for a symbol.

    Calls the financialmodelingprep ``technical_indicator/1day`` endpoint and
    reshapes the payload into a three-column frame.

    Args:
        symbol: Ticker placed in the request URL and in the SYMBOL column.
        indicator_type: Indicator name sent as the ``type`` query parameter;
            also the name of the payload column that holds the values.
        period: Look-back period sent as the ``period`` query parameter.
        start_date: Value sent as the ``from`` query parameter.
        end_date: Value sent as the ``to`` query parameter.
        timeout: Seconds passed to ``requests.get`` so a stalled connection
            cannot block the notebook indefinitely.

    Returns:
        A DataFrame with columns ``DATE``, ``SYMBOL`` and
        ``<INDICATOR_TYPE>_<period>D`` (all upper-cased), with ``DATE`` holding
        ``datetime.date`` values. An empty DataFrame is returned when the
        status code is not 200, the payload has no rows, or the payload lacks
        the indicator column.

    Raises:
        Whatever ``requests.get`` / ``response.json`` raise (connection errors,
        timeouts, malformed JSON); these are intentionally not swallowed so the
        caller can skip the whole ticker instead of uploading partial data.
    """
    base_url = f"https://financialmodelingprep.com/api/v3/technical_indicator/1day/{symbol}"
    params = {
        'type': indicator_type,
        'period': period,
        'from': start_date,
        'to': end_date,
        'apikey': API_KEY
    }
    response = requests.get(base_url, params=params, timeout=timeout)
    if response.status_code != 200:
        print(f"Failed to fetch data for {symbol} with indicator {indicator_type}. Status code: {response.status_code}")
        return pd.DataFrame()

    # Parse the body exactly once.
    data = response.json()

    # Handle both possible response formats: a bare list of rows, or a dict
    # wrapping the rows under 'historical'. Anything else is treated as empty.
    if isinstance(data, list):
        historical_data = data
    elif isinstance(data, dict):
        historical_data = data.get('historical', [])
    else:
        historical_data = []

    if not historical_data:
        return pd.DataFrame()

    df = pd.DataFrame(historical_data)
    df['symbol'] = symbol  # Add symbol column

    # The API returns the indicator name as the column regardless of the period,
    # so we manually rename it to include the period for clarity.
    period_column_name = f"{indicator_type}_{period}d"
    if indicator_type not in df.columns:
        print(f"The expected indicator '{indicator_type}' is not present in the API response.")
        return pd.DataFrame()

    # Keep only date, symbol and the renamed indicator column, upper-casing the
    # names to match Snowflake's schema. rename() returns a new frame, so the
    # DATE assignment below does not write into a slice of the original.
    df = (
        df.rename(columns={indicator_type: period_column_name})
        [['date', 'symbol', period_column_name]]
        .rename(columns=str.upper)
    )
    df['DATE'] = pd.to_datetime(df['DATE'], errors='coerce').dt.date
    return df

In [None]:
# Example call: 5-period SMA for AAPL over a short window.
# As the last expression in the cell, the returned frame is shown as output.
fetch_technical_indicator(
    symbol='AAPL',
    indicator_type='sma',
    period=5,
    start_date='2024-02-10',
    end_date='2024-02-27',
)

In [None]:
# Indicator names and look-back periods kept at notebook level for ad-hoc calls.
# upload_technical_indicators_to_snowflake defines its own local copies.
tech_indicators = [
    'sma',
    'ema',
    'wma',
    'dema',
    'tema',
    'williams',
    'rsi',
    'adx',
    'standardDeviation',
]
time_period = list(range(5, 25, 5))  # 5, 10, 15, 20

In [None]:
def fetch_and_combine_all_indicators_for_symbol(symbol, tech_indicators, time_periods, start_date, end_date):
    """Fetch every indicator/period combination for a symbol into one wide frame.

    Calls ``fetch_technical_indicator`` once per (indicator, period) pair and
    outer-joins the results on ``DATE`` so each indicator becomes a column.

    Args:
        symbol: Ticker to fetch.
        tech_indicators: Iterable of indicator names.
        time_periods: Iterable of look-back periods.
        start_date: Passed through to ``fetch_technical_indicator``.
        end_date: Passed through to ``fetch_technical_indicator``.

    Returns:
        A DataFrame sorted by ``DATE`` descending with a fresh 0..n-1 index,
        containing ``DATE``, ``SYMBOL`` and one ``<INDICATOR>_<period>D``
        column per successful fetch. An empty DataFrame is returned when no
        combination produced data.
    """
    combined_df = pd.DataFrame()

    for indicator in tech_indicators:
        for period in time_periods:
            df = fetch_technical_indicator(symbol, indicator, period, start_date, end_date)

            # Skip fetches that came back empty or without the expected column.
            period_column_name = f"{indicator.upper()}_{period}D"
            if df.empty or period_column_name not in df.columns:
                continue

            if combined_df.empty:
                # First usable frame seeds the result (keeps DATE and SYMBOL).
                combined_df = df
            else:
                # SYMBOL already lives in combined_df; drop it from the new
                # frame so the merge does not create SYMBOL_x / SYMBOL_y.
                values_only = df.drop(columns=['SYMBOL'], errors='ignore')
                combined_df = pd.merge(combined_df, values_only, on=['DATE'], how='outer')

    # Nothing fetched: return the empty frame instead of failing on the
    # missing 'DATE' column below, so callers can rely on `.empty`.
    if combined_df.empty:
        return pd.DataFrame()

    # Ensure the 'DATE' column is correctly formatted as dates
    combined_df['DATE'] = pd.to_datetime(combined_df['DATE'], errors='coerce').dt.date

    # The outer merge leaves SYMBOL null on dates that were absent from the
    # first frame; every row belongs to `symbol`, so fill those gaps.
    if 'SYMBOL' in combined_df.columns:
        combined_df['SYMBOL'] = combined_df['SYMBOL'].fillna(symbol)

    # Newest rows first, with the index actually reset (the result of
    # reset_index must be kept; it does not operate in place).
    combined_df = combined_df.sort_values(by='DATE', ascending=False).reset_index(drop=True)
    return combined_df


In [None]:
## Get the master stock list from the financial_ratios table instead of calling the API again
import snowflake.connector

def fetch_master_stock_list():
    """Return the distinct stock symbols stored in the financial_ratios table.

    Opens a Snowflake connection using the module-level SNF_* settings, runs a
    ``SELECT DISTINCT symbol`` against
    ``stock_analytics.public.financial_ratios`` and closes the cursor and the
    connection before returning.

    Returns:
        A list with the first column of every result row. If executing or
        fetching the query fails, the error is printed and ``[]`` is returned.

    Raises:
        Errors from ``snowflake.connector.connect`` or ``ctx.cursor()`` are not
        caught here; the connection is still closed if the cursor cannot be
        created.
    """
    # Snowflake connection details
    ctx = snowflake.connector.connect(
        user=SNF_USER,
        password=SNF_PASSWORD,
        account=SNF_ACCOUNT,
        warehouse=SNF_WAREHOUSE,
        database=SNF_DATABASE,
        schema='PUBLIC'
    )

    # The outer try guarantees the connection is released even when the cursor
    # itself cannot be created.
    try:
        cs = ctx.cursor()
        try:
            # Execute the query to fetch distinct stock symbols
            cs.execute("SELECT DISTINCT symbol FROM stock_analytics.public.financial_ratios")

            # Each row is a one-element sequence; keep just the symbol.
            return [row[0] for row in cs.fetchall()]
        except Exception as e:
            # Best-effort: report the failure and hand back an empty list.
            print(f"Failed to fetch master stock list: {e}")
            return []
        finally:
            cs.close()
    finally:
        ctx.close()

# Load the symbol list once when this cell runs. fetch_master_stock_list()
# returns [] if its query fails, so the count printed below can be 0.
master_stock_list = fetch_master_stock_list()
print(f"Fetched {len(master_stock_list)} symbols from Snowflake.")


### Finally, upload to Snowflake

In [None]:
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd
from datetime import datetime, timedelta

# Assuming fetch_and_combine_all_indicators_for_symbol and fetch_master_stock_list are already defined
from datetime import date, datetime, timedelta
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

def upload_technical_indicators_to_snowflake(master_stock_list):
    """Fetch indicators for each symbol and append the new rows to Snowflake.

    For every symbol the latest ``DATE`` already stored in
    ``STOCK_ANALYTICS.PUBLIC.TECHNICAL_INDICATORS`` is looked up. Symbols whose
    latest date is today or yesterday are skipped. Otherwise roughly six years
    of indicators are fetched via ``fetch_and_combine_all_indicators_for_symbol``
    and only rows newer than the stored latest date are written with
    ``write_pandas``, so existing history is not appended a second time.

    Args:
        master_stock_list: Iterable of ticker symbols to process.

    Returns:
        None. Progress and per-symbol errors are printed.

    Raises:
        Errors from connecting or from the latest-date lookup propagate to the
        caller; the connection is closed first. Errors while fetching or
        uploading a single symbol are printed and that symbol is skipped.
    """
    # Snowflake connection details
    ctx = snowflake.connector.connect(
        user=SNF_USER,
        password=SNF_PASSWORD,
        account=SNF_ACCOUNT,
        warehouse=SNF_WAREHOUSE,
        database=SNF_DATABASE,
        schema='PUBLIC'
    )

    tech_indicators = ['sma', 'ema', 'wma', 'dema', 'tema', 'williams', 'rsi', 'adx', 'standardDeviation']
    time_periods = [5, 10, 15, 20]
    end_date = date.today()
    start_date = end_date - timedelta(days=6*365)

    # Loop-invariant date strings, computed once for the whole run.
    start_str = start_date.strftime('%Y-%m-%d')
    end_str = end_date.strftime('%Y-%m-%d')
    today = end_str
    yesterday = (end_date - timedelta(days=1)).strftime('%Y-%m-%d')

    # The symbol is bound as a parameter instead of being formatted into the
    # SQL text, so quotes inside a symbol cannot break or alter the statement.
    max_date_query = "SELECT MAX(DATE) FROM STOCK_ANALYTICS.PUBLIC.TECHNICAL_INDICATORS WHERE SYMBOL = %s"

    try:
        # Loop through each stock in the master_stock_list
        for symbol in master_stock_list:
            # Check if the ticker's data is already up-to-date
            cursor = ctx.cursor()
            try:
                cursor.execute(max_date_query, (symbol,))
                result = cursor.fetchone()[0]
            finally:
                cursor.close()
            max_date = result.strftime('%Y-%m-%d') if result else None

            if max_date in (yesterday, today):
                print(f"Data for {symbol} already up-to-date. Skipping.")
                continue

            print(f"Processing {symbol}")
            try:
                combined_indicators_df = fetch_and_combine_all_indicators_for_symbol(
                    symbol, tech_indicators, time_periods, start_str, end_str
                )

                # write_pandas appends, so when the table already holds rows
                # for this symbol keep only dates after the stored maximum.
                # Rows whose DATE cannot be parsed compare False and are dropped.
                if max_date is not None and not combined_indicators_df.empty:
                    is_new = (
                        pd.to_datetime(combined_indicators_df['DATE'], errors='coerce')
                        > pd.Timestamp(max_date)
                    )
                    combined_indicators_df = combined_indicators_df.loc[is_new].reset_index(drop=True)

                if not combined_indicators_df.empty:
                    write_pandas(ctx, combined_indicators_df, 'TECHNICAL_INDICATORS')
                    print(f"Uploaded indicators for {symbol}")
                else:
                    print(f"No data to upload for {symbol}")
            except KeyError as e:
                print(f"KeyError encountered for {symbol}: {e}. Skipping this ticker.")
            except Exception as e:
                print(f"Unexpected error encountered for {symbol}: {e}. Skipping this ticker.")
    finally:
        # Always release the connection, including when a lookup query fails.
        ctx.close()





In [None]:
# # Fetch the master stock list from Snowflake
# # master_stock_list = ['AAPL', 'MSFT', 'NVDA']
# master_stock_list = fetch_master_stock_list()

# # Upload technical indicators for all stocks in the master list

# upload_technical_indicators_to_snowflake(master_stock_list)