# Wikipedia Data Extractor

In this notebook we will crawl Wikipedia and extract pages about the Beijing 2022 Winter Olympics, which we will later use with the Vertex AI PaLM API.



In [None]:
# Authenticate this Colab runtime with your Google Cloud account.
# NOTE(review): presumably this is what lets the Cloud Storage client created
# later in the notebook pick up credentials — confirm.
from google.colab import auth as google_auth
google_auth.authenticate_user()

In [None]:
# Install all the needed packages

#Midiawiki client
!pip install mwclient

#MidiaWiki parser
!pip install mwparserfromhell


Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mwclient
  Downloading mwclient-0.10.1-py2.py3-none-any.whl (27 kB)
Installing collected packages: mwclient
Successfully installed mwclient-0.10.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mwparserfromhell
  Downloading mwparserfromhell-0.6.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (190 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m190.6/190.6 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: mwparserfromhell
Successfully installed mwparserfromhell-0.6.4


In [None]:
# Import the needed libraries for this module

# Import packages
import re
import pandas as pd
import mwclient
import mwparserfromhell


In [None]:
# Which Wikipedia pages to fetch: the category to crawl (2022 Winter Olympics)
# and the wiki host to query. The "#@param" marker is a Colab form annotation.

CATEGORY_TITLE = "Category:2022 Winter Olympics"#@param
WIKI_SITE = "en.wikipedia.org"


  and should_run_async(code)


In [None]:


def titles_from_category(
    category: mwclient.listing.Category, max_depth: int
) -> set[str]:
    """Collect the titles of all pages in a Wiki category, descending into subcategories.

    Args:
        category: the category whose members are enumerated.
        max_depth: how many levels of subcategories may still be entered;
            0 means only the direct page members of `category` are returned.

    Returns:
        The set of page titles found.
    """
    collected = set()
    for member in category.members():
        # Exact type comparison on purpose (not isinstance): only objects whose
        # type is precisely Page count as articles, never instances of subclasses.
        if type(member) == mwclient.page.Page:
            collected.add(member.name)
            continue
        if max_depth > 0 and isinstance(member, mwclient.listing.Category):
            # Walk one level down with a reduced depth budget.
            collected |= titles_from_category(member, max_depth=max_depth - 1)
    return collected


# Connect to the wiki, look up the category page and gather the article titles
# it contains (including those of its direct subcategories).
site = mwclient.Site(WIKI_SITE)
category_page = site.pages[CATEGORY_TITLE]
titles = titles_from_category(category_page, max_depth=1)
# ^note: max_depth=1 means we go one level deep in the category tree
print(f"Found {len(titles)} article titles in {CATEGORY_TITLE}.")

  and should_run_async(code)


Found 732 article titles in Category:2022 Winter Olympics.


Now that we have our reference documents, we need to prepare them for search.

Because PaLM TextEmbedding can only read a limited amount of text at once, we'll split each document into chunks short enough to be read.

For this specific example on Wikipedia articles, we'll:

- Discard less relevant-looking sections like External Links and Footnotes
- Clean up the text by removing reference tags (e.g., `<ref>`), whitespace, and super short sections
- Split each article into sections
- Prepend titles and subtitles to each section's text, to help PaLM understand the context
- If a section is long (say, > 1,600 tokens), we'll recursively split it into smaller sections, trying to split along semantic boundaries like paragraphs

In [None]:
# Headings of sections that carry no useful article text (navigation, citations,
# image galleries). A section whose heading matches one of these is dropped,
# together with everything nested under it, when pages are split into subsections.
# Each heading is listed exactly once.
SECTIONS_TO_IGNORE = [
    "See also",
    "References",
    "External links",
    "Further reading",
    "Footnotes",
    "Bibliography",
    "Sources",
    "Citations",
    "Literature",
    "Notes and references",
    "Photo gallery",
    "Works cited",
    "Photos",
    "Gallery",
    "Notes",
    "References and sources",
    "References and notes",
]


def all_subsections_from_section(
    section: mwparserfromhell.wikicode.Wikicode,
    parent_titles: list[str],
    sections_to_ignore: set[str],
) -> list[tuple[list[str], str]]:
    """
    Flatten a Wikipedia section and everything nested below it.

    Returns a list of (titles, text) tuples, one per (sub)section, where:
        - titles is the chain of headings leading to the subsection, beginning
          with the entries of parent_titles
        - text is the subsection's own text, excluding the text of its children

    A section whose heading is listed in sections_to_ignore yields an empty
    list, so its children are skipped as well.
    """
    heading_strings = [str(heading) for heading in section.filter_headings()]
    own_heading = heading_strings[0]
    # Headings come wrapped in wiki markup ("== Heading =="); peel off the
    # "=" signs and spaces before comparing against the ignore list.
    if own_heading.strip("= ") in sections_to_ignore:
        return []

    title_path = parent_titles + [own_heading]
    # Everything after this section's own heading line.
    body = str(section).split(own_heading)[1]
    if len(heading_strings) == 1:
        # No nested headings: the whole body belongs to this section.
        return [(title_path, body)]

    # Keep only the text that precedes the first nested heading, then recurse
    # into the sections one heading level deeper.
    lead_text = body.split(heading_strings[1])[0]
    flattened = [(title_path, lead_text)]
    child_level = len(title_path) + 1
    for child in section.get_sections(levels=[child_level]):
        flattened.extend(all_subsections_from_section(child, title_path, sections_to_ignore))
    return flattened


# Connections already opened by _site_for, keyed by wiki host name.
_SITE_CACHE: dict = {}


def _site_for(site_name: str) -> mwclient.Site:
    """Return a shared mwclient.Site for site_name, connecting only on first use."""
    if site_name not in _SITE_CACHE:
        _SITE_CACHE[site_name] = mwclient.Site(site_name)
    return _SITE_CACHE[site_name]


def all_subsections_from_title(
    title: str,
    sections_to_ignore: set[str] = SECTIONS_TO_IGNORE,
    site_name: str = WIKI_SITE,
) -> list[tuple[list[str], str]]:
    """From a Wikipedia page title, return a flattened list of all nested subsections.

    Each subsection is a tuple, where:
        - the first element is a list of parent subtitles, starting with the page title
        - the second element is the text of the subsection (but not any children)

    The first tuple always holds the page text that precedes the first heading
    (the whole page when it has no headings).

    Args:
        title: title of the page to download and split.
        sections_to_ignore: headings whose sections are skipped.
        site_name: host name of the wiki to query.
    """
    # Reuse one connection per host instead of building a new Site for every
    # page; this function is called once per title. (By default mwclient.Site
    # queries the wiki for site info when it is constructed.)
    page = _site_for(site_name).pages[title]
    parsed_text = mwparserfromhell.parse(page.text())
    page_markup = str(parsed_text)
    headings = [str(h) for h in parsed_text.filter_headings()]
    # Text before the first heading is the page summary.
    summary_text = page_markup.split(headings[0])[0] if headings else page_markup
    results = [([title], summary_text)]
    # Level-2 headings ("== Heading ==") are the top-level sections of a page.
    for subsection in parsed_text.get_sections(levels=[2]):
        results.extend(all_subsections_from_section(subsection, [title], sections_to_ignore))
    return results

# split pages into sections
# may take ~1 minute per 100 articles
# Each entry appended to wikipedia_sections is one (titles, text) tuple as
# returned by all_subsections_from_title.
wikipedia_sections = []
for title in titles:
    wikipedia_sections.extend(all_subsections_from_title(title))
print(f"Found {len(wikipedia_sections)} sections in {len(titles)} pages.")




  and should_run_async(code)


Found 5718 sections in 732 pages.


In [None]:
# Approximate number of characters per token. This value is for the English
# language; change it for other languages.
# NOTE(review): no other cell visible in this notebook reads this variable;
# num_tokens uses its own value of 4 — confirm whether they should be linked.
tokens_to_char = 4 #@param

  and should_run_async(code)


In [None]:

MAX_INPUT_TOKEN = 4095 # Token limit for a single input: one less than the PaLM API maximum of 4096


def num_tokens(text: str, chars_per_token: int = 4) -> int:
    """Estimate the number of tokens in a string from its character count.

    This is a length-based heuristic, not a real tokenizer: the estimate is the
    number of characters divided by chars_per_token, rounded down.

    Args:
        text: the string to measure.
        chars_per_token: assumed average characters per token. The default of 4
            is the value this notebook uses for English; pass a different value
            for other languages.

    Returns:
        The estimated token count (0 for an empty string).

    Raises:
        ValueError: if chars_per_token is not positive.
    """
    if chars_per_token <= 0:
        raise ValueError("chars_per_token must be a positive integer")
    return len(text) // chars_per_token


def halved_by_delimiter(string: str, delimiter: str = "\n") -> list[str]:
    """Split a string in two, on a delimiter, trying to balance tokens on each side.

    The string is cut at one occurrence of the delimiter (which is dropped at
    the cut), chosen so that the estimated token count of the left part is as
    close as possible to half of the total. Ties go to the earliest cut.

    Args:
        string: the text to split.
        delimiter: the separator at which a cut is allowed.

    Returns:
        A two-element list [left, right]. When the delimiter does not occur,
        the result is [string, ""].
    """
    chunks = string.split(delimiter)
    if len(chunks) == 1:
        return [string, ""]  # no delimiter found
    if len(chunks) == 2:
        return chunks  # only one possible cut, no need to search
    halfway = num_tokens(string) // 2
    best_cut = 1
    best_diff = None
    # A cut at position c puts chunks[:c] on the left. Starting at 1 guarantees
    # that at least one chunk lands on each side, so text that begins with the
    # delimiter (or with a very short first chunk) no longer yields an empty
    # left half and an unsplit right half.
    for cut in range(1, len(chunks)):
        left_tokens = num_tokens(delimiter.join(chunks[:cut]))
        diff = abs(halfway - left_tokens)
        if best_diff is None or diff < best_diff:
            best_cut, best_diff = cut, diff
        if left_tokens >= halfway:
            # The left side only grows from here, so no later cut can be closer.
            break
    left = delimiter.join(chunks[:best_cut])
    right = delimiter.join(chunks[best_cut:])
    return [left, right]


def truncated_string(
    string: str,
    max_tokens: int,
    print_warning: bool = True,
    chars_per_token: int = 4,
) -> str:
    """Truncate a string to a maximum number of (estimated) tokens.

    Tokens are estimated by length: one token per chars_per_token characters,
    so the result holds at most max_tokens * chars_per_token characters.

    Args:
        string: the text to shorten.
        max_tokens: token budget for the result; values below 0 are treated as 0.
        print_warning: when True, print a message if any text was cut off.
        chars_per_token: assumed average characters per token (4 is the value
            this notebook uses for English).

    Returns:
        The original string if it fits the budget, otherwise its leading part.
    """
    # Convert the token budget into a character budget; the limit is honored
    # per call rather than taken from a notebook-wide constant.
    max_chars = max(max_tokens, 0) * chars_per_token
    if len(string) <= max_chars:
        return string
    if print_warning:
        # Report sizes in tokens so the numbers match the unit named in the message.
        print(
            f"Warning: Truncated string from {len(string) // chars_per_token} tokens "
            f"to {max_tokens} tokens."
        )
    return string[:max_chars]


def split_strings_from_subsection(
    subsection: tuple[list[str], str],
    max_tokens: int = 4095,
    max_recursion: int = 5,
) -> list[str]:
    """
    Split a subsection into a list of strings, each with no more than max_tokens.

    Args:
        subsection: a tuple of parent titles [H1, H2, ...] and text (str).
        max_tokens: token budget for every returned string, as measured by
            num_tokens.
        max_recursion: how many more times the text may be halved before the
            function gives up and truncates instead.

    Returns:
        A list of strings. Each one is the titles followed by a piece of the
        text, all joined with blank lines.
    """
    titles, text = subsection
    string = "\n\n".join(titles + [text])
    # if length is fine, return string
    # (compared against the caller's budget, not a notebook-wide constant)
    if num_tokens(string) <= max_tokens:
        return [string]
    # if recursion hasn't found a split after X iterations, just truncate
    if max_recursion == 0:
        return [truncated_string(string, max_tokens=max_tokens)]
    # otherwise, split in half and recurse; coarser delimiters are tried first
    # so that cuts fall on paragraph, then line, then sentence boundaries
    for delimiter in ["\n\n", "\n", ". "]:
        left, right = halved_by_delimiter(text, delimiter=delimiter)
        if left == "" or right == "":
            # if either half is empty, retry with a more fine-grained delimiter
            continue
        # recurse on each half, repeating the titles in front of both
        results = []
        for half in [left, right]:
            results.extend(
                split_strings_from_subsection(
                    (titles, half),
                    max_tokens=max_tokens,
                    max_recursion=max_recursion - 1,
                )
            )
        return results
    # otherwise no split was found, so just truncate (should be very rare)
    return [truncated_string(string, max_tokens=max_tokens)]

  and should_run_async(code)


In [None]:
# split sections into chunks
# MAX_TOKENS is the per-chunk token budget handed to split_strings_from_subsection;
# every section contributes one or more strings to wikipedia_strings.
MAX_TOKENS = 1600
wikipedia_strings = []
for section in wikipedia_sections:
    wikipedia_strings.extend(split_strings_from_subsection(section, max_tokens=MAX_TOKENS))

print(f"{len(wikipedia_sections)} Wikipedia sections split into {len(wikipedia_strings)} strings.")


5718 Wikipedia sections split into 5780 strings.


  and should_run_async(code)


In [None]:
# Validate the chunking: show the five largest chunk sizes (in characters)
len_str = sorted(map(len, wikipedia_strings), reverse=True)
len_str[:5]

  and should_run_async(code)


[16307, 16261, 16044, 15979, 15800]

In [None]:
# Persist the chunked documents: one row per chunk in a single "text" column,
# written to a local CSV file
import pandas as pd
df = pd.DataFrame({"text": wikipedia_strings})
df.to_csv("wikipedia_strings.csv")




  and should_run_async(code)


In [None]:

# Google Cloud settings for the upload: the project to use, the bucket that
# receives the CSV (its name is derived from the project id) and the region.
# The "#@param" markers are Colab form annotations.
PROJECT_ID = "demogct2022" #@param
BUCKET_TO_SAVE= f"wikipedia_strings_{PROJECT_ID}"#@param
REGION ="us-central1"#@param

  and should_run_async(code)


In [None]:
import os
from google.cloud import storage

# Create a storage client
client = storage.Client(project=PROJECT_ID)

# Reuse the bucket when it already exists, otherwise create it.
# lookup_bucket returns None only when the bucket is missing, so authentication,
# permission and network errors surface as exceptions instead of being mistaken
# for "bucket does not exist".
bucket = client.lookup_bucket(BUCKET_TO_SAVE)
if bucket is not None:
  print(f"Bucket {BUCKET_TO_SAVE} exists.")
else:
  print(f"Creating bucket {BUCKET_TO_SAVE}")
  # The region is passed at creation time; assigning Bucket.location is
  # deprecated and emits a warning.
  bucket = client.create_bucket(BUCKET_TO_SAVE, location=REGION)
  print(f"Bucket {BUCKET_TO_SAVE} created.")


# Check if the wikipedia_strings.csv file exists locally
if os.path.exists("wikipedia_strings.csv"):
  # Upload the local file as an object with the same name
  blob = bucket.blob("wikipedia_strings.csv")
  blob.upload_from_filename("wikipedia_strings.csv")
  print(f"Blob {BUCKET_TO_SAVE}/wikipedia_strings.csv uploaded to {BUCKET_TO_SAVE}.")
else:
  print("The wikipedia_strings.csv file does not exist.")
  print ("Please save the dataframe in the above frames")

# Confirm that the object is now present in the bucket
stats = storage.Blob(bucket=bucket, name="wikipedia_strings.csv").exists(client)
if stats:
  print(f"Blob gs://{BUCKET_TO_SAVE}/wikipedia_strings.csv exists.")
else:
  print(f"Blob gs://{BUCKET_TO_SAVE}/wikipedia_strings.csv does not exist.")


  and should_run_async(code)
  bucket.location = REGION


Bucket wikipedia_strings_demogct2022 exists.
Blob wikipedia_strings_demogct2022/wikipedia_strings.csv uploaded to wikipedia_strings_demogct2022.
Blob wikipedia_strings_demogct2022/wikipedia_strings.csv exists.


# Completed

You have now finished crawling Wikipedia and creating the dataset.
In the next section we will load this data into Vertex AI Matching Engine.