IMDB 메타데이터셋의 plot_keywords 열 데이터를 바탕으로 검색을 수행하는 코드입니다.

1. pgvector에서 지원하는 두가지 벡터인덱싱 방식인 HNSW, IVFFlat을 적용해보았습니다.

2. 대용량 데이터에 대해 검색시간을 줄일 수 있는 expression indexing(binary, subvector 인덱스)과 rerank를 구현해보았습니다.

3. PostgreSQL에서 지원하는 Full-text Search와 코사인 거리 기반 벡터검색의 Reciprocal Rank Fusion을 구현해보았습니다.

In [2]:
import psycopg2
import numpy as np
import pandas as pd
from langchain_openai import OpenAIEmbeddings
from time import time
from tqdm import tqdm

## 실습용 데이터 준비

In [4]:
# Load the IMDB metadata and drop every row that has a missing value
imdb = pd.read_csv('movie_metadata.csv').dropna()
# Corpus: the plot keywords of the first 1000 remaining rows
texts = imdb['plot_keywords'].iloc[:1000].tolist()

# Vector search settings
k = 5  # number of most similar vectors to retrieve
test_words = "cyberpunk|dystopia|paraplegic|marine|warrior"  # query string

# Helper for displaying query results
def show(rows):
    """Print every element of *rows* on its own line."""
    for record in rows:
        print(record)

In [12]:
# Use OpenAI embeddings
import csv
import os

# Read the key through a context manager so the file handle is closed, and
# strip surrounding whitespace (a trailing newline would corrupt the key).
with open('API_KEY', 'r') as key_file:
    os.environ['OPENAI_API_KEY'] = key_file.read().strip()
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# Embedding of the query string
test_vec = embeddings.embed_query(test_words)
emb_dim = len(test_vec)

# embeddings_list holds one embedding per entry of texts, in the same order.
# embed_documents batches the requests instead of one API call per text.
embeddings_list = embeddings.embed_documents(texts)

# Save the keywords and their embeddings to a CSV file
filename = "imdb_embedding.csv"
with open(filename, "w", newline="", encoding="utf-8") as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(["keywords", "embedding"])
    writer.writerows(zip(texts, embeddings_list))

## INSERT

In [53]:
# Connect to PostgreSQL
# NOTE(review): host and credentials are hard-coded; consider reading them
# from the environment before sharing this notebook.
conn = psycopg2.connect(host='192.168.0.47', dbname='postgres', user='postgres', password='postgres1016', port=55432)
try:
    cursor = conn.cursor()

    # (Re)create the imdb table.
    # The embedding column is a vector with emb_dim dimensions.
    cursor.execute("DROP TABLE IF EXISTS imdb;")
    cursor.execute("CREATE TABLE IF NOT EXISTS imdb (id serial primary key, keywords text, embedding vector({}));".format(emb_dim))

    # Insert each keyword string together with its embedding
    for content, embedding in tqdm(zip(texts, embeddings_list), total=len(embeddings_list)):
        cursor.execute("INSERT INTO imdb (keywords, embedding) VALUES (%s, %s)", (content, embedding))

    # Commit to PostgreSQL
    conn.commit()
finally:
    # Always release the connection, even when a statement above fails
    conn.close()

## Query

In [182]:
# Connect to PostgreSQL
conn = psycopg2.connect(host='192.168.0.47', dbname='postgres', user='postgres', password='postgres1016', port=55432)
try:
    cursor = conn.cursor()

    # Similarity search
    # <-> : L2 distance, so the "similarity" column is a distance and the
    # ascending ORDER BY returns the closest rows first.
    # The query vector is bound as a parameter instead of being formatted
    # into the SQL text.
    s = time()
    cursor.execute("""
        SELECT id,
            keywords,
            embedding <-> %s AS similarity
        FROM imdb
        ORDER BY similarity
        LIMIT 5
    """, (str(test_vec),))
    time_spent = time() - s
    print("time:", time_spent, "\n")

    result = cursor.fetchall()
    show(result)

    # Commit to PostgreSQL
    conn.commit()
finally:
    # Always release the connection, even when the query fails
    conn.close()

time: 0.020612239837646484 

(626, 'christmas|magic|santa claus|son|toy', 0.871037229064231)
(405, 'christmas|christmas eve|elf|high tech|resentment', 0.8716796136506032)
(89, 'boy|christmas|christmas eve|north pole|train', 0.925608590680549)
(906, 'act of kindness|christmas|christmas eve|neighbor|vacation', 0.9353251981016648)
(383, 'christmas|island|love|survival|talking to inanimate object', 0.9784871476040324)


## Creating Index

### HNSW L2 Index

In [169]:
# Connect to PostgreSQL
conn = psycopg2.connect(host='192.168.0.47', dbname='postgres', user='postgres', password='postgres1016', port=55432)
try:
    cursor = conn.cursor()

    # Build an HNSW index for L2 distance and time the build.
    # The statement has no placeholders, so no formatting is applied to it.
    s = time()
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS imdb_hnsw_index_l2 ON imdb USING hnsw (embedding vector_l2_ops) WITH (m = 64, ef_construction = 256);
    """)
    time_spent = time() - s
    print("time:", time_spent, "\n")

    # Commit to PostgreSQL
    conn.commit()
finally:
    # Always release the connection, even when the build fails
    conn.close()

time: 2.1496260166168213 



In [170]:
# Connect to PostgreSQL
conn = psycopg2.connect(host='192.168.0.47', dbname='postgres', user='postgres', password='postgres1016', port=55432)
try:
    cursor = conn.cursor()

    # Show the execution plan of the L2-distance search.
    # The query vector is bound as a parameter instead of being formatted
    # into the SQL text.
    s = time()
    cursor.execute("""
        EXPLAIN ANALYZE SELECT id,
            keywords,
            embedding <-> %s AS similarity
        FROM imdb
        ORDER BY similarity
        LIMIT 5
    """, (str(test_vec),))
    time_spent = time() - s
    print("time:", time_spent, "\n")

    result = cursor.fetchall()
    show(result)

    # Remove the index so the next experiment starts from a clean table
    cursor.execute("DROP INDEX IF EXISTS imdb_hnsw_index_l2;")

    # Commit to PostgreSQL
    conn.commit()
finally:
    # Always release the connection, even when a statement fails
    conn.close()

time: 0.03040909767150879 

('Limit  (cost=155.11..155.12 rows=5 width=63) (actual time=7.272..7.273 rows=5 loops=1)',)
('  ->  Sort  (cost=155.11..157.61 rows=1000 width=63) (actual time=7.271..7.272 rows=5 loops=1)',)
("        Sort Key: ((embedding <-> '[0.009310356,0.010533301,0.023119183,-0.027138308,-0.0063605406,-0.041469496,0.00799523,0.019591695,-0.010533301,-0.04414891,0.044173494,-0.014675334,-0.02812158,0.004498469,0.037216846,0.023795182,-0.07517114,0.049335673,0.018878821,0.0068521765,-0.0075466125,0.0108713005,0.007005813,0.016703334,-0.01187301,0.05791472,0.009377956,0.016568134,0.044001423,-0.023119183,0.009783557,-0.050491016,0.04643502,-0.0027454798,0.044861786,-0.03318543,-0.008308648,-0.016420642,-0.025835471,-0.021668857,-0.04112535,0.016015042,0.0031049885,0.051670942,0.028859032,-0.019997295,-0.002759307,0.050786,0.0023752165,0.06696082,0.025860053,0.0005726791,-0.014687626,0.082889825,-0.020009585,-0.026253361,-0.020980567,0.040043753,0.015535697,0.0017007532,0

### HNSW Cosine Index

In [171]:
# Connect to PostgreSQL
conn = psycopg2.connect(host='192.168.0.47', dbname='postgres', user='postgres', password='postgres1016', port=55432)
try:
    cursor = conn.cursor()

    # Build an HNSW index for cosine distance and time the build.
    # IF NOT EXISTS keeps the cell re-runnable, matching the L2 index cell.
    s = time()
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS imdb_hnsw_index_cos ON imdb USING hnsw (embedding vector_cosine_ops) WITH (m = 64, ef_construction = 256);
    """)
    time_spent = time() - s
    print("time:", time_spent, "\n")

    # Commit to PostgreSQL
    conn.commit()
finally:
    # Always release the connection, even when the build fails
    conn.close()

time: 2.142549991607666 



In [172]:
# Connect to PostgreSQL
conn = psycopg2.connect(host='192.168.0.47', dbname='postgres', user='postgres', password='postgres1016', port=55432)
try:
    cursor = conn.cursor()

    # Show the execution plan of the cosine-distance search (<=>) with
    # hnsw.ef_search set to 100 for this session.
    # The query vector is bound as a parameter instead of being formatted
    # into the SQL text.
    s = time()
    cursor.execute("""
        SET hnsw.ef_search = 100;
        EXPLAIN ANALYZE SELECT id,
            keywords,
            embedding <=> %s AS similarity
        FROM imdb
        ORDER BY similarity
        LIMIT 5;
    """, (str(test_vec),))
    time_spent = time() - s
    print("time:", time_spent, "\n")

    result = cursor.fetchall()
    show(result)

    # Remove the index so the next experiment starts from a clean table
    cursor.execute("DROP INDEX IF EXISTS imdb_hnsw_index_cos;")

    # Commit to PostgreSQL
    conn.commit()
finally:
    # Always release the connection, even when a statement fails
    conn.close()

time: 0.024373769760131836 

('Limit  (cost=155.11..155.12 rows=5 width=63) (actual time=6.114..6.115 rows=5 loops=1)',)
('  ->  Sort  (cost=155.11..157.61 rows=1000 width=63) (actual time=6.113..6.113 rows=5 loops=1)',)
("        Sort Key: ((embedding <=> '[0.009310356,0.010533301,0.023119183,-0.027138308,-0.0063605406,-0.041469496,0.00799523,0.019591695,-0.010533301,-0.04414891,0.044173494,-0.014675334,-0.02812158,0.004498469,0.037216846,0.023795182,-0.07517114,0.049335673,0.018878821,0.0068521765,-0.0075466125,0.0108713005,0.007005813,0.016703334,-0.01187301,0.05791472,0.009377956,0.016568134,0.044001423,-0.023119183,0.009783557,-0.050491016,0.04643502,-0.0027454798,0.044861786,-0.03318543,-0.008308648,-0.016420642,-0.025835471,-0.021668857,-0.04112535,0.016015042,0.0031049885,0.051670942,0.028859032,-0.019997295,-0.002759307,0.050786,0.0023752165,0.06696082,0.025860053,0.0005726791,-0.014687626,0.082889825,-0.020009585,-0.026253361,-0.020980567,0.040043753,0.015535697,0.0017007532,

### IVFFlat Index

In [173]:
# Connect to PostgreSQL
conn = psycopg2.connect(host='192.168.0.47', dbname='postgres', user='postgres', password='postgres1016', port=55432)
try:
    cursor = conn.cursor()

    # Build an IVFFlat index for L2 distance and time the build.
    # IF NOT EXISTS keeps the cell re-runnable, matching the HNSW L2 cell.
    # NOTE(review): lists = 100 is large for a table of at most 1000 rows;
    # the pgvector guidance is roughly rows / 1000 - confirm the value.
    s = time()
    cursor.execute("""
        CREATE INDEX IF NOT EXISTS imdb_ivf_index_l2 ON imdb USING ivfflat (embedding vector_l2_ops) WITH (lists = 100);
    """)
    time_spent = time() - s
    print("time:", time_spent, "\n")

    # Commit to PostgreSQL
    conn.commit()
finally:
    # Always release the connection, even when the build fails
    conn.close()

time: 0.13736295700073242 



In [174]:
# Connect to PostgreSQL
conn = psycopg2.connect(host='192.168.0.47', dbname='postgres', user='postgres', password='postgres1016', port=55432)
try:
    cursor = conn.cursor()

    # Show the execution plan of the L2-distance search with
    # ivfflat.probes set to 10 for this session.
    # The query vector is bound as a parameter instead of being formatted
    # into the SQL text.
    s = time()
    cursor.execute("""
        SET ivfflat.probes = 10;
        EXPLAIN ANALYZE SELECT id,
            keywords,
            embedding <-> %s AS similarity
        FROM imdb
        ORDER BY similarity
        LIMIT 5;
    """, (str(test_vec),))
    time_spent = time() - s
    print("time:", time_spent, "\n")

    result = cursor.fetchall()
    show(result)

    # Remove the index so the next experiment starts from a clean table
    cursor.execute("DROP INDEX IF EXISTS imdb_ivf_index_l2;")

    # Commit to PostgreSQL
    conn.commit()
finally:
    # Always release the connection, even when a statement fails
    conn.close()

time: 0.02797698974609375 

('Limit  (cost=155.11..155.12 rows=5 width=63) (actual time=7.229..7.231 rows=5 loops=1)',)
('  ->  Sort  (cost=155.11..157.61 rows=1000 width=63) (actual time=7.228..7.229 rows=5 loops=1)',)
("        Sort Key: ((embedding <-> '[0.009310356,0.010533301,0.023119183,-0.027138308,-0.0063605406,-0.041469496,0.00799523,0.019591695,-0.010533301,-0.04414891,0.044173494,-0.014675334,-0.02812158,0.004498469,0.037216846,0.023795182,-0.07517114,0.049335673,0.018878821,0.0068521765,-0.0075466125,0.0108713005,0.007005813,0.016703334,-0.01187301,0.05791472,0.009377956,0.016568134,0.044001423,-0.023119183,0.009783557,-0.050491016,0.04643502,-0.0027454798,0.044861786,-0.03318543,-0.008308648,-0.016420642,-0.025835471,-0.021668857,-0.04112535,0.016015042,0.0031049885,0.051670942,0.028859032,-0.019997295,-0.002759307,0.050786,0.0023752165,0.06696082,0.025860053,0.0005726791,-0.014687626,0.082889825,-0.020009585,-0.026253361,-0.020980567,0.040043753,0.015535697,0.0017007532,0

### Expression Indexing

Expression Indexing은 데이터베이스에서 특정 컬럼의 값뿐만 아니라, 해당 컬럼에 어떤 표현식(expression)을 적용한 결과에 대해 인덱스를 생성하는 방법을 의미함. \
여기서 "표현식"은 함수 호출, 산술 연산, 텍스트 변환 등과 같이 데이터를 변형하거나 계산하는 논리를 포함

#### Binary Hamming Distance Index + Rerank

In [175]:
# Display the dimensionality of the query embedding
len(test_vec)

1536

In [177]:
# Connect to PostgreSQL
conn = psycopg2.connect(host='192.168.0.47', dbname='postgres', user='postgres', password='postgres1016', port=55432)
try:
    cursor = conn.cursor()

    # Re-encode the embedding as a bit string and build an HNSW index on it
    # using Hamming distance. The bit length follows the embedding
    # dimensionality instead of a hard-coded 1536.
    n_bits = len(test_vec)
    s = time()
    cursor.execute(f"""
        CREATE INDEX IF NOT EXISTS imdb_binary_index ON imdb USING hnsw ((binary_quantize(embedding)::bit({n_bits})) bit_hamming_ops);
    """)
    time_spent = time() - s
    print("time:", time_spent, "\n")

    # Commit to PostgreSQL
    conn.commit()
finally:
    # Always release the connection, even when the build fails
    conn.close()

time: 0.007518291473388672 



In [178]:
# Connect to PostgreSQL
conn = psycopg2.connect(host='192.168.0.47', dbname='postgres', user='postgres', password='postgres1016', port=55432)
try:
    cursor = conn.cursor()

    # Similarity search on the binary-quantized vectors
    # <~> : Hamming distance
    # The stored side is cast to bit(N) so the expression is identical to the
    # one the expression index was built on; without the cast the index
    # cannot be matched to this ORDER BY.
    n_bits = len(test_vec)
    s = time()
    cursor.execute(f"""
        SELECT id,
            keywords,
            binary_quantize(embedding)::bit({n_bits}) <~> binary_quantize(%s::vector) AS hamming_distance
        FROM imdb
        ORDER BY hamming_distance
        LIMIT 5;
    """, (str(test_vec),))
    time_spent = time() - s
    print("time:", time_spent, "\n")

    result = cursor.fetchall()
    show(result)

    # Commit to PostgreSQL
    conn.commit()
finally:
    # Always release the connection, even when the query fails
    conn.close()

time: 0.029187917709350586 

(405, 'christmas|christmas eve|elf|high tech|resentment', 453.0)
(626, 'christmas|magic|santa claus|son|toy', 471.0)
(383, 'christmas|island|love|survival|talking to inanimate object', 494.0)
(89, 'boy|christmas|christmas eve|north pole|train', 498.0)
(906, 'act of kindness|christmas|christmas eve|neighbor|vacation', 498.0)


In [179]:
# Connect to PostgreSQL
conn = psycopg2.connect(host='192.168.0.47', dbname='postgres', user='postgres', password='postgres1016', port=55432)
try:
    cursor = conn.cursor()

    # Pick candidates by Hamming distance on the binary-quantized vectors,
    # then re-rank them by cosine distance on the original vectors.
    # - The stored side is cast to bit(N) so the expression matches the
    #   expression index.
    # - The subquery is aliased because PostgreSQL versions before 16
    #   require an alias for a subquery in FROM.
    # NOTE(review): the candidate pool (LIMIT 5) is the same size as the final
    # result, so re-ranking can only reorder it; consider a larger pool.
    n_bits = len(test_vec)
    s = time()
    cursor.execute(f"""
        SELECT id,
            keywords,
            embedding <=> %(query)s AS similarity
        FROM (
            SELECT *,
                binary_quantize(embedding)::bit({n_bits}) <~> binary_quantize(%(query)s::vector) AS hamming_distance
            FROM imdb
            ORDER BY hamming_distance
            LIMIT 5
        ) AS candidates
        ORDER BY similarity;
    """, {"query": str(test_vec)})
    time_spent = time() - s
    print("time:", time_spent, "\n")

    result = cursor.fetchall()
    show(result)

    # Commit to PostgreSQL
    conn.commit()
finally:
    # Always release the connection, even when the query fails
    conn.close()

time: 0.03220677375793457 

(626, 'christmas|magic|santa claus|son|toy', 0.3793529272079468)
(405, 'christmas|christmas eve|elf|high tech|resentment', 0.37991273403167725)
(89, 'boy|christmas|christmas eve|north pole|train', 0.42837557623531863)
(906, 'act of kindness|christmas|christmas eve|neighbor|vacation', 0.43741653673102043)
(383, 'christmas|island|love|survival|talking to inanimate object', 0.47871855028161014)


#### Subvector Index + Rerank

In [180]:
# Connect to PostgreSQL
conn = psycopg2.connect(host='192.168.0.47', dbname='postgres', user='postgres', password='postgres1016', port=55432)
try:
    cursor = conn.cursor()

    # Build a cosine-distance HNSW index on the subvector made of
    # dimensions 1-100 of the embedding
    cursor.execute("""
    CREATE INDEX IF NOT EXISTS imdb_subvector_index
    ON imdb
    USING hnsw ((subvector(embedding, 1, 100)::vector(100)) vector_cosine_ops);
    """)

    # The query vector is bound as a named parameter in both statements
    query_params = {"query": str(test_vec)}

    # Select 10 candidates using the subvector expression
    cursor.execute("""
    SELECT id,
       keywords,
       subvector(embedding, 1, 100)::vector(100) <=> subvector(%(query)s::vector, 1, 100)::vector(100) AS similarity
    FROM imdb
    ORDER BY similarity
    LIMIT 10;
    """, query_params)

    result = cursor.fetchall()
    print('\nBy Subvector Indexing:')
    show(result)

    # Re-rank the candidates using the full original vectors.
    # The inner distance gets its own alias (sub_distance) so it is not
    # confused with the outer "similarity" column used for the final ordering.
    cursor.execute("""
    SELECT id,
          keywords,
          embedding <=> %(query)s::vector AS similarity
    FROM (
       SELECT *,
              subvector(embedding, 1, 100)::vector(100) <=> subvector(%(query)s::vector, 1, 100)::vector(100) AS sub_distance
       FROM imdb
       ORDER BY sub_distance
       LIMIT 10
    ) subquery
    ORDER BY similarity
    LIMIT 5;
    """, query_params)

    print('\nAfter Reranking:')
    result = cursor.fetchall()
    show(result)

    # Commit to PostgreSQL
    conn.commit()
finally:
    # Always release the connection, even when a statement fails
    conn.close()


By Subvector Indexing:
(405, 'christmas|christmas eve|elf|high tech|resentment', 0.2691966624968045)
(626, 'christmas|magic|santa claus|son|toy', 0.31271188427737395)
(906, 'act of kindness|christmas|christmas eve|neighbor|vacation', 0.38337733787302397)
(89, 'boy|christmas|christmas eve|north pole|train', 0.3908667573672565)
(60, 'charles dickens|christmas|christmas eve|ghost|scrooge', 0.4136089010490043)
(379, 'box office flop|christmas|christmas eve|critically bashed|toy comes to life', 0.45610838617582705)
(383, 'christmas|island|love|survival|talking to inanimate object', 0.46322898039509686)
(221, 'christ figure|class differences|messiah|saviour|social commentary', 0.46591124531273087)
(210, 'box office hit|christmas|materialism|public humiliation|snowflake', 0.4807323439290948)
(612, 'cia|jordan|middle east|spy|terrorist', 0.5130261184793261)

After Reranking:
(626, 'christmas|magic|santa claus|son|toy', 0.3793529272079468)
(405, 'christmas|christmas eve|elf|high tech|resentmen

#### Full-Text Search + Reciprocal Rank Fusion

In [181]:
# Connect to PostgreSQL
conn = psycopg2.connect(host='192.168.0.47', dbname='postgres', user='postgres', password='postgres1016', port=55432)
try:
    cursor = conn.cursor()

    # Create a GIN index for Full-Text Search.
    # A GIN index needs a column of type tsvector.
    cursor.execute("""
    -- tsvector 형식의 keyword 열 생성
    ALTER TABLE imdb ADD COLUMN IF NOT EXISTS textsearch tsvector;
    -- keyword열에 to_tsvector 함수를 적용하여 textsearch 열에 저장
    -- 'english': 영어에 맞는 stopwords, stemming 설정 적용
    UPDATE imdb SET textsearch = to_tsvector('english', keywords); 
    -- textsearch에 기반하여 인덱스 생성. GIN 또는 GiST 인덱스를 생성할 수 있음.
    CREATE INDEX IF NOT EXISTS textsearch_idx ON imdb USING GIN (textsearch);
    """)

    # Single source of truth for the search term used by every query below
    search_term = "christmas"

    # PostgreSQL Full-Text Search for the search term.
    # plainto_tsquery is given the same 'english' configuration as
    # to_tsvector so both sides are stemmed identically regardless of the
    # server's default_text_search_config.
    cursor.execute("""
    SELECT id, keywords
    FROM imdb
    WHERE textsearch @@ plainto_tsquery('english', %(term)s)
    ORDER BY ts_rank_cd(textsearch, plainto_tsquery('english', %(term)s)) DESC
    LIMIT 5;
    """, {"term": search_term})

    print('\nFull Text-Search:')
    result = cursor.fetchall()
    show(result)


    # Vector search by cosine distance
    # NOTE(review): this rebinds the notebook-wide test_vec to the embedding
    # of the search term, so cells executed afterwards search for it instead
    # of test_words - confirm whether that is intended.
    test_vec = embeddings.embed_query(search_term)
    params = {"term": search_term, "vec": str(test_vec)}

    cursor.execute("""
        SET hnsw.ef_search = 100;
        SELECT id,
            keywords,
            embedding <=> %(vec)s AS similarity
        FROM imdb
        ORDER BY similarity
        LIMIT 5;
    """, params)

    print('\nVector Search:')
    result = cursor.fetchall()
    show(result)


    # Re-rank the Full-Text Search results by cosine distance.
    # The subquery is aliased because PostgreSQL versions before 16 require
    # an alias for a subquery in FROM.
    cursor.execute("""
    SELECT id,
        keywords,
        embedding <=> %(vec)s AS similarity
    FROM (
        SELECT *
        FROM imdb
        WHERE textsearch @@ plainto_tsquery('english', %(term)s)
        ORDER BY ts_rank_cd(textsearch, plainto_tsquery('english', %(term)s)) DESC
        LIMIT 50
    ) AS fts_candidates
    ORDER BY similarity
    LIMIT 5
    """, params)

    print('\nReranked by Cosine Similarity after 50 Full Text-Search Results:')
    result = cursor.fetchall()
    show(result)

    # Reciprocal Rank Fusion of the Full-Text Search results and the
    # cosine-distance results.
    # Each ranked CTE orders by its rank before LIMIT: without ORDER BY,
    # LIMIT may keep arbitrary rows rather than the top-ranked ones.
    cursor.execute(
    """
    WITH fts_results AS (
        SELECT id,
               keywords,
               ROW_NUMBER() OVER (ORDER BY ts_rank_cd(textsearch, plainto_tsquery('english', %(term)s)) DESC) AS rank
        FROM imdb
        WHERE textsearch @@ plainto_tsquery('english', %(term)s)
        ORDER BY rank
        LIMIT 5
    ),
    fts_rrf AS (
        SELECT id,
               keywords,
               1.0 / (60 + rank) AS rrf_score
        FROM fts_results
    ),
    vector_results AS (
        SELECT id,
               keywords,
               ROW_NUMBER() OVER (ORDER BY embedding <=> %(vec)s::vector) AS rank
        FROM imdb
        ORDER BY rank
        LIMIT 5
    ),
    vector_rrf AS (
        SELECT id,
               keywords,
               1.0 / (60 + rank) AS rrf_score
        FROM vector_results
    ),
    combined_results AS (
        SELECT id, keywords, SUM(rrf_score) AS total_rrf_score
        FROM (
            SELECT * FROM fts_rrf
            UNION ALL
            SELECT * FROM vector_rrf
        ) subquery
        GROUP BY id, keywords
    )
    SELECT id, keywords, total_rrf_score
    FROM combined_results
    ORDER BY total_rrf_score DESC
    LIMIT 5;
    """, params)

    print('\nReciprocal Rank Fusion of Vector Search and Full-Text Search Results:')
    result = cursor.fetchall()
    show(result)

    # Commit to PostgreSQL
    conn.commit()
finally:
    # Always release the connection, even when a statement fails
    conn.close()


Full Text-Search:
(405, 'christmas|christmas eve|elf|high tech|resentment')
(60, 'charles dickens|christmas|christmas eve|ghost|scrooge')
(379, 'box office flop|christmas|christmas eve|critically bashed|toy comes to life')
(89, 'boy|christmas|christmas eve|north pole|train')
(906, 'act of kindness|christmas|christmas eve|neighbor|vacation')

Vector Search:
(626, 'christmas|magic|santa claus|son|toy', 0.3793529272079468)
(405, 'christmas|christmas eve|elf|high tech|resentment', 0.37991273403167725)
(89, 'boy|christmas|christmas eve|north pole|train', 0.42837557623531863)
(906, 'act of kindness|christmas|christmas eve|neighbor|vacation', 0.43741653673102043)
(383, 'christmas|island|love|survival|talking to inanimate object', 0.47871855028161014)

Reranked by Cosine Similarity after 50 Full Text-Search Results:
(626, 'christmas|magic|santa claus|son|toy', 0.3793529272079468)
(405, 'christmas|christmas eve|elf|high tech|resentment', 0.37991273403167725)
(89, 'boy|christmas|christmas eve|n