In [None]:
import orjson

In [None]:
import csv

# Build a lookup from author_id to that account's annotation columns.
ACCOUNT_FIELDS = ("Type", "Lang", "Stance")

with open("../data/accounts.tsv", newline="", encoding="utf-8") as f:
    reader = csv.DictReader(f, delimiter="\t")
    # If an author_id appears on several rows, the last row wins.
    accounts = {
        row["author_id"]: {field: row[field] for field in ACCOUNT_FIELDS}
        for row in reader
    }

In [None]:
# Collect the `media` value of every tweet whose author is annotated as
# "Unclear" or "Private individuals" in `accounts`.
tweets = "../sampled_data/2260916_tweets.jsonl"
unclear_imgs = []
private_imgs = []

# Explicit UTF-8 (same as the accounts file) so decoding does not depend on
# the platform's default encoding.
with open(tweets, "r", encoding="utf-8") as file:
    for line in file:
        if not line.strip():
            # A blank line (e.g. a trailing newline) is not valid JSON; skip it.
            continue
        row = orjson.loads(line)
        # Account ids are compared as strings because the TSV keys are strings.
        acc_id = str(row["account"]["id"])
        acc_type = accounts.get(acc_id, {}).get("Type")
        media = row.get("media")
        # Tweets without media, or from accounts without a Type, are ignored.
        if media is None or acc_type is None:
            continue
        # NOTE(review): `media` is presumably a single media id string, since
        # later cells concatenate it into a file path - confirm against the data.
        if acc_type == "Unclear":
            unclear_imgs.append(media)
        elif acc_type == "Private individuals":
            private_imgs.append(media)

In [None]:
# task 1.2 bag-of-words
from pathlib import Path
from tqdm import tqdm
from transformers import pipeline, DetrImageProcessor, DetrForObjectDetection
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import KMeans
import pandas as pd
import torch
from PIL import Image

# The image processor and the detector are loaded from the same checkpoint
# and revision so that preprocessing and model weights match.
DETR_CHECKPOINT = "facebook/detr-resnet-50"
DETR_REVISION = "no_timm"

processor = DetrImageProcessor.from_pretrained(DETR_CHECKPOINT, revision=DETR_REVISION)
model = DetrForObjectDetection.from_pretrained(DETR_CHECKPOINT, revision=DETR_REVISION)

def get_visual_words(image_path, score_threshold=0.9):
    """
    Run object detection on one image and return its "visual words".

    Args:
        image_path: Path of the image file to analyse.
        score_threshold: Confidence threshold handed to the detector's
            post-processing; detections below it are discarded.

    Returns:
        Sorted list of the unique label names that were detected. An empty
        list is returned (after printing a message) when the image cannot
        be opened or when detection fails.
    """
    try:
        # convert() returns a new image object, so the file can be closed
        # as soon as the RGB copy exists.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")
    except Exception as e:
        print(f"Cannot open {image_path}: {e}")
        return []

    try:
        inputs = processor(images=image, return_tensors="pt")
        # Inference only: no gradients are needed.
        with torch.no_grad():
            outputs = model(**inputs)

        # image.size is reversed before being passed as the target size.
        target_sizes = torch.tensor([image.size[::-1]])
        # The threshold is applied here, so no second filtering pass is needed.
        results = processor.post_process_object_detection(
            outputs, target_sizes=target_sizes, threshold=score_threshold
        )[0]

        labels = {model.config.id2label[label.item()] for label in results["labels"]}

        # Sorted so the same image always yields the same list.
        return sorted(labels)

    except Exception as e:
        # The exception must be bound here: the name from the first handler
        # does not exist in this scope.
        print(f"Detection failed for {image_path}: {e}")
        return []

In [None]:
import random

# Resolve every collected media id to an image file on disk. A ".jpg" file
# is preferred over a ".png" one; ids with neither file are dropped.
MEDIA_DIR = "../data/media/"
N_PER_GROUP = 25

unclear_paths = []
private_paths = []
for imgs, found_paths in ((unclear_imgs, unclear_paths), (private_imgs, private_paths)):
    for img in imgs:
        for ext in (".jpg", ".png"):
            candidate = MEDIA_DIR + img + ext
            if Path(candidate).exists():
                found_paths.append(candidate)
                break

# A locally seeded generator makes the sample reproducible across runs
# without touching the global `random` state. The sample size is capped
# because random.sample raises ValueError when asked for more items than
# the population holds.
sampler = random.Random(0)
paths = (
    sampler.sample(unclear_paths, min(N_PER_GROUP, len(unclear_paths)))
    + sampler.sample(private_paths, min(N_PER_GROUP, len(private_paths)))
)

In [None]:
# Three parallel lists, one entry per image that produced at least one label:
#   docs      - the labels joined into a single space-separated string
#   image_ids - the image path
#   bags      - the labels as a list
docs, image_ids, bags = [], [], []

for path in tqdm(paths):
    words = get_visual_words(path)
    if words:
        docs.append(" ".join(words))
        image_ids.append(path)
        bags.append(words)

print(f"Built BoW docs for {len(docs)} images")

In [None]:
# Vectorise the per-image label lists in `bags` rather than the joined
# strings in `docs`. The default TfidfVectorizer tokenizer splits text into
# single words, so a multi-word label such as "hot dog" would become the
# two terms "hot" and "dog" and be merged with the unrelated label "dog".
# An identity analyzer keeps every detected label as exactly one term.
vectorizer = TfidfVectorizer(analyzer=lambda image_labels: image_labels)
X = vectorizer.fit_transform(bags)   # shape: (n_images, n_terms)

print(X.shape)

k = 5
kmeans = KMeans(n_clusters=k, random_state=0, n_init=10)
cluster_labels = kmeans.fit_predict(X)

print(pd.Series(cluster_labels).value_counts())

import numpy as np

# Term names indexed like the columns of X / the cluster centroids.
terms = np.array(vectorizer.get_feature_names_out())

def print_cluster_summary(cluster_id, top_n=10, max_examples=5):
    """
    Print a summary of one bag-of-words cluster and display example images.

    Reads the notebook-level names `cluster_labels`, `kmeans`, `terms` and
    `image_ids` at call time, so it reports on whatever those names are
    currently bound to.

    Args:
        cluster_id: Index of the cluster to summarise.
        top_n: Number of highest-weighted centroid terms to print.
        max_examples: Maximum number of distinct example images to show.
    """
    idx = np.where(cluster_labels == cluster_id)[0]
    print(f"\n=== Cluster {cluster_id} ===")
    print(f"Number of images: {len(idx)}")

    # Terms ordered by descending weight in the cluster centroid.
    center = kmeans.cluster_centers_[cluster_id]
    top_term_idx = np.argsort(center)[::-1][:top_n]
    top_terms = terms[top_term_idx]
    print(f"Top {top_n} visual words: {', '.join(top_terms)}")

    # Sample without replacement so no image is shown twice, and never ask
    # for more examples than the cluster contains (also covers an empty
    # cluster, where zero examples are drawn).
    n_examples = min(max_examples, len(idx))
    chosen = np.random.choice(idx, size=n_examples, replace=False)
    example_ids = [image_ids[i] for i in chosen]
    print(f"Example images: {example_ids}")
    for example_path in example_ids:
        # Close each file once it has been rendered.
        with Image.open(example_path) as image:
            display(image)


for c in range(k):
    print_cluster_summary(c)

In [None]:
from torchvision import models, transforms

# Per-channel statistics used to normalise the image tensor.
# NOTE(review): these look like the usual ImageNet mean/std - confirm.
CHANNEL_MEAN = [0.485, 0.456, 0.406]
CHANNEL_STD = [0.229, 0.224, 0.225]

# Preprocessing pipeline: resize, centre-crop to 224, convert to a tensor,
# then normalise each channel.
cnn_transform = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=CHANNEL_MEAN, std=CHANNEL_STD),
    ]
)

from torch.utils.data import Dataset, DataLoader

class ImageDataset(Dataset):
    """Dataset over a list of image paths.

    Each item is a pair ``(image, path)``: the image is opened, converted to
    RGB and, if a transform was supplied, passed through it.
    """

    def __init__(self, image_paths, transform=None):
        self.image_paths = image_paths
        self.transform = transform

    def __len__(self):
        """Number of image paths held by the dataset."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load the image at position ``idx`` and return it with its path."""
        path = self.image_paths[idx]
        rgb = Image.open(path).convert("RGB")
        # The path doubles as the item's identifier.
        item = rgb if self.transform is None else self.transform(rgb)
        return item, path

# Embed a random subset of the private-individual images.
N_CNN_IMAGES = 1000
CNN_BATCH_SIZE = 32

# The sample size is capped because random.sample raises ValueError when
# fewer images are available than requested; the local seeded generator
# keeps the subset reproducible without touching the global `random` state.
cnn_sample = random.Random(0).sample(
    private_paths, min(N_CNN_IMAGES, len(private_paths))
)
dataset = ImageDataset(cnn_sample, transform=cnn_transform)
# shuffle=False keeps the batch order aligned with the dataset order.
loader = DataLoader(dataset, batch_size=CNN_BATCH_SIZE, shuffle=False)

In [None]:
from torch import nn
import numpy as np

device = "cpu"

# Pretrained ResNet-18 with its final fully connected layer replaced by an
# identity, so a forward pass returns the features that fed that layer.
pretrained_weights = models.ResNet18_Weights.IMAGENET1K_V1
resnet = models.resnet18(weights=pretrained_weights)
resnet.fc = nn.Identity()
resnet = resnet.to(device)
# Switch to evaluation mode (last expression, so the model repr is shown).
resnet.eval()

In [None]:
# Run every batch through the network and collect the features together
# with the image paths that the dataset returns as ids.
all_embeddings, all_ids = [], []

with torch.no_grad():
    for batch_imgs, batch_ids in tqdm(loader):
        feats = resnet(batch_imgs.to(device))
        all_embeddings.append(feats.cpu().numpy())
        all_ids.extend(batch_ids)

# Stack the per-batch arrays row-wise into one array, one row per image.
embeddings = np.concatenate(all_embeddings, axis=0)

print("Embeddings shape:", embeddings.shape)
print("First few ids:", all_ids[:5])


In [None]:
# Cluster the CNN embeddings. This rebinds k, kmeans and cluster_labels,
# which the bag-of-words cells above also use.
k = 7  # choose number of clusters (can try 3, 4, 5, ...)
kmeans = KMeans(n_clusters=k, n_init=10, random_state=0)
cluster_labels = kmeans.fit(embeddings).labels_

cluster_sizes = pd.Series(cluster_labels).value_counts()
print(cluster_sizes)

def print_cnn_cluster_summary(cluster_id, examples=10):
    """
    Print the size of one embedding cluster and display example images.

    Reads the notebook-level names `cluster_labels` and `all_ids` at call
    time.

    Args:
        cluster_id: Index of the cluster to summarise.
        examples: Maximum number of distinct example images to show.
    """
    idx = np.where(cluster_labels == cluster_id)[0]
    print(f"\n=== Cluster {cluster_id} ===")
    print(f"Number of images: {len(idx)}")

    # Honour the `examples` argument, sample without replacement so no image
    # is repeated, and never request more images than the cluster holds
    # (this also covers an empty cluster).
    n_examples = min(examples, len(idx))
    chosen = np.random.choice(idx, size=n_examples, replace=False)
    example_ids = [all_ids[i] for i in chosen]
    print(f"Example images: {example_ids}")
    for example_path in example_ids:
        # Close each file once it has been rendered.
        with Image.open(example_path) as image:
            display(image)

for c in range(k):
    print_cnn_cluster_summary(c)

In [None]:
# task 1.4
import pandas as pd

# Tab-separated table of image descriptions.
descriptions_path = "../data/descriptions.tsv"  # ADJUST PATH
df_desc = pd.read_csv(descriptions_path, sep="\t")
print(df_desc.head())

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

# Missing values are replaced by an empty string first: astype(str) alone
# would turn NaN into the literal text "nan", which would then be counted
# as a real term.
texts = df_desc["Labels"].fillna("").astype(str).tolist()

# Terms appearing in more than 80% of the documents or in fewer than five
# documents are dropped, together with English stop words.
vectorizer = TfidfVectorizer(
    max_df=0.8,
    min_df=5,
    stop_words="english"
)

X = vectorizer.fit_transform(texts)
X.shape

In [None]:
from sklearn.cluster import KMeans
import numpy as np

# Cluster the description TF-IDF vectors.
k = 10
kmeans = KMeans(n_clusters=k, random_state=0, n_init=10)
labels = kmeans.fit_predict(X)

# The assignment has to run before the column is read: with it commented
# out, the lookup raises KeyError on a fresh kernel. The counts are printed
# explicitly because a bare expression in the middle of a cell shows nothing.
df_desc["cluster"] = labels
print(df_desc["cluster"].value_counts())

# Term names indexed like the columns of X / the cluster centroids.
terms = np.array(vectorizer.get_feature_names_out())

def print_text_cluster_summary(cluster_id, top_n=10, max_examples=5):
    """
    Print size, top centroid terms and sample descriptions for one cluster.

    Reads the notebook-level names `labels`, `kmeans`, `terms` and `texts`
    at call time.

    Args:
        cluster_id: Index of the cluster to summarise.
        top_n: Number of highest-weighted centroid terms to print.
        max_examples: Number of member descriptions to print, taken from the
            start of the cluster's members in row order.
    """
    member_rows = np.flatnonzero(labels == cluster_id)
    print(f"\n=== Cluster {cluster_id} ===")
    print(f"Number of images: {len(member_rows)}")

    # Centroid terms ranked from highest to lowest weight.
    centroid = kmeans.cluster_centers_[cluster_id]
    ranked_terms = np.argsort(centroid)[::-1][:top_n]
    print(f"Top {top_n} terms: {', '.join(terms[ranked_terms])}")

    print("\nExample descriptions:")
    for row in member_rows[:max_examples]:
        # Show at most 200 characters, flattened onto a single line.
        snippet = texts[row][:200].replace("\n", " ")
        print("-", snippet, "...")

for c in range(k):
    print_text_cluster_summary(c)