In [1]:
from pathlib import Path
from typing import List
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import TextLoader
from langchain.graphs import Neo4jGraph
from langchain.prompts import ChatPromptTemplate
from langchain.text_splitter import TokenTextSplitter
from langchain_core.pydantic_v1 import BaseModel, Field
import os
import time
from langchain.llms import Ollama 
from langchain.document_loaders import WebBaseLoader, TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import GPT4AllEmbeddings 
from langchain.vectorstores import Chroma
from langchain.chains import RetrievalQA
import bs4
from langchain import hub
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.chat_models import ChatOpenAI
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

In [2]:
ollama = Ollama(base_url='http://localhost:11434', model='llama2')

In [3]:
loader = WebBaseLoader(
    web_paths=(["https://www.thoughtworks.com/en-de/insights/articles/data-mesh-in-practice-technology-and-the-architecture", 
               "https://www.thoughtworks.com/en-de/insights/articles/data-mesh-in-practice-product-thinking-and-development",
               "https://www.thoughtworks.com/en-in/insights/blog/data-strategy/dev-experience-data-mesh-product",
               "https://www.thoughtworks.com/en-in/insights/blog/data-strategy/dev-experience-data-mesh-platform"])
    )


In [4]:
import nest_asyncio
nest_asyncio.apply()
loader.requests_per_second = 1
raw_documents = loader.aload()

Fetching pages: 100%|##########| 4/4 [00:00<00:00, 17.69it/s]


# Normal RAG

In [5]:
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500, chunk_overlap=20)
all_splits = text_splitter.split_documents(raw_documents)
vectorstore = Chroma.from_documents(documents=all_splits, embedding=GPT4AllEmbeddings())

bert_load_from_file: gguf version     = 2
bert_load_from_file: gguf alignment   = 32
bert_load_from_file: gguf data offset = 695552
bert_load_from_file: model name           = BERT
bert_load_from_file: model architecture   = bert
bert_load_from_file: model file type      = 1
bert_load_from_file: bert tokenizer vocab = 30522


In [6]:
from langchain.prompts import PromptTemplate
QA_CHAIN_PROMPT = PromptTemplate.from_template("""Use the following pieces of context to answer the question at the end. 
If you don't know the answer, just say that you don't know, don't try to make up an answer.  

RULES:                                               
DO NOT INCLUDE THE INFORMATION IN YOUR ANSWER.

CONTEXT: 
{context}

Question:
{question}

""")

In [7]:
retriever = vectorstore.as_retriever()
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

In [8]:
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | QA_CHAIN_PROMPT
    | ollama
    | StrOutputParser()
)

In [9]:
prompts = [
    "What is Data Product?",
    "What is difference between Data Mesh and Data Fabric",
    "What is Data Mesh?",
    "How do Source-Oriented Data Products (SODP) differ from Customer-Oriented Data Products (CODP)?",
    "The three pillars of Data Mesh success?",
    "What are tools used in ?"
]

In [10]:
from IPython.display import JSON
from trulens_eval import TruChain, Feedback,Tru
tru = Tru()

🦑 Tru initialized with db url sqlite:///default.sqlite .
🛑 Secret keys may be written to the database. See the `database_redact_keys` option of `Tru` to prevent this.


In [11]:
from trulens_eval import LiteLLM
import litellm
litellm.set_verbose=False
ollama_provider = LiteLLM(model_engine="ollama/llama2", api_base='http://localhost:11434')

In [12]:
from trulens_eval import Feedback, Select,feedback
from trulens_eval.feedback import Groundedness
from trulens_eval.app import App
import numpy as np

context = App.select_context(rag_chain)

grounded = Groundedness(ollama_provider)

f_groundedness = (
    Feedback(grounded.groundedness_measure_with_cot_reasons)
    .on(context.collect()) # collect context chunks into a list
    .on_output()
    .aggregate(grounded.grounded_statements_aggregator)
)

f_qa_relevance = Feedback(ollama_provider.relevance).on_input_output()

f_context_relevance = (
    Feedback(ollama_provider.qs_relevance).on_input().on(context).aggregate(np.mean)
)

✅ In groundedness_measure_with_cot_reasons, input source will be set to __record__.app.first.steps.context.first.get_relevant_documents.rets.collect() .
✅ In groundedness_measure_with_cot_reasons, input statement will be set to __record__.main_output or `Select.RecordOutput` .
✅ In relevance, input prompt will be set to __record__.main_input or `Select.RecordInput` .
✅ In relevance, input response will be set to __record__.main_output or `Select.RecordOutput` .
✅ In qs_relevance, input question will be set to __record__.main_input or `Select.RecordInput` .
✅ In qs_relevance, input statement will be set to __record__.app.first.steps.context.first.get_relevant_documents.rets .


In [13]:
from trulens_eval.feedback.groundtruth import GroundTruthAgreement
answer_relevance_golden_set = [
    {
        "query": "What are the considerations to design the right data product?",
        "response": "Key considerations in designing the right data products are its fulfillment to the use case for a given domain, along with compliance to slo and slis, support for output ports based on persona, metadata for discoverability and access and quality aspects to deliver trust.",
        "expected_score": 1
    }, 
    {
        "query": "what are tools used in?",
        "response":"Snowflake,Talend,DBT,Collibra,Monte Carlo,Dataops.live,SOLE,OAM Client libraries",
        "expected_score": 1
    },
    {
        "query": "According to Zhamak Dehghani's principles, effective data products in a Data Mesh architecture should possess several key qualities. Which of the following options correctly lists these qualities?",
        "response":"Discoverable, Addressable, Trustworthy, Self-Describing, Interoperable, and Secure",
        "expected_score": 0.7
    },
    {
        "query": "The three pillars of Data Mesh success",
        "response":"Organizational change,product thinking, and technology",
        "expected_score": 0.7
    },
    {
        "query": "What is Data Mesh?",
        "response":"A decentralized approach to data architecture and organizational design",
        "expected_score": 0.8
    },
    {
        "query": "How do Source-Oriented Data Products (SODP) differ from Customer-Oriented Data Products (CODP)?",
        "response":"SODPs are designed based on internal operational data sources, while CODPs are created to meet specific external customer needs.",
        "expected_score": 0.6
    }
    ]


ground_truth = GroundTruthAgreement(answer_relevance_golden_set, provider = ollama_provider)

f_groundtruth = Feedback(ground_truth.agreement_measure,
                          name = "Ground Truth Agreement").on_input_output()

✅ In Ground Truth Agreement, input prompt will be set to __record__.main_input or `Select.RecordInput` .
✅ In Ground Truth Agreement, input response will be set to __record__.main_output or `Select.RecordOutput` .


In [14]:
tru_recorder = TruChain(rag_chain,
    app_id='normal_rag',
    feedbacks=[f_groundedness,f_qa_relevance,f_context_relevance,f_groundtruth])

In [15]:
with tru_recorder as recording:
    for prompt in prompts:
        llm_response = rag_chain.invoke(prompt)
        display(llm_response)

'Thank you for providing the context. Based on the information provided, a data product is a specific type of entity that is created to serve a user-driven goal and is subject to clearly defined Service Level Objectives (SLOs). A data product team is responsible for maintaining it, and it is owned by a single domain or stakeholder.\n\nIn contrast, a data asset can be any entity composed of data, such as databases or application output files. It does not have the same level of specificity or user-driven goal as a data product, and its maintenance and ownership may vary.\n\nBased on the context provided, a data product typically has a clear purpose and is designed to meet the consumption requirements of specific consumers. Its value is determined by how well it meets the needs of those consumers, and it can be deployed and maintained independently of other data products in the Data Mesh.'

"I don't know the difference between Data Mesh and Data Fabric. The context provided does not provide enough information to answer the question."

' Based on the provided context, Data Mesh appears to be a term used in the field of data management and architecture. It is not explicitly defined in the provided text, but it seems to refer to a comprehensive approach to managing and utilizing data within an organization.\n\nHowever, I cannot provide a definitive answer without more information or context. If you have any additional details or clarification regarding Data Mesh, please feel free to provide them, and I will do my best to assist you.'

"I'm not able to answer the question as the provided context does not provide enough information to differentiate between Source-Oriented Data Products (SODP) and Customer-Oriented Data Products (CODP). The context only provides a general definition of data products and their characteristics, but it does not provide any specific details about how SODP and CODP differ. Without more information, I cannot provide an accurate answer to the question."

' Based on the context provided, the three pillars of Data Mesh success are:\n\n1. Technology and Architecture: This pillar refers to the technical infrastructure and architecture that supports the Data Mesh implementation. It includes the tools, platforms, and systems used to collect, store, process, and analyze data.\n2. Data Strategy: This pillar focuses on the organizational strategy for managing and utilizing data. It involves developing a clear vision, goals, and objectives for data-driven decision making, as well as establishing processes and governance structures to support these efforts.\n3. People and Culture: This pillar encompasses the human element of Data Mesh success. It involves creating a culture of data-driven decision making, upskilling and reskilling employees, and fostering collaboration and innovation across different departments and teams.'

'Based on the provided context, the tools used in DataOps.live are:\n\n1. DBT (Data Builders) - for data transformation and data quality testing.\n2. JDBC (Java Database Connectivity) - as the standard input and output ports to support addressability.\n3. Collibra - for cataloging the product meta data.\n4. Monte Carlo - for monitoring.\n5. Snowflake - for roles definition and users mapping.\n6. Immuta - to define policy as code.\n7. DataOps.live - as the CI/CD pipeline.\n8. Schema - an isolated namespace on Snowflake for data storage.\n9. Service role - which has permission to read and write all internal data, input and output ports, publish metadata to the catalog, and publish metrics to the monitoring system.\n10. Owner role - that can create Data Product metadata, and can access all data and monitor metrics.\n\nAdditionally, a dedicated code repository with scaffold code for each data product is created in DataOps.live, which allows data products to be deployed independently of eac

  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)


In [16]:
tru.get_records_and_feedback(app_ids=[])

(                app_id                                           app_json  \
 0           normal_rag  {"tru_class_info": {"name": "TruChain", "modul...   
 1           normal_rag  {"tru_class_info": {"name": "TruChain", "modul...   
 2           normal_rag  {"tru_class_info": {"name": "TruChain", "modul...   
 3           normal_rag  {"tru_class_info": {"name": "TruChain", "modul...   
 4           normal_rag  {"tru_class_info": {"name": "TruChain", "modul...   
 5           normal_rag  {"tru_class_info": {"name": "TruChain", "modul...   
 6           normal_rag  {"tru_class_info": {"name": "TruChain", "modul...   
 7           normal_rag  {"tru_class_info": {"name": "TruChain", "modul...   
 8           normal_rag  {"tru_class_info": {"name": "TruChain", "modul...   
 9           normal_rag  {"tru_class_info": {"name": "TruChain", "modul...   
 10          normal_rag  {"tru_class_info": {"name": "TruChain", "modul...   
 11          normal_rag  {"tru_class_info": {"name": "TruChain",

In [17]:
tru.get_leaderboard(app_ids=[])

Unnamed: 0_level_0,qs_relevance,Ground Truth Agreement,groundedness_measure_with_cot_reasons,relevance,latency,total_cost
app_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
neo4j_parental_rag,0.966667,,0.458333,0.35,30.0,0.0
normal_rag,0.941667,1.0,0.718182,0.666667,33.333333,0.0


# Neo4j RAG (Parent Retriever)

In [208]:
graph = Neo4jGraph(
    url='bolt://localhost:7687',
    username='neo4j',
    password='password',
)

In [None]:
# Ingestion of data in the neo4j graph
from langchain.text_splitter import TokenTextSplitter
from langchain.document_loaders import WikipediaLoader


parent_splitter = TokenTextSplitter(chunk_size=1000, chunk_overlap=24)
child_splitter = TokenTextSplitter(chunk_size=100, chunk_overlap=24)

parent_documents = parent_splitter.split_documents(raw_documents)

for d in parent_documents:
    child_documents = child_splitter.split_documents([d])
    parent_text = d.page_content
    child_texts = [c.page_content for c in child_documents]

    # Create parent node and child nodes with relationships
    graph.query(
        """
        UNWIND $children AS child
        CREATE (p:Parent {text: $parent})
        CREATE (c:Child {text: child})
        CREATE (c)-[:HAS_PARENT]->(p)
        """,
        {"parent": parent_text, "children": child_texts}
    )

In [209]:
from langchain.vectorstores.neo4j_vector import Neo4jVector

retrieval_query = """
MATCH (node)-[:HAS_PARENT]->(parent)
RETURN parent.text AS text, score, {} AS metadata
"""

from langchain.embeddings import (
    OllamaEmbeddings,
    SentenceTransformerEmbeddings,
    BedrockEmbeddings,
)

vector_index = Neo4jVector.from_existing_graph(
    GPT4AllEmbeddings(),
    url='bolt://localhost:7687',
    username='neo4j',
    password='password',
    index_name="new_index_name",
    node_label="Child",
    text_node_properties=["text"],
    embedding_node_property="embedding",
    retrieval_query=retrieval_query,
)

bert_load_from_file: gguf version     = 2
bert_load_from_file: gguf alignment   = 32
bert_load_from_file: gguf data offset = 695552
bert_load_from_file: model name           = BERT
bert_load_from_file: model architecture   = bert
bert_load_from_file: model file type      = 1
bert_load_from_file: bert tokenizer vocab = 30522


In [210]:
rag_neo4j_chain = (
    {"context": retriever, "question": RunnablePassthrough()}
    | QA_CHAIN_PROMPT
    | ollama
    | StrOutputParser()
)

In [222]:
tru_neo4j_recorder = TruChain(rag_neo4j_chain,
    app_id='neo4j_parental_rag',
    feedbacks=[f_groundedness,f_qa_relevance,f_context_relevance,f_groundtruth])

In [239]:
with tru_neo4j_recorder as recording:
    for prompt in prompts:
        llm_response = rag_neo4j_chain.invoke(prompt)
        display(llm_response)

A new object of type <class 'langchain_core.vectorstores.VectorStoreRetriever'> at 0x2c74f45f0 is calling an instrumented method <function BaseRetriever.get_relevant_documents at 0x10bfa0400>. The path of this call may be incorrect.
Guessing path of new object is app.first.steps.context.first based on other object (0x10fcb7570) using this function.
A new object of type <class 'langchain_core.vectorstores.VectorStoreRetriever'> at 0x2c74f45f0 is calling an instrumented method <function VectorStoreRetriever._get_relevant_documents at 0x10d071080>. The path of this call may be incorrect.
Guessing path of new object is app.first.steps.context.first based on other object (0x10fcb7570) using this function.
App {app_id} was not present in database. Adding it.


'I don\'t know the answer to the question "What is Data Product?" based on the provided context. The context only provides information on the differences between data assets and data products, but does not provide a clear definition or explanation of what a data product is. Therefore, I cannot provide an answer to this question without additional information or clarification.'

A new object of type <class 'langchain_core.vectorstores.VectorStoreRetriever'> at 0x2c74f45f0 is calling an instrumented method <function BaseRetriever.get_relevant_documents at 0x10bfa0400>. The path of this call may be incorrect.
Guessing path of new object is app.first.steps.context.first based on other object (0x10fcb7570) using this function.
A new object of type <class 'langchain_core.vectorstores.VectorStoreRetriever'> at 0x2c74f45f0 is calling an instrumented method <function VectorStoreRetriever._get_relevant_documents at 0x10d071080>. The path of this call may be incorrect.
Guessing path of new object is app.first.steps.context.first based on other object (0x10fcb7570) using this function.


"I'm not able to answer the question as there is no clear difference between Data Mesh and Data Fabric provided in the context. The documents focus on Data Mesh, its implementation, and the need for organizational change, product thinking, technology decisions, and harmonious evolution of these aspects to successfully implement Data Mesh. There is no direct comparison or contrast between Data Mesh and Data Fabric in the given context. Therefore, I cannot provide a definitive answer to your question."

A new object of type <class 'langchain_core.vectorstores.VectorStoreRetriever'> at 0x2c74f45f0 is calling an instrumented method <function BaseRetriever.get_relevant_documents at 0x10bfa0400>. The path of this call may be incorrect.
Guessing path of new object is app.first.steps.context.first based on other object (0x10fcb7570) using this function.
A new object of type <class 'langchain_core.vectorstores.VectorStoreRetriever'> at 0x2c74f45f0 is calling an instrumented method <function VectorStoreRetriever._get_relevant_documents at 0x10d071080>. The path of this call may be incorrect.
Guessing path of new object is app.first.steps.context.first based on other object (0x10fcb7570) using this function.


"I don't know the answer to your question. The context you provided does not provide any information about what Data Mesh is, and I cannot make an educated guess based on the limited information provided."

A new object of type <class 'langchain_core.vectorstores.VectorStoreRetriever'> at 0x2c74f45f0 is calling an instrumented method <function BaseRetriever.get_relevant_documents at 0x10bfa0400>. The path of this call may be incorrect.
Guessing path of new object is app.first.steps.context.first based on other object (0x10fcb7570) using this function.
A new object of type <class 'langchain_core.vectorstores.VectorStoreRetriever'> at 0x2c74f45f0 is calling an instrumented method <function VectorStoreRetriever._get_relevant_documents at 0x10d071080>. The path of this call may be incorrect.
Guessing path of new object is app.first.steps.context.first based on other object (0x10fcb7570) using this function.
openai request failed <class 'openai.RateLimitError'>=Error code: 429 - {'error': {'message': 'You exceeded your current quota, please check your plan and billing details. For more information on this error, read the docs: https://platform.openai.com/docs/guides/error-codes/api-errors.', '

"I don't know the answer to the question. The provided context does not provide enough information to differentiate between Source-Oriented Data Products (SODP) and Customer-Oriented Data Products (CODP)."

A new object of type <class 'langchain_core.vectorstores.VectorStoreRetriever'> at 0x2c74f45f0 is calling an instrumented method <function BaseRetriever.get_relevant_documents at 0x10bfa0400>. The path of this call may be incorrect.
Guessing path of new object is app.first.steps.context.first based on other object (0x10fcb7570) using this function.
A new object of type <class 'langchain_core.vectorstores.VectorStoreRetriever'> at 0x2c74f45f0 is calling an instrumented method <function VectorStoreRetriever._get_relevant_documents at 0x10d071080>. The path of this call may be incorrect.
Guessing path of new object is app.first.steps.context.first based on other object (0x10fcb7570) using this function.
openai request failed <class 'openai.RateLimitError'>=Error code: 429 - {'error': {'message': 'You exceeded your current quota, please check your plan and billing details. For more information on this error, read the docs: https://platform.openai.com/docs/guides/error-codes/api-errors.', '

'Based on the provided context, the three pillars of Data Mesh success are:\n\n1. Empowering domains: This refers to the ability of Data Mesh to give control and access to data to different business domains, enabling them to make informed decisions and take actions based on data insights.\n2. Improving data utilization: Data Mesh aims to improve data utilization by providing a unified view of data across different systems and sources, making it easier for organizations to leverage data for various purposes such as analysis, reporting, and decision-making.\n3. Supporting future growth: By providing a scalable and flexible architecture, Data Mesh enables organizations to adapt to changing business needs and accommodate new technologies and systems, ensuring long-term success and growth.'

A new object of type <class 'langchain_core.vectorstores.VectorStoreRetriever'> at 0x2c74f45f0 is calling an instrumented method <function BaseRetriever.get_relevant_documents at 0x10bfa0400>. The path of this call may be incorrect.
Guessing path of new object is app.first.steps.context.first based on other object (0x10fcb7570) using this function.
A new object of type <class 'langchain_core.vectorstores.VectorStoreRetriever'> at 0x2c74f45f0 is calling an instrumented method <function VectorStoreRetriever._get_relevant_documents at 0x10d071080>. The path of this call may be incorrect.
Guessing path of new object is app.first.steps.context.first based on other object (0x10fcb7570) using this function.


'Based on the provided context, the tools used in Data Mesh are:\n\n1. DBT (Data Build Tool) - for data transformation and data quality testing.\n2. JDBC (Java Database Connectivity) - as the standard input and output ports to support addressability.\n3. Collibra - for cataloging product meta data.\n4. Monte Carlo - for monitoring.\n5. Snowflake - for defining roles and mapping users to appropriate roles to grant appropriate privileges to users.\n6. Immuta - to define policy as code.\n7. DataOps.live - as the CI/CD pipeline.\n8. Code - for ingestion, transformation, and publishing to output ports.\n9. Sample data - for unit tests and data quality tests.\n10. Infrastructure as code - to provision data pipelines, CI/CD pipelines, and other platform capabilities like storage, compute, monitoring configuration etc.\n11. Access policies as code - that specify who can access the data products and how.'

In [228]:
tru.get_records_and_feedback(app_ids=[])[0]

Unnamed: 0,app_id,app_json,type,record_id,input,output,tags,record_json,cost_json,perf_json,ts,latency,total_tokens,total_cost


In [240]:
tru.get_leaderboard(app_ids=[])

Unnamed: 0_level_0,qs_relevance,relevance,groundedness_measure_with_cot_reasons,latency,total_cost
app_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
neo4j_parental_rag,0.966667,0.35,0.458333,30.0,0.0
normal_rag,0.883333,0.6,0.65,30.0,0.0


In [70]:
tru.start_dashboard()

Starting dashboard ...
Config file already exists. Skipping writing process.
Credentials file already exists. Skipping writing process.
Dashboard already running at path:   Network URL: http://192.168.1.2:8501



<Popen: returncode: None args: ['streamlit', 'run', '--server.headless=True'...>

In [119]:
tru.start_dashboard()

Starting dashboard ...
Config file already exists. Skipping writing process.
Credentials file already exists. Skipping writing process.


Accordion(children=(VBox(children=(VBox(children=(Label(value='STDOUT'), Output())), VBox(children=(Label(valu…

Dashboard started at http://192.168.1.2:8501 .


<Popen: returncode: None args: ['streamlit', 'run', '--server.headless=True'...>

In [233]:
tru.reset_database()