In [1]:
!pip install llama-index --quiet
!pip install neo4j --quiet
!pip install llama-index-graph-stores-neo4j --quiet
!pip install llama-parse --quiet
!pip install qdrant_client --quiet
!pip install llama-index-vector-stores-qdrant --quiet
!pip install llama-index-embeddings-huggingface --quiet
!pip install llama-index-embeddings-fastembed --quiet
# !pip install llama-index-embeddings-sentence-transformers --quiet
!pip install llama-index-llms-groq --quiet

[?25l   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/1.6 MB[0m [31m?[0m eta [36m-:--:--[0m[2K   [91m━━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━[0m [32m1.0/1.6 MB[0m [31m29.4 MB/s[0m eta [36m0:00:01[0m[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.6/1.6 MB[0m [31m22.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m41.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.5/1.5 MB[0m [31m38.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m180.2/180.2 kB[0m [31m8.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m76.4/76.4 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m77.9/77.9 kB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [15]:
import asyncio
import json
import re
import os
import time
from typing import List, Dict, Any, Tuple
from llama_index.core import SimpleDirectoryReader, Document, VectorStoreIndex, SummaryIndex, StorageContext
from llama_index.llms.groq import Groq
from llama_index.embeddings.fastembed import FastEmbedEmbedding
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core import Settings
from google.colab import drive, userdata
import nest_asyncio
import logging
import pandas as pd
from neo4j import GraphDatabase
from qdrant_client import QdrantClient

# Set up logging: INFO and above, each record prefixed with timestamp and level
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Mount Google Drive at /content/drive; main() reads its input files from a
# directory below this mount point
drive.mount('/content/drive')

# NOTE(review): presumably needed so that asyncio.run(main()) at the bottom of
# this cell can run while the notebook kernel's own event loop is already
# active — confirm against the nest_asyncio documentation
nest_asyncio.apply()

# Rate limiting constants (handed to RateLimiter in main())
MAX_CONCURRENT_REQUESTS = 2  # size of the RateLimiter semaphore
RATE_LIMIT_REQUESTS = 10  # calls allowed per RATE_LIMIT_PERIOD
RATE_LIMIT_PERIOD = 120  # in seconds

class RateLimiter:
    """Async context manager that throttles work by rate and by concurrency.

    Entering the context first takes one of ``max_concurrent`` semaphore slots
    and then waits until fewer than ``max_calls`` calls have started within the
    last ``period`` seconds. Leaving the context frees the slot.

    Args:
        max_calls: Maximum number of calls that may start within ``period``.
        period: Length of the sliding window, in seconds.
        max_concurrent: Maximum number of callers inside the context at once.

    Raises:
        ValueError: If ``max_calls`` is smaller than 1 (no call could ever be
            admitted).
    """

    def __init__(self, max_calls: int, period: float, max_concurrent: int):
        if max_calls < 1:
            raise ValueError("max_calls must be at least 1")
        self.max_calls = max_calls
        self.period = period
        # Start times (time.monotonic()) of calls still inside the window, oldest first
        self.calls: List[float] = []
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def wait(self) -> None:
        """Block until a call may start, then record its start time."""
        while True:
            # Monotonic clock: wall-clock adjustments must not stretch or
            # shrink the rate-limit window.
            window_start = time.monotonic() - self.period
            self.calls = [call for call in self.calls if call > window_start]

            if len(self.calls) < self.max_calls:
                break

            # Sleep until the oldest recorded call leaves the window, then
            # re-check: another waiter may have taken the freed capacity, and
            # the event loop may wake us marginally early.
            await asyncio.sleep(self.calls[0] - window_start)

        # No await between the capacity check above and this append, so the
        # check-and-record step cannot interleave with another coroutine.
        self.calls.append(time.monotonic())

    async def __aenter__(self):
        await self.semaphore.acquire()
        try:
            await self.wait()
        except BaseException:
            # __aexit__ is not called when __aenter__ fails, so the slot must
            # be handed back here (covers cancellation while sleeping too).
            self.semaphore.release()
            raise
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        self.semaphore.release()

class GlobalEntityManager:
    """Hands out stable identifiers ("E1", "E2", ...) for (name, type) pairs.

    The first request for a given pair allocates the next identifier; every
    later request for the same pair returns that same identifier.
    """

    def __init__(self):
        # (entity_name, entity_type) -> previously issued identifier
        self.entity_map = {}
        # Number that the next newly issued identifier will carry
        self.next_id = 1

    def get_or_create_id(self, entity_name: str, entity_type: str) -> str:
        """Return the identifier for the pair, allocating one on first sight."""
        lookup = (entity_name, entity_type)
        try:
            return self.entity_map[lookup]
        except KeyError:
            issued = f"E{self.next_id}"
            self.entity_map[lookup] = issued
            self.next_id += 1
            return issued


# Shared instance used by create_graph_data
global_entity_manager = GlobalEntityManager()

def _empty_bill_info() -> Dict[str, Any]:
    """Return a fresh extraction result with every expected section present but empty."""
    return {"bill_info": {}, "amendments": [], "key_provisions": [], "definitions": [], "persons_involved": [], "affected_parties": [], "related_acts": []}


async def extract_bill_info(document: Document, llm: Groq, rate_limiter: RateLimiter, max_chars: int = 0) -> Dict[str, Any]:
    """Ask the LLM for structured information about one legislative document.

    The document text is embedded in a prompt, the completion is requested
    through ``rate_limiter``, and the outermost ``{...}`` span of the reply is
    cleaned with ``clean_json_string`` and parsed as JSON.

    Args:
        document: Document whose ``text`` is analysed.
        llm: LLM client; its ``acomplete`` coroutine is awaited.
        rate_limiter: Async context manager entered around each LLM call.
        max_chars: Optional cap on the number of characters of document text
            placed in the prompt. ``0`` (the default) sends the full text.

    Returns:
        The parsed JSON object. When the reply contains no JSON object, cannot
        be parsed, or every attempt raised, an empty result with all expected
        sections is returned instead.
    """
    text = document.text
    if max_chars and max_chars > 0:
        text = text[:max_chars]

    # Only the document text is interpolated below; everything inside the
    # triple-quoted string is sent to the model verbatim, so notes to
    # maintainers must stay out of it.
    prompt = f"""Analyze the following legislative document and extract key information. Provide your response in a structured JSON format with the following elements:

1. "bill_info": number, title, stage, assent_date
2. "amendments": List of acts being amended (act_name, sections_amended, description)
3. "key_provisions": List of main provisions (title, description, section_reference)
4. "definitions": List of important terms and their definitions
5. "persons_involved": List of persons involved (name, position, department)
6. "affected_parties": List of affected parties (name, type, description)
7. "related_acts": List of related acts

Legislative document:

{text}

Provide your structured JSON response below:
"""
    max_retries = 5
    retry_delay = 1

    for attempt in range(max_retries):
        try:
            async with rate_limiter:
                response = await llm.acomplete(prompt)
                # The model may wrap the JSON in prose; keep the outermost {...} span.
                start = response.text.find('{')
                end = response.text.rfind('}')

                if start != -1 and end != -1:
                    json_str = response.text[start:end+1]
                    json_str = clean_json_string(json_str)
                    try:
                        structured_data = json.loads(json_str)
                        return structured_data
                    except json.JSONDecodeError as e:
                        # A malformed reply is not retried; the caller gets an empty result.
                        print(f"Error: JSON decoding failed. {str(e)}")
                        return _empty_bill_info()
                else:
                    print("Error: No valid JSON structure found in the output.")
                    return _empty_bill_info()
        except Exception as e:
            if attempt < max_retries - 1:
                # Exponential backoff: 1, 2, 4, 8 seconds
                wait_time = retry_delay * (2 ** attempt)
                print(f"Error occurred: {str(e)}. Retrying in {wait_time} seconds...")
                await asyncio.sleep(wait_time)
            else:
                print(f"Max retries reached. Error: {str(e)}")
                return _empty_bill_info()

    # Every loop path returns; this keeps the declared return type honest.
    return _empty_bill_info()

def clean_json_string(json_str: str) -> str:
    """Tidy an LLM-produced JSON string before parsing.

    Everything after the last ``}`` is dropped (when there is one), then commas
    that directly precede a closing ``}`` or ``]`` are removed.
    """
    head, brace, _ = json_str.rpartition('}')
    cleaned = head + brace if brace else json_str

    # (pattern, replacement) pairs, applied in this order
    for pattern, closer in ((r',\s*}', '}'), (r',\s*]', ']')):
        cleaned = re.sub(pattern, closer, cleaned)
    return cleaned

def _as_list(section: Any) -> List[Any]:
    """Normalise one extracted section to a list: null/empty -> [], a lone value -> [value]."""
    if not section:
        return []
    if isinstance(section, list):
        return section
    return [section]


def _text(value: Any, default: str) -> str:
    """Return ``value`` as a string, or ``default`` when it is null or empty."""
    if value is None or value == '':
        return default
    return value if isinstance(value, str) else str(value)


def _dict_entries(section: Any, section_name: str):
    """Yield ``(index, entry)`` for each dict in ``section``; other entries are logged and skipped."""
    for index, entry in enumerate(_as_list(section)):
        if isinstance(entry, dict):
            yield index, entry
        else:
            logging.warning("Skipping malformed entry in '%s': %r", section_name, entry)


def create_graph_data(extracted_info: Dict[str, Any]) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Turn one extraction result into graph entities and relationships.

    One ``Bill`` entity is always produced. Each amendment, key provision,
    definition, person, affected party and related act becomes an entity that
    is linked from the bill (``AMENDS``, ``CONTAINS``, ``DEFINES``,
    ``INVOLVED``, ``AFFECTS``, ``RELATES_TO``). Identifiers other than the
    bill's come from ``global_entity_manager``.

    The input is LLM output, so its shape is not trusted: a missing or null
    section counts as empty, a lone value where a list was expected is treated
    as a one-element list, and entries that are not objects are skipped with a
    warning instead of aborting the whole document.

    Args:
        extracted_info: Parsed extraction result keyed by section name.

    Returns:
        ``(entities, relationships)``: entity dicts carrying ``id`` and
        ``type``, and relationship dicts carrying ``source``, ``target`` and
        ``type``.
    """
    entities = []
    relationships = []

    # Create Bill entity
    bill_info = extracted_info.get('bill_info')
    if not isinstance(bill_info, dict):
        bill_info = {}
    bill_number = bill_info.get('number')
    if bill_number is None:
        bill_number = 'UNKNOWN'
    bill_id = f"BILL_{bill_number}"
    entities.append({
        'id': bill_id,
        'type': 'Bill',
        'number': bill_number,
        'title': bill_info.get('title', ''),
        'stage': bill_info.get('stage', ''),
        'assent_date': bill_info.get('assent_date')
    })

    def link(target_id: str, relationship_type: str) -> None:
        """Record a relationship from the bill to ``target_id``."""
        relationships.append({
            'source': bill_id,
            'target': target_id,
            'type': relationship_type
        })

    # Create Amendment entities and relationships
    for i, amendment in _dict_entries(extracted_info.get('amendments'), 'amendments'):
        amendment_id = global_entity_manager.get_or_create_id(f"{bill_id}_AMENDMENT_{i}", "Amendment")
        # Stored as a single string whatever shape the model returned
        sections_amended = amendment.get('sections_amended', [])
        if isinstance(sections_amended, list):
            sections_amended = ', '.join(map(str, sections_amended))
        elif not isinstance(sections_amended, str):
            sections_amended = str(sections_amended)

        entities.append({
            'id': amendment_id,
            'type': 'Amendment',
            'act_name': amendment.get('act_name', ''),
            'sections_amended': sections_amended,
            'description': amendment.get('description', '')
        })
        link(amendment_id, 'AMENDS')

    # Create Provision entities and relationships
    for i, provision in _dict_entries(extracted_info.get('key_provisions'), 'key_provisions'):
        provision_id = global_entity_manager.get_or_create_id(f"{bill_id}_PROVISION_{i}", "Provision")
        entities.append({
            'id': provision_id,
            'type': 'Provision',
            'title': provision.get('title', ''),
            'description': provision.get('description', ''),
            'section_reference': provision.get('section_reference', '')
        })
        link(provision_id, 'CONTAINS')

    # Create Definition entities and relationships
    definitions = extracted_info.get('definitions')
    if isinstance(definitions, dict):
        # The model sometimes returns {term: definition} instead of a list
        definitions = [{'term': k, 'definition': v} for k, v in definitions.items()]
    for i, definition in _dict_entries(definitions, 'definitions'):
        term = _text(definition.get('term'), f'UNKNOWN_TERM_{i}')
        definition_id = global_entity_manager.get_or_create_id(f"{bill_id}_DEF_{term}", "Definition")
        entities.append({
            'id': definition_id,
            'type': 'Definition',
            'term': term,
            'definition': definition.get('definition', '')
        })
        link(definition_id, 'DEFINES')

    # Create Person entities and relationships
    for _, person in _dict_entries(extracted_info.get('persons_involved'), 'persons_involved'):
        person_name = _text(person.get('name'), 'UNKNOWN')
        person_id = global_entity_manager.get_or_create_id(person_name, "Person")
        entities.append({
            'id': person_id,
            'type': 'Person',
            'name': person_name,
            'position': person.get('position', ''),
            'department': person.get('department', '')
        })
        link(person_id, 'INVOLVED')

    # Create AffectedParty entities and relationships
    for _, party in _dict_entries(extracted_info.get('affected_parties'), 'affected_parties'):
        party_name = _text(party.get('name'), 'UNKNOWN')
        # The type becomes the node label downstream, so it must never be null or empty
        party_type = _text(party.get('type'), 'UNKNOWN')
        party_id = global_entity_manager.get_or_create_id(party_name, party_type)
        entities.append({
            'id': party_id,
            'type': party_type,
            'name': party_name,
            'description': party.get('description', '')
        })
        link(party_id, 'AFFECTS')

    # Create relationships for related acts (entries may be objects or plain names)
    for act in _as_list(extracted_info.get('related_acts')):
        if isinstance(act, dict):
            act_name = _text(act.get('name'), 'UNKNOWN')
        else:
            act_name = str(act)
        act_id = global_entity_manager.get_or_create_id(act_name, "Act")
        entities.append({
            'id': act_id,
            'type': 'Act',
            'name': act_name
        })
        link(act_id, 'RELATES_TO')

    return entities, relationships

async def process_legislative_documents(input_directory: str, llm: Groq, rate_limiter: RateLimiter) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """Extract graph data from every document found in ``input_directory``.

    Documents are handled one after another: each is passed to
    ``extract_bill_info`` and the result to ``create_graph_data``.

    Returns:
        ``(entities, relationships)`` accumulated over all documents, in
        document order.
    """
    reader = SimpleDirectoryReader(input_dir=input_directory)
    loaded_documents = reader.load_data()

    collected_entities = []
    collected_relationships = []
    for document in loaded_documents:
        bill_details = await extract_bill_info(document, llm, rate_limiter)
        doc_entities, doc_relationships = create_graph_data(bill_details)
        collected_entities += doc_entities
        collected_relationships += doc_relationships

    return collected_entities, collected_relationships

class Neo4jLoader:
    """Writes entity and relationship dicts to a Neo4j database.

    Labels and relationship types cannot be passed as Cypher parameters, so
    they are reduced to word characters and backtick-quoted before being
    placed in the query text; all values travel as parameters.
    """

    # Label used when an entity type has no usable characters
    DEFAULT_LABEL = "Unknown"
    # Relationship type used when a relationship type has no usable characters
    DEFAULT_RELATIONSHIP_TYPE = "RELATED_TO"

    def __init__(self, uri, username, password):
        self.driver = GraphDatabase.driver(uri, auth=(username, password))

    def close(self):
        """Close the underlying driver."""
        self.driver.close()

    def clear_database(self):
        """Delete every node and relationship in the database."""
        query = "MATCH (n) DETACH DELETE n"
        with self.driver.session() as session:
            session.run(query)
        logging.info("Database cleared")

    def create_constraints(self):
        """Create a uniqueness constraint on ``id`` for each known label, if absent."""
        constraints = [
            "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Bill) REQUIRE n.id IS UNIQUE",
            "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Amendment) REQUIRE n.id IS UNIQUE",
            "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Provision) REQUIRE n.id IS UNIQUE",
            "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Definition) REQUIRE n.id IS UNIQUE",
            "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Person) REQUIRE n.id IS UNIQUE",
            "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Act) REQUIRE n.id IS UNIQUE",
            "CREATE CONSTRAINT IF NOT EXISTS FOR (n:GovernmentEntity) REQUIRE n.id IS UNIQUE"
        ]
        with self.driver.session() as session:
            for constraint in constraints:
                session.run(constraint)
        logging.info("Constraints created")

    @staticmethod
    def sanitize_label(label):
        """Build a node label from free text.

        Runs of word characters are capitalised and concatenated
        ("Government entities" -> "GovernmentEntities"). When nothing usable
        remains (``None``, empty or punctuation-only input) ``DEFAULT_LABEL``
        is returned so the result is never an empty label.
        """
        words = [] if label is None else re.findall(r'\w+', str(label))
        sanitized = ''.join(word.capitalize() for word in words)
        return sanitized or Neo4jLoader.DEFAULT_LABEL

    @staticmethod
    def _sanitize_relationship_type(rel_type):
        """Reduce a relationship type to word characters, joining fragments with ``_``."""
        cleaned = '' if rel_type is None else re.sub(r'\W+', '_', str(rel_type)).strip('_')
        return cleaned or Neo4jLoader.DEFAULT_RELATIONSHIP_TYPE

    def load_entities(self, entities: list):
        """Merge each entity as a node labelled by its sanitised ``type``.

        The node is matched on ``id``; every key of the entity dict (including
        ``id`` and ``type``) is written as a node property.
        """
        with self.driver.session() as session:
            for entity in entities:
                sanitized_type = self.sanitize_label(entity.get('type'))
                # Backticks keep labels that start with a digit valid; the
                # sanitised label holds word characters only, so it cannot
                # contain a backtick itself.
                query = (
                    f"MERGE (e:`{sanitized_type}` {{id: $id}}) "
                    f"SET e += $properties"
                )
                session.run(query, id=entity['id'], properties=entity)
        logging.info(f"Loaded {len(entities)} entities")

    def load_relationships(self, relationships: list):
        """Merge a typed relationship between the nodes whose ``id`` matches.

        MERGE (rather than CREATE) keeps loading idempotent, as it is for
        entities: a repeated (source, target, type) triple yields one edge.
        """
        with self.driver.session() as session:
            for rel in relationships:
                rel_type = self._sanitize_relationship_type(rel.get('type'))
                query = (
                    f"MATCH (source) WHERE source.id = $source_id "
                    f"MATCH (target) WHERE target.id = $target_id "
                    f"MERGE (source)-[r:`{rel_type}`]->(target)"
                )
                session.run(query, source_id=rel['source'], target_id=rel['target'])
        logging.info(f"Loaded {len(relationships)} relationships")

async def load_data_to_neo4j(entities: List[Dict[str, Any]], relationships: List[Dict[str, Any]]):
    """Replace the contents of the Neo4j database with the given graph.

    Connection URL and password are read from Colab ``userdata``; the user
    name is fixed to "neo4j". The database is cleared, constraints are
    created, then entities and relationships are loaded. Failures in any of
    those steps are logged, not raised; the loader is always closed.
    """
    loader = Neo4jLoader(
        userdata.get('NEO4J_URL'),
        "neo4j",
        userdata.get('NEO4J_PASSWORD'),
    )
    try:
        loader.clear_database()
        loader.create_constraints()
        loader.load_entities(entities)
        loader.load_relationships(relationships)
    except Exception as error:
        logging.error(f"Error updating Neo4j database: {str(error)}")
    else:
        logging.info("Neo4j database updated successfully!")
    finally:
        loader.close()

async def load_data_qdrant(file_directory: str):
    """Index the ``.txt`` files of ``file_directory`` into Qdrant.

    Qdrant URL and API key are read from Colab ``userdata``. Each text file is
    loaded, the documents are split into 1024-character-budget chunks with an
    overlap of 100, and the resulting nodes are indexed into the
    "legislative_docs" collection. Any failure is logged, not raised.
    """
    try:
        # Vector store backed by the remote Qdrant collection
        client = QdrantClient(
            url=userdata.get('QDRANT_URL'),
            api_key=userdata.get('QDRANT_API_KEY'),
        )
        store = QdrantVectorStore(client=client, collection_name="legislative_docs")
        context = StorageContext.from_defaults(vector_store=store)

        # Text files only, in directory-listing order
        text_paths = [
            os.path.join(file_directory, name)
            for name in os.listdir(file_directory)
            if name.endswith(".txt")
        ]

        # Load the files one at a time and pool their documents
        documents = []
        for path in text_paths:
            documents.extend(SimpleDirectoryReader(input_files=[path]).load_data())

        chunker = SentenceSplitter(chunk_size=1024, chunk_overlap=100)
        chunks = chunker.get_nodes_from_documents(documents)

        index = VectorStoreIndex(chunks, storage_context=context)
        index.storage_context.persist()
        logging.info("Qdrant database updated successfully!")

    except Exception as error:
        logging.error(f"Error loading data into Qdrant: {str(error)}")

async def main():
    """Run the whole pipeline: extract graph data, then fill Neo4j and Qdrant.

    Configures the Groq LLM and the FastEmbed embedding model in the global
    LlamaIndex ``Settings``, extracts entities and relationships from the
    documents in the input directory under the rate limits defined by the
    module constants, and loads the results into both stores.
    """
    input_directory = r'/content/drive/MyDrive/Colab Notebooks/can_extracted_pdf_files_1'

    # Models and global LlamaIndex settings
    llm = Groq(model="llama3-70b-8192", api_key=userdata.get('GROQ_API_KEY'), temperature=0)
    Settings.embed_model = FastEmbedEmbedding(model_name="BAAI/bge-small-en-v1.5")
    Settings.chunk_size = 1024
    Settings.llm = llm

    rate_limiter = RateLimiter(
        max_calls=RATE_LIMIT_REQUESTS,
        period=RATE_LIMIT_PERIOD,
        max_concurrent=MAX_CONCURRENT_REQUESTS,
    )

    # Extraction first, then both stores in turn
    graph_entities, graph_relationships = await process_legislative_documents(input_directory, llm, rate_limiter)
    await load_data_to_neo4j(graph_entities, graph_relationships)
    await load_data_qdrant(input_directory)


if __name__ == "__main__":
    asyncio.run(main())

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Fetching 5 files:   0%|          | 0/5 [00:00<?, ?it/s]



Error: No valid JSON structure found in the output.
[{'id': 'BILL_BILL C-10', 'type': 'Bill', 'number': 'BILL C-10', 'title': 'An Act respecting certain measures related to COVID-19', 'stage': 'Assented to', 'assent_date': 'MARCH 4, 2022'}, {'id': 'E1', 'type': 'Provision', 'title': 'Payments out of C.R.F.', 'description': 'Authorizes the Minister of Health to make payments of up to $2.5 billion out of the Consolidated Revenue Fund for COVID-19 tests', 'section_reference': 'Section 1'}, {'id': 'E2', 'type': 'Provision', 'title': 'Transfers', 'description': 'Authorizes the Minister of Health to transfer COVID-19 tests and instruments to provinces, territories, bodies, and persons in Canada', 'section_reference': 'Section 2'}, {'id': 'E3', 'type': 'Person', 'name': 'The Minister of Health', 'position': 'Minister of Health', 'department': 'Health'}, {'id': 'E4', 'type': 'Government entities', 'name': 'Provinces and territories', 'description': 'Will receive COVID-19 tests and instruments'