# Step 1: Setup

Run imports and set up secrets. We also drop all related tables to start fresh.

In [1]:
import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
from timescale_vector import pgvectorizer

from langchain.docstore.document import Document
from langchain.text_splitter import CharacterTextSplitter
from timescale_vector import client
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores.timescalevector import TimescaleVector
from datetime import timedelta, datetime

In [2]:
import os
from dotenv import load_dotenv, find_dotenv
# Locate a .env file and load it; override=True lets its values replace
# variables that are already set in the process environment.
dotenv_path = find_dotenv()
_ = load_dotenv(dotenv_path, override=True)

# Connection string used by every database cell below.
# A KeyError here means the variable is missing from the environment/.env file.
TIMESCALE_SERVICE_URL = os.environ["TIMESCALE_SERVICE_URL"]

In [3]:
# Drop the blog table and the blog_embedding / blog_embedding_work_queue
# tables so the notebook starts from a clean slate on every run.
conn = psycopg2.connect(TIMESCALE_SERVICE_URL)
try:
    # `with conn` commits on success and rolls back on error, but it does NOT
    # close the connection -- hence the explicit close() in `finally`.
    with conn:
        with conn.cursor() as cursor:
            cursor.execute('''DROP TABLE IF EXISTS blog CASCADE''')
            cursor.execute('''DROP TABLE IF EXISTS blog_embedding CASCADE''')
            cursor.execute('''DROP TABLE IF EXISTS blog_embedding_work_queue CASCADE''')
finally:
    conn.close()

# Step 2: Insert data

Create a `blog` table and insert some data into it.

In [4]:
# Create the source table whose rows will later be embedded.
conn = psycopg2.connect(TIMESCALE_SERVICE_URL)
try:
    # `with conn` commits on success and rolls back on error, but it does NOT
    # close the connection -- hence the explicit close() in `finally`.
    with conn:
        with conn.cursor() as cursor:
            cursor.execute('''
            CREATE TABLE IF NOT EXISTS blog (
                id              SERIAL PRIMARY KEY NOT NULL,
                title           TEXT NOT NULL,
                content         TEXT NOT NULL,
                url             TEXT NOT NULL
            );
            ''')
finally:
    conn.close()

In [5]:
# Load your CSV file into a pandas DataFrame
df = pd.read_csv('../openai_pgvector_helloworld/blog_posts_data.csv')

# Select the columns by name so the value order always matches the INSERT
# column list, even if the CSV carries extra columns (e.g. a saved index).
values = df[["title", "content", "url"]].values.tolist()
insert_statement = "INSERT INTO blog (title, content, url) VALUES %s"

# Insert it into the db
conn = psycopg2.connect(TIMESCALE_SERVICE_URL)
try:
    # `with conn` commits on success and rolls back on error, but it does NOT
    # close the connection -- hence the explicit close() in `finally`.
    with conn:
        with conn.cursor() as cursor:
            execute_values(cursor, insert_statement, values)
finally:
    conn.close()

df.head()

Unnamed: 0,title,content,url
0,"How to Build a Weather Station With Elixir, Ne...",This is an installment of our “Community Membe...,https://www.timescale.com/blog/how-to-build-a-...
1,CloudQuery on Using PostgreSQL for Cloud Asset...,This is an installment of our “Community Membe...,https://www.timescale.com/blog/cloudquery-on-u...
2,How a Data Scientist Is Building a Time-Series...,This is an installment of our “Community Membe...,https://www.timescale.com/blog/how-a-data-scie...
3,How Conserv Safeguards History: Building an En...,This is an installment of our “Community Membe...,https://www.timescale.com/blog/how-conserv-saf...
4,How Messari Uses Data to Open the Cryptoeconom...,This is an installment of our “Community Membe...,https://www.timescale.com/blog/how-messari-use...


# Step 3: Create the embed_and_write function for PGVectorizer

In [6]:
def get_document(blog):
    """Split one blog row into chunked ``Document`` objects.

    Args:
        blog: Mapping-like row; the keys 'id', 'title', 'content' and 'url'
            are read.

    Returns:
        A list with one ``Document`` per chunk of ``blog['content']``. Each
        document's text is prefixed with the blog title, and its metadata
        holds a fresh id from ``client.uuid_from_time(datetime.now())`` plus
        the blog's id, title and url.
    """
    splitter = CharacterTextSplitter(separator="\n", chunk_size=1000, chunk_overlap=200)
    pieces = splitter.split_text(blog['content'])
    return [
        Document(
            page_content=f"Title: {blog['title']}, contents:{piece}",
            metadata={
                # Each chunk gets its own id, generated at chunking time.
                "id": str(client.uuid_from_time(datetime.now())),
                "blog_id": blog['id'],
                "title": blog['title'],
                "url": blog['url'],
            },
        )
        for piece in pieces
    ]

def embed_and_write(blog_instances, vectorizer):
    """Replace the stored embeddings for a batch of blog rows.

    Existing embeddings for every row in the batch are deleted first (matched
    on the ``blog_id`` metadata field); rows that still exist are then
    re-chunked via ``get_document`` and written to the ``blog_embedding``
    collection.

    Args:
        blog_instances: Iterable of mapping-like rows. 'locked_id' and
            'title' are read here; 'id', 'content' and 'url' are read by
            ``get_document``. A row whose 'title' is None is treated as a
            deleted blog and only has its old embeddings removed.
        vectorizer: Supplied by the caller; not used by this implementation.

    Returns:
        None.
    """
    TABLE_NAME = "blog_embedding"
    embedding = OpenAIEmbeddings()
    vector_store = TimescaleVector(
        collection_name=TABLE_NAME,
        service_url=TIMESCALE_SERVICE_URL,
        embedding=embedding,
        time_partition_interval=timedelta(days=30),
    )

    # delete old embeddings for all ids in the work queue
    metadata_for_delete = [{"blog_id": blog['locked_id']} for blog in blog_instances]
    vector_store.delete_by_metadata(metadata_for_delete)

    documents = []
    for blog in blog_instances:
        # skip blogs that are deleted (title will be None because of left join)
        if blog['title'] is not None:
            documents.extend(get_document(blog))

    # Nothing left to embed (every row in the batch was a deletion).
    if not documents:
        return

    texts = [d.page_content for d in documents]
    metadatas = [d.metadata for d in documents]
    ids = [d.metadata["id"] for d in documents]
    vector_store.add_texts(texts, metadatas, ids)

# Step 4: Run PgVectorizer

Run this any time you need to sync the data in your database with its embeddings. We suggest running it as a scheduled job, for example:
- A scheduled AWS Lambda function
- A scheduled Cloudflare worker
- A Modal function
- A cron job on an EC2 instance or even on your local machine


In [7]:
# Keep calling process() until it returns a value that is not greater than
# zero (presumably the number of rows it handled -- TODO confirm).
vectorizer = pgvectorizer.Vectorize(TIMESCALE_SERVICE_URL, 'blog')
while True:
    if vectorizer.process(embed_and_write) <= 0:
        break

Created a chunk of size 6009, which is longer than the specified 1000
Created a chunk of size 8793, which is longer than the specified 1000
Created a chunk of size 9840, which is longer than the specified 1000
Created a chunk of size 8469, which is longer than the specified 1000
Created a chunk of size 6232, which is longer than the specified 1000
Created a chunk of size 9329, which is longer than the specified 1000
Created a chunk of size 8311, which is longer than the specified 1000
Created a chunk of size 7167, which is longer than the specified 1000
Created a chunk of size 8469, which is longer than the specified 1000
Created a chunk of size 10884, which is longer than the specified 1000
Created a chunk of size 3818, which is longer than the specified 1000
Created a chunk of size 9799, which is longer than the specified 1000
Created a chunk of size 7374, which is longer than the specified 1000
Created a chunk of size 10123, which is longer than the specified 1000
Created a chunk of

# Step 5: Query your embedding

You can now query the embeddings like you normally would.

In [8]:
# Open the "blog_embedding" collection that embed_and_write populates.
TABLE_NAME = "blog_embedding"
embedding = OpenAIEmbeddings()
vector_store = TimescaleVector(
    service_url=TIMESCALE_SERVICE_URL,
    collection_name=TABLE_NAME,
    embedding=embedding,
    time_partition_interval=timedelta(days=30),
)

# Ask for the single closest match and display it.
res = vector_store.similarity_search_with_score("Weather Station", k=1)
res[0]

(Document(page_content='Title: How to Build a Weather Station With Elixir, Nerves, and TimescaleDB, contents:This is an installment of our “Community Member Spotlight” series, where we invite our customers to share their work, shining a light on their success and inspiring others with new ways to use technology to solve problems.In this edition,Alexander Koutmos, author of the Build a Weather Station with Elixir and Nerves book, joins us to share how he uses Grafana and TimescaleDB to store and visualize weather data collected from IoT sensors.About the teamThe bookBuild a Weather Station with Elixir and Nerveswas a joint effort between Bruce Tate, Frank Hunleth, and me.I have been writing software professionally for almost a decade and have been working primarily with Elixir since 2016. I currently maintain a few Elixir libraries onHexand also runStagira, a software consultancy company.Bruce Tateis a kayaker, programmer, and father of two from Chattanooga, Tennessee. He is the author 

Note the document returned has the chunk of the blog that the embedding matched. To get back the full blog, you can look at the blog id in the metadata.

In [9]:
# Each search result is a (document, score) pair; the document's metadata
# carries the id of the blog row the chunk came from.
best_match = res[0]
best_match[0].metadata['blog_id']

1

# Addendum: Keeping your embeddings up-to-date with your PostgreSQL tables

Keeping your data up to date is super simple: just re-run `vectorizer.process()` (that's why we suggest running it on a schedule). It will automatically sync all the inserts, updates, and deletes that were performed.

For example, if you delete a blog post like so:

In [10]:
# Delete every blog post whose title contains "Weather Station".
conn = psycopg2.connect(TIMESCALE_SERVICE_URL)
try:
    # `with conn` commits on success and rolls back on error, but it does NOT
    # close the connection -- hence the explicit close() in `finally`.
    with conn:
        with conn.cursor() as cursor:
            cursor.execute('''
              DELETE FROM blog WHERE title LIKE '%Weather Station%';
            ''')
finally:
    conn.close()

And re-run the vectorizer via:

In [11]:
# Keep calling process() until it returns a value that is not greater than
# zero (presumably the number of rows it handled -- TODO confirm).
vectorizer = pgvectorizer.Vectorize(TIMESCALE_SERVICE_URL, 'blog')
while True:
    if vectorizer.process(embed_and_write) <= 0:
        break

Then the embedding search will no longer find the deleted blog post.

In [12]:
# Re-open the "blog_embedding" collection and repeat the earlier search.
TABLE_NAME = "blog_embedding"
embedding = OpenAIEmbeddings()
vector_store = TimescaleVector(
    service_url=TIMESCALE_SERVICE_URL,
    collection_name=TABLE_NAME,
    embedding=embedding,
    time_partition_interval=timedelta(days=30),
)

# Ask for the single closest match and display it.
res = vector_store.similarity_search_with_score("Weather Station", k=1)
res[0]

(Document(page_content="Title: How WsprDaemon Combines TimescaleDB and Grafana to Measure and Analyze Radio Transmissions, contents:This is an installment of our “Community Member Spotlight” series, where we invite our customers to share their work, shining a light on their success and inspiring others with new ways to use technology to solve problems.In this edition, Rob Robinett and Gwyn Griffiths, the creators of WsprDaemon, join us to share the work they’re doing to allow amateur radio enthusiasts to analyze transmission data and understand trends, be it their own personal noise levels or much larger space weather patterns.Amateur radio is a hobby for some three million people worldwide (see “What is Amateur Radio?” to learn more) and its technical scope is vast, examples include: designing and building satellites, devising bandwidth-efficient data communications protocols, and creating novel, low noise antennas for use in urban locations.Our project,WsprDaemon, focuses on amateurs