# Vector Database Initialization

## Dependencies

In [26]:
%pip install beautifulsoup4 requests pandas langchain tiktoken pyarrow fastparquet chromadb sentence_transformers --user

Collecting sentence_transformers
  Downloading sentence-transformers-2.2.2.tar.gz (85 kB)
  Preparing metadata (setup.py) ... done
Collecting transformers<5.0.0,>=4.6.0 (from sentence_transformers)
  Downloading transformers-4.30.2-py3-none-any.whl (7.2 MB)
Collecting torch>=1.6.0 (from sentence_transformers)
  Downloading torch-2.0.1-cp311-cp311-manylinux2014_aarch64.whl (74.0 MB)
Collecting torchvision (from sentence_transformers)
  Downloading torchvision-0.15.2-cp311-cp311-manylinux2014_aarch64.whl (1.2 MB)

In [2]:
%pip install pydantic==1.10.11 --user

Note: you may need to restart the kernel to use updated packages.


## Setup

### Documentation URL Scraping

In [3]:
from urllib.parse import urljoin, urldefrag
from bs4 import BeautifulSoup
import requests

root_url = "https://airflow.apache.org/docs/apache-airflow/stable/"
root_response = requests.get(root_url, timeout=30)
root_response.raise_for_status()
root_html = root_response.content.decode("utf-8")
soup = BeautifulSoup(root_html, 'html.parser')

# Sphinx marks links to other pages of the same documentation with class="reference internal"
root_links = soup.find_all("a", attrs={"class": "reference internal"})

result = set()
for root_link in root_links:
    # Resolve the (possibly relative) href against the root URL and drop any "#fragment",
    # so that several anchors on the same page collapse into a single page URL
    url, _fragment = urldefrag(urljoin(root_url, root_link.get("href")))
    result.add(url)
urls = list(result)
print(*urls, sep="\n")

https://airflow.apache.org/docs/apache-airflow/stable/integration.html
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
https://airflow.apache.org/docs/apache-airflow/stable/tutorial/index.html
https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/index.html
https://airflow.apache.org/docs/apache-airflow/stable/
https://airflow.apache.org/docs/apache-airflow/stable/start.html
https://airflow.apache.org/docs/apache-airflow/stable/extra-packages-ref.html
https://airflow.apache.org/docs/apache-airflow/stable/license.html
https://airflow.apache.org/docs/apache-airflow/stable/release_notes.html
https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html
https://airflow.apache.org/docs/apache-airflow/stable/deprecated-rest-api-ref.html
https://airflow.apache.org/docs/apache-airflow/stable/public-airflow-interface.html
https://airflow.apache.org/do
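The standard library can resolve the relative `href` values found on the index page against that page's URL and strip `#fragment` anchors, so links to different sections of one page collapse to a single URL. The sketch below is a standalone illustration. The helper `to_page_url` and the example hrefs are hypothetical and not taken from the scraped page.

```python
from urllib.parse import urldefrag, urljoin


def to_page_url(base_url: str, href: str) -> str:
    """Hypothetical helper: absolute page URL for an href, without its #fragment."""
    absolute = urljoin(base_url, href)  # resolves "page.html", "../x/page.html", "#anchor"
    page_url, _fragment = urldefrag(absolute)
    return page_url


base = "https://airflow.apache.org/docs/apache-airflow/stable/"
hrefs = ["start.html", "start.html#installation", "tutorial/index.html", "#"]
# The set removes duplicates produced by different anchors on the same page
pages = {to_page_url(base, href) for href in hrefs}
for page in sorted(pages):
    print(page)
```

Four hrefs produce three distinct page URLs. The href `"#"` resolves back to the root page itself, which is why the root URL also appears in the scraped list.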

In [4]:
from langchain.document_loaders import WebBaseLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.text_splitter import CharacterTextSplitter

loader = WebBaseLoader(urls)
documents = loader.load()

# Select one of the following splitters (chunk size measured in characters vs. in tiktoken tokens):
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
# text_splitter = CharacterTextSplitter.from_tiktoken_encoder(chunk_size=1000, chunk_overlap=100)

splitted_documents = text_splitter.split_documents(documents)
print("Total documents: ", len(splitted_documents))

Total documents:  1488
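The sketch below is a deliberately simplified fixed-window chunker that builds intuition for `chunk_size` and `chunk_overlap`. Each chunk holds at most `chunk_size` characters and starts `chunk_size - chunk_overlap` characters after the previous one. This is *not* LangChain's algorithm. `RecursiveCharacterTextSplitter` first tries to break on separators (by default paragraph breaks, newlines, then spaces) and only falls back to raw characters, so its overlap is approximate. The function name `chunk_with_overlap` is hypothetical.

```python
def chunk_with_overlap(text: str, chunk_size: int, chunk_overlap: int) -> list[str]:
    """Hypothetical fixed-window chunker: windows of chunk_size chars sharing chunk_overlap chars."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("require chunk_size > 0 and 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once a window reaches the end of the text; otherwise the next window
        # would be a trailing fragment already contained in this chunk
        if start + chunk_size >= len(text):
            break
    return chunks


print(chunk_with_overlap("abcdefghij", chunk_size=4, chunk_overlap=1))
# → ['abcd', 'defg', 'ghij']
```

Consecutive chunks share one character here (`d`, then `g`). With the notebook's settings they would share about 100 characters, which helps keep a sentence that straddles a boundary retrievable from either chunk.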


In [5]:
import pandas as pd

page_contents = []
sources = []
titles = []
languages = []

for document in splitted_documents:
    # Append to every list for every chunk so the columns stay the same length,
    # even when a chunk has no metadata
    metadata = document.metadata or {}
    page_contents.append(document.page_content)
    sources.append(metadata.get('source', "Unknown"))
    titles.append(metadata.get('title', "Unknown"))
    languages.append(metadata.get('language', "Unknown"))

documents_df = pd.DataFrame({
    'page_content': page_contents,
    'source': sources,
    'title': titles,
    'language': languages
})
documents_df.fillna("Unknown", inplace=True)
documents_df.head()

|   | page_content | source | title | language |
|---|---|---|---|---|
| 0 | `Integration — Airflow Documentation\n\n\n\n\n\...` | https://airflow.apache.org/docs/apache-airflow... | Integration — Airflow Documentation | en |
| 1 | `Announcements\n \n\...` | https://airflow.apache.org/docs/apache-airflow... | Integration — Airflow Documentation | en |
| 2 | `Database Migrations\nDatabase ERD Schema\n\n\n...` | https://airflow.apache.org/docs/apache-airflow... | Integration — Airflow Documentation | en |
| 3 | It also has integration with Sentry service fo... | https://airflow.apache.org/docs/apache-airflow... | Integration — Airflow Documentation | en |
| 4 | Airflow REST API | https://airflow.apache.org/docs/apache-airflow... | Airflow REST API | Unknown |


In [6]:
# Replace \n and \t with a space
documents_df["page_content"] = documents_df["page_content"].replace('\n', ' ', regex=True)
documents_df["page_content"] = documents_df["page_content"].replace('\t', ' ', regex=True)
# Remove leading and trailing spaces
documents_df["page_content"] = documents_df["page_content"].str.strip()
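The replacements above turn each newline or tab into one space. Runs such as `\n\n\n` therefore still leave several consecutive spaces, and the query results in the Testing section show such double spaces. If collapsing every whitespace run into a single space is preferable, one regex pass handles it. The helper `normalize_whitespace` is hypothetical, and applying it would slightly change the stored chunk text.

```python
import re

# \s matches spaces, tabs, newlines, and other Unicode whitespace
_WHITESPACE_RUN = re.compile(r"\s+")


def normalize_whitespace(text: str) -> str:
    """Collapse every run of whitespace into one space and trim both ends."""
    return _WHITESPACE_RUN.sub(" ", text).strip()


print(normalize_whitespace("DAGs\n\n The DAG\tis  the core entity "))
# → DAGs The DAG is the core entity
```

With pandas this could be applied as `documents_df["page_content"].map(normalize_whitespace)`.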

In [7]:
documents_df.head()

|   | page_content | source | title | language |
|---|---|---|---|---|
| 0 | Integration — Airflow Documentation ... | https://airflow.apache.org/docs/apache-airflow... | Integration — Airflow Documentation | en |
| 1 | Announcements ... | https://airflow.apache.org/docs/apache-airflow... | Integration — Airflow Documentation | en |
| 2 | Database Migrations Database ERD Schema ... | https://airflow.apache.org/docs/apache-airflow... | Integration — Airflow Documentation | en |
| 3 | It also has integration with Sentry service fo... | https://airflow.apache.org/docs/apache-airflow... | Integration — Airflow Documentation | en |
| 4 | Airflow REST API | https://airflow.apache.org/docs/apache-airflow... | Airflow REST API | Unknown |


In [8]:
from urllib.parse import urlparse

def decompose_url(url):
    parsed = urlparse(url)
    
    # The root url will be the scheme plus '://' plus the netloc
    root_url = parsed.scheme + "://" + parsed.netloc
    
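    # Split the path by '/' and drop empty parts, so that an empty path or "/" yields no parts
    # (''.split('/') would return [''], which made the "no parts" branch below unreachable)
    path_parts = [part for part in parsed.path.split('/') if part]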
    
    # The section and page depend on how many parts there are
    if len(path_parts) >= 2:
        # The section will be the second to last part
        section = path_parts[-2]
        # The page will be the last part
        page = path_parts[-1]
    elif len(path_parts) == 1:
        # If there's only one part, we'll assume it's the page
        section = None
        page = path_parts[0]
    else:
        # If there are no parts, then both section and page will be None
        section = None
        page = None
    
    return root_url, section, page

# Apply the function to the 'source' column of your dataframe
documents_df['root_url'], documents_df['section'], documents_df['page'] = zip(*documents_df['source'].map(decompose_url))


In [9]:
documents_df.head()

|   | page_content | source | title | language | root_url | section | page |
|---|---|---|---|---|---|---|---|
| 0 | Integration — Airflow Documentation ... | https://airflow.apache.org/docs/apache-airflow... | Integration — Airflow Documentation | en | https://airflow.apache.org | stable | integration.html |
| 1 | Announcements ... | https://airflow.apache.org/docs/apache-airflow... | Integration — Airflow Documentation | en | https://airflow.apache.org | stable | integration.html |
| 2 | Database Migrations Database ERD Schema ... | https://airflow.apache.org/docs/apache-airflow... | Integration — Airflow Documentation | en | https://airflow.apache.org | stable | integration.html |
| 3 | It also has integration with Sentry service fo... | https://airflow.apache.org/docs/apache-airflow... | Integration — Airflow Documentation | en | https://airflow.apache.org | stable | integration.html |
| 4 | Airflow REST API | https://airflow.apache.org/docs/apache-airflow... | Airflow REST API | Unknown | https://airflow.apache.org | stable | stable-rest-api-ref.html |


In [10]:
documents_df.isnull().sum()

page_content    0
source          0
title           0
language        0
root_url        0
section         0
page            0
dtype: int64

In [11]:
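from pathlib import Path

# Make sure the output directory exists before writing the Parquet file
Path("./parquets").mkdir(parents=True, exist_ok=True)
documents_df.to_parquet('./parquets/documents_with_rec-char-split.parquet')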

## Storage

In [17]:
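import chromadb
from chromadb.config import Settings

# Legacy configuration (chromadb < 0.4): DuckDB + Parquet, persisted under ./db/.
# chromadb 0.4+ removed this backend in favor of chromadb.PersistentClient(path="./db/").
client = chromadb.Client(Settings(
    chroma_db_impl="duckdb+parquet",
    persist_directory="./db/"
))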

In [24]:
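# Optional cleanup: drop the collection if it already exists.
# Defining the name here keeps the cell runnable in top-to-bottom order.
collection_name = "airflow_docs_stable"
if collection_name in [c.name for c in client.list_collections()]:
    client.delete_collection(name=collection_name)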

In [27]:
from chromadb.utils import embedding_functions

sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="all-mpnet-base-v2")
collection_name = "airflow_docs_stable"

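# Start from an empty collection: delete it if it exists among *all* collections,
# not only the first one returned by list_collections()
existing_names = [c.name for c in client.list_collections()]
if collection_name in existing_names:
    client.delete_collection(name=collection_name)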
print(f"Creating collection: '{collection_name}'")
collection = client.create_collection(name=collection_name, embedding_function=sentence_transformer_ef)


Creating collection: 'airflow_docs_stable'


In [28]:
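ids, texts, metadatas = [], [], []
for index, row in documents_df.iterrows():
    # Build fresh metadata for every row, so a missing value never raises a NameError
    # or silently reuses the previous row's metadata
    metadata = {
        key: row[key] if pd.notnull(row[key]) else "Unknown"
        for key in ('source', 'section', 'page')
    }
    ids.append(str(index))
    texts.append(row['page_content'])
    metadatas.append(metadata)

# Add all chunks in one call instead of one call per row
collection.add(
    documents=texts,
    metadatas=metadatas,
    ids=ids,
)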
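Adding rows one at a time is slow. Adding a very large corpus in a single call computes every embedding at once, and some newer Chroma releases cap the number of records accepted per call. A middle ground is to slice the parallel lists into fixed-size batches. The helper `iter_batches` and the batch size of 500 below are hypothetical choices, not values from this notebook.

```python
from typing import Iterator, Sequence


def iter_batches(ids: Sequence[str], documents: Sequence[str], metadatas: Sequence[dict],
                 batch_size: int) -> Iterator[tuple[list, list, list]]:
    """Hypothetical helper: yield aligned slices of ids, documents and metadatas."""
    if not len(ids) == len(documents) == len(metadatas):
        raise ValueError("ids, documents and metadatas must have the same length")
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(ids), batch_size):
        end = start + batch_size
        yield list(ids[start:end]), list(documents[start:end]), list(metadatas[start:end])


# Usage, assuming parallel lists ids, texts, metadatas built from documents_df (not executed here):
# for batch_ids, batch_docs, batch_meta in iter_batches(ids, texts, metadatas, batch_size=500):
#     collection.add(ids=batch_ids, documents=batch_docs, metadatas=batch_meta)

batches = list(iter_batches([str(i) for i in range(5)], list("abcde"), [{}] * 5, batch_size=2))
print([batch_ids for batch_ids, _, _ in batches])
# → [['0', '1'], ['2', '3'], ['4']]
```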

In [29]:
client.persist()

True

## Testing

In [30]:
question = "How to create a DAG?"
results = collection.query(
    query_texts=[question],
    n_results=3,
)
formatted_result = "\n\n".join(results["documents"][0])
print(formatted_result)

Using the Public Interface for DAG Authors¶  DAGs¶ The DAG is Airflow’s core entity that represents a recurring workflow. You can create a DAG by instantiating the DAG class in your DAG file. You can also instantiate them via :class::~airflow.models.dagbag.DagBag class that reads DAGs from a file or a folder. DAGs can also have parameters specified via :class::~airflow.models.param.Param class. Airflow has a set of example DAGs that you can use to learn how to write DAGs   airflow.example_dags   You can read more about DAGs in DAGs. References for the modules used in DAGs are here:   airflow.models.dag airflow.models.dagbag airflow.models.param     Operators¶ Operators allow for generation of certain types of tasks that become nodes in the DAG when instantiated. There are 3 main types of operators:

Named Arguments¶  -D, --daemon Daemonize instead of running in the foreground Default: False  -p, --do-pickle Attempt to pickle the DAG object to send over to the workers, instead of lettin
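Conceptually, `collection.query` embeds the question with the collection's embedding function and returns the `n_results` stored chunks closest to it. Chroma uses an approximate HNSW index whose default distance is squared L2. The brute-force sketch below computes the same kind of ranking exactly on tiny made-up vectors. `top_k_l2` is a hypothetical name, and real embeddings from `all-mpnet-base-v2` have 768 dimensions rather than 2.

```python
import heapq
from typing import Sequence


def top_k_l2(query: Sequence[float], vectors: Sequence[Sequence[float]],
             k: int) -> list[tuple[int, float]]:
    """Brute-force k nearest neighbours by squared L2 distance, closest first.

    Assumes every stored vector has the same dimension as the query.
    """
    distances = (
        (index, sum((q - v) ** 2 for q, v in zip(query, vector)))
        for index, vector in enumerate(vectors)
    )
    return heapq.nsmallest(k, distances, key=lambda pair: pair[1])


stored = [[0.0, 1.0], [1.0, 1.0], [1.0, 0.5], [-1.0, 0.0]]
print(top_k_l2([1.0, 0.0], stored, k=2))
# → [(2, 0.25), (1, 1.0)]
```

An exact scan like this costs one distance computation per stored chunk, which is fine for about 1,500 chunks. An HNSW index trades a little accuracy for much faster lookups on larger collections.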