In [6]:
import gzip
from datetime import datetime, timedelta
import csv
import math
import os
import logging
import collections
import sys
from typing import Dict, List, Optional
from dataclasses import dataclass
import numpy as np
import pandas as pd
from sortedcontainers import SortedDict

In [2]:
class NseDataLoader:
    """
    Handles loading and initial vectorized cleaning of NSE fixed-width .gz files.
    """

    # Define fixed-width specifications based on NSE documentation
    ORDER_SCHEMA = {
        'cols': [
            'record_type', 'segment', 'order_number', 'order_time', 'side', 'activity_type',
            'symbol', 'series', 'volume_disclosed', 'volume_original', 'limit_price',
            'trigger_price', 'is_market_order', 'is_stop_loss', 'is_ioc', 'algo_indicator', 'client_type'
        ],
        'widths': [
            2, 4, 16, 14, 1, 1, 10, 2, 8, 8, 8, 8, 1, 1, 1, 1, 1
        ],
        'dtypes': {
            'record_type': 'str', 'segment': 'str', 'order_number': 'int64', 'order_time': 'int64',
            'side': 'str', 'activity_type': 'int64', 'symbol': 'str', 'series': 'str',
            'volume_disclosed': 'int64', 'volume_original': 'int64', 'limit_price': 'int64',
            'trigger_price': 'int64', 'is_market_order': 'str', 'is_stop_loss': 'str',
            'is_ioc': 'str', 'algo_indicator': 'int64',
            # --- ERROR FIX: Read ambiguous single-char fields as 'str' to avoid conversion errors
            'client_type': 'str'
        }
    }

    TRADE_SCHEMA = {
        'cols': [
            'record_type', 'segment', 'trade_number', 'trade_time', 'symbol', 'series',
            'trade_price', 'trade_quantity', 'buy_order_number', 'buy_algo', 'buy_client',
            'sell_order_number', 'sell_algo', 'sell_client'
        ],
        'widths': [
            2, 4, 16, 14, 10, 2, 8, 8, 16, 1, 1, 16, 1, 1
        ],
        'dtypes': {
            'record_type': 'str', 'segment': 'str', 'trade_number': 'int64', 'trade_time': 'int64',
            'symbol': 'str', 'series': 'str', 'trade_price': 'int64', 'trade_quantity': 'int64',
            'buy_order_number': 'int64', 'buy_algo': 'int64',
            # --- ERROR FIX: Read ambiguous single-char fields as 'str' ---
            'buy_client': 'str',
            'sell_order_number': 'int64', 'sell_algo': 'int64',
            'sell_client': 'str'
        }
    }

    EPOCH = datetime(1980, 1, 1)

    def _read_file(self, filepath, schema):
        """Reads a .gz fixed-width file into a DataFrame."""
        try:
            # Use gzip to open the file in text mode
            with gzip.open(filepath, 'rt', encoding='utf-8') as f:
                # Use pandas read_fwf, which is optimized for this task
                df = pd.read_fwf(
                    f,
                    widths=schema['widths'],
                    names=schema['cols'],
                    header=None,
                    dtype=schema['dtypes']  # Enforce types during read
                )
            return df
        except Exception as e:
            logging.error(f"Error reading file {filepath}: {e}")
            return pd.DataFrame(columns=schema['cols'])

    def _jiffies_to_datetime(self, jiffies_series):
        """Vectorized conversion of NSE 'jiffies' to datetime objects."""
        # 65536 jiffies = 1 second
        return pd.to_timedelta(jiffies_series / 65536, unit='s') + self.EPOCH

    def _clean_data(self, df, file_type):
        """Applies all vectorized cleaning and type conversion steps."""
        if df.empty:
            return df

        df = df.copy()

        # 1. Filter for Regular Market ('RM') and 'EQ' series
        # This is the primary cleaning step per your request.
        df = df[(df['record_type'] == 'RM') & (df['series'] == 'EQ')].reset_index(drop=True)

        if df.empty:
            logging.warning(f"No 'RM' or 'EQ' data found in {file_type} file.")
            return df
        
        # 2. Convert timestamps
        time_col = 'order_time' if file_type == 'order' else 'trade_time'
        df['timestamp'] = self._jiffies_to_datetime(df[time_col])

        # 3. Convert prices from paise to Rupees
        if file_type == 'order':
            df['limit_price'] = df['limit_price'] / 100.0
            df['trigger_price'] = df['trigger_price'] / 100.0
            
            # 4. Map categorical codes
            df['side'] = df['side'].map({'B': 'BUY', 'S': 'SELL'})
            df['is_buy'] = df['side'] == 'BUY'
            df['activity_type'] = df['activity_type'].map({1: 'ENTRY', 3: 'CANCEL', 4: 'MODIFY'})
            df['is_market_order'] = df['is_market_order'] == 'Y'
            df['is_ioc'] = df['is_ioc'] == 'Y'
            
            # Safely convert client_type to numeric after reading as str
            df['client_type'] = pd.to_numeric(df['client_type'], errors='coerce').fillna(0).astype(int)
            
            # Rename for easier merging later
            df = df.rename(columns={'volume_original': 'volume'})
            df = df.drop(columns=['order_time']) # Drop original jiffies col

        elif file_type == 'trade':
            df['trade_price'] = df['trade_price'] / 100.0
            
            # Safely convert client types to numeric
            df['buy_client'] = pd.to_numeric(df['buy_client'], errors='coerce').fillna(0).astype(int)
            df['sell_client'] = pd.to_numeric(df['sell_client'], errors='coerce').fillna(0).astype(int)

            # Rename for easier merging
            df = df.rename(columns={'trade_quantity': 'volume'})
            df = df.drop(columns=['trade_time']) # Drop original jiffies col
        
        return df

    def load_data(self, order_filepath, trade_filepath):
        """
        Public method to load and clean both order and trade files.
        
        Args:
            order_filepath (str): Path to the CASH_Orders_...DAT.gz file.
            trade_filepath (str): Path to the CASH_Trades_...DAT.gz file.
            
        Returns:
            (pd.DataFrame, pd.DataFrame): A tuple of (cleaned_orders_df, cleaned_trades_df)
        """
        logging.info("Loading and cleaning order data...")
        orders_df = self._read_file(order_filepath, self.ORDER_SCHEMA)
        orders_df = self._clean_data(orders_df, 'order')
        
        logging.info("Loading and cleaning trade data...")
        trades_df = self._read_file(trade_filepath, self.TRADE_SCHEMA)
        trades_df = self._clean_data(trades_df, 'trade')
        
        return orders_df, trades_df

In [3]:
trade_infy_19_path = r"C:\Users\acer\OneDrive - Indian Institute of Management Ahmedabad\Term-5\Project-Agent Based Model\PropAlgo-Behaviour\CASH_Trades_19082019_INFY.DAT.gz"
order_infy_19_path = r"C:\Users\acer\OneDrive - Indian Institute of Management Ahmedabad\Term-5\Project-Agent Based Model\PropAlgo-Behaviour\CASH_Orders_19082019_INFY.DAT.gz"
trade_infy_20_path = r"C:\Users\acer\OneDrive - Indian Institute of Management Ahmedabad\Term-5\Project-Agent Based Model\PropAlgo-Behaviour\CASH_Trades_20082019_INFY.DAT.gz"
order_infy_20_path = r"C:\Users\acer\OneDrive - Indian Institute of Management Ahmedabad\Term-5\Project-Agent Based Model\PropAlgo-Behaviour\CASH_Orders_20082019_INFY.DAT.gz"

In [7]:
nse_data_loader = NseDataLoader()
order_infy_19_df, trade_infy_19_df = nse_data_loader.load_data(order_infy_19_path, trade_infy_19_path)

In [8]:
order_infy_19_df.head()

Unnamed: 0,record_type,segment,order_number,side,activity_type,symbol,series,volume_disclosed,volume,limit_price,trigger_price,is_market_order,is_stop_loss,is_ioc,algo_indicator,client_type,timestamp,is_buy
0,RM,CASH,1100000000071762,SELL,ENTRY,bbbbbbINFY,EQ,0,40,805.5,0.0,False,N,False,0,0,2019-08-19 09:15:00.021652222,False
1,RM,CASH,1100000000071791,BUY,ENTRY,bbbbbbINFY,EQ,0,2,776.0,0.0,False,N,False,0,0,2019-08-19 09:15:00.021896362,True
2,RM,CASH,1100000000071813,SELL,ENTRY,bbbbbbINFY,EQ,0,14,800.0,0.0,False,N,False,0,0,2019-08-19 09:15:00.022125244,False
3,RM,CASH,1100000000071814,SELL,ENTRY,bbbbbbINFY,EQ,0,14,798.0,0.0,False,N,False,0,0,2019-08-19 09:15:00.022125244,False
4,RM,CASH,1100000000071841,SELL,ENTRY,bbbbbbINFY,EQ,0,50,814.95,0.0,False,N,False,0,0,2019-08-19 09:15:00.022384644,False


In [9]:
trade_infy_19_df.head()

Unnamed: 0,record_type,segment,trade_number,symbol,series,trade_price,volume,buy_order_number,buy_algo,buy_client,sell_order_number,sell_algo,sell_client,timestamp
0,RM,CASH,2019081925004843,bbbbbbINFY,EQ,776.0,2,1100000000071791,0,1,1000000000515581,0,0,2019-08-19 09:15:00.022354126
1,RM,CASH,2019081925005310,bbbbbbINFY,EQ,775.95,50,1100000000074000,0,1,1000000000738180,0,0,2019-08-19 09:15:00.341812134
2,RM,CASH,2019081925005408,bbbbbbINFY,EQ,775.95,2,1100000000074387,0,1,1000000000738180,0,0,2019-08-19 09:15:00.513641357
3,RM,CASH,2019081925005520,bbbbbbINFY,EQ,775.7,1,1100000000071870,0,1,1000000000747501,0,0,2019-08-19 09:15:00.657546997
4,RM,CASH,2019081925005523,bbbbbbINFY,EQ,775.95,2,1100000000074766,0,1,1000000000738180,0,0,2019-08-19 09:15:00.662216187


In [30]:
len(set(trade_infy_19_df['sell_order_number']))

42376

In [33]:
len(set(order_infy_19_df['order_number']))

227065

In [34]:
len(set(order_infy_19_df['order_number'])) - len(set(trade_infy_19_df['sell_order_number']))

184689

In [35]:
227065- 184689

42376

In [None]:
orders_entry = order_infy_19_df[order_infy_19_df['activity_type'] == 'ENTRY'].copy()

In [37]:
len(orders_entry)

227449

In [38]:
len(order_infy_19_df), len(trade_infy_19_df)

(1023206, 107303)

In [10]:
def determine_aggressor_side(trades_df, orders_df):
    """
    Determines the aggressor side for each trade based on order entry timestamps.
    
    Args:
        trades_df (pd.DataFrame): DataFrame containing trade data with 'buy_order_number' and 'sell_order_number'.
        orders_df (pd.DataFrame): DataFrame containing order data with 'order_number', 'timestamp', and 'activity_type'.
        
    Returns:
        pd.DataFrame: The trades_df with added columns 'buy_entry_ts', 'sell_entry_ts', and 'aggressor_side'.
                      aggressor_side: +1 if Buyer Initiated, -1 if Seller Initiated, 0 if unknown/same time.
    """
    # Filter for ENTRY orders to get original arrival time
    # We assume 'activity_type' column exists and 'ENTRY' denotes the initial order
    if 'activity_type' in orders_df.columns:
        orders_entry = orders_df[orders_df['activity_type'] == 'ENTRY'].copy()
    else:
        # Fallback if activity_type is missing, though it should be there based on NseDataLoader
        orders_entry = orders_df.copy()
    
    # Create lookup for order timestamps
    # taking the first entry if duplicates exist (though order_number should be unique for ENTRY)
    orders_ts = orders_entry.drop_duplicates('order_number').set_index('order_number')['timestamp']
    
    # Map timestamps to trades
    trades_df = trades_df.copy()
    trades_df['buy_entry_ts'] = trades_df['buy_order_number'].map(orders_ts)
    trades_df['sell_entry_ts'] = trades_df['sell_order_number'].map(orders_ts)
    
    def get_aggressor(row):
        buy_ts = row['buy_entry_ts']
        sell_ts = row['sell_entry_ts']
        
        buy_present = pd.notna(buy_ts)
        sell_present = pd.notna(sell_ts)
        
        if buy_present and sell_present:
            if buy_ts > sell_ts:
                return 1 # Buyer initiated (arrived later)
            elif sell_ts > buy_ts:
                return -1 # Seller initiated (arrived later)
            else:
                return 0 # Same time
        elif buy_present and not sell_present:
            # Buy present -> Buy arrived first -> Seller arrived last -> Seller initiated
            return -1
        elif not buy_present and sell_present:
            # Sell present -> Sell arrived first -> Buyer arrived last -> Buyer initiated
            return 1
        else:
            return 0 # Both missing
            
    trades_df['aggressor_side'] = trades_df.apply(get_aggressor, axis=1)
    return trades_df


In [11]:
trade_infy_19_df.isna().sum()

record_type          0
segment              0
trade_number         0
symbol               0
series               0
trade_price          0
volume               0
buy_order_number     0
buy_algo             0
buy_client           0
sell_order_number    0
sell_algo            0
sell_client          0
timestamp            0
dtype: int64

In [None]:
orders_ts = order_infy_19_df.drop_duplicates('order_number').set_index('order_number')['timestamp']

In [26]:
trade_infy_19_df['buy_order_number'].map(orders_ts).isna().sum()

95

In [27]:
trade_infy_19_df['sell_order_number'].map(orders_ts).isna().sum()

107303

In [22]:
orders_ts

order_number
1100000000071762   2019-08-19 09:15:00.021652222
1100000000071791   2019-08-19 09:15:00.021896362
1100000000071813   2019-08-19 09:15:00.022125244
1100000000071814   2019-08-19 09:15:00.022125244
1100000000071841   2019-08-19 09:15:00.022384644
                                ...             
1100000006113180   2019-08-19 15:48:30.047622681
1100000006113264   2019-08-19 15:49:48.677291870
1100000006113497   2019-08-19 15:54:38.705245972
1100000006113538   2019-08-19 15:55:58.337615967
1100000006113579   2019-08-19 15:57:06.510070801
Name: timestamp, Length: 227065, dtype: datetime64[ns]

In [17]:
orders_ts

0         2019-08-19 09:15:00.021652222
1         2019-08-19 09:15:00.021896362
2         2019-08-19 09:15:00.022125244
3         2019-08-19 09:15:00.022125244
4         2019-08-19 09:15:00.022384644
                       ...             
1023201   2019-08-19 15:54:50.779495239
1023202   2019-08-19 15:55:58.337615967
1023203   2019-08-19 15:56:08.284042358
1023204   2019-08-19 15:57:06.510070801
1023205   2019-08-19 15:59:27.785385132
Name: timestamp, Length: 1023206, dtype: datetime64[ns]

In [12]:
trade_df = determine_aggressor_side(trade_infy_19_df, order_infy_19_df)

In [13]:
trade_df.isna().sum()

record_type               0
segment                   0
trade_number              0
symbol                    0
series                    0
trade_price               0
volume                    0
buy_order_number          0
buy_algo                  0
buy_client                0
sell_order_number         0
sell_algo                 0
sell_client               0
timestamp                 0
buy_entry_ts            172
sell_entry_ts        107303
aggressor_side            0
dtype: int64

In [28]:
len(trade_infy_19_df)

107303

In [15]:
trade_df['aggressor_side'].value_counts()

aggressor_side
-1    107131
 0       172
Name: count, dtype: int64