In [1]:
%load_ext autoreload
%autoreload 2

In [34]:
%%writefile model.py

#
# (c) Ricky Macharm, MScFE
# https://SisengAI.com
#


import os
from glob import glob

from pathlib import Path
import pandas as pd
from arch import arch_model
from config import settings
from retrieve_data import MT5_Data, SQLiteRepo
import joblib

class GarchModel:
    """Train a GARCH model on an instrument's returns and forecast volatility.

    Attributes
    ----------
    ticker : str
        Ticker symbol of the instrument whose volatility will be predicted.
        It is also used as the table name in the repository and as the
        prefix of saved model files.
    repo : SQLiteRepo
        Repository the training data is read from (and written to when
        `use_new_data` is true). Must provide `insert_table` and
        `read_table`.
    use_new_data : bool
        Whether to download fresh data through `MT5_Data` before training,
        or to use the data already stored in the repository.
    model_directory : str
        Directory where trained models are stored, taken from
        `settings.model_directory`.

    Methods
    -------
    calc_returns
        Build the percentage-return series and attach it to `self.data`.
    fit
        Fit a GARCH model to `self.data`.
    predict_volatility
        Generate a volatility forecast from the trained model.
    dump
        Save the trained model to file.
    load
        Load the most recent trained model for `self.ticker` from file.
    """

    def __init__(self, ticker, repo, use_new_data):
        self.ticker = ticker
        self.repo = repo
        self.use_new_data = use_new_data
        self.model_directory = settings.model_directory

    def calc_returns(self, n_observations):
        """Read prices from the repository (refreshing them from the MT5
        broker first when `self.use_new_data` is true), convert them to
        percentage returns and attach the result to `self.data`.

        Parameters
        ----------
        n_observations : int
            Number of return observations wanted.

        Returns
        -------
        None
        """
        # Refresh the ticker's table from the broker if required.
        if self.use_new_data:
            api = MT5_Data()
            new_data = api.ticker_data(ticker=self.ticker)
            self.repo.insert_table(
                table_name=self.ticker, records=new_data, if_exists="replace"
            )

        # One extra row is requested because `pct_change` yields NaN for the
        # first row, which `dropna` then removes.
        prices = self.repo.read_table(
            table_name=self.ticker, limit=n_observations + 1
        )
        self.data = (
            prices.assign(returns=lambda x: 100 * x["close"].pct_change())
            .dropna()["returns"]
        )

    def fit(self, p, q):
        """Create model, fit it to `self.data` and attach the result to
        `self.model`. The fit metrics are stored on `self.aic` and `self.bic`.

        Parameters
        ----------
        p : int
            Lag order of the symmetric innovation.
        q : int
            Lag order of lagged volatility.

        Returns
        -------
        None
        """
        # `rescale=False` stops `arch` from rescaling the series, so forecasts
        # stay in the units of `self.data`; `disp=0` silences optimizer output.
        self.model = arch_model(self.data, p=p, q=q, rescale=False).fit(disp=0)
        self.aic = self.model.aic
        self.bic = self.model.bic

    def __clean_prediction(self, prediction):
        """Reformat a variance forecast into a JSON-friendly dictionary.

        Parameters
        ----------
        prediction : pd.DataFrame
            Variance from an `ARCHModelForecast`. Its first index label must
            be the date of the last observation.

        Returns
        -------
        dict
            Forecast of volatility. Each key is a date in ISO 8601 format.
            Each value is the predicted volatility.
        """
        # The forecast starts the day after the last observation ...
        start = prediction.index[0] + pd.DateOffset(days=1)

        # ... and covers one business day per forecast column.
        prediction_dates = pd.bdate_range(start=start, periods=prediction.shape[1])
        prediction_index = [d.isoformat() for d in prediction_dates]

        # Volatility is the square root of the forecast variance.
        data = prediction.values.flatten() ** 0.5

        return pd.Series(data, index=prediction_index).to_dict()

    def predict_volatility(self, horizon=5):
        """Predict volatility using `self.model`.

        Parameters
        ----------
        horizon : int, optional
            Horizon of forecast, by default 5.

        Returns
        -------
        dict
            Forecast of volatility. Each key is a date in ISO 8601 format.
            Each value is the predicted volatility.
        """
        # `reindex=False` keeps only the rows that actually hold forecasts.
        prediction = self.model.forecast(horizon=horizon, reindex=False).variance

        return self.__clean_prediction(prediction)

    def dump(self):
        """Save model to `self.model_directory` with a timestamp.

        Returns
        -------
        str
            Filepath where the model was saved.
        """
        # ISO timestamp with ":" swapped for "." because colons are not
        # valid in Windows file names.
        timestamp = pd.Timestamp.now().isoformat().replace(":", ".")

        os.makedirs(self.model_directory, exist_ok=True)
        filepath = Path(self.model_directory) / f"{self.ticker}_{timestamp}.pkl"

        joblib.dump(self.model, filepath)

        return str(filepath)

    def _latest_model_path(self):
        """Return the path of the most recent saved model for `self.ticker`.

        Raises
        ------
        Exception
            If no saved model exists for `self.ticker`.
        """
        # The "_" separator written by `dump` is part of the pattern so that
        # files of other tickers sharing this prefix (e.g. "EURUSD" when the
        # ticker is "EUR") are not matched.
        pattern = str(Path(self.model_directory) / f"{self.ticker}_*.pkl")

        # File names differ only in their ISO timestamp, so the
        # lexicographically last match is the most recent one.
        candidates = sorted(glob(pattern))
        if not candidates:
            raise Exception(f"There are no trained models for {self.ticker}")
        return candidates[-1]

    def load(self):
        """Load the most recent model in `self.model_directory` for
        `self.ticker` and attach it to the `self.model` attribute.

        Raises
        ------
        Exception
            If no saved model exists for `self.ticker`.
        """
        # NOTE: `joblib.load` unpickles the file; only load model files that
        # come from a trusted `model_directory`.
        self.model = joblib.load(self._latest_model_path())
        
        


Overwriting model.py


In [2]:
import os
import sqlite3
from glob import glob

from pathlib import Path
import pandas as pd
import requests
from arch.univariate.base import ARCHModelResult
from config import settings
from retrieve_data import *
import pickle

In [3]:
connection = sqlite3.connect(settings.db_name, check_same_thread=False)
repo = SQLiteRepo(connection=connection) 

print("repo type:", type(repo))
print("repo.connection type:", type(repo.connection))

repo type: <class 'retrieve_data.SQLiteRepo'>
repo.connection type: <class 'sqlite3.Connection'>


In [4]:
from model import GarchModel

# Instantiate a `GarchModel`
df_ticker = GarchModel(ticker="GBPUSD", repo=repo, use_new_data=False)

# Does `df_ticker` have the correct attributes?
assert df_ticker.ticker == "GBPUSD"
assert df_ticker.repo == repo
assert not df_ticker.use_new_data
assert df_ticker.model_directory == settings.model_directory

In [7]:
print(dir(df_ticker))

['_GarchModel__clean_prediction', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'calc_returns', 'dump', 'fit', 'load', 'model_directory', 'predict_volatility', 'repo', 'ticker', 'use_new_data']


In [None]:
MT5_Data().ticker_list()

In [5]:
ticker = "EURGBP"

# Instantiate `GarchModel`, use new data
model_shop = GarchModel(ticker=ticker, repo=repo, use_new_data=True)

# Check that model doesn't have `data` attribute yet
assert not hasattr(model_shop, "data")

# Wrangle data
model_shop.calc_returns(n_observations=1000)

# Does model now have `data` attribute?
assert hasattr(model_shop, "data")

# Is the `data` a Series?
assert isinstance(model_shop.data, pd.Series)

# Is Series correct shape?
assert model_shop.data.shape == (1000,)

print(ticker)
model_shop.data.tail()

EURGBP


date
2023-02-20   -0.140671
2023-02-21   -0.935369
2023-02-22    0.136511
2023-02-23    0.152229
2023-02-24    0.158804
Name: returns, dtype: float64

In [6]:
ticker = "EURJPY"

# Instantiate `GarchModel`, use new data
model_shop = GarchModel(ticker=ticker, repo=repo, use_new_data=True)

# Check that model doesn't have `data` attribute yet
assert not hasattr(model_shop, "data")

# Wrangle data
model_shop.calc_returns(n_observations=1000)

# Does model now have `data` attribute?
assert hasattr(model_shop, "data")

# Is the `data` a Series?
assert isinstance(model_shop.data, pd.Series)

# Is Series correct shape?
assert model_shop.data.shape == (1000,)

print(ticker)
model_shop.data.tail()

EURJPY


date
2023-02-20   -0.149945
2023-02-21    0.404412
2023-02-22   -0.484870
2023-02-23   -0.258645
2023-02-24    0.836820
Name: returns, dtype: float64

In [7]:
ticker = "US30"

# Instantiate `GarchModel`, use new data
model_shop = GarchModel(ticker=ticker, repo=repo, use_new_data=True)

# Check that model doesn't have `data` attribute yet
assert not hasattr(model_shop, "data")

# Wrangle data
model_shop.calc_returns(n_observations=1000)

# Does model now have `data` attribute?
assert hasattr(model_shop, "data")

# Is the `data` a Series?
assert isinstance(model_shop.data, pd.Series)

# Is Series correct shape?
assert model_shop.data.shape == (1000,)

print(ticker)
model_shop.data.tail()

US30


date
2023-02-20   -0.197926
2023-02-21   -1.774207
2023-02-22   -0.210941
2023-02-23    0.129852
2023-02-24   -1.081498
Name: returns, dtype: float64

In [8]:
ticker = "US30"
# Instantiate `GarchModel`, use old data
model_shop = GarchModel(ticker=ticker, repo=repo, use_new_data=False)

# Wrangle data
model_shop.calc_returns(n_observations=1000)

# Fit GARCH(1,1) model to data
model_shop.fit(p=1, q=1)

# Does `model_shop` have a `model` attribute now?
assert hasattr(model_shop, "model")

# Is model correct data type?
assert isinstance(model_shop.model, ARCHModelResult)

# Does model have correct parameters?
assert model_shop.model.params.index.tolist() == ["mu", "omega", "alpha[1]", "beta[1]"]

# Check model parameters
model_shop.model.summary()

0,1,2,3
Dep. Variable:,returns,R-squared:,0.0
Mean Model:,Constant Mean,Adj. R-squared:,0.0
Vol Model:,GARCH,Log-Likelihood:,-1444.53
Distribution:,Normal,AIC:,2897.06
Method:,Maximum Likelihood,BIC:,2916.7
,,No. Observations:,1000.0
Date:,"Fri, Feb 24 2023",Df Residuals:,999.0
Time:,21:46:00,Df Model:,1.0

0,1,2,3,4,5
,coef,std err,t,P>|t|,95.0% Conf. Int.
mu,0.0607,2.660e-02,2.281,2.256e-02,"[8.538e-03, 0.113]"

0,1,2,3,4,5
,coef,std err,t,P>|t|,95.0% Conf. Int.
omega,0.0665,1.949e-02,3.410,6.488e-04,"[2.826e-02, 0.105]"
alpha[1],0.2026,4.020e-02,5.040,4.651e-07,"[ 0.124, 0.281]"
beta[1],0.7595,3.376e-02,22.499,4.291e-112,"[ 0.693, 0.826]"


In [9]:
# Generate prediction from `model_shop`
prediction = model_shop.predict_volatility(horizon=5)

# Is prediction a dictionary?
assert isinstance(prediction, dict)

# Are keys correct data type?
assert all(isinstance(k, str) for k in prediction.keys())

# Are values correct data type?
assert all(isinstance(v, float) for v in prediction.values())

prediction

{'2023-02-27T00:00:00': 0.9889565243747797,
 '2023-02-28T00:00:00': 1.0036816388365841,
 '2023-03-01T00:00:00': 1.017646871909975,
 '2023-03-02T00:00:00': 1.0309036012115818,
 '2023-03-03T00:00:00': 1.0434983097042294}

In [10]:
model_directory = settings.model_directory
model_directory

'models'

In [25]:
from model import GarchModel

ticker = "US30"
# Instantiate `GarchModel`, use old data
model_shop = GarchModel(ticker=ticker, repo=repo, use_new_data=False)

# Wrangle data
model_shop.calc_returns(n_observations=1000)

# Fit GARCH(1,1) model to data
model_shop.fit(p=1, q=1)

model_shop.model.summary()

# Save `model_shop` model, assign filename
filename = model_shop.dump()

# Is `filename` a string?
assert isinstance(filename, str)

# Does filename include ticker symbol?
assert model_shop.ticker in filename

# Does file exist?
assert os.path.exists(filename)

filename

'models\\US30_2023-02-24T22.07.58.277325.pkl'

In [26]:
pattern = os.path.join(settings.model_directory, f"{ticker}*.pkl")
model_path = sorted(glob(pattern))[-1]
model_path

'models\\US30_2023-02-24T22.07.58.277325.pkl'

In [21]:
pattern = str(Path(f"{settings.model_directory}/{ticker}*.pkl"))
model_path = sorted(glob(pattern))[-1]
model_path

'models\\US30_2023-02-24T21.46.30.638308.pkl'

In [24]:
from model import GarchModel

# Save `model_shop` model, assign filename
filename = model_shop.dump()

# Is `filename` a string?
# assert isinstance(filename, str)

# Does filename include ticker symbol?
assert model_shop.ticker in filename

# Does file exist?
assert os.path.exists(filename)

filename

'models\\US30_2023-02-24T22.07.14.270180.pkl'

In [30]:
model_shop = GarchModel(ticker="US30", repo=repo, use_new_data=False)

# Check that new `model_shop` doesn't have model attached
assert not hasattr(model_shop, "model")

# Load model
model_shop.load()

# Does `model_shop` have model attached?
assert hasattr(model_shop, "model")

model_shop.model.summary()

0,1,2,3
Dep. Variable:,returns,R-squared:,0.0
Mean Model:,Constant Mean,Adj. R-squared:,0.0
Vol Model:,GARCH,Log-Likelihood:,-1444.53
Distribution:,Normal,AIC:,2897.06
Method:,Maximum Likelihood,BIC:,2916.7
,,No. Observations:,1000.0
Date:,"Fri, Feb 24 2023",Df Residuals:,999.0
Time:,22:07:58,Df Model:,1.0

0,1,2,3,4,5
,coef,std err,t,P>|t|,95.0% Conf. Int.
mu,0.0607,2.660e-02,2.281,2.256e-02,"[8.538e-03, 0.113]"

0,1,2,3,4,5
,coef,std err,t,P>|t|,95.0% Conf. Int.
omega,0.0665,1.949e-02,3.410,6.488e-04,"[2.826e-02, 0.105]"
alpha[1],0.2026,4.020e-02,5.040,4.651e-07,"[ 0.124, 0.281]"
beta[1],0.7595,3.376e-02,22.499,4.291e-112,"[ 0.693, 0.826]"


In [44]:
%%writefile main_module.py

#
# (c) Ricky Macharm, MScFE
# https://SisengAI.com
#

import sqlite3

from config import settings
from retrieve_data import SQLiteRepo
from fastapi import FastAPI
from model import GarchModel
from pydantic import BaseModel


# `FitIn` class: request body accepted by the `/fit` endpoint.
class FitIn(BaseModel):
    ticker: str  # symbol of the instrument to train on
    use_new_data: bool  # forwarded to `build_model(use_new_data=...)`
    n_observations: int  # forwarded to `GarchModel.calc_returns`
    p: int  # forwarded to `GarchModel.fit(p=...)`
    q: int  # forwarded to `GarchModel.fit(q=...)`


#  `FitOut` class: response body of the `/fit` endpoint, i.e. the `FitIn`
#  fields echoed back plus the outcome of the training run.
class FitOut(FitIn):
    success: bool  # False when any step of `fit_model` raised
    message: str  # saved file path and AIC/BIC on success, error text on failure


#  `PredictIn` class: request body accepted by the `/predict` endpoint.
class PredictIn(BaseModel):
    ticker: str  # symbol whose most recently saved model is loaded
    n_days: int  # forecast horizon passed to `GarchModel.predict_volatility`


#  `PredictOut` class: response body of the `/predict` endpoint, i.e. the
#  `PredictIn` fields echoed back plus the forecast and its outcome.
class PredictOut(PredictIn):
    success: bool  # False when loading the model or forecasting raised
    forecast: dict  # ISO 8601 date string -> predicted volatility; `{}` on failure
    message: str  # empty on success, error text on failure



def build_model(ticker: str, use_new_data: bool = False) -> GarchModel:
    """Assemble a `GarchModel` backed by the project's SQLite database.

    Parameters
    ----------
    ticker : str
        Symbol handed to the model.
    use_new_data : bool, optional
        Handed to the model unchanged, by default False.

    Returns
    -------
    GarchModel
        Model whose `repo` wraps a fresh connection to `settings.db_name`.
    """
    # `check_same_thread=False` lets the connection be used from a thread
    # other than the one that opened it.
    db = sqlite3.connect(settings.db_name, check_same_thread=False)

    return GarchModel(
        ticker=ticker,
        repo=SQLiteRepo(connection=db),
        use_new_data=use_new_data,
    )




app = FastAPI()



# `/hello` path with 200 status code
@app.get("/hello", status_code=200)
def hello():
    """Liveness check: respond with a fixed greeting payload."""
    greeting = "Hello World, I am live!"
    return {"message": greeting}



#  `/fit` path, 200 status code
@app.post("/fit", status_code=200, response_model=FitOut)
def fit_model(request: FitIn):
    """Train a model for the requested ticker, save it and report the outcome.

    Parameters
    ----------
    request : FitIn

    Returns
    -------
    dict
        The request fields plus `"success"` and `"message"`; conforms to
        the `FitOut` class. Failures are reported in the body, not raised.
    """
    # Echo the request fields back; the outcome keys are merged in below.
    response = request.dict()

    try:
        garch = build_model(
            ticker=request.ticker, use_new_data=request.use_new_data
        )
        garch.calc_returns(n_observations=request.n_observations)
        garch.fit(p=request.p, q=request.q)
        saved_path = garch.dump()
        outcome = {
            "success": True,
            "message": f"Trained and saved '{saved_path}'. Metrics: AIC {garch.aic}, BIC {garch.bic}.",
        }
    except Exception as err:
        # Any failure becomes a regular response carrying the error text.
        outcome = {"success": False, "message": str(err)}

    response.update(outcome)
    return response


#  `/predict` path, 200 status code
@app.post("/predict", status_code=200, response_model=PredictOut)
def get_prediction(request: PredictIn):
    """Load the latest saved model for the ticker and return its forecast.

    Parameters
    ----------
    request : PredictIn

    Returns
    -------
    dict
        The request fields plus `"success"`, `"forecast"` and `"message"`;
        conforms to the `PredictOut` class. Failures are reported in the
        body, not raised.
    """
    # Echo the request fields back; the outcome keys are merged in below.
    response = request.dict()

    try:
        garch = build_model(ticker=request.ticker, use_new_data=False)
        garch.load()
        outcome = {
            "success": True,
            "forecast": garch.predict_volatility(horizon=request.n_days),
            "message": "",
        }
    except Exception as err:
        # Any failure becomes a regular response with an empty forecast.
        outcome = {"success": False, "forecast": {}, "message": str(err)}

    response.update(outcome)
    return response


Overwriting main_module.py


Go to the command line, navigate to the directory for this project, and start your app server by entering the following command.

```bash
uvicorn main_module:app --reload --workers 1 --host localhost --port 8008
```

In [36]:
url = "http://localhost:8008/hello"
response = requests.get(url=url)

print(f"response code:{response.status_code}")
response.json()

response code:200


{'message': 'Hello World, I am live!'}

In [40]:
from main_module import FitIn, FitOut

# Instantiate `FitIn`. 
fi = FitIn(ticker="US2000",
           use_new_data=True,
           n_observations=2_000,
           p=1,
           q=1
)
print(fi)

# Instantiate `FitOut`. 
fo = FitOut(ticker="US2000",
           use_new_data=True,
           n_observations=2_000,
           p=1,
           q=1,
            success=True,
            message="model is ready to rock 'n' roll!!!"
)
print(fo)

ticker='US2000' use_new_data=True n_observations=2000 p=1 q=1
ticker='US2000' use_new_data=True n_observations=2000 p=1 q=1 success=True message="model is ready to rock 'n' roll!!!"


In [43]:
from main_module import build_model

# Instantiate `GarchModel` with function
model_shop = build_model(ticker="EURUSD", use_new_data=False)

# Is `SQLiteRepo` attached to `model_shop`?
assert isinstance(model_shop.repo, SQLiteRepo)

# Is SQLite database attached to `SQLiteRepo`?
assert isinstance(model_shop.repo.connection, sqlite3.Connection)

# Is `ticker` attribute correct?
assert model_shop.ticker == "EURUSD"

# Is `use_new_data` attribute correct?
assert not model_shop.use_new_data

model_shop

<model.GarchModel at 0x1dc759eb610>

In [46]:
# URL of `/fit` path
url = "http://localhost:8008/fit"

# Data to send to path
json = {
    "ticker": "XAUUSD",
    "use_new_data": True,
    "n_observations": 2_000,
    "p": 1,
    "q": 1
}
# Response of post request
response = requests.post(url=url, json=json)
# Inspect response
print("response code:", response.status_code)
response.json()

response code: 200


{'ticker': 'XAUUSD',
 'use_new_data': True,
 'n_observations': 2000,
 'p': 1,
 'q': 1,
 'success': True,
 'message': "Trained and saved 'models\\XAUUSD_2023-02-25T00.07.48.627949.pkl'. Metrics: AIC 4731.956267959443, BIC 4754.359877797611."}

In [47]:
url = "http://localhost:8008/fit"

# Data to send to path
json = {
    "ticker": "BTC",
    "use_new_data": False,
    "n_observations": 2_000,
    "p": 1,
    "q": 1
}
# Response of post request
response = requests.post(url=url, json=json)
# Inspect response
print("response code:", response.status_code)
response.json()

response code: 200


{'ticker': 'BTC',
 'use_new_data': False,
 'n_observations': 2000,
 'p': 1,
 'q': 1,
 'success': False,
 'message': "Execution failed on sql 'SELECT * FROM 'BTC' LIMIT 2001': no such table: BTC"}

In [48]:
url = "http://localhost:8008/fit"

# Data to send to path
json = {
    "ticker": "BTC",
    "use_new_data": True,
    "n_observations": 2_000,
    "p": 1,
    "q": 1
}
# Response of post request
response = requests.post(url=url, json=json)
# Inspect response
print("response code:", response.status_code)
response.json()

response code: 200


{'ticker': 'BTC',
 'use_new_data': True,
 'n_observations': 2000,
 'p': 1,
 'q': 1,
 'success': False,
 'message': "Invalid Ticker name. HantecMarkets-MT5 does not contain 'BTC'."}

In [49]:
url = "http://localhost:8008/fit"

# Data to send to path
json = {
    "ticker": "EURGBP",
    "use_new_data": True,
    "n_observations": 2_000,
    "p": 1,
    "q": 1
}
# Response of post request
response = requests.post(url=url, json=json)
# Inspect response
print("response code:", response.status_code)
response.json()

response code: 200


{'ticker': 'EURGBP',
 'use_new_data': True,
 'n_observations': 2000,
 'p': 1,
 'q': 1,
 'success': True,
 'message': "Trained and saved 'models\\EURGBP_2023-02-25T00.10.09.612850.pkl'. Metrics: AIC 2387.29435602817, BIC 2409.6979658663386."}

In [52]:
from main_module import PredictIn, PredictOut

pi = PredictIn(ticker="USDJPY", n_days=5)
print(pi)

po = PredictOut(
    ticker="USDJPY", n_days=5, success=True, forecast={}, message="success"
)
print(po)

ticker='USDJPY' n_days=5
ticker='USDJPY' n_days=5 success=True forecast={} message='success'


In [54]:
# URL of `/predict` path
url = "http://localhost:8008/predict"
# Data to send to path
json = {"ticker": "XAUUSD", "n_days": 5}
# Response of post request
response = requests.post(url=url, json=json)
# Response JSON to be submitted to grader
submission = response.json()
# Inspect JSON
submission

{'ticker': 'XAUUSD',
 'n_days': 5,
 'success': True,
 'forecast': {'2023-02-27T00:00:00': 0.837385320823796,
  '2023-02-28T00:00:00': 0.8372922777102991,
  '2023-03-01T00:00:00': 0.8371997557077301,
  '2023-03-02T00:00:00': 0.8371077519534615,
  '2023-03-03T00:00:00': 0.8370162635999788},
 'message': ''}

In [56]:
# URL of `/predict` path
url = "http://localhost:8008/predict"
# Data to send to path
json = {"ticker": "US30", "n_days": 5}
# Response of post request
response = requests.post(url=url, json=json)
# Response JSON to be submitted to grader
submission = response.json()
# Inspect JSON
submission

{'ticker': 'US30',
 'n_days': 5,
 'success': True,
 'forecast': {'2023-02-27T00:00:00': 0.9889565243747797,
  '2023-02-28T00:00:00': 1.0036816388365841,
  '2023-03-01T00:00:00': 1.017646871909975,
  '2023-03-02T00:00:00': 1.0309036012115818,
  '2023-03-03T00:00:00': 1.0434983097042294},
 'message': ''}