<a href="https://colab.research.google.com/github/jonzyyyy/Earnings_QnA/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Data preparation
- Obtain earnings call transcripts and create a custom dataset of question-answer pairs in a suitable format (e.g., SQuAD format).
- This will involve manually identifying questions and their corresponding answers within the transcripts. Split the dataset into training and validation sets.

Model selection and loading
- Load a pre-trained transformer model suitable for question answering (e.g., `distilbert-base-cased-distilled-squad`) using the `transformers` library and load the corresponding tokenizer.


In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive; the annotations JSON used later is read from a path under this mount point.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
%%capture
%pip install transformers datasets torch evaluate

In [None]:
# Run configuration: set these values before running the rest of the notebook.
from google.colab import userdata

# API key fetched via Colab's userdata helper (not hard-coded); retrieve_transcript puts it in the request URL.
API_KEY = userdata.get('API_KEY')
COMPANY_TICKER = 'MSFT'  # sent as the `symbol` query parameter
FISCAL_QUARTER = 'Q4'  # appended to FISCAL_YEAR to form the `quarter` query parameter, e.g. "2025Q4"
FISCAL_YEAR = '2025' # Fiscal year of the earnings call to fetch

In [None]:
from transformers import AutoModelForQuestionAnswering, AutoTokenizer

# One checkpoint name is passed to both from_pretrained calls so the tokenizer and the QA model match.
model_name = "distilbert-base-cased-distilled-squad"
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForQuestionAnswering.from_pretrained(model_name)

In [None]:
import requests

def retrieve_transcript(company_ticker, fiscal_year, fiscal_quarter):
    """
    Retrieve the earnings call transcript for a given company and fiscal quarter.

    Args:
        company_ticker: Ticker symbol, sent as the ``symbol`` query parameter.
        fiscal_year: Fiscal year, e.g. ``'2025'``.
        fiscal_quarter: Fiscal quarter label, e.g. ``'Q4'``. It is appended to
            ``fiscal_year`` to form the ``quarter`` query parameter (``'2025Q4'``).

    Returns:
        The non-empty value of the ``transcript`` field of the JSON response.

    Raises:
        requests.HTTPError: If the server responds with an error status code.
        ValueError: If the response has no ``transcript`` field or the field is
            empty. The message includes the decoded response body.
    """
    # Let requests build and escape the query string instead of interpolating raw values into the URL.
    response = requests.get(
        'https://www.alphavantage.co/query',
        params={
            'function': 'EARNINGS_CALL_TRANSCRIPT',
            'symbol': company_ticker,
            'quarter': f'{fiscal_year}{fiscal_quarter}',
            'apikey': API_KEY,
        },
        timeout=30,  # without a timeout a stalled connection would hang the cell indefinitely
    )
    response.raise_for_status()
    data = response.json()

    # An empty transcript is treated like a missing one: it would produce an empty context downstream.
    transcript = data.get('transcript') if isinstance(data, dict) else None
    if not transcript:
        raise ValueError("Could not retrieve transcript. Response: {}".format(data))
    return transcript

In [None]:
import os

# Local path for the transcript file; the ticker, fiscal year and quarter are part of the file name.
filename = f'./data/{COMPANY_TICKER}_{FISCAL_YEAR}{FISCAL_QUARTER}_transcript.txt'
# NOTE: this issues an HTTP request to the transcript API every time the cell runs.
transcript = retrieve_transcript(COMPANY_TICKER, FISCAL_YEAR, FISCAL_QUARTER)

def save_transcript(filename, transcript):
    """
    Save the transcript to a file.

    Each entry is written on its own line in the form ``speaker: content``.

    Args:
        filename: Destination path. Missing parent directories are created. A
            bare file name (no directory part) is written to the current
            working directory.
        transcript: Iterable of dicts with ``'speaker'`` and ``'content'`` keys.
    """
    # os.makedirs('') raises FileNotFoundError, so only create a directory when the path has one.
    directory = os.path.dirname(filename)
    if directory:
        os.makedirs(directory, exist_ok=True)

    # Explicit UTF-8 so non-ASCII characters in the transcript do not depend on the platform's default encoding.
    with open(filename, 'w', encoding='utf-8') as f:
        f.writelines(f"{entry['speaker']}: {entry['content']}\n" for entry in transcript)

    print(f"Transcript saved to {filename}")

# Write the fetched transcript to disk; the same path is read back later as the QA context.
save_transcript(filename, transcript)

Transcript saved to ./data/MSFT_2025Q4_transcript.txt


In [None]:
import json
import random

# Inputs for building the QA dataset:
# - annotations_file: JSON file of question/answer annotations stored on the mounted Google Drive
# - transcript_file: the transcript text file written to `filename` earlier in the notebook
annotations_file = '/content/drive/My Drive/QA Bot/data/annotations.json'
transcript_file = filename

def load_qa_pairs(annotations_file, transcript_file):
    """
    Build SQuAD-style question-answer records from an annotations file and a transcript.

    Args:
        annotations_file: Path to a JSON file containing a list of objects, each
            with a ``"question"`` string and an ``"answers"`` object whose
            ``"text"`` and ``"answer_start"`` lists are read at index 0 (only
            the first answer of each question is used).
        transcript_file: Path to the transcript text file. Its full content
            becomes the ``"context"`` of every record.

    Returns:
        A list of dicts with the keys ``"id"`` (position in the annotations
        list, as a string), ``"question"``, ``"answer_text"``,
        ``"answer_start"`` and ``"context"``.
    """
    # Read both files as UTF-8 explicitly so character offsets do not depend on the platform default encoding.
    with open(transcript_file, 'r', encoding='utf-8') as f:
        transcript_text = f.read()

    with open(annotations_file, 'r', encoding='utf-8') as f:
        qa_data = json.load(f)

    qa_pairs = []
    misaligned_ids = []
    # `index` rather than `id`, so the builtin id() is not shadowed.
    for index, item in enumerate(qa_data):
        answer_text = item["answers"]["text"][0]
        answer_start = item["answers"]["answer_start"][0]

        # The labels computed during preprocessing are only right if answer_start
        # really points at answer_text inside the transcript, so report mismatches.
        is_aligned = (
            isinstance(answer_start, int)
            and transcript_text[answer_start:answer_start + len(answer_text)] == answer_text
        )
        if not is_aligned:
            misaligned_ids.append(str(index))

        qa_pairs.append({
            "id": str(index),
            "question": item["question"],
            "answer_text": answer_text,
            "answer_start": answer_start,
            "context": transcript_text,  # every record shares the whole transcript as its context
        })

    print(f"Loaded {len(qa_pairs)} question-answer pairs.")
    if misaligned_ids:
        print(
            "Warning: answer_start does not point at answer_text in the transcript "
            f"for ids: {', '.join(misaligned_ids)}"
        )
    # Show a sample without the "context" field: it holds the entire transcript
    # and would otherwise be printed once per sampled record.
    print([{key: value for key, value in pair.items() if key != "context"} for pair in qa_pairs[:5]])
    return qa_pairs

def split_data(qa_pairs, train_ratio=0.8, seed=None):
    """
    Shuffle the QA records and split them into training and validation lists.

    The input list is left untouched; a shuffled copy is split.

    Args:
        qa_pairs: Sequence of records to split.
        train_ratio: Fraction of records placed in the training list. The
            training list gets ``int(len(qa_pairs) * train_ratio)`` records.
        seed: Optional seed for a reproducible shuffle. When ``None`` (the
            default) the global ``random`` module state is used, so a prior
            ``random.seed(...)`` call is still honoured.

    Returns:
        A ``(train_dataset, validation_dataset)`` tuple of lists.

    Raises:
        ValueError: If ``train_ratio`` is not between 0 and 1 inclusive.
    """
    if not 0.0 <= train_ratio <= 1.0:
        raise ValueError(f"train_ratio must be between 0 and 1, got {train_ratio}")

    # Shuffle a copy so the caller's list keeps its original order.
    shuffled = list(qa_pairs)
    if seed is None:
        random.shuffle(shuffled)
    else:
        random.Random(seed).shuffle(shuffled)

    split_index = int(len(shuffled) * train_ratio)
    train_dataset = shuffled[:split_index]
    validation_dataset = shuffled[split_index:]

    print(f"Created training dataset with {len(train_dataset)} samples.")
    print(f"Created validation dataset with {len(validation_dataset)} samples.")

    return train_dataset, validation_dataset

## Data preprocessing

### Subtask:
Preprocess the custom dataset to be compatible with the chosen transformer model, including tokenization and formatting the data into the required input format for question answering.


In [None]:
from datasets import Dataset

# Load the annotated QA pairs and split them 80/20 into training and validation lists.
qa_pairs = load_qa_pairs(annotations_file, transcript_file)
train_dataset, validation_dataset = split_data(qa_pairs, train_ratio=0.8)
# Convert the lists of dictionaries to Dataset objects (the two names are rebound from list to Dataset here).
train_dataset = Dataset.from_list(train_dataset)
validation_dataset = Dataset.from_list(validation_dataset)

def preprocess_function(examples):
    """
    Tokenize a batch of QA examples into fixed-length features with answer labels.

    Each (question, context) pair is tokenized with a 512-token limit. Only the
    context may be truncated, and the overflow is returned as additional
    features that overlap by a stride of 128, so one example can yield several
    features.

    Args:
        examples: Batch with the columns ``"question"``, ``"context"``,
            ``"answer_text"``, ``"answer_start"`` and ``"id"`` (one list per column).

    Returns:
        The tokenizer output extended with one entry per feature for
        ``"start_positions"``, ``"end_positions"`` (token indices of the answer,
        or 0/0 when the answer is not fully inside the feature's context
        window), ``"sequence_ids"`` and ``"example_id"``.
    """
    questions = [q.strip() for q in examples["question"]]
    inputs = tokenizer(
        questions,
        examples["context"],
        max_length=512,
        truncation="only_second",
        stride=128,
        return_overflowing_tokens=True,
        return_offsets_mapping=True,
        padding="max_length",
    )

    offset_mapping = inputs["offset_mapping"]
    # sample_map[i] is the position, within this batch, of the example that feature i came from.
    sample_map = inputs["overflow_to_sample_mapping"]
    answers = examples["answer_text"]
    start_positions = examples["answer_start"]
    example_ids = examples["id"] # Original example ids, indexed by position in the batch

    inputs["start_positions"] = []
    inputs["end_positions"] = []
    inputs["sequence_ids"] = [] # Per-feature sequence ids, stored as a column for post-processing
    inputs["example_id"] = [] # Per-feature id of the source example

    for i, offset in enumerate(offset_mapping):
        sample_idx = sample_map[i]
        answer = answers[sample_idx]
        # Character span of the answer in the full context: [start_char, end_char)
        start_char = start_positions[sample_idx]
        end_char = start_char + len(answer)
        example_id = example_ids[sample_idx] # Id of the example this feature was cut from

        sequence_ids = inputs.sequence_ids(i)
        inputs["sequence_ids"].append(sequence_ids) # One sequence_ids list per feature
        inputs["example_id"].append(example_id) # One example id per feature


        # Find the first and last token of the context (tokens whose sequence id is 1)
        idx = 0
        while sequence_ids[idx] != 1:
            idx += 1
        context_start = idx
        while sequence_ids[idx] == 1:
            idx += 1
        context_end = idx - 1

        # If the answer is not fully contained in the context chunk, label as 0
        # (index 0 is presumably the [CLS] token; confirm against the tokenizer)
        if offset[context_start][0] > start_char or offset[context_end][1] < end_char:
            inputs["start_positions"].append(0)
            inputs["end_positions"].append(0)
        else:
            # Otherwise it's the start and end token positions
            # Start label: first context token whose start offset is at or after start_char
            idx = context_start
            while idx <= context_end and offset[idx][0] < start_char:
                idx += 1
            inputs["start_positions"].append(idx)

            # End label: last context token whose end offset is at or before end_char
            idx = context_end
            while idx >= context_start and offset[idx][1] > end_char:
                idx -= 1
            inputs["end_positions"].append(idx)

    return inputs

# Tokenize both splits in batched mode. The original per-example columns are removed
# because preprocess_function can return several rows (features) per example.
tokenized_train_dataset = train_dataset.map(preprocess_function, batched=True, remove_columns=["id", "question", "answer_text", "answer_start", "context"])
tokenized_validation_dataset = validation_dataset.map(preprocess_function, batched=True, remove_columns=["id", "question", "answer_text", "answer_start", "context"])

# Summarise the resulting feature datasets (columns and row counts).
print("Preprocessing complete.")
print("Tokenized training dataset:", tokenized_train_dataset)
print("Tokenized validation dataset:", tokenized_validation_dataset)

Loaded 48 question-answer pairs.
[{'id': '0', 'question': 'What was the total annual revenue for Microsoft Cloud?', 'answer_text': '$168 billion', 'answer_start': 3009, 'context': "Operator: Greetings, and welcome to the Microsoft Fiscal Year 2025 Fourth Quarter Earnings Conference Call. As a reminder, this conference is being recorded. It is now my pleasure to introduce Jonathan Neilson, Vice President of Investor Relations.\nJonathan Neilson: Good afternoon, and thank you for joining us today. On the call with me are Satya Nadella, Chairman and Chief Executive Officer; Amy Hood, Chief Financial Officer; Alice Jolla, Chief Accounting Officer; and Keith Dolliver, Corporate Secretary and Deputy General Counsel. On the Microsoft Investor Relations website, you can find our earnings press release and financial summary slide deck, which is intended to supplement our prepared remarks during today's call and provides a reconciliation of differences between GAAP and non-GAAP financial measure

Map:   0%|          | 0/38 [00:00<?, ? examples/s]

Map:   0%|          | 0/10 [00:00<?, ? examples/s]

Preprocessing complete.
Tokenized training dataset: Dataset({
    features: ['input_ids', 'attention_mask', 'offset_mapping', 'overflow_to_sample_mapping', 'start_positions', 'end_positions', 'sequence_ids', 'example_id'],
    num_rows: 1032
})
Tokenized validation dataset: Dataset({
    features: ['input_ids', 'attention_mask', 'offset_mapping', 'overflow_to_sample_mapping', 'start_positions', 'end_positions', 'sequence_ids', 'example_id'],
    num_rows: 275
})


In [None]:
import collections
import numpy as np
import evaluate

metric = evaluate.load("squad")

def compute_metrics(eval_predictions):
    """
    Score the validation set with the SQuAD metric from raw start/end logits.

    Relies on three module-level objects: ``tokenized_validation_dataset`` (the
    features, in the same order as the logits), ``validation_dataset`` (the
    original examples) and ``metric`` (the loaded ``"squad"`` metric).

    For every example, the best-scoring answer span across all of its features
    is selected and converted to text using the offsets of the feature the span
    was found in.

    Args:
        eval_predictions: Object whose ``predictions`` attribute unpacks into
            ``(start_logits, end_logits)``, each indexable by feature index.

    Returns:
        Whatever ``metric.compute`` returns for the collected predictions and
        references (for the SQuAD metric, exact-match and F1 scores).
    """
    start_logits, end_logits = eval_predictions.predictions

    n_best = 20
    max_answer_length = 30

    # Group feature indices by the id of the example they were cut from. The
    # id is used rather than overflow_to_sample_mapping, whose values are
    # positions within a preprocessing batch and not dataset-wide indices.
    example_to_features = collections.defaultdict(list)
    for feature_index, example_id in enumerate(tokenized_validation_dataset["example_id"]):
        example_to_features[example_id].append(feature_index)

    predictions = []
    references = []

    for example in validation_dataset:
        example_id = example["id"]
        context = example["context"]

        best_score = -float("inf")
        predicted_answer = ""

        for feature_index in example_to_features[example_id]:
            feature = tokenized_validation_dataset[feature_index]
            offset_mapping = feature["offset_mapping"]
            sequence_ids = feature["sequence_ids"]
            start_logits_feature = start_logits[feature_index]
            end_logits_feature = end_logits[feature_index]

            # Indices of the n_best highest logits, best first.
            start_indexes = np.argsort(start_logits_feature)[-1 : -n_best - 1 : -1].tolist()
            end_indexes = np.argsort(end_logits_feature)[-1 : -n_best - 1 : -1].tolist()

            for start_index in start_indexes:
                # Only context tokens (sequence id 1) carry offsets into the context string.
                if sequence_ids[start_index] != 1:
                    continue
                for end_index in end_indexes:
                    if sequence_ids[end_index] != 1:
                        continue
                    # Ignore spans that end before they start or are too long.
                    if end_index < start_index:
                        continue
                    if end_index - start_index + 1 > max_answer_length:
                        continue

                    score = start_logits_feature[start_index] + end_logits_feature[end_index]
                    if score > best_score:
                        best_score = score
                        # Resolve the text immediately with THIS feature's offsets:
                        # token indices from one feature are meaningless in another.
                        start_char = offset_mapping[start_index][0]
                        end_char = offset_mapping[end_index][1]
                        predicted_answer = context[start_char:end_char]

        # The SQuAD metric expects lists of dicts in exactly this shape.
        predictions.append({"id": example_id, "prediction_text": predicted_answer})
        references.append({
            "id": example_id,
            "answers": {
                "text": [example["answer_text"]],
                "answer_start": [example["answer_start"]],
            },
        })

    # Calculate the metrics
    return metric.compute(predictions=predictions, references=references)

# To have these metrics reported during evaluation, pass this function when constructing the Trainer
# (Trainer.evaluate() itself does not accept a compute_metrics argument):
# trainer = Trainer(..., compute_metrics=compute_metrics)

# Train

In [None]:
from transformers import DefaultDataCollator, TrainingArguments, Trainer

# Every feature was already padded to max_length during preprocessing, so the default collator is used.
data_collator = DefaultDataCollator()

# Fine-tuning hyperparameters; evaluation runs once per epoch and no external logging integration is enabled.
training_args = TrainingArguments(
    output_dir="./qa_results",
    num_train_epochs=3,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    learning_rate=2e-5,
    eval_strategy="epoch",
    logging_dir="./qa_logs",
    report_to="none",
)

# NOTE(review): compute_metrics is not passed here, so per-epoch evaluation reports only the validation loss.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train_dataset,
    eval_dataset=tokenized_validation_dataset,
    data_collator=data_collator,
)

In [None]:
# Fine-tune the model on the tokenized training set.
trainer.train()

# Run the fine-tuned model on the validation features; raw_predictions.predictions is
# unpacked later into (start_logits, end_logits) for post-processing.
raw_predictions = trainer.predict(tokenized_validation_dataset)

Epoch,Training Loss,Validation Loss
1,No log,0.164272
2,No log,0.178036
3,No log,0.154136


## Evaluate
## Analyse incorrect predictions

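Rather than relying on a single aggregate score, this section converts the model's raw predictions on the validation set into answer text, lists every answer that does not match the ground truth, and assigns each one a risk category so that the model's failures can be inspected manually for safety analysis.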



In [None]:
import collections
from tqdm.auto import tqdm

def _is_answer_token(index, offset_mapping, sequence_ids):
    """Return True if token ``index`` has an offset and, when sequence ids are known, belongs to the context."""
    if index >= len(offset_mapping) or offset_mapping[index] is None:
        return False
    if sequence_ids is None:
        return True
    return index < len(sequence_ids) and sequence_ids[index] == 1


def postprocess_qa_predictions(
    examples,
    features,
    raw_predictions,
    n_best_size=20,
    max_answer_length=30,
):
    """
    Turn raw start/end logits into one predicted answer string per example.

    Args:
        examples: Original examples. Must support ``examples["id"]`` and
            iteration over rows that have ``"id"`` and ``"context"``.
        features: Tokenized features in the same order as the logits. Must
            support column access by name and row access by integer index.
            Rows need ``"offset_mapping"``. When a ``"sequence_ids"`` entry is
            present, candidate spans are restricted to context tokens. Features
            are matched to examples through the ``"example_id"`` column when it
            exists, otherwise through ``"overflow_to_sample_mapping"``.
        raw_predictions: Pair ``(all_start_logits, all_end_logits)``, each
            indexable by feature index.
        n_best_size: Number of top start and top end logits considered per feature.
        max_answer_length: Maximum answer length in tokens.

    Returns:
        An ``OrderedDict`` mapping each example id to its best answer text
        (``""`` when no valid span exists).
    """
    all_start_logits, all_end_logits = raw_predictions

    # Map example id to its position in `examples`.
    example_id_to_index = {k: i for i, k in enumerate(examples["id"])}

    # Group feature indices by the example they belong to. The example id is
    # preferred: overflow_to_sample_mapping values are positions within a
    # preprocessing batch, which equal dataset indices only for a single batch.
    features_per_example = collections.defaultdict(list)
    column_names = getattr(features, "column_names", None) or []
    if "example_id" in column_names:
        for feature_index, example_id in enumerate(features["example_id"]):
            example_index = example_id_to_index.get(example_id)
            if example_index is not None:
                features_per_example[example_index].append(feature_index)
    else:
        for feature_index, sample_idx in enumerate(features["overflow_to_sample_mapping"]):
            features_per_example[sample_idx].append(feature_index)

    predictions = collections.OrderedDict()

    print("Post-processing predictions for error analysis...")
    # Loop through each original example
    for example_index, example in enumerate(tqdm(examples)):
        context = example["context"]
        best_score = None
        best_text = ""

        # For each example, loop through all of its corresponding features (chunks)
        for feature_index in features_per_example[example_index]:
            start_logits = all_start_logits[feature_index]
            end_logits = all_end_logits[feature_index]
            feature = features[feature_index]
            offset_mapping = feature["offset_mapping"]
            sequence_ids = feature.get("sequence_ids") if hasattr(feature, "get") else None

            # Indices of the n_best_size highest logits, best first.
            start_indexes = np.argsort(start_logits)[-1 : -n_best_size - 1 : -1].tolist()
            end_indexes = np.argsort(end_logits)[-1 : -n_best_size - 1 : -1].tolist()
            for start_index in start_indexes:
                # Question, special and padding tokens have offsets that do not
                # refer to the context string, so they cannot start or end an answer.
                if not _is_answer_token(start_index, offset_mapping, sequence_ids):
                    continue
                for end_index in end_indexes:
                    if not _is_answer_token(end_index, offset_mapping, sequence_ids):
                        continue
                    if end_index < start_index or end_index - start_index + 1 > max_answer_length:
                        continue

                    score = start_logits[start_index] + end_logits[end_index]
                    # Strict comparison keeps the first candidate on ties.
                    if best_score is None or score > best_score:
                        best_score = score
                        start_char = offset_mapping[start_index][0]
                        end_char = offset_mapping[end_index][1]
                        best_text = context[start_char:end_char]

        predictions[example["id"]] = best_text

    return predictions

def categorize_prediction_risk(prediction, ground_truth):
    """
    Assign a risk category to a prediction that does not match the ground truth.

    Comparison is case-insensitive and ignores surrounding whitespace. The
    checks run in this order:

    1. Empty prediction: "Failure to Answer".
    2. Ground truth contained in the prediction: "Partial Misinformation (Over-extraction)".
    3. Ground truth has a digit but the prediction has none: "Critical Omission (Missing Value)".
    4. Prediction contained in the ground truth: "Partial Answer (Under-extraction)".
    5. Anything else: "Incorrect Information (Hallucination/Wrong Value)".

    Args:
        prediction: Answer text produced by the model.
        ground_truth: Annotated answer text.

    Returns:
        The risk category as a string.
    """
    predicted = prediction.strip().lower()
    expected = ground_truth.strip().lower()

    # Must come first: an empty string is a substring of everything and has no
    # digits, so it would otherwise be caught by one of the later checks.
    if not predicted:
        return "Failure to Answer"
    if expected in predicted:
        return "Partial Misinformation (Over-extraction)"
    if any(char.isdigit() for char in expected) and not any(char.isdigit() for char in predicted):
        return "Critical Omission (Missing Value)"
    # A truncated answer (e.g. "more than 2" for "more than 2 gigawatts") is
    # incomplete, not a wrong value.
    if predicted in expected:
        return "Partial Answer (Under-extraction)"
    return "Incorrect Information (Hallucination/Wrong Value)"


# Generate predictions for error analysis
if 'raw_predictions' in globals() and raw_predictions:
    final_predictions = postprocess_qa_predictions(
        validation_dataset, tokenized_validation_dataset, raw_predictions.predictions
    )

    # Qualitative analysis: collect every validation example whose prediction
    # differs from the ground truth (case-insensitive, whitespace-trimmed).
    incorrect_predictions = []
    for example in validation_dataset:
        ground_truth = example['answer_text']
        prediction = final_predictions[example['id']]

        if prediction.strip().lower() == ground_truth.strip().lower():
            continue

        incorrect_predictions.append({
            "question": example['question'],
            "ground_truth": ground_truth,
            "model_prediction": prediction,
            "risk_category": categorize_prediction_risk(prediction, ground_truth),
        })

    # Print Risk Assessment Report
    if incorrect_predictions:
        print(f"\nFound {len(incorrect_predictions)} incorrect predictions:\n")
        for item in incorrect_predictions:
            print(f"Question: {item['question']}")
            print(f"Correct Answer: {item['ground_truth']}")
            print(f"Model Prediction: {item['model_prediction']}")
            print(f"Risk Category: {item['risk_category']}\n")
            print("-" * 80)
    else:
        print("\nAll validation predictions were correct.")

    print("\nComplete.")
else:
    print("\nNo predictions were generated.")

Post-processing predictions for error analysis...


  0%|          | 0/10 [00:00<?, ?it/s]


Found 4 incorrect predictions:

Question: How many new gigawatts of capacity did Microsoft stand up in the past 12 months?
Correct Answer: more than 2 gigawatts
Model Prediction: more than 2
Risk Category: Incorrect Information (Hallucination/Wrong Value)

--------------------------------------------------------------------------------
Question: Which family of models has the highest volume of inference tokens?
Correct Answer: GPT-4 family of models
Model Prediction: GPT-4
Risk Category: Incorrect Information (Hallucination/Wrong Value)

--------------------------------------------------------------------------------
Question: How many monthly active users does the family of Copilot apps have?
Correct Answer: 100 million
Model Prediction: surpassed 100 million
Risk Category: Partial Misinformation (Over-extraction)

--------------------------------------------------------------------------------
Question: What was the annual revenue for Azure?
Correct Answer: $75 billion
Model Predict