In [75]:
from typing import List
import pandas as pd

from llama_index.core.schema import TextNode

In [76]:
import os

# The OpenAI clients created below read OPENAI_API_KEY from the environment.
# Export the key before starting the kernel (never commit a real key here).
# setdefault keeps a key that is already set instead of overwriting it with
# the empty placeholder.
os.environ.setdefault("OPENAI_API_KEY", "")

In [77]:
from llama_index.llms.openai import OpenAI

# LLM built with no arguments, so the library defaults apply; it is handed to
# the graph extractor and the query engine further down.
llm = OpenAI()

In [78]:
from llama_index.embeddings.openai import OpenAIEmbedding

# Embedding model built with no arguments; passed as `embed_model` when the
# PropertyGraphIndex is created.
embed = OpenAIEmbedding()

In [79]:
def read_data(file_path: str, max_reviews_per_hotel: int = 1) -> List[TextNode]:
    """
    Build LlamaIndex text nodes from the hotel reviews stored in a parquet file.

    Every row is expected to carry a ``hotel_name`` and a ``reviews``
    collection whose items are mappings with ``username`` and ``review_title``
    keys; ``en_full_review`` is optional and replaced by a placeholder when
    missing. Only the node text is populated, node metadata is left empty.

    Args:
        file_path (str): Path of the parquet file to load.
        max_reviews_per_hotel (int): Maximum number of reviews converted into
            nodes for each row. Defaults to 1, i.e. only the first review of
            every hotel is used.

    Returns:
        List[TextNode]: The text nodes in file order. Rows whose ``reviews``
        cell is null or empty contribute no node.
    """
    bookings_df = pd.read_parquet(file_path)

    nodes = []
    for _, row in bookings_df.iterrows():
        reviews = row["reviews"]
        # A missing cell may come back as None or NaN, neither of which can be
        # iterated; skip the row instead of raising TypeError.
        if reviews is None or isinstance(reviews, float):
            continue

        for position, review in enumerate(reviews):
            if position >= max_reviews_per_hotel:
                break
            nodes.append(
                TextNode(
                    text=f"Hotel: {row['hotel_name']}, reviewed by User: {review['username']} | Review Title: {review['review_title']} | Review: {review.get('en_full_review', 'No review text available')}",
                )
            )

    return nodes

In [80]:
# Load the review nodes that feed the property graph index.
# NOTE(review): this path differs from DATA_FILE_PATH ("../data/booking.parquet")
# defined in a later cell — confirm which location is the right one.
nodes = read_data("data/bookings.parquet")

In [81]:
import asyncio
from collections.abc import Callable
from typing import Any
from typing import List
from typing import Optional
from typing import Union

import nest_asyncio
from llama_index.core.async_utils import run_jobs
from llama_index.core.graph_stores.types import EntityNode
from llama_index.core.graph_stores.types import KG_NODES_KEY
from llama_index.core.graph_stores.types import KG_RELATIONS_KEY
from llama_index.core.graph_stores.types import Relation
from llama_index.core.indices.property_graph import default_parse_triplets_fn
from llama_index.core.llms import LLM
from llama_index.core.prompts import PromptTemplate
from llama_index.core.prompts.default_prompts import DEFAULT_KG_TRIPLET_EXTRACT_PROMPT
from llama_index.core.schema import BaseNode
from llama_index.core.schema import TransformComponent

# Patch asyncio to allow nested event loops: the extractor's __call__ uses
# asyncio.run(), which would otherwise fail inside the notebook's running loop.
nest_asyncio.apply()


class RecommendationGraphExtractor(TransformComponent):
    """
    Extract recommendation-oriented entities and relationships from text nodes.

    Each node's content is sent to the LLM with ``extract_prompt``; the raw
    completion is handed to ``parse_fn`` and the parsed entities and
    relationships are attached to the node metadata under ``KG_NODES_KEY`` and
    ``KG_RELATIONS_KEY``.
    """

    # Language model used for the extraction calls.
    llm: LLM
    # Prompt formatted with ``text`` and ``max_knowledge_triplets``.
    extract_prompt: PromptTemplate
    # Converts the raw completion into ``(entities, relationships)``.
    parse_fn: Callable
    # Number of workers handed to ``run_jobs``.
    num_workers: int
    # Forwarded to the prompt as ``max_knowledge_triplets``.
    max_paths_per_chunk: int

    def __init__(
        self,
        llm: Optional[LLM] = None,
        extract_prompt: Optional[Union[str, PromptTemplate]] = None,
        parse_fn: Optional[Callable] = default_parse_triplets_fn,
        max_paths_per_chunk: int = 100,
        num_workers: int = 4,
    ):
        """
        Initialize the RecommendationGraphExtractor.

        Args:
            llm (LLM): The language model to use. Falls back to ``Settings.llm``.
            extract_prompt (Union[str, PromptTemplate]): The prompt to use for
                extracting triples. A plain string is wrapped in a
                ``PromptTemplate``; ``None`` selects
                ``DEFAULT_KG_TRIPLET_EXTRACT_PROMPT``.
            parse_fn (callable): A function to parse the output of the language
                model. ``_aextract`` unpacks its result as
                ``(entities, relationships)`` with 4-field entities and
                6-field relationships.
                NOTE(review): the default ``default_parse_triplets_fn``
                presumably returns a different shape — confirm before relying
                on the default.
            max_paths_per_chunk (int): The maximum number of paths to extract per chunk.
            num_workers (int): The number of workers to use for parallel processing.
        """
        from llama_index.core import Settings

        if isinstance(extract_prompt, str):
            extract_prompt = PromptTemplate(template=extract_prompt)

        super().__init__(
            llm=llm or Settings.llm,
            extract_prompt=extract_prompt or DEFAULT_KG_TRIPLET_EXTRACT_PROMPT,
            parse_fn=parse_fn,
            num_workers=num_workers,
            max_paths_per_chunk=max_paths_per_chunk,
        )

    @classmethod
    def class_name(cls) -> str:
        """Return the component name reported for this class."""
        return "GraphExtractor"

    def __call__(
        self, nodes: List[BaseNode], show_progress: bool = False, **kwargs: Any
    ) -> List[BaseNode]:
        """
        Extract triples from a list of nodes.

        Synchronous wrapper that runs ``acall`` through ``asyncio.run``.

        Args:
            nodes (List[BaseNode]): The nodes to extract triples from.
            show_progress (bool): Whether to show the progress of the extraction.

        Returns:
            List[BaseNode]: The nodes with extracted triples.
        """
        return asyncio.run(
            self.acall(nodes=nodes, show_progress=show_progress, **kwargs)
        )

    @staticmethod
    def _parse_strength(raw_strength: Any, default: float = 0.5) -> float:
        """Convert an LLM-provided strength to float, using ``default`` when it is not numeric."""
        try:
            return float(raw_strength)
        except (TypeError, ValueError):
            return default

    async def _aextract(self, node: BaseNode) -> BaseNode:
        """
        Extract triples from a node asynchronously.

        The node is modified in place: entity nodes and relations found by the
        LLM are appended to whatever is already stored under ``KG_NODES_KEY``
        and ``KG_RELATIONS_KEY`` in its metadata.

        Args:
            node (BaseNode): The node to extract triples from.

        Returns:
            BaseNode: The node with extracted triples.
        """
        assert hasattr(node, "text")

        text = node.get_content(metadata_mode="llm")
        try:
            llm_response = await self.llm.apredict(
                prompt=self.extract_prompt,
                text=text,
                max_knowledge_triplets=self.max_paths_per_chunk,
            )
            entities, relationships = self.parse_fn(llm_response)
        except ValueError:
            # Best effort: a completion that cannot be parsed yields no triples
            # for this node instead of failing the whole batch.
            entities, relationships = [], []

        existing_nodes = node.metadata.pop(KG_NODES_KEY, [])
        # Entity name -> type, first occurrence wins; used to tag relation ends.
        entity_types = {}
        for entity, entity_type, description, attributes in entities:
            entity_types.setdefault(entity, entity_type)
            entity_metadata = {
                "description": description,
                "attributes": attributes,
                "type": entity_type,
                "embedding_key": f"{entity_type}_{entity}",
            }
            existing_nodes.append(
                EntityNode(name=entity, label=entity_type, properties=entity_metadata)
            )

        existing_relations = node.metadata.pop(KG_RELATIONS_KEY, [])
        for src, tgt, rel, strength, desc, features in relationships:
            relation_metadata = {
                "description": desc,
                # A non-numeric strength must not abort the extraction; fall
                # back to the neutral 0.5 that the graph store also uses.
                "strength": self._parse_strength(strength),
                "features": features,
                "source_type": entity_types.get(src),
                "target_type": entity_types.get(tgt),
            }
            existing_relations.append(
                Relation(
                    label=rel,
                    source_id=src,
                    target_id=tgt,
                    properties=relation_metadata,
                )
            )

        node.metadata[KG_NODES_KEY] = existing_nodes
        node.metadata[KG_RELATIONS_KEY] = existing_relations
        return node

    async def acall(
        self, nodes: List[BaseNode], show_progress: bool = False, **kwargs: Any
    ) -> List[BaseNode]:
        """
        Extract triples from a list of nodes asynchronously.

        Args:
            nodes (List[BaseNode]): The nodes to extract triples from.
            show_progress (bool): Whether to show the progress of the extraction.

        Returns:
            List[BaseNode]: The nodes with extracted triples.
        """
        jobs = [self._aextract(node=node) for node in nodes]

        return await run_jobs(
            jobs,
            workers=self.num_workers,
            show_progress=show_progress,
            desc="Extracting paths from text",
        )

In [82]:
import re
from collections import defaultdict
from typing import Any
from typing import Dict
from typing import Tuple

import networkx as nx
from graspologic.partition import hierarchical_leiden
from graspologic.partition import HierarchicalClusters
from llama_index.core import Settings
from llama_index.core.llms import ChatMessage
from llama_index.graph_stores.neo4j import Neo4jPropertyGraphStore


class RecommendationGraphStore(Neo4jPropertyGraphStore):
    """
    Extended graph store with recommendation-specific functionality.

    On top of the Neo4j property graph store it clusters the stored triplets
    into communities with hierarchical Leiden and keeps one LLM-written
    summary per community.
    """

    def __init__(self, *args, **kwargs):
        """Create the store; all arguments are forwarded to ``Neo4jPropertyGraphStore``."""
        super().__init__(*args, **kwargs)
        # community id -> LLM summary, filled by build_recommendation_communities().
        self.community_summaries = {}
        # entity name -> list of community ids; None until communities are built.
        self.entity_info = None
        self.max_cluster_size = 5
        self.similarity_threshold = 0.7

    def generate_community_summary(self, text: str) -> str:
        """
        Generate a summary for the community.

        Args:
            text (str): Text to summarize.

        Returns:
            str: Summary of the text.
        """
        messages = [
            ChatMessage(
                role="system",
                content=(
                    "Analyze the following relationships and generate a summary focused on "
                    "recommendation patterns. Include key features that drive recommendations, "
                    "common preferences, and relationship strengths. Highlight any clusters "
                    "or patterns that could be useful for making recommendations."
                ),
            ),
            ChatMessage(role="user", content=text),
        ]
        response = Settings.llm.chat(messages)
        # str(response) may start with an "assistant:" role prefix; drop it.
        clean_response = re.sub(r"^assistant:\s*", "", str(response)).strip()

        return clean_response

    def build_recommendation_communities(self) -> None:
        """
        Build communities optimized for recommendations.

        Populates ``entity_info`` and ``community_summaries`` from the triplets
        currently in the store.
        """
        graph = self._create_weighted_graph()
        if graph.number_of_edges() == 0:
            # Nothing to cluster: record an empty mapping and leave summaries as is.
            self.entity_info = {}
            return

        community_hierarchical_clusters = hierarchical_leiden(
            graph, max_cluster_size=self.max_cluster_size, resolution=1.0
        )
        self.entity_info, community_info = self._collect_community_info(
            graph=graph, clusters=community_hierarchical_clusters
        )
        self._summarize_communities(community_info=community_info)

    def _create_weighted_graph(self) -> nx.Graph:
        """
        Build a NetworkX graph from the graph store.

        Returns:
            nx.Graph: An undirected graph with one edge per triplet, weighted
            by the relation's ``strength`` property (0.5 when absent).
        """
        nx_graph = nx.Graph()

        for _, relation, _ in self.get_triplets():
            properties = relation.properties
            weight = properties.get("strength")
            if weight is None:
                weight = 0.5
            nx_graph.add_edge(
                relation.source_id,
                relation.target_id,
                weight=weight,
                relationship=relation.label,
                # Read like the other optional properties so a relation
                # stored without a description does not raise KeyError.
                description=properties.get("description", ""),
                features=properties.get("features", ""),
            )

        return nx_graph

    def _collect_community_info(
        self, graph: nx.Graph, clusters: HierarchicalClusters
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """
        Collects information about the communities.

        Args:
            graph (nx.Graph): A NetworkX graph.
            clusters (HierarchicalClusters): Hierarchical clusters.

        Returns:
            Tuple[Dict[str, Any], Dict[str, Any]]: ``entity_info`` maps each
            node to the list of cluster ids it belongs to; ``community_info``
            maps each cluster id to textual descriptions of the edges of its
            member nodes.
        """
        entity_clusters = defaultdict(set)
        community_info = defaultdict(list)

        for item in clusters:
            node = item.node
            cluster_id = item.cluster

            entity_clusters[node].add(cluster_id)

            for neighbor in graph.neighbors(node):
                edge_data = graph.get_edge_data(node, neighbor)
                if edge_data:
                    detail = f"{node} -> {neighbor} -> {edge_data['relationship']} -> {edge_data['description']}"
                    community_info[cluster_id].append(detail)

        # Sets become lists for easier serialization if needed.
        entity_info = {name: list(ids) for name, ids in entity_clusters.items()}

        return entity_info, dict(community_info)

    def _summarize_communities(self, community_info: Dict[str, Any]) -> None:
        """
        Generate summaries for the communities.

        Args:
            community_info (Dict[str, Any]): Community information.
        """
        for community_id, details in community_info.items():
            details_text = "\n".join(details) + "."
            self.community_summaries[community_id] = self.generate_community_summary(
                text=details_text
            )

    def get_community_summaries(self) -> Dict[Any, str]:
        """
        Get community summaries.

        Communities are built on first use when no summary exists yet.

        Returns:
            Dict[Any, str]: Community summaries keyed by community id.
        """
        if not self.community_summaries:
            self.build_recommendation_communities()

        return self.community_summaries

In [83]:
ENTITIES_GRAPH_REGEXP_PATTERN = (
    r"^(\w+(?:\s+\w+)*)\s*->\s*([a-zA-Z\s]+?)\s*->\s*(\w+(?:\s+\w+)*)$"
)

# One entity record of the LLM completion, four captured fields:
# name, type, description, attributes. Every field must be wrapped in double
# quotes for the pattern to match.
# NOTE(review): the "Format:" lines of RECOMMENDATION_KG_EXTRACT_TMPL show the
# fields without quotes — confirm the model really emits quoted fields.
ENTITIES_RESPONSE_PATTERN = (
    r'\("entity"\$\$\$\$"(.+?)"\$\$\$\$"(.+?)"\$\$\$\$"(.+?)"\$\$\$\$"(.+?)"\)'
)
# One relationship record, six captured fields: source, target, relation,
# strength, description, recommendation features (all double-quoted).
RELATIONSHIPS_RESPONSE_PATTERN = r'\("relationship"\$\$\$\$"(.+?)"\$\$\$\$"(.+?)"\$\$\$\$"(.+?)"\$\$\$\$"(.+?)"\$\$\$\$"(.+?)"\$\$\$\$"(.+?)"\)'

# Leading "assistant:" role prefix removed from stringified chat responses.
TO_BE_CLEANED_RESPONSE = r"^assistant:\s*"

# Extraction prompt. The {max_knowledge_triplets} and {text} placeholders are
# filled by the extractor when it calls the LLM.
RECOMMENDATION_KG_EXTRACT_TMPL = """
-Goal-
Given a text document, identify entities, their attributes, and relationships that are relevant for making recommendations.
Extract up to {max_knowledge_triplets} entity-relation triplets focusing on characteristics that influence recommendations.

-Steps-
1. Identify all entities, focusing on items, users, and categories. For each entity, extract:
- entity_name: Name of the entity, capitalized
- entity_type: Type (Item, User, Category, Feature, etc.)
- entity_description: Detailed description including preferences, characteristics, and attributes relevant for recommendations
- entity_attributes: Key features that could influence recommendations (price, genre, style, etc.)
Format: ("entity"$$$$<entity_name>$$$$<entity_type>$$$$<entity_description>$$$$<entity_attributes>)

2. Identify meaningful relationships between entities that could drive recommendations:
- source_entity: Source entity name
- target_entity: Target entity name
- relation: Relationship type (likes, similar_to, belongs_to, recommends, etc.)
- relationship_strength: Numerical score (0-1) indicating relationship strength
- relationship_description: Detailed explanation of why these entities are related
- recommendation_features: Specific features that make this relationship relevant for recommendations

Format: ("relationship"$$$$<source_entity>$$$$<target_entity>$$$$<relation>$$$$<relationship_strength>$$$$<relationship_description>$$$$<recommendation_features>)

3. When finished, output all entities and relationships.

-Real Data-
######################
text: {text}
######################
output:"""

# NOTE(review): not referenced by any cell shown here — confirm it is still needed.
GRAPH_NETWORK_HTML_FILEPATH = "assets/graph_network.html"

# NOTE(review): not referenced by any cell shown here; read_data is called with
# "data/bookings.parquet" instead — confirm which path is correct.
DATA_FILE_PATH = "../data/booking.parquet"

In [95]:
from typing import Any
from typing import Dict
from typing import List

from llama_index.core import PropertyGraphIndex
from llama_index.core.llms import LLM
from llama_index.core.query_engine import CustomQueryEngine


class RecommendationGraphRAGQueryEngine(CustomQueryEngine):
    """
    Query engine that answers recommendation queries from graph communities.

    A query is matched against the property graph index to find entities, the
    entities are mapped to their communities, one set of recommendations is
    generated per community from its summary, and the results are aggregated
    and formatted by the LLM.

    ``graph_store.build_recommendation_communities()`` has to run before
    querying; otherwise no community is found and the LLM answers from the
    query alone.
    """

    graph_store: RecommendationGraphStore
    index: PropertyGraphIndex
    llm: LLM
    similarity_top_k: int = 20

    def custom_query(self, query_str: str) -> str:
        """
        Custom query to retrieve recommendations using knowledge from the graph.

        Args:
            query_str (str): Query string.

        Returns:
            str: Recommendations.
        """
        entities = self.get_entities(
            query_str=query_str, similarity_top_k=self.similarity_top_k
        )

        print(f"Entities: {entities}")

        community_ids = self.retrieve_entity_communities(
            entity_info=self.graph_store.entity_info, entities=entities
        )

        print(f"Community IDs: {community_ids}")

        community_recommendations = [
            self._generate_community_recommendations(
                community_id=community_id, query_str=query_str, entities=entities
            )
            for community_id in community_ids
        ]

        # Aggregate and rank recommendations. This step also runs when no
        # community matched, in which case the list is empty.
        final_recommendations = self._aggregate_recommendations(
            community_recommendations=community_recommendations, query_str=query_str
        )

        return self._format_recommendations(recommendations=final_recommendations)

    def get_entities(self, query_str: str, similarity_top_k: int) -> List[str]:
        """
        Get entities from the graph.

        Args:
            query_str (str): Query string.
            similarity_top_k (int): Number of results requested from the retriever.

        Returns:
            List[str]: Distinct subject/object names found in the retrieved text.
        """
        retrieved_nodes = self.index.as_retriever(
            similarity_top_k=similarity_top_k
        ).retrieve(query_str)

        entities = set()
        for node in retrieved_nodes:
            # The regex must run on the node's text, not on the node object.
            matches = re.findall(
                pattern=ENTITIES_GRAPH_REGEXP_PATTERN,
                string=node.text,
                flags=re.MULTILINE | re.IGNORECASE,
            )

            for subject, _, obj in matches:
                entities.add(subject)
                entities.add(obj)

        return list(entities)

    def retrieve_entity_communities(
        self, entity_info: Dict[str, Any], entities: List[str]
    ) -> List[str]:
        """
        Retrieve cluster information for given entities, allowing for multiple clusters per entity.

        Args:
            entity_info (Dict[str, Any]): Entity information. ``None`` or an
                empty mapping (communities not built yet) yields no community.
            entities (List[str]): List of entities.

        Returns:
            List[str]: Distinct community ids the entities belong to.
        """
        if not entity_info:
            return []

        community_ids = set()
        for entity in entities:
            community_ids.update(entity_info.get(entity, ()))

        return list(community_ids)

    def _chat(self, messages: List[Any]) -> str:
        """Send chat messages to the LLM and return the reply without the role prefix."""
        response = self.llm.chat(messages=messages)
        return re.sub(TO_BE_CLEANED_RESPONSE, "", str(response)).strip()

    def _generate_community_recommendations(
        self, community_id: int, query_str: str, entities: List[str]
    ) -> str:
        """
        Generate recommendations from a specific community.

        Args:
            community_id (int): The community ID.
            query_str (str): The query string.
            entities (List[str]): The entities.

        Returns:
            str: The recommendations.
        """
        # A community without a stored summary contributes an empty summary.
        community_summary = self.graph_store.community_summaries.get(community_id, "")
        entities_str = ", ".join(entities)
        messages = [
            ChatMessage(
                role="system",
                content=(
                    f"Given the community information below and the query, generate relevant "
                    f"recommendations. Focus on items that match the query intent and have "
                    f"strong relationships within the community.\n\n"
                    f"Community Summary: {community_summary}\n"
                    f"Query Entities: {entities_str}"
                ),
            ),
            ChatMessage(role="user", content=query_str),
        ]

        return self._chat(messages)

    def _aggregate_recommendations(
        self, community_recommendations: List[str], query_str: str
    ) -> str:
        """
        Aggregate and rank recommendations from different communities.

        Args:
            community_recommendations (List[str]): The community recommendations.
            query_str (str): The query string.

        Returns:
            str: The aggregated recommendations
        """
        messages = [
            ChatMessage(
                role="system",
                content=(
                    "Combine and prioritize the following recommendations based on "
                    "relevance to the query, relationship strength, and diversity. "
                    "Provide a clear explanation for each recommendation."
                )
            ),
            ChatMessage(
                role="user",
                content=(
                    f"Query: {query_str}\n\n"
                    f"Community Recommendations:\n"
                    f"{'-' * 40}\n"
                    + "\n".join(community_recommendations)
                )
            ),
        ]

        return self._chat(messages)

    def _format_recommendations(self, recommendations: str) -> str:
        """
        Format recommendations for presentation.

        Args:
            recommendations (str): The recommendations.

        Returns:
            str: The formatted recommendations.
        """
        messages = [
            ChatMessage(
                role="system",
                content=(
                    "Format the recommendations in a clear, structured way. "
                    "Include relevant details such as similarity scores, key features, "
                    "and reasoning for each recommendation."
                ),
            ),
            ChatMessage(role="user", content=recommendations),
        ]

        return self._chat(messages)

In [96]:
def parse_fn(response_str: str) -> Any:
    """
    Split an LLM completion into entity and relationship records.

    Args:
        response_str (str): Raw completion text.

    Returns:
        Any: A ``(entities, relationships)`` pair holding every match of
        ``ENTITIES_RESPONSE_PATTERN`` and ``RELATIONSHIPS_RESPONSE_PATTERN``
        respectively, in order of appearance.
    """
    record_patterns = (ENTITIES_RESPONSE_PATTERN, RELATIONSHIPS_RESPONSE_PATTERN)
    found_entities, found_relationships = (
        re.findall(record_pattern, response_str) for record_pattern in record_patterns
    )
    return found_entities, found_relationships


# Extractor wired with the recommendation prompt and the regex-based parser.
# max_paths_per_chunk=2 is forwarded to the prompt as max_knowledge_triplets,
# i.e. the model is asked for at most two triplets per review.
kg_extractor = RecommendationGraphExtractor(
    llm=llm,
    extract_prompt=RECOMMENDATION_KG_EXTRACT_TMPL,
    max_paths_per_chunk=2,
    parse_fn=parse_fn,
)

In [86]:
# Connection settings are read from the environment so credentials are not
# hard-coded in the notebook; the fallbacks are the previous literal values
# for a local Neo4j instance.
graph_store = RecommendationGraphStore(
    username=os.environ.get("NEO4J_USERNAME", "neo4j"),
    password=os.environ.get("NEO4J_PASSWORD", "password"),
    url=os.environ.get("NEO4J_URL", "bolt://localhost:7687"),
)

In [87]:
from llama_index.core import PropertyGraphIndex

# Build the property graph: the extractor is applied to every review node and
# the result is written to the Neo4j-backed store, with embeddings produced
# by `embed`.
index = PropertyGraphIndex(
    nodes=nodes,
    kg_extractors=[kg_extractor],
    property_graph_store=graph_store,
    show_progress=True,
    embed_model=embed,
)

Extracting paths from text: 100%|██████████| 34/34 [00:13<00:00,  2.53it/s]
Generating embeddings: 100%|██████████| 1/1 [00:01<00:00,  1.93s/it]
Generating embeddings: 0it [00:00, ?it/s]


In [97]:
# Query engine over the index and its graph store.
# NOTE(review): none of the cells shown here call
# graph_store.build_recommendation_communities(), so entity_info is still None
# and community_summaries is empty at query time — confirm whether that call
# is missing before this cell.
query_engine = RecommendationGraphRAGQueryEngine(
    graph_store=index.property_graph_store,
    llm=llm,
    index=index,
    similarity_top_k=10,
)

In [98]:
from IPython.display import display, Markdown

# Run a sample query and render the answer as Markdown.
# NOTE(review): the captured output below prints "Entities: []" and
# "Community IDs: []", so this answer was produced without any graph context.
response = query_engine.query(
    "Suggestions for a hotel in Paris with a good view and close to the Eiffel Tower."
)
display(Markdown(f"{response.response}"))

Entities: []
Community IDs: []


**Recommendations for Hotels with Views of the Eiffel Tower:**

1. **Hotel Pullman Paris Tour Eiffel**
   - Similarity Score: High
   - Key Features: Stunning views of the Eiffel Tower, proximity to the landmark
   - Reasoning: Known for its exceptional views and convenient location near the Eiffel Tower, ideal for guests seeking a good view.

2. **Shangri-La Hotel Paris**
   - Similarity Score: High
   - Key Features: Luxurious accommodations, best views of the Eiffel Tower
   - Reasoning: Offers some of the best views of the Eiffel Tower from rooms and suites, providing a memorable and picturesque experience.

3. **Le Metropolitan, a Tribute Portfolio Hotel**
   - Similarity Score: Medium
   - Key Features: Views of the Eiffel Tower, convenient location
   - Reasoning: Located near the Eiffel Tower, offering rooms with views of the iconic landmark for a pleasant stay with convenience.

4. **Mercure Paris Centre Eiffel Tower Hotel**
   - Similarity Score: Medium
   - Key Features: Close proximity to the Eiffel Tower, rooms with tower views
   - Reasoning: Situated near the Eiffel Tower, providing easy access to the attraction and memorable views for guests.

5. **Hotel Plaza Athenee**
   - Similarity Score: Low
   - Key Features: Luxury accommodations, stunning Eiffel Tower views
   - Reasoning: Offers upscale accommodations with unique views of the Eiffel Tower from select rooms, providing a luxurious experience for guests.