<img src="https://hilpisch.com/tpq_logo.png" alt="The Python Quants" width="35%" align="right" border="0"><br>

# NLP Basics

**Deploying and Chatting with Multiple LLMs**

&copy; Dr. Yves J. Hilpisch

<a href="https://tpq.io" target="_blank">https://tpq.io</a> | <a href="https://twitter.com/dyjh" target="_blank">@dyjh</a> | <a href="mailto:team@tpq.io">team@tpq.io</a>

## Gradio

The `gradio` Python package allows the efficient generation of chat interfaces for LLMs.

Compare, for example, https://huggingface.co/spaces/akhaliq/anychat.

See their website at https://www.gradio.app.

## Set the Model Path

Define where the (large) model files are stored:

    export HF_HOME=/path/to/custom/cache

## Chat Interface

_Generated with `o3-mini-high`._

In [None]:
# Clone the course repository into the current working directory
# (the leading "!" runs the line as a shell command from the notebook).
!git clone https://github.com/tpq-classes/natural_language_processing.git
import sys
# Add the cloned checkout to the module search path.
sys.path.append('natural_language_processing')


In [None]:
#
# From ChatGPT (o3-mini-high)
#

import time
import threading
import gradio as gr
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer

# Model identifiers offered in the UI dropdown; the selected one is handed
# to `from_pretrained` when it is first used.
models = [
    "Qwen/Qwen1.5-1.8B-Chat",
    "deepseek-ai/deepseek-coder-1.3b-base",
    "deepseek-ai/deepseek-coder-1.3b-instruct",
    "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
    # "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B"
]

# Pick the compute device in order of preference: CUDA, then MPS, then CPU.
device = (
    "cuda" if torch.cuda.is_available()
    else "mps" if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using device: {device}")

# Process-wide cache: model name -> (model, tokenizer).
loaded_models = dict()

def load_model(model_name: str):
    """
    Returns the ``(model, tokenizer)`` pair for ``model_name``.

    The pair is loaded on first request, moved to the module-level ``device``
    and stored in ``loaded_models`` so that later calls reuse it.
    """
    cached = loaded_models.get(model_name)
    if cached is None:
        print(f"Loading model {model_name} ...")
        # Tokenizer first, then the weights (slow tokenizer variant requested).
        tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)
        model = AutoModelForCausalLM.from_pretrained(model_name)
        model.to(device)
        cached = loaded_models[model_name] = (model, tokenizer)
    return cached

def format_prompt(history):
    """
    Renders a chat history as a plain-text prompt.

    ``history`` is a list of dicts with the keys "role" and "content".
    User turns become ``User: <content>`` lines and assistant turns become
    ``Assistant: <content>`` lines. Messages with any other role are skipped.
    If the final message is an assistant turn with empty content, it is
    rendered as a bare ``Assistant:`` without a trailing newline so the
    model continues right after it.
    """
    final_index = len(history) - 1
    pieces = []
    for position, turn in enumerate(history):
        role = turn["role"]
        if role == "user":
            pieces.append(f"User: {turn['content']}\n")
        elif role == "assistant":
            reply = turn["content"]
            # Only the trailing, still-empty assistant turn is left open.
            is_open_turn = position == final_index and reply == ""
            pieces.append("Assistant:" if is_open_turn else f"Assistant: {reply}\n")
    return "".join(pieces)

def chat_with_model(user_message, history, model_name, max_new_tokens):
    """
    Streams a model reply to ``user_message`` as a generator.

    Args:
        user_message: Text of the new user turn.
        history: Previous turns as a list of {"role", "content"} dicts, or None.
        model_name: Model identifier passed to ``load_model``.
        max_new_tokens: Upper bound on generated tokens (converted with ``int``).

    Yields:
        Tuples ``(history, speed, history)`` where ``history`` includes the new
        user turn and the assistant reply generated so far, and ``speed`` is a
        string such as ``"12.34 tokens/s"``. The token count is an estimate
        obtained by re-tokenizing each streamed text chunk.

    Raises:
        Exception: Any error raised by ``model.generate`` in the worker thread
            is re-raised here once streaming has stopped.
    """
    if history is None:
        history = []
    # Work on a new list: the new user turn plus an empty assistant turn to fill.
    history = history + [
        {"role": "user", "content": user_message},
        {"role": "assistant", "content": ""}
    ]
    prompt = format_prompt(history)

    model, tokenizer = load_model(model_name)
    inputs = tokenizer(prompt, return_tensors="pt")
    input_ids = inputs.input_ids.to(device)
    attention_mask = inputs.attention_mask.to(device) if "attention_mask" in inputs else None

    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = dict(
        input_ids=input_ids,
        attention_mask=attention_mask,
        max_new_tokens=int(max_new_tokens),
        streamer=streamer,
        do_sample=True,
        temperature=0.7,
    )

    generation_errors = []

    def run_generation():
        # Generation runs in a worker thread. If it fails, the streamer would
        # never receive its end-of-stream signal and the consumer loop below
        # would block forever, so close the stream and keep the error.
        try:
            model.generate(**generation_kwargs)
        except Exception as exc:
            generation_errors.append(exc)
            streamer.end()

    generation_thread = threading.Thread(target=run_generation)
    generation_thread.start()

    # Monotonic clock: the throughput figure is unaffected by wall-clock changes.
    start_time = time.perf_counter()
    token_count = 0
    generated_text = ""

    for new_text in streamer:
        generated_text += new_text
        token_count += len(tokenizer.tokenize(new_text))
        elapsed = time.perf_counter() - start_time
        tokens_per_sec = token_count / elapsed if elapsed > 0 else token_count
        history[-1]["content"] = generated_text
        yield history, f"{tokens_per_sec:.2f} tokens/s", history

    generation_thread.join()
    if generation_errors:
        # Surface the worker failure to the caller instead of ending silently.
        raise generation_errors[0]

with gr.Blocks(title="Chat with Transformers Models") as demo:
    gr.Markdown("## Chat with Transformers Models")

    # Top row: model selector next to the generation-length limit.
    with gr.Row():
        model_dropdown = gr.Dropdown(choices=models, value=models[0], label="Select Model")
        max_tokens_input = gr.Number(value=250, label="Max New Tokens")

    # Conversation view using the dict-based 'messages' format.
    chatbot = gr.Chatbot(label="Chat", type="messages")
    token_speed_display = gr.Textbox(interactive=False, label="Token Speed (tokens/s)")

    # Bottom row: message box and submit button.
    with gr.Row():
        user_input = gr.Textbox(label="Your Message", placeholder="Enter your message here...")
        submit_btn = gr.Button("Submit")

    # Conversation history carried between calls.
    state = gr.State([])

    # Clicking the button and pressing Enter in the text box run the same handler.
    # NOTE(review): both registrations request api_name="chat" - confirm how
    # Gradio resolves the duplicate endpoint name.
    for register_event in (submit_btn.click, user_input.submit):
        register_event(
            chat_with_model,
            inputs=[user_input, state, model_dropdown, max_tokens_input],
            outputs=[chatbot, token_speed_display, state],
            api_name="chat",
        )

if __name__ == "__main__":
    # "0.0.0.0" asks the server to listen on all network interfaces.
    # NOTE(review): confirm this exposure is intended.
    demo.launch(server_name="0.0.0.0")

## Code Testing

From Gemini Flash 2.0

In [None]:
import numpy as np
import matplotlib.pyplot as plt

def geometric_brownian_motion(S0, r, T, sigma, steps, paths):
    """
    Simulates price paths of a geometric Brownian motion.

    The paths are built from the exact solution
    S_t = S0 * exp((r - sigma^2 / 2) * t + sigma * W_t) on an equidistant
    grid, with the Brownian increments drawn from NumPy's global random state.

    Args:
        S0: Initial price.
        r: Risk-free rate.
        T: Time to maturity.
        sigma: Volatility.
        steps: Number of time steps.
        paths: Number of simulation paths.

    Returns:
        A NumPy array of shape (paths, steps + 1) containing the simulated price paths.
    """
    step_size = T / steps
    grid = np.linspace(0, T, steps + 1)

    # Independent N(0, step_size) increments, accumulated along each path.
    increments = np.random.normal(loc=0, scale=np.sqrt(step_size), size=(paths, steps))
    # Prepend W_0 = 0 so the Brownian path lines up with the time grid.
    brownian = np.insert(np.cumsum(increments, axis=1), 0, 0, axis=1)

    drift = (r - 0.5 * sigma**2) * grid
    return S0 * np.exp(drift + sigma * brownian)


# Simulation inputs
S0, r, T, sigma = 100, 0.05, 1.0, 0.2  # initial price, risk-free rate, maturity, volatility
steps, paths = 50, 10000  # time steps per path, number of simulated paths

# Run the simulation
St = geometric_brownian_motion(S0, r, T, sigma, steps, paths)

# Chart at most the first 100 paths so the figure stays readable
time_axis = np.linspace(0, T, steps + 1)
plt.figure(figsize=(10, 6))
for path in St[:min(paths, 100)]:
    plt.plot(time_axis, path)
plt.xlabel("Time (t)")
plt.ylabel("Stock Price (St)")
plt.title("Geometric Brownian Motion Simulation")
plt.grid(True)
plt.show()

# Summary statistics of the prices at maturity T
final_prices = St[:, -1]
print(f"Mean of final prices: {np.mean(final_prices)}")
print(f"Standard deviation of final prices: {np.std(final_prices)}")

# final_prices can be analyzed further, e.g. to estimate option prices.

In [None]:
import numpy as np
from scipy.stats import norm

def black_scholes(S0, K, r, T, sigma, option_type="call"):
    """
    Prices a European option with the Black-Scholes closed-form formula.

    Args:
        S0: Initial stock price.
        K: Strike price.
        r: Risk-free interest rate.
        T: Time to maturity.
        sigma: Volatility.
        option_type: "call" or "put".

    Returns:
        European option price.

    Raises:
        ValueError: If option_type is neither "call" nor "put".
    """
    total_vol = sigma * np.sqrt(T)
    d1 = (np.log(S0 / K) + (r + 0.5 * sigma**2) * T) / total_vol
    d2 = d1 - total_vol
    discounted_strike = K * np.exp(-r * T)

    if option_type == "call":
        return S0 * norm.cdf(d1) - discounted_strike * norm.cdf(d2)
    if option_type == "put":
        return discounted_strike * norm.cdf(-d2) - S0 * norm.cdf(-d1)
    raise ValueError("Invalid option type. Must be 'call' or 'put'.")


def lsmc_american_option(S0, K, r, T, sigma, paths=100000, steps=50, option_type="call"):
    """
    Estimates an American option price with Least-Squares Monte Carlo (LSM).

    Stock paths follow geometric Brownian motion, with shocks drawn from
    NumPy's global random state. Early exercise is considered at the interior
    time steps 1, ..., steps - 1 and at maturity; exercise at time 0 is not
    considered. The continuation value is estimated at each step by a
    polynomial regression of the discounted future value on the current
    stock price, fitted on all paths.

    Args:
        S0: Initial stock price.
        K: Strike price.
        r: Risk-free interest rate.
        T: Time to maturity.
        sigma: Volatility.
        paths: Number of Monte Carlo paths.
        steps: Number of time steps.
        option_type: "call" or "put".

    Returns:
        American option price (present value at time 0).

    Raises:
        ValueError: If option_type is neither "call" nor "put".
    """
    # Validate once, before any simulation work is done.
    if option_type not in ("call", "put"):
        raise ValueError("Invalid option type. Must be 'call' or 'put'.")

    def payoff(prices):
        # Intrinsic value of the option for the given stock prices.
        if option_type == "call":
            return np.maximum(prices - K, 0)
        return np.maximum(K - prices, 0)

    dt = T / steps
    df = np.exp(-r * dt)  # one-step discount factor

    # Generate stock price paths
    S = np.zeros((paths, steps + 1))
    S[:, 0] = S0
    for i in range(steps):
        S[:, i + 1] = S[:, i] * np.exp((r - 0.5 * sigma**2) * dt + sigma * np.sqrt(dt) * np.random.normal(size=paths))

    # Backward induction, starting from the payoff at maturity
    V = payoff(S[:, -1])
    for t in range(steps - 1, 0, -1):
        exercise_values = payoff(S[:, t])
        discounted_V = V * df  # value of holding on, seen from time t

        # Regression of the discounted future value on the current stock price
        X = S[:, t]
        try:
            coefficients = np.polyfit(X, discounted_V, 5)  # 5th-degree polynomial
        except np.linalg.LinAlgError:
            # Fall back to a linear fit if the higher-degree fit fails.
            coefficients = np.polyfit(X, discounted_V, 1)
        continuation_values = np.polyval(coefficients, X)

        # Exercise only when the payoff is positive and beats the estimated
        # continuation value. A zero payoff must never replace the value of
        # holding, even if the fitted continuation value is negative.
        exercise_now = (exercise_values > 0) & (exercise_values > continuation_values)
        V = np.where(exercise_now, exercise_values, discounted_V)

    # V holds values as of time dt; discount one more step to reach time 0.
    return df * np.mean(V)



# Example inputs
S0, K = 36, 40
r, T, sigma = 0.06, 1, 0.2
paths, steps = 100000, 50

# American prices via LSM (call first, then put)
call_price = lsmc_american_option(S0, K, r, T, sigma, paths=paths, steps=steps, option_type="call")
put_price = lsmc_american_option(S0, K, r, T, sigma, paths=paths, steps=steps, option_type="put")

print(f"American Call Price (LSM): {call_price}")
print(f"American Put Price (LSM): {put_price}")


# European Black-Scholes prices as a benchmark
european_call = black_scholes(S0, K, r, T, sigma, "call")
european_put = black_scholes(S0, K, r, T, sigma, "put")
print(f"European Call Price (Black-Scholes): {european_call}")
print(f"European Put Price (Black-Scholes): {european_put}")

## Code Testing

From ChatGPT (o3-mini-high), after a few minutes of reasoning.

In [None]:
import numpy as np

def american_option_lsm(S0=36, K=40, r=0.06, T=1, sigma=0.2, paths=100000, steps=50, option_type='put'):
    """
    Estimate the price of an American put or call with the Least-Squares
    Monte Carlo (LSM) algorithm under the Black-Scholes model.

    Early exercise is considered at the time steps steps-1, ..., 1; the value
    at the first step is then discounted back to time 0. Random shocks are
    drawn from NumPy's global random state.

    Parameters:
      S0         : initial stock price (default 36)
      K          : strike price (default 40)
      r          : risk-free interest rate (default 0.06)
      T          : time to maturity (default 1 year)
      sigma      : volatility (default 0.2)
      paths      : number of Monte Carlo simulation paths (default 100000)
      steps      : number of time steps (default 50)
      option_type: 'put' or 'call', case-insensitive (default 'put')

    Returns:
      price      : estimated option price.

    Raises:
      ValueError : if option_type is neither 'put' nor 'call'.
    """
    dt = T / steps
    drift = (r - 0.5 * sigma**2) * dt
    diffusion = sigma * np.sqrt(dt)
    step_discount = np.exp(-r * dt)

    # Simulated price matrix, one row per path and one column per time point.
    S = np.empty((paths, steps + 1))
    S[:, 0] = S0
    for col in range(1, steps + 1):
        shocks = np.random.normal(size=paths)
        S[:, col] = S[:, col - 1] * np.exp(drift + diffusion * shocks)

    # Immediate exercise payoff for every path and time point.
    kind = option_type.lower()
    if kind == 'put':
        payoff = np.maximum(K - S, 0)
    elif kind == 'call':
        payoff = np.maximum(S - K, 0)
    else:
        raise ValueError("option_type must be either 'put' or 'call'")

    # Start from the known payoff at maturity and step backwards in time.
    V = payoff[:, -1]
    for t in reversed(range(1, steps)):
        # Value of holding on: next step's value discounted by one period.
        held = V * step_discount

        # Only in-the-money paths take part in the regression and the decision.
        itm = np.flatnonzero(payoff[:, t] > 0)
        if itm.size:
            X = S[itm, t]
            # Quadratic basis [1, S, S^2] for the continuation-value estimate.
            A = np.column_stack((np.ones_like(X), X, X**2))
            coeffs = np.linalg.lstsq(A, held[itm], rcond=None)[0]
            continuation_est = coeffs[0] + coeffs[1] * X + coeffs[2] * X**2

            # Exercise where the immediate payoff beats the estimate.
            immediate = payoff[itm, t]
            exercise_now = immediate > continuation_est
            held[itm[exercise_now]] = immediate[exercise_now]

        V = held

    # V is the value at the first step; discount once more to reach time 0.
    return np.mean(V * step_discount)

if __name__ == "__main__":
    # Contract and simulation settings (same values as the function defaults).
    S0, K = 36, 40
    r, T, sigma = 0.06, 1, 0.2
    paths, steps = 100000, 50

    # Price the put first, then the call.
    put_price = american_option_lsm(S0, K, r, T, sigma, paths, steps, 'put')
    call_price = american_option_lsm(S0, K, r, T, sigma, paths, steps, 'call')

    print(f"American Put Option Price (LSM): {put_price:.4f}")
    print(f"American Call Option Price (LSM): {call_price:.4f}")


<img src="https://hilpisch.com/tpq_logo.png" alt="The Python Quants" width="35%" align="right" border="0"><br>

<a href="https://tpq.io" target="_blank">https://tpq.io</a> | <a href="https://twitter.com/dyjh" target="_blank">@dyjh</a> | <a href="mailto:team@tpq.io">team@tpq.io</a>