In [7]:
# Import necessary libraries
from fastapi import FastAPI  # API framework to handle HTTP requests and serve APIs
from transformers import GPT2LMHeadModel, GPT2Tokenizer  # Hugging Face transformers to load GPT-2 model and tokenizer
from sklearn.feature_extraction.text import TfidfVectorizer  # TF-IDF for transforming text into numerical vectors
from sklearn.metrics.pairwise import cosine_similarity  # Cosine similarity for ranking documents based on query
import numpy as np  # Numpy for array operations

In [2]:

# Initialize FastAPI app
app = FastAPI()

# Simulated database of financial documents with metadata tagging for retrieval
database = {
    "doc1": {"text": "Company X has significant liquidity risk due to its high debt ratio.",
             "metadata": ["liquidity risk", "debt"]},
    "doc2": {"text": "Company X maintains a solid cash reserve, which mitigates some of the liquidity concerns.",
             "metadata": ["cash reserve", "liquidity"]},
    "doc3": {"text": "The current market conditions have led to increased liquidity stress for many firms, including Company X.",
             "metadata": ["market conditions", "liquidity stress"]},
    "doc4": {"text": "Company X has recently restructured its debt, which lowers its short-term liquidity risk.",
             "metadata": ["debt restructuring", "liquidity risk"]}
}

# Step 1: User Query
user_query = "What is the liquidity risk for Company X?"

# Step 2: Data Retrieval with Metadata Tagging
def retrieve_documents(query, database):
    relevant_docs = []
    for doc_id, doc_data in database.items():
        text = doc_data['text']
        metadata = doc_data['metadata']
        if any(tag in query.lower() for tag in metadata):
            relevant_docs.append(text)
    return relevant_docs

# Retrieve relevant documents based on the user's query
documents = retrieve_documents(user_query, database)

# Step 3: Preprocessing (Tokenization and Lowercasing)
def preprocess_text(text):
    tokens = text.lower().split()
    return tokens

# Preprocess all retrieved documents
preprocessed_docs = [preprocess_text(doc) for doc in documents]

# Step 4: Indexing (TF-IDF for Vectorization)
vectorizer = TfidfVectorizer()
doc_vectors = vectorizer.fit_transform(documents)

# Step 5: Preprocessing the user query for similarity comparison
query_vector = vectorizer.transform([user_query])

# Step 6: Ranking with Metadata (Cosine Similarity)
similarities = cosine_similarity(query_vector, doc_vectors).flatten()
ranked_doc_indices = np.argsort(similarities)[::-1]
ranked_docs = [documents[i] for i in ranked_doc_indices]

# Step 7: LLM Integration (GPT-2 for language generation)
model_name = "gpt2"
tokenizer = GPT2Tokenizer.from_pretrained(model_name)
model = GPT2LMHeadModel.from_pretrained(model_name)

# Assign the eos_token to the pad_token to avoid padding errors
tokenizer.pad_token = tokenizer.eos_token

# Generate input context for the model using the top 2 ranked documents
context = " ".join(ranked_docs[:2])
input_ids = tokenizer.encode(context, return_tensors='pt', padding=True)

# Set pad_token_id to eos_token_id to handle padding correctly
model.config.pad_token_id = tokenizer.eos_token_id

# Create an attention mask to avoid confusion between padding and real tokens
attention_mask = (input_ids != tokenizer.pad_token_id).long()

# Generate the response using GPT-2
output = model.generate(input_ids, attention_mask=attention_mask, max_length=100, num_return_sequences=1)

# Decode the generated output back to text, setting clean_up_tokenization_spaces to avoid the warning
generated_text = tokenizer.decode(output[0], skip_special_tokens=True, clean_up_tokenization_spaces=True)

# Step 8: Corrective RAG (If Needed)
def corrective_rag(generated_text, database):
    if "liquidity risk" not in generated_text:
        additional_docs = retrieve_documents("liquidity risk", database)
        additional_context = " ".join(additional_docs[:2])
        input_ids = tokenizer.encode(additional_context, return_tensors='pt')
        output = model.generate(input_ids, max_length=100, num_return_sequences=1)
        return tokenizer.decode(output[0], skip_special_tokens=True, clean_up_tokenization_spaces=True)
    return generated_text

# Apply corrective RAG if the generated response is incomplete
final_response = corrective_rag(generated_text, database)

# Step 9: Output the Final Risk Assessment (Handled by FastAPI)
@app.get("/assess_liquidity_risk")
def assess_liquidity_risk():
    return {"query": user_query, "response": final_response}



Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


In [5]:
import requests

# Make a GET request to your FastAPI endpoint
response = requests.get("http://127.0.0.1:8000/assess_liquidity_risk")

# Print the JSON response from the FastAPI server
print(response.json())


{'query': 'What is the liquidity risk for Company X?', 'response': "Company X maintains a solid cash reserve, which mitigates some of the liquidity concerns. Company X has significant liquidity risk due to its high debt ratio.\n\nThe Company's cash and cash equivalents are limited to $1.5 billion and $1.6 billion, respectively.\n\nThe Company's net income is $1.1 billion and $1.2 billion, respectively.\n\nThe Company's net income is $1.3 billion and $1.4 billion, respectively"}
