In [None]:
# Install required packages for Contextual AI integration and data visualization
%pip install contextual-client matplotlib tqdm requests pandas dotenv

In [None]:
import os
import json
import requests
from pathlib import Path
from typing import List, Optional, Dict
from IPython.display import display, JSON
import pandas as pd
from contextual import ContextualAI
import ast
from IPython.display import display, Markdown
from tqdm import tqdm
import numpy as np
import matplotlib.pyplot as plt

API Authentication

Create your API key at app.contextual.ai, store it in a `.env` file as `CONTEXTUAL_API_KEY`, and then load it in the cell below.

In [None]:
# Load API key from .env
from dotenv import load_dotenv
import os

# Pull variables from a local .env file into the process environment.
load_dotenv()

# Initialize with your API key
API_KEY = os.getenv("CONTEXTUAL_API_KEY")
if not API_KEY:
    # Fail here with a clear message: later cells interpolate API_KEY straight
    # into an "authorization: Bearer ..." header for direct HTTP calls.
    raise RuntimeError(
        "CONTEXTUAL_API_KEY is not set. Add it to your .env file or environment "
        "before running this notebook."
    )

client = ContextualAI(
    api_key=API_KEY
)

Create your document datastore

In [None]:
datastore_name = 'Financial_Demo_RAG'

# Look for a datastore that already carries this name (first match wins).
datastores = client.datastores.list()
existing_datastore = None
for candidate in datastores:
    if candidate.name == datastore_name:
        existing_datastore = candidate
        break

# Create the datastore only when no match was found; either way keep its ID.
if not existing_datastore:
    result = client.datastores.create(name=datastore_name)
    datastore_id = result.id
    print(f"Created new datastore with ID: {datastore_id}")
else:
    datastore_id = existing_datastore.id
    print(f"Using existing datastore with ID: {datastore_id}")

Document Ingestion and Processing

In [None]:
import os
import requests

# Create the data directory; exist_ok makes this a no-op when it is already there.
os.makedirs('data', exist_ok=True)

# File list with corresponding GitHub URLs: (local filename, download URL)
files_to_upload = [
    # NVIDIA quarterly revenue 24/25
    ("A_Rev_by_Mkt_Qtrly_Trend_Q425.pdf", "https://raw.githubusercontent.com/ContextualAI/examples/refs/heads/main/08-ai-workshop/data/A_Rev_by_Mkt_Qtrly_Trend_Q425.pdf"),
    # NVIDIA quarterly revenue 22/23
    ("B_Q423-Qtrly-Revenue-by-Market-slide.pdf", "https://raw.githubusercontent.com/ContextualAI/examples/refs/heads/main/08-ai-workshop/data/B_Q423-Qtrly-Revenue-by-Market-slide.pdf"),
    # Spurious correlations report - fun example of graphs and statistical analysis
    ("C_Neptune.pdf", "https://raw.githubusercontent.com/ContextualAI/examples/refs/heads/main/08-ai-workshop/data/C_Neptune.pdf"),
    # Another spurious correlations report - fun example of graphs and statistical analysis
    ("D_Unilever.pdf", "https://raw.githubusercontent.com/ContextualAI/examples/refs/heads/main/08-ai-workshop/data/D_Unilever.pdf")
]

In [None]:
# Download and ingest all files
DOWNLOAD_TIMEOUT_SECONDS = 60  # upper bound so a stalled connection cannot hang the notebook

document_ids = []
for filename, url in files_to_upload:
    file_path = f'data/{filename}'

    # Download file if it doesn't exist
    if not os.path.exists(file_path):
        print(f"Fetching {file_path}")
        try:
            response = requests.get(url, timeout=DOWNLOAD_TIMEOUT_SECONDS)
            response.raise_for_status()  # Raise an exception for bad status codes
            with open(file_path, 'wb') as f:
                f.write(response.content)
        except Exception as e:
            # Best effort: report the failure and move on to the next file.
            print(f"Error downloading {filename}: {str(e)}")
            continue

    # Upload to datastore
    try:
        with open(file_path, 'rb') as f:
            ingestion_result = client.datastores.documents.ingest(datastore_id, file=f)
            document_id = ingestion_result.id
            document_ids.append(document_id)
            print(f"Successfully uploaded {filename} to datastore {datastore_id}")
    except Exception as e:
        # An upload failure is reported but does not stop the remaining files.
        print(f"Error uploading {filename}: {str(e)}")

print(f"Successfully uploaded {len(document_ids)} files to datastore")
print(f"Document IDs: {document_ids}")

Agent Creation and Configuration

In [None]:
# System prompt instructing the agent to answer only from the provided documentation.
system_prompt = '''
You are a helpful AI assistant created by Contextual AI to answer questions about relevant documentation provided to you. Your responses should be precise, accurate, and sourced exclusively from the provided information. Please follow these guidelines:
* Only use information from the provided documentation. Avoid opinions, speculation, or assumptions.
* Use the exact terminology and descriptions found in the provided content.
* Keep answers concise and relevant to the user's question.
* Use acronyms and abbreviations exactly as they appear in the documentation or query.
* Apply markdown if your response includes lists, tables, or code.
* Directly answer the question, then STOP. Avoid additional explanations unless specifically relevant.
* If the information is irrelevant, simply respond that you don't have relevant documentation and do not provide additional comments or suggestions. Ignore anything that cannot be used to directly answer this query.
'''

agent_name = "Demo"

# Get list of existing agents
agents = client.agents.list()

# Check if agent already exists
existing_agent = next((agent for agent in agents if agent.name == agent_name), None)

if existing_agent:
    # An existing agent is reused as-is: only its ID is read, nothing is updated.
    agent_id = existing_agent.id
    print(f"Using existing agent with ID: {agent_id}")
else:
    print("Creating new agent")
    app_response = client.agents.create(
        name=agent_name,
        description="Helpful Grounded AI Assistant",
        # Pass the prompt defined above; previously it was defined but never sent.
        system_prompt=system_prompt,
        datastore_ids=[datastore_id],
        agent_configs={
        "global_config": {
            "enable_multi_turn": False # Turning this off for deterministic responses for this demo
        }
        },
        suggested_queries=[
            "What was NVIDIA's annual revenue by fiscal year 2022 to 2025?",
            "When did NVIDIA's data center revenue overtake gaming revenue?",
            "What's the correlation between the distance between Neptune and the Sun and Burglary rates in the US?",
            "What's the correlation between Global revenue generated by Unilever Group and Google searches for 'lost my wallet'?",
            "Does this imply that Unilever Group's revenue is derived from lost wallets?",
            "What's the correlation between the distance between Neptune and the Sun and Global revenue generated by Unilever Group?"
        ]
    )
    agent_id = app_response.id
    print(f"Agent ID created: {agent_id}")

Query the Agent

In [None]:
# Single-turn conversation: one user message sent to the agent.
user_message = {
    "content": "What was NVIDIA's annual revenue by fiscal year 2022 to 2025?",
    "role": "user"
}

query_result = client.agents.query.create(agent_id=agent_id, messages=[user_message])

# Show the text of the agent's reply.
print(query_result.message.content)

Optional - Displaying the retrieved documents

In [None]:
import base64
import io
from PIL import Image
import matplotlib.pyplot as plt

def display_base64_image(base64_string, title="Document"):
    """Decode a base64-encoded image, show it with matplotlib, and return it.

    Args:
        base64_string: Base64 text holding the encoded image bytes.
        title: Title drawn above the image.

    Returns:
        The PIL image opened from the decoded bytes.
    """
    # base64 text -> raw bytes -> PIL image
    raw_bytes = base64.b64decode(base64_string)
    image = Image.open(io.BytesIO(raw_bytes))

    # Render on a 10x10 figure with the axes hidden.
    fig, ax = plt.subplots(figsize=(10, 10))
    ax.imshow(image)
    ax.axis('off')
    ax.set_title(title)
    plt.show()

    return image

# Retrieve and display all referenced documents
retrieved = query_result.retrieval_contents

for doc_number, retrieval_content in enumerate(retrieved, start=1):
    print(f"\n--- Processing Document {doc_number} ---")

    # Ask the API for the retrieval details of just this piece of content.
    ret_result = client.agents.query.retrieval_info(
        message_id=query_result.message_id,
        agent_id=agent_id,
        content_ids=[retrieval_content.content_id]
    )

    print(f"Retrieval Info for Document {doc_number}:")

    # Only the first metadata entry is inspected for a page image.
    metadatas = ret_result.content_metadatas
    page_image_b64 = metadatas[0].page_img if metadatas else None

    if not page_image_b64:
        print(f"No image available for Document {doc_number}")
    else:
        img = display_base64_image(page_image_b64, f"Document {doc_number}")

print(f"\nTotal documents processed: {len(query_result.retrieval_contents)}")

OPTIONAL - COMPONENTS AVAILABLE IN CONTEXTUAL AI  

Document Parser

In [None]:
# Download the Attention is All You Need paper from arXiv
url = "https://arxiv.org/pdf/1706.03762"
file_path = "data/attention-is-all-you-need.pdf"

# Make sure the target directory exists even if the ingestion cells were skipped.
os.makedirs(os.path.dirname(file_path), exist_ok=True)

# Fetch first and open the file afterwards, so a failed request leaves no empty file behind.
paper_response = requests.get(url, timeout=60)
# Stop on HTTP errors so an error page is never saved under a .pdf name.
paper_response.raise_for_status()

with open(file_path, "wb") as f:
    f.write(paper_response.content)

print(f"Downloaded paper to {file_path}")

In [None]:
# Setup headers for direct API calls
base_url = "https://api.contextual.ai/v1"
headers = {
    "accept": "application/json",
    "authorization": f"Bearer {API_KEY}"
}

# Submit parse job
url = f"{base_url}/parse"

# Form fields sent alongside the uploaded file.
config = {
    "parse_mode": "standard",
    "figure_caption_mode": "concise",
    "enable_document_hierarchy": True,
    "page_range": "0-5",
}

with open(file_path, "rb") as fp:
    file = {"raw_file": fp}
    result = requests.post(url, headers=headers, data=config, files=file)

# Surface HTTP failures directly instead of a confusing KeyError on 'job_id' below.
result.raise_for_status()
response = result.json()

job_id = response['job_id']
print(f"Parse job submitted with ID: {job_id}")

In [None]:
import time

# Get the parse results
url = f"{base_url}/parse/jobs/{job_id}/results"

output_types = ["markdown-per-page"]

# The job was only just submitted, so a single fetch may report an unfinished job.
# Poll a bounded number of times so "Run All" reaches the display cell with content.
POLL_INTERVAL_SECONDS = 5
MAX_POLLS = 60
# NOTE(review): assumed in-progress status values - confirm against the Parse API reference.
# Any other status (including unknown ones) ends the loop immediately.
IN_PROGRESS_STATUSES = {"pending", "processing", "retrying"}

for _ in range(MAX_POLLS):
    result = requests.get(
        url,
        headers=headers,
        params={"output_types": ",".join(output_types)},
    )
    result.raise_for_status()
    result = result.json()

    if result.get('status') not in IN_PROGRESS_STATUSES:
        break
    time.sleep(POLL_INTERVAL_SECONDS)

print(f"Parse job is {result['status']}.")

In [None]:
# Display the first page's parsed markdown
has_pages = 'pages' in result and len(result['pages']) > 0

if not has_pages:
    print("No parsed content available. Please check if the job completed successfully.")
else:
    first_page = result['pages'][0]
    display(Markdown(first_page['markdown']))

Instruction-Following Reranker

In [None]:
# Use the instruction-following reranker model
model = "ctxl-rerank-en-v1-instruct"

# Define our query and instruction
query = "What is the current enterprise pricing for the RTX 5090 GPU for bulk orders?"

instruction = (
    "Prioritize internal sales documents over market analysis reports. "
    "More recent documents should be weighted higher. "
    "Enterprise portal content supersedes distributor communications."
)

# Sample documents with conflicting information
documents = [
    (
        "Following detailed cost analysis and market research, we have implemented the following changes: "
        "AI training clusters will see a 15% uplift in raw compute performance, "
        "enterprise support packages are being restructured, "
        "and bulk procurement programs (100+ units) for the RTX 5090 Enterprise series will operate on a $2,899 baseline."
    ),
    (
        "Enterprise pricing for the RTX 5090 GPU bulk orders (100+ units) is currently set at $3,100-$3,300 per unit. "
        "This pricing for RTX 5090 enterprise bulk orders has been confirmed across all major distribution channels."
    ),
    "RTX 5090 Enterprise GPU requires 450W TDP and 20% cooling overhead.",
]

# Metadata that helps distinguish document sources and dates (one entry per document, same order)
metadata = [
    "Date: January 15, 2025. Source: NVIDIA Enterprise Sales Portal. Classification: Internal Use Only",
    "TechAnalytics Research Group. 11/30/2023.",
    "January 25, 2025; NVIDIA Enterprise Sales Portal; Internal Use Only",
]

In [None]:
# Execute the reranking
rerank_request = dict(
    query=query,
    instruction=instruction,
    documents=documents,
    metadata=metadata,
    model=model,
)
rerank_response = client.rerank.create(**rerank_request)

# Dump the raw response as a plain dict under a header.
print("Reranking Results:", "=" * 50, sep="\n")
print(rerank_response.to_dict())

In [None]:
# Display ranked results in a more readable format
print("\nRanked Documents (by relevance score):")
print("=" * 60)

for rank, ranked in enumerate(rerank_response.results, start=1):
    # `index` points back into the original `documents` / `metadata` lists.
    source_idx = ranked.index

    print(f"\nRank {rank}: Score {ranked.relevance_score:.4f}")
    print(f"Document {source_idx + 1}:")
    print(f"Content: {documents[source_idx][:100]}...")
    print(f"Metadata: {metadata[source_idx]}")
    print("-" * 40)

In [None]:
def _print_ranking(heading, ranking):
    """Print a heading, a rule, and one 'Rank N: Document M, Score: S' line per result."""
    print(heading)
    print("=" * 50)
    for position, ranked in enumerate(ranking.results, start=1):
        print(f"Rank {position}: Document {ranked.index + 1}, Score: {ranked.relevance_score:.4f}")


# Rerank without instructions for comparison
rerank_no_instruction = client.rerank.create(
    query=query,
    documents=documents,
    metadata=metadata,
    model=model
)

# Same query and documents, shown first without and then with the instruction.
_print_ranking("\nRanking WITHOUT Instructions:", rerank_no_instruction)
_print_ranking("\nRanking WITH Instructions:", rerank_response)

Grounded Language Model (GLM)

In [None]:
# Example conversation messages: user question, assistant reply, user follow-up
opening_question = {
    "role": "user",
    "content": "What are the most promising renewable energy technologies for addressing climate change in developing nations?"
}
assistant_reply = {
    "role": "assistant",
    "content": "Based on current research, solar and wind power show significant potential for developing nations due to decreasing costs and scalability. Would you like to know more about specific implementation challenges and success stories?"
}
follow_up_question = {
    "role": "user",
    "content": "Yes, please tell me about successful solar implementations in Africa and their economic impact, particularly focusing on rural electrification."
}
messages = [opening_question, assistant_reply, follow_up_question]

# Detailed knowledge sources with varied information (one named string per source)
irena_report = """According to the International Renewable Energy Agency (IRENA) 2023 report:
    - Solar PV installations in Africa reached 10.4 GW in 2022
    - The cost of solar PV modules decreased by 80% between 2010 and 2022
    - Rural electrification projects have provided power to 17 million households"""

kenya_case_study = """Case Study: Rural Electrification in Kenya (2020-2023)
    - 2.5 million households connected through mini-grid systems
    - Average household income increased by 35% after electrification
    - Local businesses reported 47% growth in revenue
    - Education outcomes improved with 3 additional study hours per day"""

economic_analysis = """Economic Analysis of Solar Projects in Sub-Saharan Africa:
    - Job creation: 25 jobs per MW of installed capacity
    - ROI average of 12-15% for mini-grid projects
    - Reduced energy costs by 60% compared to diesel generators
    - Carbon emissions reduction: 2.3 million tonnes CO2 equivalent"""

technical_specs = """Technical Specifications and Best Practices:
    - Optimal solar panel efficiency in African climate conditions: 15-22%
    - Battery storage requirements: 4-8 kWh per household
    - Maintenance costs: $0.02-0.04 per kWh
    - Expected system lifetime: 20-25 years"""

social_impact = """Social Impact Assessment:
    - Women-led businesses increased by 45% in electrified areas
    - Healthcare facilities reported 72% improvement in service delivery
    - Mobile money usage increased by 60%
    - Agricultural productivity improved by 28% with electric irrigation"""

knowledge = [
    irena_report,
    kenya_case_study,
    economic_analysis,
    technical_specs,
    social_impact,
]

In [None]:
# Setup for direct API call
base_url = "https://api.contextual.ai/v1"
generate_api_endpoint = f"{base_url}/generate"

headers = {
    "accept": "application/json",
    "content-type": "application/json",
    "authorization": f"Bearer {API_KEY}"
}

# Configure the GLM request
payload = {
    "model": "v1",
    "messages": messages,
    "knowledge": knowledge,
    "avoid_commentary": False,
    "max_new_tokens": 1024,
    "temperature": 0,
    "top_p": 0.9
}

# Generate the response
generate_response = requests.post(generate_api_endpoint, json=payload, headers=headers)
# Report HTTP errors as such, rather than as a KeyError on 'response' below.
generate_response.raise_for_status()

print("GLM Grounded Response:")
print("=" * 50)
print(generate_response.json()['response'])

In [None]:
# Generate response with avoid_commentary enabled
# Shallow copy of the earlier payload with only the avoid_commentary flag flipped.
payload_no_commentary = {**payload, "avoid_commentary": True}

generate_response_no_commentary = requests.post(
    generate_api_endpoint,
    json=payload_no_commentary,
    headers=headers,
)

print("GLM Response (with avoid_commentary=True):", "=" * 50, sep="\n")
print(generate_response_no_commentary.json()['response'])

In [None]:
# Print both GLM answers one after the other, followed by a fixed summary.
print("COMPARISON:")
print("=" * 60)

comparison_sections = [
    ("\n1. Standard GLM Response (avoid_commentary=False):", generate_response),
    ("\n\n2. Strict Grounding Mode (avoid_commentary=True):", generate_response_no_commentary),
]
for heading, http_response in comparison_sections:
    print(heading)
    print("-" * 50)
    print(http_response.json()['response'])

print("\n\nKey Differences:")
for summary_line in (
    "- Standard mode may include helpful context and commentary",
    "- Strict mode focuses purely on information from knowledge sources",
    "- Both modes maintain strong grounding in provided sources",
):
    print(summary_line)

In [None]:
# Query about a completely different topic
different_query = [
    {"role": "user", "content": "What are the latest developments in quantum computing hardware?"}
]

# Same renewable energy knowledge (irrelevant to quantum computing)
irrelevant_payload = dict(
    model="v1",
    messages=different_query,
    knowledge=knowledge,  # Still about renewable energy
    avoid_commentary=False,
    max_new_tokens=512,
    temperature=0,
    top_p=0.9,
)

irrelevant_response = requests.post(
    generate_api_endpoint,
    json=irrelevant_payload,
    headers=headers,
)

# Restate what was asked and what knowledge was supplied, then show the answer.
for header_line in (
    "GLM Response to Irrelevant Query:",
    "=" * 50,
    "Query: What are the latest developments in quantum computing hardware?",
    "Knowledge provided: Renewable energy information",
    "\nGLM Response:",
):
    print(header_line)
print(irrelevant_response.json()['response'])

LMUnit: Natural Language Unit Testing

In [None]:
# Simple example
query = "What was NVIDIA's Data Center revenue in Q4 FY25?"

# The sample answer also lists the other FY25 quarters, not just Q4.
response = "\n".join([
    "NVIDIA's Data Center revenue for Q4 FY25 was $35,580 million.",
    "",
    "This represents a significant increase from the previous quarter (Q3 FY25) when Data Center revenue was $30,771 million.",
    "",
    "The full quarterly trend for Data Center revenue in FY25 was:",
    "- Q4 FY25: $35,580 million",
    "- Q3 FY25: $30,771 million",
    "- Q2 FY25: $26,272 million",
    "- Q1 FY25: $22,563 million",
])

unit_test = "Does the response avoid unnecessary information?"

# Evaluate with LMUnit
lmunit_request = {"query": query, "response": response, "unit_test": unit_test}
result = client.lmunit.create(**lmunit_request)

print(f"Unit Test: {unit_test}")
print(f"Score: {result.score}/5")
# The two analysis lines are fixed text; they do not depend on the returned score.
print("\nAnalysis: The response includes additional quarterly trends beyond the specific Q4 request,")
print("which explains the lower score for avoiding unnecessary information.")

In [None]:
# Define comprehensive unit tests for quantitative reasoning
unit_tests = [
    "Does the response accurately extract specific numerical data from the documents?",
    "Does the agent properly distinguish between correlation and causation?",
    "Are multi-document comparisons performed correctly with accurate calculations?",
    "Are potential limitations or uncertainties in the data clearly acknowledged?",
    "Are quantitative claims properly supported with specific evidence from the source documents?",
    "Does the response avoid unnecessary information?",
]

# Create category mapping for visualization: question fragment -> category label
test_categories = dict([
    ('Does the response accurately extract specific numerical data', 'ACCURACY'),
    ('Does the agent properly distinguish between correlation and causation', 'CAUSATION'),
    ('Are multi-document comparisons performed correctly', 'SYNTHESIS'),
    ('Are potential limitations or uncertainties in the data', 'LIMITATIONS'),
    ('Are quantitative claims properly supported with specific evidence', 'EVIDENCE'),
    ('Does the response avoid unnecessary information', 'RELEVANCE'),
])

print("Unit Test Framework:")
print("=" * 50)
for number, question in enumerate(unit_tests, start=1):
    # First fragment contained in the question (case-insensitive) decides the label.
    label = 'OTHER'
    for fragment, category_name in test_categories.items():
        if fragment.lower() in question.lower():
            label = category_name
            break
    print(f"{number}. {label}: {question}")

In [None]:
# Sample evaluation dataset, written as (prompt, response) pairs
sample_pairs = [
    (
        "What was NVIDIA's Data Center revenue in Q4 FY25?",
        "NVIDIA's Data Center revenue for Q4 FY25 was $35,580 million. This represents a significant increase from the previous quarter.",
    ),
    (
        "What is the correlation coefficient between Neptune's distance from the Sun and US burglary rates?",
        "According to the Tyler Vigen spurious correlations dataset, there is a correlation coefficient of 0.87 between Neptune's distance from the Sun and US burglary rates. However, this is clearly a spurious correlation with no causal relationship.",
    ),
    (
        "How did NVIDIA's total revenue change from Q1 FY22 to Q4 FY25?",
        "NVIDIA's total revenue grew from $5.66 billion in Q1 FY22 to $60.9 billion in Q4 FY25, representing a massive increase driven primarily by AI and data center demand.",
    ),
]

# One record per pair, with the column names the evaluation code reads.
evaluation_data = [
    {"prompt": prompt_text, "response": response_text}
    for prompt_text, response_text in sample_pairs
]

eval_df = pd.DataFrame(evaluation_data)
print("Sample Evaluation Dataset:")
print(eval_df.to_string(index=False))

In [None]:
def run_unit_tests_with_progress(
    df: pd.DataFrame,
    unit_tests: List[str]
) -> List[Dict]:
    """
    Run unit tests with progress tracking and error handling.

    Every row of ``df`` is scored against every entry of ``unit_tests`` with
    ``client.lmunit.create``; a tqdm progress bar advances once per row.

    Args:
        df: Frame providing a 'prompt' and a 'response' column.
        unit_tests: Natural-language unit test questions.

    Returns:
        One dict per row with keys 'prompt', 'response' and 'test_results'.
        Each test result has 'test' and 'score'; successful calls add
        'metadata' (None when the result has no such attribute), failed calls
        have a score of None and add 'error' with the exception text.
    """
    results = []

    for idx in tqdm(range(len(df)), desc="Processing responses"):
        row = df.iloc[idx]
        row_results = []

        for test in unit_tests:
            try:
                result = client.lmunit.create(
                    query=row['prompt'],
                    response=row['response'],
                    unit_test=test
                )

                row_results.append({
                    'test': test,
                    'score': result.score,
                    'metadata': getattr(result, 'metadata', None)
                })

            except Exception as e:
                # Record the failure and continue so one bad call does not abort the run.
                print(f"Error with prompt {idx}, test '{test}': {e}")
                row_results.append({
                    'test': test,
                    'score': None,
                    'error': str(e)
                })

        results.append({
            'prompt': row['prompt'],
            'response': row['response'],
            'test_results': row_results
        })

    return results

# Run the evaluation
print("Running comprehensive unit test evaluation...")
results = run_unit_tests_with_progress(eval_df, unit_tests)

# Display detailed results
banner = "=" * 60
for number, evaluation in enumerate(results, start=1):
    print("\n" + banner)
    print(f"EVALUATION {number}")
    print(banner)
    print(f"Prompt: {evaluation['prompt']}")
    print(f"Response: {evaluation['response'][:100]}...")
    print("\nUnit Test Scores:")

    for outcome in evaluation['test_results']:
        # A missing or None score means the LMUnit call did not produce a result.
        if outcome.get('score') is None:
            print(f"  Error: {outcome.get('error', 'Unknown error')}")
            continue

        # First category fragment found in the question (case-insensitive) names the score.
        label = 'OTHER'
        for fragment, category_name in test_categories.items():
            if fragment.lower() in outcome['test'].lower():
                label = category_name
                break
        print(f"  {label}: {outcome['score']:.2f}/5")

In [None]:
def map_test_to_category(test_question: str) -> Optional[str]:
    """Map the full test question to its category.

    Args:
        test_question: Full text of a unit test question.

    Returns:
        The value of the first ``test_categories`` entry whose key occurs in
        ``test_question`` (case-insensitive), or None when no key matches.
    """
    question = test_question.lower()
    return next(
        (value for key, value in test_categories.items() if key.lower() in question),
        None,
    )

def create_unit_test_plots(results: List[Dict], test_indices: Optional[List[int]] = None):
    """
    Draw one polar (radar) chart of category scores per selected evaluation.

    Args:
        results: Evaluation records with 'prompt' and 'test_results' keys.
        test_indices: Positions in ``results`` to plot; None plots all of them,
            and a single int is treated as a one-element list.

    Returns:
        The matplotlib figure holding one polar subplot per selected result.
    """
    if isinstance(test_indices, int):
        test_indices = [test_indices]
    elif test_indices is None:
        test_indices = list(range(len(results)))

    categories = ['ACCURACY', 'CAUSATION', 'SYNTHESIS', 'LIMITATIONS', 'EVIDENCE', 'RELEVANCE']

    # One spoke per category; the first angle is repeated so the outline closes.
    spoke_angles = np.linspace(0, 2*np.pi, len(categories), endpoint=False)
    closed_angles = np.concatenate((spoke_angles, [spoke_angles[0]]))

    num_plots = len(test_indices)
    fig = plt.figure(figsize=(6 * num_plots, 6))

    for position, result_idx in enumerate(test_indices, start=1):
        evaluation = results[result_idx]
        ax = fig.add_subplot(1, num_plots, position, projection='polar')

        # Keep the score of the first test result that maps to each category.
        first_score = {}
        for test_result in evaluation['test_results']:
            label = map_test_to_category(test_result['test'])
            if label in categories and label not in first_score:
                first_score[label] = test_result['score']

        # Categories without a usable score are drawn at 0.
        scores = []
        for category in categories:
            value = first_score.get(category)
            scores.append(0 if value is None else value)
        scores = np.concatenate((scores, [scores[0]]))

        ax.plot(closed_angles, scores, 'o-', linewidth=2, color='blue')
        ax.fill(closed_angles, scores, alpha=0.25, color='blue')
        ax.set_xticks(spoke_angles)
        ax.set_xticklabels(categories)
        ax.set_ylim(0, 5)
        ax.grid(True)

        # Numeric label just outside each data point.
        for angle, value in zip(spoke_angles, scores[:-1]):
            ax.text(angle, value + 0.2, f'{value:.1f}', ha='center', va='bottom')

        # Titles show at most the first 50 characters of the prompt.
        full_prompt = evaluation['prompt']
        if len(full_prompt) > 50:
            prompt = full_prompt[:50] + "..."
        else:
            prompt = full_prompt
        ax.set_title(f"Evaluation {result_idx + 1}\n{prompt}", pad=20)

    plt.tight_layout()
    return fig

# Create visualizations
if len(results) == 0:
    print("No results to visualize")
else:
    # One polar subplot per evaluation, all on a single figure.
    fig = create_unit_test_plots(results)
    plt.show()

In [None]:
# Create aggregate analysis
# Flatten every scored test result into one record; entries without a score are skipped.
all_scores = [
    {
        'category': map_test_to_category(outcome['test']),
        'score': outcome['score'],
        'test': outcome['test'],
    }
    for evaluation in results
    for outcome in evaluation['test_results']
    if outcome.get('score') is not None
]

scores_df = pd.DataFrame(all_scores)

if scores_df.empty:
    print("No valid scores to analyze")
else:
    # Calculate average scores by category
    per_category = scores_df.groupby('category')['score']
    avg_scores = per_category.agg(['mean', 'std', 'count']).round(2)

    print("\nAggregate Performance by Category:")
    print("=" * 50)
    print(avg_scores)

    # Overall statistics
    overall = scores_df['score']
    print("\nOverall Statistics:")
    print(f"Mean Score: {overall.mean():.2f}/5")
    print(f"Standard Deviation: {overall.std():.2f}")
    print(f"Total Evaluations: {len(scores_df)}")