In [1]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
import sys
import glob
import torch

import json
import msgpack

import numpy as np
import pandas as pd

from PIL import Image
from matplotlib import pyplot

from tqdm.auto import tqdm

from sklearn.metrics.pairwise import cosine_similarity

In [2]:
sys.path.append(os.path.abspath('../kcg-ml-image-pipeline/'))
from utility.active_learning.samples import representative_sample_selection, get_min_distance_to_representative_samples

In [3]:
# Base directory of the dataset tree; save_representative_samples reads
# `<ROOT>/clip/...` and `<ROOT>/ranking/<dataset>/...` underneath it.
ROOT = '../dataset/'

# Dataset names processed by this notebook, in processing order.
DATASETs = ['environmental', 'character', 'icons', 'mech', 'waifu', 'propaganda-poster']

In [4]:
def save_representative_samples(dataset_name, threshold=0.25):
    """Select representative ranked images for one dataset and persist them.

    Steps:
      1. Read ``data/<dataset_name>/data.json`` and, for every entry, load its
         CLIP feature vector from ``<ROOT>/clip/<file_path>_clip.msgpack``.
      2. Scan ``<ROOT>/ranking/<dataset_name>/*.json`` and collect the image
         paths of every ``selection`` task whose two images both have an
         embedding loaded in step 1.
      3. If ``./data/<dataset_name>/representative.json`` already exists, drop
         paths it already lists (as representative or unused) and hand the
         embeddings of its representatives to the selector as
         ``existed_samples``.
      4. Run ``representative_sample_selection`` on the remaining candidates
         and write the merged ``representative`` / ``unused`` name lists back
         to ``representative.json``.

    Args:
        dataset_name: Name of the dataset sub-directory to process.
        threshold: Forwarded unchanged to ``representative_sample_selection``
            (its meaning is defined by that helper). Defaults to 0.25, the
            value previously hard-coded here.

    Returns:
        None. The result is written to ``representative.json``. When there is
        nothing new to select (no embeddings, or no unseen ranked images) the
        function returns without touching the file.
    """

    # --- load clip vision embeddings -------------------------------------

    with open(f'data/{dataset_name}/data.json') as f:
        js = json.load(f)

    file_paths = list()
    samples = list()

    for info in tqdm(js.values(), total=len(js), leave=False):

        # Keep only the part of the path before the first '_' and drop any extension.
        file_path = os.path.splitext(info['file_path'].split('_')[0])[0]

        path = os.path.join(ROOT, 'clip', f'{file_path}_clip.msgpack')

        with open(path, 'rb') as f:
            mp = msgpack.load(f)

        file_paths.append(file_path)
        samples.append(np.array(mp['clip-feature-vector']))

    if not samples:
        # np.concatenate raises on an empty list; with no embeddings there is
        # nothing that could be selected anyway.
        return

    # Stacks along axis 0, so row i is expected to belong to file_paths[i]
    # (assumes each stored vector has shape (1, D) -- TODO confirm).
    samples = np.concatenate(samples, axis=0)

    path_to_index = {file_path: i for i, file_path in enumerate(file_paths)}

    # --- load rank data ---------------------------------------------------

    paths = sorted(glob.glob(os.path.join(ROOT, 'ranking', dataset_name, '*.json')))

    rank_file_paths = set()

    for path in tqdm(paths, leave=False):
        with open(path) as f:
            js = json.load(f)

        if js['task'] != 'selection':
            continue

        file_path_1 = os.path.splitext(js['image_1_metadata']['file_path'])[0].replace('datasets/', '')
        file_path_2 = os.path.splitext(js['image_2_metadata']['file_path'])[0].replace('datasets/', '')

        # Skip the pair unless both images have an embedding.
        if (file_path_1 not in path_to_index) or (file_path_2 not in path_to_index):
            continue

        rank_file_paths.add(file_path_1)
        rank_file_paths.add(file_path_2)

    output_path = f'./data/{dataset_name}/representative.json'

    # --- continue from a previous run, if any -----------------------------

    if os.path.exists(output_path):

        with open(output_path) as f:
            existed = json.load(f)

        existed_representative = existed['representative']
        existed_unused = existed['unused']

        # Only images not classified by an earlier run are new candidates.
        rank_file_paths = rank_file_paths.difference(existed_representative).difference(existed_unused)

        # NOTE(review): assumes every previously saved representative is still
        # present in data.json; a missing one maps to None and breaks indexing.
        selected_indices = list(map(path_to_index.get, existed_representative))
        existed_samples = samples[selected_indices]

    else:
        existed_samples = None

        existed_representative = []
        existed_unused = []

    if not rank_file_paths:
        # No unseen candidates: the saved result (if any) is already current.
        return

    # Sort the candidates: set iteration order of strings varies between
    # interpreter runs, which made the selection irreproducible. The explicit
    # integer dtype keeps the array usable as an index.
    indices = np.array([path_to_index[p] for p in sorted(rank_file_paths)], dtype=int)

    # --- select -----------------------------------------------------------

    selected = representative_sample_selection(
        samples=samples[indices],
        threshold=threshold,
        existed_samples=existed_samples,
    )

    # --- save -------------------------------------------------------------

    # `selected` holds positions within `indices`; map them back to rows of `samples`.
    representative_indices = indices[selected]
    unused_indices = indices[sorted(set(range(len(indices))).difference(selected))]

    representative_names = [file_paths[i] for i in representative_indices] + existed_representative
    unused_names = [file_paths[i] for i in unused_indices] + existed_unused

    with open(output_path, 'wt') as f:
        json.dump({
            'representative': representative_names,
            'unused': unused_names
        }, f)

In [None]:
# Run the selection for every dataset listed in DATASETs; each call reads and
# rewrites that dataset's ./data/<dataset_name>/representative.json.
for dataset_name in DATASETs:
    save_representative_samples(dataset_name)