# PubPulse laboratory

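Imports and shared setup: autoreload, the `%sql` magic, access to the `mastodon_agent` package, the database engine, and a check for Apple-silicon (MPS) GPU support.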

In [1]:
%reload_ext autoreload
%autoreload 2
%reload_ext sql
    
import os, sys

# TODO: find a way to make `mastodon_agent` (in the parent directory) importable without editing sys.path here.
sys.path.insert(0, os.path.dirname(os.getcwd()))

import torch
import math
import time
import os
import numpy as np
import pandas as pd
import psycopg2
from pgvector.psycopg2 import register_vector
from tqdm.notebook import trange, tqdm
from IPython.display import HTML
from ipywidgets import IntProgress
from IPython.display import display

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

from mastodon_agent.config import config
from mastodon_agent.tasks import ml_gpu

pd.set_option('display.max_colwidth', 100)

# The psycopg2 cells below connect via DATABASE_URL from the environment, while
# the Python query cells use the SQLAlchemy `engine`; both point at the
# database URL from the project config.
os.environ['DATABASE_URL'] = config.database_url
engine = create_engine(os.environ['DATABASE_URL'])

# Report Apple-silicon GPU (MPS) support for torch, one boolean per line:
#   is_available() - this machine can use MPS (requires macOS 12.3+);
#   is_built()     - this torch installation was compiled with MPS enabled.
for mps_probe in (torch.backends.mps.is_available, torch.backends.mps.is_built):
    print(mps_probe())

True
True


How many statuses have we ingested so far?

In [2]:
# Total number of ingested statuses.
%sql SELECT COUNT(*) FROM statuses

1 rows affected.


count
199264


Let's take a look at the latest posts ingested:

In [None]:
%%sql
-- Five most recently ingested statuses, newest first.
SELECT url,
       ingested_at,
       status->>'created_at'      AS created_at,
       status->'account'->>'acct' AS acct
  FROM statuses
 ORDER BY ingested_at DESC
 LIMIT 5;

 * postgresql://postgres:***@localhost:55432/example


Fetch the latest posts from Python with SQLAlchemy:

In [None]:
from collections import namedtuple

from sqlalchemy.sql import text

# The three most recently ingested statuses, with the fields worth eyeballing.
stmt = text("""
    SELECT
        ingested_at,
        status->>'created_at' as created_at,
        url,
        status->'account'->>'acct' as acct,
        status->>'content' as content
    FROM statuses
    ORDER BY ingested_at DESC
    LIMIT 3;
""")

# Read every row while the connection is still open. SQLAlchemy results are
# meant to be consumed inside the connection's scope, not after the `with`
# block has closed the connection and returned it to the pool.
with engine.connect() as conn:
    result = conn.execute(stmt)
    Record = namedtuple('Record', result.keys())
    records = [Record(*row) for row in result.fetchall()]

# Non-empty `content` values of the fetched statuses.
texts = [r.content for r in records if r.content]

df = pd.DataFrame(records)
# escape=False renders each status's `content` HTML as markup, and
# render_links=True makes URLs clickable. NOTE(review): that HTML comes from
# remote servers, so treat it as untrusted before sharing this output.
HTML(df.to_html(render_links=True, escape=False))

Let's load up a local embedding model:

In [None]:
from sentence_transformers import SentenceTransformer

# In-process copy of the MiniLM sentence-transformers model. The cells below
# compare its output with the remote embedding backends.
embedding_model = SentenceTransformer(
    'sentence-transformers/all-MiniLM-L6-v2',
)

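Compare the embeddings that four backends (the local ML API, the Hugging Face Inference API, the in-process model and the Celery GPU task) produce for the same inputs: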

In [None]:
import requests
from mastodon_agent.tasks import ml_gpu

# Two near-paraphrases plus unrelated filler text, embedded by every backend.
texts = [
    "I like pie",
    "Have you the like of pie!",
    "Lorem ipsum dolor sit amet consectetur adipiscing elit Aliquam mattis arcu sit amet ex convallis ac varius lacus vehicula",
    "Etiam non feugiat sapien. Vestibulum accumsan elit massa, at volutpat augue lacinia lacinia.",
]

# 1. The project's own ML API.
local_api_resp = requests.post(
    f"{config.ml_api_url}/embeddings",
    json={"inputs": texts},
)
# Fail loudly on HTTP errors. Otherwise the error body would be decoded as if
# it were embeddings and only blow up later when indexed.
local_api_resp.raise_for_status()
embeddings_from_local_api = local_api_resp.json()

# 2. Hugging Face hosted inference API, same model.
response = requests.post(
    "https://api-inference.huggingface.co/pipeline/feature-extraction/sentence-transformers/all-MiniLM-L6-v2",
    headers={"Authorization": f"Bearer {config.hf_token}"},
    json={
        "inputs": texts,
        "options": {"wait_for_model": True},
    },
)
response.raise_for_status()
embeddings_from_hf = response.json()

# 3. The in-process model loaded in the previous cell.
embeddings_from_model = embedding_model.encode(texts)

# 4. Celery task on the GPU worker (waits up to 10 s for the result).
embeddings_from_celery = ml_gpu.embed.delay(texts).get(timeout=10)

# One row per backend, one column per embedding dimension, for the first text.
pd.DataFrame(
    [
        embeddings_from_local_api[0],
        embeddings_from_hf[0],
        embeddings_from_model[0],
        embeddings_from_celery[0],
    ],
    index=["local_api", "hf_api", "in_process", "celery"],
)

How many statuses have been ingested since the newest embedding was generated?

In [None]:
%%sql
-- Statuses ingested after the newest embedding was created.
-- max() skips NULL created_at values (ORDER BY ... DESC sorts NULLs first and
-- would make the cutoff NULL). COALESCE(..., '-infinity') makes an empty
-- status_embeddings table count every status instead of none.
SELECT count(url)
FROM statuses
WHERE ingested_at > COALESCE(
    (SELECT max(created_at) FROM status_embeddings),
    '-infinity'
);

Generate embeddings for statuses ingested since the newest embedding was created:

In [None]:
from mastodon_agent.tasks import ml_gpu

conn = psycopg2.connect(os.environ["DATABASE_URL"])
# Lets psycopg2 send/receive pgvector `vector` values (numpy arrays) on this connection.
register_vector(conn)

# Select up to 5000 statuses (newest first) ingested after the newest embedding.
# max() skips NULL created_at values; ORDER BY created_at DESC LIMIT 1 would
# return a NULL first and select nothing. COALESCE to '-infinity' lets the
# first run, when status_embeddings is still empty, pick up statuses too.
cur = conn.cursor()
cur.execute("""
    SELECT
        url,
        status->>'content' as content
    FROM statuses
    WHERE ingested_at > COALESCE(
        (SELECT max(created_at) FROM status_embeddings),
        '-infinity'
    )
    ORDER BY ingested_at DESC
    LIMIT 5000
""")

def embed_with_local_api(texts):
    """Embed ``texts`` via the project's ML API (``POST {ml_api_url}/embeddings``).

    Args:
        texts: strings to embed, sent as ``{"inputs": texts}``.

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: on a non-2xx response, instead of handing the
            error body back as if it were embeddings.
    """
    # Imported here so this cell does not silently depend on an earlier cell
    # having imported `requests`.
    import requests

    response = requests.post(
        f"{config.ml_api_url}/embeddings",
        json={"inputs": texts},
    )
    response.raise_for_status()
    return response.json()

def embed_with_hf_api(texts):
    """Embed ``texts`` with the Hugging Face hosted inference API.

    Calls the feature-extraction pipeline for
    ``sentence-transformers/all-MiniLM-L6-v2``, authenticating with
    ``config.hf_token`` and with ``wait_for_model`` enabled (presumably so a
    cold model is loaded rather than the request rejected).

    Returns:
        The decoded JSON response body.

    Raises:
        requests.HTTPError: on a non-2xx response, instead of handing the
            error body back as if it were embeddings.
    """
    # Imported here so this cell does not silently depend on an earlier cell
    # having imported `requests`.
    import requests

    response = requests.post(
        "https://api-inference.huggingface.co/pipeline/feature-extraction/sentence-transformers/all-MiniLM-L6-v2",
        headers={"Authorization": f"Bearer {config.hf_token}"},
        json={
            "inputs": texts,
            "options": {"wait_for_model": True},
        },
    )
    response.raise_for_status()
    return response.json()
    
def embed_with_inprocess_model(texts):
    """Embed ``texts`` with the in-process ``embedding_model`` and return its output."""
    vectors = embedding_model.encode(texts)
    return vectors

def embed_with_celery_job(texts, timeout=10):
    """Embed ``texts`` by running the ``ml_gpu.embed`` Celery task and waiting for it.

    Args:
        texts: strings to embed.
        timeout: seconds to wait for the task result. Defaults to 10, the
            previously hard-coded value. Raise it for large chunks or a slow worker.

    Returns:
        Whatever the ``ml_gpu.embed`` task returns for ``texts``.
    """
    return ml_gpu.embed.delay(texts).get(timeout=timeout)

# Buffer of (url, content) pairs, flushed through `embed` every CHUNK_SIZE rows.
chunks = []
CHUNK_SIZE = 100

# Embedding backend used by embed_statuses_chunk(). Swap in
# embed_with_local_api, embed_with_hf_api or embed_with_inprocess_model to compare.
embed = embed_with_celery_job

def embed_statuses_chunk():
    """Embed the buffered statuses and upsert them into ``status_embeddings``.

    Reads the module-level ``chunks`` list of ``(url, content)`` pairs, embeds
    the contents with the selected ``embed`` backend, clears the buffer, and
    writes one row per URL through ``conn``. A row is inserted, or an existing
    URL's embedding is replaced. The ``with conn`` block is psycopg2's
    transaction scope: it commits on success and rolls back on an exception.

    Does nothing when the buffer is empty. A final flush is therefore safe
    even when the row count is a multiple of CHUNK_SIZE, or zero.

    Raises:
        ValueError: if the backend returns a different number of embeddings
            than texts it was given. The buffer is left intact in that case.
    """
    global chunks
    if not chunks:
        return

    urls = [c[0] for c in chunks]
    texts = [c[1] for c in chunks]
    embeddings = embed(texts)
    if len(embeddings) != len(urls):
        raise ValueError(
            f"embedding backend returned {len(embeddings)} vectors for {len(urls)} texts"
        )

    chunks = []

    with conn:
        with conn.cursor() as cur:
            for url, embedding in zip(urls, embeddings):
                cur.execute(
                    """
                        INSERT INTO status_embeddings (url, embedding) VALUES (%s, %s)
                          ON CONFLICT (url) DO UPDATE SET embedding = EXCLUDED.embedding;
                    """,
                    (url, embedding)
                )

# Stream the selected rows into the buffer, flushing every CHUNK_SIZE rows.
for row in tqdm(cur, total=cur.rowcount):
    chunks.append((row[0], row[1]))
    if len(chunks) >= CHUNK_SIZE:
        embed_statuses_chunk()

# Flush the remainder. Skipped when the last full chunk already emptied the
# buffer, or no rows matched, so the backend is never called with [].
if chunks:
    embed_statuses_chunk()

In [None]:
# Number of stored (non-NULL) embeddings.
%sql SELECT COUNT(embedding) FROM status_embeddings

In [None]:
from contextlib import closing

# Free-text query to look for among recently embedded statuses.
query_text = "retro gaming is nifty"

embeddings = ml_gpu.embed.delay([query_text]).get(timeout=10)

# Use a dedicated, short-lived connection. This cell then neither rebinds the
# `conn` used by embed_statuses_chunk() nor leaves a connection or cursor open.
with closing(psycopg2.connect(os.environ["DATABASE_URL"])) as search_conn:
    register_vector(search_conn)
    with search_conn.cursor() as cur:
        # The inner query takes the 25 statuses embedded in the last 6 hours
        # that are nearest to the query by pgvector's `<->` distance.
        # NOTE(review): the outer ORDER BY then lists them newest-first, not
        # by similarity.
        cur.execute(
            """
            SELECT
                ingested_at,
                url,
                status->'account'->>'acct' as acct,
                status->>'content' as content
            FROM statuses
            WHERE url in (
                SELECT url
                FROM status_embeddings
                WHERE created_at > now() - INTERVAL '6 hours'
                ORDER BY embedding <-> %s
                LIMIT 25
            )
            ORDER BY ingested_at DESC
            LIMIT 25
            """,
            (np.array(embeddings[0]),)
        )
        rows = cur.fetchall()

df = pd.DataFrame(rows, columns=("ingested_at", "url", "acct", "content"))
# escape=False renders the statuses' HTML content as markup. NOTE(review): that
# HTML comes from remote servers, so treat it as untrusted.
HTML(df.to_html(render_links=True, escape=False))