In [None]:
%cd drive/MyDrive/research/tiktok_cuts

/content/drive/MyDrive/research/tiktok_model_finetune


In [None]:
# INPUT: a folder that contains the scenes split from an advertising video
# the name of the folder is the video id

### DESCRIPTION OF THE FOLLOWING CODE ###
# Goal: estimate human attention within the scene.
# Human attention is guided by meaning maps (semantic richness), as proposed by J.M. Henderson & T.R. Hayes (2017).
# We aim to simulate their lab experiment procedure by
# 1) first separating the scene image into several patches

# output dir: for patches, cuts_patch/[video_id]
#             for metadata, cuts_metadata/[video_id]

import os
import numpy as np
from PIL import Image, ImageDraw
from tqdm import tqdm

def create_circular_mask(h, w, center=None, radius=None):
    """Build a boolean (h, w) mask that is True on and inside a circle.

    Args:
        h: mask height in pixels (number of rows).
        w: mask width in pixels (number of columns).
        center: (x, y) position of the circle centre; defaults to the
            middle of the image.
        radius: circle radius in pixels; defaults to the distance from the
            centre to the nearest image border.

    Returns:
        Boolean numpy array of shape (h, w).
    """
    if center is None:
        cx, cy = int(w/2), int(h/2)
    else:
        cx, cy = center[0], center[1]

    if radius is None:
        # Largest circle around the centre that still fits inside the image
        radius = min(cx, cy, w-cx, h-cy)

    # Open grids: `rows` has shape (h, 1), `cols` has shape (1, w)
    rows, cols = np.ogrid[:h, :w]
    distance = np.sqrt((cols - cx)**2 + (rows - cy)**2)
    return distance <= radius

def extract_circular_patches(scene_name, image, degrees, overlap=0.1, save_dir_patch="raw/test", save_dir_meta="raw/test"):
    """Cut a scene into overlapping circular patches and save them with metadata.

    For every entry of `degrees` a grid of circle centres is laid over the
    image. Each circle is cropped to its bounding box, pixels outside the
    circle are set to zero, and the crop is written as
    ``patch_<n>_deg_<degree>.png`` into `save_dir_patch`. A list with one
    dict per patch (``filename``, ``center``, ``radius``, ``bbox``) is saved
    as ``<scene_name>_metadata.npy`` into `save_dir_meta`.

    Args:
        scene_name: name used for the metadata file.
        image: numpy array of shape (h, w) or (h, w, channels).
        degrees: iterable of patch sizes; each is converted to a pixel
            radius with `degree_to_pixel`.
        overlap: fraction of the radius by which neighbouring centres
            overlap; the grid step is ``radius * (1 - overlap)``.
        save_dir_patch: folder for the patch images (created if missing).
        save_dir_meta: folder for the metadata file (created if missing).

    Raises:
        ValueError: if a degree converts to a non-positive pixel radius.
    """
    # Both output folders must exist before anything is written; the metadata
    # folder may differ from the patch folder.
    os.makedirs(save_dir_patch, exist_ok=True)
    os.makedirs(save_dir_meta, exist_ok=True)

    # Only the first two axes are needed, so grayscale (h, w) input works too
    h, w = image.shape[:2]
    patch_count = 0
    metadata = []
    for degree in degrees:
        radius = degree_to_pixel(degree, h, w)
        if radius <= 0:
            raise ValueError(
                f"degree {degree} maps to a radius of {radius} px for a {w}x{h} image; "
                "patches need a positive radius"
            )
        # Never let the stride drop below one pixel (e.g. overlap >= 1)
        step = max(1, int(radius * (1 - overlap)))
        for y in range(0, h, step):
            y_min, y_max = max(0, y - radius), min(h - 1, y + radius)
            for x in range(0, w, step):
                # The centre always lies inside the image, so the bounding box
                # of the clipped circle is the clipped square around it.
                x_min, x_max = max(0, x - radius), min(w - 1, x + radius)

                # Evaluate the circle only inside its bounding box instead of
                # building a full-image mask for every patch.
                yy, xx = np.ogrid[y_min:y_max+1, x_min:x_max+1]
                inside = np.sqrt((xx - x)**2 + (yy - y)**2) <= radius

                window = image[y_min:y_max+1, x_min:x_max+1]
                cropped_patch = np.zeros_like(window)
                cropped_patch[inside] = window[inside]

                patch_image = Image.fromarray(cropped_patch)
                patch_filename = f'patch_{patch_count}_deg_{degree}.png'
                patch_image.save(os.path.join(save_dir_patch, patch_filename))

                # Save metadata for reconstruction
                metadata.append({
                    'filename': patch_filename,
                    'center': (x, y),
                    'radius': radius,
                    'bbox': (x_min, y_min, x_max, y_max)
                })

                patch_count += 1

    # Save metadata to a file (stored as a pickled object array)
    np.save(os.path.join(save_dir_meta, f'{scene_name}_metadata.npy'), metadata)

def degree_to_pixel(degree, h, w, fov=90):
    """Convert a visual angle in degrees to a length in pixels.

    The shorter image side is assumed to span `fov` degrees, so the result
    is ``degree / fov`` of ``min(h, w)``, truncated to an integer.

    Args:
        degree: visual angle to convert.
        h: image height in pixels.
        w: image width in pixels.
        fov: field of view in degrees covered by the shorter image side.
            The default of 90 is an example value and should be adjusted to
            the actual viewing setup.

    Returns:
        The pixel length as an int.

    Raises:
        ValueError: if `fov` is not positive.
    """
    if fov <= 0:
        raise ValueError(f"fov must be a positive number of degrees, got {fov}")
    return int((degree / fov) * min(h, w))

def reconstruct_image(metadata_file, original_shape, patch_dir=None):
    """Paste saved circular patches back onto a blank canvas.

    Args:
        metadata_file: path to a ``*_metadata.npy`` file holding one dict per
            patch with the keys ``filename``, ``center``, ``radius`` and
            ``bbox``.
        original_shape: shape of the image to rebuild, e.g. (h, w, channels).
        patch_dir: folder containing the patch images. Defaults to the folder
            of `metadata_file`; pass it explicitly when patches and metadata
            are stored in different folders.

    Returns:
        A PIL image of the reconstruction. Where patches overlap, later
        patches overwrite earlier ones; uncovered pixels stay zero.
    """
    if patch_dir is None:
        patch_dir = os.path.dirname(metadata_file)

    # NOTE(security): allow_pickle=True unpickles arbitrary objects; only load
    # metadata files that this pipeline wrote itself.
    metadata = np.load(metadata_file, allow_pickle=True)
    reconstructed_image = np.zeros(original_shape, dtype=np.uint8)

    for data in metadata:
        # Context manager closes the file handle once the pixels are copied
        with Image.open(os.path.join(patch_dir, data['filename'])) as patch_image:
            patch_array = np.array(patch_image)

        x_min, y_min, x_max, y_max = data['bbox']
        mask = create_circular_mask(original_shape[0], original_shape[1], center=data['center'], radius=data['radius'])
        # Restrict the full-image mask to the patch's bounding box so that it
        # lines up with the cropped patch array.
        mask_cropped = mask[y_min:y_max+1, x_min:x_max+1]

        reconstructed_image[y_min:y_max+1, x_min:x_max+1][mask_cropped] = patch_array[mask_cropped]

    return Image.fromarray(reconstructed_image)

# Process every scene image found in scene_cuts/[video_id]
scenes_path = 'scene_cuts/7164043618378006530'
output_patch_dir = 'cuts_patch/7164043618378006530'
output_metadata_dir = 'cuts_metadata/7164043618378006530'

os.makedirs(output_patch_dir, exist_ok=True)
os.makedirs(output_metadata_dir, exist_ok=True)

# Sorted so that scenes are processed in a reproducible order
for scene_filename in tqdm(sorted(os.listdir(scenes_path))):
    scene_path = os.path.join(scenes_path, scene_filename)
    if not os.path.isfile(scene_path):
        # Skip sub-folders (e.g. notebook checkpoint directories)
        continue
    scene_name, _ = os.path.splitext(scene_filename)

    # Load the scene image; the context manager closes the file afterwards
    with Image.open(scene_path) as scene_image:
        scene_image_np = np.array(scene_image)

    # Patches go to a per-scene sub-folder, metadata to one shared folder
    scene_patch_dir = os.path.join(output_patch_dir, scene_name)
    metadata_dir = output_metadata_dir

    # Extract patches and save metadata
    extract_circular_patches(scene_name, scene_image_np, degrees=[3, 7],
                             save_dir_patch=scene_patch_dir,
                             save_dir_meta=metadata_dir)

100%|██████████| 3/3 [03:36<00:00, 72.11s/it]


In [None]:
# 2) then, store the patch information to .csv file
# output dir: current path

import csv

def calculate_patch_scores_for_scenes(patch_data_folder, metadata_folder, output_csv):
    """Append one CSV row per patch describing where it sits in its scene.

    Each sub-folder of `patch_data_folder` is treated as a scene whose
    metadata is read from ``<metadata_folder>/<scene>_metadata.npy``. Scenes
    without a metadata file are reported and skipped. The ``mean_score``
    column is written empty; no score is computed here.

    Args:
        patch_data_folder: folder with one sub-folder of patch images per scene.
        metadata_folder: folder with the ``*_metadata.npy`` files.
        output_csv: CSV file to append to. The header row is written only
            when the file does not exist yet or is empty.

    Raises:
        FileNotFoundError: if a patch listed in the metadata is missing.
    """
    header = ['scene', 'filename', 'center_x', 'center_y', 'radius',
              'bbox_x_min', 'bbox_y_min', 'bbox_x_max', 'bbox_y_max', 'mean_score']

    # Append mode lets repeated calls share one CSV, so the header must not be
    # repeated on every call.
    needs_header = not os.path.exists(output_csv) or os.path.getsize(output_csv) == 0

    with open(output_csv, mode='a', newline='') as file:
        writer = csv.writer(file)
        if needs_header:
            writer.writerow(header)

        # Iterate through each scene in a reproducible order
        for scene_name in tqdm(sorted(os.listdir(patch_data_folder))):
            scene_folder = os.path.join(patch_data_folder, scene_name)
            metadata_file = os.path.join(metadata_folder, f"{scene_name}_metadata.npy")

            if not os.path.exists(metadata_file):
                print(f"Metadata file for scene {scene_name} not found, skipping.")
                continue

            # NOTE(security): allow_pickle=True unpickles arbitrary objects;
            # only load metadata files that this pipeline wrote itself.
            metadata = np.load(metadata_file, allow_pickle=True)

            # Iterate through each patch
            for data in metadata:
                # The pixels are not needed here, so only check that the patch
                # exists instead of decoding every image.
                patch_path = os.path.join(scene_folder, data['filename'])
                if not os.path.isfile(patch_path):
                    raise FileNotFoundError(f"Patch image not found: {patch_path}")

                # Write data to CSV; the trailing empty field keeps every row
                # aligned with the 'mean_score' header column.
                writer.writerow([
                    scene_name,
                    data['filename'],
                    data['center'][0], data['center'][1],
                    data['radius'],
                    data['bbox'][0], data['bbox'][1], data['bbox'][2], data['bbox'][3],
                    '',
                ])

# Example usage: write the patch table for one video
output_csv = 'patch_info.csv'
metadata_folder = 'cuts_metadata/7164043618378006530'
patch_data_folder = 'cuts_patch/7164043618378006530'
calculate_patch_scores_for_scenes(
    patch_data_folder=patch_data_folder,
    metadata_folder=metadata_folder,
    output_csv=output_csv,
)

100%|██████████| 3/3 [00:33<00:00, 11.30s/it]


In [None]:
# We need to transfer the patch info, along with the patch data, to the cluster for inference.
# The fine-tuned LLaVA model on the cluster returns the patch info with the score column filled in;
# the code below reads it from preds.csv (column "likert_label_predicted").
# 3) given the results (.csv file), construct the meaning map
# output dir: scene_meaning/[video_id]

import pandas as pd
import numpy as np
import os
from PIL import Image
import matplotlib.pyplot as plt
import scipy.io
from scipy.ndimage import gaussian_filter
import cv2

def create_circular_mask(h, w, center=None, radius=None):
    """Return a boolean (h, w) array marking the pixels of a filled circle.

    Same contract as the definition in the patch-extraction cell; it is
    repeated here so this cell can run on its own.

    Args:
        h: number of rows of the mask.
        w: number of columns of the mask.
        center: (x, y) of the circle centre; the image middle when omitted.
        radius: radius in pixels; when omitted, the distance from the centre
            to the closest image edge.

    Returns:
        Boolean numpy array of shape (h, w), True where the distance to the
        centre is at most `radius`.
    """
    if center is None:
        center = (int(w/2), int(h/2))
    center_x, center_y = center[0], center[1]

    if radius is None:
        radius = min(center_x, center_y, w-center_x, h-center_y)

    grid_y, grid_x = np.ogrid[:h, :w]
    return np.sqrt((grid_x - center_x)**2 + (grid_y - center_y)**2) <= radius

# Six-point Likert scale, keyed by the lower-cased label
_LIKERT_SCALE = {
    'very low': 1,
    'low': 2,
    'somewhat low': 3,
    'somewhat high': 4,
    'high': 5,
    'very high': 6,
}

def likert_to_numeric(likert_label):
    """Map a Likert label to its numeric score (1 = very low ... 6 = very high).

    Matching ignores letter case and surrounding whitespace, so both
    'very low' and 'Very low' (the capitalisation produced by LLaVA without
    fine-tuning) are accepted, as are 'Low' and 'High'.

    Args:
        likert_label: the label to convert.

    Returns:
        The integer score.

    Raises:
        KeyError: if the label is not one of the six known labels.
    """
    # str() keeps non-string input (e.g. a missing value read from a CSV) on
    # the KeyError path instead of failing with AttributeError.
    key = str(likert_label).strip().lower()
    try:
        return _LIKERT_SCALE[key]
    except KeyError:
        raise KeyError(f"Unrecognised Likert label: {likert_label!r}") from None

def plot_smoothed_meaning_map_from_csv(csv_file, original_shape, sigma=5, gamma=3.0,
                                       scene_filter=None, save_path=None):
    """Build a smoothed meaning map from per-patch Likert predictions.

    Every CSV row describes one circular patch (``center_x``, ``center_y``,
    ``radius``, ``bbox_*``) and its predicted label
    (``likert_label_predicted``). Each pixel receives the mean numeric score
    of all patches covering it; the map is then Gaussian-smoothed and raised
    to the power `gamma`.

    Args:
        csv_file: path to the predictions CSV.
        original_shape: scene shape; only the first two entries are used,
            as (height, width).
        sigma: standard deviation of the Gaussian filter.
        gamma: exponent applied to the smoothed map.
        scene_filter: if given, only rows whose ``scene`` column equals this
            value are used.
        save_path: if given, the map is written there as a grayscale image
            (the folder is created if needed); otherwise it is displayed.

    Raises:
        ValueError: if no rows are left to build the map from.
    """
    # Load CSV data
    data = pd.read_csv(csv_file)

    # Filter data by scene if a scene_filter is provided
    if scene_filter:
        data = data[data['scene'] == scene_filter]

    # Without any rows the result would be a silent all-zero map
    if data.empty:
        raise ValueError(f"No patch rows found in {csv_file} (scene_filter={scene_filter!r})")

    meaning_map = np.zeros(original_shape[:2], dtype=np.float32)
    count_map = np.zeros(original_shape[:2], dtype=np.float32)

    # Iterate through each row in the CSV
    for _, row in data.iterrows():
        likert_label = row['likert_label_predicted']
        numeric_score = likert_to_numeric(likert_label)
        x_min, y_min, x_max, y_max = int(row['bbox_x_min']), int(row['bbox_y_min']), int(row['bbox_x_max']), int(row['bbox_y_max'])
        center = (int(row['center_x']), int(row['center_y']))
        radius = int(row['radius'])

        # Get the mask and restrict it to the patch's bounding box
        mask = create_circular_mask(original_shape[0], original_shape[1], center=center, radius=radius)
        mask_cropped = mask[y_min:y_max+1, x_min:x_max+1]

        # Accumulate the score and the number of patches covering each pixel
        # (the basic slice is a view, so the in-place add updates the map)
        meaning_map[y_min:y_max+1, x_min:x_max+1][mask_cropped] += numeric_score
        count_map[y_min:y_max+1, x_min:x_max+1][mask_cropped] += 1

    # Avoid division by zero for pixels not covered by any patch
    count_map[count_map == 0] = 1
    smoothed_meaning_map = meaning_map / count_map

    # Apply Gaussian filter for smoothing, then the gamma exponent
    smoothed_meaning_map = gaussian_filter(smoothed_meaning_map, sigma=sigma)
    smoothed_meaning_map = np.power(smoothed_meaning_map, gamma)

    # Save or plot the smoothed meaning map
    if save_path:
        save_dir = os.path.dirname(save_path)
        if save_dir:
            os.makedirs(save_dir, exist_ok=True)
        # NOTE(review): without vmin/vmax, imsave rescales the array's own
        # min..max to the colormap, so the saved image does not keep the
        # absolute score scale - confirm this is intended for downstream use.
        plt.imsave(save_path, smoothed_meaning_map, cmap='gray')
    else:
        plt.imshow(smoothed_meaning_map, cmap='hot', interpolation='nearest')
        plt.title('Smoothed Meaning Map')
        plt.show()

# Example usage: build the meaning map of every scene that the final
# attention-proportion cell reads (Scene-001 to Scene-003), so the notebook
# can be re-run from a fresh kernel without editing this cell by hand.
original_shape = (1024, 576)  # (height, width) of the scene images
meaning_dir = 'scene_meaning/7164043618378006530'
os.makedirs(meaning_dir, exist_ok=True)

for scene_id in ('001', '002', '003'):
    scene_filter = f'7164043618378006530-Scene-{scene_id}-01'
    plot_smoothed_meaning_map_from_csv('preds.csv', original_shape, scene_filter=scene_filter,
                                       save_path=f'{meaning_dir}/meaning-Scene-{scene_id}-01.png')

In [None]:
# Then, 4) use a prompt-based segmentation model (CLIPSeg) to locate the product region
# In this case, the product is "jewelry"
# Input: scene under the folder "scene_cuts/[video_id]"
# Output: product segmentation images under the folder "cuts_segmentation/[video_id]"

!pip install -q git+https://github.com/huggingface/transformers.git

  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone
  Building wheel for transformers (pyproject.toml) ... [?25l[?25hdone


In [None]:
import os
from PIL import Image
import requests
from transformers import CLIPSegProcessor, CLIPSegForImageSegmentation
from transformers.image_utils import IMAGENET_DEFAULT_MEAN, IMAGENET_DEFAULT_STD
import torch
from torch import nn
import matplotlib.pyplot as plt

# Folder with the scene frames to segment, and the entries found in it
scene_path = "scene_cuts/7164043618378006530"
scene_files = os.listdir(scene_path)

# Pretrained CLIPSeg weights and the matching pre-processor
model = CLIPSegForImageSegmentation.from_pretrained("CIDAS/clipseg-rd64-refined")
processor = CLIPSegProcessor.from_pretrained("CIDAS/clipseg-rd64-refined")

# Normalise input images with the ImageNet default statistics
processor.image_processor.image_std = IMAGENET_DEFAULT_STD
processor.image_processor.image_mean = IMAGENET_DEFAULT_MEAN

def prompt_seg(scene_path, base_save_path, prompt="jewelry within a box", threshold=0.5):
    """Segment the prompted object in one scene image and save a binary mask.

    The image is passed through the module-level CLIPSeg `processor` and
    `model`. The predicted map is resized to the image size, converted to
    probabilities with a sigmoid and binarised at `threshold`. The mask is
    saved as ``<scene file name without extension>-seg.png`` in
    `base_save_path`.

    Args:
        scene_path: path to the scene image.
        base_save_path: output folder (created if missing).
        prompt: text describing the object to segment.
        threshold: probability above which a pixel counts as the object.
    """
    # Load as RGB and release the file handle right away
    with Image.open(scene_path) as scene_file:
        image = scene_file.convert("RGB")
    width, height = image.size

    inputs = processor(text=[prompt], images=[image], padding="max_length", return_tensors="pt")

    # predict
    with torch.no_grad():
        outputs = model(**inputs)

    logits = outputs.logits
    if logits.dim() == 2:
        # Presumably some library versions drop the batch axis for a single
        # prompt; restore it so interpolate() always gets a 4-D tensor.
        logits = logits.unsqueeze(0)

    # resize to the original image size (height, width)
    preds = nn.functional.interpolate(
        logits.unsqueeze(1),
        size=(height, width),
        mode="bilinear"
    )

    # binary mask: the model returns raw logits, so map them to probabilities
    # before comparing against the threshold
    probabilities = torch.sigmoid(preds[0][0])
    binary_mask = (probabilities > threshold).float().cpu().numpy()

    # save files
    os.makedirs(base_save_path, exist_ok=True)
    scene_part = os.path.splitext(os.path.basename(scene_path))[0]  # e.g., 7164043618378006530-Scene-001-01
    save_path = os.path.join(base_save_path, scene_part + '-seg.png')  # e.g., .../7164043618378006530-Scene-001-01-seg.png
    # Fixed colour limits so that an all-foreground mask is not saved as black
    plt.imsave(save_path, binary_mask, cmap='gray', vmin=0, vmax=1)

# experiment: segment the product in every scene frame of the video
base_save_path='cuts_segmentation/7164043618378006530'
# The output folder must exist before any mask is written
os.makedirs(base_save_path, exist_ok=True)
for scene in sorted(scene_files):
    current_scene_path = os.path.join(scene_path, scene)
    if not os.path.isfile(current_scene_path):
        # Skip sub-folders (e.g. notebook checkpoint directories)
        continue
    prompt_seg(current_scene_path, base_save_path)

Unused or unrecognized kwargs: padding.
Unused or unrecognized kwargs: padding.
Unused or unrecognized kwargs: padding.


In [None]:
import os
import cv2
import numpy as np

def calculate_attention_proportion(product_map_path, attention_map_path):
    """Return the share of total attention that falls inside the product mask.

    Both images are read as single-channel grayscale. Pixels where the
    product map is greater than zero count as product region.

    Args:
        product_map_path: path to the product segmentation image.
        attention_map_path: path to the attention (meaning) map image.

    Returns:
        Sum of attention inside the product region divided by the sum over
        the whole map, or 0 when the map sums to zero.

    Raises:
        FileNotFoundError: if either image cannot be read.
        ValueError: if the two images differ in shape.
    """
    product_map = cv2.imread(product_map_path, cv2.IMREAD_GRAYSCALE)
    if product_map is None:
        raise FileNotFoundError(f"Product map not found: {product_map_path}")

    attention_map = cv2.imread(attention_map_path, cv2.IMREAD_GRAYSCALE)
    if attention_map is None:
        raise FileNotFoundError(f"Attention map not found: {attention_map_path}")

    # The mask can only index the attention map when the shapes agree
    if attention_map.shape != product_map.shape:
        raise ValueError("Product map and attention map must have the same dimensions")

    total_attention = np.sum(attention_map)
    if not total_attention > 0:
        # Nothing to divide by
        return 0

    inside_product = product_map > 0
    return np.sum(attention_map[inside_product]) / total_attention

def main(product_map_dir, attention_map_dir, video_id="7164043618378006530", scene_ids=("001", "002", "003")):
    """Print and collect the product attention proportion of each scene.

    For every scene id the product map ``<video_id>-Scene-<id>-01-seg.png``
    in `product_map_dir` is compared with the attention map
    ``meaning-Scene-<id>-01.png`` in `attention_map_dir`.

    Args:
        product_map_dir: folder with the product segmentation images.
        attention_map_dir: folder with the meaning-map images.
        video_id: video id used as the prefix of the segmentation file names.
        scene_ids: scene numbers to evaluate, as zero-padded strings.

    Returns:
        Dict mapping scene id to its proportion. Scenes whose maps are
        missing or differ in shape are reported and left out.
    """
    results = {}
    for scene_id in scene_ids:
        product_map_filename = f"{video_id}-Scene-{scene_id}-01-seg.png"
        attention_map_filename = f"meaning-Scene-{scene_id}-01.png"

        product_map_path = os.path.join(product_map_dir, product_map_filename)
        attention_map_path = os.path.join(attention_map_dir, attention_map_filename)

        try:
            proportion = calculate_attention_proportion(product_map_path, attention_map_path)
        except (FileNotFoundError, ValueError) as e:
            # Report the problem and carry on with the remaining scenes
            print(e)
            continue

        print(f"Scene-{scene_id}-01: {proportion:.4f}")
        results[scene_id] = proportion

    return results


if __name__ == "__main__":
    # Folders written by the meaning-map and product-segmentation cells
    attention_map_dir = "scene_meaning/7164043618378006530"
    product_map_dir = "cuts_segmentation/7164043618378006530"
    main(product_map_dir=product_map_dir, attention_map_dir=attention_map_dir)

Scene-001-01: 0.0000
Scene-002-01: 0.1396
Scene-003-01: 0.6090
