In [None]:
!pip install cassandra-driver -U

In [1]:
import sys, os
from dotenv import load_dotenv
from conn import getCQLSession
from cassandra.cqlengine.query import BatchStatement
from cassandra.query import BatchType
import csv

load_dotenv(override=True) 
os.environ["MODE"]

'astra_db'

In [2]:
from conn import getCQLSession
session = getCQLSession(os.environ["MODE"])

Initializing CQL Session
Connected


In [None]:
table = "default_keyspace.product_catalog_emb"

session.execute("""CREATE TABLE IF NOT EXISTS default_keyspace.product_catalog_emb (
                    id text PRIMARY KEY,
                    title text,
                    emb VECTOR<FLOAT, 384>)""")

session.execute("""CREATE CUSTOM INDEX product_catalog_emb_vector_idx
    ON default_keyspace.product_catalog_emb (emb)
    USING 'StorageAttachedIndex'  
    WITH OPTIONS = { 'similarity_function': 'COSINE'};""")

CREATE CUSTOM INDEX product_catalog_emb_vector_idx
    ON default_keyspace.product_catalog_emb (emb)
    USING 'StorageAttachedIndex'  
    WITH OPTIONS = { 'similarity_function': 'COSINE'};

CREATE CUSTOM INDEX product_catalog_emb_vector_idx
    ON default_keyspace.product_catalog_emb (emb)
    USING 'StorageAttachedIndex'  
    WITH OPTIONS = { 'source_model': 'bert', 'similarity_function': 'COSINE'};
    
CREATE CUSTOM INDEX product_catalog_emb_vector_idx
    ON default_keyspace.product_catalog_emb (emb)
    USING 'StorageAttachedIndex'  
    WITH OPTIONS = { 'source_model': 'bert', 'similarity_function': 'DOT_PRODUCT'};    

# Product Search

In [3]:
import csv
def read_csv(file_path):
    data = []
    with open(file_path, 'r') as file:
        csv_reader = csv.DictReader(file)
        for row in csv_reader:
            data.append(row)
    return data

In [4]:
queries = read_csv("../data_queries/chunk_9999.csv")

In [10]:
queries[:10]

[{'query': 'trellis for climbing plants',
  'emb': '-0.048838172,-0.074733235,0.05481011,0.027351009,0.0006103623,0.026222544,-0.008075648,0.0138345035,-0.0077102883,0.03429652,0.009751471,-0.05707372,0.045820367,0.025954433,0.05993116,0.056023695,0.011463513,0.10386538,-0.048460662,-0.018406807,0.04325874,0.012444333,-0.030416965,0.0017623174,-0.0153835965,0.011319403,-0.08990371,-0.005596035,-0.014435396,-0.11805417,0.011254357,0.035645973,0.01903037,0.006994528,-0.010007574,0.0066225445,-0.053514514,-0.0013567668,-0.034658812,0.017996404,0.013226245,0.0060074404,-0.07308237,-0.001998519,-0.055668496,-0.014055906,-0.027527863,-0.014177166,0.030189157,-0.041558646,-0.033268902,-0.050733875,-0.013386321,0.001383755,-0.020400908,0.0132677,0.036865756,0.059189137,0.010553816,0.02062063,0.08669635,-0.034591265,-0.11797153,0.0666018,0.02127234,0.0060160155,-0.045337945,0.027114328,0.08377424,0.07520063,0.06897177,0.015983732,0.007868892,0.006487555,-0.01724752,-0.024720518,-0.037986327,-0.

In [None]:
import time
table = "default_keyspace.product_catalog_emb"

def query_data(data, limit="10"):
    cmd_select = f"""
        SELECT id, title FROM {table}
        ORDER BY emb ANN OF :emb LIMIT {limit}
    """

    prepared_stmt_select = session.prepare(cmd_select)
    count = 0
    start = time.time()
    for row in data:
        count += 1
        prods = session.execute(prepared_stmt_select,{"emb": [float(x) for x in row['emb'].split(',')] })
        if count % 10 == 0 :
            end = time.time()
            print(f"""{count} queries | Time: {end - start} | RPS: {(end - start) / 10} """)
            start = time.time()
    
    print(f"""Finish: {count} queries.""")
    return count

In [None]:
# source_model="bert", limit 10, 
query_data(queries, limit="1000")

In [39]:
import time
import time
import uuid
import queue
table = "default_keyspace.product_catalog_emb"

async def query_data_queue(data, limit="10", CONCURRENCY_LEVEL=100, TOTAL_QUERIES = 1000):

    cmd_select = f"""
        SELECT id, title FROM {table}
        ORDER BY emb ANN OF :emb LIMIT {limit}
    """
    prepared_stmt_select = session.prepare(cmd_select)
    
    def clear_queue():
        while True:
            try:
                futures.get_nowait().result()
            except queue.Empty:
                break


    start = time.time()
    futures = queue.Queue(maxsize=CONCURRENCY_LEVEL)

    # Chunking way, when the max concurrency level is reached, we
    # wait the current chunk of requests to finish
    for row in data[:TOTAL_QUERIES]:
        future = session.execute_async(prepared_stmt_select,{"emb": [float(x) for x in row['emb'].split(',')] })
        try:
            futures.put_nowait(future)
        except queue.Full:
            clear_queue()
            futures.put_nowait(future)

    clear_queue()
    end = time.time()

    print("Finished executing {} queries with a concurrency level of {} in {:.2f} seconds.".
          format(TOTAL_QUERIES, CONCURRENCY_LEVEL, (end-start)))        
    print("QPS: {:.2f} seconds.".
      format( (end-start) / TOTAL_QUERIES))     

    print(f"""Finish""")

In [47]:
# source_model="bert", DOT_PRODUCT metric 
await query_data_queue(queries, limit="1000", CONCURRENCY_LEVEL=50, TOTAL_QUERIES = 10000)

OperationTimedOut: errors={'097a6154-1b3e-499c-9119-46e4be5e0aa2-us-east-1.db.astra.datastax.com:29042:73cfb59e-cf1c-4957-a764-c405000cbe38': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=097a6154-1b3e-499c-9119-46e4be5e0aa2-us-east-1.db.astra.datastax.com:29042:73cfb59e-cf1c-4957-a764-c405000cbe38

In [None]:
cmd_select = f"""
    SELECT id, title FROM {table}
    ORDER BY emb ANN OF :emb LIMIT {limit}
"""

prepared_stmt_select = session.prepare(cmd_select)
prods = session.execute(prepared_stmt_select,{"emb": [float(x) for x in data[0]['emb'].split(',')] })

In [None]:
# Limit 10, No source_model, COSINE
query_data(queries)

In [None]:
%time
cmd_select = f"""
        SELECT id FROM default_keyspace.product_catalog_emb
        ORDER BY emb ANN OF :emb LIMIT 1000
"""
start = time.time()
prepared_stmt_select = session.prepare(cmd_select)
prods = session.execute(prepared_stmt_select,{"emb": [float(x) for x in queries[2]['emb'].split(',')] })
end = time.time()
print(f"""Time: {end - start}""")
df = pd.DataFrame(prods)
df