# Prompt caching through the Anthropic API

Prompt caching allows you to store and reuse context within your prompt. This makes it more practical to include additional information in your prompt—such as detailed instructions and example responses—which help improve every response Claude generates.

In addition, by fully leveraging prompt caching within your prompt, you can reduce latency by more than 2x and costs by up to 90%. This can generate significant savings when building solutions that involve repetitive tasks around detailed book content.

In this cookbook, we will demonstrate how to use prompt caching in a single turn and across a multi-turn conversation. 


## Setup

First, let's set up our environment with the necessary imports and initializations:

In [None]:
# %pip install scrapegraphai
# %playwright install 
# %pip install 'scrapegraphai[burr]'
# %pip install nest_asyncio

In [None]:
# !ollama list

In [None]:
# import json 
# from scrapegraphai.graphs import SmartScraperGraph

# # Load the graph
# graph_config = {
#   "llm": {
#     "model": "ollama/llama3.1:latest",
#     "temperature": 0.0,
#     "format": "json",
#     "base_url": "http://localhost:11434",
#   },
#   "embeddings": {
#     "model": "ollama/nomic-embed-text:latest",
#     "base_url": "http://localhost:11434",
#   }
# }

# original_link = "https://dp0-2.lsst.io/data-access-analysis-tools/adql-recipes.html#polygon-search"

In [None]:
# # create the SmartScraperGraph instance
# smart_scraper_graph = SmartScraperGraph(
#   prompt="extract the website content and structure, don't forget any words",
#   source=original_link,
#   config=graph_config
# )

In [None]:
# import nest_asyncio
# import asyncio

# # Apply the nest_asyncio patch
# nest_asyncio.apply()

# result = smart_scraper_graph.run()
# print(json.dumps(result, indent=2))

In [None]:
# %pip install PyPDF2
# %pip install pdfplumber
# %pip install requests pdfkit pdfplumber
# !sudo apt-get install wkhtmltopdf

In [None]:
# import requests
# from bs4 import BeautifulSoup
# import html2text
# import os
# import nbformat
# from nbconvert import MarkdownExporter
# import json
# import hashlib
# import time

# class LinkCounter:
#     def __init__(self):
#         self.jupyter_count = 0
#         self.other_count = 0

#     def increment(self, is_jupyter):
#         if is_jupyter:
#             self.jupyter_count += 1
#         else:
#             self.other_count += 1

#     def __str__(self):
#         return f"Jupyter notebooks: {self.jupyter_count}, Other links: {self.other_count}"

# def html_to_markdown(url):
#     response = requests.get(url)
#     html_content = response.text
#     soup = BeautifulSoup(html_content, 'html.parser')
#     for script in soup(["script", "style"]):
#         script.decompose()
#     h = html2text.HTML2Text()
#     h.ignore_links = False
#     h.ignore_images = False
#     h.ignore_tables = False
#     h.body_width = 0
#     markdown_content = h.handle(str(soup))
#     return markdown_content, soup

# def extract_links(soup, base_url):
#     links = {}
#     for a in soup.find_all('a', href=True):
#         href = a['href']
#         if href.startswith('/'):
#             href = base_url + href
#         if href.startswith('http'):
#             file_name = generate_unique_filename(href)
#             links[file_name] = href
#     return links

# def generate_unique_filename(url):
#     hash_object = hashlib.md5(url.encode())
#     return hash_object.hexdigest()[:10] + '.md'

# def jupyter_to_markdown(jupyter_path):
#     with open(jupyter_path, "r") as file:
#         notebook = nbformat.read(file, as_version=4)
#     exporter = MarkdownExporter()           
#     markdown, _ = exporter.from_notebook_node(notebook)
#     return markdown

# def save_markdown(content, file_path):
#     with open(file_path, "w", encoding='utf-8') as file:
#         file.write(content)

# def load_processed_links(output_dir):
#     processed_links_file = os.path.join(output_dir, "processed_links.json")
#     if os.path.exists(processed_links_file):
#         with open(processed_links_file, 'r') as f:
#             return json.load(f)
#     return {}

# def save_processed_links(processed_links, output_dir):
#     processed_links_file = os.path.join(output_dir, "processed_links.json")
#     with open(processed_links_file, 'w') as f:
#         json.dump(processed_links, f, indent=2)

# def process_url(url, output_dir, processed_links, link_counter, depth=0, max_depth=100):
#     if depth > max_depth:
#         return processed_links

#     os.makedirs(output_dir, exist_ok=True)
    
#     if url in processed_links.values():
#         print(f"Skipping already processed URL: {url}")
#         return processed_links

#     print(f"Processing URL: {url} (Depth: {depth})")
    
#     try:
#         markdown_content, soup = html_to_markdown(url)
#         file_name = generate_unique_filename(url)
#         file_path = os.path.join(output_dir, file_name)
#         save_markdown(markdown_content, file_path)
#         processed_links[file_name] = url
#         print(f"Content saved to {file_path}")

#         base_url = '/'.join(url.split('/')[:3])
#         links = extract_links(soup, base_url)

#         for file_name, link in links.items():
#             if link in processed_links.values():
#                 print(f"Skipping already processed link: {link}")
#                 continue

#             file_path = os.path.join(output_dir, file_name)
#             if link.endswith('.ipynb'):
#                 link_counter.increment(True)
#                 try:
#                     jupyter_content = requests.get(link).text
#                     jupyter_file = file_path.replace('.md', '.ipynb')
#                     with open(jupyter_file, 'w') as f:
#                         f.write(jupyter_content)
#                     markdown = jupyter_to_markdown(jupyter_file)
#                     save_markdown(markdown, file_path)
#                     processed_links[file_name] = link
#                     print(f"Jupyter notebook converted and saved to {file_path}")
#                 except Exception as e:
#                     print(f"Error processing Jupyter notebook {link}: {str(e)}")
#             else:
#                 link_counter.increment(False)
#                 processed_links[file_name] = link
#                 print(f"Link recorded: {link}")

#             save_processed_links(processed_links, output_dir)

#             processed_links = process_url(link, output_dir, processed_links, link_counter, depth + 1, max_depth)

#             time.sleep(1)

#     except Exception as e:
#         print(f"Error processing URL {url}: {str(e)}")

#     return processed_links

# # Main execution
# # url = "https://dp0-3.lsst.io/tutorials-dp0-3/index.html#dp0-3-tutorials-contributed"
# url = "https://www.lsst.io/"

# output_dir = "extracted_content"
# processed_links = load_processed_links(output_dir)
# link_counter = LinkCounter()
# process_url(url, output_dir, processed_links, link_counter, max_depth=100)

# print("\nLink Processing Summary:")
# print(link_counter)
# print(f"Total links processed: {len(processed_links)}")

In [None]:
# %pip install nbformat nbconvert

In [None]:
import requests
from bs4 import BeautifulSoup
import html2text
import os
import nbformat
from nbconvert import MarkdownExporter
import json
import hashlib
import time

import re
import requests
from bs4 import BeautifulSoup
import html2text

import urllib3
from requests.exceptions import SSLError, RequestException

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def html_to_markdown(url, verify_ssl=False):
    """Download *url* and render its HTML as Markdown.

    The converter is configured to ignore links, images and tables, and
    line wrapping is switched off.

    Args:
        url: Address of the page to fetch.
        verify_ssl: Forwarded to ``requests.get`` as ``verify``.

    Returns:
        ``(markdown, soup)`` on success, where ``soup`` is the parsed
        ``BeautifulSoup`` document; ``("", None)`` when the request fails
        with a ``RequestException`` (the failure is printed, not raised).
    """
    try:
        page = requests.get(url, verify=verify_ssl, timeout=10)
        page.raise_for_status()
        soup = BeautifulSoup(page.text, 'html.parser')

        converter = html2text.HTML2Text()
        converter.ignore_links = True
        converter.ignore_images = True
        converter.ignore_tables = True
        converter.body_width = 0  # 0 disables line wrapping

        return converter.handle(str(soup)), soup
    except RequestException as e:
        # SSLError derives from RequestException, so one handler covers both cases
        print(f"Error processing URL {url}: {str(e)}")
        return "", None

def clean_markdown(content):
    """Strip page chrome and link/image markup from converted Markdown.

    The substitutions are applied strictly in the order listed below, with
    a whitespace trim between the two groups.

    Args:
        content: Markdown text to clean.

    Returns:
        The cleaned Markdown text.
    """
    # (pattern, replacement, flags) rules applied before the text is trimmed.
    # The '## <heading>.*' rules use DOTALL, so they cut from the heading to
    # the end of the text.
    before_trim = (
        (r'\[!\[Rubin Observatory logo\]\(.*?\)\]\(.*?\)', '', 0),
        (r'## Footer.*', '', re.DOTALL),
        (r'## Navigation.*', '', re.DOTALL),
        (r'## Sidebar.*', '', re.DOTALL),
        (r'## Navigation Menu.*', '', re.DOTALL),
        (r'## History.*', '', re.DOTALL),
        (r'## Have feedback\?.*', '', re.DOTALL),
        (r'!\[.*?\]\(.*?\)', '', 0),  # images
        (r'\[.*?\]\(.*?\)', '', 0),  # links, including their text
    )
    # Rules applied after the trim.
    after_trim = (
        (r'\[(.*?)\]\(.*?\)', r'\1', 0),  # any remaining link -> its text
        (r'^.{1,5}$\n?', '', re.MULTILINE),  # lines of one to five characters
        (r'\n\s*\n', '\n\n', 0),  # whitespace-only gaps -> one blank line
        (r'^\s*\n', '', 0),  # leading blank line
        (r'\n\s*$', '', 0),  # trailing blank line
    )

    for pattern, replacement, flags in before_trim:
        content = re.sub(pattern, replacement, content, flags=flags)

    content = content.strip()

    for pattern, replacement, flags in after_trim:
        content = re.sub(pattern, replacement, content, flags=flags)

    return content

# def html_to_markdown(url):
#     # Fetch HTML content from the URL
#     response = requests.get(url)
#     html_content = response.text
    
#     # Parse HTML content
#     soup = BeautifulSoup(html_content, 'html.parser')
    
#     # Remove script and style elements
#     for script in soup(["script", "style"]):
#         script.decompose()
    
#     # Configure HTML to Markdown converter
#     h = html2text.HTML2Text()
#     h.ignore_links = False
#     h.ignore_images = False
#     h.ignore_tables = False
#     h.body_width = 0  # Disable line wrapping
    
#     # Convert HTML to Markdown
#     markdown_content = h.handle(str(soup))
#     return markdown_content, soup

def extract_links(soup, base_url):
    """Collect the ``http``-prefixed links found in a parsed page.

    Args:
        soup: Parsed document; every ``<a>`` element with an ``href`` is inspected.
        base_url: Prefix prepended to hrefs that start with ``/``.

    Returns:
        Dict mapping ``generate_unique_filename(link)`` to the link itself.
        Hrefs that do not start with ``http`` after prefixing are left out.
    """
    found = {}
    for anchor in soup.find_all('a', href=True):
        target = anchor['href']
        if target.startswith('/'):
            # Root-relative href: prefix it with the site's base URL
            target = base_url + target
        if not target.startswith('http'):
            continue
        found[generate_unique_filename(target)] = target
    return found

def generate_unique_filename(url):
    """Return a Markdown filename derived from *url*.

    The name is the first ten hex digits of the URL's MD5 digest followed
    by ``.md``.
    """
    digest = hashlib.md5(url.encode()).hexdigest()
    return f"{digest[:10]}.md"

def jupyter_to_markdown(jupyter_path):
    """Render the notebook file at *jupyter_path* as Markdown text.

    The notebook is read as nbformat version 4 and exported with
    ``MarkdownExporter``.
    """
    with open(jupyter_path, "r") as handle:
        nb_node = nbformat.read(handle, as_version=4)
    # from_notebook_node returns a pair; only the first item (the text) is used
    body, _ = MarkdownExporter().from_notebook_node(nb_node)
    return body

def save_markdown(content, file_path):
    """Write *content* to *file_path* as UTF-8 text, replacing any existing file."""
    with open(file_path, mode="w", encoding='utf-8') as out:
        out.write(content)

def load_processed_links(output_dir):
    """Load the processed-links mapping stored in *output_dir*.

    Returns:
        The decoded contents of ``processed_links.json`` inside
        *output_dir*, or an empty dict when that file does not exist.
    """
    links_path = os.path.join(output_dir, "processed_links.json")
    if not os.path.exists(links_path):
        return {}
    with open(links_path, 'r') as handle:
        return json.load(handle)

def save_processed_links(processed_links, output_dir):
    """Write *processed_links* to ``processed_links.json`` inside *output_dir*.

    The mapping is serialised as JSON with a two-space indent; an existing
    file is overwritten.
    """
    target = os.path.join(output_dir, "processed_links.json")
    with open(target, 'w') as handle:
        json.dump(processed_links, handle, indent=2)

def process_url(url, output_dir, processed_links, depth=0, max_depth=100, link_count=0, jupyter_count=0):
    """Save *url* as Markdown, then save and recurse into every link found on it.

    Args:
        url: Page to process.
        output_dir: Directory that receives the Markdown files and
            ``processed_links.json``; created if missing.
        processed_links: Dict of filename -> URL for everything handled so
            far. It is updated in place and also returned.
        depth: Current recursion depth.
        max_depth: Recursion stops once ``depth`` exceeds this value.
        link_count: Running count of non-notebook links saved.
        jupyter_count: Running count of notebooks converted.

    Returns:
        ``(processed_links, link_count, jupyter_count)``.
    """
    if depth > max_depth:
        return processed_links, link_count, jupyter_count

    os.makedirs(output_dir, exist_ok=True)

    # Skip if URL has already been processed
    if url in processed_links.values():
        print(f"Skipping already processed URL: {url}")
        return processed_links, link_count, jupyter_count

    print(f"Processing URL: {url}")

    # Convert page to Markdown
    markdown_content, soup = html_to_markdown(url)
    file_name = generate_unique_filename(url)
    file_path = os.path.join(output_dir, file_name)
    save_markdown(markdown_content, file_path)
    processed_links[file_name] = url
    print(f"Content saved to {file_path}")

    # A failed fetch is reported as soup=None. There is nothing to extract
    # then, and calling extract_links(None, ...) would raise AttributeError
    # and abort the whole crawl.
    if soup is None:
        return processed_links, link_count, jupyter_count

    # Scheme + host, e.g. "https://host", used to absolutise root-relative hrefs
    base_url = '/'.join(url.split('/')[:3])
    links = extract_links(soup, base_url)

    # Process each extracted link
    for file_name, link in links.items():
        if link in processed_links.values():
            print(f"Skipping already processed link: {link}")
            continue

        file_path = os.path.join(output_dir, file_name)
        if link.endswith('.ipynb'):
            # Handle Jupyter notebooks
            try:
                # Bounded wait so one unresponsive host cannot stall the crawl
                jupyter_content = requests.get(link, timeout=10).text
                jupyter_file = file_path.replace('.md', '.ipynb')
                with open(jupyter_file, 'w') as f:
                    f.write(jupyter_content)
                markdown = jupyter_to_markdown(jupyter_file)
                save_markdown(markdown, file_path)
                processed_links[file_name] = link
                print(f"Jupyter notebook converted and saved to {file_path}")
                jupyter_count += 1

                # Delete the Jupyter notebook file after conversion
                os.remove(jupyter_file)
                print(f"Deleted Jupyter notebook: {jupyter_file}")

            except Exception as e:
                print(f"Error processing Jupyter notebook {link}: {str(e)}")
        else:
            # Handle other links
            try:
                content, _ = html_to_markdown(link)
                save_markdown(content, file_path)
                processed_links[file_name] = link
                print(f"Content from {link} saved to {file_path}")
                link_count += 1
            except Exception as e:
                print(f"Error processing link {link}: {str(e)}")

        # Save progress after each processed link
        save_processed_links(processed_links, output_dir)

        # NOTE(review): a link recorded above is already in processed_links, so
        # this call returns at the "already processed" check; it only does real
        # work for links whose handling failed. Confirm whether deeper crawling
        # was intended.
        processed_links, link_count, jupyter_count = process_url(link, output_dir, processed_links, depth + 1, max_depth, link_count, jupyter_count)

        # Delay to avoid overwhelming the server
        time.sleep(1)

    return processed_links, link_count, jupyter_count


In [None]:
def main():
    """Crawl starting at https://www.lsst.io/ and print the resulting totals.

    Previously recorded links are loaded from the ``extracted_content``
    directory and handed to ``process_url`` together with the start URL.
    """
    start_url = "https://www.lsst.io/"
    target_dir = "extracted_content"

    _, link_count, jupyter_count = process_url(
        start_url, target_dir, load_processed_links(target_dir)
    )

    print(f"Total links processed: {link_count}")
    print(f"Total Jupyter notebooks processed: {jupyter_count}")


if __name__ == "__main__":
    main()

In [None]:
# Absolute path of the processed-links JSON file; the following cell opens it with json.load.
file_path = "/home/david/Desktop/scraping-rubin-links/extracted_content/processed_links.json"

In [None]:
# Load the filename -> URL mapping from the JSON file named by `file_path`
# file_path = "processed_links.json"
with open(file_path, 'r') as f:
    processed_links = json.load(f)

# URLs that are already recorded; newly saved links are appended below
urls = list(processed_links.values())

# Re-visit every recorded page and save any linked page not seen before
for file_name, url in processed_links.items():
    print(f"Processing: {url}")
    markdown_content, soup = html_to_markdown(url)
    if markdown_content:
        # Scheme + host of the page, used to absolutise root-relative hrefs
        base_url = '/'.join(url.split('/')[:3])
        links = extract_links(soup, base_url)
        for link_file_name, link in links.items():
            if link not in urls:
                print(f"New link found: {link}")
                content, _ = html_to_markdown(link)
                if content:
                    # Use a dedicated name here: `file_path` must keep pointing
                    # at the JSON file, which other cells open again.
                    link_path = os.path.join("extracted_content", generate_unique_filename(link))
                    save_markdown(content, link_path)
                    urls.append(link)
                    # NOTE(review): the ".md" suffix in this message assumes
                    # save_markdown appends it - confirm against the definition
                    # active when this cell runs.
                    print(f"Content from {link} saved to {link_path}.md")
            else:
                print(f"Skipping already processed link: {link}")

print("Processing completed.")

In [None]:
# Number of URLs collected so far
len(urls)

# Count the collected URLs that end in '.ipynb'
ipynb_links = [url for url in urls if url.endswith('.ipynb')]
len(ipynb_links)

In [None]:
# Crawl starting from https://github.com/lsst, reusing the links already
# recorded in the extracted_content directory.
# url = "https://www.lsst.io/"
url = "https://github.com/lsst"
output_dir = "extracted_content"
processed_links = load_processed_links(output_dir)

# process_url hands back the updated mapping together with the two counters
processed_links, link_count, jupyter_count = process_url(url, output_dir, processed_links)

# # remove markdown file if it is less than 20 lines
# for file in os.listdir(output_dir):
#     file_path = os.path.join(output_dir, file)
#     with open(file_path, 'r') as f:
#         lines = f.readlines()
#         if len(lines) < 20:
#             os.remove(file_path)
#             print(f"Deleted {file_path}")

print(f"Total links processed: {link_count}")
print(f"Total Jupyter notebooks processed: {jupyter_count }")

In [None]:
# Crawl starting from the rubin-dp0/tutorial-notebooks repository page,
# reusing the links already recorded in the extracted_content directory.
# url = "https://www.lsst.io/"
# url = "https://github.com/lsst"
url = "https://github.com/rubin-dp0/tutorial-notebooks"
output_dir = "extracted_content"
processed_links = load_processed_links(output_dir)

# process_url hands back the updated mapping together with the two counters
processed_links, link_count, jupyter_count = process_url(url, output_dir, processed_links)

# # remove markdown file if it is less than 20 lines
# for file in os.listdir(output_dir):
#     file_path = os.path.join(output_dir, file)
#     with open(file_path, 'r') as f:
#         lines = f.readlines()
#         if len(lines) < 20:
#             os.remove(file_path)
#             print(f"Deleted {file_path}")

print(f"Total links processed: {link_count}")
print(f"Total Jupyter notebooks processed: {jupyter_count }")

In [None]:
# Report how many URLs are currently held in `urls`
print(len(urls))

In [None]:
# Write every collected URL to extracted_content/urls.txt, one per line.
# The loop variable rebinds the notebook-level name `url`.
with open("extracted_content/urls.txt", "w") as file:
    for url in urls:
        file.write(url + "\n")

In [None]:
# Copy the whole extracted_content directory tree into a new
# extracted_content_v2 directory; the following cells prune that copy.
import shutil

shutil.copytree("extracted_content", "extracted_content_v2")

In [None]:
# Keep only the .md files in extracted_content_v2: every other directory
# entry is passed to os.remove.
for file in os.listdir("extracted_content_v2"):
    if not file.endswith(".md"):
        os.remove(os.path.join("extracted_content_v2", file))

/home/david/Desktop/scraping-rubin-links/prompt_caching.ipynb

In [None]:
# Delete every file in extracted_content_v2 whose text starts with '{"payload":'.
# Note: this rebinds the notebook-level name `file_path`, and os.remove runs
# while the file is still open inside the `with` block.
for file in os.listdir("extracted_content_v2"):
    file_path = os.path.join("extracted_content_v2", file)
    with open(file_path, 'r') as f:
        content = f.read()
        if content.startswith('{"payload":'):
            os.remove(file_path)
            print(f"Deleted {file_path}")

In [None]:
# Delete files in extracted_content_v2 whose content duplicates a file seen
# earlier in the listing. `content_files` maps full file text -> path of the
# first file that had it; which duplicate survives follows os.listdir order.
content_files = {}
for file in os.listdir("extracted_content_v2"):
    file_path = os.path.join("extracted_content_v2", file)
    with open(file_path, 'r') as f:
        content = f.read()
        if content in content_files:
            # Same text already seen: drop this copy
            os.remove(file_path)
            print(f"Deleted {file_path}")
        else:
            content_files[content] = file_path

In [None]:
# Move every file from extracted_content_v2 into extracted_content_mdx,
# replacing ".md" in the file name with ".mdx".
os.makedirs("extracted_content_mdx", exist_ok=True)

for file in os.listdir("extracted_content_v2"):
    file_path = os.path.join("extracted_content_v2", file)
    # str.replace rewrites every ".md" occurrence in the name, not only the suffix
    new_file_path = os.path.join("extracted_content_mdx", file.replace(".md", ".mdx"))
    shutil.move(file_path, new_file_path)


In [None]:
import requests
from bs4 import BeautifulSoup
import html2text
import os
import json
import urllib3

# Disable SSL warnings (use with caution)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

def html_to_markdown(url):
    """Fetch *url*, convert it to Markdown and pass it through ``clean_markdown``.

    SSL certificate verification is disabled for the request.

    Returns:
        ``(cleaned_markdown, soup)`` on success; ``(None, None)`` when the
        request raises a ``RequestException`` (the error is printed).
    """
    try:
        page = requests.get(url, verify=False, timeout=10)
        page.raise_for_status()
        soup = BeautifulSoup(page.text, 'html.parser')

        converter = html2text.HTML2Text()
        converter.ignore_links = False
        converter.ignore_images = False
        converter.ignore_tables = False
        converter.body_width = 0  # 0 disables line wrapping

        return clean_markdown(converter.handle(str(soup))), soup
    except requests.exceptions.RequestException as e:
        print(f"Error fetching {url}: {e}")
        return None, None

def extract_links(soup, base_url):
    """Map generated filenames to the ``http`` links found in *soup*.

    Hrefs starting with ``/`` are prefixed with *base_url*; anything that
    then does not start with ``http`` is skipped. A falsy *soup* yields an
    empty dict.
    """
    found = {}
    if not soup:
        return found
    for anchor in soup.find_all('a', href=True):
        target = anchor['href']
        if target.startswith('/'):
            target = base_url + target
        if target.startswith('http'):
            found[generate_unique_filename(target)] = target
    return found

def generate_unique_filename(url):
    """Return the text after the last ``/`` in *url*, or ``'index'`` if that is empty."""
    _, _, last_segment = url.rpartition('/')
    if last_segment:
        return last_segment
    return 'index'

def save_markdown(content, file_path):
    """Write *content* as UTF-8 to ``file_path + '.md'``.

    Missing parent directories are created first.

    Args:
        content: Text to write.
        file_path: Destination path without the ``.md`` suffix.
    """
    parent_dir = os.path.dirname(file_path)
    # A bare file name has an empty dirname, and os.makedirs('') raises
    # FileNotFoundError, so only create directories when there is one.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with open(file_path + '.md', "w", encoding='utf-8') as out:
        out.write(content)

# Load the filename -> URL mapping from the JSON file named by `file_path`
# file_path = "processed_links.json"
with open(file_path, 'r') as f:
    processed_links = json.load(f)

# URLs that are already recorded; newly saved links are appended below
urls = list(processed_links.values())

# Re-visit every recorded page and save any linked page not seen before
for file_name, url in processed_links.items():
    print(f"Processing: {url}")
    markdown_content, soup = html_to_markdown(url)
    if markdown_content:
        # Scheme + host of the page, used to absolutise root-relative hrefs
        base_url = '/'.join(url.split('/')[:3])
        links = extract_links(soup, base_url)
        for link_file_name, link in links.items():
            if link not in urls:
                print(f"New link found: {link}")
                content, _ = html_to_markdown(link)
                if content:
                    # Use a dedicated name here: `file_path` must keep pointing
                    # at the JSON file so this cell can be run again.
                    link_path = os.path.join("extracted_content", generate_unique_filename(link))
                    save_markdown(content, link_path)
                    urls.append(link)
                    print(f"Content from {link} saved to {link_path}.md")
            else:
                print(f"Skipping already processed link: {link}")

print("Processing completed.")

In [None]:
# # Load filename and URL mapping
# with open(file_path, 'r') as f:
#     processed_links = json.load(f)

# # Display the processed links
# for file_name, url in processed_links.items():
#     print(f"{file_name}: {url}")

# # extract the url from the processed_links and save to a list
# urls = []                       
# for file_name, url in processed_links.items():
#     urls.append(url)                                        
                   
# # iterate through the urls, extract the links in url, if the link is not in the urls, extract the content and save to a file, if the link is in the urls, skip, add the link to the urls list
# # iterate through the urls list
# for file_name, url in processed_links.items():
#     # extract the links in the url
#     markdown_content, soup = html_to_markdown(url)
#     base_url = '/'.join(url.split('/')[:3])
#     links = extract_links(soup, base_url)
#     for file_name, link in links.items():
#         # if the link is not in the urls list, extract the content and save to a file
#         if link not in urls:
#             # create a new folder using the file_name, file_name doesn't include .md
#             file_name = file_name.replace('.md', '')
#             file_path = os.path.join("extracted_content", file_name)
#             # file_path = os.path.join(output_dir, file_name)
#             content, _ = html_to_markdown(link)
#             save_markdown(content, file_path)
#             urls.append(link)
#             print(f"Content from {link} saved to {file_path}")
#         else:
#             # if the link is in the urls list, skip
#             print(f"Skipping already processed link: {link}")         
# # extract the links in the url
# # if the link is not in the urls list, extract the content and save to a file
# # if the link is in the urls list, skip
# # add the link to the urls list



# # create a folder using the name of the URL

In [None]:
import requests
from bs4 import BeautifulSoup
import html2text
import os
import json
import hashlib
import time
from urllib.parse import urljoin, urlparse

class LinkCounter:
    """Tally of processed links, split into Jupyter notebooks and everything else."""

    def __init__(self):
        # Both tallies start at zero
        self.jupyter_count = 0
        self.other_count = 0

    def increment(self, is_jupyter):
        """Add one to the notebook tally if *is_jupyter* is truthy, else to the other tally."""
        if not is_jupyter:
            self.other_count += 1
            return
        self.jupyter_count += 1

    def __str__(self):
        """Return both tallies as a one-line summary."""
        return f"Jupyter notebooks: {self.jupyter_count}, Other links: {self.other_count}"

def html_to_markdown(url):
    """Fetch *url* and convert its HTML to Markdown.

    ``<script>`` and ``<style>`` elements are removed before conversion;
    links, images and tables are kept and line wrapping is disabled.

    Args:
        url: Address of the page to fetch.

    Returns:
        ``(markdown, soup)`` where ``soup`` is the parsed document with the
        script/style elements removed.

    Raises:
        requests.exceptions.RequestException: If the request fails, times
            out, or the server answers with an HTTP error status.
    """
    # Bounded wait: without a timeout a silent server blocks the crawl forever
    response = requests.get(url, timeout=10)
    # Turn 4xx/5xx answers into errors so error pages are not saved as content
    response.raise_for_status()
    html_content = response.text
    soup = BeautifulSoup(html_content, 'html.parser')
    for script in soup(["script", "style"]):
        script.decompose()
    h = html2text.HTML2Text()
    h.ignore_links = False
    h.ignore_images = False
    h.ignore_tables = False
    h.body_width = 0  # 0 disables line wrapping
    markdown_content = h.handle(str(soup))
    return markdown_content, soup

def extract_links(soup, base_url):
    """Return the set of absolute ``http`` URLs linked from *soup*.

    Each ``href`` is resolved against *base_url* with ``urljoin``; results
    that do not start with ``http`` are dropped.
    """
    resolved = (
        urljoin(base_url, anchor['href'])
        for anchor in soup.find_all('a', href=True)
    )
    return {candidate for candidate in resolved if candidate.startswith('http')}

def generate_unique_filename(url):
    """Build a Markdown filename from the host, path and MD5 digest of *url*.

    The result is ``<netloc>_<path>_<first 10 md5 hex digits>.md`` with
    every ``/`` replaced by ``_``; the path has its leading and trailing
    slashes stripped first.
    """
    parts = urlparse(url)
    short_hash = hashlib.md5(url.encode()).hexdigest()[:10]
    stem = "_".join((parts.netloc, parts.path.strip('/'), short_hash))
    return (stem + ".md").replace('/', '_')

def save_markdown(content, file_path):
    """Write *content* to *file_path* as UTF-8 text, replacing any existing file."""
    with open(file_path, mode="w", encoding='utf-8') as out:
        out.write(content)

def load_processed_links(file_path):
    """Return the JSON content of *file_path*, or an empty dict if the file does not exist."""
    if not os.path.exists(file_path):
        return {}
    with open(file_path, 'r') as handle:
        return json.load(handle)

def save_processed_links(processed_links, file_path):
    """Serialise *processed_links* to *file_path* as JSON with a two-space indent."""
    with open(file_path, mode='w') as handle:
        json.dump(processed_links, handle, indent=2)

def process_url(url, output_dir, processed_links, link_counter, depth=0, max_depth=100):
    """Save one page as Markdown and report the links found on it.

    Args:
        url: Page to fetch.
        output_dir: Directory that receives the Markdown file.
        processed_links: Dict of URL -> filename; updated in place on success.
        link_counter: Counter whose ``increment`` is called once per saved page.
        depth: Depth value compared against *max_depth*.
        max_depth: The page is skipped once ``depth`` reaches this value.

    Returns:
        ``(processed_links, new_links)``. ``new_links`` is the set returned
        by ``extract_links``, or an empty set when the page was skipped or
        processing raised.
    """
    if depth >= max_depth:
        return processed_links, set()
    if url in processed_links:
        return processed_links, set()

    print(f"Processing URL: {url} (Depth: {depth})")

    try:
        markdown_content, soup = html_to_markdown(url)
        file_name = generate_unique_filename(url)
        destination = os.path.join(output_dir, file_name)
        save_markdown(markdown_content, destination)
        processed_links[url] = file_name
        print(f"Content saved to {destination}")

        # The counter distinguishes notebooks purely by the URL suffix
        link_counter.increment(url.endswith('.ipynb'))

        return processed_links, extract_links(soup, url)

    except Exception as e:
        print(f"Error processing URL {url}: {str(e)}")
        return processed_links, set()

def main():
    """Continue the crawl from the pages recorded in ``processed_links.json``.

    The recorded pages are fetched again only to collect their outgoing
    links. Links that are not yet recorded form the frontier, which is
    then processed breadth-first for at most 100 iterations. The mapping
    is written back to disk after every iteration.
    """
    output_dir = "extracted_content"
    os.makedirs(output_dir, exist_ok=True)
    processed_links_file = "/home/david/Desktop/scraping-rubin-links/extracted_content/processed_links.json"
    processed_links = load_processed_links(processed_links_file)
    link_counter = LinkCounter()

    # Build the frontier from the links found on already-recorded pages.
    # Seeding it with the recorded URLs themselves cannot work: process_url
    # returns immediately for any URL that is a key of processed_links, so
    # the loop below would finish without fetching a single page.
    links_to_process = set()
    for seed in list(processed_links):
        if not seed.startswith('http'):
            continue  # not a URL, nothing to fetch
        try:
            _, soup = html_to_markdown(seed)
            links_to_process.update(extract_links(soup, seed))
        except Exception as e:
            print(f"Error collecting links from {seed}: {str(e)}")
        time.sleep(1)  # Be respectful to the server
    links_to_process -= set(processed_links.keys())

    iteration = 0
    while iteration < 100 and links_to_process:
        iteration += 1
        print(f"\nIteration {iteration}")

        new_links = set()
        # Iterate over a snapshot because the set is shrunk inside the loop
        for url in list(links_to_process):
            processed_links, extracted_links = process_url(url, output_dir, processed_links, link_counter)
            new_links.update(extracted_links)
            links_to_process.remove(url)
            time.sleep(1)  # Be respectful to the server

        # Only links that are not recorded yet go into the next round
        links_to_process.update(new_links - set(processed_links.keys()))
        save_processed_links(processed_links, processed_links_file)

        print(f"Links remaining to process: {len(links_to_process)}")
        if not links_to_process:
            print("All links processed.")
            break

    print("\nLink Processing Summary:")
    print(link_counter)
    print(f"Total links processed: {len(processed_links)}")
    print(f"Total iterations: {iteration}")

if __name__ == "__main__":
    main()

In [None]:
import requests
from bs4 import BeautifulSoup
import html2text
import os
import nbformat
from nbconvert import MarkdownExporter
import json
import hashlib
import time

def html_to_markdown(url, timeout=30):
    """Fetch *url* and convert the returned HTML to Markdown.

    Args:
        url: Address of the page to download.
        timeout: Seconds handed to ``requests.get``; without it a stalled
            server would block the crawl indefinitely.

    Returns:
        A ``(markdown_content, soup)`` tuple: the Markdown text and the
        parsed ``BeautifulSoup`` tree (with script/style elements removed).

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout. The HTTP status is not checked, so the body of an
            error page is converted like any other page.
    """
    # Fetch HTML content from the URL
    response = requests.get(url, timeout=timeout)
    html_content = response.text

    # Parse HTML content
    soup = BeautifulSoup(html_content, 'html.parser')

    # Remove script and style elements so their source does not leak into the text
    for script in soup(["script", "style"]):
        script.decompose()

    # Configure HTML to Markdown converter
    h = html2text.HTML2Text()
    h.ignore_links = False
    h.ignore_images = False
    h.ignore_tables = False
    h.body_width = 0  # Disable line wrapping

    # Convert HTML to Markdown
    markdown_content = h.handle(str(soup))
    return markdown_content, soup

def extract_links(soup, base_url):
    """Collect the absolute http(s) links of a parsed page.

    Args:
        soup: Parsed page; every ``<a>`` element with an ``href`` is examined.
        base_url: Prefix (scheme and host) put in front of root-relative hrefs.

    Returns:
        A dict mapping the hashed file name of each link (see
        ``generate_unique_filename``) to its absolute URL. Hrefs that do not
        end up starting with ``http`` (page-relative paths, ``mailto:``,
        fragments, ...) are left out.
    """
    from urllib.parse import urlsplit

    links = {}
    for a in soup.find_all('a', href=True):
        href = a['href']
        if href.startswith('//'):
            # Protocol-relative URL: it already names its host, so only the
            # scheme is borrowed from base_url (prefixing the whole base_url
            # would yield "https://host//other-host/...").
            href = urlsplit(base_url).scheme + ':' + href
        elif href.startswith('/'):
            # Root-relative URL
            href = base_url + href
        if href.startswith('http'):
            file_name = generate_unique_filename(href)
            links[file_name] = href
    return links

def generate_unique_filename(url):
    """Return a Markdown file name derived from the MD5 hash of *url*.

    The name is the first 10 hex digits of the digest followed by ``.md``.
    """
    digest = hashlib.md5(url.encode()).hexdigest()
    return f"{digest[:10]}.md"

def jupyter_to_markdown(jupyter_path):
    """Read the notebook at *jupyter_path* and return it rendered as Markdown.

    The notebook is loaded as nbformat version 4 and exported with
    nbconvert's ``MarkdownExporter``; the exporter's resources are discarded.
    """
    with open(jupyter_path, "r") as notebook_file:
        parsed_notebook = nbformat.read(notebook_file, as_version=4)
    body, _resources = MarkdownExporter().from_notebook_node(parsed_notebook)
    return body

def save_markdown(content, file_path):
    """Write *content* to *file_path* as UTF-8, replacing any existing file."""
    with open(file_path, "w", encoding='utf-8') as handle:
        handle.write(content)

def load_processed_links(output_dir):
    """Return the link map stored in ``<output_dir>/processed_links.json``.

    An empty dict is returned when that file does not exist.
    """
    json_path = os.path.join(output_dir, "processed_links.json")
    if not os.path.exists(json_path):
        return {}
    with open(json_path, 'r') as stream:
        return json.load(stream)

def save_processed_links(processed_links, output_dir):
    """Write *processed_links* to ``<output_dir>/processed_links.json``.

    The file is overwritten and the JSON is indented by two spaces.
    """
    json_path = os.path.join(output_dir, "processed_links.json")
    with open(json_path, 'w') as stream:
        json.dump(processed_links, stream, indent=2)

def process_url(url, output_dir, processed_links, depth=0, max_depth=100):
    """Save *url* as Markdown, then save every page it links to.

    Args:
        url: Page to fetch and convert.
        output_dir: Directory that receives the Markdown/notebook files and
            the processed-links JSON; created if missing.
        processed_links: Dict of file name -> URL for pages already saved.
            It is mutated in place and also returned.
        depth: Current recursion depth.
        max_depth: Depth beyond which the function returns without fetching.

    Returns:
        The (updated) ``processed_links`` dict.

    NOTE(review): the first fetch (``html_to_markdown(url)``) is not wrapped
    in ``try``, so a network error there propagates to the caller.
    """
    if depth > max_depth:
        return processed_links

    os.makedirs(output_dir, exist_ok=True)

    # Skip if URL has already been processed
    # (linear scan over the dict's values on every call)
    if url in processed_links.values():
        print(f"Skipping already processed URL: {url}")
        return processed_links

    print(f"Processing URL: {url}")

    # Convert page to Markdown
    markdown_content, soup = html_to_markdown(url)
    file_name = generate_unique_filename(url)
    file_path = os.path.join(output_dir, file_name)
    save_markdown(markdown_content, file_path)
    processed_links[file_name] = url
    print(f"Content saved to {file_path}")

    # Extract and process links
    # base_url is "<scheme>//<host>": the first three '/'-separated pieces
    base_url = '/'.join(url.split('/')[:3])
    links = extract_links(soup, base_url)

    # Process each extracted link
    for file_name, link in links.items():
        if link in processed_links.values():
            print(f"Skipping already processed link: {link}")
            continue

        file_path = os.path.join(output_dir, file_name)
        if link.endswith('.ipynb'):
            # Handle Jupyter notebooks: keep the raw .ipynb next to its Markdown rendering
            try:
                jupyter_content = requests.get(link).text
                jupyter_file = file_path.replace('.md', '.ipynb')
                with open(jupyter_file, 'w') as f:
                    f.write(jupyter_content)
                markdown = jupyter_to_markdown(jupyter_file)
                save_markdown(markdown, file_path)
                processed_links[file_name] = link
                print(f"Jupyter notebook converted and saved to {file_path}")
            except Exception as e:
                # Best effort: report the failure and move on to the next link
                print(f"Error processing Jupyter notebook {link}: {str(e)}")
        else:
            # Handle other links
            try:
                content, _ = html_to_markdown(link)
                save_markdown(content, file_path)
                processed_links[file_name] = link
                print(f"Content from {link} saved to {file_path}")
            except Exception as e:
                # Best effort: report the failure and move on to the next link
                print(f"Error processing link {link}: {str(e)}")

        # Save progress after each processed link
        save_processed_links(processed_links, output_dir)

        # Recursively process the new link
        # NOTE(review): when the link was saved above it is already recorded in
        # processed_links, so this call returns at the "already processed"
        # check and the links of that page are never followed. When saving
        # failed, the link is not recorded and this call fetches it again
        # outside any try/except. Confirm whether deeper crawling is intended
        # before changing the order.
        processed_links = process_url(link, output_dir, processed_links, depth + 1, max_depth)

        # Delay to avoid overwhelming the server
        time.sleep(1)

    return processed_links

# Main execution: crawl starting from the LSST documentation landing page.
# Alternative starting point kept for reference:
# url = "https://dp0-3.lsst.io/tutorials-dp0-3/index.html#dp0-3-tutorials-contributed"
url = "https://www.lsst.io/"

# Relative path: files are written under the current working directory
output_dir = "extracted_content"
# Resume from the links recorded by a previous run, if any
processed_links = load_processed_links(output_dir)
process_url(url, output_dir, processed_links)

In [None]:
import requests
from bs4 import BeautifulSoup
import html2text
import os
import nbformat
from nbconvert import MarkdownExporter
import json

def html_to_markdown(url, timeout=30):
    """Download *url* and return ``(markdown_content, soup)``.

    Args:
        url: Address of the page to download.
        timeout: Seconds handed to ``requests.get`` so that a stalled server
            cannot block the run indefinitely.

    Returns:
        The page converted to Markdown (links, images and tables kept, no
        line wrapping) and the parsed tree with script/style elements removed.

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout. The HTTP status is not checked.
    """
    response = requests.get(url, timeout=timeout)
    html_content = response.text
    soup = BeautifulSoup(html_content, 'html.parser')
    # Script and style bodies are not page content
    for script in soup(["script", "style"]):
        script.decompose()
    h = html2text.HTML2Text()
    h.ignore_links = False
    h.ignore_images = False
    h.ignore_tables = False
    h.body_width = 0  # 0 disables line wrapping
    markdown_content = h.handle(str(soup))
    return markdown_content, soup

def extract_links(soup, base_url):
    """Map a file name built from each link's text to its absolute URL.

    Args:
        soup: Parsed page; every ``<a>`` element with an ``href`` is examined.
        base_url: Prefix (scheme and host) put in front of root-relative hrefs.

    Returns:
        A dict of ``"<link text>.md"`` -> absolute URL. Spaces and path
        separators in the link text become ``_``; an empty text falls back
        to ``link``; when two different URLs share the same text the later
        ones get a ``_2``, ``_3``, ... suffix instead of replacing the
        earlier entry. Hrefs that do not end up starting with ``http`` are
        left out.
    """
    from urllib.parse import urlsplit

    links = {}
    for a in soup.find_all('a', href=True):
        href = a['href']
        if href.startswith('//'):
            # Protocol-relative URL: borrow only the scheme from base_url
            href = urlsplit(base_url).scheme + ':' + href
        elif href.startswith('/'):
            href = base_url + href
        if not href.startswith('http'):
            continue

        stem = a.text.strip().replace(' ', '_')
        # A path separator in the name would point into another directory
        for separator in ('/', '\\'):
            stem = stem.replace(separator, '_')
        stem = stem or 'link'

        file_name = stem + '.md'
        suffix = 2
        # Keep both entries when different URLs carry the same link text
        while file_name in links and links[file_name] != href:
            file_name = f"{stem}_{suffix}.md"
            suffix += 1
        links[file_name] = href
    return links

def jupyter_to_markdown(jupyter_path):
    """Return the notebook stored at *jupyter_path* converted to Markdown."""
    with open(jupyter_path, "r") as source:
        # Parse as notebook format v4
        nb = nbformat.read(source, as_version=4)
    converter = MarkdownExporter()
    # from_notebook_node returns (body, resources); only the body is needed
    rendered = converter.from_notebook_node(nb)[0]
    return rendered

def save_markdown(content, file_path):
    """Store the text *content* at *file_path* (UTF-8, overwriting)."""
    with open(file_path, "w", encoding='utf-8') as out:
        out.write(content)

def process_url(url, output_dir):
    """Save *url* and every page it links to as Markdown files.

    The page itself is written to ``main_page.md`` and the link map returned
    by ``extract_links`` to ``links.json``, both inside *output_dir* (created
    if missing). Each link is then downloaded once: ``.ipynb`` links are
    stored as notebooks and converted, everything else is converted from
    HTML. A failure on one link is printed and does not stop the others.

    Args:
        url: Page to start from.
        output_dir: Directory that receives all output files.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Convert main page to Markdown
    markdown_content, soup = html_to_markdown(url)
    main_file = os.path.join(output_dir, "main_page.md")
    save_markdown(markdown_content, main_file)
    print(f"Main content saved to {main_file}")

    # Extract and process links
    # base_url is "<scheme>//<host>": the first three '/'-separated pieces
    base_url = '/'.join(url.split('/')[:3])
    links = extract_links(soup, base_url)
    links_file = os.path.join(output_dir, "links.json")
    with open(links_file, 'w') as f:
        json.dump(links, f, indent=2)
    print(f"Links saved to {links_file}")

    # Process each link
    for file_name, link in links.items():
        file_path = os.path.join(output_dir, file_name)
        if link.endswith('.ipynb'):
            # Swap only the trailing ".md": str.replace would also rewrite a
            # ".md" that occurs earlier in the directory or link text.
            stem = file_path[:-len('.md')] if file_path.endswith('.md') else file_path
            notebook_path = stem + '.ipynb'
            try:
                # A timeout keeps one stalled download from blocking the loop
                jupyter_content = requests.get(link, timeout=30).text
                with open(notebook_path, 'w') as f:
                    f.write(jupyter_content)
                markdown = jupyter_to_markdown(notebook_path)
                save_markdown(markdown, file_path)
                print(f"Jupyter notebook converted and saved to {file_path}")
            except Exception as e:
                # Best effort: report and continue with the next link
                print(f"Error processing Jupyter notebook {link}: {str(e)}")
        else:
            try:
                content, _ = html_to_markdown(link)
                save_markdown(content, file_path)
                print(f"Content from {link} saved to {file_path}")
            except Exception as e:
                # Best effort: report and continue with the next link
                print(f"Error processing link {link}: {str(e)}")

# Main execution: save the tutorial-notebooks repository page and its links.
# Alternative starting point kept for reference:
# url = "https://github.com/lsst"
url = "https://github.com/rubin-dp0/tutorial-notebooks"
# Relative path: files are written under the current working directory
output_dir = "extracted_content"
process_url(url, output_dir)

In [None]:
jupyter_path = "/home/david/Desktop/scraping-rubin-links/dist/all-links/DP02_01_Introduction_to_DP02.ipynb"

# Read the Jupyter notebook
import nbformat
from nbconvert import MarkdownExporter

# Notebook files are UTF-8 JSON; being explicit avoids depending on the
# platform's default encoding.
with open(jupyter_path, "r", encoding="utf-8") as file:
    notebook = nbformat.read(file, as_version=4)

# Convert the notebook to Markdown (the exporter's resources are not needed)
exporter = MarkdownExporter()
markdown, _ = exporter.from_notebook_node(notebook)

# Save the Markdown next to the notebook, same name with a .md extension
markdown_path = jupyter_path.replace(".ipynb", ".md")
with open(markdown_path, "w", encoding="utf-8") as file:
    file.write(markdown)

In [None]:
import requests
from bs4 import BeautifulSoup
import html2text

def html_to_markdown(url, timeout=30):
    """Fetch *url* and return its content converted to Markdown.

    Args:
        url: Address of the page to download.
        timeout: Seconds handed to ``requests.get`` so that a stalled server
            cannot block the call indefinitely.

    Returns:
        The Markdown text (links, images and tables kept, no line wrapping).

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout. The HTTP status is not checked.
    """
    # Fetch the HTML content
    response = requests.get(url, timeout=timeout)
    html_content = response.text

    # Parse the HTML content
    soup = BeautifulSoup(html_content, 'html.parser')

    # Remove script and style elements
    for script in soup(["script", "style"]):
        script.decompose()

    # Initialize html2text
    h = html2text.HTML2Text()
    h.ignore_links = False
    h.ignore_images = False
    h.ignore_tables = False
    h.body_width = 0  # Disable line wrapping

    # Convert to Markdown
    markdown_content = h.handle(str(soup))

    return markdown_content

# Specify the URL of the HTML page
# Alternative page kept for reference:
# url = "https://dp0-3.lsst.io/tutorials-dp0-3/index.html#dp0-3-tutorials-contributed"
url = "https://dp0-3.lsst.io/"

# Convert HTML to Markdown
markdown_content = html_to_markdown(url)

# Write the Markdown content to a file
# (relative path: the file lands in the current working directory)
markdown_file = "page.md"
with open(markdown_file, "w", encoding='utf-8') as file:
    file.write(markdown_content)

print(f"Content extracted and saved to {markdown_file}")

# Optionally, print the first 500 characters of the Markdown content
print("\nFirst 500 characters of Markdown content:")
print(markdown_content[:500])

In [None]:
# %pip install anthropic bs4

In [None]:
%pwd

In [None]:
# import anthropic
import time
import requests
from bs4 import BeautifulSoup

# client = anthropic.Anthropic()
# MODEL_NAME = "claude-3-5-sonnet-20240620"

Now let's fetch some text content to use in our examples. We'll use the text of Pride and Prejudice by Jane Austen, which is approximately 187,000 tokens long.

In [None]:
import requests
from bs4 import BeautifulSoup
import re
import os
import json

def fetch_article_content(url, timeout=30):
    """Download *url* and return its visible text, one chunk per line.

    Args:
        url: Address of the page to download.
        timeout: Seconds handed to ``requests.get`` so that a stalled server
            cannot block the call indefinitely.

    Returns:
        The page text with script/style content removed, surrounding
        whitespace stripped and blank chunks dropped, joined with newlines.

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout. The HTTP status is not checked.
    """
    response = requests.get(url, timeout=timeout)
    soup = BeautifulSoup(response.content, 'html.parser')

    # Remove script and style elements
    for script in soup(["script", "style"]):
        script.decompose()

    # Get text
    text = soup.get_text()

    # Break into lines and remove leading and trailing space on each
    lines = (line.strip() for line in text.splitlines())
    # A double space separates phrases that shared one line; split them up
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # Drop blank lines
    text = '\n'.join(chunk for chunk in chunks if chunk)

    return text

def extract_linked_url(content):
    """Return the first http(s) URL found in *content*, or None.

    A URL is ``http://`` or ``https://`` followed by every character up to
    the next whitespace.
    """
    found = re.search(r'https?://\S+', content)
    return found.group(0) if found else None


In [None]:
# Read links and filenames from allLinks.json
# (JSON text is UTF-8; do not rely on the platform's default encoding)
with open('/home/david/Desktop/scraping-rubin-links/dist/all-links/allLinks.json', 'r', encoding='utf-8') as file:
    links_data = json.load(file)

In [None]:
import urllib.parse

# print(len(links_data))
# Flatten links_data (a list of lists of entries) into links_clean, keyed by
# "<domain>-<filename>". urls keeps every URL in encounter order and count
# is the number of entries seen.
count = 0
urls = []
links_clean = {}
for links in links_data:
    for link in links:
        filename = link['filename']
        # Only the first link of each entry is used
        url = link['links'][0]
        urls.append(url)

        # Prefix the file name with the URL's domain to build the new name
        domain = urllib.parse.urlparse(url).netloc
        new_filename = f"{domain}-{filename}"

        # Entries that produce the same name replace one another
        links_clean[new_filename] = url
        count += 1

# Drop duplicate URLs: the first file name seen for each URL is kept
seen_urls = set()
unique_links_names = {}
for new_filename, url in links_clean.items():
    if url not in seen_urls:
        seen_urls.add(url)
        unique_links_names[new_filename] = url



In [None]:
# Show how many unique links remain, then the full file-name -> URL mapping
print(len(unique_links_names))
print(unique_links_names)

In [None]:
%pwd

In [None]:
# Fetch the article behind every URL in unique_links_names and save its text
# to a file named after the mapping's key.

import os
import time

# Ensure the directory exists
output_dir = "/home/david/Desktop/scraping-rubin-links/dist/all-links/articles/"
os.makedirs(output_dir, exist_ok=True)

for filename, url in unique_links_names.items():
    print(f"filename: {filename}")
    print(f"url: {url}")
    content = fetch_article_content(url)
    # Scraped text can hold any Unicode character; write it as UTF-8 instead
    # of the platform's default encoding.
    with open(os.path.join(output_dir, filename), 'w', encoding='utf-8') as file:
        file.write(content)
    # Pause between requests to go easy on the servers
    time.sleep(1)



In [None]:
# Read links and filenames from allLinks.json
# (JSON text is UTF-8; do not rely on the platform's default encoding)
with open('/home/david/Desktop/scraping-rubin-links/dist/all-links/allLinks.json', 'r', encoding='utf-8') as file:
    links_data = json.load(file)

# Ensure the directory exists
os.makedirs("data/papers", exist_ok=True)

for entrys in links_data:
    for entry in entrys:
        print(f"entry: {entry}")
        # Guard clause: only dict entries carrying both keys can be handled
        if not (isinstance(entry, dict) and 'links' in entry and 'filename' in entry):
            print(f"Invalid entry: {entry}")
            continue

        file_name = entry['filename']

        # NOTE(review): every link of an entry is written to the same
        # data/papers/<file_name>.md, so only the last link's content
        # survives. Confirm whether only the first link was meant to be used.
        for link in entry['links']:
            # NOTE(review): each link is indexed as a dict with a 'url' key
            # here, while the cell that builds links_clean uses
            # link['links'][0] directly as the URL; verify the JSON schema.
            book_url = link['url']

            print(f"Fetching {book_url}...")
            # Fetch the content of the article
            book_content = fetch_article_content(book_url)

            # Extract the linked URL from the content
            linked_url = extract_linked_url(book_content)

            # Store the content as UTF-8 (scraped text may not fit the
            # platform's default encoding)
            file_path = f"data/papers/{file_name}.md"
            with open(file_path, "w", encoding="utf-8") as file:
                file.write(book_content)

            # Store the linked URL
            if linked_url:
                linked_url_path = f"data/papers/{file_name}_linked_url.txt"
                with open(linked_url_path, "w", encoding="utf-8") as file:
                    file.write(linked_url)

            print(f"Fetched {len(book_content)} characters from {book_url}.")
            print("First 500 characters:")
            print(book_content[:500])
            if linked_url:
                print(f"Extracted linked URL: {linked_url}")
            else:
                print("No linked URL found.")

In [None]:
# %pwd
# %pip install nbformat nbconvert

In [None]:
jupyter_path = "/home/david/Desktop/scraping-rubin-links/dist/all-links/DP02_01_Introduction_to_DP02.ipynb"

# Imports for reading and converting the notebook; this cell only sets the
# path and imports, it does not read the file itself
import nbformat
from nbconvert import MarkdownExporter

In [None]:
%pwd

In [None]:
jupyter_path = "/home/david/Desktop/scraping-rubin-links/dist/all-links/DP02_01_Introduction_to_DP02.ipynb"

# Read the Jupyter notebook
import nbformat
from nbconvert import MarkdownExporter

# Open explicitly as UTF-8 (notebooks are UTF-8 JSON) rather than with the
# platform's default encoding.
with open(jupyter_path, "r", encoding="utf-8") as file:
    notebook = nbformat.read(file, as_version=4)

# Convert the notebook to Markdown; the second return value (resources) is unused
exporter = MarkdownExporter()
markdown, _ = exporter.from_notebook_node(notebook)

# Save the Markdown beside the notebook, swapping the extension
markdown_path = jupyter_path.replace(".ipynb", ".md")
with open(markdown_path, "w", encoding="utf-8") as file:
    file.write(markdown)

In [None]:
%pwd

In [None]:
# Fetch the content from the URL and extract the URLs it links to; save the
# content to one file and the linked URLs to another.
# The fetch_link_content() function below does this.

import requests
from bs4 import BeautifulSoup
import os

def fetch_link_content(url, content_filename, links_filename, timeout=30):
    """Download *url*, save its body and list the hrefs it contains.

    Args:
        url: Address of the page to download.
        content_filename: File that receives the response body. The body is
            written unchanged (raw HTML), whatever extension the name has.
        links_filename: File that receives the ``href`` of every ``<a>``
            element, one per line, exactly as written in the page (relative
            links are not resolved).
        timeout: Seconds handed to ``requests.get`` so that a stalled server
            cannot block the call indefinitely.

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error
            status.
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    # Fetch the content from the URL
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()  # Raise an exception for HTTP errors
    content = response.text

    # Parse the content to extract linked URLs
    soup = BeautifulSoup(content, 'html.parser')
    links = [a['href'] for a in soup.find_all('a', href=True)]

    # Save the content; UTF-8 so any character in the page can be written
    with open(content_filename, 'w', encoding='utf-8') as content_file:
        content_file.write(content)

    # Save the linked URLs to another file
    with open(links_filename, 'w', encoding='utf-8') as links_file:
        for link in links:
            links_file.write(link + '\n')


In [None]:
original_link = "https://dp0-2.lsst.io/"

# Build both output names from the link: drop the scheme and turn every
# slash into a dash. The names carry no directory part, so both files are
# created in the current working directory.
_flattened_link = original_link.replace("https://", "").replace("http://", "").replace("/", "-")
content_filename = _flattened_link + ".md"
links_filename = _flattened_link + "_links.txt"

# Download the page, store its body and the list of hrefs it contains
fetch_link_content(original_link, content_filename, links_filename)
                       

In [None]:
import requests
from bs4 import BeautifulSoup
import re
import os

def fetch_article_content(url, timeout=30):
    """Return the visible text of the page at *url*.

    Args:
        url: Address of the page to download.
        timeout: Seconds handed to ``requests.get``; prevents a stalled
            server from blocking the call indefinitely.

    Returns:
        Newline-joined text chunks with script/style content removed,
        whitespace trimmed and empty chunks dropped.

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout. The HTTP status is not checked.
    """
    response = requests.get(url, timeout=timeout)
    soup = BeautifulSoup(response.content, 'html.parser')

    # Remove script and style elements
    for script in soup(["script", "style"]):
        script.decompose()

    # Get text
    text = soup.get_text()

    # Break into lines and remove leading and trailing space on each
    lines = (line.strip() for line in text.splitlines())
    # Phrases separated by a double space each get their own line
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # Drop blank lines
    text = '\n'.join(chunk for chunk in chunks if chunk)

    return text

def extract_linked_url(content):
    """Find the first ``http://`` or ``https://`` URL in *content*.

    Returns the URL (everything up to the next whitespace) or None when the
    text contains no URL.
    """
    for hit in re.finditer(r'https?://\S+', content):
        # The first match is the answer
        return hit.group(0)
    return None

# Fetch the content of the article
book_url = "https://github.com/rubin-dp0/tutorial-notebooks/blob/main/DP02_01_Introduction_to_DP02.ipynb"
book_content = fetch_article_content(book_url)

# Extract the linked URL from the content
linked_url = extract_linked_url(book_content)

# Ensure the directory exists
os.makedirs("data/papers", exist_ok=True)

# Store the book content
# NOTE(review): for this URL the second-to-last path segment is "main", so
# the file is data/papers/main.md; confirm whether the last segment (the
# notebook name) was intended.
file_name = book_url.split("/")[-2]
file_path = f"data/papers/{file_name}.md"
# UTF-8 so any character in the scraped text can be written
with open(file_path, "w", encoding="utf-8") as file:
    file.write(book_content)

# Store the linked URL
if linked_url:
    linked_url_path = f"data/papers/{file_name}_linked_url.txt"
    with open(linked_url_path, "w", encoding="utf-8") as file:
        file.write(linked_url)

print(f"Fetched {len(book_content)} characters from the book.")
print("First 500 characters:")
# Print what the label promises instead of the whole text
print(book_content[:500])
# Save a second copy of the content under a fixed name
with open("data/papers/book.md", "w", encoding="utf-8") as file:
    file.write(book_content)
if linked_url:
    print(f"Extracted linked URL: {linked_url}")
else:
    print("No linked URL found.")

In [None]:
def fetch_article_content(url, timeout=30):
    """Fetch *url* and reduce the page to plain text.

    Args:
        url: Address of the page to download.
        timeout: Seconds handed to ``requests.get``; prevents a stalled
            server from blocking the call indefinitely.

    Returns:
        The text of the page without script/style content, as trimmed,
        non-empty chunks joined by newlines.

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout. The HTTP status is not checked.
    """
    response = requests.get(url, timeout=timeout)
    soup = BeautifulSoup(response.content, 'html.parser')

    # Remove script and style elements
    for script in soup(["script", "style"]):
        script.decompose()

    # Get text
    text = soup.get_text()

    # Break into lines and remove leading and trailing space on each
    lines = (line.strip() for line in text.splitlines())
    # Split each line further wherever a double space separates phrases
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    # Drop blank lines
    text = '\n'.join(chunk for chunk in chunks if chunk)

    return text

# Fetch the content of the article
# Original cookbook source kept for reference:
# book_url = "https://www.gutenberg.org/cache/epub/1342/pg1342.txt"
book_url = "https://www.lsst.io/"
book_content = fetch_article_content(book_url)

print(f"Fetched {len(book_content)} characters from the book.")
print("First 500 characters:")
# Print what the label promises instead of the whole text
print(book_content[:500])

# Derive a file name from book_url and store book_content under data/papers.
# With the trailing slash, the second-to-last piece of the URL is the host
# name, so the file is data/papers/www.lsst.io.md.
# NOTE(review): data/papers must already exist; an earlier cell creates it.
file_name = book_url.split("/")[-2]
file_path = f"data/papers/{file_name}.md"
# UTF-8 so any character in the scraped text can be written
with open(file_path, "w", encoding="utf-8") as file:
    file.write(book_content)
    





## Example 1: Single turn

Let's demonstrate prompt caching with a large document, comparing the performance and cost between cached and non-cached API calls.

### Part 1: Non-cached API Call

First, let's make a non-cached API call. This will load the prompt into the cache so that our subsequent cached API calls can benefit from the prompt caching.

We will ask for a short output string to keep the output response time low since the benefit of prompt caching applies only to the input processing time.

In [None]:
def make_non_cached_api_call():
    """Send the book plus a short question and time the request.

    The request already marks the book block with ``cache_control`` and sends
    the prompt-caching beta header, so this call is the one that can populate
    the cache for later calls.

    Returns:
        ``(response, seconds)``: the API response and the wall-clock
        duration of the call.
    """
    book_block = {
        "type": "text",
        "text": "<book>" + book_content + "</book>",
        "cache_control": {"type": "ephemeral"},
    }
    question_block = {
        "type": "text",
        "text": "What is the title of this book? Only output the title.",
    }
    request = {
        "model": MODEL_NAME,
        "max_tokens": 300,
        "messages": [{"role": "user", "content": [book_block, question_block]}],
        "extra_headers": {"anthropic-beta": "prompt-caching-2024-07-31"},
    }

    # Time only the API round trip
    started = time.time()
    response = client.messages.create(**request)
    elapsed = time.time() - started

    return response, elapsed

# Run the first call and keep both the response and its duration
non_cached_response, non_cached_time = make_non_cached_api_call()

# Report latency and the token counts from the response's usage record
print(f"Non-cached API call time: {non_cached_time:.2f} seconds")
print(f"Non-cached API call input tokens: {non_cached_response.usage.input_tokens}")
print(f"Non-cached API call output tokens: {non_cached_response.usage.output_tokens}")

# Prints the response's raw `content` attribute, not just its text
print("\nSummary (non-cached):")
print(non_cached_response.content)

### Part 2: Cached API Call

Now, let's make a cached API call. I'll add in the "cache_control": {"type": "ephemeral"} attribute to the content object and add the "prompt-caching-2024-07-31" beta header to the request. This will enable prompt caching for this API call.

To keep the output latency constant, we will ask Claude the same question as before. Note that this question is not part of the cached content.

In [None]:
def make_cached_api_call():
    """Repeat the book question with prompt caching enabled and time it.

    The book block carries ``cache_control`` and the request sends the
    prompt-caching beta header; the question block is outside the cached
    part.

    Returns:
        ``(response, seconds)``: the API response and the wall-clock
        duration of the call.
    """
    user_content = [
        {
            "type": "text",
            "text": "<book>" + book_content + "</book>",
            "cache_control": {"type": "ephemeral"},
        },
        {
            "type": "text",
            "text": "What is the title of this book? Only output the title.",
        },
    ]
    beta_headers = {"anthropic-beta": "prompt-caching-2024-07-31"}

    started_at = time.time()
    response = client.messages.create(
        model=MODEL_NAME,
        max_tokens=300,
        messages=[{"role": "user", "content": user_content}],
        extra_headers=beta_headers,
    )
    finished_at = time.time()

    return response, finished_at - started_at

# Run the second call and keep both the response and its duration
cached_response, cached_time = make_cached_api_call()

# Report latency and the token counts from the response's usage record
print(f"Cached API call time: {cached_time:.2f} seconds")
print(f"Cached API call input tokens: {cached_response.usage.input_tokens}")
print(f"Cached API call output tokens: {cached_response.usage.output_tokens}")

# Prints the response's raw `content` attribute, not just its text
print("\nSummary (cached):")
print(cached_response.content)

As you can see, the cached API call only took 3.64 seconds total compared to 21.44 seconds for the non-cached API call. This is a significant improvement in overall latency due to caching.

## Example 2: Multi-turn Conversation with Incremental Caching

Now, let's look at a multi-turn conversation where we add cache breakpoints as the conversation progresses.

In [None]:
class ConversationHistory:
    """Ordered record of a chat, able to mark recent user turns for caching."""

    def __init__(self):
        # Turns are stored oldest first, each as a role/content message dict
        self.turns = []

    def _append_turn(self, role, text):
        # Store one turn as a message with a single text block
        self.turns.append({
            "role": role,
            "content": [
                {
                    "type": "text",
                    "text": text
                }
            ]
        })

    def add_turn_assistant(self, content):
        """Append an assistant turn whose text is *content*."""
        self._append_turn("assistant", content)

    def add_turn_user(self, content):
        """Append a user turn whose text is *content*."""
        self._append_turn("user", content)

    def get_turns(self):
        """Return the turns in order, flagging the last two user turns.

        The two most recent user turns are returned as fresh messages whose
        text block carries an ephemeral ``cache_control`` entry; every other
        turn is returned as stored. The stored history is not modified.
        """
        user_positions = [
            position
            for position, turn in enumerate(self.turns)
            if turn["role"] == "user"
        ]
        flagged = set(user_positions[-2:])

        formatted = []
        for position, turn in enumerate(self.turns):
            if position not in flagged:
                formatted.append(turn)
                continue
            formatted.append({
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": turn["content"][0]["text"],
                        "cache_control": {"type": "ephemeral"}
                    }
                ]
            })
        return formatted

# Initialize the conversation history
# (module-level: simulate_conversation() reads and extends this object)
conversation_history = ConversationHistory()

# System message containing the book content
# Note: 'book_content' should be defined elsewhere in the code
system_message = f"<file_contents> {book_content} </file_contents>"

# Predefined questions for our simulation
# NOTE(review): these questions are about a novel, while the cell that last
# sets book_content fetches https://www.lsst.io/ — confirm the text matches.
questions = [
    "What is the title of this novel?",
    "Who are Mr. and Mrs. Bennet?",
    "What is Netherfield Park?",
    "What is the main theme of this novel?"
]

def simulate_conversation():
    """Ask each predefined question in turn and report cache usage.

    For every entry of ``questions`` the user turn is appended to
    ``conversation_history``, the whole history is sent together with the
    cached system message, and the reply, token counts, share of cached
    input and elapsed time are printed. The reply is then appended to the
    history so the next question builds on it.
    """
    for i, question in enumerate(questions, 1):
        print(f"\nTurn {i}:")
        print(f"User: {question}")

        # Add user input to conversation history
        conversation_history.add_turn_user(question)

        # Record the start time for performance measurement
        start_time = time.time()

        # Make an API call to the assistant
        response = client.messages.create(
            model=MODEL_NAME,
            extra_headers={
              "anthropic-beta": "prompt-caching-2024-07-31"
            },
            max_tokens=300,
            system=[
                {"type": "text", "text": system_message, "cache_control": {"type": "ephemeral"}},
            ],
            messages=conversation_history.get_turns(),
        )

        # Record the end time
        end_time = time.time()

        # Extract the assistant's reply
        assistant_reply = response.content[0].text
        print(f"Assistant: {assistant_reply}")

        # Print token usage information
        input_tokens = response.usage.input_tokens
        output_tokens = response.usage.output_tokens
        input_tokens_cache_read = getattr(response.usage, 'cache_read_input_tokens', '---')
        input_tokens_cache_create = getattr(response.usage, 'cache_creation_input_tokens', '---')
        print(f"User input tokens: {input_tokens}")
        print(f"Output tokens: {output_tokens}")
        print(f"Input tokens (cache read): {input_tokens_cache_read}")
        print(f"Input tokens (cache write): {input_tokens_cache_create}")

        # Calculate the elapsed time
        elapsed_time = end_time - start_time

        # The cache-read counter may be absent ('---' placeholder) or present
        # but None; both count as zero cached tokens instead of failing int().
        try:
            cached_tokens = int(input_tokens_cache_read)
        except (TypeError, ValueError):
            cached_tokens = 0

        # Calculate the percentage of input prompt cached
        # (cache-write tokens are not part of this total)
        total_input_tokens = input_tokens + cached_tokens
        percentage_cached = cached_tokens / total_input_tokens * 100 if total_input_tokens > 0 else 0

        print(f"{percentage_cached:.1f}% of input prompt cached ({total_input_tokens} tokens)")
        print(f"Time taken: {elapsed_time:.2f} seconds")

        # Add assistant's reply to conversation history
        conversation_history.add_turn_assistant(assistant_reply)

# Run the simulated conversation
simulate_conversation()

As you can see in this example, response times decreased from nearly 24 seconds to just 8-9 seconds after the initial cache setup, while maintaining the same level of quality across the answers. Most of this remaining latency is due to the time it takes to generate the response, which is not affected by prompt caching.

And since nearly 100% of input tokens were cached in subsequent turns as we kept adjusting the cache breakpoints, we were able to read the next user message nearly instantly.