# Open source data ingestion for RAGs with dlt

# Homework

In the workshop, we extracted contents from two pages in notion titled "Workshop: Benefits and Perks" and "Workshop: Working hours, PTO, and Vacation".

Repeat the same process for a third page titled "Homework: Employee handbook" (hidden from public view, but accessible via API key):

1. Modify the REST API source to extract only this page.
2. Write the output into a separate table called "homework".
3. Remember to update the table name in all cells where you connect to a lancedb table.

To do this you can use the [workshop Colab](https://colab.research.google.com/drive/1nNOybHdWQiwUUuJFZu__xvJxL_ADU3xl?usp=sharing) as a basis.

Now, answer the following questions:  

## Q1. Rows in LanceDB

How many rows does the lancedb table "notion_pages__homework" have?

* 14
* 15
* 16
* 17

In [1]:
%%capture
!pip install dlt[lancedb]
!pip install sentence-transformers

In [2]:
!nvidia-smi

Fri Jul 19 19:14:32 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   50C    P8              12W /  70W |      0MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

In [3]:
!yes | dlt init rest_api lancedb

Looking up the init scripts in [1mhttps://github.com/dlt-hub/verified-sources.git[0m...
No files to update, exiting


In [4]:
import os
from google.colab import userdata

os.environ["SOURCES__REST_API__NOTION__API_KEY"] = userdata.get("SOURCES__REST_API__NOTION__API_KEY")

os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER"] = "sentence-transformers"
os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL"] = "all-MiniLM-L6-v2"

os.environ["DESTINATION__LANCEDB__CREDENTIALS__URI"] = ".lancedb"

In [5]:
import dlt
from rest_api import RESTAPIConfig, rest_api_source

from dlt.sources.helpers.rest_client.paginators import BasePaginator, JSONResponsePaginator
from dlt.sources.helpers.requests import Response, Request

from dlt.destinations.adapters import lancedb_adapter

In [6]:
from datetime import datetime, timezone

class PostBodyPaginator(BasePaginator):
    def __init__(self):
        super().__init__()
        self.cursor = None

    def update_state(self, response: Response) -> None:
        # Assuming the API returns an empty list when no more data is available
        if not response.json():
            self._has_next_page = False
        else:
            self.cursor = response.json().get("next_cursor")
            if self.cursor is None:
                self._has_next_page = False

    def update_request(self, request: Request) -> None:
        if request.json is None:
            request.json = {}

        # Add the cursor to the request body
        request.json["start_cursor"] = self.cursor

@dlt.resource(name="employee_handbook")
def rest_api_notion_resource():
    notion_config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api.notion.com/v1/",
            "auth": {
                "token": dlt.secrets["sources.rest_api.notion.api_key"]
            },
            "headers":{
            "Content-Type": "application/json",
            "Notion-Version": "2022-06-28"
            }
        },
        "resources": [
            {
                "name": "search",
                "endpoint": {
                    "path": "search",
                    "method": "POST",
                    "paginator": PostBodyPaginator(),
                    "json": {
                        "query": "Homework: Employee handbook",
                        "sort": {
                            "direction": "ascending",
                            "timestamp": "last_edited_time"
                        }
                    },
                    "data_selector": "results"
                }
            },
            {
                "name": "homework_page_content",
                "endpoint": {
                    "path": "blocks/{page_id}/children",
                    "paginator": JSONResponsePaginator(),
                    "params": {
                        "page_id": {
                            "type": "resolve",
                            "resource": "search",
                            "field": "id"
                        }
                    },
                }
            }
        ]
    }

    yield from rest_api_source(notion_config,name="employee_handbook")

def extract_page_content(response):
    block_id = response["id"]
    last_edited_time = response["last_edited_time"]
    block_type = response.get("type", "Not paragraph")
    if block_type != "paragraph":
        content = ""
    else:
        try:
            content = response["paragraph"]["rich_text"][0]["plain_text"]
        except IndexError:
            content = ""
    return {
        "block_id": block_id,
        "block_type": block_type,
        "content": content,
        "last_edited_time": last_edited_time,
        "inserted_at_time": datetime.now(timezone.utc)
    }

@dlt.resource(
    name="homework",
    write_disposition="merge",
    primary_key="block_id",
    columns={"last_edited_time":{"dedup_sort":"desc"}}
    )
def rest_api_notion_incremental(
    last_edited_time = dlt.sources.incremental("last_edited_time", initial_value="2024-06-26T08:16:00.000Z",primary_key=("block_id"))
):
    # last_value = last_edited_time.last_value
    # print(last_value)

    for block in rest_api_notion_resource.add_map(extract_page_content):
        if not(len(block["content"])):
            continue
        yield block

def load_notion() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="company_policies",
        destination="lancedb",
        dataset_name="notion_pages",
        # full_refresh=True
    )

    load_info = pipeline.run(
        lancedb_adapter(
            rest_api_notion_incremental,
            embed="content"
        ),
        table_name="homework",
        write_disposition="merge"
    )
    print(load_info)

load_notion()

Pipeline company_policies load step completed in ---
0 load package(s) were loaded to destination LanceDB and into dataset None
The LanceDB destination used <dlt.destinations.impl.lancedb.configuration.LanceDBCredentials object at 0x7c5dcaf9b460> location to store data


In [7]:
import lancedb

db = lancedb.connect(".lancedb")
db.table_names()

['notion_pages____dlt_loads',
 'notion_pages____dlt_pipeline_state',
 'notion_pages____dlt_version',
 'notion_pages___dltSentinelTable',
 'notion_pages___homework']

In [8]:
dbtable = db.open_table("notion_pages___homework")
dbtable = dbtable.to_pandas()
dbtable.shape

(17, 9)

>> Total rows in lanceDB table 'notion_pages___homework' : 17

## Q2. Running the Pipeline: Last edited time

In the demo, we created an incremental dlt resource `rest_api_notion_incremental` that keeps track of `last_edited_time`. What value does it store after you've run your pipeline once? (Hint: you will be able to get this value by performing some aggregation function on the column `last_edited_time` of the table)

* `Timestamp('2024-07-05 22:34:00+0000', tz='UTC') (OR "2024-07-05T22:34:00.000Z")`
* `Timestamp('2024-07-05 23:33:00+0000', tz='UTC') (OR "2024-07-05T23:33:00.000Z")`
* `Timestamp('2024-07-05 23:52:00+0000', tz='UTC') (OR "2024-07-05T23:52:00.000Z")`
* `Timestamp('2024-07-05 22:56:00+0000', tz='UTC') (OR "2024-07-05T22:56:00.000Z")`

In [9]:
dbtable.head(2)

Unnamed: 0,id__,vector__,block_id,block_type,content,last_edited_time,inserted_at_time,_dlt_load_id,_dlt_id
0,c69f1ecf-7b02-5810-8286-3f42659ae9d4,"[-0.024265556, 0.04746074, -0.01179647, 0.0638...",a8196881-ae94-4767-8767-92fe1a327d24,paragraph,We owe our success to our employees. To show o...,2024-07-05 22:34:00+00:00,2024-07-19 19:07:44.819222+00:00,1721416064.0473487,8AhIYf6ZEidMYQ
1,f2c18ac0-50f5-5b72-a871-dc5a46780353,"[-0.04966156, 0.10853508, -0.009762607, -0.036...",31fcbf26-2ca5-468a-8af8-d7eb4c2db8c8,paragraph,We want to ensure that private information abo...,2024-07-05 22:38:00+00:00,2024-07-19 19:07:44.822765+00:00,1721416064.0473487,EOxBtxR/Onq+sQ


In [10]:
sorted(set(dbtable['last_edited_time']))

[Timestamp('2024-07-05 22:34:00+0000', tz='UTC'),
 Timestamp('2024-07-05 22:38:00+0000', tz='UTC'),
 Timestamp('2024-07-05 22:52:00+0000', tz='UTC'),
 Timestamp('2024-07-05 22:54:00+0000', tz='UTC'),
 Timestamp('2024-07-05 22:56:00+0000', tz='UTC'),
 Timestamp('2024-07-05 23:33:00+0000', tz='UTC')]

>> Latest last edited time is : Timestamp('2024-07-05 23:33:00+0000', tz='UTC')

## Q3. Ask the Assistant

Find out with the help of the AI assistant: how many PTO days are the employees entitled to in a year?  

* 20
* 25
* 30
* 35

In [11]:
!curl -fsSL https://ollama.com/install.sh | sh

>>> Downloading ollama...
############################################################################################# 100.0%
>>> Installing ollama to /usr/local/bin...
>>> Adding ollama user to video group...
>>> Adding current user to ollama group...
>>> Creating ollama systemd service...
>>> The Ollama API is now available at 127.0.0.1:11434.
>>> Install complete. Run "ollama" from the command line.


In [12]:
!nohup ollama serve > nohup.out 2>&1 &

In [13]:
%%capture
!ollama pull llama2-uncensored

In [14]:
!pip install ollama



In [15]:
import ollama

In [16]:
def retrieve_context_from_lancedb(dbtable, question, top_k=2):

    query_results = dbtable.search(query=question).to_list()
    context = "\n".join([result["content"] for result in query_results[:top_k]])

    return context

In [17]:
def main():
    # Connect to the lancedb table
    db = lancedb.connect(".lancedb")
    dbtable = db.open_table("notion_pages___homework")

    # A system prompt telling ollama to accept input in the form of "Question: ... ; Context: ..."
    messages = [
        {"role": "system", "content": "You are a helpful assistant that helps users understand policies inside a company's employee handbook. The user will first ask you a question and then provide you relevant paragraphs from the handbook as context. Please answer the question based on the provided context. For any details missing in the paragraph, encourage the employee to contact the HR for that information. Please keep the responses conversational."}
    ]

    while True:
        # Accept user question
        question = input("You: ")

        # Retrieve the relevant paragraphs on the question
        context = retrieve_context_from_lancedb(dbtable,question,top_k=2)

        # Create a user prompt using the question and retrieved context
        messages.append(
            {"role": "user", "content": f"Question: '{question}'; Context:'{context}'"}
        )

        # Get the response from the LLM
        response = ollama.chat(
            model="llama2-uncensored",
            messages=messages
        )
        response_content = response['message']['content']
        print(f"Assistant: {response_content}")

        # Add the response into the context window
        messages.append(
            {"role": "assistant", "content":response_content}
        )

In [18]:
main()

You: how many PTO days are the employees entitled to in a year?


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Assistant: Thank you for the question! Based on the context provided, we can answer your question as follows: The employees in this company are entitled to 30 days of paid time off (PTO) per year.


KeyboardInterrupt: Interrupted by user

>> PTO days obtained in the response from assistant : 30