[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/vectara/example-notebooks/blob/main/notebooks/db-ingest.ipynb)

In [None]:
import numpy as np
import os
import json
import requests
import markdown
from bs4 import BeautifulSoup
from urllib3.util import Retry

import ray
from ray.experimental import tqdm_ray

from snowflake.connector import connect


In [2]:
# Snowflake credentials. Prefer environment variables so real secrets never
# have to be typed into (and saved with) the notebook; when a variable is not
# set, fall back to the placeholder, which must be replaced before connecting.
sf_user = os.environ.get('SNOWFLAKE_USER', '<SNOWFLAKE-USER-NAME>')
sf_password = os.environ.get('SNOWFLAKE_PASSWORD', '<SNOWFLAKE-PASSWORD>')
sf_account = os.environ.get('SNOWFLAKE_ACCOUNT', '<SNOWFLAKE-ACCOUNT-ID>')

We now use SQL to build one enriched row per review. Each row includes the reviewer name, the review date, and the review text, along with information about the listing itself.

In [3]:
# Join every review with its listing so each row carries both the review
# fields and the listing fields selected below.
query = '''
SELECT 
    DATE, REVIEWER_NAME, COMMENTS, R.ID as REVIEW_ID,
    LATITUDE, LONGITUDE, ROOM_TYPE, DESCRIPTION,
    NEIGHBOURHOOD_CLEANSED as NEIGHBORHOOD
FROM REVIEWS AS R JOIN LISTINGS as L
ON R.LISTING_ID = L.ID
'''

# Open the Snowflake session, run the query and load the full result set into
# a pandas DataFrame. The cursor and connection are closed in `finally` so they
# are released even when the query fails.
con = connect(user=sf_user, password=sf_password, account=sf_account)
try:
    cursor = con.cursor()
    try:
        cursor.execute("USE DATABASE AIRBNB;")
        cursor.execute(query)
        df = cursor.fetch_pandas_all()
    finally:
        cursor.close()
finally:
    con.close()

In [4]:
# Preview the first five joined rows (head() returns five rows by default).
df.head()

Unnamed: 0,DATE,REVIEWER_NAME,COMMENTS,REVIEW_ID,LATITUDE,LONGITUDE,ROOM_TYPE,DESCRIPTION,NEIGHBORHOOD
0,2023-05-08,Mathias,Veldig bra oppholf,886755474280381042,41.37249,2.13603,Private room,<b>The space</b><br />Breakfast buffet not inc...,Sants
1,2023-05-08,Stéphane,Parfait,886853033852892799,41.37249,2.13603,Private room,<b>The space</b><br />Breakfast buffet not inc...,Sants
2,2022-10-28,Harry,Awesome space in a relatively quiet area. Clo...,747593260851879467,41.37202,2.14067,Entire home/apt,Moderno apartamento (2 a 6 personas) a 9 min...,la Bordeta
3,2022-10-31,Karine,Appartement situé à 10 minutes de la place d’E...,749781800071072130,41.37202,2.14067,Entire home/apt,Moderno apartamento (2 a 6 personas) a 9 min...,la Bordeta
4,2022-11-04,Nathália,Muito obrigada pela estadia,752747990540719003,41.37202,2.14067,Entire home/apt,Moderno apartamento (2 a 6 personas) a 9 min...,la Bordeta


Now we index this data, row by row, into the Vectara corpus.

In [8]:
# Vectara settings are read from the environment, in this order; a missing
# variable raises KeyError.
vectara_corpus_id, vectara_customer_id, vectara_api_key = (
    os.environ[name]
    for name in ('VECTARA_CORPUS_ID', 'VECTARA_CUSTOMER_ID', 'VECTARA_API_KEY')
)

def get_post_headers() -> dict:
    """Build the HTTP headers attached to every POST request.

    Returns:
        A fresh dict with the API key and customer id (taken from the
        module-level ``vectara_api_key`` and ``vectara_customer_id``) plus a
        JSON content type.
    """
    headers = dict()
    headers["x-api-key"] = vectara_api_key
    headers["customer-id"] = vectara_customer_id
    headers["Content-Type"] = "application/json"
    return headers

def index_doc(session, doc: dict) -> str:
    """Post one document to the Vectara index endpoint and classify the reply.

    Args:
        session: object exposing a ``requests``-style ``post`` method.
        doc: the document payload, sent under the ``"document"`` key together
            with the module-level customer and corpus ids.

    Returns:
        One of:
        - ``"E_ALREADY_EXISTS"``: HTTP 409 or status code ``ALREADY_EXISTS``.
        - ``"E_NO_PERMISSIONS"``: status code ``FORBIDDEN``.
        - ``"E_FAILED"``: any other HTTP status >= 400.
        - ``"E_SUCCEEDED"``: everything else.
    """
    req = {
        "customerId": vectara_customer_id,
        "corpusId": vectara_corpus_id,
        "document": doc
    }

    response = session.post(
        headers=get_post_headers(),
        url="https://api.vectara.io/v1/index",
        data=json.dumps(req),
        timeout=250,
        verify=True,
    )

    status_code = response.status_code

    # An error reply may not carry a JSON body at all; treat that as "no
    # status information" instead of letting the decode error escape.
    try:
        result = response.json()
    except ValueError:
        result = None

    # Read status.code defensively: the body may be missing, not an object,
    # or have a null/odd "status" entry.
    status = result.get("status") if isinstance(result, dict) else None
    status_str = status.get("code") if isinstance(status, dict) else None

    if status_code == 409 or status_str == "ALREADY_EXISTS":
        return "E_ALREADY_EXISTS"
    if status_str == "FORBIDDEN":
        return "E_NO_PERMISSIONS"
    # Any remaining HTTP error must not be reported as a success.
    if status_code >= 400:
        return "E_FAILED"
    return "E_SUCCEEDED"

def clean_md(s):
    """Render ``s`` as Markdown and return the text content of the result.

    Args:
        s: the Markdown/HTML text to clean. Falsy values (``None``, ``""``)
            and non-string values are returned unchanged.

    Returns:
        The text extracted from the rendered HTML, or ``s`` itself when there
        is nothing to clean.
    """
    # Missing values can arrive as non-strings (for example float NaN, which is
    # truthy); pass them through instead of handing them to the Markdown parser.
    if not isinstance(s, str) or not s:
        return s
    html_content = markdown.markdown(s)
    soup = BeautifulSoup(html_content, features='html.parser')
    return soup.get_text()

def add_chunk(df_chunk, bar) -> None:
    """Ingest every row of a DataFrame chunk into Vectara.

    One document is built per row and sent with ``index_doc``; the progress
    bar is advanced by one after each row.

    Args:
        df_chunk: DataFrame providing the columns DATE, REVIEWER_NAME,
            COMMENTS, REVIEW_ID, LATITUDE, LONGITUDE, ROOM_TYPE, DESCRIPTION
            and NEIGHBORHOOD.
        bar: progress-bar actor handle; ``bar.update.remote(1)`` is called
            once per row.
    """
    session = requests.Session()
    retry_strategy = Retry(total=5, backoff_factor=2)
    adapter = requests.adapters.HTTPAdapter(max_retries=retry_strategy)
    session.mount("https://", adapter)

    failed = 0
    try:
        for row in df_chunk.to_dict(orient='records'):
            metadata = {
                'date': row['DATE'],
                'reviewer': row['REVIEWER_NAME'],
                'latitude': row['LATITUDE'],
                'longitude': row['LONGITUDE'],
                'neighborhood': row['NEIGHBORHOOD']
            }
            doc = {
                "documentId": f"Review {row['REVIEW_ID']}",
                # default=str keeps values json cannot encode natively (DATE
                # may arrive as a datetime.date - TODO confirm) from raising.
                "metadataJson": json.dumps(metadata, default=str),
                "title": f"Review by {row['REVIEWER_NAME']} on {row['DATE']}",
                "section": [
                    {'text': f"{row['ROOM_TYPE']} property in {row['NEIGHBORHOOD']}, described as {clean_md(row['DESCRIPTION'])}"},
                    {'text': clean_md(row['COMMENTS'])}
                ],
            }
            # Actually send the document (previously it was only printed).
            result = index_doc(session, doc)
            if result not in ("E_SUCCEEDED", "E_ALREADY_EXISTS"):
                failed += 1
            bar.update.remote(1)
    finally:
        session.close()

    # One summary line per chunk rather than one line per failed row.
    if failed:
        print(f"{failed} of {len(df_chunk)} rows could not be indexed")


In [6]:
# Report the number of rows in df before starting the ingest.
print(f"Ingesting {len(df)} rows into Vectara")

Ingesting 763592 rows into Vectara


In [7]:
#
# Fan the ingest out over a pool of Ray workers
#

ray_workers = 8
print(f"Using Ray with {ray_workers} workers")

ray.init(num_cpus=ray_workers, ignore_reinit_error=True, include_dashboard=False)

# The progress bar is created as a Ray actor and its handle is passed to every task.
remote_tqdm = ray.remote(tqdm_ray.tqdm)
bar = remote_tqdm.remote(total=len(df))
ray_add_chunk = ray.remote(add_chunk)

# Split the DataFrame into `ray_workers` pieces, launch one remote task per
# piece, then block until all of them have finished.
chunks = np.array_split(df, ray_workers)
futures = list(map(lambda chunk: ray_add_chunk.remote(chunk, bar), chunks))
_ = ray.get(futures)


Using Ray with 8 workers


2023-11-10 19:36:13,815	INFO worker.py:1642 -- Started a local Ray instance.


(pid=73328)  0:   0%|          | 0/763592 [00:00<?, ?it/s]