# 02 - Separate Extract and Build

## Setup

If you haven't already, install the toolkit and dependencies using the [Setup](./00-Setup.ipynb) notebook.

## Local extract to folder

See [Run the extract and build stages separately](https://github.com/awslabs/graphrag-toolkit/blob/main/docs/lexical-graph/indexing.md#run-the-extract-and-build-stages-separately)

In [None]:
%reload_ext dotenv
%dotenv

import os

from graphrag_toolkit.lexical_graph.storage.graph.falkordb import FalkorDBGraphStoreFactory
from graphrag_toolkit.lexical_graph import LexicalGraphIndex, set_logging_config
from graphrag_toolkit.lexical_graph.storage import GraphStoreFactory
from graphrag_toolkit.lexical_graph.storage import VectorStoreFactory
from graphrag_toolkit.lexical_graph.indexing.load import FileBasedDocs
from graphrag_toolkit.lexical_graph.indexing.build import Checkpoint

from llama_index.readers.web import SimpleWebPageReader

# Set the toolkit logging configuration to 'INFO'
set_logging_config('INFO')

# Register the FalkorDB backend with the factory
GraphStoreFactory.register(FalkorDBGraphStoreFactory)

# Handler passed to extract() below, configured with the local 'extracted' directory
extracted_docs = FileBasedDocs(
    docs_directory='extracted'
)

# Named checkpoint passed to extract() below
checkpoint = Checkpoint('extraction-checkpoint')

# Store connection details are read from the GRAPH_STORE and VECTOR_STORE
# environment variables; a missing variable raises KeyError here
graph_store = GraphStoreFactory.for_graph_store(os.environ['GRAPH_STORE'])
vector_store = VectorStoreFactory.for_vector_store(os.environ['VECTOR_STORE'])

graph_index = LexicalGraphIndex(
    graph_store, 
    vector_store
)

# Web pages used as the source documents for this example
doc_urls = [
    'https://docs.aws.amazon.com/neptune/latest/userguide/intro.html',
    'https://docs.aws.amazon.com/neptune-analytics/latest/userguide/what-is-neptune-analytics.html',
    'https://docs.aws.amazon.com/neptune-analytics/latest/userguide/neptune-analytics-features.html',
    'https://docs.aws.amazon.com/neptune-analytics/latest/userguide/neptune-analytics-vs-neptune-database.html'
]

# Load the pages with html_to_text enabled; metadata_fn supplies {'url': url} for each URL
docs = SimpleWebPageReader(
    html_to_text=True,
    metadata_fn=lambda url:{'url': url}
).load_data(doc_urls)

# Run the extract stage, passing extracted_docs as the handler along with the checkpoint
graph_index.extract(docs, handler=extracted_docs, checkpoint=checkpoint, show_progress=True)

# Read the collection id back from the handler so it can be reported below
collection_id = extracted_docs.collection_id

print('Extraction complete')
print(f'collection_id: {collection_id}')

## Extraction to S3

See [Run the extract and build stages separately](https://github.com/awslabs/graphrag-toolkit/blob/main/docs/lexical-graph/indexing.md#run-the-extract-and-build-stages-separately)

In [None]:
%reload_ext dotenv
%dotenv

import os

from graphrag_toolkit.lexical_graph.storage.graph.falkordb import FalkorDBGraphStoreFactory
from graphrag_toolkit.lexical_graph import LexicalGraphIndex, set_logging_config
from graphrag_toolkit.lexical_graph.storage import GraphStoreFactory
from graphrag_toolkit.lexical_graph.storage import VectorStoreFactory
from graphrag_toolkit.lexical_graph.indexing.load import S3BasedDocs
from graphrag_toolkit.lexical_graph.indexing.build import Checkpoint

from llama_index.readers.web import SimpleWebPageReader

# Set the toolkit logging configuration to 'INFO'
set_logging_config('INFO')

# Register the FalkorDB backend with the factory
GraphStoreFactory.register(FalkorDBGraphStoreFactory)

# Handler passed to extract() below. Region and bucket come from the AWS_REGION and
# S3_BUCKET_NAME environment variables; key_prefix and collection_id are hard-coded
# example values in this cell
extracted_docs = S3BasedDocs(
    region=os.environ['AWS_REGION'],
    bucket_name=os.environ['S3_BUCKET_NAME'],
    key_prefix='key_prefix',
    collection_id='demo123'
)

# Named checkpoint passed to extract() below
checkpoint = Checkpoint('s3-extraction-checkpoint')

# Store connection details are read from the GRAPH_STORE and VECTOR_STORE
# environment variables; a missing variable raises KeyError here
graph_store = GraphStoreFactory.for_graph_store(os.environ['GRAPH_STORE'])
vector_store = VectorStoreFactory.for_vector_store(os.environ['VECTOR_STORE'])

graph_index = LexicalGraphIndex(
    graph_store,
    vector_store
)

# Web pages used as the source documents for this example
doc_urls = [
    'https://docs.aws.amazon.com/neptune/latest/userguide/intro.html',
    'https://docs.aws.amazon.com/neptune-analytics/latest/userguide/what-is-neptune-analytics.html',
    'https://docs.aws.amazon.com/neptune-analytics/latest/userguide/neptune-analytics-features.html',
    'https://docs.aws.amazon.com/neptune-analytics/latest/userguide/neptune-analytics-vs-neptune-database.html'
]

# Load the pages with html_to_text enabled; metadata_fn supplies {'url': url} for each URL
docs = SimpleWebPageReader(
    html_to_text=True,
    metadata_fn=lambda url:{'url': url}
).load_data(doc_urls)

# Run the extract stage, passing extracted_docs as the handler along with the checkpoint
graph_index.extract(docs, handler=extracted_docs, checkpoint=checkpoint, show_progress=True)

# Read the collection id back from the handler so it can be reported below
collection_id = extracted_docs.collection_id

print('Extraction complete')
print(f'collection_id: {collection_id}')

## Using batch inference with the LexicalGraphIndex

Before running this section, make sure you have reviewed `batch-extraction.md`. To create the required permissions, see `setup-bedrock-batch.md` in the `lexical-graph-hybrid-dev/aws` folder.

In [None]:
!pip install PyMuPDF llama-index[pdf]


In [None]:
%reload_ext dotenv
%dotenv

import os

from graphrag_toolkit.lexical_graph import (
    LexicalGraphIndex,
    GraphRAGConfig,
    IndexingConfig,
    set_logging_config,
)
from graphrag_toolkit.lexical_graph.storage import (
    GraphStoreFactory,
    VectorStoreFactory,
)
from graphrag_toolkit.lexical_graph.storage.graph.falkordb import FalkorDBGraphStoreFactory
from graphrag_toolkit.lexical_graph.indexing.extract import BatchConfig
from graphrag_toolkit.lexical_graph.indexing.build import Checkpoint

from llama_index.core import SimpleDirectoryReader
from llama_index.readers.file import PyMuPDFReader

def batch_extract_and_load():
    """Load the files in BATCH_SOURCE_DIR and run extraction with a batch config.

    All settings come from environment variables. Required (KeyError if
    missing): AWS_REGION, S3_BATCH_BUCKET_NAME, BATCH_KEY_PREFIX_01,
    AWS_ACCOUNT, BATCH_ROLE_NAME, BATCH_CHECKPOINT_01, GRAPH_STORE,
    VECTOR_STORE and BATCH_SOURCE_DIR. Optional: EXTRACTION_BATCH_SIZE
    (defaults to 4).

    Returns:
        None.
    """
    env = os.environ

    set_logging_config("INFO")

    # Make the FalkorDB graph store available through the factory.
    GraphStoreFactory.register(FalkorDBGraphStoreFactory)

    # Extraction batch size is configurable through the environment.
    requested_batch_size = env.get("EXTRACTION_BATCH_SIZE", 4)
    GraphRAGConfig.extraction_batch_size = int(requested_batch_size)

    # Gather the batch settings; the IAM role ARN is assembled from the
    # account id and role name.
    aws_region = env["AWS_REGION"]
    batch_bucket = env["S3_BATCH_BUCKET_NAME"]
    batch_prefix = env["BATCH_KEY_PREFIX_01"]
    account_id = env["AWS_ACCOUNT"]
    role_name = env["BATCH_ROLE_NAME"]
    batch_settings = BatchConfig(
        region=aws_region,
        bucket_name=batch_bucket,
        key_prefix=batch_prefix,
        role_arn=f"arn:aws:iam::{account_id}:role/{role_name}",
    )

    index_settings = IndexingConfig(batch_config=batch_settings)
    run_checkpoint = Checkpoint(env["BATCH_CHECKPOINT_01"])

    graph_backend = GraphStoreFactory.for_graph_store(env["GRAPH_STORE"])
    vector_backend = VectorStoreFactory.for_vector_store(env["VECTOR_STORE"])
    index = LexicalGraphIndex(
        graph_backend,
        vector_backend,
        indexing_config=index_settings,
    )

    # PDF files are read with PyMuPDF.
    pdf_reader = PyMuPDFReader()
    directory_reader = SimpleDirectoryReader(
        input_dir=env["BATCH_SOURCE_DIR"],
        file_extractor={".pdf": pdf_reader},
    )
    loaded_docs = directory_reader.load_data()

    index.extract(loaded_docs, checkpoint=run_checkpoint, show_progress=True)

# Run the batch job by calling batch_extract_and_load(), defined earlier in this cell
batch_extract_and_load()


## Using batch inference with the LexicalGraphIndex: writing to Amazon S3 and DynamoDB

In [4]:
%reload_ext dotenv
%dotenv

import os
import boto3
from datetime import datetime, timezone
import time

from graphrag_toolkit.lexical_graph import (
    LexicalGraphIndex,
    GraphRAGConfig,
    IndexingConfig,
    set_logging_config,
)
from graphrag_toolkit.lexical_graph.storage import (
    GraphStoreFactory,
    VectorStoreFactory,
)
from graphrag_toolkit.lexical_graph.storage.graph.falkordb import FalkorDBGraphStoreFactory
from graphrag_toolkit.lexical_graph.indexing.extract import BatchConfig
from graphrag_toolkit.lexical_graph.indexing.build import Checkpoint

from llama_index.core import SimpleDirectoryReader
from llama_index.readers.file import PyMuPDFReader

def _build_collection_record(collection_id, completion_date, status, start_time,
                             duration, document_count, error_message):
    """Assemble the DynamoDB item describing one extraction run.

    Every write made by batch_extract_and_load() stores the same attributes;
    only status, duration and error_message differ between the initial and
    final writes, so the item is built in one place.

    Args:
        collection_id: Value stored under 'collection_id'.
        completion_date: ISO-8601 timestamp string stored under 'completion_date'.
        status: Run status ('IN_PROGRESS', 'COMPLETED' or 'FAILED').
        start_time: Run start in seconds since the epoch, as returned by time.time().
        duration: Elapsed time in whole seconds.
        document_count: Number of documents loaded for the run.
        error_message: Failure description, or None when there is none.

    Returns:
        A dict suitable for ``table.put_item(Item=...)``.

    Raises:
        KeyError: If S3_BATCH_BUCKET_NAME, BATCH_KEY_PREFIX_01, GRAPH_STORE,
            VECTOR_STORE, AWS_REGION or BATCH_CHECKPOINT_01 is not set.
    """
    return {
        'collection_id': collection_id,
        'completion_date': completion_date,
        'status': status,
        'reader_type': 'PDF',
        's3_bucket': os.environ['S3_BATCH_BUCKET_NAME'],
        's3_key_prefix': os.environ['BATCH_KEY_PREFIX_01'],
        'graph_store': os.environ['GRAPH_STORE'],
        'vector_store': os.environ['VECTOR_STORE'],
        'aws_region': os.environ['AWS_REGION'],
        'start_time': datetime.fromtimestamp(start_time, tz=timezone.utc).isoformat(),
        'duration': duration,
        'document_count': document_count,
        'error_message': error_message,
        'checkpoint': os.environ['BATCH_CHECKPOINT_01'],
        'user_id': os.environ.get('USER_ID', 'unknown'),
        'environment_variables': {
            'EXTRACTION_MODEL': os.environ.get('EXTRACTION_MODEL', ''),
            'EMBEDDINGS_MODEL': os.environ.get('EMBEDDINGS_MODEL', ''),
            'EMBEDDINGS_DIMENSIONS': os.environ.get('EMBEDDINGS_DIMENSIONS', ''),
            'EXTRACTION_BATCH_SIZE': os.environ.get('EXTRACTION_BATCH_SIZE', '4'),
        },
        'reader_configuration': {
            'file_type': 'pdf',
            'reader_class': 'PyMuPDFReader',
        },
    }


def batch_extract_and_load():
    """Run batch extraction over BATCH_SOURCE_DIR and track the run in DynamoDB.

    Steps:
      1. Query the DynamoDB table for an existing record with this collection id.
         If it is COMPLETED and the checkpoint reports completion, return early.
      2. Load the source documents and write an IN_PROGRESS record.
      3. Run ``LexicalGraphIndex.extract`` with the batch configuration.
      4. Write a final record with status COMPLETED or FAILED.

    Required environment variables (KeyError if missing): AWS_REGION,
    DYNAMODB_NAME, BATCH_CHECKPOINT_01, S3_BATCH_BUCKET_NAME,
    BATCH_KEY_PREFIX_01, AWS_ACCOUNT, BATCH_ROLE_NAME, GRAPH_STORE,
    VECTOR_STORE, BATCH_SOURCE_DIR. Optional: COLLECTION_ID,
    EXTRACTION_BATCH_SIZE (default 4), USER_ID, EXTRACTION_MODEL,
    EMBEDDINGS_MODEL, EMBEDDINGS_DIMENSIONS.

    Returns:
        None.

    Raises:
        Exception: Whatever the DynamoDB query or the initial record write
            raises is re-raised after a message is printed. The function
            returns or raises instead of calling ``exit()``, so the notebook
            session keeps running and the original traceback stays visible.
            Extraction failures are not raised; they are recorded with
            status FAILED.
    """
    set_logging_config("INFO")

    # Register FalkorDB backend
    GraphStoreFactory.register(FalkorDBGraphStoreFactory)

    # Initialize DynamoDB client. The table name is required, like every other
    # mandatory setting, so a missing variable fails here with a clear KeyError.
    dynamodb = boto3.resource('dynamodb', region_name=os.environ['AWS_REGION'])
    table = dynamodb.Table(os.environ['DYNAMODB_NAME'])

    # Get collection_id from environment or generate a fallback. The fallback
    # is time-based, so without COLLECTION_ID each run gets a new id and the
    # existing-record check below will not find earlier runs.
    collection_id = os.environ.get('COLLECTION_ID', f"batch-{int(time.time())}")

    # One checkpoint object serves both the completion check and the extraction.
    checkpoint_name = os.environ["BATCH_CHECKPOINT_01"]
    checkpoint = Checkpoint(checkpoint_name)

    # Check for existing record. Only the query itself is guarded, so that a
    # checkpoint or configuration problem is not misreported as a DynamoDB error.
    try:
        response = table.query(
            KeyConditionExpression='collection_id = :id',
            ExpressionAttributeValues={
                ':id': collection_id
            }
        )
    except Exception as e:
        print(f"Error querying DynamoDB for existing record: {str(e)}")
        raise

    items = response.get('Items', [])
    if items:
        existing_status = items[0].get('status', 'UNKNOWN')
        print(f"Extraction for collection_id {collection_id} has already been run with status {existing_status}")
        # NOTE(review): relies on Checkpoint exposing is_complete(); confirm
        # against the installed toolkit version.
        if existing_status == 'COMPLETED' and checkpoint.is_complete():
            print(f"Checkpoint {checkpoint_name} indicates extraction is complete. Skipping.")
            return
        elif existing_status == 'IN_PROGRESS':
            print(f"Resuming extraction for collection_id {collection_id} with existing checkpoint.")
        else:
            print("Previous run failed or incomplete. Resuming with checkpoint.")

    # Set batch size
    GraphRAGConfig.extraction_batch_size = int(os.environ.get("EXTRACTION_BATCH_SIZE", 4))

    # Configure batch S3 setup
    batch_config = BatchConfig(
        region=os.environ["AWS_REGION"],
        bucket_name=os.environ["S3_BATCH_BUCKET_NAME"],
        key_prefix=os.environ["BATCH_KEY_PREFIX_01"],
        role_arn=f'arn:aws:iam::{os.environ["AWS_ACCOUNT"]}:role/{os.environ["BATCH_ROLE_NAME"]}',
    )

    indexing_config = IndexingConfig(batch_config=batch_config)

    graph_store = GraphStoreFactory.for_graph_store(os.environ["GRAPH_STORE"])
    vector_store = VectorStoreFactory.for_vector_store(os.environ["VECTOR_STORE"])

    graph_index = LexicalGraphIndex(
        graph_store,
        vector_store,
        indexing_config=indexing_config
    )

    # Use PyMuPDF for PDFs
    file_extractor = {
        ".pdf": PyMuPDFReader()
    }

    reader = SimpleDirectoryReader(
        input_dir=os.environ["BATCH_SOURCE_DIR"],
        file_extractor=file_extractor
    )
    docs = reader.load_data()

    # Track start time
    start_time = time.time()
    status = 'IN_PROGRESS'
    error_message = None
    # Captured once, before extraction, and reused for every write below.
    # NOTE(review): presumably 'completion_date' is part of the table key, so
    # reusing the same value makes each write replace the same item; confirm
    # against the table schema before changing when this is computed.
    completion_date = datetime.now(timezone.utc).isoformat()

    # Write the IN_PROGRESS record. The stored item is the same whether or not
    # a record already existed; only the message differs.
    initial_record = _build_collection_record(
        collection_id, completion_date, status, start_time,
        duration=0, document_count=len(docs), error_message=None,
    )
    try:
        table.put_item(Item=initial_record)
    except Exception as e:
        print(f"Error creating/updating initial DynamoDB record: {str(e)}")
        raise
    if items:
        print(f"Updated DynamoDB record for collection {collection_id} to IN_PROGRESS")
    else:
        print(f"Initial DynamoDB record created for collection {collection_id}")

    # Perform extraction. A failure is recorded rather than raised so that the
    # final status can still be written.
    extraction_succeeded = False
    try:
        graph_index.extract(docs, checkpoint=checkpoint, show_progress=True)
        status = 'COMPLETED'
        extraction_succeeded = True
    except Exception as e:
        status = 'FAILED'
        error_message = str(e)
        print(f"Extraction failed: {str(e)}")

    # Update DynamoDB record with final status
    duration = int(time.time() - start_time)

    try:
        table.put_item(Item=_build_collection_record(
            collection_id, completion_date, status, start_time,
            duration=duration, document_count=len(docs), error_message=error_message,
        ))
        print(f"Stored collection {collection_id} in DynamoDB with status {status}")
    except Exception as e:
        print(f"Error storing final DynamoDB record: {str(e)}")
        status = 'FAILED'
        error_message = f"Extraction: {error_message or 'Success'}, DynamoDB: {str(e)}"
        # Best-effort attempt to store a failure record. If this write fails
        # too, report it instead of raising from inside the handler.
        try:
            table.put_item(Item=_build_collection_record(
                collection_id, completion_date, status, start_time,
                duration=duration, document_count=len(docs), error_message=error_message,
            ))
        except Exception as retry_error:
            print(f"Error storing failure record in DynamoDB: {str(retry_error)}")

    # Only claim completion when extraction actually succeeded; a failure has
    # already been reported above.
    if extraction_succeeded:
        print('Extraction complete')
    print(f'collection_id: {collection_id}')

# Run the batch job by calling batch_extract_and_load(), defined earlier in this cell
batch_extract_and_load()

Initial DynamoDB record created for collection batch-1747014631
2025-05-11 21:50:32:INFO:g.l.i.e.extraction_pipeline:Running extraction pipeline [batch_size: 100, num_workers: 1]
2025-05-11 21:50:34:INFO:g.l.i.u.batch_inference_utils:Created batch job [job_arn: arn:aws:bedrock:us-east-1:188967239867:model-invocation-job/fvlw1o4sj86o]
2025-05-11 22:00:39:INFO:g.l.i.u.batch_inference_utils:Created batch job [job_arn: arn:aws:bedrock:us-east-1:188967239867:model-invocation-job/ibk53ab5i3ox]
2025-05-11 22:09:42:INFO:g.l.i.b.build_pipeline:Running build pipeline [batch_size: 4, num_workers: 1, job_sizes: [257], batch_writes_enabled: True, batch_write_size: 25]
2025-05-11 22:09:42:INFO:g.l.i.b.build_pipeline:Running build pipeline [batch_size: 4, num_workers: 1, job_sizes: [275], batch_writes_enabled: True, batch_write_size: 25]
2025-05-11 22:09:42:INFO:g.l.i.b.build_pipeline:Running build pipeline [batch_size: 4, num_workers: 1, job_sizes: [282], batch_writes_enabled: True, batch_write_size

Extracting propositions [nodes: 67, num_workers: 4]: 100%|██████████| 67/67 [00:24<00:00,  2.78it/s]
Extracting topics [nodes: 67, num_workers: 4]: 100%|██████████| 67/67 [01:14<00:00,  1.11s/it]


2025-05-11 22:11:22:INFO:g.l.i.b.build_pipeline:Running build pipeline [batch_size: 4, num_workers: 1, job_sizes: [350], batch_writes_enabled: True, batch_write_size: 25]
2025-05-11 22:11:22:INFO:g.l.i.b.build_pipeline:Running build pipeline [batch_size: 4, num_workers: 1, job_sizes: [258], batch_writes_enabled: True, batch_write_size: 25]
2025-05-11 22:11:22:INFO:g.l.i.b.build_pipeline:Running build pipeline [batch_size: 4, num_workers: 1, job_sizes: [380], batch_writes_enabled: True, batch_write_size: 25]
2025-05-11 22:11:22:INFO:g.l.i.b.build_pipeline:Running build pipeline [batch_size: 4, num_workers: 1, job_sizes: [361], batch_writes_enabled: True, batch_write_size: 25]
2025-05-11 22:11:22:INFO:g.l.i.b.build_pipeline:Running build pipeline [batch_size: 4, num_workers: 1, job_sizes: [321], batch_writes_enabled: True, batch_write_size: 25]
2025-05-11 22:11:22:INFO:g.l.i.b.build_pipeline:Running build pipeline [batch_size: 4, num_workers: 1, job_sizes: [161], batch_writes_enabled: Tr