In [1]:
import os 
os.chdir('/app')

In [2]:
from dotenv import load_dotenv
load_dotenv(dotenv_path='config/.env')
database_path = os.getenv('DATABASE_PATH')
print(database_path)

database/db/ohlcv_data.db


In [30]:
import duckdb
import os
import pandas as pd
from concurrent.futures import ProcessPoolExecutor

def read_parquet_for_symbol(symbol, market_name='indian_equity', timeframe='1d', base_directory='database/finstore'):
    """
    Reads the Parquet file for a given symbol and returns it as a DataFrame.

    Parameters:
        symbol (str): The symbol to read data for.
        market_name (str): The market name (default: 'indian_equity').
        timeframe (str): The timeframe (default: '1d').
        base_directory (str): The base directory where data is stored (default: 'database/finstore').

    Returns:
        tuple: A tuple containing the symbol and its corresponding DataFrame.
    """
    # Define the directory path and file path based on the input parameters
    file_path = os.path.join(base_directory, f"market_name={market_name}", f"timeframe={timeframe}", symbol, 'ohlcv_data.parquet')

    # Check if the file exists
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"Parquet file not found for symbol '{symbol}' at '{file_path}'")

    # Create a DuckDB connection (in-memory for this operation)
    conn = duckdb.connect()
    conn.execute("PRAGMA threads=4")  # Use multiple threads for parallel reading

    # Read the entire Parquet file into a DataFrame
    df = conn.execute(f"SELECT * FROM read_parquet('{file_path}')").fetchdf()

    # Close the DuckDB connection
    conn.close()

    return symbol, df

def read_all_symbols(symbols, market_name='indian_equity', timeframe='1d', base_directory='database/finstore'):
    """
    Reads the Parquet files for all given symbols in parallel and returns a dictionary with the results.

    Parameters:
        symbols (list): List of symbols to read data for.
        market_name (str): The market name (default: 'indian_equity').
        timeframe (str): The timeframe (default: '1d').
        base_directory (str): The base directory where data is stored (default: 'database/finstore').

    Returns:
        dict: A dictionary with symbols as keys and their corresponding DataFrames as values.
    """
    results = {}
    with ProcessPoolExecutor() as executor:
        # Use ProcessPoolExecutor to read each symbol's data in parallel
        futures = {executor.submit(read_parquet_for_symbol, symbol, market_name, timeframe, base_directory): symbol for symbol in symbols}
        for future in futures:
            symbol = futures[future]
            try:
                symbol, df = future.result()
                results[symbol] = df
            except Exception as e:
                print(f"Error reading data for symbol {symbol}: {e}")
    
    return results

# Example of usage
# symbols_list = ['AAPL', 'GOOGL', 'MSFT']  # List of symbols to read
# data_dict = read_all_symbols(symbols_list, market_name='indian_equity', timeframe='1d')
# print(data_dict)


In [33]:
from tqdm import tqdm
import concurrent.futures
def insert_data_duckdb(market_name, symbol_name, timeframe, df, indicators_df):
    """Write technical indicators data to a parquet file."""
    base_directory = 'database/finstore'
    file_path = os.path.join(base_directory, f"market_name={market_name}", f"timeframe={timeframe}", symbol_name, 'technical_indicators.parquet')
    
    # Ensure the directory exists
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    # Add symbol_id and timeframe columns
    indicators_df['symbol_id'] = symbol_name
    indicators_df['timeframe'] = timeframe
    
    # Reorder columns to match the desired format
    formatted_df = indicators_df[['symbol_id', 'timeframe', 'timestamp', 'indicator_name', 'indicator_value']]
    
    # Ensure indicator_value is float
    formatted_df.loc[:, 'indicator_value'] = formatted_df['indicator_value'].astype(float)

    if os.path.isfile(file_path):
        # Read the existing data
        existing_df = pd.read_parquet(file_path)
        
        # Append the current indicator DataFrame to the existing data
        formatted_df = pd.concat([existing_df, formatted_df], ignore_index=True)
    
    # Write the formatted DataFrame to a parquet file
    formatted_df.to_parquet(file_path, index=False)
    #print(f'{symbol_name} file written to {file_path}.')

def process_symbol(symbol, df, market_name, timeframe, calculation_func, calculation_kwargs):
    """
    Process data for each symbol and write technical indicators to a parquet file.
    
    Args:
    symbol (str): Symbol name.
    df (pd.DataFrame): OHLCV data DataFrame.
    market_name (str): Market name.
    timeframe (str): Timeframe (e.g., '1d', '1h').
    calculation_func (callable): Function to calculate indicators.
    calculation_kwargs (dict): Keyword arguments for the indicator calculation function.
    """
    
    # Perform indicator calculations
    try:
        indicators_df = calculation_func(df, **calculation_kwargs) 
    except Exception as e:
        print(f"Error calculating {calculation_func.__name__} for {symbol}: {e}")
        return
    
    # Write the indicators data to a parquet file
    insert_data_duckdb(market_name=market_name, symbol_name=symbol, timeframe=timeframe, df=df, indicators_df=indicators_df)

def fetch_calculate_and_insert_duckdb(ohlcv_data, market_name, timeframe, calculation_func, **calculation_kwargs):
    use_multiprocessing = True
    
    if use_multiprocessing:
        with ProcessPoolExecutor(max_workers=6) as executor:
            futures = [
                executor.submit(process_symbol, symbol, df, market_name, timeframe, calculation_func, calculation_kwargs)
                for symbol, df in ohlcv_data.items()
            ]
            for _ in tqdm(concurrent.futures.as_completed(futures), total=len(futures), desc="Processing symbols"):
                pass  # Wait for all futures to complete
    else:
        for symbol, df in tqdm(ohlcv_data.items(), desc="Processing symbols"):
            process_symbol(symbol, df, market_name, timeframe, calculation_func, calculation_kwargs)

    print(f"{calculation_func.__name__} calculation and insertion completed for market: {market_name} and timeframe: {timeframe}")

# Example usage
# from data.fetch.indian_equity import fetch_symbol_list_indian_equity
# symbol_list = fetch_symbol_list_indian_equity(index_name='nse_eq_symbols')
# fetch_calculate_and_insert_duckdb('indian_equity', '1d', symbol_list, your_calculation_function, **your_calculation_kwargs)

In [6]:
ohlcv_data = read_all_symbols(symbol_list, market_name='indian_equity', timeframe='1d')
for symbol , df in ohlcv_data.items():
    print(df)
    break

0        317.625000
1        333.500000
2        350.174988
3        367.674988
4        351.237488
           ...     
1247    1011.200012
1248     963.799988
1249     976.549988
1250    1010.000000
1251    1022.849976
Name: close, Length: 1252, dtype: float64


In [29]:
for symbol , df in ohlcv_data.items():
    print(df.columns)
    break

Index(['timestamp', 'open', 'high', 'low', 'close', 'volume', 'market_name',
       'timeframe'],
      dtype='object')


In [17]:
for symbol , df in ohlcv_data.items():
    indi_df =calculate_ema(df, length=10)
    break

indi_df = indi_df.reset_index()
indi_df = indi_df.set_index('timestamp')
# Add symbol_id and timeframe columns
indi_df['symbol_name'] = symbol
indi_df['timeframe'] = '1d'

# Reorder columns to match the desired format
formatted_df = indi_df[['symbol_name', 'timeframe', 'indicator_name', 'indicator_value']]

# Ensure indicator_value is float
formatted_df['indicator_value'] = formatted_df['indicator_value'].astype(float)
formatted_df
    

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  formatted_df['indicator_value'] = formatted_df['indicator_value'].astype(float)


Unnamed: 0_level_0,symbol_name,timeframe,indicator_name,indicator_value
timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2019-09-19 00:00:00,360ONE.NS,1d,ema_10,317.625000
2019-09-20 00:00:00,360ONE.NS,1d,ema_10,320.511364
2019-09-23 00:00:00,360ONE.NS,1d,ema_10,325.904750
2019-09-24 00:00:00,360ONE.NS,1d,ema_10,333.499339
2019-09-25 00:00:00,360ONE.NS,1d,ema_10,336.724457
...,...,...,...,...
2024-10-04 00:00:00,360ONE.NS,1d,ema_10,1034.167424
2024-10-07 00:00:00,360ONE.NS,1d,ema_10,1021.373345
2024-10-08 00:00:00,360ONE.NS,1d,ema_10,1013.223644
2024-10-09 00:00:00,360ONE.NS,1d,ema_10,1012.637527


In [3]:
from data.fetch.indian_equity import fetch_symbol_list_indian_equity
symbol_list = fetch_symbol_list_indian_equity(index_name='nse_eq_symbols')

In [34]:
from utils.calculation.indicators import calculate_ema
fetch_calculate_and_insert_duckdb(ohlcv_data, 'indian_equity', '1d', calculate_ema, length=100)
fetch_calculate_and_insert_duckdb(ohlcv_data, 'indian_equity', '1d', calculate_ema, length=200)
fetch_calculate_and_insert_duckdb(ohlcv_data, 'indian_equity', '1d', calculate_ema, length=500)


Processing symbols: 100%|██████████| 1673/1673 [00:15<00:00, 109.65it/s]


calculate_ema calculation and insertion completed for market: indian_equity and timeframe: 1d


Processing symbols: 100%|██████████| 1673/1673 [00:16<00:00, 101.51it/s]


calculate_ema calculation and insertion completed for market: indian_equity and timeframe: 1d


Processing symbols: 100%|██████████| 1673/1673 [00:17<00:00, 95.29it/s] 


calculate_ema calculation and insertion completed for market: indian_equity and timeframe: 1d


In [None]:
from utils.calculation.supertrend import faster_supertrend
from utils.calculation.slope_r2 import calculate_exponential_regression_optimized
from utils.calculation.optimized_indicators import calculate_spike_optimized, detect_large_gap_optimized, calculate_average_volume_optimized
fetch_calculate_and_insert_duckdb(ohlcv_data, 'indian_equity', '1d', faster_supertrend, period=7, multiplier=3)
fetch_calculate_and_insert_duckdb(ohlcv_data, 'indian_equity', '1d', calculate_exponential_regression_optimized, window=90)
fetch_calculate_and_insert_duckdb(ohlcv_data, 'indian_equity', '1d', calculate_exponential_regression_optimized, window=30)
fetch_calculate_and_insert_duckdb(ohlcv_data, 'indian_equity', '1d', calculate_exponential_regression_optimized, window=15)
fetch_calculate_and_insert_duckdb(ohlcv_data, 'indian_equity', '1d', calculate_spike_optimized, lookback_period=90, spike_threshold=0.15)
fetch_calculate_and_insert_duckdb(ohlcv_data, 'indian_equity', '1d', detect_large_gap_optimized, lookback_period=90, gap_threshold=0.15)
fetch_calculate_and_insert_duckdb(ohlcv_data, 'indian_equity', '1d', calculate_average_volume_optimized, lookback_period=90)

In [4]:
from finstore.finstore import Finstore
Finstore = Finstore(market_name='indian_equity', timeframe='1d')

In [5]:
ohlcv_data = Finstore.read.symbol_list(symbol_list=symbol_list)