In [2]:
import os
os.chdir("/home/ubuntu/llm-zoomcamp/llm-zoomcamp-2024/workshops-dlt")

In [3]:
from dotenv import load_dotenv
load_dotenv()

True
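
`load_dotenv()` copies the variables in `.env` into the process environment, which is where dlt can find secrets such as `dlt.secrets["sources.rest_api.notion.api_key"]` used further down. As far as I know, dlt maps a dotted config path to an environment variable by upper-casing each section and joining the sections with double underscores. The helper below is hypothetical (it is not part of dlt) and only sketches that naming rule under this assumption, so you can check which key the `.env` file needs.

```python
import os

def dlt_env_var_name(config_path: str) -> str:
    """Hypothetical helper: turn a dotted dlt config path into the environment
    variable name dlt is assumed to look up (sections upper-cased, joined by '__')."""
    return "__".join(part.upper() for part in config_path.split("."))

# The secret read by the Notion source later in this notebook
name = dlt_env_var_name("sources.rest_api.notion.api_key")
print(name)  # SOURCES__REST_API__NOTION__API_KEY

# Whether .env provided it (the value itself is deliberately not printed)
print(name in os.environ)
```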

In this demo, we will create an LLM chatbot that **has the latest knowledge** of the employee handbook of a fictional company. We will be able to chat with it about specific policies such as PTO, working from home, etc.

To build this, we need to do three things:
1. The company policies live in a [Notion Page](https://dlthub.notion.site/Employee-handbook-669c2a1e04044465811c8ca22977685d). We first need to **extract** the text from these pages.
2. Once extracted, we **embed** the text into vectors and store them in a vector database.
3. This lets us build our **RAG** (retrieval-augmented generation) step: a function that **accepts a user question**, **matches it** against the information stored in the vector database, and then **sends the question plus the relevant information** as input to the LLM.
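
The three steps can be sketched end to end in plain Python. This toy version rests on loud assumptions: a bag-of-words counter stands in for the real embedding model, an in-memory list stands in for LanceDB, the paragraphs are made-up toy data, and the names `embed`, `cosine`, `retrieve` and `build_prompt` are hypothetical. It only shows how extraction, embedding, retrieval and prompt assembly fit together; the prompt string mirrors the `Question: ...; Context: ...` format used in `main()` later in the notebook.

```python
import math
import re
from collections import Counter

def embed(text: str) -> Counter:
    """Toy 'embedding': lower-cased word counts (stand-in for a real model)."""
    return Counter(re.findall(r"[a-z0-9]+", text.lower()))

def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse word-count vectors."""
    dot = sum(a[w] * b[w] for w in a.keys() & b.keys())
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0

# 1. "Extract": paragraphs as they might come out of the handbook (toy data)
paragraphs = [
    "Employees receive 20 days of paid time off per year.",
    "Our company operates between 9 a.m. and 7 p.m.",
    "Employees who are sick should notify their manager.",
]

# 2. "Embed and store": keep each paragraph next to its vector
store = [(p, embed(p)) for p in paragraphs]

# 3. "RAG": match the question against the store, then build the LLM input
def retrieve(question: str, top_k: int = 2) -> list:
    q = embed(question)
    ranked = sorted(store, key=lambda item: cosine(q, item[1]), reverse=True)
    return [text for text, _ in ranked[:top_k]]

def build_prompt(question: str, top_k: int = 2) -> str:
    context = "\n".join(retrieve(question, top_k))
    return f"Question: '{question}'; Context: '{context}'"

print(build_prompt("How many days of paid time off do employees get?", top_k=1))
```

In the real notebook, step 1 is dlt reading Notion, step 2 is `lancedb_adapter(..., embed="content")`, and step 3 is `retrieve_context_from_lancedb` plus the chat call to Ollama.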

We will use the following tools:

1. <ins>dlt</ins> for **data ingestion**:
    - dlt can easily connect to REST API sources (like <ins>Notion</ins>).
    - It also has integrations with vector databases, like LanceDB.
    - It also makes it easy to plug in functionality like incremental loading.
2. <ins>LanceDB</ins> as the vector database:
    - LanceDB is an open-source vector database that is easy to use and integrate into Python workflows.
    - It is in-process and serverless (like DuckDB): there is no separate database server to run, and queries execute inside the Python process, which keeps querying and retrieval fast.
3. <ins>Ollama</ins> for RAG:
    - Ollama is an open-source tool that lets you easily run LLMs locally.

In [4]:
import dlt
from rest_api import RESTAPIConfig, rest_api_source

from dlt.sources.helpers.rest_client.paginators import (
    BasePaginator, 
    JSONResponsePaginator
)

from dlt.sources.helpers.requests import Response, Request

from dlt.destinations.adapters import lancedb_adapter

In [7]:
from datetime import datetime, timezone

class PostBodyPaginator(BasePaginator):
    """Paginator for Notion's POST /search endpoint, which expects the
    cursor in the JSON request body rather than in the query string."""

    def __init__(self):
        super().__init__()
        self.cursor = None

    def update_state(self, response: Response) -> None:
        data = response.json()
        # Assume no more data if the API returns an empty payload
        if not data:
            self._has_next_page = False
            return
        # Notion returns "next_cursor": null on the last page
        self.cursor = data.get("next_cursor")
        if self.cursor is None:
            self._has_next_page = False

    def update_request(self, request: Request) -> None:
        # Make sure there is a JSON body to write the cursor into
        if request.json is None:
            request.json = {}

        # Add the cursor to the request body
        request.json["start_cursor"] = self.cursor

@dlt.resource(name="employee_handbook")
def rest_api_notion_resource():
    notion_config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api.notion.com/v1/",
            "auth": {
                "token": dlt.secrets["sources.rest_api.notion.api_key"]
            },
            "headers": {
                "Content-Type": "application/json",
                "Notion-Version": "2022-06-28"
            }
        },
        "resources": [
            {
                "name": "search",
                "endpoint": {
                    "path": "search",
                    "method": "POST",
                    "paginator": PostBodyPaginator(),
                    "json": {
                        "query": "workshop",
                        "sort": {
                            "direction": "ascending",
                            "timestamp": "last_edited_time"
                        }
                    },
                    "data_selector": "results"
                }
            },
            {
                "name": "page_content",
                "endpoint": {
                    "path": "blocks/{page_id}/children",
                    "paginator": JSONResponsePaginator(),
                    "params": {
                        "page_id": {
                            "type": "resolve",
                            "resource": "search",
                            "field": "id"
                        }
                    },
                }
            }
        ]
    }

    yield from rest_api_source(notion_config, name="employee_handbook")

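def extract_page_content(response):
    """Flatten one Notion block into a row for the employee_handbook table."""
    block_id = response["id"]
    last_edited_time = response["last_edited_time"]
    # Items without a "type" key are treated as non-paragraphs
    block_type = response.get("type", "Not paragraph")
    if block_type != "paragraph":
        # Only paragraph text is embedded; other blocks get empty content
        content = ""
    else:
        try:
            # Keep the first rich-text run of the paragraph
            content = response["paragraph"]["rich_text"][0]["plain_text"]
        except IndexError:
            # Empty paragraph: no rich-text runs
            content = ""
    return {
        "block_id": block_id,
        "block_type": block_type,
        "content": content,
        "last_edited_time": last_edited_time,
        # When this row was extracted (UTC)
        "inserted_at_time": datetime.now(timezone.utc)
    }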

@dlt.resource(
    name="employee_handbook",
    write_disposition="merge",
    primary_key="block_id",
    # Dedup hint: among rows with the same block_id, prefer the most recently edited one
    columns={"last_edited_time": {"dedup_sort": "desc"}}
)
def rest_api_notion_incremental(
    # dlt tracks the highest last_edited_time seen so far and, on later runs,
    # skips blocks that have not been edited since then
    last_edited_time=dlt.sources.incremental(
        "last_edited_time",
        initial_value="2024-06-26T08:16:00.000Z",
        primary_key="block_id",
    )
):
    for block in rest_api_notion_resource.add_map(extract_page_content):
        # Skip non-paragraph blocks and empty paragraphs
        if not block["content"]:
            continue
        yield block

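def load_notion() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="company_policies",
        destination="lancedb",
        dataset_name="notion_pages",
        # full_refresh=True  # uncomment to start from scratch in a fresh dataset on each run
    )

    load_info = pipeline.run(
        # lancedb_adapter marks the "content" column to be embedded into vectors
        lancedb_adapter(
            rest_api_notion_incremental,
            embed="content"
        ),
        # Stored in LanceDB as "notion_pages___employee_handbook" (dataset___table)
        table_name="employee_handbook",
        write_disposition="merge"
    )
    print(load_info)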

load_notion()

Pipeline company_policies load step completed in 12.98 seconds
1 load package(s) were loaded to destination LanceDB and into dataset notion_pages
The LanceDB destination used <dlt.destinations.impl.lancedb.configuration.LanceDBCredentials object at 0x74d34c29e190> location to store data
Load package 1721566070.0416813 is LOADED and contains no failed jobs
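
`PostBodyPaginator` is needed because Notion's `search` endpoint takes the cursor in the POST body (`start_cursor`) and reports the next one as `next_cursor`, which is `null` on the last page. The sketch below simulates that loop without any network access: `fake_notion_search` is a hypothetical in-memory stand-in for the API, and its cursors are plain indices (real Notion cursors are opaque strings). In the notebook, dlt's REST client drives the real loop by calling `update_state` and `update_request`; this only mirrors the idea.

```python
from typing import Optional

# Hypothetical in-memory stand-in for Notion's POST /v1/search endpoint
PAGES = [f"page-{i}" for i in range(7)]

def fake_notion_search(body: dict, page_size: int = 3) -> dict:
    """Return one page of results plus the next cursor (None on the last page)."""
    start = int(body.get("start_cursor") or 0)
    results = PAGES[start:start + page_size]
    end = start + len(results)
    next_cursor: Optional[str] = str(end) if end < len(PAGES) else None
    return {"results": results, "next_cursor": next_cursor, "has_more": next_cursor is not None}

def fetch_all(query: str) -> list:
    """Follow next_cursor until it is None, sending each cursor back in the body."""
    body = {"query": query}  # first request: no cursor yet
    items = []
    while True:
        data = fake_notion_search(body)
        items.extend(data["results"])            # like data_selector="results"
        cursor = data.get("next_cursor")         # like PostBodyPaginator.update_state
        if cursor is None:
            break
        body = {**body, "start_cursor": cursor}  # like PostBodyPaginator.update_request
    return items

print(fetch_all("workshop"))
```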

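`extract_page_content` keeps only `rich_text[0]`, the first text run of a paragraph. Notion splits a paragraph with mixed formatting (bold text, links) into several rich-text runs, so such paragraphs may be cut short. A possible variant joins the `plain_text` of every run. The sketch below runs it on a hand-written sample block shaped like Notion's block JSON (the values are made up); `extract_paragraph_text` is a hypothetical name and this is a suggested alternative, not what the pipeline above runs.

```python
from datetime import datetime, timezone

def extract_paragraph_text(block: dict) -> str:
    """Concatenate all rich-text runs of a paragraph block; '' for other block types."""
    if block.get("type") != "paragraph":
        return ""
    runs = block["paragraph"].get("rich_text", [])
    return "".join(run.get("plain_text", "") for run in runs)

# Illustrative block shaped like Notion's API output (values are made up)
sample_block = {
    "id": "block-123",
    "type": "paragraph",
    "last_edited_time": "2024-06-26T09:03:00.000Z",
    "paragraph": {
        "rich_text": [
            {"plain_text": "Employees receive "},
            {"plain_text": "20 days"},  # e.g. a bold run
            {"plain_text": " of Paid Time Off."},
        ]
    },
}

row = {
    "block_id": sample_block["id"],
    "block_type": sample_block["type"],
    "content": extract_paragraph_text(sample_block),
    "last_edited_time": sample_block["last_edited_time"],
    "inserted_at_time": datetime.now(timezone.utc),
}
print(row["content"])  # Employees receive 20 days of Paid Time Off.
```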

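The incremental resource and the `merge` write disposition work together: on each run, blocks not edited since the stored `last_edited_time` are dropped, and the remaining rows are upserted by `block_id`, with the `dedup_sort: desc` hint preferring the most recently edited version. The sketch below imitates that behaviour in plain Python, assuming a dict as the destination table and another dict as dlt's stored state; `incremental_merge` is a hypothetical helper and a simplification, not dlt's implementation (dlt, for example, also deduplicates items at the boundary timestamp by primary key).

```python
INITIAL_VALUE = "2024-06-26T08:16:00.000Z"  # same initial_value as the resource above

def incremental_merge(table: dict, state: dict, blocks: list) -> None:
    """Toy incremental load + merge: `table` maps block_id -> row, `state` holds the cursor."""
    last_value = state.get("last_edited_time", INITIAL_VALUE)
    # Incremental filter: ISO-8601 strings in one fixed format sort chronologically
    fresh = [b for b in blocks if b["last_edited_time"] >= last_value]
    # Dedup within the batch: keep the latest edit per block_id (like dedup_sort="desc")
    latest = {}
    for b in fresh:
        kept = latest.get(b["block_id"])
        if kept is None or b["last_edited_time"] > kept["last_edited_time"]:
            latest[b["block_id"]] = b
    # Merge: insert new block_ids, overwrite existing ones
    table.update(latest)
    # Advance the cursor to the newest timestamp seen
    if fresh:
        state["last_edited_time"] = max(b["last_edited_time"] for b in fresh)

table, state = {}, {}
run1 = [
    {"block_id": "a", "content": "PTO policy (v1)", "last_edited_time": "2024-06-26T09:03:00.000Z"},
    {"block_id": "b", "content": "Holiday list", "last_edited_time": "2024-06-26T09:08:00.000Z"},
    {"block_id": "c", "content": "Old draft", "last_edited_time": "2024-06-01T00:00:00.000Z"},
]
incremental_merge(table, state, run1)  # "c" is older than INITIAL_VALUE and is skipped

run2 = [
    {"block_id": "a", "content": "PTO policy (draft)", "last_edited_time": "2024-07-03T17:34:00.000Z"},
    {"block_id": "a", "content": "PTO policy (v2)", "last_edited_time": "2024-07-18T14:00:00.000Z"},
    {"block_id": "b", "content": "Holiday list", "last_edited_time": "2024-06-26T09:08:00.000Z"},
]
incremental_merge(table, state, run2)

print(sorted(table))              # ['a', 'b']
print(table["a"]["content"])      # PTO policy (v2)
print(state["last_edited_time"])  # 2024-07-18T14:00:00.000Z
```
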
In [4]:
import lancedb

db = lancedb.connect(".lancedb/")
dbtable = db.open_table("notion_pages___employee_handbook")

dbtable.to_pandas()

| | id__ | vector__ | block_id | block_type | content | last_edited_time | inserted_at_time | _dlt_load_id | _dlt_id |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 71e89a85-ae0b-5b68-866b-bd3922ec7548 | [-0.05858811, -0.07540446, 0.033775203, 0.0096... | c0262981-b5f1-4a57-a91f-2e75f649b86c | paragraph | Our company operates between 9 a.m. to 7 p.m. ... | 2024-07-18 14:00:00+00:00 | 2024-07-21 12:47:51.014621+00:00 | 1721566070.0416813 | zdGtvD9rVXJ1pg |
| 1 | a28e913f-761f-5684-8cd5-0d0c49e0338c | [-0.004968941, -0.003911972, 0.028705625, 0.00... | faacf4ec-90be-4e96-b8b9-29b5112bc7ca | paragraph | Employees receive [20 days] of Paid Time Off (... | 2024-06-26 09:03:00+00:00 | 2024-07-21 12:47:51.017945+00:00 | 1721566070.0416813 | 1j+AgSHFk7b24Q |
| 2 | a18932d9-1583-5c42-bd0d-0f96738c5e6c | [0.032060888, 0.024244698, 0.008471344, 0.0317... | e6021a51-f403-4950-80c2-ebff005c7289 | paragraph | Our company observes the following holidays: N... | 2024-06-26 09:08:00+00:00 | 2024-07-21 12:47:51.018169+00:00 | 1721566070.0416813 | l/3MYHMoYIRgag |
| 3 | 93661874-13a2-5a43-bed8-868005dfd5e2 | [-0.0131553095, 0.008382407, 0.017044391, 0.05... | b8f4cc6d-c28c-4071-9545-caadce5eb37b | paragraph | These holidays are considered “off-days” for m... | 2024-06-26 09:09:00+00:00 | 2024-07-21 12:47:51.018310+00:00 | 1721566070.0416813 | yHYsxgLby5G8pA |
| 4 | b220778f-1118-5c22-b614-3bc0fd0a602b | [0.027987516, 0.067343615, 0.03980646, 0.00774... | ea7a1beb-6874-4f41-966d-dc1f80a1f635 | paragraph | Employees who are unable to work due to illnes... | 2024-06-26 09:11:00+00:00 | 2024-07-21 12:47:51.018447+00:00 | 1721566070.0416813 | PUljzl1+jrHfyg |
| 5 | d0f801ba-d3cc-5252-ad6e-3285662b609c | [0.03252615, 0.008159476, 0.08443566, 0.055641... | bd7a9110-fac5-4270-9493-4039ca67b467 | paragraph | Losing a loved one is traumatizing. If this ha... | 2024-06-26 09:17:00+00:00 | 2024-07-21 12:47:51.018577+00:00 | 1721566070.0416813 | 2zbb6kXde58E0Q |
| 6 | 579b97f9-a5e2-53af-b4f7-efc9ad5105ad | [-0.007314052, 0.014710642, -0.019091193, 0.02... | b1718dee-8c0f-4189-8c75-0e8c7844a501 | paragraph | In accordance with German law, we offer a comp... | 2024-06-26 09:20:00+00:00 | 2024-07-21 12:47:51.018805+00:00 | 1721566070.0416813 | YpNM9ptdF181kg |
| 7 | a9083b7e-22cc-5b1f-8040-cb7aa1f72338 | [-0.031538427, 0.0342599, -0.027282646, 0.0275... | 5bfa90c5-461d-406a-9324-a1dd54bad0d5 | paragraph | We recognize the vital role that fathers and p... | 2024-06-26 09:21:00+00:00 | 2024-07-21 12:47:51.018954+00:00 | 1721566070.0416813 | mfzQRc94pxgEWg |
| 8 | 6adeb540-d180-5d40-bc84-c40e5c173ea1 | [-0.03892389, 0.1208173, 0.046208583, -0.00543... | baac0ba4-9b60-450e-8cc1-1e6e2a0fb7d9 | paragraph | In this section, we describe what we offer to ... | 2024-07-03 17:34:00+00:00 | 2024-07-21 12:47:51.473789+00:00 | 1721566070.0416813 | QJgmOTNIkTy9MA |
| 9 | cffdb1bb-a146-5e90-8fbb-a1d577a2a98e | [-0.07571499, 0.14543605, 0.0011521844, -0.024... | 0e429073-6383-4918-8961-fcc66346067f | paragraph | {edited} Employee health is important to us. W... | 2024-07-18 17:28:00+00:00 | 2024-07-21 12:47:51.474057+00:00 | 1721566070.0416813 | f4u7puRBqd/MlQ |


In [10]:
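def retrieve_context_from_lancedb(dbtable, question, top_k=2):
    # Embed the question and fetch only the top_k nearest paragraphs
    query_results = dbtable.search(query=question).limit(top_k).to_list()
    # Join the retrieved paragraphs into a single context string
    context = "\n".join(result["content"] for result in query_results)
    return context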
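
Under the hood, a vector search ranks the stored vectors by their distance to the embedded question and keeps the closest ones; `retrieve_context_from_lancedb` then joins the `top_k` best paragraphs into one context string. The sketch below does that by brute force in pure Python with `heapq.nsmallest` and Euclidean (L2) distance, which LanceDB documents as its default metric. The 3-dimensional vectors are made up (the real `vector__` column comes from the embedding model), `top_k_context` is a hypothetical name, and LanceDB itself may use an index rather than a full scan.

```python
import heapq
import math

def l2(a, b):
    """Euclidean distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def top_k_context(rows, query_vector, top_k=2):
    """Brute-force nearest-neighbour search: join the top_k closest rows as context."""
    nearest = heapq.nsmallest(top_k, rows, key=lambda row: l2(row["vector"], query_vector))
    return "\n".join(row["content"] for row in nearest)

# Made-up 3-d vectors; real rows hold the embedding model's output
rows = [
    {"content": "PTO paragraph", "vector": [0.9, 0.1, 0.0]},
    {"content": "Holiday paragraph", "vector": [0.7, 0.3, 0.0]},
    {"content": "Sick leave paragraph", "vector": [0.0, 0.2, 0.9]},
]
print(top_k_context(rows, query_vector=[1.0, 0.0, 0.0], top_k=2))
```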

In [11]:
from openai import OpenAI

def main():
    # Connect to the LanceDB table
    db = lancedb.connect(".lancedb/")
    dbtable = db.open_table("notion_pages___employee_handbook")

    # Ollama serves an OpenAI-compatible API locally; the client requires an
    # api_key, but Ollama ignores its value
    client = OpenAI(
        base_url="http://localhost:11434/v1/",
        api_key='ollama'
    )

    # A system prompt telling the model to accept input in the form of
    # "Question: ... ; Context: ..."
    system_prompt = """You are a helpful assistant that helps users understand policies inside a company's employee handbook.
    The user will first ask you a question and then provide you relevant paragraphs from the handbook as context.
    Please answer the question based on the provided context.
    For any details missing in the paragraph, encourage the employee to contact HR for that information.
    Please keep the responses conversational."""

    # Initialize the messages list with the system prompt
    messages = [{"role": "system", "content": system_prompt}]

    while True:
        # Accept a user question; type "exit" or "quit" to stop
        question = input("You: ")
        if question.strip().lower() in {"exit", "quit"}:
            break

        # Retrieve the paragraphs most relevant to the question
        context = retrieve_context_from_lancedb(dbtable, question, top_k=2)

        # Create a user prompt from the question and the retrieved context
        user_prompt = f"Question: '{question}'; Context: '{context}'"
        messages.append(
            {"role": "user", "content": user_prompt}
        )

        # Get the response from the LLM
        response = client.chat.completions.create(
            model="llama2-uncensored",
            messages=messages,
        )
        response_content = response.choices[0].message.content
        print(f"Assistant: {response_content}")

        # Add the response to the conversation history
        messages.append(
            {"role": "assistant", "content": response_content}
        )
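
Because `main()` appends every question, its retrieved context, and every answer to `messages`, the conversation sent to the model keeps growing, and a long chat will eventually exceed the model's context window. One common mitigation is to keep the system prompt plus only the most recent exchanges. The helper below, `trim_history`, is hypothetical (not part of the original notebook), and counting turns rather than tokens is a simplifying assumption.

```python
def trim_history(messages: list, max_turns: int = 3) -> list:
    """Keep the system prompt(s) plus the last `max_turns` user/assistant pairs."""
    system = [m for m in messages if m["role"] == "system"]
    dialogue = [m for m in messages if m["role"] != "system"]
    # Guard max_turns == 0: dialogue[-0:] would return the whole list
    recent = dialogue[-2 * max_turns:] if max_turns > 0 else []
    return system + recent

messages = [{"role": "system", "content": "You are a helpful assistant..."}]
for i in range(5):
    messages.append({"role": "user", "content": f"Question {i}"})
    messages.append({"role": "assistant", "content": f"Answer {i}"})

trimmed = trim_history(messages, max_turns=2)
print([m["content"] for m in trimmed])
# ['You are a helpful assistant...', 'Question 3', 'Answer 3', 'Question 4', 'Answer 4']
```

In `main()`, this could be applied when calling the model, e.g. `messages=trim_history(messages)` inside `client.chat.completions.create(...)`, while the full history is still kept locally.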


In [12]:
main()

Assistant: Based on the provided context, an employee can ask you how many vacation days they get each year as long as it's not a religious holiday that isn't included in our list. They should also know that if their floating day falls on a company-observed holiday, they won't be able to take that time off for that specific holiday. In addition, an employee must use their PTO within 12 months after the date of the observed holiday, as specified in the policy.
Assistant: Based on the provided context, an employee can ask you about maternity leave benefits. However, they must remember that it's based on German law, so specific regulations may vary depending on the location of your company. To get more information or clarification, encourage them to consult with their supervisor and the Human Resources department. Additionally, if an employee has a partner who wants paternity leave after the child's arrival, they can ask for it. However, as with maternity leave benefits, the specific regu

KeyboardInterrupt: 