In [1]:
# Preamble
import PyPDF2 # For parsing PDF documents
import ast  # convert embeddings saved as strings back to arrays
import openai  # OpenAI API
import pandas as pd  # for storing text and embeddings data
import tiktoken  # for counting tokens
from scipy import spatial  # for calculating vector similarities for search
import wikipedia # For sourcing Wikipedia article text
import re  # for cutting <ref> links out of Wikipedia articles
import mwparserfromhell  # for splitting Wikipedia articles into sections
from copy import deepcopy # for copying dataframes

In [2]:
# Config

# -- OpenAI models --
GPT_MODEL = "gpt-3.5-turbo"  # default model for token counting and chunk splitting below
EMBEDDING_MODEL = "text-embedding-ada-002"  # default model for get_embedding()

# -- Embedding pipeline --
BATCH_SIZE = 1000  # you can submit up to 2048 embedding inputs per request
SAVE_PATH = "assets/computer_vision_wiki_embeddings.csv"  # presumably where embeddings are persisted; not used in the cells shown here

# -- Chatbot wording (not referenced by the cells shown here) --
CHATBOT_KNOWLEDGE_TOPIC = "Computer Vision"
ANSWER_NOT_FOUND_MSG = "I could not find an answer in the text I\'ve been provided, sorry! Please try again."

In [3]:
# Headings of sections that are skipped when a page is split into sections.
# A heading is compared against this list after its surrounding "=" and spaces
# have been stripped. Each title appears exactly once.
SECTIONS_TO_IGNORE = [
    "See also",
    "References",
    "External links",
    "Further reading",
    "Footnotes",
    "Bibliography",
    "Sources",
    "Citations",
    "Literature",
    "Notes and references",
    "Photo gallery",
    "Works cited",
    "Photos",
    "Gallery",
    "Notes",
    "References and sources",
    "References and notes",
]

# Title of the Wikipedia page that is fetched and split below.
WIKI_PAGE = "Computer vision"

def all_subsections_from_section(
    section: mwparserfromhell.wikicode.Wikicode,
    parent_titles: list[str],
    sections_to_ignore: set[str],
) -> list[tuple[list[str], str]]:
    """
    From a Wikipedia section, return a flattened list of all nested subsections.

    Each subsection is a tuple, where:
        - the first element is a list of parent subtitles, starting with the page title
        - the second element is the text of the subsection (but not any children)

    Args:
        section: parsed section whose first heading is the section's own title.
        parent_titles: titles of the enclosing page/sections, outermost first.
        sections_to_ignore: heading names (without the "=" wrapping) to skip;
            only membership (`in`) is used, so any container works.

    Returns:
        An empty list if the section's title is ignored, otherwise the section
        itself followed by its nested subsections, depth first.

    Raises:
        IndexError: if `section` contains no heading at all.
    """
    headings = [str(h) for h in section.filter_headings()]
    title = headings[0]
    if title.strip("=" + " ") in sections_to_ignore:
        # ^wiki headings are wrapped like "== Heading =="
        return []
    titles = parent_titles + [title]
    has_children = len(headings) > 1
    # maxsplit=1: keep everything after the FIRST occurrence of the heading, so the
    # body is not cut short if the same heading string shows up again further down.
    section_text = str(section).split(title, 1)[1]
    if has_children:
        # the section's own text ends where its first nested heading begins
        section_text = section_text.split(headings[1], 1)[0]
    results = [(titles, section_text)]
    if has_children:
        # `titles` holds the page title plus one heading per nesting level, so the
        # requested heading level is one deeper than the current section's level
        for subsection in section.get_sections(levels=[len(titles) + 1]):
            results.extend(all_subsections_from_section(subsection, titles, sections_to_ignore))
    return results

def all_subsections_from_title(
    title: str = WIKI_PAGE,
    sections_to_ignore: set[str] = SECTIONS_TO_IGNORE,
) -> list[tuple[list[str], str]]:
    """Fetch a Wikipedia page by title and flatten it into (titles, text) pairs.

    The first pair is the page summary (everything before the first heading, or the
    whole page when it has no headings) filed under `[title]`. It is followed by
    the pairs produced by `all_subsections_from_section` for every level-2 section,
    where:
        - the first element is a list of parent subtitles, starting with the page title
        - the second element is the text of the subsection (but not any children)
    """
    page = wikipedia.page(title, auto_suggest=False)
    wikicode = mwparserfromhell.parse(page.content)
    heading_strings = [str(heading) for heading in wikicode.filter_headings()]
    page_text = str(wikicode)
    # without any heading the whole page counts as the summary
    summary_text = page_text.split(heading_strings[0])[0] if heading_strings else page_text
    nested = [
        all_subsections_from_section(top_level, [title], sections_to_ignore)
        for top_level in wikicode.get_sections(levels=[2])
    ]
    return [([title], summary_text)] + [entry for group in nested for entry in group]

# split the page into sections
# (the original estimate was ~1 minute per 100 articles; only WIKI_PAGE is fetched here)
wikipedia_sections = all_subsections_from_title(WIKI_PAGE)
print(f"Found {len(wikipedia_sections)} sections.")
# preview: index, heading path and the first 30 characters of each section
for section_index, (section_titles, section_text) in enumerate(wikipedia_sections):
    print(f'{section_index}. {section_titles}: {str(section_text)[0:30]}')

Found 24 sections.
0. ['Computer vision']: Computer vision tasks include 
1. ['Computer vision', '== Definition ==']: 
Computer vision is an interdi
2. ['Computer vision', '== History ==']: 
In the late 1960s, computer v
3. ['Computer vision', '== Related fields ==']: 



4. ['Computer vision', '== Related fields ==', '=== Solid-state physics ===']: 
Solid-state physics is anothe
5. ['Computer vision', '== Related fields ==', '=== Neurobiology ===']: 
Neurobiology has greatly infl
6. ['Computer vision', '== Related fields ==', '=== Signal processing ===']: 
Yet another field related to 
7. ['Computer vision', '== Related fields ==', '=== Robotic navigation ===']: 
Robot navigation sometimes de
8. ['Computer vision', '== Related fields ==', '=== Other fields ===']: 
Besides the above-mentioned v
9. ['Computer vision', '== Related fields ==', '=== Distinctions ===']: 
The fields most closely relat
10. ['Computer vision', '== Applications ==']: 
Applications range from tasks
11. ['Compute

In [4]:
# clean text
def clean_section(section: tuple[list[str], str]) -> tuple[list[str], str]:
    """
    Return a cleaned up section with:
        - <ref>xyz</ref> patterns removed (including ones spanning several lines)
        - self-closing <ref ... /> tags removed
        - leading/trailing whitespace removed

    Args:
        section: (titles, text) pair; `titles` is passed through untouched.

    Returns:
        A new (titles, cleaned_text) tuple.
    """
    titles, text = section
    # Self-closing tags are tried first so that `<ref name="x" />` is removed on its
    # own instead of being paired with a later, unrelated `</ref>` (which would
    # delete the real text in between). `\b` keeps tags such as `<references/>`
    # from being treated as `<ref`. DOTALL lets a reference body contain newlines.
    text = re.sub(r"<ref\b[^>]*/>|<ref\b[^>]*>.*?</ref>", "", text, flags=re.DOTALL)
    text = text.strip()
    return (titles, text)


# run every (titles, text) pair through clean_section, keeping the original order
wikipedia_sections = list(map(clean_section, wikipedia_sections))

# drop sections that are blank or too short
def keep_section(section: tuple[list[str], str]) -> bool:
    """Tell whether a (titles, text) section has at least 16 characters of text."""
    _, text = section
    return len(text) >= 16


# remember the pre-filter count so the summary line can report how many were dropped
original_num_sections = len(wikipedia_sections)
wikipedia_sections = list(filter(keep_section, wikipedia_sections))
print(f"Filtered out {original_num_sections-len(wikipedia_sections)} sections, leaving {len(wikipedia_sections)} sections.")

Filtered out 1 sections, leaving 23 sections.


In [5]:
# recursively split long sections into smaller sections.

def num_tokens(text: str, model: str = GPT_MODEL) -> int:
    """Count the tokens `text` encodes to with the tiktoken encoding looked up for `model`."""
    return len(tiktoken.encoding_for_model(model).encode(text))


def halved_by_delimiter(string: str, delimiter: str = "\n") -> list[str]:
    """Split a string in two, on a delimiter, trying to balance tokens on each side.

    Args:
        string: text to split.
        delimiter: separator to split on; it is not included at the cut point.

    Returns:
        A two-element list `[left, right]`:
            - `[string, ""]` when the delimiter does not occur;
            - the two pieces as-is when it occurs exactly once;
            - otherwise the cut whose left side is closest to half of the total
              token count. `left` can be "" when no prefix improves on the
              starting difference (e.g. the first chunk is empty).
    """
    chunks = string.split(delimiter)
    if len(chunks) == 1:
        return [string, ""]  # no delimiter found
    if len(chunks) == 2:
        return chunks  # no need to search for halfway point

    halfway = num_tokens(string) // 2
    best_diff = halfway
    # Number of chunks that go to the left side. The fallback is only used if the
    # loop never breaks; the last prefix is the whole string, whose distance from
    # `halfway` is at least `halfway`, so in practice the break always fires.
    split_at = len(chunks) - 1
    for prefix_len in range(1, len(chunks) + 1):
        left_tokens = num_tokens(delimiter.join(chunks[:prefix_len]))
        diff = abs(halfway - left_tokens)
        if diff >= best_diff:
            # this prefix is no better than the previous one: cut before this chunk
            split_at = prefix_len - 1
            break
        best_diff = diff
    left = delimiter.join(chunks[:split_at])
    right = delimiter.join(chunks[split_at:])
    return [left, right]


def truncated_string(
    string: str,
    model: str,
    max_tokens: int,
    print_warning: bool = True,
) -> str:
    """Keep only the first `max_tokens` tokens of `string` and decode them back to text.

    The tokenizer is the tiktoken encoding looked up for `model`. When the input
    had more than `max_tokens` tokens and `print_warning` is true, a warning with
    both token counts is printed.
    """
    tokenizer = tiktoken.encoding_for_model(model)
    encoded_string = tokenizer.encode(string)
    shortened = tokenizer.decode(encoded_string[:max_tokens])
    was_cut = len(encoded_string) > max_tokens
    if was_cut and print_warning:
        print(f"Warning: Truncated string from {len(encoded_string)} tokens to {max_tokens} tokens.")
    return shortened


def split_strings_from_subsection(
    subsection: tuple[list[str], str],
    max_tokens: int = 1000,
    model: str = GPT_MODEL,
    max_recursion: int = 5,
) -> list[str]:
    """
    Break the text of a subsection into strings of at most `max_tokens` tokens each.

    `subsection` is a tuple of parent titles [H1, H2, ...] and text (str); only the
    text ends up in the returned strings. Text that already fits is returned as a
    single-element list. Otherwise it is halved on the coarsest delimiter that
    yields two non-empty halves (paragraph break, then newline, then sentence end)
    and each half is processed recursively. Once `max_recursion` reaches 0, or when
    no delimiter produces a usable split, the text is truncated to `max_tokens`.

    NOTE(review): token counting goes through `num_tokens` and
    `halved_by_delimiter` with their default tokenizer; `model` is only forwarded
    to `truncated_string`.
    """
    titles, text = subsection
    # short enough already: nothing to split
    if num_tokens(text) <= max_tokens:
        return [text]
    if max_recursion != 0:
        for delimiter in ("\n\n", "\n", ". "):
            left, right = halved_by_delimiter(text, delimiter=delimiter)
            if left != "" and right != "":
                # both halves carry text: recurse on each and concatenate in order
                return [
                    piece
                    for half in (left, right)
                    for piece in split_strings_from_subsection(
                        (titles, half),
                        max_tokens=max_tokens,
                        model=model,
                        max_recursion=max_recursion - 1,
                    )
                ]
            # an empty half means this delimiter is too coarse; try a finer one
    # recursion budget exhausted or no split found (should be very rare): truncate
    return [truncated_string(text, model=model, max_tokens=max_tokens)]

# break every kept section into strings that fit the token budget
MAX_TOKENS = 1600  # per-string token limit handed to split_strings_from_subsection
wikipedia_strings = []
for section in wikipedia_sections:
    wikipedia_strings += split_strings_from_subsection(section, max_tokens=MAX_TOKENS)

print(f"{len(wikipedia_sections)} Wikipedia sections split into {len(wikipedia_strings)} strings.")

23 Wikipedia sections split into 23 strings.


In [6]:
# Used throughout
# NOTE: this replaces the `num_tokens(text, model=...)` helper defined in an earlier
# cell. It still accepts a model name so calls written against either signature work.
def num_tokens(
        text: str,
        encoding: "tiktoken.Encoding" = tiktoken.encoding_for_model(GPT_MODEL),
        model: "str | None" = None,
) -> int:
    """Returns the number of tokens in a string.

    Args:
        text: string to tokenize.
        encoding: tokenizer to use. The default is looked up once, when this
            function is defined, from GPT_MODEL. A model name (str) passed here is
            resolved with `tiktoken.encoding_for_model`, as in the earlier helper.
        model: optional model name; when given it takes precedence over `encoding`.
    """
    if model is not None:
        encoding = tiktoken.encoding_for_model(model)
    elif isinstance(encoding, str):
        # positional call in the style of the earlier helper: num_tokens(text, "model-name")
        encoding = tiktoken.encoding_for_model(encoding)
    return len(encoding.encode(text))

def get_embedding(content: "str | list[str]", model: str = EMBEDDING_MODEL):
    """Request embeddings for `content` via `openai.Embedding.create`.

    Args:
        content: the text to embed; the notebook passes a list of strings (one
            batch), so both a single string and a list are annotated.
        model: name of the embedding model.

    Returns:
        The response of `openai.Embedding.create`, unmodified. The calling cell
        reads `response["data"][i]["index"]` and `response["data"][i]["embedding"]`.
    """
    return openai.Embedding.create(input=content, model=model)

In [7]:
# With the library split into shorter self-contained strings, embed each of them.
# Embeddings are requested in batches of BATCH_SIZE strings.

embeddings = []
# strip one leading newline from every string, then drop strings that are just "\n\n"
wikipedia_strings = [
    s
    for s in (str(raw).removeprefix("\n") for raw in wikipedia_strings)
    if s!='\n\n'
]
for batch_start in range(0, len(wikipedia_strings), BATCH_SIZE):
    batch_end = batch_start + BATCH_SIZE
    batch = wikipedia_strings[batch_start:batch_end]
    print(f"Batch {batch_start} to {batch_end-1}")
    response = get_embedding(batch)
    batch_embeddings = []
    for i, be in enumerate(response["data"]):
        assert i == be["index"]  # double check embeddings are in same order as input
        batch_embeddings.append(be["embedding"])
    embeddings.extend(batch_embeddings)

# one row per string, with its embedding vector alongside
df = pd.DataFrame({"text": wikipedia_strings, "embedding": embeddings})

Batch 0 to 999


In [8]:
# display the text/embedding DataFrame built in the previous cell
df

Unnamed: 0,text,embedding
0,Computer vision tasks include methods for acqu...,"[-0.017700279131531715, 0.000514430517796427, ..."
1,Computer vision is an interdisciplinary field ...,"[-0.018318993970751762, 0.001517261378467083, ..."
2,"In the late 1960s, computer vision began at un...","[-0.012142791412770748, -0.0038553199265152216..."
3,Solid-state physics is another field that is c...,"[0.0041972859762609005, 0.012276003137230873, ..."
4,Neurobiology has greatly influenced the develo...,"[-0.010451305657625198, -0.0007485592504963279..."
5,Yet another field related to computer vision i...,"[-0.029517987743020058, 0.009650111198425293, ..."
6,Robot navigation sometimes deals with autonomo...,"[0.0059881932102143764, -0.01309896819293499, ..."
7,Besides the above-mentioned views on computer ...,"[0.0016696708044037223, -0.005273657850921154,..."
8,The fields most closely related to computer vi...,"[-0.01812841184437275, 0.005136170424520969, 0..."
9,Applications range from tasks such as industri...,"[-0.022646257653832436, 0.005542135331779718, ..."
