## Connecting to the the Postgres DataBase

In [46]:
import psycopg2
from sqlalchemy import create_engine
import os
from dotenv import load_dotenv

load_dotenv()

def conn_to_db():
    username = os.getenv('DB_USERNAME')
    password = os.getenv('DB_PASSWORD')
    host = os.getenv('HOST')
    port = os.getenv('PORT')
    database = os.getenv('DATABASE')

    try:
        engine = create_engine(f'postgresql://{username}:{password}@{host}:{port}/{database}')
    except Exception as e:
        print(f'Error: Could not connect to the database.\n Reason:{e}')

    return engine
conn_to_db()

Engine(postgresql://user:***@localhost:5432/trade_stat)

## Insert Data into the stock_data table create in the DataBase

In [47]:
import pandas as pd

def insert_data():
    # Load the data from CSV
    file_path = 'HINDALCO.csv'
    df = pd.read_csv(file_path)

    # Insert data into the database
    df.to_sql('stock_data', conn_to_db(), if_exists='replace', index=False)

# insert_data()

## EXPLORATION OF DATA
#### Reading data form the DataBase
#### Checking the quality of data via check_data_quality function
#### Calculation of the moving average crossover strategy via moving_average_crossover_strategy function

##### additions
#### Calculation of drawdown 
#### Calculation of the performance using sharpe ratio

In [48]:
import numpy as np
import matplotlib.pyplot as plt

table_name = os.getenv('TABLE_NAME')
query = f"SELECT * FROM {table_name}"

# Load data into a Pandas DataFrame
df = pd.read_sql(query, conn_to_db())

def check_data_quality(df):
    if df.isnull().values.any():
        df = df.fillna(method='ffill')  

    assert df['open'].dtype == np.float64, "Open column is not float64"
    assert df['high'].dtype == np.float64, "High column is not float64"
    assert df['close'].dtype == np.float64, "Close column should be a float64"
    assert df['volume'].dtype == np.int64, "Volume column should be a int64"
    assert df['instrument'].dtype == object, "Instrument column should be an object type"
    assert df['datetime'].dtype == object, "Datatime column should be an object type"

check_data_quality(df)

# Sample of the data
print(df.head())

def moving_average_crossover_strategy(df):
    short_window = 40
    long_window = 100

    # Calculate moving averages
    df['short_mavg'] = df['close'].rolling(window=short_window, min_periods=1).mean()
    df['long_mavg'] = df['close'].rolling(window=long_window, min_periods=1).mean()

    # Generating signals
    df['signal'] = 0
    df.loc[short_window:, 'signal'] = np.where(df['short_mavg'][short_window:] > df['long_mavg'][short_window:], 1, 0)
    df['positions'] = df['signal'].diff()

    return df

result_df = moving_average_crossover_strategy(df)

# calculating strategy return 
def calculate_returns(df):
    df['returns'] = df['close'].pct_change()
    df['strategy_returns'] = df['returns'] * df['positions'].shift(1)
    return df

result_df = calculate_returns(result_df)

# Calculate cumulative returns
result_df['cumulative_returns'] = (1 + result_df['strategy_returns']).cumprod()

def calculate_drawdown(df):
    # Calculate the cumulative maximum value to track the peak
    df['cumulative_max'] = df['cumulative_returns'].cummax()
    # Calculate drawdown
    df['drawdown'] = df['cumulative_max'] - df['cumulative_returns']
    return df

result_df = calculate_drawdown(result_df)


def calculate_performance_metrics(df):
    #Assuming a risk-free rate of 0 for simplicity
    sharpe_ratio = df['strategy_returns'].mean() / df['strategy_returns'].std() * np.sqrt(252)

    # Max Drawdown
    max_drawdown = df['drawdown'].max()

    return sharpe_ratio, max_drawdown

sharpe_ratio, max_drawdown = calculate_performance_metrics(result_df)

# Output results
print(f"Sharpe Ratio: {sharpe_ratio}")
print(f"Maximum Drawdown: {max_drawdown}")

AssertionError: Open column is not float64

## Visualization of Data

In [None]:
# Visualization of Cumulative Returns
plt.figure(figsize=(10, 6))
plt.plot(result_df.index, result_df['cumulative_returns'], label='Cumulative Returns')
plt.title('Cumulative Returns Over Time')
plt.xlabel('Date')
plt.ylabel('Cumulative Returns')
plt.legend()
plt.show()

# Visualization of Drawdown
plt.figure(figsize=(10, 6))
plt.fill_between(result_df.index, result_df['drawdown'], color='red', label='Drawdown')
plt.title('Drawdown Over Time')
plt.xlabel('Date')
plt.ylabel('Drawdown')
plt.legend()
plt.show()


## Run a Test using unittest

In [None]:
import unittest
import datetime

class TestStockData(unittest.TestCase):
    def test_data_types(self):
        data = [
            ("2014-01-24 00:00:00", 114.00, 115.35, 113.00, 113.15, 5737135, "HINDALCO"),
            ("2014-01-27 00:00:00", 111.10, 112.70, 109.30, 112.00, 8724577, "HINDALCO"),
            ("2014-01-28 00:00:00", 113.80, 115.00, 109.75, 110.00, 4513345, "HINDALCO"),
            ("2014-01-29 00:00:00", 111.75, 114.75, 111.15, 114.50, 4713458, "HINDALCO"),
            ("2014-01-30 00:00:00", 108.10, 110.70, 107.60, 110.20, 5077231, "HINDALCO"),
            ]

        for record in data:
            date_object = datetime.datetime.strptime(record[0], '%Y-%m-%d %H:%M:%S')

            self.assertIsInstance(date_object, datetime.datetime)
            self.assertIsInstance(record[1], float)
            self.assertIsInstance(record[2], float)
            self.assertIsInstance(record[3], float)
            self.assertIsInstance(record[4], float)
            self.assertIsInstance(record[5], int)
            self.assertIsInstance(record[6], str)


if __name__ == '__main__':
    unittest.main(argv=[''], exit=False)
