# Transformer Based Model for Chat-Bot
Using Pytorch
  

In [27]:
import warnings
warnings.filterwarnings('ignore')

### Import Statements

In [28]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import numpy as np
import re

### Set Huperparameters for model

- MAX_LEN = 40: The maximum length of input/output sequences (in tokens) for the model.
- BATCH_SIZE = 64: The number of training samples processed in one forward/backward pass.
- NUM_HEADS = 8: The number of attention heads in the multi-head attention mechanism.
- D_MODEL = 512: The dimensionality of the model’s hidden layer representations.
- FFN_UNITS = 2048: The number of units in the feed-forward neural network after each attention layer.
- DROPOUT = 0.1: The fraction of units to drop during training to prevent overfitting.
- NUM_LAYERS = 4: The number of layers in the encoder and decoder of the Transformer.
- EPOCHS = 300: The number of full passes through the training dataset during training.
- VOCAB_SIZE = 8000: The number of unique tokens in the model's vocabulary.


In [29]:
# Hyperparameters
MAX_LEN = 40
BATCH_SIZE = 64
NUM_HEADS = 8
D_MODEL = 512
FFN_UNITS = 2048
DROPOUT = 0.1
NUM_LAYERS = 4
EPOCHS = 300
VOCAB_SIZE = 8000

#### Setup decide for training

In [30]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cpu


#### load CSV file

In [31]:
file_path = 'data/reminiscences_of_a_stock_operator_qa.csv'
data = pd.read_csv(file_path, sep='\t')

In [32]:
questions = data['question'].tolist()
answers = data['answer'].tolist()

### Custom Tokenizer to keepp track of vocab and word to index

In [33]:
class CustomTokenizer:
    def __init__(self, vocab_size):
        self.vocab_size = vocab_size
        self.word2idx = {"<pad>": 0, "<start>": 1, "<end>": 2, "<unk>": 3}
        self.idx2word = {0: "<pad>", 1: "<start>", 2: "<end>", 3: "<unk>"}
        self.word_count = {}

    def fit_on_texts(self, texts):
        for text in texts:
            for word in text.split():
                self.word_count[word] = self.word_count.get(word, 0) + 1
        sorted_vocab = sorted(self.word_count.items(), key=lambda x: x[1], reverse=True)[:self.vocab_size - 4]
        for idx, (word, _) in enumerate(sorted_vocab, start=4):
            self.word2idx[word] = idx
            self.idx2word[idx] = word

    def texts_to_sequences(self, texts):
        sequences = []
        for text in texts:
            seq = [self.word2idx.get(word, self.word2idx["<unk>"]) for word in text.split()]
            sequences.append(seq)
        return sequences

    def sequences_to_texts(self, sequences):
        texts = []
        for seq in sequences:
            text = " ".join([self.idx2word.get(idx, "<unk>") for idx in seq])
            texts.append(text)
        return texts


In [34]:
tokenizer = CustomTokenizer(VOCAB_SIZE)
tokenizer.fit_on_texts(questions + answers)

### Initialize the dataset by converting the questions and answers to sequences of integers.

In [35]:
class ChatDataset(Dataset):
    def __init__(self, questions, answers, tokenizer, max_len):
        self.questions = tokenizer.texts_to_sequences(questions)
        self.answers = tokenizer.texts_to_sequences(answers)
        self.max_len = max_len

    def __len__(self):
        return len(self.questions)

    def __getitem__(self, idx):
        question = self.questions[idx]
        answer = self.answers[idx]

        question = [1] + question[:self.max_len - 2] + [2]
        answer = [1] + answer[:self.max_len - 2] + [2]

        question = question + [0] * (self.max_len - len(question))
        answer = answer + [0] * (self.max_len - len(answer))

        return torch.tensor(question), torch.tensor(answer)

### Compute the scaled dot-product attention.

In [36]:
def scaled_dot_product_attention(q, k, v, mask):
    matmul_qk = torch.matmul(q, k.transpose(-2, -1))
    dk = q.size(-1)
    scaled_attention_logits = matmul_qk / torch.sqrt(torch.tensor(dk, dtype=torch.float32, device=q.device))
    if mask is not None:
        scaled_attention_logits += (mask * -1e9)
    attention_weights = F.softmax(scaled_attention_logits, dim=-1)
    output = torch.matmul(attention_weights, v)
    return output

### Multi Head Atention 

In [37]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model
        self.depth = d_model // num_heads

        self.wq = nn.Linear(d_model, d_model)
        self.wk = nn.Linear(d_model, d_model)
        self.wv = nn.Linear(d_model, d_model)

        self.dense = nn.Linear(d_model, d_model)

    def forward(self, q, k, v, mask):
        batch_size = q.size(0)
        q = self.wq(q).view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)
        k = self.wk(k).view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)
        v = self.wv(v).view(batch_size, -1, self.num_heads, self.depth).transpose(1, 2)

        attention = scaled_dot_product_attention(q, k, v, mask)
        attention = attention.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        return self.dense(attention)

### Initialize the feed-forward network with two linear layers, ReLU activation, and dropout regularization

In [38]:
class FeedForwardNetwork(nn.Module):
    def __init__(self, d_model, ffn_units, dropout_rate):
        super(FeedForwardNetwork, self).__init__()
        self.linear1 = nn.Linear(d_model, ffn_units)
        self.linear2 = nn.Linear(ffn_units, d_model)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x):
        x = self.linear1(x)
        x = F.relu(x)
        x = self.dropout(x)
        return self.linear2(x)

### Encoder: Multi Head Attention with Layer normalization

In [39]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, ffn_units, dropout_rate):
        super(EncoderLayer, self).__init__()
        self.attention = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForwardNetwork(d_model, ffn_units, dropout_rate)
        self.layernorm1 = nn.LayerNorm(d_model)
        self.layernorm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, mask):
        attn_output = self.attention(x, x, x, mask)
        out1 = self.layernorm1(x + self.dropout(attn_output))
        ffn_output = self.ffn(out1)
        return self.layernorm2(out1 + self.dropout(ffn_output))

### Decoder: With look ahead mask and Cross attention

In [40]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, ffn_units, dropout_rate):
        super(DecoderLayer, self).__init__()
        self.attention1 = MultiHeadAttention(d_model, num_heads)
        self.attention2 = MultiHeadAttention(d_model, num_heads)
        self.ffn = FeedForwardNetwork(d_model, ffn_units, dropout_rate)
        self.layernorm1 = nn.LayerNorm(d_model)
        self.layernorm2 = nn.LayerNorm(d_model)
        self.layernorm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, enc_output, look_ahead_mask, padding_mask):
        attn1 = self.attention1(x, x, x, look_ahead_mask)
        out1 = self.layernorm1(x + self.dropout(attn1))

        attn2 = self.attention2(out1, enc_output, enc_output, padding_mask)
        out2 = self.layernorm2(out1 + self.dropout(attn2))

        ffn_output = self.ffn(out2)
        return self.layernorm3(out2 + self.dropout(ffn_output))

In [41]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len):
        super(PositionalEncoding, self).__init__()
        self.encoding = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float32).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        self.encoding[:, 0::2] = torch.sin(position * div_term)
        self.encoding[:, 1::2] = torch.cos(position * div_term)
        self.encoding = self.encoding.unsqueeze(0)

    def forward(self, x):
        return x + self.encoding[:, :x.size(1), :].to(x.device)

In [42]:
class Transformer(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, ffn_units, num_layers, dropout_rate, max_len):
        super(Transformer, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len)

        # Encoder
        self.encoder_layers = nn.ModuleList([
            EncoderLayer(d_model, num_heads, ffn_units, dropout_rate)
            for _ in range(num_layers)
        ])

        # Decoder
        self.decoder_layers = nn.ModuleList([
            DecoderLayer(d_model, num_heads, ffn_units, dropout_rate)
            for _ in range(num_layers)
        ])

        self.fc = nn.Linear(d_model, vocab_size)

    def create_look_ahead_mask(self, size):
        mask = torch.triu(torch.ones(size, size), diagonal=1)
        return mask == 1

    def forward(self, encoder_input, decoder_input, encoder_mask=None, decoder_mask=None):
        # Encoder
        encoder_embedded = self.embedding(encoder_input)
        encoder_embedded = self.positional_encoding(encoder_embedded)

        encoder_output = encoder_embedded
        for layer in self.encoder_layers:
            encoder_output = layer(encoder_output, encoder_mask)

        # Decoder
        decoder_embedded = self.embedding(decoder_input)
        decoder_embedded = self.positional_encoding(decoder_embedded)

        look_ahead_mask = self.create_look_ahead_mask(decoder_input.size(1)).to(decoder_input.device)

        decoder_output = decoder_embedded
        for layer in self.decoder_layers:
            decoder_output = layer(decoder_output, encoder_output, look_ahead_mask, encoder_mask)

        return self.fc(decoder_output)

In [43]:
model = Transformer(VOCAB_SIZE, D_MODEL, NUM_HEADS, FFN_UNITS, NUM_LAYERS, DROPOUT, MAX_LEN)
model = model.to(device)

In [44]:
dataset = ChatDataset(questions, answers, tokenizer, MAX_LEN)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

### Optimizer and loss functions

In [45]:
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = nn.CrossEntropyLoss()

### Model Training

In [46]:
# Training Loop with Loss and Accuracy
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    total_accuracy = 0
    total_tokens = 0

    for questions_batch, answers_batch in dataloader:
        questions_batch = questions_batch.to(device)
        decoder_input = answers_batch[:, :-1].to(device)  # Input for decoder
        target = answers_batch[:, 1:].to(device)  # Target for training

        optimizer.zero_grad()

        output = model(questions_batch, decoder_input)
        loss = criterion(output.reshape(-1, VOCAB_SIZE), target.reshape(-1))

        # Backpropagation
        loss.backward()
        optimizer.step()

        # Calculate accuracy
        predicted_tokens = torch.argmax(output, dim=-1)
        correct_tokens = (predicted_tokens == target).sum().item()
        total_accuracy += correct_tokens
        total_tokens += target.numel()

        total_loss += loss.item()

    epoch_loss = total_loss / len(dataloader)
    epoch_accuracy = total_accuracy / total_tokens

    print(f"Epoch {epoch + 1}/{EPOCHS}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_accuracy:.4f}")

Epoch 1/300, Loss: 5.5032, Accuracy: 0.3662
Epoch 2/300, Loss: 4.4478, Accuracy: 0.4203
Epoch 3/300, Loss: 3.9433, Accuracy: 0.4389
Epoch 4/300, Loss: 3.7801, Accuracy: 0.4529
Epoch 5/300, Loss: 3.6486, Accuracy: 0.4767
Epoch 6/300, Loss: 3.5726, Accuracy: 0.5061
Epoch 7/300, Loss: 3.3048, Accuracy: 0.5241
Epoch 8/300, Loss: 3.1798, Accuracy: 0.5382
Epoch 9/300, Loss: 2.9465, Accuracy: 0.5491
Epoch 10/300, Loss: 2.8544, Accuracy: 0.5615
Epoch 11/300, Loss: 2.7124, Accuracy: 0.5695
Epoch 12/300, Loss: 2.6106, Accuracy: 0.5807
Epoch 13/300, Loss: 2.5330, Accuracy: 0.5869
Epoch 14/300, Loss: 2.3795, Accuracy: 0.5963
Epoch 15/300, Loss: 2.3047, Accuracy: 0.6050
Epoch 16/300, Loss: 2.3672, Accuracy: 0.6138
Epoch 17/300, Loss: 2.2210, Accuracy: 0.6189
Epoch 18/300, Loss: 2.2178, Accuracy: 0.6257
Epoch 19/300, Loss: 2.0740, Accuracy: 0.6372
Epoch 20/300, Loss: 2.0231, Accuracy: 0.6464
Epoch 21/300, Loss: 1.9146, Accuracy: 0.6536
Epoch 22/300, Loss: 1.8078, Accuracy: 0.6640
Epoch 23/300, Loss:

### Export the Model

In [47]:
torch.save(model.state_dict(), 'model/transformer_chatbot_gpu_deco_1.pth')

In [48]:
import pickle
with open('model/tokenizer.pkl', 'wb') as f:
    pickle.dump(tokenizer, f)


In [49]:
# Imports specific to the trading strategy functionality
import yfinance as yf
from datetime import datetime
import matplotlib.pyplot as plt

def livermore_backtest(stock_symbol='NVDA', start_date_str='2020-01-01', end_date_str='2024-12-31'):
    """
    Runs a Livermore-inspired backtest on a given stock from start_date to end_date.
    Returns a summary string and a matplotlib figure showing the cumulative returns.
    """
    # Convert date strings to datetime objects
    start_date = pd.to_datetime(start_date_str)
    end_date = pd.to_datetime(end_date_str)
    
    # Download historical data for the stock
    stock_data = yf.download(stock_symbol, start=start_date, end=end_date)
    
    # Check if the downloaded data has MultiIndex columns
    if isinstance(stock_data.columns, pd.MultiIndex):
        try:
            # Some versions may use 'Ticker' as the level name
            df = stock_data.xs(stock_symbol, axis=1, level='Ticker')
        except Exception:
            # Otherwise, try with the first level
            df = stock_data.xs(stock_symbol, axis=1, level=0)
    else:
        df = stock_data.copy()
        
    # Now, df should have simple columns like 'Open', 'High', 'Low', 'Close', etc.
    # Calculate moving averages
    df['50MA'] = df['Close'].rolling(window=50).mean()
    df['200MA'] = df['Close'].rolling(window=200).mean()
    
    # Define breakout levels (previous 20-day high/low)
    df['20High'] = df['Close'].rolling(window=20).max()
    df['20Low'] = df['Close'].rolling(window=20).min()
    
    # Create positions: buy when price breaks above yesterday's 20-day high and is above both moving averages; sell otherwise
    df['Position'] = 0
    df.loc[
        (df['Close'] > df['20High'].shift(1)) &
        (df['Close'] > df['50MA']) &
        (df['Close'] > df['200MA']),
        'Position'
    ] = 1
    df.loc[df['Close'] < df['20Low'].shift(1), 'Position'] = 0
    df['Position'] = df['Position'].ffill().fillna(0)
    
    # Compute returns for buy-and-hold versus strategy
    df['Buy-and-Hold Return'] = df['Close'].pct_change()
    df['Strategy Return'] = df['Buy-and-Hold Return'] * df['Position']
    
    # Compute cumulative returns
    buy_and_hold_cum = df['Buy-and-Hold Return'].cumsum()
    strategy_cum = df['Strategy Return'].cumsum()
    
    # Plot the cumulative returns
    fig, ax = plt.subplots(figsize=(12,6))
    ax.plot(buy_and_hold_cum, label='Buy-and-Hold Return')
    ax.plot(strategy_cum, label='Strategy Return')
    ax.set_title(f"{stock_symbol}: Livermore-Inspired Trading Strategy vs Buy-and-Hold")
    ax.set_xlabel("Date")
    ax.set_ylabel("Cumulative Return")
    ax.grid(True)
    ax.legend()
    plt.close(fig)  # close figure to prevent auto display in non-interactive environments
    
    # Create a summary message
    summary = f"**{stock_symbol} Backtest Summary ({start_date_str} to {end_date_str})**\n"
    summary += f"Final Buy-and-Hold Return: {buy_and_hold_cum.iloc[-1]:.2%}\n"
    summary += f"Final Strategy Return: {strategy_cum.iloc[-1]:.2%}\n"
    return summary, fig



In [50]:
def apply_livermore_strategy(response, question):
    """
    Examines the user question for trading-related keywords. If detected,
    runs the Livermore backtest and appends the summary to the base response.
    """
    keywords = ['backtest', 'trading', 'stock', 'chart', 'performance']
    if any(keyword in question.lower() for keyword in keywords):
        # You could also attempt to parse out custom parameters from the question.
        # For now, we use default values.
        summary, fig = livermore_backtest()
        # Append the backtest summary to the generated answer
        response += "\n\n" + summary
        # For notebooks: display the plot
        try:
            from IPython.display import display
            display(fig)
        except ImportError:
            pass  # In a non-notebook environment, you might save the figure instead
    return response


In [51]:
def chat_response_with_strategy(question, tokenizer, model, max_len=40):
    """
    Generates a chatbot response using the transformer model and then applies
    Livermore-inspired modifications if the question is trading-related.
    """
    base_response = chat_response(question, tokenizer, model, max_len)
    final_response = apply_livermore_strategy(base_response, question)
    return final_response


### Load and Test the model

In [52]:
# Load the Trained Model
model.load_state_dict(torch.load("model/transformer_chatbot_gpu_deco_1.pth"))
model.eval()

# Chat Function
def chat_response(question, tokenizer, model, max_len=40):
    model.eval()
    question_seq = tokenizer.texts_to_sequences([question])[0]
    question_seq = [1] + question_seq[:max_len - 2] + [2]  # Add <start> and <end> tokens
    question_seq = question_seq + [0] * (max_len - len(question_seq))  # Pad to max_len
    question_tensor = torch.tensor([question_seq]).to(device)

    decoder_input = torch.tensor([[1]]).to(device)  # Start token
    response = []

    for _ in range(max_len):
        with torch.no_grad():
            output = model(question_tensor, decoder_input)
        predicted_id = torch.argmax(output[:, -1, :], dim=-1).item()
        if predicted_id == 2:  # End token
            break
        response.append(predicted_id)
        decoder_input = torch.cat([decoder_input, torch.tensor([[predicted_id]]).to(device)], dim=-1)

    return tokenizer.sequences_to_texts([response])[0].replace("<start>", "").replace("<end>", "").strip()

# Interactive Chat Loop
# while True:
#     user_input = input("You: ")
#     if user_input.lower() in ["exit", "quit"]:
#         print("Exiting chatbot. Goodbye!")
#         break
#     bot_response = chat_response(user_input, tokenizer, model)
#     print(f"Bot: {bot_response}")
while True:
    user_input = input("You: ")
    if user_input.lower() in ["exit", "quit"]:
        print("Exiting chatbot. Goodbye!")
        break
    # Generate a base response and then adjust it with Livermore logic
    bot_response = chat_response_with_strategy(user_input, tokenizer, model)
    print(f"Livermore: {bot_response}")


You:  Hi


Livermore: Hello. I'm always watching the tape. What brings you to the market today?


You:  Hello


Livermore: Hello there. Shall we talk tape or tactics?


You:  Hello


Livermore: Hello there. Shall we talk tape or tactics?


You:  How are you?


Livermore: I'm steady—like a trader should be. Emotions must never swing more than the market.


You:  How are you?


Livermore: I'm steady—like a trader should be. Emotions must never swing more than the market.


You:  Hi


Livermore: Hello. I'm always watching the tape. What brings you to the market today?


You:  How are you doing?


Livermore: Analyzing trends and waiting for the right setup. Patience is a position too.


You:  Who are you?


Livermore: I'm always observing the market's mood. Today, it seems to be showing signs of accumulation.


You:  What's your name?


Livermore: I’m Jesse Livermore. Some called me the Boy Plunger. I just followed the tape.


You:  What are your principles?


Livermore: My trading principles are forged through decades of wins and losses. First and foremost, I cut my losses quickly—there’s no honor in holding a losing position. I let my winners run, never interfere with a stock that’s working.


You:  What is boolish?


Livermore: A bullish market or stock shows strength—higher highs, strong demand, and volume confirming momentum. It means the path of least resistance is upward. In such times, I look to go long.


You:  exit


Exiting chatbot. Goodbye!
