# Conditional Knowledge Extraction and Enrichment for Graph RAG usage
## Seattle Children Hospital
### July 2025
#### Jerome Massot (jeromemassot@google.com)

## 00- Setup and Import Modules

In [1]:
from google.cloud import storage
from google.genai import types
from google import genai

In [2]:
from html_to_markdown import convert_to_markdown
from IPython.display import display, Markdown
from bs4 import XMLParsedAsHTMLWarning
from bs4 import BeautifulSoup
import requests

In [3]:
from pydantic import BaseModel, Field
from typing import List

In [4]:
import warnings
import pprint
import tqdm
import uuid
import json
import time
import re

In [5]:
warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)

## 01- Conditions Pages Collection Extraction

This section of the notebook extracts the list of conditions pages from the index page available in the Seattle Children website.

In [13]:
def extract_conditions_collection(url: str) -> dict:
    """ 
    Extract the collection of conditions pages
    :param url: url of the index page
    :return: a dictionary with the conditions names and urls
    """

    # prefix for the links
    prefix = "https://www.seattlechildrens.org"

    # get the source of the page
    response = requests.get(url)
    response.raise_for_status()  # Raise an exception for bad status codes
    soup = BeautifulSoup(response.content, 'html.parser')

    # list the links corresponding to the conditions pages
    conditions_pages = {
            a.text: prefix+a.get('href') for a in soup.find_all('a', href=True) if a.get('href').startswith("/conditions") 
        }

    return conditions_pages

In [14]:
conditions_pages = extract_conditions_collection("https://www.seattlechildrens.org/conditions/a-z/")

In [15]:
print(f"There are {len(conditions_pages)} Condition pages in the collection...")

There are 392 Condition pages in the collection...


## 02- Conditions Pages Content Extraction

### 02-01- Extraction Engine

In [33]:
def chunk_markdown(markdown_content: str, links: dict) -> list:
    """ 
    Chunk the markdown content
    :param markdown_content: content in markdown format
    :param links: dictionary containing the links extracted from content
    :return: list of chunks
    """

    # list of chunks
    chunks = []

    # chunking is based on the mardown layout
    lines = markdown_content.split("\n")

    # current chunk
    current_id = -1
    def default_chunk():
        return {
            "id": current_id + 1,
            "unique_id": str(uuid.uuid4()),
            "content": "",
            "parent_id": None,
            "parent_unique_id": None,
            "url_links": [],
            "is_root": False,
            "kind": "chunk"
        }

    # init the different artifacts
    current_chunk = default_chunk()
    hierarchy_ids = dict()
    hierarchy_unique_ids = dict()

    # fill the chunks line by line
    for i, line in enumerate(lines):
        if len(line)>0:

            # Markdown layour drives the chunks content
            hashtag_count = line[:10].count("#")

            if hashtag_count == 0:
                current_chunk['content'] += "\n" + line
                
                # gather the found links
                for link in links:
                    if link in line:
                        current_chunk['url_links'].append(links[link])
            else:
                current_chunk['url_links'] = list(set(current_chunk['url_links']))
                chunks.append(current_chunk)
                current_chunk = default_chunk()
                current_chunk['content'] += line + "\n"
                hierarchy_ids[hashtag_count] = current_chunk['id']
                hierarchy_unique_ids[hashtag_count] = current_chunk['unique_id']

                if hashtag_count == 1:
                    current_chunk['is_root'] = True
                else:
                    if hashtag_count == 2:
                        current_chunk['parent_id'] = hierarchy_ids[hashtag_count-1]
                        current_chunk['parent_unique_id'] = hierarchy_unique_ids[hashtag_count-1]
                    elif hashtag_count > 2:
                        if hashtag_count-1 in hierarchy_ids:
                            current_chunk['parent_id'] = hierarchy_ids[hashtag_count-1]
                            current_chunk['parent_unique_id'] = hierarchy_unique_ids[hashtag_count-1]
                        else:
                            current_chunk['parent_id'] = hierarchy_ids[min(hierarchy_ids.keys())]
                            current_chunk['parent_unique_id'] = hierarchy_unique_ids[min(hierarchy_ids.keys())]
                
                # gather the found links
                for link in links:
                    if link in line:
                        current_chunk['url_links'].append(links[link])
                
                # incrementing the id counter for next chunk
                current_id = current_chunk['id']
    chunks.append(current_chunk)
    return chunks[1:]

In [34]:
def extract_from_page(url: str) -> dict:
    """
    Extract knowledge from Conditions Page.
    :param url: url of the condition page
    :return: content and metadata as dictionary
    """

    # prefix for the links
    prefix = "https://www.seattlechildrens.org"

    # get the source of the page
    response = requests.get(url)
    response.raise_for_status()  # Raise an exception for bad status codes
    soup = BeautifulSoup(response.content, 'html.parser')

    # extract the title of the page
    title = soup.find('div', class_="mod page-title")
    if title:
        title = title.find('h1').text
        title_content = f"# {title}"
    else:
        title = "Title Not Found"

    # treate the accordeons
    div_elements = soup.find_all('div', class_='accordion-header heading4 js-accordion-header')
    for div_tag in div_elements:
        new_h3_tag = soup.new_tag('h3')
        new_h3_tag.extend(div_tag.contents)
        div_tag.replace_with(new_h3_tag)
    div_elements = soup.find_all('div', class_='accordion-more js-accordion-more')
    for div_tag in div_elements:
        div_tag.unwrap()
    uv_elements = soup.find_all('ul', class_='accordion accordion--classic js-accordion')
    for uv_element in uv_elements:
        uv_element.unwrap()
    li_elements = soup.find_all('li', class_='accordion-item js-accordion-item')
    for li_element in li_elements:
        li_element.unwrap()

    # extract the main content of the page
    main_content = soup.find('div', class_="main-content-body")

    if main_content:

        markdown_content = convert_to_markdown(
            main_content,
            heading_style='atx',
            escape_asterisks=True
        )
        
        link_regex = re.compile(r'(!?\[[(^)\]]*\])\((.*?)\)')
        link_matches = link_regex.findall(markdown_content)
        if link_matches:
            for link_match in link_matches:
                target = link_match[1]
                prefixed_target = prefix + target
                pattern = r"\(" + target.replace("/", "\\/") + "\\)"
                markdown_content = re.sub(pattern, f"({prefixed_target})", markdown_content)
    else:
        markdown_content = "Main content not found"

    # copyrights
    copyright = soup.find('p', class_="ho-psmall")
    if copyright:
        copyright = copyright.text
    else:
        copyright = "Copyright not found"

    # extract links
    def clean_link(link_value: str):
        link_value = link_value.replace("\r\n", "").strip()
        return link_value

    if main_content:
        links = {
            clean_link(a.text): prefix+a.get('href') for a in main_content.find_all('a', href=True)
        }

    # create the page content dictionary
    page_content = {
        "unique_id": str(uuid.uuid4()),
        "title": title,
        "content": title_content + markdown_content,
        "copyright": copyright,
        "chunks": chunk_markdown(title_content + markdown_content, links),
        "kind": "page"
    }

    return page_content


### 02-02- Extraction Test

Let's test the code on a single Condition page.

In [35]:
test = extract_from_page(conditions_pages['22q11.2-Related Disorders'])

In [None]:
test

In [37]:
with open("../data/one_condition.jsonl", "w") as fp:
    json.dump(test, fp)

In [31]:
for k, v in test.items():
    if k != 'chunks' and k!= 'content':
        pprint.pprint(f"{k}: {v}")

'unique_id: bf7b5040-7ddb-4202-90dd-d76d5a5f5c69'
'title: 22q11.2-Related Disorders'
'copyright: Copyright not found'
'kind: page'


In [32]:
pprint.pprint(test['chunks'])

[{'content': '# 22q11.2-Related Disorders\n',
  'id': 0,
  'is_root': True,
  'kind': 'chunk',
  'parent_id': None,
  'unique_id': '54e0aa58-83aa-4843-96ba-c6c7f8c5b16f',
  'url_links': []},
 {'content': '## What are 22q11\\.2\\-related disorders?\n'
             '\n'
             '22q11\\.2\\-related disorders are caused by differences in part '
             'of chromosome 22, called the q11\\.2 region. Chromosomes contain '
             'genes, which tell our cells how to work and what proteins to '
             'make. There are 23 pairs of chromosomes in each cell of the '
             'body.\n'
             '22q11\\.2\\-related disorders happen in at least 1 in 1,000 '
             'newborns.\n'
             'The symptoms differ widely, even among members of the same '
             'family. There may be small differences in how your child’s '
             'eyelids, nose and ears look.\n'
             'These conditions are linked to many health issues. They can '
             'affec

### 02-03- Full Extraction

Let's extract the content of all the conditions pages available in the collection.

In [38]:
pages_content = []
pages_w_issue = []
for condition, page_url in tqdm.tqdm(conditions_pages.items()):
    if condition != "Conditions" and condition != "\r\n        All Symptoms\r\n    ":
        try:
            pages_content.append(extract_from_page(page_url))
        except:
            pages_w_issue.append(condition)

100%|██████████| 392/392 [03:06<00:00,  2.10it/s]


In [39]:
print(f"{len(pages_content)} Condition pages extracted...")

389 Condition pages extracted...


A single page has trouble with extraction, because it is for formatted following the common pattern. It can be excluded for the moment.

In [40]:
pages_w_issue

['Mental Health Problems']

Let's save the page content dict as a JSON file.

In [41]:
jsonl_content = ""
for page_content in pages_content:
    if "unique_id" not in page_content.keys():
        print(page_content)
    jsonl_content += json.dumps(page_content) + "\n"


with open("../data/pages_content.jsonl", "w") as fp:
    fp.write(jsonl_content)

## 03- Knowledge Enrichmnent

In this section, the extracted knowledge is enriched by different insights, keywords, and summary content creation.

In [6]:
# let's reload a condition that we have just extracted
with open("../data/one_condition.jsonl") as fp:
    a_condition = json.load(fp)

In [7]:
a_condition['content']

'# 22q11.2-Related Disorders\n\n\n\n\n\n## What are 22q11\\.2\\-related disorders?\n\n\n\n\n22q11\\.2\\-related disorders are caused by differences in part of chromosome 22, called the q11\\.2 region. Chromosomes contain genes, which tell our cells how to work and what proteins to make. There are 23 pairs of chromosomes in each cell of the body.\n\n\n22q11\\.2\\-related disorders happen in at least 1 in 1,000 newborns.\n\n\nThe symptoms differ widely, even among members of the same family. There may be small differences in how your child’s eyelids, nose and ears look.\n\n\nThese conditions are linked to many health issues. They can affect your child’s growth, feeding, breathing, speaking, hearing, learning and mental health. But most children with 22q11\\.2\\-related disorders only have problems in some of these areas.\n\n\n\n\n\n### What causes 22q11\\.2\\-related disorders?\n\n\n\n\nThese disorders happen because of changes in the 22q11\\.2 part of chromosome 22\\. There may be extra

All the enrichment is done with the help of Gemini LLMs. So let's first create a genai client.

In [8]:
genai_client = genai.Client(
    vertexai=True,
    project="petroglyphs-nlp",
    location="us-central1"
)

For the simplest tasks, we use Gemini 2.5 Flash Lite. For the more complex one, we use Gemini 2.5 Pro.

In [9]:
simple_tasks_model = "gemini-2.5-flash-lite"
complex_tasks_model = "gemini-2.5-pro"

In [25]:
simple_tasks_thinking_budget = types.ThinkingConfig(
    thinking_budget=0
)
complex_tasks_thinking_budget = types.ThinkingConfig(
    thinking_budget=8092
)

In [26]:
safety_settings = [
    types.SafetySetting(
        category="HARM_CATEGORY_HATE_SPEECH",
        threshold="OFF"
    ),
    types.SafetySetting(
        category="HARM_CATEGORY_DANGEROUS_CONTENT",
        threshold="OFF"
    ),
    types.SafetySetting(
        category="HARM_CATEGORY_SEXUALLY_EXPLICIT",
        threshold="OFF"
    ),
    types.SafetySetting(
        category="HARM_CATEGORY_HARASSMENT",
        threshold="OFF"
    )
]

In [27]:
simple_tasks_generate_content_config = types.GenerateContentConfig(
    temperature=1,
    top_p=0.95,
    max_output_tokens=65535,
    safety_settings=safety_settings,
    thinking_config=simple_tasks_thinking_budget
)

complex_tasks_generate_content_config = types.GenerateContentConfig(
    temperature=1,
    top_p=0.95,
    max_output_tokens=65535,
    safety_settings=safety_settings,
    thinking_config=complex_tasks_thinking_budget
)

### 03-01- Page Summary creation

#### 03-01-01- Prompt Definition

Gemini creates a summary for each page content. These summaries will be encoded as dense representations. They can be used for condition retrieval and/or chunks reranking.

In [13]:
prompt = types.Part.from_text(text="Summarize the following text in 5 sentences.")
content = types.Part.from_text(text=a_condition['content'])
system_instruction=[types.Part.from_text(text="You are a medical expert summarizing teaching material for public access.")]
simple_tasks_generate_content_config.system_instruction = system_instruction

In [14]:
response = genai_client.models.generate_content(
    model = simple_tasks_model,
    contents = [prompt, content],
    config = simple_tasks_generate_content_config
)

pprint.pprint(response.text)

('22q11.2-related disorders are genetic conditions caused by changes in a '
 'specific region of chromosome 22. These disorders can lead to a wide range '
 'of symptoms, affecting physical features, growth, feeding, breathing, '
 'speech, hearing, learning, and mental health. The specific symptoms and '
 'their severity can vary greatly, even among family members. Diagnosis '
 'involves genetic testing, and management is typically coordinated through '
 'specialized clinics that offer personalized care plans. Treatment strategies '
 "are tailored to address the individual's specific medical needs, often "
 'involving a team of various specialists.')


#### 03-01-02- Batch Generation

Let's implement the summarization of all pages content using the Gemini batch predicting capacity.

In [15]:
# let's reload all conditions that we have just extracted
all_conditions = []
with open("../data/pages_content.jsonl") as fp:
    for line in fp.readlines():
        all_conditions.append(json.loads(line))

In [16]:
print(f"{len(all_conditions)} condition pages have been loaded...")

389 condition pages have been loaded...


The prompt corresponds to a summarization task.

In [54]:
prompt = "Summarize the following text in 5 sentences."

- We need to serialize the generation configuration parameters in order to use them in the batch prediction request template.
- We need to remove the system instruction and the safety setting from this configuration setup, as the batch predicting template is expected them in separate fields.

In [None]:
simple_tasks_generate_content_config_dict = simple_tasks_generate_content_config.to_json_dict()
system_instruction = simple_tasks_generate_content_config_dict.pop('system_instruction')
safety_settings = simple_tasks_generate_content_config_dict.pop('safety_settings')
simple_tasks_generate_content_config_str = str(simple_tasks_generate_content_config_dict).replace("'", '"')

In [56]:
safety_settings = json.dumps(safety_settings).replace("'", '"')

This is the batch prediction template string that we are using to create the JSONL file.

In [57]:
template_generate_text = '{{"page_id": "{0}", "request":{{"contents": [{{"role": "user", "parts": [{{"text": "{1}"}}, {{"text": "{2}"}}]}}], "generationConfig": {3}, "systemInstruction": {{"role": "user", "parts": [{{"text": "{4}"}}]}}, "safetySettings": {5}}}}}'

Let's use it for the entire collection of Condition pages.

In [58]:
requests_jsonl = ''
for page in tqdm.tqdm(all_conditions):
    additional_line = template_generate_text.format(
        page['unique_id'], 
        prompt, 
        re.sub(r'[\x00-\x1F\x7F-\x9F]', '', page['content'].replace('"', "'")).replace("\\", ""),
        simple_tasks_generate_content_config_str,
        system_instruction,
        safety_settings
    )
    requests_jsonl += additional_line + '\n'

# remove the last empty line
requests_jsonl = requests_jsonl[:-1]

100%|██████████| 389/389 [00:00<00:00, 1574.66it/s]


In [59]:
# detect any anomaly in the jsonl string
for i, line in enumerate(requests_jsonl.split("\n")):
    try:
        _ = json.loads(line)
    except:
        print(line)
        print(i)
        break

Now it is time to create the Gemini batch predicting request.

In [60]:
batch_processing_bucket_name = "gemini_batch_predicting_bucket"
storage_client = storage.Client()
batch_processing_bucket = storage_client.bucket(batch_processing_bucket_name)
blob = batch_processing_bucket.blob("requests_summarization_jsonl.jsonl")
blob.upload_from_string(requests_jsonl)

Let's run the batch generation now...

In [None]:
input_uri =  "gs://gemini_batch_predicting_bucket/requests_summarization_jsonl.jsonl"
output_uri = "gs://gemini_batch_predicting_bucket/request_responses/summarization"

MODEL_NAME = simple_tasks_model

# Submit a batch prediction job with Gemini model
batch_prediction_job = genai_client.batches.create(
    model=MODEL_NAME,
    src=input_uri,
    config=types.CreateBatchJobConfig(dest=output_uri),
)

# Check job status
print(f"Job name: {batch_prediction_job.name}")
print(f"Job state: {batch_prediction_job.state}")

completed_states = {
    types.JobState.JOB_STATE_SUCCEEDED,
    types.JobState.JOB_STATE_FAILED,
    types.JobState.JOB_STATE_CANCELLED,
    types.JobState.JOB_STATE_PAUSED,
}

# Refresh the job until complete
while batch_prediction_job.state not in completed_states:
    time.sleep(60)
    batch_prediction_job = genai_client.batches.get(name=batch_prediction_job.name)
    print(f"Job state: {batch_prediction_job.state}")

# Check if the job succeeds
if batch_prediction_job.state == types.JobState.JOB_STATE_SUCCEEDED:
    print("Job succeeded!")
else:
    print(f"Job failed: {batch_prediction_job.error}")

Job name: projects/1075834949123/locations/us-central1/batchPredictionJobs/858955337070280704
Job state: JobState.JOB_STATE_PENDING
Job state: JobState.JOB_STATE_RUNNING
Job state: JobState.JOB_STATE_RUNNING
Job state: JobState.JOB_STATE_RUNNING
Job state: JobState.JOB_STATE_RUNNING
Job state: JobState.JOB_STATE_SUCCEEDED
Job succeeded!


In [27]:
batch_prediction_job

BatchJob(
  create_time=datetime.datetime(2025, 8, 22, 0, 43, 10, 148707, tzinfo=TzInfo(UTC)),
  dest=BatchJobDestination(
    format='jsonl',
    gcs_uri='gs://gemini_batch_predicting_bucket/request_responses'
  ),
  display_name='genai_batch_job_20250821174309_c4d2f',
  end_time=datetime.datetime(2025, 8, 22, 0, 47, 35, 632049, tzinfo=TzInfo(UTC)),
  model='publishers/google/models/gemini-2.5-flash-lite',
  name='projects/1075834949123/locations/us-central1/batchPredictionJobs/858955337070280704',
  src=BatchJobSource(
    format='jsonl',
    gcs_uri=[
      'gs://gemini_batch_predicting_bucket/requests_jsonl.jsonl',
    ]
  ),
  start_time=datetime.datetime(2025, 8, 22, 0, 44, 5, 677003, tzinfo=TzInfo(UTC)),
  state=<JobState.JOB_STATE_SUCCEEDED: 'JOB_STATE_SUCCEEDED'>,
  update_time=datetime.datetime(2025, 8, 22, 0, 47, 35, 632049, tzinfo=TzInfo(UTC))
)

Let's retrieve the responses generated by Gemini.

In [None]:
batch_prediction_blob_name = "request_responses/summarization/prediction-model-2025-08-22T00:43:10.096963Z/predictions.jsonl"
batch_prediction_str = batch_processing_bucket.blob(batch_prediction_blob_name).download_as_bytes().decode('utf-8')

responses = []
for response in batch_prediction_str.split("\n"):
    try:
      responses.append(json.loads(response))
    except:
      responses.append(None)

# remove the extra line
responses = responses[:-1]

In [29]:
print(f"{len(responses)} summaries have been generated ...")

389 summaries have been generated ...


Now we can update each page dictionary wit its associated summary.

In [30]:
responses_dict = {response['page_id']: response["response"]['candidates'][0]['content']['parts'][0]['text'] for response in responses}

In [31]:
for page in tqdm.tqdm(all_conditions):
    page['summary'] = responses_dict[page['unique_id']]

100%|██████████| 389/389 [00:00<00:00, 371743.96it/s]


Let's have a look at a condition from all conditions.

In [None]:
all_conditions[10]

If everything looks good, we can update the pages content file.

In [36]:
jsonl_content = ""
for all_condition in all_conditions:
    if "unique_id" not in all_condition.keys():
        print(all_condition)
    jsonl_content += json.dumps(all_condition) + "\n"


with open("../data/pages_content.jsonl", "w") as fp:
    fp.write(jsonl_content)

And upload it to Google Cloud bucket for data resilience.

In [38]:
# upload the pages.json to GSC bucket
! gsutil cp "../data/pages_content.jsonl" gs://sch_knowledge_management_system/conditions_pages/pages_content.jsonl

Copying file://..\data\pages_content.jsonl [Content-Type=application/octet-stream]...
/ [0 files][    0.0 B/ 10.9 MiB]                                                
-
- [0 files][  4.7 MiB/ 10.9 MiB]                                                
\
|
| [0 files][  9.4 MiB/ 10.9 MiB]                                                
| [1 files][ 10.9 MiB/ 10.9 MiB]                                                
/

Operation completed over 1 objects/10.9 MiB.                                     


### 03-02- Keywords extraction

#### 03-02-01- Prompt Definition

Gemini extracts the keywords for each page content. These keywords can be used as sparse representations of the condition pages.

In [28]:
response_schema = {
    "type": "OBJECT",
    "properties": {
        "keywords": {
            "type": "ARRAY",
            "items": {"type": "STRING"}
        }
    }
}

In [29]:
prompt = types.Part.from_text(text="Extract the 10 most important keywords from the following text.")
content = types.Part.from_text(text=a_condition['content'])
system_instruction=[types.Part.from_text(text="You are a medical expert extracting knowledge from teaching material.")]

simple_tasks_generate_keywords_config = simple_tasks_generate_content_config.copy()
simple_tasks_generate_keywords_config.system_instruction = system_instruction
simple_tasks_generate_keywords_config.response_mime_type = "application/json"
simple_tasks_generate_keywords_config.response_schema = response_schema

C:\Users\j_mas\AppData\Local\Temp\ipykernel_1717616\1391719190.py:5: PydanticDeprecatedSince20: The `copy` method is deprecated; use `model_copy` instead. See the docstring of `BaseModel.copy` for details about how to handle `include` and `exclude`. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
  simple_tasks_generate_keywords_config = simple_tasks_generate_content_config.copy()


In [30]:
response = genai_client.models.generate_content(
    model = simple_tasks_model,
    contents = [prompt, content],
    config = simple_tasks_generate_keywords_config
)

pprint.pprint(response.text)

('{\n'
 '  "keywords": [\n'
 '    "22q11.2-related disorders",\n'
 '    "chromosome 22",\n'
 '    "genetic syndrome",\n'
 '    "DiGeorge syndrome",\n'
 '    "Velocardiofacial syndrome",\n'
 '    "cleft palate",\n'
 '    "heart problems",\n'
 '    "immune system",\n'
 '    "developmental delays",\n'
 '    "genetic counseling"\n'
 '  ]\n'
 '}')


#### 03-02-02- Batch Generation

Let's implement the summarization of all pages content using the Gemini batch predicting capacity.

In [34]:
# let's reload all conditions that we have just extracted
all_conditions = []
with open("../data/pages_content.jsonl") as fp:
    for line in fp.readlines():
        all_conditions.append(json.loads(line))

In [35]:
print(f"{len(all_conditions)} condition pages have been loaded...")

389 condition pages have been loaded...


The prompt corresponds to a keywords extraction task. We do not precise in the prompt the format of the output because we are using structured output to ensure that Gemini always outputs a list of keywords as a Pydantic list of string objects.

In [36]:
prompt = "Extract the 10 most important keywords from the following text."

- We need to serialize the generation configuration parameters in order to use them in the batch prediction request template.
- We need to remove the system instruction and the safety setting from this configuration setup, as the batch predicting template is expected them in separate fields.

In [37]:
simple_tasks_generate_keywords_config_dict = simple_tasks_generate_keywords_config.to_json_dict()
system_instruction = simple_tasks_generate_keywords_config_dict.pop('system_instruction')
safety_settings = simple_tasks_generate_keywords_config_dict.pop('safety_settings')
simple_tasks_generate_keywords_config_str = str(simple_tasks_generate_keywords_config_dict).replace("'", '"')

In [38]:
safety_settings = json.dumps(safety_settings).replace("'", '"')

This is the batch prediction template string that we are using to create the JSONL file.

In [39]:
template_generate_text = '{{"page_id": "{0}", "request":{{"contents": [{{"role": "user", "parts": [{{"text": "{1}"}}, {{"text": "{2}"}}]}}], "generationConfig": {3}, "systemInstruction": {{"role": "user", "parts": [{{"text": "{4}"}}]}}, "safetySettings": {5}}}}}'

Let's use it for the entire collection of Condition pages.

In [41]:
requests_jsonl = ''
for page in tqdm.tqdm(all_conditions):
    additional_line = template_generate_text.format(
        page['unique_id'], 
        prompt, 
        re.sub(r'[\x00-\x1F\x7F-\x9F]', '', page['content'].replace('"', "'")).replace("\\", ""),
        simple_tasks_generate_keywords_config_str,
        system_instruction,
        safety_settings
    )
    requests_jsonl += additional_line + '\n'

# remove the last empty line
requests_jsonl = requests_jsonl[:-1]

100%|██████████| 389/389 [00:00<00:00, 1383.83it/s]


In [42]:
# detect any anomaly in the jsonl string
for i, line in enumerate(requests_jsonl.split("\n")):
    try:
        _ = json.loads(line)
    except:
        print(line)
        print(i)
        break

Now it is time to create the Gemini batch predicting request.

In [43]:
batch_processing_bucket_name = "gemini_batch_predicting_bucket"
storage_client = storage.Client()
batch_processing_bucket = storage_client.bucket(batch_processing_bucket_name)
blob = batch_processing_bucket.blob("requests_keywords_jsonl.jsonl")
blob.upload_from_string(requests_jsonl)

Let's run the batch generation now...

In [44]:
input_uri =  "gs://gemini_batch_predicting_bucket/requests_keywords_jsonl.jsonl"
output_uri = "gs://gemini_batch_predicting_bucket/request_responses/keywords"

MODEL_NAME = simple_tasks_model

# Submit a batch prediction job with Gemini model
batch_prediction_job = genai_client.batches.create(
    model=MODEL_NAME,
    src=input_uri,
    config=types.CreateBatchJobConfig(dest=output_uri),
)

# Check job status
print(f"Job name: {batch_prediction_job.name}")
print(f"Job state: {batch_prediction_job.state}")

completed_states = {
    types.JobState.JOB_STATE_SUCCEEDED,
    types.JobState.JOB_STATE_FAILED,
    types.JobState.JOB_STATE_CANCELLED,
    types.JobState.JOB_STATE_PAUSED,
}

# Refresh the job until complete
while batch_prediction_job.state not in completed_states:
    time.sleep(60)
    batch_prediction_job = genai_client.batches.get(name=batch_prediction_job.name)
    print(f"Job state: {batch_prediction_job.state}")

# Check if the job succeeds
if batch_prediction_job.state == types.JobState.JOB_STATE_SUCCEEDED:
    print("Job succeeded!")
else:
    print(f"Job failed: {batch_prediction_job.error}")

Job name: projects/1075834949123/locations/us-central1/batchPredictionJobs/2428993620104183808
Job state: JobState.JOB_STATE_PENDING
Job state: JobState.JOB_STATE_QUEUED
Job state: JobState.JOB_STATE_RUNNING
Job state: JobState.JOB_STATE_RUNNING
Job state: JobState.JOB_STATE_RUNNING
Job state: JobState.JOB_STATE_SUCCEEDED
Job succeeded!


In [45]:
batch_prediction_job

BatchJob(
  create_time=datetime.datetime(2025, 8, 24, 23, 38, 53, 726216, tzinfo=TzInfo(UTC)),
  dest=BatchJobDestination(
    format='jsonl',
    gcs_uri='gs://gemini_batch_predicting_bucket/request_responses/keywords'
  ),
  display_name='genai_batch_job_20250824163853_548b8',
  end_time=datetime.datetime(2025, 8, 24, 23, 43, 45, 139710, tzinfo=TzInfo(UTC)),
  model='publishers/google/models/gemini-2.5-flash-lite',
  name='projects/1075834949123/locations/us-central1/batchPredictionJobs/2428993620104183808',
  src=BatchJobSource(
    format='jsonl',
    gcs_uri=[
      'gs://gemini_batch_predicting_bucket/requests_keywords_jsonl.jsonl',
    ]
  ),
  start_time=datetime.datetime(2025, 8, 24, 23, 40, 16, 71474, tzinfo=TzInfo(UTC)),
  state=<JobState.JOB_STATE_SUCCEEDED: 'JOB_STATE_SUCCEEDED'>,
  update_time=datetime.datetime(2025, 8, 24, 23, 43, 45, 139710, tzinfo=TzInfo(UTC))
)

Let's retrieve the responses generated by Gemini.

In [46]:
batch_prediction_blob_name = "request_responses/keywords/prediction-model-2025-08-24T23:38:53.689380Z/predictions.jsonl"
batch_prediction_str = batch_processing_bucket.blob(batch_prediction_blob_name).download_as_bytes().decode('utf-8')

responses = []
for response in batch_prediction_str.split("\n"):
    try:
      responses.append(json.loads(response))
    except:
      responses.append(None)

# remove the extra line
responses = responses[:-1]

In [47]:
print(f"{len(responses)} keywords collections have been generated ...")

389 keywords collections have been generated ...


Now we can update each page dictionary wit its associated keywords.

In [48]:
responses_dict = {response['page_id']: response["response"]['candidates'][0]['content']['parts'][0]['text'] for response in responses}

In [64]:
for page in tqdm.tqdm(all_conditions):
    keywords = json.loads(responses_dict[page['unique_id']])
    page.update(keywords)

100%|██████████| 389/389 [00:00<00:00, 389213.80it/s]


Let's have a look at a condition from all conditions.

In [65]:
all_conditions[0]

{'unique_id': '96154880-6cda-4e6d-84c0-7a50a38aa246',
 'title': '22q11.2-Related Disorders',
 'content': '# 22q11.2-Related Disorders\n\n\n\n\n\n## What are 22q11\\.2\\-related disorders?\n\n\n\n\n22q11\\.2\\-related disorders are caused by differences in part of chromosome 22, called the q11\\.2 region. Chromosomes contain genes, which tell our cells how to work and what proteins to make. There are 23 pairs of chromosomes in each cell of the body.\n\n\n22q11\\.2\\-related disorders happen in at least 1 in 1,000 newborns.\n\n\nThe symptoms differ widely, even among members of the same family. There may be small differences in how your child’s eyelids, nose and ears look.\n\n\nThese conditions are linked to many health issues. They can affect your child’s growth, feeding, breathing, speaking, hearing, learning and mental health. But most children with 22q11\\.2\\-related disorders only have problems in some of these areas.\n\n\n\n\n\n### What causes 22q11\\.2\\-related disorders?\n\n\n\

If everything looks good, we can update the pages content file.

In [66]:
jsonl_content = ""
for all_condition in all_conditions:
    if "unique_id" not in all_condition.keys():
        print(all_condition)
    jsonl_content += json.dumps(all_condition) + "\n"


with open("../data/pages_content.jsonl", "w") as fp:
    fp.write(jsonl_content)

In [67]:
# upload the pages.json to GSC bucket
! gsutil cp "../data/pages_content.jsonl" gs://sch_knowledge_management_system/conditions_pages/pages_content.jsonl

Copying file://..\data\pages_content.jsonl [Content-Type=application/octet-stream]...
/ [0 files][    0.0 B/ 11.0 MiB]                                                
-
- [0 files][  4.1 MiB/ 11.0 MiB]                                                
\
|
| [0 files][  9.4 MiB/ 11.0 MiB]                                                
| [1 files][ 11.0 MiB/ 11.0 MiB]                                                
/

Operation completed over 1 objects/11.0 MiB.                                     


## 04- GraphRag Approach

### 04-01- Atomic Facts

Here, we enrich the chunks with atomic facts and key elements:

- `Atomic Fact`: The smallest, indivisible facts, presented as concise sentences. These include propositions, theories, existences, concepts, and implicit elements like logic, causality, event sequences, drug-condition relationships, timelines, etc.

- `Key Elements`: The essential nouns (e.g., disease, medical condition, drugs, people, numbers), verbs (e.g., actions, consequences), and adjectives (e.g., severity, impact) that are pivotal to the text’s narrative.

In [47]:
rich_chunks = [c['content'] for c in a_condition['chunks'] if len(c['content'])>30]

In [48]:
class AtomicFact(BaseModel):

    topics: str = Field(
        description="Topics tag the Atomic Fact and is about the 2 main topics discussed in its atomic_fact."
    )

    key_elements: List[str] = Field(
        description="""The essential nouns (e.g., disease, medical condition, drugs, people, numbers), verbs (e.g.,
        actions, consequences), and adjectives (e.g., severity, impact) that are pivotal to the text’s narrative."""
    )
    
    atomic_fact: str = Field(
        description="""The smallest, indivisible facts, presented as concise sentences. These include
        propositions, theories, existences, concepts, and implicit elements like logic, causality, event
        sequences, drug-condition relationships, timelines, etc."""
    )

class Extraction(BaseModel):
    atomic_facts: List[AtomicFact] = Field(description="List of atomic facts")

In [49]:
construction_system = """
You are now an intelligent assistant tasked with meticulously extracting both key elements and
atomic facts from a long text.

1. Key Elements: The essential nouns (e.g., disease, medical condition, drugs, substances, people, numbers, date, ...), verbs (e.g.,
actions, consequences, ...), and adjectives (e.g., severity, impact, ...) that are pivotal to the text’s narrative.

2. Atomic Facts: The smallest, indivisible facts, presented as concise sentences. These include
propositions, theories, existences, concepts, and implicit elements like logic, causality, event
sequences, discoveries, drug-condition relationships, timelines, etc. An atomic fact has a label which tags the
main topic discussed in its text.

Requirements:
#####
1. Ensure that all identified atomic facts have at least one key element.

2. Ensure that all identified key elements are reflected within the corresponding atomic facts.

3. You should extract key elements and atomic facts comprehensively, especially those that are
important and potentially query-worthy and do not leave out details.

4. Whenever applicable, replace pronouns with their specific noun counterparts (e.g., change I, He,
She to actual names).

5. Ensure that the key elements and atomic facts you extract are presented in the same language as
the original text (e.g., English or Chinese).

6. Ensure that each Atomic Fact has list of topics corresponding to the 2 main topic discussed in its text.
"""

In [53]:
prompt = types.Part.from_text(text="Use the given format to extract information from the following content:")
content = types.Part.from_text(text=rich_chunks[0])
system_instruction=[types.Part.from_text(text=construction_system)]
complex_tasks_generate_content_config.system_instruction = system_instruction
complex_tasks_generate_content_config.response_mime_type="application/json"
complex_tasks_generate_content_config.response_schema = Extraction


In [55]:
response = genai_client.models.generate_content(
    model = complex_tasks_model,
    contents = [prompt, content],
    config = complex_tasks_generate_content_config
)

insight = response.parsed

In [60]:
for af in insight.atomic_facts:
    print(af.atomic_fact)
    print("-" + af.topics)
    for ke in af.key_elements:
        print("----" + ke)

22q11.2-related disorders are caused by differences in a part of chromosome 22 known as the q11.2 region.
-22q11.2-related disorders, Chromosome 22
----22q11.2-related disorders
----caused by
----differences
----chromosome 22
----q11.2 region
Chromosomes contain genes, which instruct cells on how to function and what proteins to produce.
-Chromosomes, Genes
----Chromosomes
----genes
----cells
----proteins
There are 23 pairs of chromosomes in each cell of the body.
-Chromosomes, Cells
----23 pairs of chromosomes
----cell
----body
22q11.2-related disorders occur in at least 1 in 1,000 newborns.
-22q11.2-related disorders, Prevalence
----22q11.2-related disorders
----1 in 1,000
----newborns
The symptoms of 22q11.2-related disorders differ widely, even among members of the same family.
-22q11.2-related disorders, Symptoms
----symptoms
----differ widely
----members of the same family
There may be small differences in the appearance of a child's eyelids, nose, and ears with 22q11.2-related d

These actomic facts and associated key elements can be added into the graph, for each chunk node. The actomic facts are the first knowledge elements concerned by the retrieval process, and should encoded using a dense embedding model.

### 04-02- Generated Questions from Atomic Facts

A recurrent question about the RAG approach is about comparing embedding of text chunks and questions. From a theorical point of view, we should better compare the user's question and a list of anticipated questions first, and then, retrieve the text chunks containing the context needed to answer the anticipated questions found the most similar to the user's question.

The anticipated questions can be created from the Atomic Facts, using Gemini 2.5 Flash Lite. It is not a complex task.

In [72]:
class QA(BaseModel):
    question: str
    answer: str

In [80]:
prompt = types.Part.from_text(text="Extract all relevant questions and the associated answers from the following text. Just return them as a list. Nothing else.")
content = types.Part.from_text(text=insight.atomic_facts[5].atomic_fact)
system_instruction=[types.Part.from_text(text="You are a medical expert extracting knowledge from teaching material.")]
simple_tasks_generate_content_config.system_instruction = system_instruction
simple_tasks_generate_content_config.response_mime_type="application/json"
simple_tasks_generate_content_config.response_schema = QA

In [81]:
response = genai_client.models.generate_content(
    model = simple_tasks_model,
    contents = [prompt, content],
    config = simple_tasks_generate_content_config
)

pprint.pprint(response.text)

('{\n'
 '"question": "What differences may be observed in a child\'s eyelids, nose, '
 'and ears with 22q11.2-related disorders?",\n'
 '"answer": "There may be small differences in the appearance of a child\'s '
 'eyelids, nose, and ears with 22q11.2-related disorders."\n'
 '}')


The generated questions and answers can also be used for creating a Q&A repository kept in a cache and used as an entry point by the answering engine.