In [1]:
import json
def write(path,data):
    """Serialize ``data`` as JSON into the file at ``path``.

    The file is opened in text mode for writing with UTF-8 encoding, so any
    existing content is replaced. The JSON is indented by four spaces and
    non-ASCII characters are written as-is instead of being escaped.
    """
    with open(path, mode='w', encoding='utf-8') as out_fp:
        json.dump(data, out_fp, ensure_ascii=False, indent=4)
def read(path):
    """Load and return the JSON document stored in the file at ``path``.

    The file is decoded as UTF-8 explicitly. ``write`` in this notebook emits
    UTF-8 with non-ASCII characters left unescaped, so relying on the
    platform's default text encoding could fail or mis-decode such files on
    systems whose default is not UTF-8.
    """
    with open(path, 'r', encoding='utf-8') as file:
        return json.load(file)
import re
def extract_video_numbers_first_only(text):
    """
    Return only the first matched video number.

    The text is searched case-insensitively for the whole word "video"
    followed by whitespace and a number from 0 to 10 that ends at a word
    boundary (so "video 11" does not match).

    Args:
        text (str): Input string.

    Returns:
        int or None: The first number found, or None if nothing matches.
    """
    found = re.compile(r'(?i)\bvideo\s+(10|[0-9])\b').search(text)
    return None if found is None else int(found.group(1))

### Get Train Dataset

In [None]:
# Keep only the training dialogues whose `type` is not [0] (single pass; the
# filter does not need to be re-applied afterwards).
all_data = read("./data/IVCR-200K.json")
new_data = [
    sample for sample in all_data
    if sample.get('split') == 'train' and sample['type'] != [0]
]

# Flatten each dialogue into cumulative prefixes: every completed
# (human, assistant) round emits one sample holding all rounds of that dialogue
# so far, as long as the prefix has at most MAX_ROUNDS rounds.
MAX_ROUNDS = 10
train_data = []
for sample in new_data:
    sub_sample = []
    # Human turn still waiting for its reply. It is reset for every dialogue
    # and after every completed round, so a reply can never be attached to a
    # question left over from an earlier turn or an earlier dialogue.
    sub_conv = None
    for conv in sample.get('conversations'):
        if conv.get('from') == 'human':
            sub_conv = {
                'user': conv.get('value').strip(),
                'text_id': conv.get('text_id'),
            }
            continue
        if sub_conv is None:
            # A reply without a pending question has nothing to pair with.
            continue
        sub_conv['assistance'] = conv.get('value').strip()
        sub_conv['video_id'] = conv.get('video_id')
        # 'candidate_videos' is only stored when the reply carries a non-empty value.
        if conv.get('candidate_videos'):
            sub_conv['candidate_videos'] = conv.get('candidate_videos')
        sub_conv['gt_se'] = conv.get('gt_se')
        sub_sample.append(sub_conv)
        sub_conv = None
        if len(sub_sample) <= MAX_ROUNDS:
            # Shallow copy: later rounds extend sub_sample without touching
            # the prefixes that were already emitted.
            train_data.append(sub_sample.copy())
write("./data/IVCR_no_type0_train.json",train_data)

#Build data that conforms to large language model conversation format
system_message = "You are an excellent video retrieval AI assistant."
Current_Video = "Current video : <VID> {VID1} </VID> <VIDEO> <VIDEOTOKEN> </VIDEO>."
# The candidate prefix always lists ten numbered placeholders, independent of
# how many entries the round's 'candidate_videos' holds, so it is built once.
candidate_prompt = "Candidate videos : " + ",".join(
    f"<VID> {slot} </VID> <VIDEO> <VIDEOTOKEN> </VIDEO>" for slot in range(1, 11)
) + "."

train_dialogues_data = []
for sample in train_data:
    messages = [{"role":"system","content":system_message}]
    for sub_data in sample:
        user_question = sub_data.get('user').strip()
        assistance_answer = sub_data.get('assistance').strip()
        if sub_data.get('gt_se') == [-1,-1]:
            # Rounds with gt_se == [-1, -1] are prefixed with the candidate list.
            prefix = candidate_prompt
        else:
            # Other rounds are prefixed with the single video named in the answer.
            # NOTE(review): when the answer contains no "video N" (N in 0..10),
            # video_index is None and the prompt reads "<VID> None </VID>" --
            # confirm whether such rounds should be kept.
            video_index = extract_video_numbers_first_only(assistance_answer)
            prefix = Current_Video.format(VID1 = video_index)
        messages.append({"role":"user","content":(prefix + user_question).strip()})
        messages.append({"role":"assistant","content":assistance_answer})
    train_dialogues_data.append(messages)
write("./data/IVCR_no_type0_dialogues_train.json",train_dialogues_data)

In [None]:
#Get video path list
data = read('./data/IVCR_no_type0_train.json')
# One inner list per sample and one entry per round: the round's whole
# 'candidate_videos' list when gt_se is [-1, -1], otherwise its 'video_id'.
all_video_list = [
    [
        turn['candidate_videos'] if turn.get('gt_se') == [-1,-1] else turn['video_id']
        for turn in dialogue
    ]
    for dialogue in data
]
write('./data/IVCR_no_type0_dialogues_video_list_train.json',all_video_list)

### Get Test Dataset

In [None]:
#Remove samples from the test set in the original data where first-round dialogue R@10 or R@1 retrieval failed
def _retrieval_succeeded(reply):
    """Return True when the assistant turn ``reply`` counts as a retrieval hit.

    For a turn whose ``gt_se`` is ``[-1, -1]`` the ground-truth ``video_id``
    only has to appear somewhere in ``candidate_videos``; for any other turn
    it has to be the first candidate.
    """
    candidates = reply['candidate_videos']
    if reply['gt_se'] == [-1,-1]:
        return reply['video_id'] in candidates
    return reply['video_id'] == candidates[0]


all_data = read("./data/IVCR-200K.json")
new_data = []
count = 0  # test samples (type other than [0]) that were examined
for sample in all_data:
    if sample.get('split') != 'test' or sample['type'] == [0]:
        continue
    count += 1
    convs = sample['conversations']
    if sample['type'] != [7]:
        # Only the opening round decides whether the dialogue is kept. The
        # length check skips dialogues that have no reply to look at.
        if len(convs) >= 2 and convs[0]['from'] == 'human' and _retrieval_succeeded(convs[1]):
            new_data.append(sample)
    else:
        # Type [7] dialogues are filtered round by round. Each surviving human
        # turn is kept together with its reply: the later cells only emit a
        # round once they see the assistant turn, so keeping the question
        # alone would make the round vanish from every output file.
        new_convs = []
        # Stop one short of the end so that convs[i+1] always exists.
        for i, conv in enumerate(convs[:-1]):
            if conv['from'] == 'human' and _retrieval_succeeded(convs[i+1]):
                new_convs.extend((conv, convs[i+1]))
        # The sample is kept even when no round survives.
        sample['conversations'] = new_convs
        new_data.append(sample)
print(count)
# Number of samples kept (previously a constant 0 stored in a variable that
# shadowed the builtin sum()).
print(len(new_data))
write('./data/IVCR-200K-no-zero-test.json',new_data)

In [None]:
# Flatten the filtered test dialogues into cumulative prefixes: every completed
# (human, assistant) round emits one sample holding all rounds of that dialogue
# so far, as long as the prefix has at most MAX_ROUNDS rounds.
all_data = read('./data/IVCR-200K-no-zero-test.json')
MAX_ROUNDS = 10
new_data = []
data_cate = dict()  # number of processed dialogues per first entry of `type`
for sample in all_data:
    if sample.get('split') != 'test' or sample['type'] == [0]:
        continue
    category = sample.get('type')[0]
    data_cate[category] = data_cate.get(category, 0) + 1

    sub_sample = []
    # Human turn still waiting for its reply. It is reset for every dialogue
    # and after every completed round, so a reply can never be attached to a
    # question left over from an earlier turn or an earlier dialogue.
    sub_conv = None
    for conv in sample.get('conversations'):
        if conv.get('from') == 'human':
            sub_conv = {
                'user': conv.get('value').strip(),
                'text_id': conv.get('text_id'),
            }
            continue
        if sub_conv is None:
            # A reply without a pending question has nothing to pair with.
            continue
        sub_conv['assistance'] = conv.get('value').strip()
        sub_conv['video_id'] = conv.get('video_id')
        # 'candidate_videos' is only stored when the reply carries a non-empty value.
        if conv.get('candidate_videos'):
            sub_conv['candidate_videos'] = conv.get('candidate_videos')
        sub_conv['gt_se'] = conv.get('gt_se')
        sub_sample.append(sub_conv)
        sub_conv = None
        if len(sub_sample) <= MAX_ROUNDS:
            # Shallow copy: later rounds extend sub_sample without touching
            # the prefixes that were already emitted.
            new_data.append(sub_sample.copy())
write('./data/IVCR_no_type0_no_zero_test.json',new_data)

In [None]:
# Turn every flattened test sample into a chat-style message list:
# one system message followed by a user/assistant pair per round.
data = read('./data/IVCR_no_type0_no_zero_test.json')
system_message = "You are an excellent video retrieval AI assistant."
Current_Video = "Current video : <VID> {VID1} </VID> <VIDEO> <VIDEOTOKEN> </VIDEO>."
# The candidate prefix always lists ten numbered placeholders, independent of
# how many entries the round's 'candidate_videos' holds, so it is built once.
candidate_prompt = "Candidate videos : " + ",".join(
    f"<VID> {slot} </VID> <VIDEO> <VIDEOTOKEN> </VIDEO>" for slot in range(1, 11)
) + "."

dialogues = []
for sample in data:
    messages = [{"role":"system","content":system_message}]
    for sub_data in sample:
        user_question = sub_data.get('user').strip()
        assistance_answer = sub_data.get('assistance').strip()
        if sub_data.get('gt_se') == [-1,-1]:
            # Rounds with gt_se == [-1, -1] are prefixed with the candidate list.
            prefix = candidate_prompt
        else:
            # Other rounds are prefixed with the single video named in the answer.
            # NOTE(review): when the answer contains no "video N" (N in 0..10),
            # video_index is None and the prompt reads "<VID> None </VID>" --
            # confirm whether such rounds should be kept.
            video_index = extract_video_numbers_first_only(assistance_answer)
            prefix = Current_Video.format(VID1 = video_index)
        messages.append({"role":"user","content":(prefix + user_question).strip()})
        messages.append({"role":"assistant","content":assistance_answer})
    dialogues.append(messages)
write("./data/IVCR_no_type0_no_zero_dialogues_test.json",dialogues)

In [None]:
#Get video path list
data = read('./data/IVCR_no_type0_no_zero_test.json')
# One inner list per sample and one entry per round: the round's whole
# 'candidate_videos' list when gt_se is [-1, -1], otherwise its 'video_id'.
all_video_list = [
    [
        turn['candidate_videos'] if turn.get('gt_se') == [-1,-1] else turn['video_id']
        for turn in dialogue
    ]
    for dialogue in data
]
write('./data/IVCR_no_type0_no_zero_dialogues_video_list_test.json',all_video_list)

In [None]:
# Keep only the test dialogues that contain at most 15 messages and report
# how many remain.
data = read("./data/IVCR_no_type0_no_zero_dialogues_test.json")
new_data = [dialogue for dialogue in data if len(dialogue) <= 15]
count = len(new_data)
print(f"len(new_data) : {count}")
write('./data/IVCR_no_type0_no_zero_dialogues_test_7.json',new_data)