In [2]:
# Libraries
!pip install bhavcopy
import requests
import json
import pandas as pd
from datetime import datetime, date
import os
import bhavcopy
import numpy as np
import math
import sklearn.metrics as metrics
from scipy.stats import norm
from keras.models import Sequential
from keras.layers import Dense
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score
from scipy.optimize import newton
import matplotlib.pyplot as plt
plt.style.use('ggplot')

Collecting bhavcopy
  Downloading bhavcopy-3.0.tar.gz (4.9 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: bhavcopy
  Building wheel for bhavcopy (setup.py) ... [?25l[?25hdone
  Created wheel for bhavcopy: filename=bhavcopy-3.0-py3-none-any.whl size=5314 sha256=8a98b9adecd7f58c709598b39b00c91f1acc793e96d2f061ce9144010752cc7b
  Stored in directory: /root/.cache/pip/wheels/98/6e/ec/d1ed7817d15c778faccd62848124dac5cb5d6acb28ef630f75
Successfully built bhavcopy
Installing collected packages: bhavcopy
Successfully installed bhavcopy-3.0


In [3]:
def BS(S, K, T, r, sigma):
    """Black-Scholes price of a European call option.

    Parameters
    ----------
    S : spot price of the underlying.
    K : strike price.
    T : time to expiry, in the time unit that ``r`` and ``sigma`` are quoted per.
    r : continuously compounded risk-free rate.
    sigma : volatility of the underlying.

    Returns
    -------
    ``S * N(d1) - K * exp(-r * T) * N(d2)``, with ``N`` the standard normal CDF.
    """
    # sigma * sqrt(T) appears in both d1 and d2, so name it once.
    vol_sqrt_t = sigma * np.sqrt(T)
    d1 = (np.log(S / K) + (r + 0.5 * sigma**2) * T) / vol_sqrt_t
    d2 = d1 - vol_sqrt_t
    discounted_strike = K * np.exp(-r * T)
    return S * norm.cdf(d1) - discounted_strike * norm.cdf(d2)

def calculate_iv(opt_price, S, K, T, r, tol=1e-5, low_vol=0.001, high_vol=5.0, iterations=100):
    """Implied volatility of a call option, found by bisection on ``BS``.

    Parameters
    ----------
    opt_price : observed option price to match.
    S, K, T, r : passed unchanged to ``BS`` (spot, strike, time to expiry, rate).
    tol : absolute price tolerance; the search stops once
        ``abs(BS(...) - opt_price) < tol``.
    low_vol, high_vol : initial volatility bracket for the bisection.
    iterations : maximum number of bisection steps.

    Returns
    -------
    The volatility at which the model price matches ``opt_price`` within
    ``tol``, or ``None`` when no such volatility is found inside the bracket.
    """
    # Fail fast on inputs the bisection can never resolve: a NaN/inf target
    # or model price makes every comparison below False, and the loop would
    # otherwise burn all iterations before returning None anyway.
    if not np.isfinite(opt_price):
        return None
    price_at_low = BS(S, K, T, r, low_vol)
    price_at_high = BS(S, K, T, r, high_vol)
    if not (np.isfinite(price_at_low) and np.isfinite(price_at_high)):
        return None

    # The bisection relies on the price increasing with volatility, so a
    # target at least `tol` outside the bracket prices cannot be reached.
    if price_at_low - opt_price >= tol or price_at_high - opt_price <= -tol:
        return None

    for _ in range(iterations):
        mid_vol = (low_vol + high_vol) / 2.0
        diff = BS(S, K, T, r, mid_vol) - opt_price
        if abs(diff) < tol:
            return mid_vol
        if diff < 0:
            # Model price too low: the root lies in the upper half.
            low_vol = mid_vol
        else:
            high_vol = mid_vol
    return None  # No convergence within the iteration budget

In [4]:
#Getting underlying index (NIFTY 50 data)

from google.colab import drive
drive.mount('/drive')
print(os.getcwd())
# Folder on the mounted Drive where the downloaded data is stored.
data_storage = '/drive/My Drive/Capstone'
# Optionally make the storage folder the working directory, where files would be saved
#os.chdir(data_storage)
# Download window: fixed start date up to the day the notebook is run
start_date = date(2023, 5, 26)
end_date = date.today()
# Define wait time in seconds to avoid getting blocked
wait_time = [1, 2]
# Instantiate the bhavcopy downloader for indices and fetch the data
nse = bhavcopy.bhavcopy("indices", start_date, end_date, data_storage, wait_time)
nse.get_data()

# Build the path with os.path.join: the previous literal used a Windows '\'
# separator, which on Colab/Linux is part of the file name (and '\i' is an
# invalid escape sequence), so the file was never found.
# NOTE(review): assumes bhavcopy writes indices.csv into data_storage - confirm.
indices_csv = os.path.join(data_storage, 'indices.csv')
data_nifty = pd.read_csv(indices_csv, parse_dates=['TIMESTAMP'])
# .copy() gives an independent frame, so the column edits below do not act on
# a slice of the raw CSV frame (SettingWithCopyWarning).
data_nifty = data_nifty.loc[data_nifty['Index Name'] == 'Nifty 50'].copy()
data_nifty = data_nifty.rename(columns={"Index Name": "Index", "Closing Index Value": "Close"})
# rt is the 'Change(%)' column as numbers; rt2 is its square.
data_nifty['rt'] = pd.to_numeric(data_nifty['Change(%)'])
data_nifty['rt2'] = data_nifty['rt']**2.
# Rolling standard deviation of rt over each window, scaled by sqrt(252).
for window in (5, 20, 60, 75):
    data_nifty[f'sigma{window}'] = data_nifty['rt'].rolling(window).std()*(252**0.5)

Mounted at /drive
/content
Running File Check
The file does not exist. Creating File
Downloading Data for [Timestamp('2023-05-26 00:00:00', freq='D'), Timestamp('2023-05-29 00:00:00', freq='D'), Timestamp('2023-05-30 00:00:00', freq='D'), Timestamp('2023-05-31 00:00:00', freq='D'), Timestamp('2023-06-01 00:00:00', freq='D'), Timestamp('2023-06-02 00:00:00', freq='D'), Timestamp('2023-06-05 00:00:00', freq='D'), Timestamp('2023-06-06 00:00:00', freq='D'), Timestamp('2023-06-07 00:00:00', freq='D'), Timestamp('2023-06-08 00:00:00', freq='D'), Timestamp('2023-06-09 00:00:00', freq='D'), Timestamp('2023-06-12 00:00:00', freq='D'), Timestamp('2023-06-13 00:00:00', freq='D'), Timestamp('2023-06-14 00:00:00', freq='D'), Timestamp('2023-06-15 00:00:00', freq='D'), Timestamp('2023-06-16 00:00:00', freq='D'), Timestamp('2023-06-19 00:00:00', freq='D'), Timestamp('2023-06-20 00:00:00', freq='D'), Timestamp('2023-06-21 00:00:00', freq='D'), Timestamp('2023-06-22 00:00:00', freq='D'), Timestamp('20

NameError: ignored

In [None]:
#Getting option chain data on NIFTY 50 data

# One download attempt per business day in the requested window.
dt = pd.date_range(start=start_date, end=end_date, freq='B')
daily_frames = []
for tday in dt:
    # Date parts are computed outside the try so the error message below can
    # always reference them.
    dd = tday.strftime('%d')
    MM = tday.strftime('%b').upper()
    YYYY = tday.strftime('%Y')
    fnoBhavcopyUrl = 'http://archives.nseindia.com/content/historical/DERIVATIVES/' +YYYY+ '/' +MM+ '/fo' + dd+ MM+ YYYY+'bhav.csv.zip'
    try:
        datafno1 = pd.read_csv(fnoBhavcopyUrl, parse_dates=['EXPIRY_DT', 'TIMESTAMP'])
        daily_frames.append(datafno1)
    except Exception as exc:
        # Best effort: a day whose file cannot be read is skipped, but the
        # reason is reported. Catching Exception (not a bare except) lets
        # KeyboardInterrupt stop the loop.
        print("Error in" + dd + MM + YYYY + ": " + repr(exc))

if not daily_frames:
    raise RuntimeError("No derivatives bhavcopy file could be downloaded for the requested date range")

# Concatenate once instead of growing the frame inside the loop.
datafno = pd.concat(daily_frames, join='outer', ignore_index=True)

# Keep the first 15 columns and strip whitespace from the column names.
datafno = datafno.drop(datafno.columns[15:], axis=1)
datafno.columns = [c.strip() for c in datafno.columns]
datafno = datafno.loc[datafno['SYMBOL'] == 'NIFTY']
# Index futures rows, and index option rows with a positive CONTRACTS value.
datafno_fut = datafno.loc[datafno['INSTRUMENT'] == 'FUTIDX']
datafno_opt = datafno.loc[(datafno['INSTRUMENT'] == 'OPTIDX')&(datafno['CONTRACTS'] > 0)]

In [None]:
#Creating input files for ANN application

# Join option rows with index rows that share the same TIMESTAMP
# (pd.merge defaults to an inner join).
merged = pd.merge(datafno_opt, data_nifty, on='TIMESTAMP')
# Calendar days between the quote date and the expiry date.
days_to_expiry = pd.to_datetime(merged['EXPIRY_DT']) - pd.to_datetime(merged['TIMESTAMP'])
# S = index close, K = strike, T = whole days to expiry; rows with any missing
# value are dropped.
data_input = merged.assign(
    S=merged['Close'],
    K=merged['STRIKE_PR'],
    T=days_to_expiry.dt.days,
).dropna()
#data_input.to_csv('/drive/My Drive/Capstone\datainput.csv')

# Split by option type: 'CE' rows and 'PE' rows.
is_call = data_input['OPTION_TYP'] == 'CE'
is_put = data_input['OPTION_TYP'] == 'PE'
data_inputCE = data_input.loc[is_call]
data_inputPE = data_input.loc[is_put]

In [None]:
#Running ANN for CE

# np.random.seed alone does not seed the backend Keras draws its random
# numbers from, so seed Python, NumPy and the backend together.
from keras.utils import set_random_seed
set_random_seed(42)
np.random.seed(42)
ncol = 10
# Features are selected positionally: the last `ncol` columns of the CE frame.
X = data_inputCE.iloc[:,-ncol:]
X = X.apply(pd.to_numeric, errors='coerce')
y = pd.to_numeric(data_inputCE['CLOSE'])

# errors='coerce' turns unparseable cells into NaN; a NaN feature would make
# the loss NaN and the sklearn metrics below raise, so drop those rows.
valid_rows = X.notna().all(axis=1) & y.notna()
X = X.loc[valid_rows]
y = y.loc[valid_rows]

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

# Create the neural network model; the input width follows the feature
# matrix rather than a separately maintained constant.
ANN = Sequential()
ANN.add(Dense(64, input_dim=X_train.shape[1], activation='relu'))  # Input layer
ANN.add(Dense(32, activation='relu'))  # Hidden layer
ANN.add(Dense(32, activation='relu'))  # Hidden layer
ANN.add(Dense(32, activation='relu'))  # Hidden layer
ANN.add(Dense(1, activation='linear'))  # Output layer

# Compile the model
ANN.compile(loss='mean_squared_error', optimizer='adam', metrics=['mae'])

# Train the model
ANN.fit(X_train, y_train, epochs=50, batch_size=32)

# Evaluate the model on the test set
loss, mae = ANN.evaluate(X_test, y_test)

# Predict option prices using the trained model; predict() returns an
# (n, 1) array, flattened here so it lines up with y_test as a plain column.
y_pred = ANN.predict(X_test).ravel()

output_ann = pd.DataFrame(
    {"y_test": y_test.to_numpy(), "y_pred": y_pred},
    index=y_test.index,
)

# Error metrics on the held-out set (these overwrite the Keras `mae` above).
mae = metrics.mean_absolute_error(y_test, y_pred)
mse = metrics.mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
mape = metrics.mean_absolute_percentage_error(y_test, y_pred)
r2 = metrics.r2_score(y_test, y_pred)

print("ANN error metrics:")
print("MAE:", "%.2f" %mae)
print("MSE:", "%.2f" %mse)
print("RMSE:", "%.2f" %rmse)
print("MAPE:", "%.2f" %mape)
print("R-Squared:", "%.3f" %r2)

In [None]:
# Scatter of the model output (y_pred) against the held-out targets (y_test),
# annotated with the R-squared of the two.
fig, ax = plt.subplots(figsize=(15,10))
ax.scatter(y_test, y_pred)
ax.set_xlabel("Real Value")
ax.set_ylabel("ANN Value")
r2_label = "r-squared = {:.3f}".format(r2_score(y_test,y_pred))
ax.annotate(r2_label, (20,1), size=15)
plt.show()

In [None]:
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.layers import LSTM, Dropout
from tensorflow.keras.optimizers import Adam

In [None]:
# np.random.seed alone does not seed the backend Keras draws its random
# numbers from, so seed Python, NumPy and the backend together.
from keras.utils import set_random_seed
set_random_seed(42)
np.random.seed(42)
ncol = 10
# Features are selected positionally: the last `ncol` columns of the CE frame.
data = data_inputCE.iloc[:,-ncol:]
data = data.apply(pd.to_numeric, errors='coerce')
# The target goes in as the LAST column so it can be sliced off after scaling.
data['opt_price'] = pd.to_numeric(data_inputCE['CLOSE'])
# errors='coerce' can introduce NaN; MinMaxScaler passes NaN through, which
# would make the loss NaN, so drop incomplete rows first.
data = data.dropna()

# Split BEFORE scaling: fitting the scaler on all rows would leak the test
# set's min/max into the training features.
train_rows, test_rows = train_test_split(data, test_size=0.2, random_state=42)

# Normalizing the data with statistics learned from the training rows only
scaler = MinMaxScaler()
scaled_train = scaler.fit_transform(train_rows)
scaled_test = scaler.transform(test_rows)

# Splitting data into features (all columns except the last) and target (last column)
X_train = scaled_train[:, :-1]
X_test = scaled_test[:, :-1]
y_train = scaled_train[:, -1]
y_test_scaled = scaled_test[:, -1]

# Reshaping the data for LSTM (samples, time steps, features) with one time step
X_train = X_train.reshape(X_train.shape[0], 1, X_train.shape[1])
X_test = X_test.reshape(X_test.shape[0], 1, X_test.shape[1])

# Build LSTM model
model = Sequential()
model.add(LSTM(units=50, return_sequences=True, input_shape=(X_train.shape[1], X_train.shape[2])))
model.add(Dropout(0.2))
model.add(LSTM(units=50))
model.add(Dropout(0.2))
model.add(Dense(units=1))

# Compile the model
model.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])

# Train the model
model.fit(X_train, y_train, epochs=20, batch_size=32, validation_data=(X_test, y_test_scaled))

# Predictions (still in scaled units)
predicted_scaled = model.predict(X_test)

# The scaler was fitted on features + target together, so to undo the scaling
# the prediction column is placed back next to the scaled test features.
test_features_2d = scaled_test[:, :-1]
predicted_values = pd.DataFrame(
    scaler.inverse_transform(np.concatenate((test_features_2d, predicted_scaled), axis=1))
)
actual_prices = pd.DataFrame(scaler.inverse_transform(scaled_test))
# Last column holds the option price in original units.
y_test = actual_prices.iloc[:,-1:]
y_pred = predicted_values.iloc[:,-1:]

output_lstm = pd.DataFrame({"y_test": y_test.iloc[:, 0], "y_pred": y_pred.iloc[:, 0]})

mae = metrics.mean_absolute_error(y_test, y_pred)
mse = metrics.mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
mape = metrics.mean_absolute_percentage_error(y_test, y_pred)
r2 = metrics.r2_score(y_test, y_pred)

print("LSTM error metrics:")
print("MAE:", "%.2f" %mae)
print("MSE:", "%.2f" %mse)
print("RMSE:", "%.2f" %rmse)
print("MAPE:", "%.2f" %mape)
print("R-Squared:", "%.3f" %r2)

In [None]:
# Scatter of the model output (y_pred) against the held-out targets (y_test),
# annotated with the R-squared of the two.
fig, ax = plt.subplots(figsize=(10,6))
ax.scatter(y_test, y_pred)
ax.set_xlabel("Real Value")
ax.set_ylabel("LSTM Value")
r2_label = "r-squared = {:.3f}".format(r2_score(y_test,y_pred))
ax.annotate(r2_label, (20,1), size=15)
plt.show()