In [13]:
import os

# Configuration for the Notion REST source and the LanceDB destination; dlt resolves
# these environment variables (e.g. SOURCES__REST_API__NOTION__API_KEY backs
# dlt.secrets["sources.rest_api.notion.api_key"]).
# `setdefault` only fills a variable that is not already set, so real credentials
# exported before starting Jupyter are NOT overwritten by the placeholders below.
# NOTE: supply real keys through the environment; never commit them in this cell.
os.environ.setdefault("SOURCES__REST_API__NOTION__API_KEY", "notion api key")

os.environ.setdefault("DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER", "sentence-transformers")
os.environ.setdefault("DESTINATION__LANCEDB__EMBEDDING_MODEL", "all-MiniLM-L6-v2")

os.environ.setdefault("DESTINATION__LANCEDB__CREDENTIALS__URI", ".lancedb")
os.environ.setdefault("DESTINATION__LANCEDB__CREDENTIALS__API_KEY", "api_key")
os.environ.setdefault("DESTINATION__LANCEDB__CREDENTIALS__EMBEDDING_MODEL_PROVIDER_API_KEY", "embedding_model_provider_api_key")

In [14]:
import dlt
from rest_api import RESTAPIConfig, rest_api_source

from dlt.sources.helpers.rest_client.paginators import BasePaginator, JSONResponsePaginator
from dlt.sources.helpers.requests import Response, Request

from dlt.destinations.adapters import lancedb_adapter

In [16]:
class PostBodyPaginator(BasePaginator):
    """Cursor paginator that sends the cursor back in the JSON request body.

    Used for the Notion ``search`` endpoint (a POST): the cursor is read from
    ``next_cursor`` in each response and written to ``start_cursor`` in the body
    of the next request. Pagination stops when the body is empty or
    ``next_cursor`` is missing/None.
    """

    def __init__(self):
        super().__init__()
        # Cursor from the most recent response; None until a page has been read.
        self.cursor = None

    def update_state(self, response: Response, data=None) -> None:
        """Record the next cursor from ``response`` or mark the last page.

        Args:
            response: HTTP response for the page just fetched.
            data: extracted page items. Newer dlt versions pass this argument;
                it is accepted for compatibility and not used here.
        """
        # Parse the body once instead of calling response.json() twice.
        payload = response.json()
        # Assuming the API returns an empty body when no more data is available
        if not payload:
            self._has_next_page = False
            return
        self.cursor = payload.get("next_cursor")
        if self.cursor is None:
            self._has_next_page = False

    def update_request(self, request: Request) -> None:
        """Put the stored cursor into the request's JSON body as ``start_cursor``."""
        if request.json is None:
            request.json = {}

        # Add the cursor to the request body
        request.json["start_cursor"] = self.cursor

In [17]:
def extract_page_content(response):
    """Flatten one Notion item into the row that is loaded into LanceDB.

    Args:
        response: a single item yielded by the Notion REST resource. It must
            carry ``id`` and ``last_edited_time``; paragraph blocks additionally
            carry ``paragraph.rich_text``, a list of segments with ``plain_text``.

    Returns:
        dict with ``block_id``, ``block_type`` (``"Not paragraph"`` when the item
        has no ``type``), ``content`` (the paragraph's full text, or ``""`` for
        non-paragraph or empty blocks), ``last_edited_time`` and an
        ``inserted_at_time`` timestamp in UTC.

    Raises:
        KeyError: if ``id`` or ``last_edited_time`` is missing.
    """
    # Imported here because this cell runs before the cell that imports datetime.
    from datetime import datetime, timezone

    block_type = response.get("type", "Not paragraph")
    content = ""
    if block_type == "paragraph":
        # A paragraph's text is split into several rich_text segments wherever the
        # formatting changes (bold, links, ...). Join all of them; keeping only
        # the first segment silently truncated formatted paragraphs.
        segments = response.get("paragraph", {}).get("rich_text", [])
        content = "".join(segment.get("plain_text", "") for segment in segments)
    return {
        "block_id": response["id"],
        "block_type": block_type,
        "content": content,
        "last_edited_time": response["last_edited_time"],
        "inserted_at_time": datetime.now(timezone.utc),
    }

In [18]:
@dlt.resource(
    name="employee_handbook",
    write_disposition="merge",
    primary_key="block_id",
    columns={"last_edited_time": {"dedup_sort": "desc"}},
)
def rest_api_notion_incremental(
    last_edited_time=dlt.sources.incremental(
        "last_edited_time",
        initial_value="2024-06-26T08:16:00.000Z",
        primary_key="block_id",
    )
):
    """Yield non-empty paragraph rows from the handbook pages for a merge load.

    ``last_edited_time`` is a dlt incremental on the ``last_edited_time`` field;
    dlt binds it and filters the yielded rows by that field, so it is not read in
    the body. Rows are merged on ``block_id``.
    """
    # Map each raw item explicitly rather than via
    # ``rest_api_notion_resource.add_map(...)``. add_map appends a step to the
    # module-level resource object in place and returns it, so every re-run in the
    # same kernel stacked another extract_page_content step. The second step then
    # received an already-flattened dict (no "id" key) and raised KeyError.
    for raw_item in rest_api_notion_resource:
        block = extract_page_content(raw_item)
        # Skip rows without text (non-paragraph items and empty paragraphs).
        if not block["content"]:
            continue
        yield block

In [19]:
from datetime import datetime, timezone

class PostBodyPaginator(BasePaginator):
    """Cursor paginator that sends the cursor back in the JSON request body.

    Used for the Notion ``search`` endpoint (a POST): the cursor is read from
    ``next_cursor`` in each response and written to ``start_cursor`` in the body
    of the next request. Pagination stops when the body is empty or
    ``next_cursor`` is missing/None.
    """

    def __init__(self):
        super().__init__()
        # Cursor from the most recent response; None until a page has been read.
        self.cursor = None

    def update_state(self, response: Response, data=None) -> None:
        """Record the next cursor from ``response`` or mark the last page.

        Args:
            response: HTTP response for the page just fetched.
            data: extracted page items. Newer dlt versions pass this argument;
                it is accepted for compatibility and not used here.
        """
        # Parse the body once instead of calling response.json() twice.
        payload = response.json()
        # Assuming the API returns an empty body when no more data is available
        if not payload:
            self._has_next_page = False
            return
        self.cursor = payload.get("next_cursor")
        if self.cursor is None:
            self._has_next_page = False

    def update_request(self, request: Request) -> None:
        """Put the stored cursor into the request's JSON body as ``start_cursor``."""
        if request.json is None:
            request.json = {}

        # Add the cursor to the request body
        request.json["start_cursor"] = self.cursor

@dlt.resource(name="employee_handbook")
def rest_api_notion_resource():
    """Yield raw Notion items for pages found by the "Homework" search.

    Builds a dlt ``rest_api`` source with two endpoints and yields everything it
    produces:

    * ``search``: POST ``search`` with the query, paginated in the request body
      by ``PostBodyPaginator``; rows are taken from the ``results`` field.
    * ``page_content``: GET ``blocks/{page_id}/children`` for each ``id``
      returned by ``search``.

    The Notion token comes from ``dlt.secrets["sources.rest_api.notion.api_key"]``.
    """
    # Not part of the notebook's import cell, so imported here.
    from dlt.sources.helpers.rest_client.paginators import JSONResponseCursorPaginator

    notion_config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api.notion.com/v1/",
            "auth": {
                "token": dlt.secrets["sources.rest_api.notion.api_key"]
            },
            "headers": {
                "Content-Type": "application/json",
                "Notion-Version": "2022-06-28"
            }
        },
        "resources": [
            {
                "name": "search",
                "endpoint": {
                    "path": "search",
                    "method": "POST",
                    "paginator": PostBodyPaginator(),
                    "json": {
                        "query": "Homework",
                        "sort": {
                            "direction": "ascending",
                            "timestamp": "last_edited_time"
                        }
                    },
                    "data_selector": "results"
                }
            },
            {
                "name": "page_content",
                "endpoint": {
                    "path": "blocks/{page_id}/children",
                    # Notion paginates block children with a `next_cursor` field in
                    # the response and a `start_cursor` query parameter. The previous
                    # JSONResponsePaginator looks for a `next` URL that Notion never
                    # returns, so only the first page of blocks (100 by default) was
                    # fetched and longer pages were silently truncated.
                    "paginator": JSONResponseCursorPaginator(
                        cursor_path="next_cursor",
                        cursor_param="start_cursor",
                    ),
                    "params": {
                        "page_id": {
                            "type": "resolve",
                            "resource": "search",
                            "field": "id"
                        }
                    },
                }
            }
        ]
    }

    yield from rest_api_source(notion_config, name="employee_handbook")

def extract_page_content(response):
    """Flatten one Notion item into the row that is loaded into LanceDB.

    Args:
        response: a single item yielded by the Notion REST resource. It must
            carry ``id`` and ``last_edited_time``; paragraph blocks additionally
            carry ``paragraph.rich_text``, a list of segments with ``plain_text``.

    Returns:
        dict with ``block_id``, ``block_type`` (``"Not paragraph"`` when the item
        has no ``type``), ``content`` (the paragraph's full text, or ``""`` for
        non-paragraph or empty blocks), ``last_edited_time`` and an
        ``inserted_at_time`` timestamp in UTC.

    Raises:
        KeyError: if ``id`` or ``last_edited_time`` is missing.
    """
    block_type = response.get("type", "Not paragraph")
    content = ""
    if block_type == "paragraph":
        # A paragraph's text is split into several rich_text segments wherever the
        # formatting changes (bold, links, ...). Join all of them; keeping only
        # the first segment silently truncated formatted paragraphs.
        segments = response.get("paragraph", {}).get("rich_text", [])
        content = "".join(segment.get("plain_text", "") for segment in segments)
    return {
        "block_id": response["id"],
        "block_type": block_type,
        "content": content,
        "last_edited_time": response["last_edited_time"],
        "inserted_at_time": datetime.now(timezone.utc),
    }

@dlt.resource(
    name="employee_handbook",
    write_disposition="merge",
    primary_key="block_id",
    columns={"last_edited_time": {"dedup_sort": "desc"}},
)
def rest_api_notion_incremental(
    last_edited_time=dlt.sources.incremental(
        "last_edited_time",
        initial_value="2024-06-26T08:16:00.000Z",
        primary_key="block_id",
    )
):
    """Yield non-empty paragraph rows from the handbook pages for a merge load.

    ``last_edited_time`` is a dlt incremental on the ``last_edited_time`` field;
    dlt binds it and filters the yielded rows by that field, so it is not read in
    the body (``last_edited_time.last_value`` holds the current cursor if needed
    for debugging). Rows are merged on ``block_id``.
    """
    # Map each raw item explicitly rather than via
    # ``rest_api_notion_resource.add_map(...)``. add_map appends a step to the
    # module-level resource object in place and returns it, so every re-run in the
    # same kernel stacked another extract_page_content step. The second step then
    # received an already-flattened dict (no "id" key) and raised KeyError.
    for raw_item in rest_api_notion_resource:
        block = extract_page_content(raw_item)
        # Skip rows without text (non-paragraph items and empty paragraphs).
        if not block["content"]:
            continue
        yield block

def load_notion() -> None:
    """Run the employee-handbook load from Notion into LanceDB and print the result.

    The ``content`` column of ``rest_api_notion_incremental`` is marked for
    embedding via ``lancedb_adapter``. Rows are merged into the
    ``employee_handbook`` table of the ``notion_pages`` dataset.
    """
    handbook_pipeline = dlt.pipeline(
        pipeline_name="employee_handbook",
        destination="lancedb",
        dataset_name="notion_pages",
    )

    embedded_blocks = lancedb_adapter(rest_api_notion_incremental, embed="content")
    info = handbook_pipeline.run(
        embedded_blocks,
        table_name="employee_handbook",
        write_disposition="merge",
    )
    print(info)


load_notion()

Pipeline employee_handbook load step completed in ---
0 load package(s) were loaded to destination LanceDB and into dataset None
The LanceDB destination used <dlt.destinations.impl.lancedb.configuration.LanceDBCredentials object at 0x79b86e3b2b60> location to store data


In [20]:
import os

import lancedb

# Open the LanceDB store the employee_handbook pipeline wrote to. The location is
# read from the same environment variable the dlt destination is configured with
# (falling back to ".lancedb"), so this cell cannot drift from the pipeline config.
db = lancedb.connect(os.environ.get("DESTINATION__LANCEDB__CREDENTIALS__URI", ".lancedb"))
# Table name is "<dataset_name>___<table_name>" from load_notion().
dbtable = db.open_table("notion_pages___employee_handbook")


df = dbtable.to_pandas()

print(df)

# Number of loaded rows, shown as the cell's output.
len(df.index)




                                    id__  \
0   6adeb540-d180-5d40-bc84-c40e5c173ea1   
1   25cd721d-fd64-517f-9b3b-34e3fad3522e   
2   c75b7ef9-96b6-551b-9cdd-795bbe01bb6e   
3   7a69c4c0-cd55-5090-903e-facf23eadde5   
4   ff1141dc-88f6-500a-a8c3-c18e37661650   
5   a28e913f-761f-5684-8cd5-0d0c49e0338c   
6   a18932d9-1583-5c42-bd0d-0f96738c5e6c   
7   93661874-13a2-5a43-bed8-868005dfd5e2   
8   b220778f-1118-5c22-b614-3bc0fd0a602b   
9   d0f801ba-d3cc-5252-ad6e-3285662b609c   
10  579b97f9-a5e2-53af-b4f7-efc9ad5105ad   
11  a9083b7e-22cc-5b1f-8040-cb7aa1f72338   
12  cffdb1bb-a146-5e90-8fbb-a1d577a2a98e   
13  71e89a85-ae0b-5b68-866b-bd3922ec7548   

                                             vector__  \
0   [-0.038923826, 0.12081745, 0.046208546, -0.005...   
1   [-0.10974317, 0.10586079, 0.003290621, -0.0213...   
2   [0.050755586, -0.06461987, 0.0652738, 0.014652...   
3   [0.0005233448, -0.054883398, 0.043573365, -0.0...   
4   [0.03802628, -0.02150967, 0.0475278, 0.0647069... 

14

In [21]:
from datetime import datetime, timezone
from typing import Any, Dict, Generator
import dlt

class PostBodyPaginator(BasePaginator):
    """Cursor paginator that sends the cursor back in the JSON request body.

    Used for the Notion ``search`` endpoint (a POST): the cursor is read from
    ``next_cursor`` in each response and written to ``start_cursor`` in the body
    of the next request. Pagination stops when the body is empty or
    ``next_cursor`` is missing/None.
    """

    def __init__(self):
        super().__init__()
        # Cursor from the most recent response; None until a page has been read.
        self.cursor = None

    def update_state(self, response: Response, data=None) -> None:
        """Record the next cursor from ``response`` or mark the last page.

        Args:
            response: HTTP response for the page just fetched.
            data: extracted page items. Newer dlt versions pass this argument;
                it is accepted for compatibility and not used here.
        """
        # Parse the body once instead of calling response.json() twice.
        payload = response.json()
        # Assuming the API returns an empty body when no more data is available
        if not payload:
            self._has_next_page = False
            return
        self.cursor = payload.get("next_cursor")
        if self.cursor is None:
            self._has_next_page = False

    def update_request(self, request: Request) -> None:
        """Put the stored cursor into the request's JSON body as ``start_cursor``."""
        if request.json is None:
            request.json = {}

        # Add the cursor to the request body
        request.json["start_cursor"] = self.cursor

@dlt.resource(name="homework_page")
def rest_api_notion_homework_resource():
    """Yield raw Notion items for pages found by the "Homework: Employee handbook" search.

    Builds a dlt ``rest_api`` source with two endpoints and yields everything it
    produces:

    * ``search``: POST ``search`` with the query, paginated in the request body
      by ``PostBodyPaginator``; rows are taken from the ``results`` field.
    * ``page_content``: GET ``blocks/{page_id}/children`` for each ``id``
      returned by ``search``.

    The Notion token comes from ``dlt.secrets["sources.rest_api.notion.api_key"]``.
    """
    # Not part of the notebook's import cell, so imported here.
    from dlt.sources.helpers.rest_client.paginators import JSONResponseCursorPaginator

    notion_config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api.notion.com/v1/",
            "auth": {
                "token": dlt.secrets["sources.rest_api.notion.api_key"]
            },
            "headers": {
                "Content-Type": "application/json",
                "Notion-Version": "2022-06-28"
            }
        },
        "resources": [
            {
                "name": "search",
                "endpoint": {
                    "path": "search",
                    "method": "POST",
                    "paginator": PostBodyPaginator(),
                    "json": {
                        "query": "Homework: Employee handbook",
                        "sort": {
                            "direction": "ascending",
                            "timestamp": "last_edited_time"
                        }
                    },
                    "data_selector": "results"
                }
            },
            {
                "name": "page_content",
                "endpoint": {
                    "path": "blocks/{page_id}/children",
                    # Notion paginates block children with a `next_cursor` field in
                    # the response and a `start_cursor` query parameter. The previous
                    # JSONResponsePaginator looks for a `next` URL that Notion never
                    # returns, so only the first page of blocks (100 by default) was
                    # fetched and longer pages were silently truncated.
                    "paginator": JSONResponseCursorPaginator(
                        cursor_path="next_cursor",
                        cursor_param="start_cursor",
                    ),
                    "params": {
                        "page_id": {
                            "type": "resolve",
                            "resource": "search",
                            "field": "id"
                        }
                    },
                }
            }
        ]
    }

    yield from rest_api_source(notion_config, name="homework_page")

def extract_page_content(response: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten one Notion item into the row that is loaded into LanceDB.

    Args:
        response: a single item yielded by the Notion REST resource. It must
            carry ``id`` and ``last_edited_time``; paragraph blocks additionally
            carry ``paragraph.rich_text``, a list of segments with ``plain_text``.

    Returns:
        dict with ``block_id``, ``block_type`` (``"Not paragraph"`` when the item
        has no ``type``), ``content`` (the paragraph's full text, or ``""`` for
        non-paragraph or empty blocks), ``last_edited_time`` and an
        ``inserted_at_time`` timestamp in UTC.

    Raises:
        KeyError: if ``id`` or ``last_edited_time`` is missing.
    """
    block_type = response.get("type", "Not paragraph")
    content = ""
    if block_type == "paragraph":
        # A paragraph's text is split into several rich_text segments wherever the
        # formatting changes (bold, links, ...). Join all of them; keeping only
        # the first segment silently truncated formatted paragraphs.
        segments = response.get("paragraph", {}).get("rich_text", [])
        content = "".join(segment.get("plain_text", "") for segment in segments)
    return {
        "block_id": response["id"],
        "block_type": block_type,
        "content": content,
        "last_edited_time": response["last_edited_time"],
        "inserted_at_time": datetime.now(timezone.utc),
    }

@dlt.resource(
    name="homework",
    write_disposition="merge",
    primary_key="block_id",
    columns={"last_edited_time": {"dedup_sort": "desc"}},
)
def rest_api_notion_homework_incremental(
    last_edited_time=dlt.sources.incremental(
        "last_edited_time",
        initial_value="2024-06-26T08:16:00.000Z",
        primary_key="block_id",
    )
):
    """Yield non-empty paragraph rows from the homework pages for a merge load.

    ``last_edited_time`` is a dlt incremental on the ``last_edited_time`` field;
    dlt binds it and filters the yielded rows by that field, so it is not read in
    the body. Rows are merged on ``block_id``.
    """
    # Map each raw item explicitly rather than via
    # ``rest_api_notion_homework_resource.add_map(...)``. add_map appends a step to
    # the module-level resource object in place and returns it, so every re-run in
    # the same kernel stacked another extract_page_content step. The second step
    # then received an already-flattened dict (no "id" key) and raised KeyError.
    for raw_item in rest_api_notion_homework_resource:
        block = extract_page_content(raw_item)
        # Skip rows without text (non-paragraph items and empty paragraphs).
        if not block["content"]:
            continue
        yield block

def load_homework() -> None:
    """Run the homework-handbook load from Notion into LanceDB and print the result.

    The ``content`` column of ``rest_api_notion_homework_incremental`` is marked
    for embedding via ``lancedb_adapter``. Rows are merged into the ``homework``
    table of the ``notion_pages_homework`` dataset.
    """
    homework_pipeline = dlt.pipeline(
        pipeline_name="homework",
        destination="lancedb",
        dataset_name="notion_pages_homework",
    )

    embedded_blocks = lancedb_adapter(
        rest_api_notion_homework_incremental, embed="content"
    )
    info = homework_pipeline.run(
        embedded_blocks, table_name="homework", write_disposition="merge"
    )
    print(info)


load_homework()

Pipeline homework load step completed in ---
0 load package(s) were loaded to destination LanceDB and into dataset None
The LanceDB destination used <dlt.destinations.impl.lancedb.configuration.LanceDBCredentials object at 0x79b86e3b32b0> location to store data


In [22]:
import os

import lancedb

# Open the LanceDB store the homework pipeline wrote to. The location is read from
# the same environment variable the dlt destination is configured with (falling
# back to ".lancedb"), so this cell cannot drift from the pipeline config.
db = lancedb.connect(os.environ.get("DESTINATION__LANCEDB__CREDENTIALS__URI", ".lancedb"))
# Table name is "<dataset_name>___<table_name>" from load_homework().
dbtable2 = db.open_table("notion_pages_homework___homework")


homework = dbtable2.to_pandas()

print(homework)

# Number of loaded rows, shown as the cell's output.
len(homework.index)


                                    id__  \
0   be35ff12-5aa0-5bce-9487-823aed7fc757   
1   b8e977e1-bcb0-5f4b-9e65-eb462ebfc2d7   
2   aa18f499-5edb-5b25-955d-c12b427e6216   
3   bedef311-c67e-51cd-9e1c-44ee9c4a310f   
4   0dd48a88-c404-5a05-bff0-ddfd46fc24f8   
5   452ce00a-0053-5e79-b217-0f08abf0d7af   
6   19b069f2-d5e1-5c0f-9330-9e0a2f127941   
7   f256aff1-78f7-5c30-a6a3-c8b532fd8662   
8   563dfe54-8fe5-508f-aadf-c34b272d6d65   
9   e06fcdeb-be04-50dd-8053-ea3d6bd41952   
10  45694a3b-553f-5987-871f-91e05f9587c6   
11  1b8de022-aea1-5ad5-94dd-2e52e974d1be   
12  1189f97b-cb54-5f13-812e-3686df64191b   
13  2bf04841-5d92-5c97-9cd8-dab9b0debb5f   
14  57502397-f159-55f4-80a7-e37a96d48823   
15  bc7c8180-c067-5145-8e8d-dcc1076ae609   
16  3b66ad77-6e81-5281-b904-e13451bf49d1   

                                             vector__  \
0   [-0.024265619, 0.04746082, -0.011796436, 0.063...   
1   [-0.04966163, 0.10853516, -0.009762589, -0.036...   
2   [-0.06316319, 0.17331506, 0.0253

17

In [None]:
# Newest `last_edited_time` across the homework rows read back from LanceDB.
latest_edit_time = homework.loc[:, 'last_edited_time'].max()
print(f'The latest edit time is: {latest_edit_time}')

In [23]:
import ollama

In [24]:
def retrieve_context_from_lancedb(dbtable, question, top_k=2):
    """Return the ``top_k`` best-matching paragraphs for ``question`` as one string.

    Args:
        dbtable: LanceDB table whose rows have a ``content`` column. The question
            text is passed to ``dbtable.search`` (presumably embedded by the
            table's registered embedding function; confirm against the dlt
            lancedb adapter setup).
        question: free-text user question.
        top_k: maximum number of paragraphs to return; values <= 0 yield ``""``.

    Returns:
        The retrieved ``content`` values joined with newlines, best match first.
    """
    if top_k <= 0:
        return ""
    # Ask LanceDB for exactly top_k hits. Slicing the default result set instead
    # fetched unneeded rows and capped top_k at LanceDB's default limit of 10.
    query_results = dbtable.search(query=question).limit(top_k).to_list()
    return "\n".join(result["content"] for result in query_results)

In [25]:
def main(model="llama2-uncensored", top_k=2):
    """Run an interactive question-answering chat over the homework handbook.

    For each question, the ``top_k`` most relevant paragraphs are retrieved from
    the ``notion_pages_homework___homework`` LanceDB table. They are sent together
    with the running conversation to ``ollama.chat``, and the reply is printed.

    Enter an empty line, ``exit`` or ``quit`` (or send EOF / interrupt) to stop.
    Previously the loop had no exit path.

    Args:
        model: Ollama model name passed to ``ollama.chat``.
        top_k: number of retrieved paragraphs included as context per question.
    """
    import os

    # Connect to the lancedb table at the location the dlt destination was configured with
    db = lancedb.connect(os.environ.get("DESTINATION__LANCEDB__CREDENTIALS__URI", ".lancedb"))
    dbtable = db.open_table("notion_pages_homework___homework")

    # A system prompt telling ollama to accept input in the form of "Question: ... ; Context: ..."
    messages = [
        {"role": "system", "content": "You are a helpful assistant that helps users understand policies inside a company's employee handbook. The user will first ask you a question and then provide you relevant paragraphs from the handbook as context. Please answer the question based on the provided context. For any details missing in the paragraph, encourage the employee to contact the HR for that information. Please keep the responses conversational."}
    ]

    while True:
        # Accept user question; stop cleanly on EOF / Ctrl-C or an exit command.
        try:
            question = input("You: ")
        except (EOFError, KeyboardInterrupt):
            break
        if question.strip().lower() in {"", "exit", "quit"}:
            break

        # Retrieve the relevant paragraphs on the question
        context = retrieve_context_from_lancedb(dbtable, question, top_k=top_k)

        # Create a user prompt using the question and retrieved context.
        # NOTE(review): the whole history (including every retrieved context) is
        # resent each turn, so long sessions may exceed the model's context window.
        messages.append(
            {"role": "user", "content": f"Question: '{question}'; Context:'{context}'"}
        )

        # Get the response from the LLM
        response = ollama.chat(
            model=model,
            messages=messages
        )
        response_content = response['message']['content']
        print(f"Assistant: {response_content}")

        # Add the response into the context window
        messages.append(
            {"role": "assistant", "content": response_content}
        )

In [26]:
# A notebook kernel normally runs cells as "__main__", so executing this cell
# starts the interactive chat loop.
if __name__ == "__main__":
    main()