In [None]:
# numpy 2 was recently released and not all packages support it yet.
# of course they would change the latest default and not be backward compatible...
!pip uninstall -y numpy
!pip install numpy==1.26.4

In [None]:
!pip install pandas
!python -m pip install pgpq
!python -m pip install psycopg
!python -m pip install SQLAlchemy
!pip install pinecone-datasets
!pip install psycopg2-binary
!pip install matplotlib
!pip install plotnine

In [None]:
# Show the datasets published through pinecone-datasets (rendered as the cell output).
import pinecone_datasets

pinecone_datasets.list_datasets()

In [None]:
# standard library
from importlib import reload
# for parallel index creation
from multiprocessing import Pool

# third-party
import matplotlib.pyplot as plt  # used for the latency histogram in the experiment cell
import numpy as np
import pandas as pd
import psycopg
from sqlalchemy import create_engine

# for plotting; names are imported explicitly (instead of `from plotnine import *`) so it is
# clear where every plotting function used below comes from
from plotnine import (
    aes,
    coord_cartesian,
    facet_wrap,
    geom_bar,
    geom_point,
    geom_text,
    ggplot,
    ggtitle,
    labs,
    position_dodge,
    scale_fill_manual,
    scale_x_discrete,
    scale_y_continuous,
    stat_smooth,
    theme,
    theme_bw,
    theme_minimal,
)

# local helper module; reload so re-running this cell picks up edits to lib
import lib
reload(lib)



In [None]:
# Connection string for the benchmark database. Set the DATABASE_URL environment variable to
# point elsewhere instead of editing credentials into the notebook; the default is the local instance.
import os

conn_string = os.environ.get("DATABASE_URL", "postgresql://postgres:postgres@localhost:4444")
lib.recreate_tables(conn_string)
lib.create_extensions(conn_string)

# Long-lived connection for the exploratory cells; autocommit so ad-hoc statements
# are not left inside an open transaction.
conn = psycopg.connect(conn_string)
conn.autocommit = True

# SQLAlchemy engine for pandas.read_sql; echo=True logs every SQL statement it emits.
engine = create_engine(conn_string, echo=True)

In [None]:
# Dataset size to benchmark; '10M' is the full-size alternative.
DATASET = '100K'  # or: '10M'

# Base data and the query set for the selected size.
yfcc_data = lib.get_yfcc_data(dataset=DATASET)
yfcc_data_queries = lib.get_yfcc_data(dataset=DATASET, queries=True)

In [None]:
# Copy the passages into Postgres; lib.df2pg maps the blob column to jsonb.
lib.df2pg(conn_string, yfcc_data)
# On a small DB instance the call above may fail with an out-of-memory error in its
# _transform_metadata step. If so, rerun just that step:
# lib._transform_metadata(conn_string)

# Copy the query set as well.
lib.df2pg(conn_string, yfcc_data_queries, queries=True)

## Exploratory queries

In [None]:
# Sanity check: fetch a single passage row and print its id, tags and blob.
with conn.cursor() as cur:
    first_rows = cur.execute(
        "SELECT id, metadata_tags, blob FROM yfcc_passages LIMIT 1"
    ).fetchall()
print(first_rows)

In [None]:
# Load the first 10 passage rows into a DataFrame for inspection.
df = pd.read_sql(
    'select * from yfcc_passages limit 10',
    con=engine,
)

In [None]:
# Inspect up to 10 queries whose selectivity is greater than 10.
# `->>` returns text, so cast before comparing: a text comparison is lexicographic
# (e.g. '9' > '10' is true). Assumes `selectivity` holds a numeric JSON value — TODO confirm.
# NOTE(review): an earlier comment here said "< 10" while the query used ">"; ">" is kept — confirm intent.
df = pd.read_sql(
    "select blob->>'selectivity', * from yfcc_queries "
    "where (blob->>'selectivity')::float > 10 limit 10",
    con=engine,
)
df

In [None]:
### Rescale stored vector components before quantization.
# Each component becomes (element - 128) / 100.0 in both tables.
# NOTE(review): if components are 0..255 this yields roughly [-1.28, 1.27], not a 0-1 range — confirm intended range.
# WARNING: not idempotent — running this cell twice rescales the vectors a second time.
# A dedicated name is used so the notebook-global `conn` is not rebound and then closed by the `with`.
with psycopg.connect(conn_string) as rescale_conn:
    with rescale_conn.cursor() as cursor:
        cursor.execute("""
UPDATE yfcc_passages SET vector =  (
  SELECT array_agg((element - 128)/ 100.0)
  FROM unnest(vector) AS t(element)
);
UPDATE yfcc_queries SET vector =  (
  SELECT array_agg((element - 128)/ 100.0)
  FROM unnest(vector) AS t(element)
);
""")

In [None]:
# Define a SQL helper that returns one CREATE INDEX statement per popular tag: a partial
# lantern_hnsw index (quant_bits = 8) over the passages whose metadata_tags contain that tag.
# A tag counts as popular when it appears in more than `index_threshhold` rows (default 10000).
# NOTE(review): the tag is interpolated with an unquoted %s into both the index name and
# ARRAY[...]; this presumes tags are integers — TODO confirm the metadata_tags element type.
# A dedicated connection name keeps the notebook-global `conn` from being rebound and closed.
with psycopg.connect(conn_string) as ddl_conn:
    with ddl_conn.cursor() as cursor:
        cursor.execute("""
    DROP FUNCTION IF EXISTS create_index_statements_for_popular_tags(index_threshhold INTEGER);
    CREATE OR REPLACE FUNCTION create_index_statements_for_popular_tags(index_threshhold INTEGER DEFAULT 10000)
    RETURNS TABLE(index_command TEXT) AS
    $$
    DECLARE
        tag_record RECORD;
    BEGIN
        FOR tag_record IN
            SELECT tag
            FROM (
                SELECT unnest(metadata_tags) AS tag
                FROM yfcc_passages
            ) AS tags
            GROUP BY tag
            HAVING COUNT(*) > index_threshhold
            ORDER BY COUNT(*) DESC
        LOOP
            index_command := format('CREATE INDEX IF NOT EXISTS hnsw_filtered_%s ON yfcc_passages USING lantern_hnsw(vector) WITH (quant_bits = 8) WHERE metadata_tags @> ARRAY[%s];', tag_record.tag, tag_record.tag);
            RETURN NEXT;
        END LOOP;
    END;
    $$ LANGUAGE plpgsql;
                    """)
# NOTE(review): a previous run recorded "done in 792m 11.9s" here; presumably the time taken to
# build the partial indexes (done in parallel further below) — confirm.

In [None]:
# Ask the helper for the CREATE INDEX statements (one row per popular tag, default threshold).
# A dedicated connection name keeps the notebook-global `conn` from being rebound and closed.
with psycopg.connect(conn_string) as query_conn:
    with query_conn.cursor() as cursor:
        create_index_statements = cursor.execute(
            "select * from create_index_statements_for_popular_tags();"
        ).fetchall()
create_index_statements

In [None]:
# Build the partial indexes in parallel worker processes.
NUM_CORES = 10
print("creating", len(create_index_statements), "partial indexes")
reload(lib)

# One (conn_string, index_command) argument pair per generated statement; the command is
# the first column of each row returned by the helper function.
run_query_inputs = [(conn_string, row[0]) for row in create_index_statements]

# chunksize=1: each worker takes one CREATE INDEX statement at a time
with Pool(NUM_CORES) as pool:
    pool.starmap(lib.run_query, run_query_inputs, chunksize=1)


In [None]:
## Prewarm the table and its indexes into the Postgres buffer cache with pg_prewarm.
# Indexes are passed by OID (`oid::regclass`) rather than by bare relname, so the lookup does
# not depend on search_path. A dedicated connection name keeps the global `conn` intact.
with psycopg.connect(conn_string) as prewarm_conn:
    with prewarm_conn.cursor() as cursor:
        print("prewarm the base table")
        # prewarming the base relation first, since this has lowest priority
        # and it is ok if this gets evicted later to make space for the rest
        cursor.execute("SELECT pg_prewarm('yfcc_passages')")

        print("prewarm all partial indexes")
        # every lantern_hnsw index in the database
        cursor.execute("""
SELECT pg_prewarm(i.oid::regclass)
FROM pg_index ix
JOIN pg_class i ON i.oid = ix.indexrelid
JOIN pg_am a ON i.relam = a.oid
WHERE a.amname = 'lantern_hnsw';
""")

        print("prewarm pk and GIN indexes on yfcc_passages")
        cursor.execute("""
SELECT i.relname, pg_prewarm(i.oid::regclass)
FROM pg_class t
JOIN pg_index ix ON t.oid = ix.indrelid
JOIN pg_class i ON i.oid = ix.indexrelid
JOIN pg_am a ON i.relam = a.oid
WHERE a.amname IN ('gin', 'btree') AND t.relname = 'yfcc_passages';
""")
        # (index name, blocks prewarmed) for the btree/GIN indexes of yfcc_passages
        print(cursor.fetchall())

In [None]:
# Run the benchmark and plot the latency distribution of each configuration.
# matplotlib is imported here because nothing earlier in the notebook imports `plt`.
import matplotlib.pyplot as plt

reload(lib)
# Only the Lantern configuration is enabled; use [True, False] to also benchmark pgvector.
# NOTE: with several configurations, only the last run's recalls/latencies survive for later cells.
for use_pgvector in [False]:
    recalls, latencies, stats = lib.run_experiment(conn_string, 10000, offset=0, pgvector=use_pgvector, explain=False)
    fig, ax = plt.subplots()
    ax.hist(latencies)
    ax.set(title=f"Query latency (pgvector={use_pgvector})", xlabel="latency (ms)", ylabel="queries")

In [None]:
import json

# Persist this run's recalls and latencies as JSON lists so they can be reloaded later.
for out_path, values in (("recalls.json", recalls), ("latencies.json", latencies)):
    with open(out_path, "w") as fh:
        json.dump(values.tolist(), fh)

In [None]:
# Reload saved results: this run's recalls/latencies plus the Pinecone 100K baseline.
# json is imported here so this cell also works on a fresh kernel without rerunning the experiment cells.
import json


def _load_json_array(path):
    """Read a JSON list from `path` and return it as a numpy array."""
    with open(path, "r") as fh:
        return np.array(json.load(fh))


recalls = _load_json_array("recalls.json")
latencies = _load_json_array("latencies.json")
latencies_pinecone = _load_json_array("latencies-pinecone-100k.json")
recalls_pinecone = _load_json_array("recalls-pinecone-100k.json")

In [None]:
# Rough index size for the full 10M-vector dataset: 1.5 * N * dim * M / 1024^3.
# NOTE(review): heuristic sizing formula — confirm its assumptions before quoting the number.
dataset_size = 10_000_000
vector_dim = 192
M = 16
index_size = 1.5 * dataset_size * vector_dim * M / 1024 / 1024 / 1024
ipm = 1000  # NOTE(review): not used below

# Query rates (queries per minute) to compare, and the resulting queries per 30-day month.
qpm = [1, 10, 100, 1000, 10000]
qp_month = 30 * 24 * 60 * np.array(qpm)

# Pinecone bills read units (RUs); per Pinecone's pricing, a query on the YFCC dataset costs 18 RUs.
ru_per_month = 18 * qp_month
cost_per_1M_ru = 8.25  # https://www.pinecone.io/pricing/
query_cost_per_month = ru_per_month * cost_per_1M_ru / 1_000_000

# Flat monthly costs of the alternatives (independent of query rate).
ec2_cost_per_month = 153  # r6g.2xlarge 64 GB 8 core shared ec2 instance, should easily support 10K queries per minute
ubicloud_cost_per_month = 520
lantern_on_gcp = 965
lantern_on_ubicloud = 595

# Table for the cost plot: X axis is the query rate, Y axis the monthly cost,
# one bar per system in each group.
df_cost = pd.DataFrame({
    "Queries per minute": qpm,
    "Pinecone": query_cost_per_month,
    "EC2": ec2_cost_per_month,
    # "Ubicloud": ubicloud_cost_per_month,
    "Lantern": lantern_on_gcp,
    "Lantern on Ubicloud": lantern_on_ubicloud
})

# Long format for ggplot: one row per (query rate, system).
df_cost_melted = df_cost.melt(id_vars="Queries per minute", var_name="System", value_name="Cost per month")



In [None]:
# Cost-per-month bar chart: one group of bars per query rate, one bar per system.
df_cost_melted["aa"] = df_cost_melted["Queries per minute"]
system_colors = ["#FFFF00", "#ff7f0e", "#FF564B", "#1C17FF"]
system_order = ["EC2", "Lantern on Ubicloud", "Lantern", "Pinecone"]

# let cc column be cost per month for pinecone only and be none (NaN) for other systems
df_cost_melted["cc"] = df_cost_melted["Cost per month"]
df_cost_melted.loc[df_cost_melted["System"] != "Pinecone", "cc"] = None

# Categorical with an explicit order, which fixes bar order and the color assignment
df_cost_melted['System'] = pd.Categorical(df_cost_melted['System'], categories=system_order, ordered=True)

# Shared plot theme. It is defined here because this is the first plot in the notebook;
# the percentile-plot cell further down defines the same theme.
gg_theme = theme_bw() + theme(figure_size=(6, 3))

cost_plot = (
    ggplot(df_cost_melted, aes(x="factor(aa)", y='Cost per month', fill='System')) +
    geom_bar(stat='identity', position='dodge') +
    # label only the Pinecone bars: `x == x` is False for float('NaN'), which blanks the other systems' labels
    geom_text(aes(label='map( lambda x: "$" + str(int(x)) if x == x else "", cc)'), position=position_dodge(width=1), size=9, va='bottom', color="#1C17FF") +
    labs(x='Queries per minute', y='Cost per month') +
    scale_y_continuous(labels=lambda l: ["$" + str(int(x)) for x in l], breaks=range(0, 6600, 500)) +
    coord_cartesian(ylim=(0, 6600)) +
    gg_theme +
    scale_fill_manual(values=system_colors) +
    ggtitle("Cost per month for various systems")
)

fig = cost_plot.draw()
# NOTE(review): disables clipping for the first artist collection on the axes (presumably the bars),
# so values above the y-limit are not cut at the axes edge — confirm intent.
fig.axes[0].collections[0].set_clip_on(False)

# fig.savefig("cost_plot.png", dpi=300)
fig

In [None]:
# Pinecone recall from the low tail: for p in 50/90/95/99, the (100 - p)th percentile,
# i.e. approximately the recall that p% of queries reach or exceed.
np.percentile(recalls_pinecone, [100 - p for p in (50, 90, 95, 99)])

In [None]:
# Summary table of this run: mean plus p50/p90/p95/p99. Recall is read from the low tail
# (100 - p) because low recall is the bad case; latency from the high tail (p).
print("percentile\trecall PG\t \tlatency(ms)")
print("================================")
print(f"mean\t\t{np.round(np.mean(recalls), 2)}\t{np.round(np.mean(latencies), 2)}")
for p in [50, 90, 95, 99]:
    print(f"{p}\t\t{np.round(np.percentile(recalls, 100 - p), 2)}\t{np.round(np.percentile(latencies, p), 2)}")


# Percentiles shown in the comparison plots
percentiles = np.array([50, 95, 99])

latencies_percentiles = np.percentile(latencies, percentiles)
pinecone_latencies = np.percentile(latencies_pinecone, percentiles)

postgres_recalls = np.percentile(recalls, 100 - percentiles)
pinecone_recalls = np.percentile(recalls_pinecone, 100 - percentiles)


# Long-format frame with one row per (percentile, system); ignore_index avoids duplicate row labels
df_percentiles_postgres = pd.DataFrame({'Percentile': percentiles, 'Latency': latencies_percentiles, "Recall": postgres_recalls, "System": "Postgres"})
df_percentiles_pinecone = pd.DataFrame({'Percentile': percentiles, 'Latency': pinecone_latencies, "Recall": pinecone_recalls, "System": "Pinecone"})
df_percentiles = pd.concat([df_percentiles_postgres, df_percentiles_pinecone], ignore_index=True)


# common plot components
gg_theme = theme_bw() + theme(figure_size=(6, 3))
# gg_theme = theme_minimal() + theme(figure_size=(6, 3))
gg_x_axis_percentiles = scale_x_discrete(labels=lambda l: [str(x) + 'th' for x in l])
# Systems are ordered alphabetically (Pinecone, Postgres): Pinecone gets the same blue as in the
# cost plot, Postgres the same red as Lantern there.
percentile_colors = ["#1C17FF", "#FF564B"]

# Latency at each percentile
plot = (
    ggplot(df_percentiles, aes(x='factor(Percentile)', y='Latency', fill='System')) +
    geom_bar(stat='identity', position='dodge', width=0.7) +
    geom_text(aes(label='map( lambda x: str(int(x)), round(Latency))'), position=position_dodge(width=0.7), size=10, va='baseline') +
    labs(x='Percentile', y='Latency') +
    scale_y_continuous(labels=lambda l: [str(int(x)) + "ms" for x in l]) +
    gg_x_axis_percentiles +
    gg_theme +
    scale_fill_manual(values=percentile_colors) +
    ggtitle("Latency at various percentiles")
)
plot.show()


# Recall at each percentile (fraction shown as %)
plot_recall = (
    ggplot(df_percentiles, aes(x='factor(Percentile)', y='Recall', fill='System')) +
    geom_bar(stat='identity', position='dodge', width=0.7) +
    geom_text(aes(label='map( lambda x: str(int(x)) + "%", round(Recall * 100))'), position=position_dodge(width=.7), size=10, va='baseline') +
    labs(x='Percentile', y='Recall') +
    scale_y_continuous(labels=lambda l: [str(int(x * 100)) + "%" for x in l], limits=(0, 1)) +
    gg_x_axis_percentiles +
    gg_theme +
    scale_fill_manual(values=percentile_colors) +
    ggtitle("Recall at various percentiles")
)
plot_recall.show()



In [None]:
# explain output
lib.vector_search(conn_string,  q_vector_id=21, explain = True, materialize_first=True, return_recall=True, reuse_conn=True, pgvector=False, prefilter_count=0)


In [None]:
reload(lib)
# Batch vector search with k=10, keeping per-result recall (read as `.recall` in the next cell).
# NOTE(review): 3000 presumably is the number of queries to run — confirm in lib.bulk_vector_search.
res = lib.bulk_vector_search(
    conn_string,
    3000,
    k=10,
    explain=False,
    return_recall=True,
)
res

In [None]:
# Summarize recall over the bulk-search results.
# A recall of None means near_ids and near_dists are None, i.e. no results were found; such
# queries are counted as recall 1. NOTE(review): confirm that is intended (e.g. the filter
# matches no rows, so the ground truth is empty as well).
recall_values = [1 if a.recall is None else a.recall for a in res]
overall_recall = sum(recall_values) / len(recall_values) if recall_values else float("nan")

# Queries with partial recall (strictly between 0 and 1); compared directly instead of via
# int() truncation. Zero-recall results are excluded.
tiebreak_affected = [a for a in res if a.recall is not None and 0 < a.recall < 1]
# note: even with 100% accurate scan, recall is < 1, since there are 4 rows which have equal distance to 10th and 11th result
# and there is no stable tie breaking
print(overall_recall, len(tiebreak_affected))