In [26]:
import psycopg2
import pandas as pd
import numpy as np
import plotly.graph_objects as go
import plotly.offline as pyo
import plotly.io as pio
import plotly.express as px
from plotly.subplots import make_subplots


In [27]:
conn = psycopg2.connect(
           dbname="qdap_test",
           user="amt",
           password="your_password",
           host="192.168.2.23",
           port="5432"
       )

# Create a cursor object using the cursor() method
cursor = conn.cursor()

def fetch_options(cursor, symbol, expiry):
    query = '''
        SELECT * 
        FROM ohlcv_options_per_minute oopm
        WHERE symbol = %s
        AND expiry_type = 'I'
        AND expiry = %s
        ORDER BY date_timestamp;
        '''
    cursor.execute(query,(symbol,expiry))
    rows = cursor.fetchall()
    df = pd.DataFrame(rows, columns=[desc[0] for desc in cursor.description])
    return df
# Create a cursor object
cursor = conn.cursor()

def fetch_futures(cursor, symbol, x):
    query = '''
        SELECT *
        FROM ohlcv_future_per_minute ofpm 
        WHERE ofpm.symbol = %s
        AND ofpm.expiry_type = 'I'
        AND DATE(ofpm.expiry) = %s
        ORDER BY date_timestamp ASC
    '''
    # Execute the query with parameters as a tuple
    cursor.execute(query, (symbol, x))
    rows = cursor.fetchall()
    df = pd.DataFrame(rows, columns=[desc[0] for desc in cursor.description])
    return df

# Define the fetch_expiries function
def fetch_expiries(cursor, symbol):
    query = f'''
        SELECT DISTINCT ofpem.expiry 
        FROM ohlcv_future_per_minute ofpem 
        WHERE ofpem.symbol = '{symbol}'
        AND ofpem.expiry_type = 'I'
        GROUP BY ofpem.expiry 
    '''
    cursor.execute(query)
    rows = cursor.fetchall()
    df = pd.DataFrame(rows, columns=[desc[0] for desc in cursor.description])
    return df

In [None]:
symbol = 'BANKNIFTY'  # Replace with your symbol
expiry_future = fetch_expiries(cursor, symbol)

# Convert the 'expiry' column to datetime and then to date
expiry_future['expiry'] = pd.to_datetime(expiry_future['expiry']).dt.date

# Print the expiry dates with their indices
for i, row in expiry_future.iterrows():
    print(i, row.iloc[0], sep=' = ', end=' | ')
    if (i + 1) % 5 == 0:
        print()

In [None]:
index = 12
try:
    index
except NameError:
    index = 12  # Default value if not provided

expiry_date = expiry_future.iloc[index]['expiry']
print(f"Selected expiry date: {expiry_date} of type {type(expiry_date)}")
print(f"Selected expiry date: {expiry_date} of type {type(expiry_date)}")
# Fetch the futures data for the selected expiry date
nft = fetch_futures(cursor, symbol, expiry_date)
# Convert the 'date_timestamp' column to datetime if it exists




In [31]:

short_window = 9
long_window = 26
nft['Short_EMA'] = nft['close'].ewm(span=short_window, adjust=False).mean()
nft['Long_EMA'] = nft['close'].ewm(span=long_window, adjust=False).mean()

nft['Signal'] = 0


In [None]:
for i in range(1, len(nft)):
    if nft['Short_EMA'].iloc[i] > nft['Long_EMA'].iloc[i] and nft['Short_EMA'].iloc[i-1] <= nft['Long_EMA'].iloc[i-1]:
        nft.at[nft.index[i], 'Signal'] = 1  # Buy signal
    elif nft['Short_EMA'].iloc[i] < nft['Long_EMA'].iloc[i] and nft['Short_EMA'].iloc[i-1] >= nft['Long_EMA'].iloc[i-1]:
        nft.at[nft.index[i], 'Signal'] = -1  # Sell signal

nft.head()


In [None]:

fig = make_subplots(rows=1, cols=1)

# Candlestick chart
candlestick = go.Candlestick(x=nft.index,
                             open=nft['open'],
                             high=nft['high'],
                             low=nft['low'],
                             close=nft['close'],
                             name='Candlesticks')
fig.add_trace(candlestick)

# Short EMA
short_ema = go.Scatter(x=nft.index, y=nft['Short_EMA'], mode='lines', name='Short EMA')
fig.add_trace(short_ema)

# Long EMA
long_ema = go.Scatter(x=nft.index, y=nft['Long_EMA'], mode='lines', name='Long EMA')
fig.add_trace(long_ema)

# Buy signals
buy_signals = go.Scatter(x=nft[nft['Signal'] == 1].index, 
                         y=nft['Short_EMA'][nft['Signal'] == 1], 
                         mode='markers', 
                         marker=dict(symbol='triangle-up', color='blue', size=8), 
                         name='Buy Signal')
fig.add_trace(buy_signals)

# Sell signals
sell_signals = go.Scatter(x=nft[nft['Signal'] == -1].index, 
                          y=nft['Short_EMA'][nft['Signal'] == -1], 
                          mode='markers', 
                          marker=dict(symbol='triangle-down', color='black', size=8), 
                          name='Sell Signal')
fig.add_trace(sell_signals)

fig.update_layout(
    title=f'{symbol} EMA Crossover Strategy',
    yaxis_title='Price',
    xaxis_title='Date',
    xaxis_rangeslider_visible=False,
    width=2000,
    height=1000
)

fig.show()
pio.write_html(fig,file='p.html',auto_open=True)


In [34]:

position_size = 1  
current_position = 0  # Track current position (1 for long, -1 for short, 0 for neutral)
entry_price = 0  # Track entry price
total_pnl = 0  # Total P&L
# Iterate through each row in the DataFrame
for index, row in nft.iterrows():
    if row['Signal'] == 1 and current_position == 0:  # Buy signal and no current position
        current_position = 1
        # print("entry")
        entry_price = row['close']
    elif row['Signal'] == -1 and current_position == 1:  # Sell signal and long position
        pnl = (row['close'] - entry_price) * position_size
        total_pnl += pnl
        # ###print("close")
        current_position = 0
        entry_price = 0
    elif row['Signal'] == -1 and current_position == 0:  # Sell signal but no current long position
        continue  # Can't sell if not long

# Print total P&L
print(f'Total P&L: {total_pnl}')

Total P&L: -47240


In [52]:

# Assuming fetch_options is defined to fetch data based on 'symbol' and 'expiry'
cursor1 = conn.cursor()
options = fetch_options(cursor1, symbol=symbol, expiry=str(expiry_date))

In [None]:
copt = options[options['opt_type']=='CE'].copy()
popt = options[options['opt_type']=='PE'].copy()

options.head()

In [None]:
print(nft.columns)

In [None]:
# Initialize capital
initial_capital = 1000000  # Example initial capital
remaining_capital = initial_capital
profits = 0

put = {'pos': 0, 'dts': None, 'strike': None, 'buy_price': None}
call = {'pos': 0, 'dts': None, 'strike': None, 'buy_price': None}
trades = []
trade_details = []  # List to store buy and sell prices

print(type(nft.iloc[1]['date_timestamp']))

for i in range(len(nft) - 1):
    row = nft.iloc[i]
    signal = row['Signal']
    close = nft.iloc[i + 1]['close']
    dt = nft.iloc[i + 1]['date_timestamp']

    if signal == 1:
        print('s1', end='-> ')
        atm_call = copt[(copt['date_timestamp'] == dt) & (abs(copt['strike'] - close) == abs(copt['strike'] - close).min())]

        if not atm_call.empty:
            call_price = atm_call.iloc[0]['close']
            if remaining_capital >= call_price:
                print('long on call', end=' ')
                trades.append({
                    'position_type': 'long',
                    'opt_type': 'CE',
                    'price': call_price,
                    'day': dt.date()
                })
                call['pos'] = 1
                call['strike'] = atm_call.iloc[0]['strike']
                call['buy_price'] = call_price
                remaining_capital -= call_price  # Deduct the capital

            if put['pos'] == 1:
                put_short = popt[(popt['date_timestamp'] == dt) & (popt['strike'] == put['strike'])]
                if not put_short.empty:
                    print(' => short on put', end=' ')
                    put_sell_price = put_short.iloc[0]['close']
                    trades.append({
                        'position_type': 'short',
                        'opt_type': 'PE',
                        'price': put_sell_price,
                        'day': dt.date()
                    })
                    trade_details.append({
                        'type': 'PE',
                        'buy_price': put['buy_price'],
                        'sell_price': put_sell_price,
                        'pnl': put_sell_price - put['buy_price']
                    })
                    put['pos'] = 0
                    remaining_capital += put_sell_price  # Refill the capital
                    if remaining_capital > initial_capital:
                        profits += (remaining_capital - initial_capital)
                        remaining_capital = initial_capital
                else:
                    print('no closing on put opt ', end=' ')
        print('', end='\n')

    elif signal == -1:
        print('s2', end='-> ')
        atm_put = popt[(popt['date_timestamp'] == dt) & (abs(popt['strike'] - close) == abs(popt['strike'] - close).min())]

        if not atm_put.empty:
            put_price = atm_put.iloc[0]['close']
            if remaining_capital >= put_price:
                print('long on put ', end=' ')
                trades.append({
                    'position_type': 'long',
                    'opt_type': 'PE',
                    'price': put_price,
                    'day': dt.date()
                })
                put['pos'] = 1
                put['strike'] = atm_put.iloc[0]['strike']
                put['buy_price'] = put_price
                remaining_capital -= put_price  # Deduct the capital

            if call['pos'] == 1:
                call_short = copt[(copt['date_timestamp'] == dt) & (copt['strike'] == call['strike'])]
                if not call_short.empty:
                    print('short on call opt', end=' ')
                    call_sell_price = call_short.iloc[0]['close']
                    trades.append({
                        'position_type': 'short',
                        'opt_type': 'CE',
                        'price': call_sell_price,
                        'day': dt.date()
                    })
                    trade_details.append({
                        'type': 'CE',
                        'buy_price': call['buy_price'],
                        'sell_price': call_sell_price,
                        'pnl': call_sell_price - call['buy_price']
                    })
                    call['pos'] = 0
                    remaining_capital += call_sell_price  # Refill the capital
                    if remaining_capital > initial_capital:
                        profits += (remaining_capital - initial_capital)
                        remaining_capital = initial_capital
                else:
                    print('no closing on call opt ', end=' ')
        print('', end='\n')

if call['pos'] == 1:
    call_short = copt[(copt['date_timestamp'] == nft.iloc[-1]['date_timestamp']) & (copt['strike'] == call['strike'])]
    if not call_short.empty:
        print('short on call opt', end=' ')
        call_sell_price = call_short.iloc[0]['close']
        trades.append({
            'position_type': 'short',
            'opt_type': 'CE',
            'price': call_sell_price,
            'day': dt.date()
        })
        trade_details.append({
            'type': 'CE',
            'buy_price': call['buy_price'],
            'sell_price': call_sell_price,
            'pnl': call_sell_price - call['buy_price']
        })
        call['pos'] = 0
        remaining_capital += call_sell_price  # Refill the capital
        if remaining_capital > initial_capital:
            profits += (remaining_capital - initial_capital)
            remaining_capital = initial_capital
    else:
        print('no closing on call opt ', end=' -- ')

if put['pos'] == 1:
    put_short = popt[(popt['date_timestamp'] == dt) & (popt['strike'] == put['strike'])]
    if not put_short.empty:
        print(' => short on put', end=' ')
        put_sell_price = put_short.iloc[0]['close']
        trades.append({
            'position_type': 'short',
            'opt_type': 'PE',
            'price': put_sell_price,
            'day': dt.date()
        })
        trade_details.append({
            'type': 'PE',
            'buy_price': put['buy_price'],
            'sell_price': put_sell_price,
            'pnl': put_sell_price - put['buy_price']
        })
        put['pos'] = 0
        remaining_capital += put_sell_price  # Refill the capital
        if remaining_capital > initial_capital:
            profits += (remaining_capital - initial_capital)
            remaining_capital = initial_capital
else:
    print('no closing on put opt ', end=' --\n')

# Print final capital and profits
print(f"Final remaining capital: {remaining_capital}")
print(f"Total profits: {profits}")


In [None]:

# Print trade details
print("\nTrade Details:")
for detail in trade_details:
    print(detail['pnl'])


In [None]:

# Extract PnL values
pnl_values = [detail['pnl'] for detail in trade_details]

# Create a Plotly plot
trace = go.Scatter(
    x=list(range(len(pnl_values))),
    y=pnl_values,
    mode='lines+markers',
    name='PnL'
)

layout = go.Layout(
    title='Trade PnL Over Time',
    xaxis=dict(title='Trade Number'),
    yaxis=dict(title='PnL'),
    hovermode='closest'
)

fig = go.Figure(data=[trace], layout=layout)
pyo.plot(fig, auto_open=True)

In [None]:
pnl_values = [detail['pnl'] for detail in trade_details]
# Assume risk-free rate (R_f) is 0 for simplicity, or define it as needed
risk_free_rate = 1.0

# Calculate average return (R_p)
average_return = np.mean(pnl_values)

# Calculate standard deviation of returns (σ_p)
std_dev_return = np.std(pnl_values)

# Calculate Sharpe Ratio
sharpe_ratio = (average_return - risk_free_rate) / std_dev_return

print(f"Average Return (R_p): {average_return}")
print(f"Standard Deviation of Returns (σ_p): {std_dev_return}")
print(f"Sharpe Ratio: {sharpe_ratio}")


In [42]:
daily_expenses = {}
daily_earnings = {}

for trade in trades:
    day = trade['day']
    price = trade['price']
    position_type = trade['position_type']

    if position_type == 'long':
        # Update daily expenses
        if day not in daily_expenses:
            daily_expenses[day] = 0
        daily_expenses[day] += price
    else:
        # Update daily earnings
        if day not in daily_earnings:
            daily_earnings[day] = 0
        daily_earnings[day] += price


In [43]:
net_profit = {}

# print("Daily Expenses:")
for day, expense in daily_expenses.items():
    if day not in net_profit:
        net_profit[day]=0
    net_profit[day]-=expense
    # print(f"{day}: {expense}")

# print("Daily Earnings:")
for day, earning in daily_earnings.items():
    if day not in net_profit:
        net_profit[day]=0
    net_profit[day]-=earning
    # print(f"{day}: {earning}")

for day , expences in net_profit.items():
    print(f"on {day} : {expences}")


In [None]:

def calculate_drawdown(values):
    peak = -1000000000000
    dr = 0
    i, n = 0, len(values)
    
    while i < n:
        if values[i] > peak:
            peak = values[i]
            while i < n and values[i] <= peak:
                dr = max(dr, abs(peak - values[i]))
                i += 1
        else:
            i += 1
            
    return dr

# Convert dict_values to a list
arr = list(net_profit.values())

print("Net Profit Values:", arr)
print("Maximum Drawdown:", calculate_drawdown(arr))

In [None]:

# Plotting with Plotly
trace = go.Scatter(
    x=list(net_profit.keys()),  # Dates as x-axis
    y=arr,                      # Net profit values as y-axis
    mode='lines+markers',       # Line plot with markers
    name='Net Profit'
)

layout = go.Layout(
    title='Net Profit Over Time',
    xaxis=dict(title='Date'),
    yaxis=dict(title='Net Profit'),
    hovermode='closest'
)

fig = go.Figure(data=[trace], layout=layout)
pyo.plot(fig, auto_open=True)

In [None]:
pnl = 0
day_trades = []
for it in trades:
    
    print(f"{it['position_type']} on {it['opt_type']} with strike price = {it['price']} on date {it['day']}")
    if it['position_type']=='long':
        pnl -= it['price']
    else:
        pnl += it['price']
print(pnl)



In [None]:
# Initialize PnL and a list to track PnL over time
pnl = 0
pnl_changes = []


# Iterate over trades to calculate PnL per trade
for it in trades:
    print(f"{it['position_type']} on {it['opt_type']} with strike price = {it['price']}")
    if it['position_type'] == 'long':
        pnl_change = -it['price']
    else:
        pnl_change = it['price']
    pnl_changes.append(pnl_change)  # Track PnL change per trade
    pnl += pnl_change

# Print final PnL
print(f"Final PnL: {pnl}")

# Plotting PnL changes per trade with Plotly
trace = go.Bar(
    x=list(range(1, len(pnl_changes) + 1)),
    y=pnl_changes,
    name='PnL Change Per Trade'
)

layout = go.Layout(
    title='PnL Change Per Trade',
    xaxis=dict(title='Trade Number'),
    yaxis=dict(title='PnL Change'),
    hovermode='closest'
)

fig = go.Figure(data=[trace], layout=layout)
pyo.plot(fig, auto_open=True)