In [85]:
####CREATE TICKER SPECIFIC FILES

import os
import polars as pl
import glob
import gzip
from concurrent.futures import ThreadPoolExecutor, as_completed

# Function to process a single file
def process_file(file_path, ticker, year_month, output_dir):
    with gzip.open(file_path, 'rb') as f:
        df = pl.read_csv(f)
    df_filtered = df.filter(df['ticker'] == ticker)
    output_file = os.path.join(output_dir, f"{year_month}_{ticker}.csv")
    # Since writing to the same file from multiple threads is not safe, we return the filtered DataFrame
    return df_filtered, output_file

# Specify the directory containing the data files and other parameters
data_dir = "/Users/brandon/Documents/polygon_data/minute_aggs/"
tickers = ["NVDA","MSFT", "TSM", "AAPL","FICO"]  # List of tickers to process
tickers = ["CRWD", "NDVX", "GOOG", "META"]  # List of tickers to process

# Configure the maximum number of threads to use
max_workers = 24

# Loop through each ticker
for ticker in tickers:
    output_dir = f"/Users/brandon/Documents/stonk_bot_data/{ticker}/" 
    os.makedirs(output_dir, exist_ok=True)

    # Create a dictionary to store monthly dataframes
    monthly_dfs = {}

    # Loop through each year from 2019 to 2025
    for year in range(2024, 2025):
        for month in range(1, 13):
            month_dir = os.path.join(data_dir, str(year), "{:02d}".format(month))
            if not os.path.exists(month_dir):
                print(f"No data found for {year}-{month:02d}")
                continue

            # Collect all file paths to process
            file_paths = [os.path.join(month_dir, file) for file in os.listdir(month_dir) if file.endswith(".csv.gz")]

            # Use ThreadPoolExecutor to process files in parallel
            with ThreadPoolExecutor(max_workers=max_workers) as executor:
                # Create a future for each file
                futures = [executor.submit(process_file, file_path, ticker, f"{year}-{month:02d}", output_dir) for file_path in file_paths]

                # As futures complete, process their results
                for future in as_completed(futures):
                    df_filtered, output_file = future.result()
                    # Append or extend the DataFrame in monthly_dfs
                    year_month = os.path.basename(output_file).replace(f"_{ticker}.csv", "")
                    if year_month not in monthly_dfs:
                        monthly_dfs[year_month] = df_filtered
                    else:
                        monthly_dfs[year_month] = monthly_dfs[year_month].extend(df_filtered)
                    print(f"Processed {output_file}")

    # Output the combined monthly DataFrames to files
    for year_month, df in monthly_dfs.items():
        output_file = os.path.join(output_dir, f"{year_month}_{ticker}.csv")
        df.write_csv(output_file)
        print(f"Saved {output_file}")


In [81]:
#*******CREATE BASE DATA FRAME****************

import os
import os
import polars as pl
import datetime as dt
from datetime import timezone
import numpy as np
from polars import Config
import pytz

pl.Config.set_fmt_str_lengths(100)
pl.Config.set_tbl_rows(1000)
pl.Config(tbl_cols=-1)

tickers = ["NVDA"]
# tickers = ['NVDA', 'MSFT', 'AAPL', 'TSM']
# tickers = ['NVDA', 'MSFT', 'AAPL', 'TSM',
#            'GOOG', 'FICO']
# tickers = ["NVDA", "AAPL", "GOOG", "MSFT",
#            "TSM", "CRWD", "NVDX", "FICO"]  # List of tickers

# directory_path = f"/Users/brandon/Documents/stonk_bot_data/{ticker}/"

def combine_data_for_tickers(tickers):
    combined_df = None  # Initialize combined DataFrame

    for ticker in tickers:
        directory_path = f"/Users/brandon/Documents/stonk_bot_data/{ticker}/"

        try:
            # List all CSV files in the directory
            files = [os.path.join(directory_path, file) for file in os.listdir(directory_path)
                     if file.endswith(".csv")]
        except FileNotFoundError:
            print(f"Directory not found for ticker: {ticker}. Skipping...")
            continue  # Skip to the next ticker

        # Ensure there are files to process
        if not files:
            print(f"No CSV files found for ticker: {ticker}. Skipping...")
            continue

        # Read the first CSV file to infer the schema
        df = pl.read_csv(files[0])
        schema = {col: df[col].dtype for col in df.columns}

        # Create an empty list to store DataFrames
        dfs = []

        # Read each CSV file and append its DataFrame to the list, applying the schema
        for file in files:
            df = pl.read_csv(file, dtypes=schema)
            dfs.append(df)

        # Concatenate all DataFrames into a single DataFrame for the current ticker
        if dfs:  # Ensure dfs is not empty
            ticker_df = pl.concat(dfs)
            ticker_df = ticker_df.with_columns(pl.lit(ticker).alias('ticker'))

            # Append the DataFrame for the current ticker to the combined DataFrame
            if combined_df is None:
                combined_df = ticker_df
            else:
                combined_df = combined_df.vstack(ticker_df)

    return combined_df


df = combine_data_for_tickers(tickers)



# Drop index column if it exists
if 'index' in df.columns:
    df = df.drop('index')

# Drop duplicates
df = df.unique(subset=df.columns)


eastern = pytz.timezone('US/Eastern')
# Define a function to convert Unix timestamp in nanoseconds to EST datetime
def unix_ns_to_est(ts_ns):
    try:
        # Convert nanoseconds to seconds for timestamp
        timestamp_seconds = ts_ns / 1e9
        # Create a timezone-aware datetime object in UTC
        utc_datetime = dt.datetime.utcfromtimestamp(timestamp_seconds).replace(tzinfo=dt.timezone.utc)
        # Convert UTC to Eastern Time with daylight saving time consideration
        est_datetime = utc_datetime.astimezone(eastern)
        return est_datetime
    except Exception as e:
        print(f"Error converting timestamp {ts_ns}: {e}")
        return None

# Test the function with a sample value
test_value = df.select('window_start').head(1)[0, 0]
print(unix_ns_to_est(test_value))

# Assuming 'df' is your DataFrame and 'window_start' is a column with Unix timestamps in nanoseconds
# Add one minute to 'window_start' and create 'window_end'
df = df.with_columns((pl.col('window_start') + 60 * 1e9).cast(pl.Int64).alias('window_end'))

# Convert 'window_start' to EST datetime
df = df.with_columns(pl.col('window_start').map_elements(unix_ns_to_est, return_dtype=pl.Datetime).alias('window_start_est'))

# Convert 'window_end' to EST datetime
df = df.with_columns(pl.col('window_end').map_elements(unix_ns_to_est, return_dtype=pl.Datetime).alias('window_end_est'))

# df = agg_df

# Define trading hours
pre_market_start = dt.time(4, 0)  # Pre-market starts at 4:00 AM
trading_start = dt.time(13, 30)
trading_end = dt.time(20, 0)

# Create flags for regular trading hours, after hours, and pre-market hours
regular_trading_hours = pl.when(
    (pl.col('window_start_est').dt.time() >= trading_start) &
    (pl.col('window_start_est').dt.time() <= trading_end)
).then(1).otherwise(0)

after_hours = pl.when(
    (pl.col('window_start_est').dt.time() > trading_end) |
    (pl.col('window_start_est').dt.time() < pre_market_start)
).then(1).otherwise(0)

pre_market_hours = pl.when(
    (pl.col('window_start_est').dt.time() >= pre_market_start) &
    (pl.col('window_start_est').dt.time() < trading_start)
).then(1).otherwise(0)

# Add the new columns to the DataFrame
df = df.with_columns([
    regular_trading_hours.alias('regular_trading_hours'),
    after_hours.alias('after_hours'),
    pre_market_hours.alias('pre_market_hours'),
    df['window_start_est'].dt.date().alias('transaction_date')
])

df = df.filter(pl.col('regular_trading_hours') == 1)

2020-05-11 04:03:00-04:00


In [87]:
#########Apply Stock Splits

import requests
import logging
pl.Config.set_tbl_formatting("UTF8_FULL")
pl.Config.set_fmt_str_lengths(100)

API_KEY = 'DTc8n47jA_nApfGqD4aqKv9ntIHFOj0U'

def get_stock_splits(ticker):
    url = f"https://api.polygon.io/v3/reference/splits?ticker={ticker}&apiKey={API_KEY}"
    response = requests.get(url)
    response.raise_for_status()  # Raise an error for bad status codes
    splits = response.json().get("results", [])
    
    # print(splits)
    return splits

# List of tickers
tickers = ["NVDA","MSFT", "TSM", "AAPL","FICO",
           "CRWD", "NDVX", "GOOG", "META"] 
# Combine splits data for all tickers
def combine_splits_for_tickers(tickers):
    all_splits = []

    for ticker in tickers:
        splits = get_stock_splits(ticker)
        for split in splits:
            all_splits.append({
                "ticker": split['ticker'],
                "execution_date": split['execution_date'],
                "split_from": split['split_from'],
                "split_to": split['split_to']
            })
    
    # Convert to Polars DataFrame
    splits_df = pl.DataFrame(all_splits)
    return splits_df

# Fetch and combine splits for all tickers
splits_df = combine_splits_for_tickers(tickers)
# print(splits_df)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def apply_stock_splits(df: pl.DataFrame, splits_df: pl.DataFrame) -> pl.DataFrame:
    # Ensure splits_df is sorted by execution_date
    splits_df = splits_df.sort('execution_date')

    # Initialize the applied_splits column as an empty list
    df = df.with_columns([pl.lit("").alias('applied_splits')])

    # Create adjusted columns for 'adjusted_close' and 'adjusted_volume'
    df = df.with_columns([
        pl.col('open').alias('adjusted_open'),
        pl.col('close').alias('adjusted_close'),
        pl.col('high').alias('adjusted_high'),
        pl.col('low').alias('adjusted_low'),
        pl.col('volume').alias('adjusted_volume')
    ])

    for split in splits_df.to_dicts():
        ticker = split['ticker']
        execution_date = dt.datetime.strptime(split['execution_date'], '%Y-%m-%d')
        split_from = split['split_from']
        split_to = split['split_to']
        ratio = split_to / split_from

        # Convert the split dictionary to a string for storage
        split_str = str(split)

        # Apply split to relevant rows
        df = df.with_columns([
            pl.when((pl.col('ticker') == ticker) & (pl.col('window_start_est') < execution_date))
            .then(pl.col('adjusted_open') / ratio)
            .otherwise(pl.col('adjusted_open')).alias('adjusted_open'),

            pl.when((pl.col('ticker') == ticker) & (pl.col('window_start_est') < execution_date))
            .then(pl.col('adjusted_close') / ratio)
            .otherwise(pl.col('adjusted_close')).alias('adjusted_close'),

            pl.when((pl.col('ticker') == ticker) & (pl.col('window_start_est') < execution_date))
            .then(pl.col('adjusted_high') / ratio)
            .otherwise(pl.col('adjusted_high')).alias('adjusted_high'),

            pl.when((pl.col('ticker') == ticker) & (pl.col('window_start_est') < execution_date))
            .then(pl.col('adjusted_low') / ratio)
            .otherwise(pl.col('adjusted_low')).alias('adjusted_low'),

            pl.when((pl.col('ticker') == ticker) & (pl.col('window_start_est') < execution_date))
            .then(pl.col('adjusted_volume') * ratio)
            .otherwise(pl.col('adjusted_volume')).alias('adjusted_volume'),

            pl.when((pl.col('ticker') == ticker) & (pl.col('window_start_est') < execution_date))
            .then(pl.concat_str([pl.col('applied_splits'), pl.lit(f",{split_str}")]))
            .otherwise(pl.col('applied_splits')).alias('applied_splits')
        ])

    return df

def apply_adjustments(df: pl.DataFrame, splits_df: pl.DataFrame, type: str, price_cols: list, volume_cols: list) -> pl.DataFrame:
    # Ensure splits_df is sorted by execution_date
    splits_df = splits_df.sort('execution_date')

    # Determine the prefix based on the type
    prefix = "adjusted_" if type == "stock" else "adjusted_option_"

    # Initialize the applied_splits column as an empty list
    df = df.with_columns([pl.lit("").alias('applied_splits')])

    # Create adjusted columns for price and volume
    for col in price_cols + volume_cols:
        df = df.with_columns([pl.col(col).alias(f'{prefix}{col}')])

    for split in splits_df.to_dicts():
        ticker = split['ticker']
        execution_date = dt.datetime.strptime(split['execution_date'], '%Y-%m-%d')
        split_from = split['split_from']
        split_to = split['split_to']
        ratio = split_to / split_from

        # Convert the split dictionary to a string for storage
        split_str = str(split)

        # Apply split to relevant rows
        adjustments = []
        for col in price_cols:
            adjustments.append(
                pl.when((pl.col('ticker') == ticker) & (pl.col('window_start_est') < execution_date))
                .then(pl.col(f'{prefix}{col}') / ratio)
                .otherwise(pl.col(f'{prefix}{col}')).alias(f'{prefix}{col}')
            )
        
        for col in volume_cols:
            adjustments.append(
                pl.when((pl.col('ticker') == ticker) & (pl.col('window_start_est') < execution_date))
                .then(pl.col(f'{prefix}{col}') * ratio)
                .otherwise(pl.col(f'{prefix}{col}')).alias(f'{prefix}{col}')
            )

        adjustments.append(
            pl.when((pl.col('ticker') == ticker) & (pl.col('window_start_est') < execution_date))
            .then(pl.concat_str([pl.col('applied_splits'), pl.lit(f",{split_str}")]))
            .otherwise(pl.col('applied_splits')).alias('applied_splits')
        )

        df = df.with_columns(adjustments)

    return df
    
# Apply the stock splits to the DataFrame
# df_split = apply_stock_splits(df, splits_df)
df_split = apply_adjustments(df, splits_df, "stock", ["open", "close", "high", "low"], ["volume"])

def format_column(column):
    return column.map_elements(lambda x: int(x))

cols = ["adjusted_volume"]

# Apply the function to all columns
df_split = df_split.with_columns([format_column(pl.col(column)).alias(column) for column in cols])

df_split = df_split.sort("window_start_est")
# df_split.tail(10000)

In [57]:
#######ADD SUPPORT AND RESISTANCE
import numpy as np
import polars as pl
import pandas as pd
import datetime as dt

def calculate_pivot_points(df: pl.DataFrame, n1: int, n2: int) -> pl.DataFrame:
    def pivotid(l, n1, n2, lows, highs):
        if l - n1 < 0 or l + n2 >= len(lows):
            return 0
        
        pividlow = 1
        pividhigh = 1
        for i in range(l - n1, l + n2 + 1):
            if lows[l] > lows[i]:
                pividlow = 0
            if highs[l] < highs[i]:
                pividhigh = 0
        
        if pividlow and pividhigh:
            return 3
        elif pividlow:
            return 1
        elif pividhigh:
            return 2
        else:
            return 0
            
    print(f"STARTING pivot calc at {dt.datetime.now()}")
    lows = df['adjusted_low'].to_list()
    highs = df['adjusted_high'].to_list()
    pivot_points = [pivotid(i, n1, n2, lows, highs) for i in range(len(lows))]
    
    df = df.with_columns(pl.Series(name='pivot', values=pivot_points))
    return df

def calculate_support_resistance(df: pl.DataFrame, n1: int, n2: int) -> pl.DataFrame:
    # Calculate pivot points
    df = calculate_pivot_points(df, n1, n2)
    
    # Initialize columns for support and resistance levels
    support_levels = np.full(len(df), np.nan)
    resistance_levels = np.full(len(df), np.nan)
    
    # Identify support and resistance levels
    low_series = df['adjusted_low'].to_list()
    high_series = df['adjusted_high'].to_list()
    pivot_series = df['pivot'].to_list()
    
    print(f"STARTING Support and Resistance calc at {dt.datetime.now()}")
    for i in range(len(pivot_series)):
        if pivot_series[i] == 1:
            support_levels[i] = low_series[i]
        elif pivot_series[i] == 2:
            resistance_levels[i] = high_series[i]
    
    # Add the support and resistance levels back to the DataFrame
    df = df.with_columns([
        pl.Series(name='support_level', values=support_levels),
        pl.Series(name='resistance_level', values=resistance_levels)
    ])
    
    return df

def calculate_weighted_support_resistance(df: pl.DataFrame, threshold: float = 0.01) -> pl.DataFrame:
    # Convert Polars DataFrame to NumPy arrays for faster calculations
    support_levels = df['support_level'].drop_nulls().unique().to_numpy()
    resistance_levels = df['resistance_level'].drop_nulls().unique().to_numpy()
    low_prices = df['adjusted_low'].to_numpy()
    high_prices = df['adjusted_high'].to_numpy()

    # Initialize dictionaries to count touches
    support_touch_counts = {}
    resistance_touch_counts = {}

    # Count touches for support levels
    for level in support_levels:
        support_touch_counts[level] = np.sum((low_prices >= (level * (1 - threshold))) &
                                             (low_prices <= (level * (1 + threshold))))

    # Count touches for resistance levels
    for level in resistance_levels:
        resistance_touch_counts[level] = np.sum((high_prices >= (level * (1 - threshold))) & 
                                                (high_prices <= (level * (1 + threshold))))

    # Group and normalize levels
    significant_support_levels = {level: count for level, count in support_touch_counts.items() if count >= 3}
    significant_resistance_levels = {level: count for level, count in resistance_touch_counts.items() if count >= 3}

    def group_levels(levels):
        grouped_levels = {}
        for level, count in levels.items():
            found = False
            for key in grouped_levels:
                if abs(level - key) <= threshold * key:
                    grouped_levels[key] += count
                    found = True
                    break
            if not found:
                grouped_levels[level] = count
        return grouped_levels

    grouped_support_levels = group_levels(significant_support_levels)
    grouped_resistance_levels = group_levels(significant_resistance_levels)

    # Add significant levels back to the DataFrame
    df = df.with_columns([
        pl.col('support_level').map_elements(lambda x: x if x in grouped_support_levels else np.nan).alias('weighted_support_level'),
        pl.col('resistance_level').map_elements(lambda x: x if x in grouped_resistance_levels else np.nan).alias('weighted_resistance_level')
    ])

    return df
    
def preprocess_with_support_resistance(df: pl.DataFrame, n1: int = 10, n2: int = 10) -> pl.DataFrame:
    try:
        df = calculate_support_resistance(df, n1, n2)
        df = calculate_weighted_support_resistance(df)
        return df
        
    except Exception as e:
        current_time = dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        raise RuntimeError(f"Error at {current_time}: {str(e)}")

# Example usage
try:
    print(f"STARTING SRChannel calc at {dt.datetime.now()}")
    df = preprocess_with_support_resistance(df)
    print(f"FINISHED SRChannel calc at {dt.datetime.now()}")
    df.tail(1000)
except RuntimeError as e:
    print(e)


STARTING SRChannel calc at 2024-06-29 15:04:55.952248
STARTING pivot calc at 2024-06-29 15:04:55.952322
STARTING Support and Resistance calc at 2024-06-29 15:04:56.558794
FINISHED SRChannel calc at 2024-06-29 15:04:59.900610


In [55]:
# Chart with Support/Resistance Channels

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
from pandas import Timestamp
import plotly.io as pio
import numpy as np

# Settings
pd.set_option('display.max_rows', None)
pio.renderers.default = "notebook"

ticker = 'NVDA'

# Convert the Polars DataFrame to pandas
pandas_df = df.filter((pl.col('ticker') == ticker)).to_pandas()
pandas_df['window_start_est'] = pd.to_datetime(pandas_df['window_start_est'])

# Print the hourly counts before timezone conversion
hourly_counts = pandas_df['window_start_est'].dt.hour.value_counts().sort_index()
print("Hourly counts before timezone conversion:")
print(hourly_counts)

# Ensure the 'window_start_est' column is interpreted as UTC initially
pandas_df['window_start_est'] = pandas_df['window_start_est'].dt.tz_localize('UTC')

# Properly convert the 'window_start_est' column to US/Eastern
pandas_df['window_start_est'] = pandas_df['window_start_est'].dt.tz_convert('US/Eastern')

# Print the hourly counts after timezone conversion
hourly_counts = pandas_df['window_start_est'].dt.hour.value_counts().sort_index()
print("Hourly counts after timezone conversion:")
print(hourly_counts)

# Verify the full range of 'window_start_est' column
print("Full range of 'window_start_est':")
print("Min:", pandas_df['window_start_est'].min())
print("Max:", pandas_df['window_start_est'].max())

# Filter the DataFrame to only include data within the specified window
window_start = Timestamp('2024-06-04 09:30', tz='US/Eastern')
window_end = Timestamp('2024-06-04 16:00', tz='US/Eastern')

filtered_df = pandas_df[
    (pandas_df['window_start_est'] >= window_start) &
    (pandas_df['window_start_est'] <= window_end)
]

# Print the hourly counts for the filtered data
hourly_counts_filtered = filtered_df['window_start_est'].dt.hour.value_counts().sort_index()
print("Filtered hourly counts:")
print(hourly_counts_filtered)

# Verify the data range after filtering
print("Filtered data range:")
print(filtered_df[['window_start_est', 'adjusted_volume']].describe())
print(filtered_df['window_start_est'].min())
print(filtered_df['window_start_est'].max())

# Check if RSI has been calculated correctly
print(filtered_df[['window_start_est', 'rsi']].dropna().head(10))

# Determine crossing points
cross_above = filtered_df[filtered_df['MACD_cross_above_Signal'] == 1]['window_start_est']
cross_below = filtered_df[filtered_df['MACD_cross_below_Signal'] == 1]['window_start_est']

# Create a subplot with 4 rows
fig = make_subplots(
    rows=4, cols=1, shared_xaxes=True, vertical_spacing=0.1,  # Increased spacing
    row_heights=[0.5, 0.2, 0.15, 0.15],  # Adjust the heights of the subplots
    subplot_titles=(
        f'{ticker} Stock Price with 2-day and 3-day VWAP',
        'MACD & Signal Line',
        'adjusted_volume',
        'RSI'
    )
)

# Add the candlestick chart to the first row
fig.add_trace(
    go.Candlestick(
        x=filtered_df['window_start_est'], open=filtered_df['adjusted_open'],
        high=filtered_df['adjusted_high'], low=filtered_df['adjusted_low'],
        close=filtered_df['adjusted_close']
    ),
    row=1, col=1
)

# Adding the 2-day VWAP to the candlestick chart
fig.add_trace(
    go.Scatter(
        x=filtered_df['window_start_est'], y=filtered_df['vwap_2d'],
        mode='lines', name='2-day VWAP', line=dict(width=2)
    ),
    row=1, col=1
)

# Adding the 3-day VWAP to the candlestick chart
fig.add_trace(
    go.Scatter(
        x=filtered_df['window_start_est'], y=filtered_df['vwap_3d'],
        mode='lines', name='3-day VWAP', line=dict(width=2)
    ),
    row=1, col=1
)

# Adding the ema_200 to the candlestick chart
fig.add_trace(
    go.Scatter(
        x=filtered_df['window_start_est'], y=filtered_df['ema_200_vwap'],
        mode='lines', name='EMA 200', line=dict(width=2)
    ),
    row=1, col=1
)

# Add support and resistance lines to the candlestick chart
support_levels = filtered_df['support_level'].dropna().unique()
resistance_levels = filtered_df['resistance_level'].dropna().unique()

for level in support_levels:
    fig.add_hline(y=level, line=dict(color="green", width=1, dash="dash"), row=1, col=1)

for level in resistance_levels:
    fig.add_hline(y=level, line=dict(color="red", width=1, dash="dash"), row=1, col=1)

# Add MACD line to the second row
fig.add_trace(
    go.Scatter(
        x=filtered_df['window_start_est'], y=filtered_df['MACD_line'],
        mode='lines', name='MACD Line'
    ),
    row=2, col=1
)

# Add Signal line to the second row
fig.add_trace(
    go.Scatter(
        x=filtered_df['window_start_est'], y=filtered_df['signal_line'],
        mode='lines', name='Signal Line'
    ),
    row=2, col=1
)

# Add vertical lines for MACD crossing above Signal line
for cross_time in cross_above:
    fig.add_vline(
        x=cross_time, line=dict(color="green", width=2),
        row='all', col=1
    )

# Add vertical lines for MACD crossing below Signal line
for cross_time in cross_below:
    fig.add_vline(
        x=cross_time, line=dict(color="red", width=2),
        row='all', col=1
    )

# Add MACD histogram to the second row (below MACD and Signal Line)
fig.add_trace(
    go.Bar(
        x=filtered_df['window_start_est'],
        y=filtered_df['MACD_histogram'],
        name='MACD Histogram',
        marker_color=np.where(filtered_df['MACD_histogram'] >= 0, 'green', 'red')  # color bars conditionally
    ),
    row=2, col=1
)

# Add horizontal line at y=0 in the MACD plot
fig.add_hline(y=0, line=dict(color="black", width=2, dash="solid"), row=2, col=1)

# Add volume bars to the third row
fig.add_trace(
    go.Bar(
        x=filtered_df['window_start_est'],
        y=filtered_df['adjusted_volume'],
        marker_color='blue',
        name='adjusted_volume'
    ),
    row=3, col=1
)

# Add RSI line to the fourth row
fig.add_trace(
    go.Scatter(
        x=filtered_df['window_start_est'], y=filtered_df['rsi'],
        mode='lines', name='RSI'
    ),
    row=4, col=1
)

# Add horizontal lines at RSI levels 30 and 70
fig.add_hline(y=30, line=dict(color="red", width=1, dash="dash"), row=4, col=1)
fig.add_hline(y=70, line=dict(color="red", width=1, dash="dash"), row=4, col=1)

# Updating layout for better readability
fig.update_layout(
    title=f'{ticker} Stock Price and Technical Indicators on {window_start.strftime("%Y-%m-%d %H:%M %Z")}',
    xaxis_title='Time',
    yaxis_title='Price (USD)',
    legend_title_text='Indicator',
    xaxis_rangeslider_visible=False
)

# Adjust x-axis and y-axis titles for subplots
fig.update_xaxes(title_text="Time", row=1, col=1, tickformat="%H:%M")
fig.update_xaxes(title_text="Time", row=2, col=1, tickformat="%H:%M")
fig.update_xaxes(title_text="Time", row=3, col=1, tickformat="%H:%M")
fig.update_xaxes(title_text="Time", row=4, col=1, tickformat="%H:%M")

fig.update_yaxes(title_text="Price (USD)", row=1, col=1)
fig.update_yaxes(title_text="MACD", row=2, col=1)
fig.update_yaxes(title_text='adjusted_volume', row=3, col=1)

# Show the plot
pio.renderers.default = "notebook"
fig.show(renderer='browser')

Hourly counts before timezone conversion:
window_start_est
13    36934
14    77018
15    79079
16    79077
17    79018
18    78751
19    78691
20     1311
Name: count, dtype: int64
Hourly counts after timezone conversion:
window_start_est
8     10366
9     50366
10    79080
11    79079
12    79077
13    78783
14    78683
15    53559
16      886
Name: count, dtype: int64
Full range of 'window_start_est':
Min: 2019-03-18 09:30:00-04:00
Max: 2024-06-10 16:00:00-04:00
Filtered hourly counts:
window_start_est
9     30
10    60
11    60
12    60
13    60
14    60
15    60
16     1
Name: count, dtype: int64
Filtered data range:
              volume
count     391.000000
mean    91954.076726
std     73146.783397
min     16375.000000
25%     48670.500000
50%     73076.000000
75%    113686.000000
max    768742.000000
2024-06-04 09:30:00-04:00
2024-06-04 16:00:00-04:00
                window_start_est        rsi
507924 2024-06-04 09:30:00-04:00  86.361513
507925 2024-06-04 09:31:00-04:00  74.58893

In [79]:
#######ADD OPTIONS DATA
import os
import polars as pl

# Directory path where the CSV files are downloaded
directory_path = "/Volumes/WD18TB/us_options/minute_aggs/2024/03/"

# List all CSV files in the directory that end with "14.csv.gz"
files = [file for file in os.listdir(directory_path) if file.endswith("14.csv.gz")]
print(files)

def apply_adjustments(df: pl.DataFrame, splits_df: pl.DataFrame, type: str, price_cols: list, volume_cols: list) -> pl.DataFrame:
    # Ensure splits_df is sorted by execution_date
    splits_df = splits_df.sort('execution_date')

    # Determine the prefix and ticker column based on the type
    prefix = "adjusted_" if type == "stock" else "adjusted_option_"
    ticker_col = "ticker" if type == "stock" else "symbol"

    # Initialize the applied_splits column as an empty list
    df = df.with_columns([pl.lit("").alias('applied_splits')])

    # Create adjusted columns for price and volume
    for col in price_cols + volume_cols:
        df = df.with_columns([pl.col(col).alias(f'{prefix}{col}')])

    for split in splits_df.to_dicts():
        ticker = split['ticker']
        execution_date = dt.datetime.strptime(split['execution_date'], '%Y-%m-%d')
        split_from = split['split_from']
        split_to = split['split_to']
        ratio = split_to / split_from

        # Convert the split dictionary to a string for storage
        split_str = str(split)

        # Apply split to relevant rows
        adjustments = []
        for col in price_cols:
            adjustments.append(
                pl.when((pl.col(ticker_col) == ticker) & (pl.col('window_start_est') < execution_date))
                .then(pl.col(f'{prefix}{col}') / ratio)
                .otherwise(pl.col(f'{prefix}{col}')).alias(f'{prefix}{col}')
            )
        
        for col in volume_cols:
            adjustments.append(
                pl.when((pl.col(ticker_col) == ticker) & (pl.col('window_start_est') < execution_date))
                .then(pl.col(f'{prefix}{col}') * ratio)
                .otherwise(pl.col(f'{prefix}{col}')).alias(f'{prefix}{col}')
            )

        adjustments.append(
            pl.when((pl.col(ticker_col) == ticker) & (pl.col('window_start_est') < execution_date))
            .then(pl.concat_str([pl.col('applied_splits'), pl.lit(f",{split_str}")]))
            .otherwise(pl.col('applied_splits')).alias('applied_splits')
        )

        df = df.with_columns(adjustments)

    return df

# Create an empty list to store DataFrames
dfs = []

# Read each CSV file and append its DataFrame to the list
for file in files:
    op_df = pl.read_csv(os.path.join(directory_path, file))
    dfs.append(op_df)

# Concatenate all DataFrames into a single DataFrame
option_df = pl.concat(dfs)

# Filter the DataFrame for a specific substring in the ticker column
option_df = option_df.filter(pl.col("ticker").str.contains("NVDA"))  # Replace "NVDA" with the desired substring

# Extract symbol, year, month, day, option_type, and strike_price
option_df = option_df.with_columns([
    pl.col("ticker").str.slice(2, 4).alias("symbol"),
    (pl.col("ticker").str.slice(6, 2).cast(pl.Int32) + 2000).cast(pl.Utf8).alias("year"),
    pl.col("ticker").str.slice(8, 2).cast(pl.Utf8).alias("month"),
    pl.col("ticker").str.slice(10, 2).cast(pl.Utf8).alias("day"),
    pl.col("ticker").str.slice(12, 1).alias("option_type"),
    (pl.col("ticker").str.slice(13).cast(pl.Float64) / 1000).alias("strike_price")
])

# Construct expiry date
option_df = option_df.with_columns(
    (pl.col("year") + "-" + pl.col("month") + "-" + pl.col("day")).alias("expiry_date")
)

# Drop intermediate columns used for parsing
option_df = option_df.drop(["year", "month", "day"])

# df = df.rename({"ticker": "symbol"})
joined_df = df.join(option_df, on=["window_start", "symbol"], how="left")

joined_df = joined_df.filter(pl.col("strike_price").is_not_null())
joined_split_df = apply_adjustments(joined_df, splits_df, "option", ["open_right", "close_right", "high_right", "low_right"], ["volume_right"])

len(joined_df)

['2024-03-14.csv.gz']


96580

In [89]:
# import datetime
# # len(df.filter(pl.col("transaction_date") == datetime.date(2024, 3, 14)))
# count_per_window_start = joined_df.groupby("window_start_est").agg(pl.count().alias("count"))
# count_per_window_start.sort("window_start_est", "strike_price")
joined_split_df


In [123]:
import requests
import asyncio
import pandas as pd
import polars as pl
from datetime import date, datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed

# Define the API endpoint and API key
endpoint = "https://api.polygon.io/v3/reference/splits"
api_key = "DTc8n47jA_nApfGqD4aqKv9ntIHFOj0U"  

# Define function to fetch stock splits for a given date and type
def fetch_stock_splits(date_str, reverse_split):
    params = {
        "apiKey": api_key,
        "execution_date": date_str,
        "reverse_split": reverse_split
    }
    response = requests.get(endpoint, params=params)
    if response.status_code == 200:
        return response.json()
    else:
        return None

# Define function to fetch both regular and reverse stock splits for each day of the last 10 years
def fetch_all_stock_splits():
    start_date = date.today() - timedelta(days=365 * 10)  # 10 years ago
    end_date = date.today()
    all_splits = []
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = []
        for current_date in (start_date + timedelta(days=n) for n in range((end_date - start_date).days + 1)):
            date_str = current_date.strftime("%Y-%m-%d")
            # Submit regular splits task
            regular_splits_future = executor.submit(fetch_stock_splits, date_str, False)
            # Submit reverse splits task
            reverse_splits_future = executor.submit(fetch_stock_splits, date_str, True)
            futures.extend([regular_splits_future, reverse_splits_future])
        for future in as_completed(futures):
            splits_data = future.result()
            if splits_data:
                all_splits.extend(splits_data["results"])
    return all_splits

# Guard clause to execute only if the script is executed directly
if __name__ == "__main__":
    # Fetch all regular and reverse stock splits for the last 10 years
    print(f'Starting: {datetime.now()}')
    all_splits_data = fetch_all_stock_splits()
    print(f'Finished: {datetime.now()}')
    # Convert data to a DataFrame
    stock_split_df = pd.DataFrame(all_splits_data)
    # Add forward and reverse split flags
    stock_split_df['forward_split'] = stock_split_df['split_from'] < stock_split_df['split_to']
    stock_split_df['reverse_split'] = stock_split_df['split_from'] > stock_split_df['split_to']
# Display the Polars DataFrame
print(len(df))


Starting: 2024-04-03 18:42:10.071414
Finished: 2024-04-03 18:45:39.213517
3166


In [None]:
# print(len(df))
# Add forward and reverse split flags
# df['forward_split'] = df['split_from'] < df['split_to']
# df['reverse_split'] = df['split_from'] > df['split_to']
# df = df.to_polars()
df_polars = pl.from_pandas(stock_split_df)

# print(df_polars)



In [None]:
# df_polars.write_csv("/Volumes/WD18TB/stock_splits/stock_splits_20240404.csv")
# stock_split_df = pl.read_csv("/Volumes/WD18TB/stock_splits/stock_splits_20240404.csv")
# stock_split_df
# Convert the 'execution_date' column to a date format
stock_split_df = stock_split_df.with_columns(pl.col("execution_date").str.to_date("%Y-%m-%d").alias("split_date"))

# stock_split_df

In [6]:


op_df = pl.read_csv("/Users/brandon/Downloads/2024-03-01.csv.gz")
filtered_op_df = op_df.filter(pl.col("ticker").str.contains("NVDA2408"))
# filtered_op_df

# 1704186000000000000 
# trades_df =  pl.read_csv("/Users/brandon/Downloads/2024-01-02.csv.gz")
# trades_df = trades_df.filter(
#     ()
# )
# trades_df.head(100)

In [289]:
import dash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import pandas as pd
import numpy as np

# df = pd.read_csv('your_data.csv')
tickers = ['AAPL', 'NVDA', 'MSFT']
dates = pd.date_range('2024-03-10', periods=10)

pd_df = df.to_pandas()

# Initialize the Dash app
app = dash.Dash(__name__)

# Get unique tickers directly with Polars
unique_tickers = df.select(pl.col('ticker').unique()).to_numpy().flatten()

# Convert datetime columns from string to datetime if they are not already
# df = df.with_columns(pl.col('window_start_est').str.strptime(pl.Datetime))

# Define min and max dates for the DatePickerRange component
min_date_py = pd.to_datetime(min_date).date()
max_date_py = pd.to_datetime(max_date).date()

app.layout = html.Div([
    dcc.Dropdown(
        id='ticker-dropdown',
        options=[{'label': ticker, 'value': ticker} for ticker in unique_tickers],
        value=unique_tickers[0]  # Default value
    ),
    dcc.Dropdown(
        id='bar-size-dropdown',
        options=[
            {'label': 'Minute', 'value': 'T'},
            {'label': 'Hour', 'value': 'H'},
            {'label': 'Day', 'value': 'D'}
        ],
        value='D'  # Default to 'Day'
    ),
    dcc.DatePickerRange(
        id='date-picker-range',
        min_date_allowed=min_date_py,
        max_date_allowed=max_date_py,
        start_date=min_date_py,
        end_date=max_date_py
    ),
    dcc.Graph(id='interactive-graph')
])


@app.callback(
    Output('interactive-graph', 'figure'),
    [Input('ticker-dropdown', 'value'),
     Input('bar-size-dropdown', 'value'),
     Input('date-picker-range', 'start_date'),
     Input('date-picker-range', 'end_date')]
)
def update_graph(selected_ticker, selected_bar_size, start_date, end_date):
    filtered_df = pd_df[
        (pd_df['ticker'] == selected_ticker) &
        (pd_df['window_start_est'] >= start_date) &
        (pd_df['window_start_est'] <= end_date)
    ]
    
    columns = {
        'adjusted_open': 'first', 
        'adjusted_high': 'max', 
        'adjusted_low': 'min', 
        'adjusted_close': 'last', 
        'adjusted_volume': 'sum', 
        'MACD_line': 'mean', 
        'signal_line': 'mean', 
        'MACD_histogram': 'mean'
    }

    # Aggregating based on the selected bar size
    freq = selected_bar_size  # Use the value directly as it corresponds to Pandas offset aliases
    filtered_df = (
        filtered_df.set_index('window_start_est')  # Ensure datetime is the index for resampling
        .resample(freq)
        .agg(columns)
        .dropna()
        .reset_index()
    )
    
    # # Aggregating based on the selected bar size
    # if selected_bar in ['Minute', 'Hour', 'Day']:
    #     freq = selected_bar[0].upper()  # Get frequency abbreviation
    #     filtered_df = (
    #         filtered_df.resample(freq, on='window_start_est')
    #         .agg(columns)
    #         .dropna()
    #         .reset_index()
    #     )
    # else:
    #     print("Invalid bar size selected.")


    # Create subplots: 3 rows for Candlestick, MACD, and Volume
    fig = make_subplots(rows=3, cols=1, shared_xaxes=True, vertical_spacing=0.02,
                        subplot_titles=("Candlestick Chart", "MACD & Signal Line", 'adjusted_volume'))

    # Candlestick
    fig.add_trace(go.Candlestick(x=filtered_df['window_start_est'],
                                 open=filtered_df['adjusted_open'],
                                 high=filtered_df['adjusted_high'],
                                 low=filtered_df['adjusted_low'],
                                 close=filtered_df['adjusted_close']),
                  row=1, col=1)

    # MACD lines
    fig.add_trace(go.Scatter(x=filtered_df['window_start_est'], y=filtered_df['MACD_line'],
                             mode='lines', name='MACD Line'),
                  row=2, col=1)
    fig.add_trace(go.Scatter(x=filtered_df['window_start_est'], y=filtered_df['signal_line'],
                             mode='lines', name='Signal Line'),
                  row=2, col=1)

    # MACD Histogram
    fig.add_trace(go.Bar(x=filtered_df['window_start_est'],
                         y=filtered_df['MACD_histogram'],
                         name='MACD Histogram',
                         marker_color=np.where(filtered_df['MACD_histogram'] >= 0, 'green', 'red')),  # Color conditionally
                  row=2, col=1)

    # Volume
    fig.add_trace(go.Bar(x=filtered_df['window_start_est'],
                         y=filtered_df['adjusted_volume'],
                         name='adjusted_volume',
                         marker_color='blue'),
                  row=3, col=1)

    # Update layout
    fig.update_layout(title=f"Stock Data for {selected_ticker}", xaxis_title="Time", yaxis_title="Price (USD)")
    fig.update_xaxes(title_text="Time", row=3, col=1)
    fig.update_yaxes(title_text='adjusted_volume', row=3, col=1)

    return fig

# Run the app
if __name__ == '__main__':
    app.run_server(debug=True)


In [7]:
# df = pl.from_pandas(df)
# df.schema

In [1]:
%history -g 2

 1/4:
engine = create_engine('postgresql://postgres:EUProdAnalyticsPostgresrdsS-yGASr9RLy1dP-zeTErO@eu-prod-analytics-postgres-rds3515897f-4rvjph38wxur.cluster-couoxzno33gl.eu-central-1.rds.amazonaws.com:5432/postgres')
df = pd.read_sql_query('select * from claims',con=engine)
 1/5:
engine = engine.create_engine('postgresql://postgres:EUProdAnalyticsPostgresrdsS-yGASr9RLy1dP-zeTErO@eu-prod-analytics-postgres-rds3515897f-4rvjph38wxur.cluster-couoxzno33gl.eu-central-1.rds.amazonaws.com:5432/postgres')
df = pd.read_sql_query('select * from claims',con=engine)
 1/6: pip install pyscopg2-binary
 1/7: pip install psycopg2-binary
 1/8:
from sqlalchemy import engine
import psycopg2
import pandas as pd
 1/9:
engine = engine.create_engine('postgresql://postgres:EUProdAnalyticsPostgresrdsS-yGASr9RLy1dP-zeTErO@eu-prod-analytics-postgres-rds3515897f-4rvjph38wxur.cluster-couoxzno33gl.eu-central-1.rds.amazonaws.com:5432/postgres')
df = pd.read_sql_query('select * from claims',con=engine)
1/10:
engine