In [4]:
# @title GCS-Based Graph Extraction Pipeline
# Imports for GCS interaction
from google.cloud import storage
from langchain_core.documents import Document
from langchain_experimental.graph_transformers import LLMGraphTransformer
from langchain_google_vertexai import ChatVertexAI, VertexAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pydantic import BaseModel, Field
from langchain_google_spanner import SpannerGraphStore

import os
import json # for printing the extracted data
import uuid

In [11]:
# @title Set Your Values Here { display-mode: "form" }
INSTANCE = "XXXX"  # @param {type: "string"}
DATABASE = "financedb"  # @param {type: "string"}
GRAPH_NAME = "finance_graph"  # @param {type: "string"}

In [6]:
from google.cloud import spanner
CORRECT_PROJECT_ID = "my-project-XXXX"

# 1. Create the Spanner Client with the correct project_id
spanner_client = spanner.Client(project=CORRECT_PROJECT_ID)

In [14]:
graph_store = SpannerGraphStore(
    instance_id=INSTANCE,
    database_id=DATABASE,
    graph_name=GRAPH_NAME,
    client = spanner_client
)

Created multiplexed session.
Created multiplexed session.
INFO:projects/my-alloydb-project-vivekshinde/instances/graph/databases/financedb:Created multiplexed session.


In [13]:
llm_client = ChatVertexAI(model="gemini-2.5-pro", temperature=0)

In [9]:
# @title GCS-Based Graph Extraction Pipeline
# Imports for GCS interaction
from google.cloud import storage
from langchain_core.documents import Document
from langchain_experimental.graph_transformers import LLMGraphTransformer
from langchain_google_vertexai import ChatVertexAI, VertexAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
import os
from pydantic import BaseModel, Field
import json # for printing the extracted data

# --- Configuration ---
# NOTE: Ensure the service account running this script has the Storage Object Viewer role.
GCS_BUCKET_NAME = "databases7" # REPLACE with your bucket name
GCS_FOLDER_PREFIX = "graphrag/financereports/"      # REPLACE with your GCS object path

class DocumentMetadata(BaseModel):
    """Metadata extracted from a document chunk for RAG."""
    entity_id: str = Field(..., description="The primary entity ID this chunk refers to (e.g., U100, P200). Use 'UNKNOWN' if no primary ID is found.")
    document_type: str = Field(..., description="The type of the source document (e.g., 'GraduationList', 'ProfileSnippet', 'AnalystReport').")
    keywords: list[str] = Field(..., description="A list of 3-5 keywords summarizing the content of the text chunk.")


def fetch_text_from_gcs(bucket_name, prefix):
    """
    Fetches text content from all files under a specified prefix in GCS.
    Yields content from each file.
    """
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    
    # List all blobs under the given prefix (folder path)
    blobs = storage_client.list_blobs(bucket_name, prefix=prefix)
    
    found_content = False
    for blob in blobs:
        # Skip directories themselves
        if blob.name.endswith('/'):
            continue
            
        print(f"Processing file: gs://{bucket_name}/{blob.name}")
        try:
            # Download the file content as a string
            text_content = blob.download_as_text()
            
            # Treating the entire file as one document, regardless of internal newlines.
            yield text_content
            found_content = True
            
        except Exception as e:
            print(f"Error fetching data from GCS file {blob.name}: {e}")
            
    if not found_content:
         print(f"No text files found under prefix: gs://{bucket_name}/{prefix}")

def extract_metadata_llm(llm_client, text_chunk):
    """Uses LLM structured output to deterministically extract metadata."""
    # Configure the LLM to output according to the Pydantic schema
    structured_llm = llm_client.with_structured_output(schema=DocumentMetadata)
    
    prompt = f"""
    You are an expert document metadata extractor. Analyze the following text chunk and 
    extract the following information deterministically:
    1. The primary EntityID. Use U100 for Veritas University, U101 for Oakhaven University, P200 for Emily Davis, and P201 for Alice Johnson. If the entity is not one of these, use 'UNKNOWN'.
    2. The DocumentType. MUST be one of: 'AnalystReport', '10-K Filing', 'Press Release', or 'Legal Filing'.
    3. 3 to 5 keywords summarizing the content.
    
    TEXT: ---{text_chunk}---
    """
    
    # Invoke the structured LLM chain
    try:
        result = structured_llm.invoke(prompt)
        print(f"--- Extracted Metadata: {result.entity_id}, {result.document_type}")
        return result
    except Exception as e:
        print(f"LLM Extraction Failed: {e}")
        # Return a fallback object on failure
        return DocumentMetadata(entity_id="UNKNOWN_ERROR", document_type="ERROR", keywords=["failed"])

def insert_document_embeddings_to_spanner(splits, embeddings_generator, llm_client):
    """Generates embeddings and extracts metadata for document splits, then inserts them into Spanner."""
    if not splits:
        print("No splits to process for Spanner insertion.")
        return

    # 1. Generate Embeddings for all text chunks
    texts = [s.page_content for s in splits]
    print(f"Generating {len(texts)} embeddings...")
    
    # VertexAIEmbeddings uses the embedding model configured in your environment
    embeddings = embeddings_generator.embed_documents(texts)

    # 2. Prepare rows for Spanner insertion
    rows_to_insert = []
    
    for i, split in enumerate(splits):
        # *** PRODUCTION RAG: Use LLM structured output to populate metadata ***
        metadata_obj = extract_metadata_llm(llm_client, split.page_content)

        row = (
            str(uuid.uuid4()),                    # DocumentID
            metadata_obj.entity_id,               # EntityID_Associated (Extracted by Gemini)
            metadata_obj.document_type,           # DocumentType (Extracted by Gemini)
            split.page_content,                   # TextContent
            embeddings[i]                         # VectorEmbedding (as list of floats)
        )
        rows_to_insert.append(row)

    # 3. Insert into Spanner
    try:
        instance = spanner_client.instance(INSTANCE)
        database = instance.database(DATABASE)

        def insert_documents(transaction):
            print("Inserting documents into FinancialDocuments table...")
            transaction.insert(
                table='FinancialDocuments',
                columns=('DocumentID', 'EntityID_Associated', 'DocumentType', 'TextContent', 'VectorEmbedding'),
                values=rows_to_insert,
            )
        
        database.run_in_transaction(insert_documents)
        print(f"Successfully inserted {len(rows_to_insert)} documents and embeddings into Spanner.")

    except Exception as e:
        print(f"Error inserting data into Spanner: {e}")


In [8]:
print(f"Fetching document data from gs://{GCS_BUCKET_NAME}/{GCS_FOLDER_PREFIX}...")

# Collect all chunks from all files into a single list
all_text_chunks = []
for chunk in fetch_text_from_gcs(GCS_BUCKET_NAME, GCS_FOLDER_PREFIX):
    all_text_chunks.append(chunk)

if not all_text_chunks:
    print("No content fetched. Exiting.")
    exit()

# 2. Convert text snippets into LangChain Document objects
# Process the combined list of chunks from all files
documents = [Document(page_content=t) for t in all_text_chunks if t.strip()]

# 3. Initialize text splitter and model components
# The splitter will now operate on the full file content (each entry in 'documents')
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = text_splitter.split_documents(documents)

# Initialize the LLM (Requires Vertex AI setup/authentication)
llm_client = ChatVertexAI(model="gemini-2.5-pro", temperature=0)

# Initialize Embedding Generator (Uses default embedding model, e.g., text-embedding-004)
embeddings_generator = VertexAIEmbeddings(model_name="text-embedding-004")

# 4. Configure the Graph Transformer with allowed nodes and relationships
llm_transformer = LLMGraphTransformer(
    llm=llm_client,
    allowed_nodes=["COMPANY", "PERSON", "LOCATION", "ASSET", "EVENT"], # ADDED: LOCATION, ASSET, EVENT
    allowed_relationships=[
        "BOARD_MEMBER_OF",
        "SUPPLIES",
        "COMPETES_WITH",
        "EXPOSED_TO",     # ADDED: For risk or location exposure
        "INVESTS_IN",     # ADDED: For investment links
        "SUBJECT_TO",     # ADDED: For regulatory events or fines
    ], # UPDATED: Based on Spanner RelationshipType
    node_properties=[
        "description",
    ],
)

# 5. Extract the Graph Documents
print(f"Generating graph documents from {len(splits)} splits...")
graph_documents = llm_transformer.convert_to_graph_documents(splits)

# 6. Output Graph Results (for verification)
for doc in graph_documents:
    print("-" * 30)
    print(f"Source: {doc.source.page_content[:50]}...")
    print(f"Nodes: {doc.nodes}")
    print(f"Relationships: {doc.relationships}")

print("-" * 30)
print(f"Successfully extracted {len(graph_documents)} graph documents.")

Fetching document data from gs://databases7/graphrag/financereports/...
Processing file: gs://databases7/graphrag/financereports/01_AnalystReport.txt
Processing file: gs://databases7/graphrag/financereports/02_10KFiling.txt
Processing file: gs://databases7/graphrag/financereports/03_10KFiling.txt
Processing file: gs://databases7/graphrag/financereports/04_PressRelease.txt
Processing file: gs://databases7/graphrag/financereports/05_LegalFiling.txt
Generating graph documents from 5 splits...




Generating 5 embeddings...
--- Extracted Metadata: UNKNOWN, AnalystReport
--- Extracted Metadata: UNKNOWN, 10-K Filing
--- Extracted Metadata: UNKNOWN, 10-K Filing
--- Extracted Metadata: UNKNOWN, Press Release
--- Extracted Metadata: UNKNOWN, Legal Filing
Error inserting data into Spanner: name 'SPANNER_INSTANCE_ID' is not defined
------------------------------
Source: AnalystReport: Our firm maintains an OUTPERFORM ra...
Nodes: [Node(id='Globaltech Solutions', type='Company', properties={'description': 'Our firm maintains an OUTPERFORM rating on GlobalTech Solutions (C100).'}), Node(id='Geopolitical Uncertainty In The Eastern Markets', type='Event', properties={'description': 'The primary risk remains geopolitical uncertainty in the eastern markets, specifically concerning raw material inputs.'}), Node(id='Analystreport', type='Asset', properties={'description': 'AnalystReport dated Q3'})]
Relationships: [Relationship(source=Node(id='Globaltech Solutions', type='Company', properties=

In [15]:
# @title Load the graph to Spanner Graph database
# Uncomment the line below, if you want to cleanup from
# previous iterations.
# BeWARE - THIS COULD REMOVE DATA FROM YOUR DATABASE !!!
# graph_store.cleanup()


for graph_document in graph_documents:
    graph_store.add_graph_documents([graph_document])

Waiting for DDL operations to complete...
Insert nodes of type `Company`...
Insert nodes of type `Event`...
Insert nodes of type `Asset`...
Insert edges of type `Company_EXPOSED_TO_Event`...
Insert edges of type `Company_SUBJECT_TO_Asset`...
No schema change required...
Insert nodes of type `Company`...
Insert nodes of type `Asset`...
Insert nodes of type `Event`...
Insert edges of type `Company_SUBJECT_TO_Asset`...
Insert edges of type `Company_EXPOSED_TO_Event`...
Waiting for DDL operations to complete...
Insert nodes of type `Company`...
Insert nodes of type `Location`...
Insert edges of type `Company_EXPOSED_TO_Location`...
Waiting for DDL operations to complete...
Insert nodes of type `Company`...
Insert nodes of type `Asset`...
Insert edges of type `Company_INVESTS_IN_Asset`...
Waiting for DDL operations to complete...
Insert nodes of type `Company`...
Insert nodes of type `Event`...
Insert edges of type `Company_SUBJECT_TO_Event`...


In [16]:
# 7. Insert documents and vectors into Spanner (Now uses LLM extraction)
insert_document_embeddings_to_spanner(splits, embeddings_generator, llm_client)

Generating 5 embeddings...
--- Extracted Metadata: UNKNOWN, AnalystReport
--- Extracted Metadata: UNKNOWN, 10-K Filing
--- Extracted Metadata: UNKNOWN, 10-K Filing
--- Extracted Metadata: UNKNOWN, Press Release
--- Extracted Metadata: UNKNOWN, Legal Filing


Created multiplexed session.
Created multiplexed session.
Created multiplexed session.


Inserting documents into FinancialDocuments table...
Successfully inserted 5 documents and embeddings into Spanner.


INFO:projects/my-alloydb-project-vivekshinde/instances/graph/databases/financedb:Created multiplexed session.


In [17]:
from google.cloud import spanner
from langchain_google_spanner import SpannerGraphQAChain
from langchain_google_vertexai import ChatVertexAI
from IPython.core.display import HTML

# Initialize llm object
llm = ChatVertexAI(model="gemini-2.5-pro", temperature=0)

# Initialize GraphQAChain
chain = SpannerGraphQAChain.from_llm(
    llm,
    graph=graph_store,
    allow_dangerous_requests=True,
    verbose=True,
    return_intermediate_steps=True,
)

In [19]:
# @title Run Spanner Graph QA Chain 1
question = "Find all current suppliers for Innovate Finance Inc. (C101)."  # @param {type:"string"}
response = chain.invoke("query=" + question)
response["result"]



[1m> Entering new SpannerGraphQAChain chain...[0m
Executing gql:
[32;1m[1;3mGRAPH finance_graph
MATCH (:Company {id: 'C101'})-[:Company_SUBJECT_TO_Asset]->(asset:Asset)<-[:Company_INVESTS_IN_Asset]-(supplier:Company)
RETURN supplier.id AS supplier_id, supplier.description AS supplier_description;[0m
Full Context:
[32;1m[1;3m[][0m

[1m> Finished chain.[0m


"I don't know the answer."