# Geographical vector search using Distributed Qdrant DB as a native app on Snowpark Container Services

### Dataset intro

For this exercise, I am using This is an open dataset released by Yelp for learning purposes. It consists of millions of user reviews, business attributes, and over 200,000 pictures from multiple metropolitan areas. 

Number of Records: 5.7 Million records

https://www.yelp.com/dataset

### Import libraries

In [1]:
import numpy as np
import json
import pandas as pd
import math
import tqdm
import glob
from ast import literal_eval
import time
import logging
import sys
import os, errno
import uuid
from typing import Iterable
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from qdrant_client import QdrantClient
from qdrant_client import models
from qdrant_client.models import Filter, FieldCondition, GeoBoundingBox, GeoPoint, GeoRadius
from sentence_transformers import SentenceTransformer
from geopy.geocoders import Nominatim
import ray
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import Table
from ray.util.multiprocessing import Pool
client = QdrantClient("http://instances.qdrantprimaryservice.qdrant-app-core-schema:6333")
collection_name = "yelp_review_with_cortexembeddings_latest"

### Initiate logger

In [54]:
# Logging
def get_logger(logger_name):
   logger = logging.getLogger(logger_name)
   logger.setLevel(logging.DEBUG)
   handler = logging.StreamHandler(sys.stdout)
   handler.setLevel(logging.DEBUG)
   handler.setFormatter(
      logging.Formatter(
      '%(name)s [%(asctime)s] [%(levelname)s] %(message)s'))
   logger.addHandler(handler)
   return logger
logger = get_logger('snowpark-container-service')

def initiate_snowpark_conn(snowflake_database, snowflake_schema, snowflake_warehouse):
  with open("/snowflake/session/token", "r") as f:
      token = f.read()

  connection_parameters = {
      "account": os.getenv("SNOWFLAKE_ACCOUNT"),
      "host": os.getenv("SNOWFLAKE_HOST"),
      "authenticator": "oauth",
      "token": token,
      "warehouse": snowflake_warehouse,
      "database": snowflake_database,
      "schema": snowflake_schema,
      "client_session_keep_alive": True
  }
  snowpark_session = Session.builder.configs(connection_parameters).create()
  return snowpark_session

session = initiate_snowpark_conn(snowflake_database='QDRANT_consumer_db', snowflake_schema='yelp_reviews_sch', snowflake_warehouse='DISTRIBUTED_QDRANT_ON_SPCS_QDRANT_QUERY_WAREHOUSE')

### See Qdrant status

In [3]:
! curl $QDRANT_PRIMARY_REST_ADDRESS/cluster | jq

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   750  100   750    0     0  14148      0 --:--:-- --:--:-- --:--:-- 14423
[1;39m{
  [0m[34;1m"result"[0m[1;39m: [0m[1;39m{
    [0m[34;1m"status"[0m[1;39m: [0m[0;32m"enabled"[0m[1;39m,
    [0m[34;1m"peer_id"[0m[1;39m: [0m[0;39m5725472888383057[0m[1;39m,
    [0m[34;1m"peers"[0m[1;39m: [0m[1;39m{
      [0m[34;1m"5725472888383057"[0m[1;39m: [0m[1;39m{
        [0m[34;1m"uri"[0m[1;39m: [0m[0;32m"http://instances.qdrantprimaryservice.qdrant-app-core-schema:6335/"[0m[1;39m
      [1;39m}[0m[1;39m,
      [0m[34;1m"7028896095435842"[0m[1;39m: [0m[1;39m{
        [0m[34;1m"uri"[0m[1;39m: [0m[0;32m"http://instances.qdrantsecondaryservice0.qdrant-app-core-schema:6335/"[0m[1;39m
      [1;39m}[0m[1;39m,
      [0m[34;1m"6037741156569795"[0m[1;39m: [0m[1;39m{
        [0m[34;

### See sample of data

In [4]:
snowdf = session.table("yelp_reviews")
snowdf.limit(1).to_pandas()

Unnamed: 0,BUSINESS_ID,DATE,REVIEW_ID,STARS,REVIEW_TEXT,BUSINESS_LATITUDE,BUSINESS_LONGITUDE,BUSINESS_NAME,BUSINESS_POSTAL_CODE,IS_BUSINESS_OPEN,BUSINESS_CITY,BUSINESS_CATEGORIES,BUSINESS_ADDRESS
0,CUsM2ZJAMUkUsf5-G7Js7Q,2017-07-13 23:44:37,hcAHH0UURTd7f9NMpNsc8Q,5.0,"I have written a review in the past, however I...",38.637893,-90.341171,Clayton Nail Spa,63117,1,Clayton,"Waxing, Nail Salons, Beauty & Spas, Hair Removal",7933 Clayton Rd


### Calculate embeddings using Cortex

In [5]:
%%time
snowdf = snowdf.with_column("EMBEDDINGS", F.call_builtin("snowflake.cortex.embed_text_768", F.lit("snowflake-arctic-embed-m"), F.col("REVIEW_TEXT")))
snowdf.write.mode("overwrite").save_as_table("yelp_reviews_with_cortex_embeddings")

CPU times: user 64.3 ms, sys: 8.14 ms, total: 72.5 ms
Wall time: 14min 32s


In [13]:
snowdf = session.table("yelp_reviews_with_cortex_embeddings")

In [14]:
snowdf.limit(10).to_pandas()

Unnamed: 0,BUSINESS_ID,DATE,REVIEW_ID,STARS,REVIEW_TEXT,BUSINESS_LATITUDE,BUSINESS_LONGITUDE,BUSINESS_NAME,BUSINESS_POSTAL_CODE,IS_BUSINESS_OPEN,BUSINESS_CITY,BUSINESS_CATEGORIES,BUSINESS_ADDRESS,EMBEDDINGS
0,R8SpLHPAsLMuvU0b_96sDA,2017-10-07 13:23:48,iDiDHUVil8KTKI8l7yR_8Q,2.0,This swim school needs to improve. \n1) They n...,39.876745,-74.922993,Bear Paddle Swim School - Marlton,8053,1,Marlton,"Specialty Schools, Active Life, Swimming Lesso...","515 Rte 73 S, Unit 170 B","[0.010044973, -0.0053094034, 0.012566361, 0.00..."
1,qHyXH1bvVu9wF6ai9XUj9A,2020-02-27 00:54:57,u2FzTPP5SLBuSOkPP0HaTg,1.0,I went to test drive a 2012 Jeep Liberty at a ...,32.206421,-110.895928,Chapman Volkswagen of Tucson,85711,1,Tucson,"Auto Parts & Supplies, Automotive, Car Dealers...",4500 E 22nd St,"[-0.04038955, 0.045561984, -0.011761724, 0.036..."
2,Bd5P74d6lakelSZVs_QFVg,2019-05-10 20:13:49,LBuKcgxrpW0cuOnkrbjXPg,5.0,We love this Chick Fil-A. My kids love the pl...,39.614499,-86.15914,Chick-fil-A,46142,1,Greenwood,"Fast Food, Caterers, Restaurants, Event Planni...",155 Marlin Dr,"[-0.018255284, -0.023533095, 0.0061574457, 0.0..."
3,z3oB8UHAV0bJMP-RC2_5Uw,2016-09-01 19:10:50,_ZmiCS6K7cBXtJy_mBE4XQ,5.0,"Arlene is wonderful.\nShe is professional, pro...",43.564436,-116.403457,Arlene Thompson Clinical Hypnotherapist,83642,1,Meridian,"Health & Medical, Counseling & Mental Health, ...",,"[-0.010717014, 0.04222669, 0.013096274, -0.017..."
4,7WXQAk23tPYOnTaSgkK36g,2016-12-12 20:23:48,f-2ZsIM6vWNkAIYYFMUrEQ,3.0,Beware of the burnt wheat bread. Otherwise the...,38.651664,-90.300094,Snarf's Sandwiches - Skinker,63130,1,St. Louis,"Sandwiches, Restaurants, Salad, Breakfast & Br...",360 N Skinker Blvd,"[-0.06065508, 0.05726764, -0.024030888, 0.0170..."
5,ZALxLqcIfrW047KDHwhiDQ,2019-07-16 23:10:23,FlW_7cLS9mE067geekUOTw,5.0,"Definitely enjoyed the place great food ,great...",38.63201,-90.194926,El Burro Loco Downtown,63101,1,St. Louis,"Restaurants, Mexican","1101 Lucas Ave, Ste 1","[-0.059387583, 0.04435638, -0.027584508, 0.002..."
6,n4qESYw7vHVVTWq4dtKp2w,2011-11-05 16:11:28,xLG7u4PJ-YgpHjjsRkAZeQ,3.0,"""Come closer, lover, take a table."" Such was t...",38.794637,-89.950472,Orient,62025,1,Edwardsville,"Chinese, Restaurants",1518 Troy Rd,"[-0.07881604, 0.009183761, 0.018110033, 0.0473..."
7,8Vk_2t7hQI02IhpcRnfAIw,2021-01-02 07:59:18,xtNxuv1NZa2Gd6r1FTs5Jg,5.0,If you want a delicious ramen here you will fi...,32.2218,-110.965922,OBON Sushi Bar Ramen,85701,1,Tucson,"Sushi Bars, Restaurants, Bars, Japanese, Ramen...",350 E Congress St,"[-0.054614436, 0.023927307, 0.005734516, 0.017..."
8,GSOLNXG5-IPvNj0DPf3kOQ,2020-01-19 01:51:39,xnw3wQywVif-Kl1xWNh4Lg,4.0,Would you eat dinner at 945pm? \n\nMade a rese...,29.95414,-90.066082,Irene's,70116,1,New Orleans,"Italian, Restaurants",529 Bienville St,"[-0.026657337, 0.019398896, 0.011171801, -0.03..."
9,QHUj7c1MkC3jnkku5S1Pcw,2020-10-29 14:53:16,qfPyYCZgYm5WEIag6EgefA,5.0,Best Mediterranean sandwich I ever had !! Supe...,28.045866,-82.394178,Mays Alreem Usa Pita House,33617,1,Temple Terrace,"Food, Sandwiches, Middle Eastern, Halal, Fast ...",10700 N 56th St,"[-0.07258139, 0.01738596, -0.012300067, -0.001..."


### Download data from snowflake into snowflake stage

In [15]:
_ = session.sql("remove @DISTRIBUTED_QDRANT_ON_SPCS.qdrant_app_public_schema.artifacts/data/").collect()

In [17]:
%%time
session.sql(f"""copy into @DISTRIBUTED_QDRANT_ON_SPCS.qdrant_app_public_schema.artifacts/data/
from
(select object_construct('BUSINESS_ID', BUSINESS_ID, 'DATE', DATE, 'REVIEW_ID', REVIEW_ID, 'STARS', STARS, 'REVIEW_TEXT', REVIEW_TEXT, 'LOCATION', object_construct('lat', BUSINESS_LATITUDE, 'lon', BUSINESS_LONGITUDE), 'BUSINESS_NAME', BUSINESS_NAME, 'BUSINESS_POSTAL_CODE', BUSINESS_POSTAL_CODE, 'BUSINESS_CITY', BUSINESS_CITY, 'BUSINESS_CATEGORIES', BUSINESS_CATEGORIES, 'BUSINESS_ADDRESS', BUSINESS_ADDRESS) AS PAYLOAD, to_array(EMBEDDINGS) AS EMBEDDINGS from yelp_reviews_with_cortex_embeddings)
file_format = (type=PARQUET)
max_file_size=4900000000
""").collect()

CPU times: user 8.51 ms, sys: 221 μs, total: 8.73 ms
Wall time: 1min 7s


[Row(rows_unloaded=5790749, input_bytes=73019331882, output_bytes=73019331882)]

### See sample of downloaded data

In [18]:
pdf = pd.read_parquet('/artifacts/data/data_0_0_0.snappy.parquet')

In [19]:
len(pdf.index)

16384

In [20]:
pdf.head()

Unnamed: 0,_COL_0,_COL_1
0,"{""BUSINESS_ADDRESS"":""5 W Canon Perdido"",""BUSIN...","[-7.027868926525116e-02,4.038435593247414e-02,..."
1,"{""BUSINESS_ADDRESS"":""739 Conti St"",""BUSINESS_C...","[-4.048965126276016e-02,2.500831335783005e-02,..."
2,"{""BUSINESS_ADDRESS"":""350 Gold Ranch Rd"",""BUSIN...","[-2.280976250767708e-02,3.102680109441280e-02,..."
3,"{""BUSINESS_ADDRESS"":""724 Iberville St"",""BUSINE...","[-1.970066502690315e-02,1.447780802845955e-02,..."
4,"{""BUSINESS_ADDRESS"":""2230 N Country Club Rd"",""...","[-5.138897523283958e-02,-2.573888981714845e-03..."


### Get vector length

In [21]:
vector_len = len(snowdf.select("EMBEDDINGS").limit(1).collect()[0]['EMBEDDINGS'])

In [22]:
vector_len

768

### Create Qdrant collection

In [26]:
client.recreate_collection(
    collection_name=collection_name,
    vectors_config=models.VectorParams(size=vector_len, distance=models.Distance.COSINE),
    optimizers_config=models.OptimizersConfigDiff(
        indexing_threshold=0,
    ),
    on_disk_payload="true",
    quantization_config=models.ScalarQuantization(
            scalar=models.ScalarQuantizationConfig(
                type=models.ScalarType.INT8,
                quantile=0.99,
                always_ram=True
            )
        )
)

  client.recreate_collection(


True

### Insert batches from snowflake into Qdrant

In [27]:
filepaths = list(glob.iglob('/artifacts/data/*.parquet'))

In [28]:
len(filepaths)

305

In [29]:
filepaths[0]

'/artifacts/data/data_0_0_0.snappy.parquet'

### Use ray single node multiprocessing to insert into Qdrant

In [30]:
try:
    cli = ray.init()
except:
    cli.disconnect()
    ray.shutdown()
    cli = ray.init()

2024-07-31 03:14:51,345	INFO worker.py:1772 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


In [31]:
print(ray.cluster_resources()) 

{'node:172.16.0.2': 1.0, 'memory': 110314532659.0, 'node:__internal_head__': 1.0, 'CPU': 32.0, 'object_store_memory': 11220602060.0}


In [32]:
pool = Pool(ray_address="auto", ray_remote_args={"num_cpus": 1})

In [33]:
def points_iterator(pdf:pd.DataFrame) -> Iterable[models.PointStruct]:
    for index, row in pdf.iterrows():
        idx = uuid.uuid4().hex
        payload = row['PAYLOAD']
        point_vector: Dict[str, models.Vector] = row['EMBEDDING']
        yield models.PointStruct(id=idx, payload=payload, vector=point_vector)

In [34]:
def read_parquet_and_put_into_qdrant(input_file_path: str):
    from qdrant_client import QdrantClient
    client = QdrantClient("http://instances.qdrantprimaryservice.qdrant-app-core-schema:6333")
    embeddings_pdf = pd.read_parquet(input_file_path)
    embeddings_pdf.columns = ['PAYLOAD', 'EMBEDDING']
    embeddings_pdf["PAYLOAD"] = embeddings_pdf["PAYLOAD"].map(json.loads)
    embeddings_pdf["EMBEDDING"] = embeddings_pdf["EMBEDDING"].map(literal_eval)
    points = points_iterator(embeddings_pdf)
    client.upload_points(
        collection_name=collection_name,
        points=points,
        wait=True,
        parallel=1
    )
    return "SUCCESS"

In [35]:
iterator = pool.imap_unordered(read_parquet_and_put_into_qdrant, filepaths)

In [36]:
%%time
extraction_results = list(tqdm.tqdm(iterator, total=len(filepaths)))

100%|██████████| 305/305 [20:23<00:00,  4.01s/it]  

CPU times: user 4.57 s, sys: 1.41 s, total: 5.98 s
Wall time: 20min 23s





### Start indexing

In [37]:
%%time
client.update_collection(
    collection_name=collection_name,
    optimizer_config=models.OptimizersConfigDiff(indexing_threshold=20000),
)

CPU times: user 5.08 ms, sys: 0 ns, total: 5.08 ms
Wall time: 123 ms


True

In [38]:
client.create_payload_index(
    collection_name=collection_name,
    field_name="BUSINESS_CITY",
    field_schema="keyword",
)

UpdateResult(operation_id=90501, status=<UpdateStatus.COMPLETED: 'completed'>)

In [39]:
client.create_payload_index(
    collection_name=collection_name,
    field_name="LOCATION",
    field_schema="geo",
)

UpdateResult(operation_id=90503, status=<UpdateStatus.COMPLETED: 'completed'>)

### Geo Search with exact city name

In [50]:
class YelpSemanticSearcherWithExactCityName:
    def __init__(self, collection_name):
        self.collection_name = collection_name
        # Initialize encoder model
        self.model = SentenceTransformer("Snowflake/snowflake-arctic-embed-m", device="cpu")
        # initialize Qdrant client
        self.qdrant_client = QdrantClient("http://instances.qdrantprimaryservice.qdrant-app-core-schema:6333")
        #self.session = session
        
    def search(self, text: str, city_name: str, limit:int):
        # Convert text query into vector
        vector = self.model.encode(text).tolist()
        #vector = self.session.sql(f"select snowflake.cortex.embed_text_768('snowflake-arctic-embed-m', '{text}') as result").collect()[0]['RESULT']
        city_filter = Filter(**{
        "must": [{
                "key": "BUSINESS_CITY", # Store city information in a field of the same name 
                "match": { # This condition checks if payload field has the requested value
                    "value": city_name
                }
            }]
        })
    
        # Use `vector` for search for closest vectors in the collection
        search_result = self.qdrant_client.search(
            collection_name=self.collection_name,
            query_vector=vector,
            query_filter=city_filter,
            limit=limit,  # 5 the most closest results is enough
        )
        # `search_result` contains found vector ids with similarity scores along with the stored payload
        # In this function you are interested in payload only
        payloads = [(hit.score, hit.payload) for hit in search_result]
        return payloads

In [51]:
searcher = YelpSemanticSearcherWithExactCityName(collection_name)

In [52]:
%%time
searcher.search("yum yum tacos",  "Clearwater", 3)

CPU times: user 265 ms, sys: 0 ns, total: 265 ms
Wall time: 62.1 ms


[(0.9399147,
  {'BUSINESS_ADDRESS': '1709 Drew St',
   'BUSINESS_CATEGORIES': 'Restaurants, Mexican',
   'BUSINESS_CITY': 'Clearwater',
   'BUSINESS_ID': '0yELPu1_7T-V3InzfEPS7g',
   'BUSINESS_NAME': 'La Cabaña Del Tio',
   'BUSINESS_POSTAL_CODE': '33755',
   'DATE': '2011-11-08 19:15:47.000',
   'LOCATION': {'lat': 27.9677135, 'lon': -82.7665303},
   'REVIEW_ID': '3cM7j8uHQaBzNvSEi26gVw',
   'REVIEW_TEXT': 'The best Tacos in town... super yum.',
   'STARS': 5}),
 (0.8758049,
  {'BUSINESS_ADDRESS': '2169 N Hercules Ave',
   'BUSINESS_CATEGORIES': 'Pizza, Restaurants, Italian',
   'BUSINESS_CITY': 'Clearwater',
   'BUSINESS_ID': 'FZSrUCl8DqHquHjV2qPuQg',
   'BUSINESS_NAME': "Vinny's Pizza & Restaurant",
   'BUSINESS_POSTAL_CODE': '33763',
   'DATE': '2016-11-04 14:52:40.000',
   'LOCATION': {'lat': 28.0005853516, 'lon': -82.7548317611},
   'REVIEW_ID': 'OBya3AhVlzsf19fJeLuPXw',
   'REVIEW_TEXT': 'YUM!  This is my new favorite pizza spot.',
   'STARS': 5}),
 (0.85915184,
  {'BUSINESS_ADD

### Geo Search in a 100_000 meter (60 miles) radius from city

In [43]:
class YelpSemanticSearcherWithinNDistanceFromCity:
    def __init__(self, collection_name):
        self.collection_name = collection_name
        # Initialize encoder model
        self.model = SentenceTransformer("Snowflake/snowflake-arctic-embed-m", device="cpu")
        # initialize Qdrant client
        self.qdrant_client = QdrantClient("http://instances.qdrantprimaryservice.qdrant-app-core-schema:6333")
        #self.session = session
        self.geolocator = Nominatim(user_agent="qdrant_on_spcs_native_app")
        
    def search(self, text: str, city_name:str, distance_in_meters:int, limit:int):
        # Convert text query into vector
        vector = self.model.encode(text).tolist()
        #vector = self.session.sql(f"select snowflake.cortex.embed_text_768('snowflake-arctic-embed-m', '{text}') as result").collect()[0]['RESULT']
        location = self.geolocator.geocode(city_name)
        latitude = location.latitude
        longitude = location.longitude
        query_filter=Filter(
            must=[
                FieldCondition(
                    key="LOCATION",
                    geo_radius=GeoRadius(
                        center=GeoPoint(
                            lat=latitude,
                            lon=longitude,
                        ),
                        radius=distance_in_meters,
                    ),
                )
            ]
        )
        search_result = self.qdrant_client.search(
            collection_name=self.collection_name,
            query_vector=vector,
            query_filter=query_filter,
            limit=limit,  # 5 the most closest results is enough
        )
        # `search_result` contains found vector ids with similarity scores along with the stored payload
        # In this function you are interested in payload only
        payloads = [(hit.score, hit.payload) for hit in search_result]
        return payloads

In [44]:
searcher = YelpSemanticSearcherWithinNDistanceFromCity(collection_name)

In [47]:
%%time
searcher.search(text="yum yum tacos",  city_name="Clearwater", distance_in_meters=100_000, limit=3) #100_000 metres = 60 miles

CPU times: user 272 ms, sys: 0 ns, total: 272 ms
Wall time: 246 ms


[(0.9399147,
  {'BUSINESS_ADDRESS': '1709 Drew St',
   'BUSINESS_CATEGORIES': 'Restaurants, Mexican',
   'BUSINESS_CITY': 'Clearwater',
   'BUSINESS_ID': '0yELPu1_7T-V3InzfEPS7g',
   'BUSINESS_NAME': 'La Cabaña Del Tio',
   'BUSINESS_POSTAL_CODE': '33755',
   'DATE': '2011-11-08 19:15:47.000',
   'LOCATION': {'lat': 27.9677135, 'lon': -82.7665303},
   'REVIEW_ID': '3cM7j8uHQaBzNvSEi26gVw',
   'REVIEW_TEXT': 'The best Tacos in town... super yum.',
   'STARS': 5}),
 (0.93536377,
  {'BUSINESS_ADDRESS': '913 E Hillsborough Ave',
   'BUSINESS_CATEGORIES': 'Mexican, Restaurants, Ethnic Food, Food, Vegetarian, Food Trucks, Street Vendors, Specialty Food',
   'BUSINESS_CITY': 'Tampa',
   'BUSINESS_ID': 'yb2vAoH3E-R11yWmnT570w',
   'BUSINESS_NAME': 'Taco Bus',
   'BUSINESS_POSTAL_CODE': '33604',
   'DATE': '2012-10-26 06:36:58.000',
   'LOCATION': {'lat': 27.995955, 'lon': -82.450292},
   'REVIEW_ID': 'ewQevLM9-YAMySCaE4GFBQ',
   'REVIEW_TEXT': 'Very good tacos',
   'STARS': 5}),
 (0.87981415,


### Clean up

In [55]:
_ = session.sql("remove @DISTRIBUTED_QDRANT_ON_SPCS.qdrant_app_public_schema.artifacts/data/").collect()

In [None]:
cli.disconnect()