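> Script to build the analogy database (DB): it embeds the text, image and video of each sampled post and saves the results as a docarray `DocList`.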

In [None]:
import pandas as pd
import re
import swifter
from PIL import Image
import requests
from io import BytesIO
import cv2
import numpy as np
import imageio
from urllib.request import urlopen
from Encoder import MultiModalEncoder
import warnings
warnings.filterwarnings("ignore")

In [None]:

def get_img_url(string):
    """Return the first image URL found in a media description string.

    The first ``Url='...'`` occurrence is used, i.e. the value of the first
    attribute whose name ends in ``Url``.

    Args:
        string: Textual media description. Non-string values (for example the
            NaN pandas uses for an empty cell, or ``None``) are treated as
            "no media".

    Returns:
        The captured URL, or ``None`` when there is nothing to extract.
    """
    # Missing cells arrive as float NaN / None; the regex engine would raise
    # TypeError on them, so treat them as "no URL" instead.
    if not isinstance(string, str):
        return None
    match = re.search(r"Url='(.+?)'", string)
    return match.group(1) if match else None


def get_vid_url(string):
    """Return the URL of the first ``video/mp4`` variant in a media description.

    Args:
        string: Textual media description containing
            ``VideoVariant(contentType='video/mp4', url='...', ...)`` entries.
            Non-string values (NaN for an empty cell, ``None``) are treated as
            "no media".

    Returns:
        The captured URL, or ``None`` when no mp4 variant is present.
    """
    # Missing cells arrive as float NaN / None; the regex engine would raise
    # TypeError on them, so treat them as "no URL" instead.
    if not isinstance(string, str):
        return None
    match = re.search(
        r"VideoVariant\(contentType='video/mp4', url='(.+?)',", string
    )
    return match.group(1) if match else None


# Module-level HTTP session.
# NOTE(review): nothing in the visible code reads this object -- get_image_from_url
# works with its own local `session` -- confirm it is unused before removing it.
session = requests.Session()

def get_image_from_url(url, timeout=5, session=None):
    """Download ``url`` and return it as an RGB PIL image (best effort).

    Args:
        url: Address of the image. ``None``, NaN and empty strings mean
            "no image" and yield ``None`` without any network access.
        timeout: Timeout in seconds handed to ``requests``.
        session: Optional ``requests.Session`` to reuse. When omitted, a
            fresh session is created for this call and closed afterwards, so
            concurrent callers never share one.

    Returns:
        A fully decoded ``PIL.Image.Image`` in RGB mode, or ``None`` when the
        URL is missing, the request fails, the response is not served with an
        image Content-Type, or the payload cannot be decoded.
    """
    if not isinstance(url, str) or not url:
        return None

    owns_session = session is None
    http = requests.Session() if owns_session else session
    try:
        # stream=True defers the body download, so non-image responses are
        # rejected from the headers alone.
        with http.get(url, stream=True, timeout=timeout) as response:
            response.raise_for_status()  # HTTPError for 4xx/5xx
            content_type = response.headers.get('Content-Type', '').lower()
            if 'image' not in content_type:
                return None

            image = Image.open(BytesIO(response.content))
            # Image.open is lazy: force decoding here so a corrupt payload
            # becomes None inside this try instead of raising later, far
            # away from the download.
            image.load()
            if image.mode != 'RGB':
                image = image.convert('RGB')
            return image
    except Exception:
        # Deliberate best effort: any network or decoding failure means
        # "no image" for this row.
        return None
    finally:
        if owns_session:
            http.close()

def get_video_from_url(url, timeout=30):
    """Download ``url`` and return its raw bytes wrapped in a ``BytesIO``.

    Args:
        url: Address of the video; ``None`` means "no video".
        timeout: Timeout in seconds passed to ``urlopen`` so that a stalled
            connection cannot block the calling thread forever.

    Returns:
        A ``BytesIO`` positioned at the start of the downloaded data, or
        ``None`` when the URL is missing or the download fails.
    """
    if url is None:
        return None
    try:
        with urlopen(url, timeout=timeout) as response:
            return BytesIO(response.read())
    except Exception:
        # Best effort: any failure means "no video" for this row. Unlike a
        # bare ``except:``, this lets KeyboardInterrupt/SystemExit through.
        return None
    

# One-off preparation step, kept commented out for reference: it extracts the
# image/video URLs from the `media` column, samples 50,000 rows as the analogy
# set, keeps the remaining rows as the training set, and dumps both to CSV.
# df = pd.read_csv('train.csv')
# df['img_url'] = df.media.swifter.apply(get_img_url)
# df['vid_url'] = df.media.swifter.apply(get_vid_url)
# analogies = df.sample(50000)
# training = df[~df['id'].isin(analogies['id'])]
# analogies.to_csv('dump/analogies.csv',index=False)
# training.to_csv('dump/training.csv',index=False)

# Load the two splits from the CSV files the step above writes.
analogies = pd.read_csv('dump/analogies.csv')
training = pd.read_csv('dump/training.csv')

In [None]:
# Instantiate the project-local multi-modal encoder (imported from `Encoder`).
# 'cuda' is presumably the target device -- confirm against MultiModalEncoder.
encoder = MultiModalEncoder('cuda')
# NOTE(review): presumably stops the wrapped encoders from being trained -- confirm.
encoder.freeze_encoders()
# The return value is bound to `_` so the notebook cell does not echo it.
# NOTE(review): presumably the torch-style switch to evaluation mode -- confirm.
_ = encoder.eval()

In [None]:
# %pip install "docarray[full]"

In [None]:
from docarray import BaseDoc, DocList
from docarray.typing import ImageUrl, VideoUrl, ID, NdArray
from typing import Optional


# Schema of one entry in the analogy DB: the metadata of a post plus one
# 768-dimensional embedding per modality (image, video, text).
class Doc(BaseDoc):
    id: ID  # filled from the `id` column of the dataframe
    date: Optional[str] = None
    likes: Optional[int] = None
    content: Optional[str] = None  # post text; also the input of the text embedding
    username: Optional[str] = None
    # NOTE(review): process_batch in this file never sets `media`, so it stays None.
    media: Optional[str] = None
    inferred_company: Optional[str] = None  # filled from the 'inferred company' column (name contains a space)
    img_url: Optional[ImageUrl] = None
    vid_url: Optional[VideoUrl] = None
    # Required vectors: process_batch stores an all-zero vector when the
    # corresponding modality is missing or could not be downloaded.
    img_vector: NdArray[768]
    vid_vector: NdArray[768]
    text_vector: NdArray[768]

In [None]:
def process_list(lst, func):
    """Apply a batch function to the non-``None`` items of ``lst``.

    ``func`` receives the list of non-``None`` items (original order kept)
    and must return one result per item. The results are placed back at the
    positions of their inputs; positions that held ``None`` stay ``None``.

    Args:
        lst: Input sequence, possibly containing ``None`` placeholders.
        func: Callable mapping a list of items to an equally long iterable
            of results.

    Returns:
        A new list with the same length as ``lst``.

    Raises:
        ValueError: If ``func`` returns a different number of results than
            the number of items it was given (the results could not be
            aligned with their inputs).
    """
    present = [item for item in lst if item is not None]
    if not present:
        # Nothing to process: skip func entirely, since batch functions such
        # as model encoders may not accept an empty batch.
        return [None] * len(lst)

    # Materialise so that tuples/generators returned by func work as well.
    results = list(func(present))
    if len(results) != len(present):
        raise ValueError(
            f"func returned {len(results)} results for {len(present)} inputs"
        )

    remaining = iter(results)
    return [None if item is None else next(remaining) for item in lst]

import torch
@torch.inference_mode()
def _get_embedding(to_encode, key):
    """Encode a batch for one modality and return one numpy array per item.

    The module-level ``encoder`` is called with ``{key: to_encode}``; the
    ``pooler_output`` of its result for ``key`` is iterated, and every element
    is moved to the CPU and converted to a numpy array.
    """
    pooled = encoder({key: to_encode})[key].pooler_output
    return [vector.cpu().numpy() for vector in pooled]

def get_embedding(to_encode, key):
    """Embed ``to_encode`` for modality ``key``, keeping ``None`` placeholders.

    ``process_list`` passes only the non-``None`` entries to the encoder and
    leaves ``None`` at the positions that held ``None``.
    """
    def encode_present(items):
        return _get_embedding(items, key)

    return process_list(to_encode, encode_present)

In [None]:
# Single threaded


# from tqdm import tqdm
# batch_size = 16
# docs = []
# for i in tqdm(range(0,len(analogies),batch_size)):
#     batch = analogies.iloc[i:i+batch_size]
#     batch = batch.where(batch.notna(), other=None) # replace nan by None
#     texts = batch.content.tolist()
#     imgs = batch.img_url.apply(get_image_from_url).tolist()
#     vids = batch.vid_url.apply(get_video_from_url).tolist()
#     encoded_texts = get_embedding(texts,'text')
#     encoded_imgs = get_embedding(imgs,'image')
#     encoded_vids = get_embedding(vids,'video')

#     for j in range(batch_size):
#         docs.append(Doc(
#             id=batch.iloc[j].id,
#             date=batch.iloc[j].date,
#             likes=batch.iloc[j].likes,
#             content=batch.iloc[j].content,
#             username=batch.iloc[j].username,
#             inferred_company=batch.iloc[j]['inferred company'],
#             img_url=batch.iloc[j].img_url,
#             vid_url=batch.iloc[j].vid_url,
#             img_vector=encoded_imgs[j],
#             vid_vector=encoded_vids[j],
#             text_vector=encoded_texts[j],
#         ))
# docs = DocList[Doc](docs)
# docs.save_binary('dump/analogies_embeddings.pickle', compress=None, protocol='pickle')

In [None]:
# Multi-threaded

from joblib import Parallel, delayed
from tqdm import tqdm

# Number of dataframe rows per batch; used below to slice `analogies` into
# the chunks that are handed to process_batch.
batch_size = 32


def _embed_or_zeros(items, key, dim=768):
    """Embed ``items`` for modality ``key``, using a zero vector for missing entries."""
    return [
        vector if vector is not None else np.zeros(shape=(dim,))
        for vector in get_embedding(items, key)
    ]


# Function to process a single batch
def process_batch(batch):
    """Download the media of one dataframe slice, embed it and build the docs.

    Args:
        batch: DataFrame slice providing the columns ``id``, ``date``,
            ``likes``, ``content``, ``username``, ``inferred company``,
            ``img_url`` and ``vid_url``.

    Returns:
        One ``Doc`` per row, in row order. Rows without text, image or video
        get an all-zero vector for that modality. If anything fails, the
        whole batch is dropped: the failure is logged and ``[]`` is returned
        (best effort, so one bad batch does not abort the run).
    """
    # Local import: the file does not import logging at module level, and
    # warnings are globally silenced in this notebook, so logging is the
    # channel that still surfaces dropped batches.
    import logging

    try:
        # Replace nan by None
        batch = batch.where(batch.notna(), other=None)

        # Load content
        texts = batch.content.tolist()
        imgs = batch.img_url.apply(get_image_from_url).tolist()
        vids = batch.vid_url.apply(get_video_from_url).tolist()

        # Get embeddings (zero vector where the modality is missing)
        encoded_texts = _embed_or_zeros(texts, 'text')
        encoded_imgs = _embed_or_zeros(imgs, 'image')
        encoded_vids = _embed_or_zeros(vids, 'video')

        # Collect docs; each row is materialised once instead of once per field
        return [
            Doc(
                id=row['id'],
                date=row['date'],
                likes=row['likes'],
                content=row['content'],
                username=row['username'],
                inferred_company=row['inferred company'],
                img_url=row['img_url'],
                vid_url=row['vid_url'],
                img_vector=encoded_imgs[j],
                vid_vector=encoded_vids[j],
                text_vector=encoded_texts[j],
            )
            for j, (_, row) in enumerate(batch.iterrows())
        ]
    except Exception as exc:
        # Unlike a bare ``except:``, this lets KeyboardInterrupt/SystemExit
        # through, so the run can still be interrupted.
        logging.getLogger(__name__).warning(
            "process_batch: dropping a batch of %d rows: %r", len(batch), exc
        )
        return []

# Create batches: consecutive slices of `batch_size` rows (the last one may be shorter)
batches = [analogies.iloc[i:i + batch_size] for i in range(0, len(analogies), batch_size)]

# Use joblib to parallelize the batch processing.
# n_jobs = -1 is joblib's convention for "one worker per available CPU core";
# with backend='threading' the workers are threads of this process.
n_jobs = -1
# NOTE(review): tqdm wraps the input generator, so the bar presumably tracks
# batches handed to the workers rather than batches finished -- confirm.
results = Parallel(n_jobs=n_jobs, backend='threading')(
    delayed(process_batch)(batch) for batch in tqdm(batches)
)

# Flatten the list of batch results to a single list of docs
# (batches that failed contribute an empty list).
docs = [doc for batch_docs in results for doc in batch_docs]

# Wrap the docs in a DocList and save.
# NOTE: the file is written with the pickle protocol -- only load it back
# from a trusted location.
docs = DocList[Doc](docs)
docs.save_binary('dump/analogies_embeddings.pickle', compress=None, protocol='pickle')