In [None]:
import concurrent.futures
import contextlib
import json
import os
import re
import time

import psycopg2
import singlestoredb as s2
from pinecone import Pinecone
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

### The Wikipedia data is located in a public S3 bucket: `s3://wikipedia-video-game-data/video-game-embeddings(1).csv` (region `us-west-1`)

# **Credentials**

In [None]:
# Prefer the MONGO_CONN_STR environment variable so the secret does not have
# to be pasted into (and saved with) the notebook.
mongo_conn_str = os.environ.get("MONGO_CONN_STR", 'enter your mongo connection string here')
client = MongoClient(mongo_conn_str, server_api=ServerApi('1'))

# Fail fast: every later cell uses `collection`, so swallowing a connection
# error here would only resurface later as a confusing NameError.
try:
    client.admin.command('ping')
except Exception as e:
    print(e)
    raise
print("Pinged your deployment. You successfully connected to MongoDB!")
db = client.your_db_name
collection = db.your_collection_name

In [None]:
# Connection parameters for postgres. The standard libpq environment variables
# (PGHOST, PGDATABASE, PGUSER) override the placeholders when they are set.
host = os.environ.get("PGHOST", "your postgres host")
dbname = os.environ.get("PGDATABASE", "your db name")
user = os.environ.get("PGUSER", "your username")

# No password here: libpq can pick it up from PGPASSWORD or ~/.pgpass.
# The DSN string is kept for any code that still uses it.
conn_string = f"dbname='{dbname}' user='{user}' host='{host}'"
# Connect with keyword arguments rather than the hand-quoted DSN above, so a
# value containing a quote or space cannot corrupt the connection string.
conn = psycopg2.connect(dbname=dbname, user=user, host=host)

In [None]:
# SingleStore connection settings. Environment variables override the
# placeholders so the password does not have to be typed into the notebook.
s2_host = os.environ.get("S2_HOST", 'your singlestore host')
port = os.environ.get("S2_PORT", '3306')
username = os.environ.get("S2_USER", 'admin')
password = os.environ.get("S2_PASSWORD", 'your singlestore password')
database = os.environ.get("S2_DATABASE", 'your singlestore database')

In [None]:
# Prefer environment variables over pasting the API key into the notebook.
pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY", 'your pinecone api key'))
index = pc.Index(os.environ.get("PINECONE_INDEX", "your pinecone index name"))

# **Setting Up the Search Execution**

## **MongoDB**

In [None]:
# Use the embedding of one stored document as the query vector for MongoDB.
query = collection.find_one({"_id": "2251799813701581"})
# find_one returns None for a missing id; raise a clear error instead of a
# TypeError on the subscript below.
if query is None:
    raise LookupError("Document 2251799813701581 not found in MongoDB")
query_vector = query["vector"]

In [None]:
def execute_mongo_search(query_vector, keywords="AAA games", keyword_weight=0.3,
                         top_n=5, mongo_collection=None):
    """Run a hybrid (vector + keyword) search with an Atlas ``$vectorSearch`` pipeline.

    Each of the 200 vector candidates is re-scored as
    ``(1 - keyword_weight) * vector_score + keyword_weight * keyword_bonus``,
    where ``keyword_bonus`` is 1.0 if ``paragraph`` contains ``keywords``
    (case-sensitive, literal match) and 0.0 otherwise.

    Args:
        query_vector: Query embedding passed to ``$vectorSearch``.
        keywords: Text whose presence in ``paragraph`` earns the bonus.
        keyword_weight: Weight of the keyword bonus; the vector score gets the rest.
        top_n: Number of re-ranked documents to return.
        mongo_collection: Collection to query; defaults to the notebook-level
            ``collection``.

    Returns:
        List of dicts with ``_id``, ``paragraph`` and ``custom_score``, best first.
    """
    coll = collection if mongo_collection is None else mongo_collection
    vector_weight = 1 - keyword_weight
    pipeline = [
        {
            '$vectorSearch': {
                "index": "vector_index",
                "path": "vector",
                "queryVector": query_vector,
                "numCandidates": 200,
                "limit": 200  # candidates to re-rank
            }
        },
        # $vectorSearch does not store its score in a document field; it is only
        # exposed as "vectorSearchScore" metadata. Reading '$score' directly
        # yields null, which would make every custom_score null.
        {'$addFields': {'score': {'$meta': 'vectorSearchScore'}}},
        {
            '$addFields': {
                'keyword_bonus': {
                    '$cond': {
                        # Escape so the keywords are matched literally, not as a regex.
                        'if': {'$regexMatch': {'input': "$paragraph", 'regex': re.escape(keywords)}},
                        'then': 1.0,
                        'else': 0.0
                    }
                }
            }
        },
        {
            '$addFields': {
                'custom_score': {
                    '$add': [
                        {'$multiply': [vector_weight, '$score']},
                        {'$multiply': [keyword_weight, '$keyword_bonus']}
                    ]
                }
            }
        },
        {
            '$project': {
                '_id': 1,
                'paragraph': 1,
                'custom_score': 1
            }
        },
        {'$sort': {'custom_score': -1}},  # best hybrid score first
        {'$limit': top_n}
    ]
    return list(coll.aggregate(pipeline))

## **MongoDB: Running the Concurrent Queries**

In [None]:
num_concurrent_queries = 250
# perf_counter is monotonic and high-resolution, unlike time.time(), so it is
# the appropriate clock for measuring an elapsed interval.
start_time = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor(max_workers=num_concurrent_queries) as executor:
    futures = [executor.submit(execute_mongo_search, query_vector) for _ in range(num_concurrent_queries)]
    concurrent.futures.wait(futures)

end_time = time.perf_counter()
print(f"Executed {num_concurrent_queries} concurrent queries.")
print(f"Total execution time: {end_time - start_time} seconds")

errors = [f.exception() for f in futures if f.exception() is not None]
failed_count = len(errors)
print(f"Failed queries: {failed_count}")
if errors:
    # Show one representative error so failures are diagnosable, not just counted.
    print(f"First error: {errors[0]!r}")

## **Pinecone**

In [None]:
def execute_pinecone_search(id, keywords, keyword_weight=0.3,
                            pinecone_index=None, mongo_collection=None):
    """Query Pinecone by a stored vector id and re-score matches with a keyword bonus.

    The paragraph text is read from MongoDB (by ``_id``, as a string). All
    paragraphs are fetched with a single ``$in`` query, rather than one
    ``find_one`` per match, which would be up to 200 sequential round trips
    per search.

    Args:
        id: Id of the stored vector to query by.
        keywords: Text whose (case-sensitive) presence in the paragraph earns
            the bonus.
        keyword_weight: Weight of the keyword bonus; the vector score gets the rest.
        pinecone_index: Index to query; defaults to the notebook-level ``index``.
        mongo_collection: Collection holding paragraphs; defaults to the
            notebook-level ``collection``.

    Returns:
        List of ``(match_id, paragraph, custom_score)`` tuples in Pinecone's
        match order; matches with no MongoDB document are skipped.

    Raises:
        RuntimeError: Wrapping any failure, so callers see one exception type.
    """
    try:
        idx = index if pinecone_index is None else pinecone_index
        coll = collection if mongo_collection is None else mongo_collection

        vector_search = idx.query(id=id, top_k=200, include_metadata=True)
        matches = vector_search["matches"]
        if not matches:
            return []

        match_ids = [str(match["id"]) for match in matches]
        paragraphs = {
            doc["_id"]: doc["paragraph"]
            for doc in coll.find({"_id": {"$in": match_ids}}, {"paragraph": 1})
        }

        vector_weight = 1 - keyword_weight
        filtered_results = []
        for match, doc_id in zip(matches, match_ids):
            if doc_id not in paragraphs:
                continue  # no MongoDB document for this Pinecone id
            paragraph = paragraphs[doc_id]
            keyword_bonus = 1.0 if keywords in paragraph else 0.0
            custom_score = vector_weight * match["score"] + keyword_weight * keyword_bonus
            filtered_results.append((match["id"], paragraph, custom_score))
        return filtered_results
    except Exception as e:
        raise RuntimeError("Failed to process query") from e

## **Pinecone: Running the Concurrent Queries**

In [None]:
num_concurrent_queries = 250
id_to_query = "2251799813701581"  # id of the stored vector to query by
keywords_to_search = "AAA games"  # keyword that earns the hybrid-score bonus
# perf_counter is monotonic and high-resolution; time.time() is not meant for intervals.
start_time = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor(max_workers=num_concurrent_queries) as executor:
    futures = [executor.submit(execute_pinecone_search, id_to_query, keywords_to_search) for _ in range(num_concurrent_queries)]
    concurrent.futures.wait(futures)

end_time = time.perf_counter()

# Checking failed futures
errors = [f.exception() for f in futures if f.exception() is not None]
failed_count = len(errors)

print(f"Executed {num_concurrent_queries} concurrent queries in {end_time - start_time} seconds")
print(f"Failed queries: {failed_count}")
if errors:
    # Show one representative error so failures are diagnosable, not just counted.
    print(f"First error: {errors[0]!r}")

## **pgvector**

In [None]:
def fetch_vector_pg(id):
    """Return the ``embedding`` of row ``id`` in ``video_game_wikipedia``, or None if absent.

    psycopg2's ``with connection`` block only commits or rolls back the
    transaction; it does not close the connection. ``contextlib.closing``
    closes it explicitly.
    """
    with contextlib.closing(psycopg2.connect(dbname=dbname, user=user, host=host)) as conn:
        with conn, conn.cursor() as cursor:
            cursor.execute("SELECT embedding FROM video_game_wikipedia WHERE id = %s;", (id,))
            row = cursor.fetchone()
    return row[0] if row else None

In [None]:
def execute_pgvector_search(query_vector, keywords='AAA games', keyword_weight=0.3, top_n=5):
    """Run a hybrid (pgvector + full-text) search on ``video_game_wikipedia``.

    The top 200 rows by inner product and the top 200 full-text matches are
    full-outer-joined. Each row is scored as
    ``(1 - keyword_weight) * inner_product + keyword_weight * ts_rank_cd``,
    with a missing side counting as 0.

    Args:
        query_vector: Query embedding, passed as a query parameter.
            A falsy value returns ``[]`` without querying.
        keywords: Full-text query. It defaults to the same keyword used by the
            MongoDB, Pinecone and SingleStore searches, so the engines run
            comparable queries.
        keyword_weight: Weight of the text score; the vector score gets the rest.
        top_n: Number of rows to return.

    Returns:
        List of ``(id, paragraph, hybrid_score)`` rows, best first.
    """
    if not query_vector:
        return []

    # Notes on the SQL:
    # * <#> is pgvector's *negative* inner product. It is negated so that
    #   larger means more similar before being mixed into hybrid_score.
    #   Candidate selection still orders by the raw operator (ascending) so an
    #   index on it can be used.
    # * In the FULL OUTER JOIN, a row found by only one side has NULLs for the
    #   other side. COALESCE keeps its id/paragraph and treats the missing score
    #   as 0. Otherwise hybrid_score would be NULL, and Postgres sorts NULLs
    #   first under DESC.
    sql_query = '''
    WITH vector_query AS (
        SELECT id, paragraph,
               (embedding <#> %(vec)s) * -1 AS vector_score
        FROM video_game_wikipedia
        ORDER BY embedding <#> %(vec)s
        LIMIT 200
    ),
    fts_query AS (
        SELECT id, paragraph,
            ts_rank_cd(paragraph_tsvector, plainto_tsquery('english', %(kw)s)) AS text_score
        FROM video_game_wikipedia
        WHERE paragraph_tsvector @@ plainto_tsquery('english', %(kw)s)
        ORDER BY text_score DESC
        LIMIT 200
    ),
    combined AS (
        SELECT COALESCE(f.id, v.id) AS id,
               COALESCE(f.paragraph, v.paragraph) AS paragraph,
               %(vw)s * COALESCE(v.vector_score, 0)
                 + %(kw_weight)s * COALESCE(f.text_score, 0) AS hybrid_score
        FROM fts_query f
        FULL OUTER JOIN vector_query v ON f.id = v.id
    )
    SELECT id, paragraph, hybrid_score
    FROM combined
    ORDER BY hybrid_score DESC
    LIMIT %(top_n)s;
    '''
    params = {
        "vec": query_vector,
        "kw": keywords,
        "vw": 1 - keyword_weight,
        "kw_weight": keyword_weight,
        "top_n": top_n,
    }

    # `with connection` does not close a psycopg2 connection; closing() does.
    with contextlib.closing(psycopg2.connect(dbname=dbname, user=user, host=host)) as conn:
        with conn, conn.cursor() as cursor:
            cursor.execute(sql_query, params)
            results = cursor.fetchall()

    return results


## **pgvector: Running the Concurrent Queries**

In [None]:
vector_id = '2251799813701581'
query_vector = fetch_vector_pg(vector_id)
# execute_pgvector_search returns [] immediately for a missing vector, which
# would make the timing below meaningless, so stop here instead.
if query_vector is None:
    raise LookupError(f"No embedding found in Postgres for id {vector_id}")

num_concurrent_queries = 250
# perf_counter is monotonic and high-resolution; time.time() is not meant for intervals.
start_time = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor(max_workers=num_concurrent_queries) as executor:
    # Fire off the same search query concurrently
    futures = [executor.submit(execute_pgvector_search, query_vector) for _ in range(num_concurrent_queries)]
    concurrent.futures.wait(futures)

end_time = time.perf_counter()
print(f"Executed {num_concurrent_queries} concurrent queries.")
print(f"Total execution time: {end_time - start_time} seconds")

errors = [f.exception() for f in futures if f.exception() is not None]
failed_count = len(errors)
print(f"Failed queries: {failed_count}")
if errors:
    # Show one representative error so failures are diagnosable, not just counted.
    print(f"First error: {errors[0]!r}")

## **SingleStore**

In [None]:
# Function to fetch the vector
def fetch_vector_s2(vector_id):
    """Fetch column ``v`` of row ``vector_id`` in ``vecs_clean`` from SingleStore.

    Returns:
        ``json.dumps`` of the stored value, which is the form that
        ``execute_singlestore_search`` splices into its SQL. Returns None
        (after printing a message) if the row is missing or the lookup fails.
    """
    conn = None
    try:
        conn = s2.connect(
            host=s2_host,
            port=port,
            user=username,
            password=password,
            database=database,
            autocommit=True
        )
        query = "SELECT v FROM vecs_clean WHERE id = %s"
        with conn.cursor() as cursor:
            cursor.execute(query, (vector_id,))
            result = cursor.fetchone()
        if result:
            return json.dumps(result[0])
        print("Vector not found.")
        return None
    except Exception as e:
        print(f"Error fetching vector: {e}")
        return None
    finally:
        # Close on every path; previously a failing query leaked the connection.
        if conn is not None:
            conn.close()

def execute_singlestore_search(vector):
    """Run the SingleStore hybrid (full-text + vector) search for ``vector``.

    ``vector`` is the JSON text returned by ``fetch_vector_s2``. It is
    interpolated directly into the SQL, so it must come from a trusted source
    (here, the database itself), never from user input.

    Returns:
        Up to 5 rows of ``(id, paragraph, hybrid_score, vec_score, ft_score)``,
        best first.

    Raises:
        ValueError: If ``vector`` is None (the fetch failed).
        RuntimeError: Wrapping any database error. The error is raised rather
            than returned as None, so the benchmark's ``f.exception()`` check
            actually counts failed queries.
    """
    if vector is None:
        raise ValueError("vector is None; fetch_vector_s2 did not return a vector")

    # In the FULL OUTER JOIN, a row found by only one side has NULLs for the
    # other side. ifnull keeps full-text-only rows' id/paragraph and treats a
    # missing score as 0, so hybrid_score is never NULL.
    query = f'''
    with fts as(
        select id, paragraph, match (paragraph) against ('AAA games') as score
        from vecs_clean
        where match (paragraph) against ('AAA games')
        order by score desc
        limit 200
    ),
    vs as (
        select id, paragraph, v <*> {vector} as score
        from vecs_clean
        order by score use index (auto) desc
        limit 200
    )
    select ifnull(vs.id, fts.id) as id,
        ifnull(vs.paragraph, fts.paragraph) as paragraph,
        .3 * ifnull(fts.score, 0) + .7 * ifnull(vs.score, 0) as hybrid_score,
        ifnull(vs.score, 0) as vec_score,
        ifnull(fts.score, 0) as ft_score
    from fts full outer join vs
        on fts.id = vs.id
    order by hybrid_score desc
    limit 5;
    '''
    conn = None
    try:
        # Establish a new connection for each query
        conn = s2.connect(
            host=s2_host,
            port=port,
            user=username,
            password=password,
            database=database,
            autocommit=True
        )
        with conn.cursor() as cursor:
            cursor.execute(query)
            return cursor.fetchall()
    except Exception as e:
        raise RuntimeError("Failed to execute SingleStore query") from e
    finally:
        # Close on every path; previously a failing query leaked the connection.
        if conn is not None:
            conn.close()

## **SingleStore: Running the Concurrent Queries**

In [None]:
vector_id = '2251799813701581'
query_vector = fetch_vector_s2(vector_id)
# Without a vector, every search below would fail the same way; stop early.
if query_vector is None:
    raise LookupError(f"No vector found in SingleStore for id {vector_id}")

num_concurrent_queries = 250
# perf_counter is monotonic and high-resolution; time.time() is not meant for intervals.
start_time = time.perf_counter()

with concurrent.futures.ThreadPoolExecutor(max_workers=num_concurrent_queries) as executor:
    # Fire off the same search query concurrently
    futures = [executor.submit(execute_singlestore_search, query_vector) for _ in range(num_concurrent_queries)]
    concurrent.futures.wait(futures)

end_time = time.perf_counter()
print(f"Executed {num_concurrent_queries} concurrent queries.")
print(f"Total execution time: {end_time - start_time} seconds")

errors = [f.exception() for f in futures if f.exception() is not None]
failed_count = len(errors)
print(f"Failed queries: {failed_count}")
if errors:
    # Show one representative error so failures are diagnosable, not just counted.
    print(f"First error: {errors[0]!r}")