# Open source data ingestion for RAGs with dlt

Video: https://www.youtube.com/watch?v=qUNyfR_X2Mo

In this hands-on workshop, we’ll learn how to build a data ingestion pipeline with dlt that loads data from a REST API into LanceDB, so that your RAG always works with up-to-date data.

We’ll cover the following steps:

* Extract data from REST APIs
* Load and vectorize the data into LanceDB, which, unlike many other vector DBs, stores both the data _and_ the embeddings
* Load data incrementally

By the end of this workshop, you’ll be able to write a portable, open-source data pipeline for your RAG that you can deploy anywhere: Python notebooks, virtual machines, or orchestrators like Airflow, Dagster, or Mage.


# Resources

* Slides: [dlt-LLM-Zoomcamp.pdf](https://github.com/user-attachments/files/16131729/dlt.LLM.Zoomcamp.pdf)
* [Google Colab notebook](https://colab.research.google.com/drive/1nNOybHdWQiwUUuJFZu__xvJxL_ADU3xl?usp=sharing) - make a copy to follow along!

## Install requirements

To create a JSON-to-LanceDB pipeline, we need to install:
1. dlt with the `lancedb` extra
2. sentence-transformers: we need an embedding model to vectorize the data and store it inside LanceDB. For this, we use the open-source model "sentence-transformers/all-MiniLM-L6-v2".

In [54]:
%%capture
!pip install dlt[lancedb]==0.5.1a0
!pip install sentence-transformers

## Load the data

# Homework

In the workshop, we extracted the contents of two Notion pages titled "Workshop: Benefits and Perks" and "Workshop: Working hours, PTO, and Vacation".

Repeat the same process for a third page titled "Homework: Employee handbook" (hidden from public view, but accessible via API key):

1. Modify the REST API source to extract only this page.
2. Write the output into a separate table called "homework".
3. Remember to update the table name in every cell that connects to a LanceDB table.

To do this you can use the [workshop Colab](https://colab.research.google.com/drive/1nNOybHdWQiwUUuJFZu__xvJxL_ADU3xl?usp=sharing) as a basis.

Now, answer the following questions:

In [56]:
!yes | dlt init rest_api lancedb

Looking up the init scripts in https://github.com/dlt-hub/verified-sources.git...
Cloning and configuring a verified source rest_api (Generic API Source)
Do you want to proceed? [Y/n]: 
Verified source rest_api was added to your project!
* See the usage examples and code snippets to copy from rest_api_pipeline.py
* Add credentials for lancedb and other secrets in ./.dlt/secrets.toml
* Add the required dependencies to requirements.txt:
  dlt[lancedb]>=0.4.11
  If the dlt dependency is already added, make sure you install the extra for lancedb to it
  To install with pip: pip3 install 'dlt[lancedb]>=0.4.11'

* Read https://dlthub.com/docs/walkthroughs/create-a-pipeline for more information


In [57]:
import os
from google.colab import userdata

os.environ["SOURCES__REST_API__NOTION__API_KEY"] = userdata.get("SOURCES__REST_API__NOTION__API_KEY")

os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER"] = "sentence-transformers"
os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL"] = "all-MiniLM-L6-v2"

os.environ["DESTINATION__LANCEDB__CREDENTIALS__URI"] = ".lancedb"
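The environment variable names above follow dlt's config convention: the dotted path that the code later reads with `dlt.secrets["sources.rest_api.notion.api_key"]` is set through `SOURCES__REST_API__NOTION__API_KEY`. A minimal sketch of that mapping, assuming simple upper-casing and `__` separators (both helper names are hypothetical, not part of dlt):

```python
def config_path_to_env_var(path: str) -> str:
    """Map a dotted config path to the environment variable name that sets it.

    Assumption: each section is upper-cased and sections are joined with '__'.
    """
    return "__".join(section.upper() for section in path.split("."))


def env_var_to_config_path(name: str) -> str:
    """Inverse mapping: split on '__' and lower-case each section."""
    return ".".join(section.lower() for section in name.split("__"))


print(config_path_to_env_var("sources.rest_api.notion.api_key"))
# SOURCES__REST_API__NOTION__API_KEY
print(env_var_to_config_path("DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER"))
# destination.lancedb.embedding_model_provider
```

Single underscores inside a section name (such as `REST_API` or `API_KEY`) survive the round trip because only double underscores separate sections.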

In [58]:
import dlt
from rest_api import RESTAPIConfig, rest_api_source

from dlt.sources.helpers.rest_client.paginators import BasePaginator, JSONResponsePaginator
from dlt.sources.helpers.requests import Response, Request

from dlt.destinations.adapters import lancedb_adapter

## Q1. Rows in LanceDB

How many rows does the LanceDB table "notion_pages__homework" have?

* 14
* 15
* 16
* ***17*** ✅

In [59]:
from datetime import datetime, timezone

class PostBodyPaginator(BasePaginator):
    """Cursor paginator for endpoints such as Notion's POST /search,
    which expect the cursor in the JSON request body rather than in the URL."""

    def __init__(self):
        super().__init__()
        self.cursor = None

    def update_state(self, response: Response) -> None:
        # Notion returns {"results": [...], "next_cursor": ..., "has_more": ...};
        # stop when the body is empty or there is no next cursor
        data = response.json()
        self.cursor = data.get("next_cursor") if data else None
        if self.cursor is None:
            self._has_next_page = False

    def update_request(self, request: Request) -> None:
        if request.json is None:
            request.json = {}

        # Add the cursor to the request body
        request.json["start_cursor"] = self.cursor
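To see what `PostBodyPaginator` does outside of dlt, here is a self-contained simulation of cursor pagination where the cursor travels in the request body. The in-memory `PAGES` dict and `fake_post_search` are hypothetical stand-ins for Notion's `POST /search`; the sketch only assumes the response shape with `results` and `next_cursor` that the paginator above reads.

```python
from typing import Iterator, Optional

# Hypothetical stand-in for a Notion-style endpoint:
# each response carries "results" plus the cursor for the next page (or None).
PAGES = {
    None: {"results": [1, 2], "has_more": True, "next_cursor": "c1"},
    "c1": {"results": [3, 4], "has_more": True, "next_cursor": "c2"},
    "c2": {"results": [5], "has_more": False, "next_cursor": None},
}


def fake_post_search(body: dict) -> dict:
    """Simulate POST /search: the cursor is read from the JSON body."""
    return PAGES[body.get("start_cursor")]


def paginate(query: str) -> Iterator[int]:
    body: dict = {"query": query}
    cursor: Optional[str] = None
    while True:
        if cursor is not None:
            body["start_cursor"] = cursor  # the role of update_request
        page = fake_post_search(body)
        yield from page["results"]
        cursor = page.get("next_cursor")  # the role of update_state
        if cursor is None:
            break


print(list(paginate("homework")))  # [1, 2, 3, 4, 5]
```

The loop stops as soon as a response carries no `next_cursor`, which is the same stop condition `update_state` uses.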

In [60]:
import requests

notion_api_key = userdata.get("SOURCES__REST_API__NOTION__API_KEY")
headers = {
    "Authorization": f"Bearer {notion_api_key}",
    "Content-Type": "application/json",
    "Notion-Version": "2022-06-28"
}

# Search for pages matching "homework", oldest edit first
payload = {
    "query": "homework",
    "sort": {
        "direction": "ascending",
        "timestamp": "last_edited_time"
    },
    "filter": {
        "value": "page",
        "property": "object"
    }
}

response = requests.post("https://api.notion.com/v1/search", headers=headers, json=payload)

# Fail loudly instead of leaving search_results undefined on an error
response.raise_for_status()
search_results = response.json()

In [64]:
search_results['results'][0]['url']

'https://www.notion.so/Homework-Employee-handbook-c2db1e2c75c24b568d731e882c753c87'
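Notion page URLs typically end with the page ID written as 32 hex characters without dashes, while the API returns the same ID in dashed UUID form. For step 1 of the homework (extract only this page), one option is to recover that ID from the URL above. A sketch, assuming that URL layout holds; `page_id_from_url` is a hypothetical helper:

```python
import uuid


def page_id_from_url(url: str) -> str:
    """Return the dashed-UUID page ID encoded at the end of a Notion page URL.

    Assumes the URL path ends with 32 hex characters, optionally followed
    by a query string such as '?pvs=4'.
    """
    path = url.split("?", 1)[0].rstrip("/")
    raw_id = path[-32:]
    # uuid.UUID validates the hex digits and inserts the dashes
    return str(uuid.UUID(hex=raw_id))


url = "https://www.notion.so/Homework-Employee-handbook-c2db1e2c75c24b568d731e882c753c87"
print(page_id_from_url(url))  # c2db1e2c-75c2-4b56-8d73-1e882c753c87
```

The dashed ID could then be placed directly in the `blocks/{page_id}/children` path instead of being resolved from `search`; we have not tested that variant against the workshop workspace.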

In [65]:
@dlt.resource(name="homework")
def rest_api_notion_resource():
    notion_config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api.notion.com/v1/",
            "auth": {
                "token": dlt.secrets["sources.rest_api.notion.api_key"]
            },
            "headers": {
                "Content-Type": "application/json",
                "Notion-Version": "2022-06-28"
            }
        },
        "resources": [
            {
                "name": "search",
                "endpoint": {
                    "path": "search",
                    "method": "POST",
                    "paginator": PostBodyPaginator(),
                    "json": {
                        "query": "homework",
                        "sort": {
                            "direction": "ascending",
                            "timestamp": "last_edited_time"
                        }
                    },
                    "data_selector": "results"
                }
            },
            {
                "name": "page_content",
                "endpoint": {
                    "path": "blocks/{page_id}/children",
                    "paginator": JSONResponsePaginator(),
                    "params": {
                        "page_id": {
                            "type": "resolve",
                            "resource": "search",
                            "field": "id"
                        }
                    },
                }
            }
        ]
    }

    yield from rest_api_source(notion_config, name="homework")

def extract_page_content(response):
    block_id = response["id"]
    last_edited_time = response["last_edited_time"]
    block_type = response.get("type", "Not paragraph")
    if block_type != "paragraph":
        content = ""
    else:
        try:
            content = response["paragraph"]["rich_text"][0]["plain_text"]
        except IndexError:
            content = ""
    return {
        "block_id": block_id,
        "block_type": block_type,
        "content": content,
        "last_edited_time": last_edited_time,
        "inserted_at_time": datetime.now(timezone.utc)
    }

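@dlt.resource(
    name="homework",
    write_disposition="merge",
    primary_key="block_id",
    # If a batch holds several rows with the same block_id, keep the most recently edited one
    columns={"last_edited_time": {"dedup_sort": "desc"}}
)
def rest_api_notion_incremental(
    last_edited_time=dlt.sources.incremental(
        "last_edited_time",
        initial_value="2024-06-26T08:16:00.000Z",
        primary_key="block_id",
    )
):
    # dlt binds the incremental to the `last_edited_time` field of each yielded row
    # and skips rows older than the last value stored in the pipeline state
    for block in rest_api_notion_resource.add_map(extract_page_content):
        # Skip blocks without text (non-paragraph blocks and empty paragraphs)
        if not block["content"]:
            continue
        yield block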

def load_notion() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="company_policies",
        destination="lancedb",
        dataset_name="notion_pages",
        # full_refresh=True
    )

    load_info = pipeline.run(
        lancedb_adapter(
            rest_api_notion_incremental,
            embed="content"
        ),
        table_name="employee_homework",
        write_disposition="merge"
    )
    print(load_info)

load_notion()

employee_homework
[{'name': 'block_id', 'nullable': False, 'primary_key': True, 'data_type': 'text'}, {'name': 'block_type', 'data_type': 'text', 'nullable': True}, {'name': 'content', 'x-lancedb-embed': True, 'data_type': 'text', 'nullable': True}, {'dedup_sort': 'desc', 'name': 'last_edited_time', 'data_type': 'timestamp', 'nullable': True}, {'name': 'inserted_at_time', 'data_type': 'timestamp', 'nullable': True}, {'name': '_dlt_load_id', 'data_type': 'text', 'nullable': False}, {'name': '_dlt_id', 'data_type': 'text', 'nullable': False, 'unique': True}]
_dlt_pipeline_state
[{'name': 'version', 'data_type': 'bigint', 'nullable': False}, {'name': 'engine_version', 'data_type': 'bigint', 'nullable': False}, {'name': 'pipeline_name', 'data_type': 'text', 'nullable': False}, {'name': 'state', 'data_type': 'text', 'nullable': False}, {'name': 'created_at', 'data_type': 'timestamp', 'nullable': False}, {'name': 'version_hash', 'data_type': 'text', 'nullable': True}, {'name': '_dlt_load_id'
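`load_notion` writes with `write_disposition="merge"`, `primary_key="block_id"` and `dedup_sort="desc"` on `last_edited_time`. Conceptually, rows in the incoming batch that share a `block_id` are first reduced to the most recently edited one, and that row then replaces any stored row with the same key. A plain-Python model of that behaviour, written for illustration only: it reflects our reading of the merge semantics, not how dlt or its LanceDB destination implement them.

```python
from typing import Dict, Iterable, List


def dedup_batch(rows: Iterable[dict], key: str, sort_col: str) -> Dict[str, dict]:
    """Within one batch keep, per key, the row with the largest sort_col (dedup_sort="desc")."""
    best: Dict[str, dict] = {}
    for row in rows:
        current = best.get(row[key])
        if current is None or row[sort_col] > current[sort_col]:
            best[row[key]] = row
    return best


def merge(existing: Iterable[dict], incoming: Iterable[dict],
          key: str = "block_id", sort_col: str = "last_edited_time") -> List[dict]:
    """Upsert: deduplicated incoming rows replace stored rows with the same key."""
    table = {row[key]: row for row in existing}
    table.update(dedup_batch(incoming, key, sort_col))
    return sorted(table.values(), key=lambda row: row[key])


# Hypothetical rows; ISO-8601 strings in one format compare correctly as strings
existing = [{"block_id": "a", "last_edited_time": "2024-07-05T22:34:00.000Z", "content": "old"}]
incoming = [
    {"block_id": "a", "last_edited_time": "2024-07-05T23:33:00.000Z", "content": "new"},
    {"block_id": "a", "last_edited_time": "2024-07-05T22:52:00.000Z", "content": "stale"},
    {"block_id": "b", "last_edited_time": "2024-07-05T22:52:00.000Z", "content": "added"},
]
print([row["content"] for row in merge(existing, incoming)])  # ['new', 'added']
```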

In [66]:
import lancedb

db = lancedb.connect("./.lancedb")
print(db.table_names())

['notion_pages____dlt_loads', 'notion_pages____dlt_pipeline_state', 'notion_pages____dlt_version', 'notion_pages___dltSentinelTable', 'notion_pages___employee_homework']
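Judging from the names printed above, each physical LanceDB table name is the dataset name, a `___` separator, and the dlt table name (so dlt's own `_dlt_loads` table ends up with four underscores). A sketch of that pattern, inferred from this listing only; both helpers are hypothetical:

```python
from typing import List


def lancedb_table_name(dataset_name: str, table_name: str, separator: str = "___") -> str:
    """Physical table name for a dlt table (separator inferred from the listing above)."""
    return f"{dataset_name}{separator}{table_name}"


def user_tables(table_names: List[str], dataset_name: str, separator: str = "___") -> List[str]:
    """Strip the dataset prefix and drop dlt's bookkeeping tables."""
    prefix = dataset_name + separator
    stripped = [name[len(prefix):] for name in table_names if name.startswith(prefix)]
    # dlt's internal tables start with "_dlt", and the sentinel table with "dlt"
    return [name for name in stripped if not name.startswith(("_dlt", "dlt"))]


names = ['notion_pages____dlt_loads', 'notion_pages____dlt_pipeline_state',
         'notion_pages____dlt_version', 'notion_pages___dltSentinelTable',
         'notion_pages___employee_homework']
print(user_tables(names, "notion_pages"))  # ['employee_homework']
```

Under this pattern, writing to a table named `homework` (as step 2 of the homework asks) would produce `notion_pages___homework`, which is the name to pass to `db.open_table`.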


In [67]:
import lancedb

db = lancedb.connect(".lancedb")
dbtable = db.open_table("notion_pages___employee_homework")

dbtable.to_pandas()

| | `id__` | `vector__` | `block_id` | `block_type` | `content` | `last_edited_time` | `inserted_at_time` | `_dlt_load_id` | `_dlt_id` |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 48e8465f-79d3-5776-9ddf-d136689cb614 | [-0.024265556, 0.04746074, -0.01179647, 0.0638... | a8196881-ae94-4767-8767-92fe1a327d24 | paragraph | We owe our success to our employees. To show o... | 2024-07-05 22:34:00+00:00 | 2024-07-20 11:01:41.017015+00:00 | 1721473299.96335 | KPJFRH7ZbkbT9g |
| 1 | 32ec8a74-e262-5395-b963-49f2d211ea96 | [-0.04966156, 0.10853508, -0.009762607, -0.036... | 31fcbf26-2ca5-468a-8af8-d7eb4c2db8c8 | paragraph | We want to ensure that private information abo... | 2024-07-05 22:38:00+00:00 | 2024-07-20 11:01:41.019171+00:00 | 1721473299.96335 | 4DvteTcm2beZcg |
| 2 | d51a173a-d990-5550-b3c1-5897704123c3 | [-0.06316319, 0.17331503, 0.025351755, -0.0191... | da7721fd-3d0f-4c04-bc5e-825ad60bed1c | paragraph | Employee health is important to us. We don’t d... | 2024-07-05 22:52:00+00:00 | 2024-07-20 11:01:41.019394+00:00 | 1721473299.96335 | erz4CQFnCRyQZw |
| 3 | 6ae4ec43-bfb6-533f-82f7-5e6e796d4c77 | [-0.109743185, 0.10586075, 0.003290699, -0.021... | ff36dcf3-5faa-40b4-ad8e-92fdc952201e | paragraph | Our company is dedicated to maintaining a safe... | 2024-07-05 22:52:00+00:00 | 2024-07-20 11:01:41.019565+00:00 | 1721473299.96335 | TCgJZE9TjQglrA |
| 4 | c34e7711-ac37-5330-bc5b-d14a287b9c16 | [0.05242333, -0.064576, 0.06586297, 0.01454380... | a1ff9697-4bb6-4f1e-b464-dda296dbd307 | paragraph | If your job doesn’t require you to be present ... | 2024-07-05 22:52:00+00:00 | 2024-07-20 11:01:41.019754+00:00 | 1721473299.96335 | cVqh1LsQZXBvSQ |
| 5 | 06ed3378-4f51-5389-9474-8704d282c07a | [0.00052337867, -0.054883413, 0.043573413, -0.... | e4ec9f4d-b687-4c28-a80d-985bfabcc2ba | paragraph | Remote working refers to working from a non-of... | 2024-07-05 22:52:00+00:00 | 2024-07-20 11:01:41.019971+00:00 | 1721473299.96335 | +I/mXUJTEmP/4g |
| 6 | 93273bb1-4e25-5042-910d-36d3b69a0d40 | [0.03802633, -0.021509705, 0.04752782, 0.06470... | e6e550dc-b59e-4928-abd7-07eace948681 | paragraph | There are some expenses that we will pay direc... | 2024-07-05 22:52:00+00:00 | 2024-07-20 11:01:41.020167+00:00 | 1721473299.96335 | /WB0Ac3iOzEQBA |
| 7 | 7a0fc79f-e258-5239-b587-386c021620fc | [-0.05858811, -0.07540446, 0.033775203, 0.0096... | a269d0ca-ce14-481b-a5f4-9192d6840d6e | paragraph | Our company operates between 9 a.m. to 7 p.m. ... | 2024-07-05 22:52:00+00:00 | 2024-07-20 11:01:41.020364+00:00 | 1721473299.96335 | 5r6oig2GI+uHDA |
| 8 | 54e367f7-61af-5dac-bc2c-1d9632961b9e | [-0.013599302, 0.0047530197, 0.024835136, 0.01... | 5b65f3e7-0a37-429a-818d-f99b53755ebd | paragraph | In this section, we are going to be covering i... | 2024-07-05 23:33:00+00:00 | 2024-07-20 11:01:41.020558+00:00 | 1721473299.96335 | QAahiBu3AArT2A |
| 9 | 53203ca2-9c5a-515d-98d2-03a3fa92b9b1 | [0.032060888, 0.024244698, 0.008471344, 0.0317... | b27f7d80-f2f1-460e-aa0c-b8e770cf050a | paragraph | Our company observes the following holidays: N... | 2024-07-05 22:52:00+00:00 | 2024-07-20 11:01:41.020742+00:00 | 1721473299.96335 | O1fnWvwjtveJGQ |
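The `content` values above come from `extract_page_content`, which reads only `rich_text[0]`. In Notion, a paragraph's `rich_text` is a list of segments, and a change of formatting (bold text, a link) starts a new segment, so any text after the first segment is dropped. A sketch of a hypothetical helper that joins all segments; it could replace the `try`/`except` in `extract_page_content` (`content = paragraph_text(response)`), though we have not re-run the pipeline with it:

```python
def paragraph_text(block: dict) -> str:
    """Join the plain_text of every rich_text segment of a Notion paragraph block.

    Returns "" for non-paragraph blocks and for paragraphs without text.
    """
    if block.get("type") != "paragraph":
        return ""
    segments = block.get("paragraph", {}).get("rich_text", [])
    return "".join(segment.get("plain_text", "") for segment in segments)


# Hypothetical block with three segments (e.g. plain, bold, plain)
block = {
    "type": "paragraph",
    "paragraph": {"rich_text": [{"plain_text": "Hello, "},
                                {"plain_text": "world"},
                                {"plain_text": "!"}]},
}
print(paragraph_text(block))  # Hello, world!
```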


## Q2. Running the Pipeline: Last edited time

In the demo, we created an incremental dlt resource `rest_api_notion_incremental` that keeps track of `last_edited_time`. What value does it store after you've run your pipeline once? (Hint: you can get this value by applying an aggregation function to the table's `last_edited_time` column.)

* `Timestamp('2024-07-05 22:34:00+0000', tz='UTC') (OR "2024-07-05T22:34:00.000Z")`
* **`Timestamp('2024-07-05 23:33:00+0000', tz='UTC') (OR "2024-07-05T23:33:00.000Z")`** ✅
* `Timestamp('2024-07-05 23:52:00+0000', tz='UTC') (OR "2024-07-05T23:52:00.000Z")`
* `Timestamp('2024-07-05 22:56:00+0000', tz='UTC') (OR "2024-07-05T22:56:00.000Z")`
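Why an aggregation gives the answer: `dlt.sources.incremental` uses `max` as its default `last_value_func`, so after a run the stored value is the largest `last_edited_time` it has seen. A simplified model of that behaviour, assuming ISO-8601 strings in a single format so that string comparison matches time order (the rows and `block_id`s are hypothetical; the timestamps are values that appear in the table). Real dlt also uses the incremental's `primary_key` to skip rows sitting exactly on the boundary, which this sketch does not do.

```python
from typing import Iterable, List, Optional, Tuple


def incremental_filter(rows: Iterable[dict], cursor: str,
                       last_value: Optional[str]) -> Tuple[List[dict], Optional[str]]:
    """Keep rows whose cursor value is >= last_value; return them and the new last_value.

    The new last_value is the maximum cursor value seen (max-based, like the default).
    """
    kept = [row for row in rows if last_value is None or row[cursor] >= last_value]
    new_last_value = max((row[cursor] for row in kept), default=last_value)
    return kept, new_last_value


rows = [
    {"block_id": "a", "last_edited_time": "2024-07-05T22:34:00.000Z"},
    {"block_id": "b", "last_edited_time": "2024-07-05T23:33:00.000Z"},
    {"block_id": "c", "last_edited_time": "2024-07-05T22:52:00.000Z"},
]
# First run starts from the initial_value used in rest_api_notion_incremental
kept, state = incremental_filter(rows, "last_edited_time", "2024-06-26T08:16:00.000Z")
print(len(kept), state)  # 3 2024-07-05T23:33:00.000Z

# Second run over unchanged data: only the boundary row passes the >= check
kept_again, state_again = incremental_filter(rows, "last_edited_time", state)
print(len(kept_again), state_again)  # 1 2024-07-05T23:33:00.000Z
```

The boundary row surviving the second run is exactly the case the incremental's `primary_key` handles in dlt.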

In [68]:
df = dbtable.to_pandas()
df['last_edited_time'].unique()

<DatetimeArray>
['2024-07-05 22:34:00+00:00', '2024-07-05 22:38:00+00:00',
 '2024-07-05 22:52:00+00:00', '2024-07-05 23:33:00+00:00',
 '2024-07-05 22:54:00+00:00', '2024-07-05 22:56:00+00:00']
Length: 6, dtype: datetime64[us, UTC]

In [69]:
df['last_edited_time'].max()

Timestamp('2024-07-05 23:33:00+0000', tz='UTC')

## Q3. Ask the Assistant

Find out with the help of the AI assistant: how many PTO days are the employees entitled to in a year?  

* 20
* 25
* ***30*** ✅
* 35

In [80]:
!curl -fsSL https://ollama.com/install.sh | sh

>>> Downloading ollama...
############################################################################################# 100.0%
>>> Installing ollama to /usr/local/bin...
>>> Adding ollama user to video group...
>>> Adding current user to ollama group...
>>> Creating ollama systemd service...
>>> The Ollama API is now available at 127.0.0.1:11434.
>>> Install complete. Run "ollama" from the command line.


In [81]:
!nohup ollama serve > nohup.out 2>&1 &

In [82]:
%%capture
!ollama pull llama2-uncensored

In [83]:
%%capture
!pip install ollama



In [84]:
import ollama

In [75]:
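def retrieve_context_from_lancedb(dbtable, question, top_k=2):
    # LanceDB embeds the question with the table's embedding model;
    # limit() asks for only the top_k nearest rows instead of slicing afterwards
    query_results = dbtable.search(query=question).limit(top_k).to_list()
    context = "\n".join(result["content"] for result in query_results)

    return context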
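`dbtable.search(query=question)` turns the question into a vector with the same sentence-transformers model used at load time and returns the rows whose vectors are nearest. A toy brute-force version of that retrieval step, for illustration only: it uses cosine similarity and 3-dimensional vectors, whereas all-MiniLM-L6-v2 produces 384-dimensional vectors and LanceDB's own distance metric and indexing may differ.

```python
import math
from typing import Dict, List, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either has zero length)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def retrieve_context_bruteforce(query_vector: Sequence[float], rows: List[Dict],
                                top_k: int = 2) -> str:
    """Score every row against the query vector and join the top_k contents."""
    ranked = sorted(rows, key=lambda row: cosine_similarity(query_vector, row["vector"]),
                    reverse=True)
    return "\n".join(row["content"] for row in ranked[:top_k])


# Hypothetical rows with toy embeddings
rows = [
    {"content": "PTO policy", "vector": [0.9, 0.1, 0.0]},
    {"content": "Office hours", "vector": [0.0, 1.0, 0.0]},
    {"content": "Holidays", "vector": [0.7, 0.0, 0.7]},
]
print(retrieve_context_bruteforce([1.0, 0.0, 0.0], rows))
# PTO policy
# Holidays
```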

In [85]:
def main():
    # Connect to the LanceDB table
    db = lancedb.connect(".lancedb")
    dbtable = db.open_table("notion_pages___employee_homework")

    # A system prompt telling the model to expect input of the form "Question: ... ; Context: ..."
    messages = [
        {"role": "system", "content": "You are a helpful assistant that helps users understand policies inside a company's employee handbook. The user will first ask you a question and then provide you relevant paragraphs from the handbook as context. Please answer the question based on the provided context. For any details missing in the paragraph, encourage the employee to contact the HR for that information. Please keep the responses conversational."}
    ]

    while True:
        # Accept the user's question; an empty line, "exit" or "quit" ends the chat
        question = input("You: ")
        if question.strip().lower() in {"", "exit", "quit"}:
            break

        # Retrieve the paragraphs most relevant to the question
        context = retrieve_context_from_lancedb(dbtable, question, top_k=2)

        # Build the user prompt from the question and the retrieved context
        messages.append(
            {"role": "user", "content": f"Question: '{question}'; Context:'{context}'"}
        )

        # Get the response from the LLM
        response = ollama.chat(
            model="llama2-uncensored",
            messages=messages
        )
        response_content = response["message"]["content"]
        print(f"Assistant: {response_content}")

        # Add the response to the conversation history
        messages.append(
            {"role": "assistant", "content": response_content}
        )
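`main()` appends every question, its retrieved context, and every answer to `messages`, so the prompt sent to the model keeps growing and can eventually exceed the model's context window. A sketch of one mitigation: keep the system prompt and only the last few turns for each `ollama.chat` call. The helpers `build_user_message` and `trim_history` and the `max_turns` limit are hypothetical, not part of the `ollama` package.

```python
from typing import Dict, List

Message = Dict[str, str]


def build_user_message(question: str, context: str) -> Message:
    # Same "Question: ...; Context: ..." layout that main() sends
    return {"role": "user", "content": f"Question: '{question}'; Context:'{context}'"}


def trim_history(messages: List[Message], max_turns: int = 3) -> List[Message]:
    """Keep every system message plus the last max_turns user/assistant pairs."""
    system = [m for m in messages if m["role"] == "system"]
    dialogue = [m for m in messages if m["role"] != "system"]
    recent = dialogue[-2 * max_turns:] if max_turns > 0 else []
    return system + recent


# Example: a system prompt followed by five question/answer turns
history: List[Message] = [{"role": "system", "content": "You are a helpful assistant."}]
for i in range(5):
    history.append(build_user_message(f"q{i}", "ctx"))
    history.append({"role": "assistant", "content": f"a{i}"})

trimmed = trim_history(history, max_turns=2)
print(len(trimmed))  # 5
print(trimmed[1]["content"])  # Question: 'q3'; Context:'ctx'
```

Inside `main()`, one would keep appending to the full `messages` list but pass `trim_history(messages)` to `ollama.chat`.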

In [None]:
main()

You: how many PTO days are the employees entitled to in a year?
Assistant: Thank you for providing me with the question and context. Based on the relevant paragraphs from the employee handbook, employees are entitled to 30 days of paid time off (PTO) per year. They receive 2.5 days per month, which means they can take a PTO day anytime after their first week with our company. Employees will earn an additional day every year up to a maximum of 25 days overall. If the employee wants to use their PTO, they need to send a request through HRIS and if approved by their manager or HR, they can take leave anytime during the year. However, employees cannot transfer remaining PTO to the next year. They are encouraged to use their time off throughout the year. If an employee leaves our company for whatever reason, we may compensate accrued PTO with their final paycheck according to local law. For non-exempt employees who work on a holiday, they will receive their regular hourly rate with a premiu