In [109]:
import os
import dlt
import ollama
import lancedb
from datetime import datetime, timezone
from rest_api import RESTAPIConfig, rest_api_source
from dlt.sources.helpers.rest_client.paginators import BasePaginator, JSONResponsePaginator
from dlt.sources.helpers.requests import Response, Request
from dlt.destinations.adapters import lancedb_adapter

In [110]:
# The Notion key is looked up via dlt secrets ("sources.rest_api.notion.api_key");
# supply it outside the notebook instead of hard-coding it here.
# os.environ["SOURCES__REST_API__NOTION__API_KEY"] = ""

# LanceDB destination settings: embedding provider/model and local database location.
os.environ.update(
    {
        "DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER": "sentence-transformers",
        "DESTINATION__LANCEDB__EMBEDDING_MODEL": "all-MiniLM-L6-v2",
        "DESTINATION__LANCEDB__CREDENTIALS__URI": ".lancedb",
    }
)

In [111]:
# Run `dlt init` for the rest_api source and the lancedb destination; piping
# `yes` into it answers any confirmation prompt non-interactively.
!yes | dlt init rest_api lancedb

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Looking up the init scripts in [1mhttps://github.com/dlt-hub/verified-sources.git[0m...
No files to update, exiting
yes: standard output: Broken pipe


In [112]:
class PostBodyPaginator(BasePaginator):
    """Cursor paginator that passes the cursor in the JSON request body.

    After every response the value found under ``next_cursor`` is stored and
    then written into the next request's JSON body as ``start_cursor``.
    Pagination ends when the response body is empty or has no ``next_cursor``.
    """

    def __init__(self):
        super().__init__()
        # Cursor from the most recent response; None until a response supplies one.
        self.cursor = None

    def update_state(self, response: Response) -> None:
        """Record the next cursor from ``response`` or mark pagination as finished.

        Args:
            response: The HTTP response for the page that was just fetched.
        """
        # Decode the body once; it is needed for both the emptiness check and the cursor.
        payload = response.json()

        # Assuming the API returns an empty body when no more data is available
        if not payload:
            self._has_next_page = False
            return

        self.cursor = payload.get("next_cursor")
        if self.cursor is None:
            self._has_next_page = False

    def update_request(self, request: Request) -> None:
        """Inject the stored cursor into the JSON body of the next request.

        Args:
            request: The request about to be sent; its ``json`` body is created
                if it does not exist yet and then modified in place.
        """
        if request.json is None:
            request.json = {}

        # Add the cursor to the request body
        request.json["start_cursor"] = self.cursor

In [113]:
@dlt.resource(name="homework")
def rest_api_notion_resource():
    """Yield everything produced by a ``rest_api_source`` configured for Notion.

    Two endpoints are declared:

    * ``search`` - a POST to ``search`` for the query "homework", sorted
      ascending by ``last_edited_time`` and paginated with ``PostBodyPaginator``.
    * ``page_content`` - ``blocks/{page_id}/children``, where ``page_id`` is
      resolved from the ``id`` field of each ``search`` result.
    """
    # Shared HTTP client settings: base URL, token from dlt secrets, Notion headers.
    client_settings = {
        "base_url": "https://api.notion.com/v1/",
        "auth": {"token": dlt.secrets["sources.rest_api.notion.api_key"]},
        "headers": {
            "Content-Type": "application/json",
            "Notion-Version": "2022-06-28",
        },
    }

    # Parent resource: the search whose result ids drive the child resource below.
    search_resource = {
        "name": "search",
        "endpoint": {
            "path": "search",
            "method": "POST",
            "paginator": PostBodyPaginator(),
            "json": {
                "query": "homework",
                "sort": {
                    "direction": "ascending",
                    "timestamp": "last_edited_time",
                },
            },
            "data_selector": "results",
        },
    }

    # Child resource: one request per page id found by the search.
    page_content_resource = {
        "name": "page_content",
        "endpoint": {
            "path": "blocks/{page_id}/children",
            "paginator": JSONResponsePaginator(),
            "params": {
                "page_id": {
                    "type": "resolve",
                    "resource": "search",
                    "field": "id",
                },
            },
        },
    }

    notion_config: RESTAPIConfig = {
        "client": client_settings,
        "resources": [search_resource, page_content_resource],
    }
    yield from rest_api_source(notion_config, name="homework")


In [114]:
def extract_page_content(response):
    """Flatten one Notion item into the row shape that is loaded downstream.

    Args:
        response: A dict with at least ``id`` and ``last_edited_time``. When
            its ``type`` is ``"paragraph"``, the text is read from
            ``response["paragraph"]["rich_text"]``.

    Returns:
        A dict with ``block_id``, ``block_type``, ``content``,
        ``last_edited_time`` and ``inserted_at_time`` (current UTC time).
        ``content`` is the empty string for anything that is not a paragraph
        or for a paragraph without text.

    Raises:
        KeyError: If ``id`` or ``last_edited_time`` is missing.
    """
    block_type = response.get("type", "Not paragraph")

    content = ""
    if block_type == "paragraph":
        # A paragraph may be split into several rich-text runs (e.g. around
        # bold text or links); join them all instead of keeping only the first.
        rich_text_runs = (response.get("paragraph") or {}).get("rich_text") or []
        content = "".join(run.get("plain_text", "") for run in rich_text_runs)

    return {
        "block_id": response["id"],
        "block_type": block_type,
        "content": content,
        "last_edited_time": response["last_edited_time"],
        "inserted_at_time": datetime.now(timezone.utc),
    }

In [115]:
@dlt.resource(
    name="homework",
    write_disposition="merge",
    primary_key="block_id",
    columns={"last_edited_time":{"dedup_sort":"desc"}}
    )
def rest_api_notion_incremental(
    last_edited_time = dlt.sources.incremental("last_edited_time", initial_value="2024-06-26T08:16:00.000Z",primary_key=("block_id"))
):
    """Yield flattened Notion rows that have text content.

    Every item from ``rest_api_notion_resource`` is passed through
    ``extract_page_content``; rows whose ``content`` is empty are dropped.

    Args:
        last_edited_time: dlt incremental declared on the ``last_edited_time``
            field. This function only reads it to print its ``last_value``.
            NOTE(review): filtering by the cursor is presumably applied by dlt
            itself - confirm.
    """
    # Printed so the stored cursor value shows up in the cell output.
    print(last_edited_time.last_value)

    # Apply the mapper inline rather than via `add_map`: `add_map` appends a step
    # to the shared module-level resource on every call, so re-running the
    # pipeline in the same kernel would stack mappers on already-mapped rows.
    for raw_item in rest_api_notion_resource:
        block = extract_page_content(raw_item)
        if block["content"]:
            yield block

In [116]:
def load_notion() -> None:
    """Run the ``company_policies`` pipeline into LanceDB and print the load info.

    The incremental Notion resource is wrapped with ``lancedb_adapter`` so that
    its ``content`` column is embedded, then merged into the ``homework`` table
    of the ``notion_pages`` dataset.
    """
    notion_pipeline = dlt.pipeline(
        pipeline_name="company_policies",
        destination="lancedb",
        dataset_name="notion_pages",
        # full_refresh=True
    )

    # Mark the text column to be embedded by the destination.
    embedded_resource = lancedb_adapter(rest_api_notion_incremental, embed="content")

    print(
        notion_pipeline.run(
            embedded_resource,
            table_name="homework",
            write_disposition="merge",
        )
    )

load_notion()

2024-07-05T23:33:00.000Z
Pipeline company_policies load step completed in ---
0 load package(s) were loaded to destination LanceDB and into dataset None
The LanceDB destination used <dlt.destinations.impl.lancedb.configuration.LanceDBCredentials object at 0x7f245b740410> location to store data


In [117]:
# Connect to the local LanceDB database at ".lancedb" (the URI configured above).
db = lancedb.connect(".lancedb")
# The table name appears to combine the dataset name ("notion_pages") and the
# table name ("homework") used when running the pipeline.
dbtable = db.open_table("notion_pages___homework")

# Pull the whole table into memory via to_pandas() for inspection.
homework_table = dbtable.to_pandas()

## Q1. Rows in LanceDB

In [118]:
# Number of rows in the loaded homework table (the answer to Q1).
len(homework_table)

17

## Q2. Running the Pipeline: Last edited time

2024-07-05T23:33:00.000Z


## Q3. Ask the Assistant

In [119]:
def retrieve_context_from_lancedb(dbtable, question, top_k=2):
    """Return the ``content`` of the best matches for ``question`` as one string.

    Args:
        dbtable: Table object exposing ``search(query=...)`` whose result
            supports ``limit(n)`` and ``to_list()``.
        question: Text to search for.
        top_k: Maximum number of matches to include.

    Returns:
        The ``content`` fields of the top matches joined by newlines; an empty
        string when there are no matches.
    """
    query = dbtable.search(query=question)

    # Push the limit into the query: slicing after `to_list()` alone cannot
    # return more rows than the query's own default limit, and fetches more
    # rows than needed when `top_k` is small.
    if top_k is not None and top_k > 0:
        query = query.limit(top_k)

    # The slice is kept so zero/negative/None `top_k` behave exactly as before.
    hits = query.to_list()[:top_k]
    return "\n".join(hit["content"] for hit in hits)

In [120]:
def assistant(question):
    """Answer ``question`` with llama3 using the best-matching handbook paragraph.

    Opens the ``notion_pages___homework`` table in the local LanceDB database,
    retrieves the single best-matching paragraph as context, sends the
    question and context to ``ollama.chat`` and prints the reply prefixed
    with ``"Assistant: "``.

    Args:
        question: The user's question as plain text.

    Returns:
        None. The answer is printed, not returned.
    """
    db = lancedb.connect(".lancedb")
    dbtable = db.open_table("notion_pages___homework")
    context = retrieve_context_from_lancedb(dbtable, question, top_k=1)

    # Each call is a fresh single-turn conversation: system prompt + one user message.
    messages = [
        {
            "role": "system",
            "content": "You are a support assistant that answers questions on a company's employee handbook. The user will first ask you a question and then provide you relevant paragraphs from the handbook as context. Analyse the context and give a short answer."},
        {"role": "user", "content": f"Question: '{question}'\n##\n Context:'{context}'"},
    ]

    response = ollama.chat(
        model="llama3",
        messages=messages
    )
    response_content = response['message']['content']
    print(f"Assistant: {response_content}")

In [121]:
# Q3: ask the assistant a question; it prints its answer.
question = "How many PTO days are the employees entitled to in a year"
assistant(question)

Assistant: According to the provided context from the employee handbook, employees are entitled to 30 days of Paid Time Off (PTO) per year.
