In [None]:
# !pip install tqdm sparqlwrapper openai

In [35]:
import json
import os
import re
import urllib
import urllib.request
from collections import deque
from datetime import datetime
from pathlib import Path
from pprint import pprint
from time import sleep

import numpy as np
from SPARQLWrapper import SPARQLWrapper, JSON
from openai import OpenAI
from tqdm import tqdm

# Pause (in seconds) inserted before each remote call made by the helpers below.
api_delay = 0.1
# NOTE: os.environ[...] raises KeyError when OPENAI_API_KEY is not set.
client = OpenAI(
    api_key = os.environ['OPENAI_API_KEY'],  # this is also the default, it can be omitted
    # api_key = userdata.get('PBL_OPENAI_API_KEY')    # On Google Colab, store the key as a secret named "PBL_OPENAI_API_KEY"
)
# model = 'gpt-4o-2024-05-13'
model = 'gpt-4o-mini-2024-07-18'

# Base URL of the dataset; QA and episode file paths are appended to it.
dataset_path = "https://raw.githubusercontent.com/KGRC4SI/DataSet/kgrc4si/"
# Directory with the per-scene/day text files that list question paths.
qa_list_dir_path = Path.cwd() / 'qa_path'
# Directory that receives the evaluation logs and CSV summaries.
result_dir_path = Path.cwd() / 'results'
result_dir_path.mkdir(exist_ok=True, parents=True)

KeyError: 'OPENAI_API_KEY'

In [3]:
class Database:
    """Small client for a SPARQL endpoint.

    Args:
        endpoint: URL of the SPARQL repository to query. Defaults to the
            KGRC4SIv05 repository this notebook was written against.
    """

    DEFAULT_ENDPOINT = "https://kgrc4si.home.kg:7200/repositories/KGRC4SIv05"

    def __init__(self, endpoint=DEFAULT_ENDPOINT):
        self.sparql = SPARQLWrapper(endpoint)

    def query(self, sparql_query):
        """Run a query and return the rows of its JSON result.

        Args:
            sparql_query: SPARQL query text.

        Returns:
            The ``["results"]["bindings"]`` list of the converted JSON response.
        """
        sleep(api_delay)  # throttle so the endpoint is not hit in rapid succession
        self.sparql.setQuery(sparql_query)
        self.sparql.setReturnFormat(JSON)
        return self.sparql.query().convert()["results"]["bindings"]

db = Database()

In [4]:
def test_database():
    """Smoke-test the SPARQL endpoint with one fixed query.

    Prints the raw bindings, their count, and the trailing URI segment of
    each (event, place) pair.
    """
    query_text = """
PREFIX ex: <http://kgrc4si.home.kg/virtualhome2kg/instance/>
PREFIX vh2kg: <http://kgrc4si.home.kg/virtualhome2kg/ontology/>
select DISTINCT ?event ?place where {
    ex:walk_with_memory_loss6_scene1 vh2kg:hasEvent ?event .
    ?event vh2kg:to ?place .
    ?place a vh2kg:Livingroom .
}
"""
    bindings = db.query(query_text)
    print(bindings)
    print(len(bindings))
    for row in bindings:
        event_id = row["event"]["value"].split('/')[-1]
        place_id = row["place"]["value"].split('/')[-1]
        print(event_id, place_id)

test_database()

[{'event': {'type': 'uri', 'value': 'http://kgrc4si.home.kg/virtualhome2kg/instance/event0_walk_with_memory_loss6_scene1'}, 'place': {'type': 'uri', 'value': 'http://kgrc4si.home.kg/virtualhome2kg/instance/livingroom342_scene1'}}]
1
event0_walk_with_memory_loss6_scene1 livingroom342_scene1


In [5]:
def extract_question_details(question_path):
    """Download a question file and the activity list of its episode.

    Args:
        question_path: URL of the question JSON file.

    Returns:
        Tuple ``(question, choices, scene_id, activities, correct_answer)``:
        the question text after ``modify_question``, the list of answer
        strings, the part of the scenario name before the first underscore,
        the lower-cased activity names of the episode, and the answer flagged
        as correct (``None`` when no answer is flagged).

    Raises:
        ValueError: if the question file names no scenario.
    """
    sleep(api_delay)
    with urllib.request.urlopen(question_path) as response:
        question_data = json.loads(response.read().decode())

    question = modify_question(question_data["question"])
    answers = question_data["answers"]
    choices = [answer["answer"] for answer in answers]
    correct_answer = next((answer["answer"] for answer in answers if answer["correct"]), None)

    # The key is read under both spellings, "scenario" and "senario".
    if "scenario" in question_data:
        scenario = question_data["scenario"]
    else:
        scenario = question_data.get("senario")
    if scenario is None:
        # Without this check the code would request ".../None.json".
        raise ValueError(f"No scenario found in question file: {question_path}")

    # Build the path of the file that lists the episode's activities.
    activities_path = dataset_path + f"CompleteData/Episodes/{scenario}.json"

    # Read the activity list from the episode file.
    with urllib.request.urlopen(activities_path) as response:
        episode_data = json.loads(response.read().decode())
    activities = [activity.lower() for activity in episode_data["data"]["activities"]]

    # The scene id is the part of the scenario name before the first underscore.
    scene_id = scenario.split('_')[0]

    return question, choices, scene_id, activities, correct_answer

def modify_question(question):
    """Replace a leftover ``AnswerN(name=..., number=K).number`` fragment with K.

    Only the first such fragment is replaced; a question without one is
    returned unchanged.

    Args:
        question: Question text as stored in the dataset.

    Returns:
        The question text with the first fragment replaced by its number.
    """
    pattern = re.compile(r"Answer\d+\(name=.*?, number=(\d+)\)\.number")
    match = pattern.search(question)
    if match is None:
        return question

    # `count` is passed by keyword: passing it positionally to re.sub is deprecated.
    return pattern.sub(match.group(1), question, count=1)

In [6]:
def test_extract_question_details(question_path = None):
    """Fetch one question, print every extracted field, and return them.

    Args:
        question_path: URL of the question file; a fixed YesNo/Q1 sample is
            used when omitted.

    Returns:
        The tuple produced by ``extract_question_details``.
    """
    if question_path is None:
        question_path = dataset_path + "QA/YesNo/Q1/q1_answer_scene1_Day1_bedroom.json"

    question, choices, scene_id, activities, correct_answer = extract_question_details(question_path)
    labelled_fields = {
        'question': question,
        'choices': choices,
        'scene_id': scene_id,
        'activities': activities,
        'correct_answer': correct_answer,
    }
    for label, value in labelled_fields.items():
        print(f'{label}: {value}')

    return question, choices, scene_id, activities, correct_answer


test_extract_question_details()

question: Did he enter the bedroom 6 times?
choices: ['Yes', 'No']
scene_id: scene1
activities: ['get_out_of_bed1', 'put_slippers_in_closet1', 'walk_with_memory_loss6', 'put_groceries_in_fridge13', 'walk_with_memory_loss5', 'put_groceries_in_fridge21', 'do_homework_on_paper1', 'clean_kitchentable1', 'get_out_of_bed1', 'read_book1']
correct_answer: Yes


('Did he enter the bedroom 6 times?',
 ['Yes', 'No'],
 'scene1',
 ['get_out_of_bed1',
  'put_slippers_in_closet1',
  'walk_with_memory_loss6',
  'put_groceries_in_fridge13',
  'walk_with_memory_loss5',
  'put_groceries_in_fridge21',
  'do_homework_on_paper1',
  'clean_kitchentable1',
  'get_out_of_bed1',
  'read_book1'],
 'Yes')

In [7]:
def bfs_path(scene_id, start, goal):
    """Find a shortest room-to-room route with breadth-first search.

    Args:
        scene_id: Key of the room graph to use (e.g. 'scene1').
        start: Room the route starts from.
        goal: Room the route should end in.

    Returns:
        List of room names from ``start`` to ``goal`` inclusive. An empty list
        is returned when ``goal`` is None or cannot be reached from ``start``.

    Raises:
        KeyError: if no room graph is defined for ``scene_id``.
    """
    # NOTE(review): scene6 lists 'bathroom' as a neighbor of 'bedroom' although
    # only 'toilet' has an entry, and the scene7 edges are not symmetric.
    # Confirm these adjacency lists against the actual floor plans.
    room_graphs = {
        'scene1': {
            'bedroom': ['bathroom', 'kitchen'],
            'bathroom': ['bedroom'],
            'kitchen': ['bedroom', 'livingroom'],
            'livingroom': ['kitchen']
        },
        'scene2': {
            'bedroom': ['kitchen'],
            'bathroom': ['kitchen'],
            'kitchen': ['bedroom', 'livingroom', 'bathroom'],
            'livingroom': ['kitchen']
        },
        'scene4': {
            'bedroom': ['kitchen', 'livingroom'],
            'toilet': ['kitchen'],
            'kitchen': ['bedroom', 'toilet'],
            'livingroom': ['bedroom']
        },
        'scene5': {
            'bedroom': ['kitchen'],
            'bathroom': ['livingroom'],
            'kitchen': ['bedroom', 'livingroom'],
            'livingroom': ['bathroom', 'kitchen']
        },
        'scene6': {
            'bedroom': ['kitchen', 'livingroom', 'bathroom'],
            'toilet': ['bedroom'],
            'kitchen': ['bedroom'],
            'livingroom': ['bedroom']
        },
        'scene7': {
            'bedroom': ['kitchen'],
            'bathroom': ['bedroom'],
            'kitchen': ['bedroom', 'bathroom', 'livingroom'],
            'livingroom': ['bedroom']
        }
    }

    graph = room_graphs[scene_id]

    queue = deque([start])
    parent = {start: None}
    while queue:
        vertex = queue.popleft()
        if vertex == goal:
            break
        # .get(): a room can appear as a neighbor without having its own entry.
        for neighbor in graph.get(vertex, ()):
            if neighbor not in parent:
                parent[neighbor] = vertex
                queue.append(neighbor)

    # The goal was never reached, so there is no route to rebuild.
    if goal not in parent:
        return []

    # Rebuild the route by walking the parent links back from the goal.
    path = []
    current = goal
    while current is not None:
        path.append(current)
        current = parent[current]
    path.reverse()

    return path

In [8]:
def test_bfs_path():
    """Print the scene1 route from the bathroom to the living room."""
    print(bfs_path('scene1', 'bathroom', 'livingroom'))

test_bfs_path()

['bathroom', 'bedroom', 'kitchen', 'livingroom']


In [37]:
def get_event_details(scene_id, activities):
    """Collect the events of the given activities as one ordered timeline.

    Args:
        scene_id: Scene identifier (e.g. scene1, scene2, ...).
        activities: Activity names; each is lower-cased before querying.

    Returns:
        List of dicts with the keys 'event', 'action', 'place', 'duration',
        'object' and 'erapsed time' (key spelled as in the original code).
        When an activity starts in a different room than the previous one
        ended in, zero-duration 'silent walk' entries are inserted for the
        rooms along the ``bfs_path`` route.
    """
    def local_name(binding, variable):
        # Lower-cased last URI segment, or None when the variable is unbound.
        if variable in binding:
            return binding[variable]['value'].split('/')[-1].lower()
        return None

    timeline = []
    previous_place = None
    total_elapsed = 0.

    for activity in activities:
        activity = activity.lower()
        sparql_query = f"""
            PREFIX ex: <http://kgrc4si.home.kg/virtualhome2kg/instance/>
            PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema/>
            PREFIX vh2kg: <http://kgrc4si.home.kg/virtualhome2kg/ontology/>
            PREFIX time: <http://www.w3.org/2006/time#>

            SELECT ?event ?event_number ?action ?event_duration ?object_name ?place_name ?from_name ?to_name WHERE {{
                ex:{activity}_{scene_id} vh2kg:hasEvent ?event .
                ?event vh2kg:eventNumber ?event_number .
                ?event vh2kg:action ?action .
                ?event vh2kg:time ?time .
                ?time time:numericDuration ?event_duration .

                OPTIONAL {{
                    ?event vh2kg:mainObject ?object .
                    ?object a ?object_name .
                }}

                OPTIONAL {{
                    ?event vh2kg:place ?place .
                    ?place a ?place_name .
                }}

                OPTIONAL {{
                    ?event vh2kg:from ?from .
                    ?from a ?from_name .
                }}

                OPTIONAL {{
                    ?event vh2kg:to ?to .
                    ?to a ?to_name .
                }}

            }} ORDER BY ?event_number
        """

        for index, row in enumerate(db.query(sparql_query)):
            event = row['event']['value'].split('/')[-1]
            action = row['action']['value'].split('/')[-1].lower()
            duration = float(row['event_duration']['value'])
            object_name = local_name(row, 'object_name')
            place_name = local_name(row, 'place_name')
            from_name = local_name(row, 'from_name')
            to_name = local_name(row, 'to_name')

            if index != 0:
                # After the first event, stay where the previous event left off.
                current_place = previous_place
            else:
                # First event of an activity: use its place, else its origin.
                current_place = place_name if place_name is not None else from_name

                # From the second activity on, fill in the rooms crossed to get here.
                if previous_place is not None and current_place != previous_place:
                    for place in bfs_path(scene_id, previous_place, current_place)[1:]:
                        timeline.append({'event': event, 'action': 'silent walk', 'place': place, 'duration': 0., 'object': None, 'erapsed time': total_elapsed})
                        previous_place = place

            # A walk with a known destination moves the character there.
            if action == 'walk' and to_name is not None:
                current_place = to_name

            total_elapsed += duration

            timeline.append({'event': event, 'action': action, 'place': current_place, 'duration': duration, 'object': object_name, 'erapsed time': total_elapsed})
            previous_place = current_place

    return timeline

def count_total_room_entries(room_name, scene_id, activities):
    """Count how many times the character entered a room.

    An entry is an event whose place is the room while the previous event's
    place was something else; a timeline that starts in the room counts once.

    Args:
        room_name: Room name (e.g. bedroom, bathroom, kitchen, livingroom).
        scene_id: Scene identifier (e.g. scene1, scene2, ...).
        activities: Activity names.

    Returns:
        Number of entries into the room.
    """
    target_room = room_name.lower()
    places = [detail['place'] for detail in get_event_details(scene_id, activities)]
    # Pair every place with the one before it (None before the first event).
    preceding_places = [None] + places[:-1]
    return sum(
        1
        for before, now in zip(preceding_places, places)
        if now != before and now == target_room
    )

def count_action_occurrences(action_name, scene_id, activities):
    """Count the events whose action equals ``action_name``.

    Args:
        action_name: Action name; compared in lower case.
        scene_id: Scene identifier (e.g. scene1, scene2, ...).
        activities: Activity names.

    Returns:
        Number of matching events across all given activities.
    """
    wanted_action = action_name.lower()
    timeline = get_event_details(scene_id, activities)
    return sum(1 for detail in timeline if detail['action'] == wanted_action)

def get_first_action_in_room(room_name, scene_id, activities):
    """Find the first action performed once the character is in a room.

    Args:
        room_name: Room name (e.g. bedroom, bathroom, kitchen, livingroom);
            matched case-insensitively, like the sibling query functions.
        scene_id: Scene identifier (e.g. scene1, scene2, ...).
        activities: Activity names.

    Returns:
        List of at most two action names: an optional leading 'walk' followed
        by the first non-walk action. Empty when the room is never entered.
    """
    # Places in the timeline are lower-cased, so normalise the query too.
    room_name = room_name.lower()
    entered_room = False
    event_details = get_event_details(scene_id, activities)

    # Some questions count 'walk' as an action and some do not, so keep up to two.
    # NOTE(review): entered_room is never reset, so the non-walk action may
    # belong to a later room if the character only walks through this one.
    actions = []
    for event_detail in event_details:
        if event_detail['place'] == room_name:
            entered_room = True

        if not entered_room:
            continue

        if event_detail['action'] != 'walk':
            actions.append(event_detail['action'])
            return actions

        if len(actions) == 0:
            actions.append(event_detail['action'])

    return actions

def get_last_action_before_entry(room_name, scene_id, activities):
    """Find the action(s) performed just before first entering a room.

    Args:
        room_name: Room name (e.g. bedroom, bathroom, kitchen, livingroom);
            matched case-insensitively, like the sibling query functions.
        scene_id: Scene identifier (e.g. scene1, scene2, ...).
        activities: Activity names.

    Returns:
        List of at most two action names seen before the first event located
        in the room. If the room is never entered, the actions tracked up to
        the end of the timeline are returned.
    """
    # Places in the timeline are lower-cased, so normalise the query too.
    room_name = room_name.lower()
    event_details = get_event_details(scene_id, activities)
    # Some questions count 'walk' as an action and some do not, so keep up to two.
    actions = []

    for event_detail in event_details:
        if event_detail['place'] == room_name:
            return actions

        action = event_detail['action']
        if len(actions) == 0:
            actions = [action]
        elif action != 'walk' or actions[-1] != 'walk':
            # Shift the window, except when a walk directly follows a walk.
            actions = [actions[-1], action]

    return actions

def get_action_at_time(time, scene_id, activities):
    """Find the event in progress after a given elapsed time.

    Args:
        time: Elapsed time since the start (e.g. 1h-30m-15s).
        scene_id: Scene identifier (e.g. scene1, scene2, ...).
        activities: Activity names.

    Returns:
        Dict with the keys 'action', 'target_object' and 'place' for the first
        event by whose end the requested time has been reached (within
        0.001 s). If the timeline is shorter than the requested time, the
        tuple ``(None, None)`` is returned instead.
    """
    seconds_left = time_to_seconds(time)

    for detail in get_event_details(scene_id, activities):
        seconds_left -= detail['duration']
        if not seconds_left > 0.001:
            return {
                'action': detail['action'],
                'target_object': detail['object'],
                'place': detail['place'],
            }

    return None, None

def time_to_seconds(time):
    """Convert a '<H>h-<M>m-<S>s' time string to a number of seconds.

    Args:
        time: Text containing a time such as '1h-30m-15s'.

    Returns:
        Total number of seconds as an int.

    Raises:
        ValueError: if no '<H>h-<M>m-<S>s' pattern is found in ``time``.
    """
    # Extract the hour, minute and second fields with a regular expression.
    found = re.search(r'(\d+)h-(\d+)m-(\d+)s', time)
    if found is None:
        raise ValueError("時間の形式が正しくありません")

    hours, minutes, seconds = (int(field) for field in found.groups())
    return (hours * 60 + minutes) * 60 + seconds


In [10]:
def _run_question_test(question_path, func, argument):
    """Print a question's details, run one query function on it, and print the result."""
    question, choices, scene_id, activities, correct_answer = test_extract_question_details(question_path)
    print("send queries...")
    answer = func(argument, scene_id, activities)
    print(f"{func.__name__}: {answer}\n")


def test_q1(question_path=None):
    """Run ``count_total_room_entries`` for 'bathroom' on a Q1 sample question."""
    if question_path is None:
        question_path = dataset_path + "QA/MultiChoice/Q1/q1_answer_scene1_Day10_bathroom.json"
    _run_question_test(question_path, count_total_room_entries, "bathroom")


def test_q2(question_path=None):
    """Run ``count_action_occurrences`` for 'drink' on a Q2 sample question."""
    if question_path is None:
        question_path = dataset_path + "QA/MultiChoice/Q2/q2_answer_scene1_Day10_drink.json"
    _run_question_test(question_path, count_action_occurrences, "drink")


def test_q3(question_path=None):
    """Run ``get_first_action_in_room`` for 'kitchen' on a Q3 sample question."""
    if question_path is None:
        # question_path = dataset_path + "QA/MultiChoice/Q3/q3_answer_scene1_Day1.json"
        question_path = dataset_path + "QA/MultiChoice/Q3/q3_answer_scene2_Day1.json"
    _run_question_test(question_path, get_first_action_in_room, "kitchen")


def test_q4(question_path=None):
    """Run ``get_last_action_before_entry`` for 'kitchen' on a Q4 sample question."""
    if question_path is None:
        question_path = dataset_path + "QA/MultiChoice/Q4/q4_answer_scene1_Day1.json"
    _run_question_test(question_path, get_last_action_before_entry, "kitchen")


def test_q5(question_path=None):
    """Run ``get_action_at_time`` for 16 seconds on a Q5 sample question."""
    if question_path is None:
        question_path = dataset_path + "QA/MultiChoice/Q5/q5_answer_scene1_Day10_location0.json"
    _run_question_test(question_path, get_action_at_time, "00h-00m-16s")


# Manual smoke run: print the full event timeline of a hand-picked scene1
# activity list (names are lower-cased inside get_event_details), then run
# the per-question-type checks defined above.
event_details = get_event_details("scene1",  ["Get_out_of_bed1","Turn_on_light6","Clean_desk4","Drink_milk1","Turn_on_light5","Put_away_groceries_from_fridge1","Workout1","Put_away_groceries_from_fridge3","Fall_backward_while_walking_and_turning1","Put_groceries_in_fridge23"])
pprint(event_details)
test_q1()
test_q2()
test_q3()
test_q4()
test_q5()


[{'action': 'walk',
  'duration': 5.52,
  'erapsed time': 5.52,
  'event': 'event0_get_out_of_bed1_scene1',
  'object': 'bed',
  'place': 'bedroom'},
 {'action': 'sit',
  'duration': 6.002999999999999,
  'erapsed time': 11.523,
  'event': 'event1_get_out_of_bed1_scene1',
  'object': 'bed',
  'place': 'bedroom'},
 {'action': 'stand',
  'duration': 6.141,
  'erapsed time': 17.664,
  'event': 'event2_get_out_of_bed1_scene1',
  'object': None,
  'place': 'bedroom'},
 {'action': 'walk',
  'duration': 4.899,
  'erapsed time': 22.563000000000002,
  'event': 'event3_get_out_of_bed1_scene1',
  'object': 'pillow',
  'place': 'bedroom'},
 {'action': 'grab',
  'duration': 3.1049999999999995,
  'erapsed time': 25.668000000000003,
  'event': 'event4_get_out_of_bed1_scene1',
  'object': 'pillow',
  'place': 'bedroom'},
 {'action': 'put',
  'duration': 6.8309999999999995,
  'erapsed time': 32.499,
  'event': 'event5_get_out_of_bed1_scene1',
  'object': 'pillow',
  'place': 'bedroom'},
 {'action': 'wal

In [11]:
def ask_chatgpt(prompt):
    """Send one user message to the configured chat model and return its reply.

    Args:
        prompt: Text sent as the content of a single user message.

    Returns:
        The message content of the first returned choice.
    """
    sleep(api_delay)
    user_message = {"role": "user", "content": prompt}
    completion = client.chat.completions.create(
        messages=[user_message],
        model=model,
    )
    first_choice = completion.choices[0]
    return first_choice.message.content

In [36]:
def test_ask_chatgpt():
    """Send a trivial question to the chat model and print the reply."""
    print(ask_chatgpt("What is the capital of Japan?"))

test_ask_chatgpt()

The capital of Japan is Tokyo.


In [13]:
def answer_question_with_chatgpt(question_path, print_prompt=False, output_text_path=None):
    """Answer one dataset question in two ChatGPT steps.

    Step 1 asks the model which query function to call and with what argument;
    step 2 gives the model that function's output and asks it to pick a choice.

    Args:
        question_path: URL of the question JSON file.
        print_prompt: When True, print the prompts and responses.
        output_text_path: When given, append the prompts and responses to
            this file (written as UTF-8).

    Returns:
        Tuple ``(chatgpt_answer, correct_answer)``, both strings. The model's
        reply has a leading "Choice:" prefix and surrounding whitespace removed.

    Raises:
        ValueError: if the step-1 reply is not of the form "name, argument".
        KeyError: if the step-1 reply names an unknown function.
    """
    function_map = {
        "count_total_room_entries": count_total_room_entries,
        "count_action_occurrences": count_action_occurrences,
        "get_first_action_in_room": get_first_action_in_room,
        "get_last_action_before_entry": get_last_action_before_entry,
        "get_action_at_time": get_action_at_time
    }

    def report(text, file_suffix='\n'):
        # Echo to stdout and/or append to the log, as requested by the caller.
        if print_prompt:
            print(text)
        if output_text_path is not None:
            with open(output_text_path, 'a', encoding='utf-8') as f:
                f.write(text + file_suffix)

    question, choices, scene_id, activities, correct_answer = extract_question_details(question_path)
    prompt1 = f"""
I will provide a question, choices, and functions.
Select one function to answer the question and include the required arguments in your response.
Format: function_name, arguments (e.g., count_total_room_entries, bedroom)

Question: {question}
Choices: {choices}
Functions:
- count_total_room_entries
# Parameters: room_name (e.g., bedroom, bathroom, kitchen, livingroom)
# Returns: Total number of entries into the room

- count_action_occurrences
# Parameters: action_name (close, grab, switchon, switchoff, turnto, ...)
# Returns: Number of times the action was performed within each activity

- get_first_action_in_room
# Parameters: room_name (e.g., bedroom, bathroom, kitchen, livingroom)
# Returns: First action performed after entering the room

- get_last_action_before_entry
# Parameters: room_name (e.g., bedroom, bathroom, kitchen, livingroom)
# Returns: Last action performed before entering the room

- get_action_at_time
# Parameters: time (e.g., 1h-30m-15s)
# Returns: Name of the object and place at the specified time
    """

    report(prompt1)
    response = ask_chatgpt(prompt1)
    report(f'ChatGPT Response: [ {response} ]')

    # Split on the first comma and strip, so spacing differences in the reply
    # ("name,arg", "name,  arg", trailing newline) do not break the parsing.
    name_part, separator, param_part = response.partition(',')
    if not separator:
        raise ValueError(f'Expected "function_name, argument" but got: {response!r}')
    func_name = name_part.strip()
    param = param_part.strip().lower()
    if func_name not in function_map:
        raise KeyError(f'Unknown function selected by the model: {func_name!r}')
    func = function_map[func_name]

    results = func(param, scene_id, activities)

    prompt2 = f"""
Please select the appropriate choice and output only the choice.
Format: Choice: {{answer}} (e.g., Choice: Yes, Choice: No, Choice: 5, Choice: GRAB, Choice: livingroom)

Question: {question}
Choices: {choices}
Function: {func_name}
Function output: {results}
"""

    report(prompt2)
    answer = ask_chatgpt(prompt2)

    # Drop the "Choice:" prefix when present instead of blindly cutting eight
    # characters, which mangles a reply that lacks the exact prefix.
    choice_match = re.match(r'\s*Choice:\s*(.*)', answer, re.DOTALL)
    chatgpt_answer = (choice_match.group(1) if choice_match else answer).strip()
    correct_answer = str(correct_answer)

    report(f'ChatGPT Response: [ {answer} ]')
    report(f'ChatGPT Answer: {chatgpt_answer}')
    report(f'Correct Answer: {correct_answer}')
    report(f'Is the answer correct?: {chatgpt_answer == correct_answer}', file_suffix='\n\n')
    return chatgpt_answer, correct_answer

In [34]:
def test_answer_question_with_chatgpt():
    """Run the full question-answering flow on one sample question.

    Prints the extracted question details and appends the prompts and
    responses to 'test.txt' in the current working directory.
    """
    # Other sample questions that can be swapped in (relative to dataset_path):
    #   QA/YesNo/Q1/q1_answer_scene1_Day1_kitchen.json
    #   QA/YesNo/Q2/q2_answer_scene1_Day1_close.json
    #   QA/YesNo/Q2/q2_answer_scene1_Day1_grab.json
    #   QA/YesNo/Q3/q3_answer_scene1_Day1.json
    #   QA/YesNo/Q3/q3_answer_scene1_Day2.json
    #   QA/MultiChoice/Q1/q1_answer_scene1_Day1_bathroom.json
    #   QA/MultiChoice/Q1/q1_answer_scene1_Day1_kitchen.json
    #   QA/MultiChoice/Q2/q2_answer_scene1_Day1_close.json
    #   QA/MultiChoice/Q2/q2_answer_scene1_Day1_grab.json
    #   QA/MultiChoice/Q3/q3_answer_scene1_Day1.json
    #   QA/MultiChoice/Q3/q3_answer_scene1_Day2.json
    #   QA/MultiChoice/Q4/q4_answer_scene1_Day1.json
    #   QA/MultiChoice/Q4/q4_answer_scene1_Day2.json
    #   QA/MultiChoice/Q5/q5_answer_scene1_Day1_object0.json
    #   QA/MultiChoice/Q5/q5_answer_scene1_Day1_location0.json
    sample_path = dataset_path + "QA/YesNo/Q1/q1_answer_scene1_Day1_bathroom.json"

    test_extract_question_details(sample_path)
    log_path = Path.cwd() / 'test.txt'
    answer_question_with_chatgpt(
        sample_path,
        print_prompt=False,
        output_text_path=log_path,
    )

test_answer_question_with_chatgpt()

question: Did he enter the bathroom 2 times?
choices: ['Yes', 'No']
scene_id: scene1
activities: ['get_out_of_bed1', 'put_slippers_in_closet1', 'walk_with_memory_loss6', 'put_groceries_in_fridge13', 'walk_with_memory_loss5', 'put_groceries_in_fridge21', 'do_homework_on_paper1', 'clean_kitchentable1', 'get_out_of_bed1', 'read_book1']
correct_answer: Yes


In [26]:
def organize_paths_by_key(paths):
    """Group paths by their second and third '/'-separated components.

    For 'QA/MultiChoice/Q1/file.json' the key is 'MultiChoice/Q1'. Paths with
    fewer components get a correspondingly shorter (possibly empty) key.

    Args:
        paths: Iterable of '/'-separated path strings.

    Returns:
        Dict mapping each key to the list of full paths that share it, in
        input order.
    """
    grouped = {}
    for full_path in paths:
        components = full_path.split('/')
        group_key = '/'.join(components[1:3])
        # setdefault creates the list the first time a key is seen.
        grouped.setdefault(group_key, []).append(full_path)
    return grouped

def evaluate(question_path_list, result_dir_path, scene, day, print_prompt=True):
    """Answer every listed question and record per-category accuracy.

    Questions are grouped with ``organize_paths_by_key``. For each group the
    success/failure counts and rate are printed and appended to a log file,
    and a CSV summary is written at the end. Both file names carry the scene,
    day, model name and a start timestamp.

    Args:
        question_path_list: Question paths relative to ``dataset_path``.
        result_dir_path: Directory (a ``Path``) that receives the log and CSV.
        scene: Scene label used in file names and the CSV.
        day: Day label used in file names and the CSV.
        print_prompt: Forwarded to ``answer_question_with_chatgpt``.
    """
    print(f'question path size: {len(question_path_list)}')

    # Stamp the output names with the start time of this run.
    run_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_text_path = result_dir_path / f'{scene}_{day}_log_{model}.txt'
    log_text_path = log_text_path.with_stem(f'{log_text_path.stem}_{run_stamp}')
    result_csv_path = result_dir_path / f'{scene}_{day}_result_{model}.csv'
    result_csv_path = result_csv_path.with_stem(f'{result_csv_path.stem}_{run_stamp}')

    def append_to_log(text):
        with open(log_text_path, 'a') as log_file:
            log_file.write(text)

    all_results = {}
    for key, question_paths in organize_paths_by_key(question_path_list).items():
        print(f'Category: {key}')
        append_to_log(f'Category: {key}\n')

        details_list = []
        for question_path in question_paths:
            append_to_log(f'question path: {question_path}\n')
            answer, correct_answer = answer_question_with_chatgpt(dataset_path + question_path, print_prompt=print_prompt, output_text_path=log_text_path)
            details_list.append({"path": question_path, "ChatGPT Answer": answer, "Correct Answer": correct_answer, "answer==correct_answer": answer == correct_answer})

        success = sum(1 for details in details_list if details["answer==correct_answer"])
        failure = len(details_list) - success
        success_rate = success / (success + failure)

        summary_lines = [
            f'success: {success}',
            f'failure: {failure}',
            f'success rate: {success_rate}',
            'results:',
        ]
        for summary_line in summary_lines:
            print(summary_line)
        for details in details_list:
            print(details)
        print()

        log_chunks = [f'{summary_line}\n' for summary_line in summary_lines]
        for details in details_list:
            log_chunks.append(''.join(f'{k}: {v}, ' for k, v in details.items()) + '\n')
        append_to_log(''.join(log_chunks))

        all_results[key] = {
            'success': success,
            'failure': failure,
            'success rate': success_rate,
        }

    pprint(all_results)
    with open(result_csv_path, 'w') as csv_file:
        csv_file.write('scene,day,question type,question number,success,failure,success rate\n')
        for key, value in all_results.items():
            key_parts = key.split('/')
            csv_file.write(f'{scene},{day},{key_parts[0]},{key_parts[1]},{value["success"]},{value["failure"]},{value["success rate"]}\n')


In [38]:
# Evaluate scene1-scene2 over Day1-Day5, reading each question list from
# '<scene>_<day>.txt' in qa_list_dir_path (one question path per line).
for scene in (f'scene{i}' for i in range(1, 3)):
    for day in (f'Day{i}' for i in range(1, 6)):
        qa_path_txt = qa_list_dir_path / f'{scene}_{day}.txt'
        with open(qa_path_txt, 'r') as f:
            qa_paths = [line.strip() for line in f]

        evaluate(qa_paths, result_dir_path, scene, day, print_prompt=False)

question path size: 69
Category: MultiChoice/Q1
success: 3
failure: 0
success rate: 1.0
results:
{'path': 'QA/MultiChoice/Q1/q1_answer_scene2_Day3_bedroom.json', 'ChatGPT Answer': '3', 'Correct Answer': '3', 'answer==correct_answer': True}
{'path': 'QA/MultiChoice/Q1/q1_answer_scene2_Day3_kitchen.json', 'ChatGPT Answer': '6', 'Correct Answer': '6', 'answer==correct_answer': True}
{'path': 'QA/MultiChoice/Q1/q1_answer_scene2_Day3_livingroom.json', 'ChatGPT Answer': '3', 'Correct Answer': '3', 'answer==correct_answer': True}

Category: MultiChoice/Q2
success: 15
failure: 0
success rate: 1.0
results:
{'path': 'QA/MultiChoice/Q2/q2_answer_scene2_Day3_close.json', 'ChatGPT Answer': '3', 'Correct Answer': '3', 'answer==correct_answer': True}
{'path': 'QA/MultiChoice/Q2/q2_answer_scene2_Day3_drink.json', 'ChatGPT Answer': '4', 'Correct Answer': '4', 'answer==correct_answer': True}
{'path': 'QA/MultiChoice/Q2/q2_answer_scene2_Day3_find.json', 'ChatGPT Answer': '2', 'Correct Answer': '2', 'answ

In [28]:
def extract_caption_question_details(question_path):
    """Download a caption question and derive scene and activity from its path.

    Args:
        question_path: URL of the question JSON file. It must contain
            'scene<digits>' and 'movies_<activity>.json'.

    Returns:
        Tuple ``(question, choices, scene_id, activities, correct_answer)``:
        the question text, the list of answer strings, the scene id
        (e.g. 'scene1'), a one-element list with the lower-cased activity
        name, and the answer flagged as correct (``None`` when none is).

    Raises:
        ValueError: if the scene or activity cannot be found in the path.
    """
    # Validate the path first, so a malformed one fails before any download
    # (previously it surfaced as an UnboundLocalError at the return).
    scene_match = re.search(r"scene(\d+)", question_path)
    activity_match = re.search(r"movies_(.*?)\.json", question_path)
    if scene_match is None or activity_match is None:
        raise ValueError(f"Cannot extract scene and activity from path: {question_path}")

    scene_id = scene_match.group(0)  # e.g. "scene1"
    activities = [activity_match.group(1).lower()]  # e.g. ["relax_on_bed1"]

    sleep(api_delay)
    with urllib.request.urlopen(question_path) as response:
        data = json.loads(response.read().decode())

    question = data["question"]
    choices = [answer["answer"] for answer in data["answers"]]
    correct_answer = next((answer["answer"] for answer in data["answers"] if answer["correct"]), None)

    return question, choices, scene_id, activities, correct_answer

In [29]:
def test_extract_caption_question_details(question_path = None):
    """Fetch one caption question, print every extracted field, and return them.

    Args:
        question_path: URL of the question file; a fixed Caption sample is
            used when omitted.

    Returns:
        The tuple produced by ``extract_caption_question_details``.
    """
    if question_path is None:
        question_path = dataset_path + "QA/MultiChoice/Caption/Abnormal_scene1_movies_Fall_backward_while_walking_and_turning1.json"

    question, choices, scene_id, activities, correct_answer = extract_caption_question_details(question_path)
    labelled_fields = {
        'question': question,
        'choices': choices,
        'scene_id': scene_id,
        'activities': activities,
        'correct_answer': correct_answer,
    }
    for label, value in labelled_fields.items():
        print(f'{label}: {value}')

    return question, choices, scene_id, activities, correct_answer

test_extract_caption_question_details()

question: Which is the most appropriate caption of the movie?
choices: ['Fall backward while walking and turning', 'Fall in bathroom', 'Fall while during getting up or rising', 'Fall while initiation of walking', 'Fall while preparing meal', 'Fall while sitting down', 'Fall while sitting down or lowering', 'Fall while standing and reaching', 'Fall while standing and turning', 'Fall while standing at somewhere height', 'Fall while standing quietly', 'Fall while walking forward', 'Run with disorientation', 'Stand on coffee table', 'Walk with memory loss', 'Fall sideways while walking forward', 'Fall while climbing at somewhere height', 'Get out of bed', 'Go to sleep', 'Read bedtime story', 'Drink alcohol', 'Drink juice', 'Drink milk', 'Drink water', 'Drink wine', 'Eat breadslice', 'Eat cupcake', 'Have evening beverage', 'Have morning beverage', 'Cook carrot', 'Cook fried bread', 'Cook potato using microwave', 'Cook potato using stove', 'Cook salmon', 'Make cold cereal', 'Make hot cereal'

('Which is the most appropriate caption of the movie?',
 ['Fall backward while walking and turning',
  'Fall in bathroom',
  'Fall while during getting up or rising',
  'Fall while initiation of walking',
  'Fall while preparing meal',
  'Fall while sitting down',
  'Fall while sitting down or lowering',
  'Fall while standing and reaching',
  'Fall while standing and turning',
  'Fall while standing at somewhere height',
  'Fall while standing quietly',
  'Fall while walking forward',
  'Run with disorientation',
  'Stand on coffee table',
  'Walk with memory loss',
  'Fall sideways while walking forward',
  'Fall while climbing at somewhere height',
  'Get out of bed',
  'Go to sleep',
  'Read bedtime story',
  'Drink alcohol',
  'Drink juice',
  'Drink milk',
  'Drink water',
  'Drink wine',
  'Eat breadslice',
  'Eat cupcake',
  'Have evening beverage',
  'Have morning beverage',
  'Cook carrot',
  'Cook fried bread',
  'Cook potato using microwave',
  'Cook potato using stove',
  

In [30]:
def _parse_ranked_choices(answer):
    """Extract the comma-separated choices from a model reply.

    The reply is requested in the form ``Choice: a, b, c``. Rather than
    slicing off a fixed number of characters, an optional ``Choice:`` /
    ``Choices:`` label (any letter case) is located and removed, a trailing
    period is dropped, and every entry is trimmed of surrounding whitespace.

    Args:
        answer: Raw reply text; ``None`` or an empty string is tolerated.

    Returns:
        List of non-empty choice strings in the order they appeared.
    """
    text = (answer or '').strip()
    labelled = re.search(r'choices?\s*:\s*(.*)', text, flags=re.IGNORECASE)
    if labelled:
        # Keep only what follows the label (up to the end of that line).
        text = labelled.group(1)
    text = text.strip().rstrip('.')
    return [item.strip() for item in text.split(',') if item.strip()]


def answer_caption_question_with_chatgpt(question_path, print_prompt=False, output_text_path=None):
    """Answer one caption question with a two-step prompt and report the result.

    Step 1 sends the listed actions and asks for possible scenarios.
    Step 2 sends those scenarios, the actions and the shuffled choices, and
    asks for up to five choices ranked by relevance.

    Args:
        question_path: Passed unchanged to ``extract_caption_question_details``.
        print_prompt: When true, prompts, replies and the verdict are printed.
        output_text_path: When not ``None``, the same text is appended to
            this file.

    Returns:
        A tuple ``(ranked_answers, top_answer, correct_answer)`` where
        ``ranked_answers`` is the list of parsed choices, ``top_answer`` is
        its first entry (``''`` when nothing could be parsed) and
        ``correct_answer`` is the expected answer converted to ``str``.
    """
    def log(text, file_suffix='\n'):
        # Mirror the text to stdout and/or the log file, depending on the flags.
        if print_prompt:
            print(text)
        if output_text_path is not None:
            with open(output_text_path, 'a') as f:
                f.write(text + file_suffix)

    _, choices, scene_id, activities, correct_answer = extract_caption_question_details(question_path)

    # Fixed seed so the choices are presented in the same shuffled order on every run.
    seed = 0
    np.random.seed(seed)
    np.random.shuffle(choices)

    # One line per event: "action N: <action>, [target: <object>, ]place: <place>".
    event_lines = []
    for number, event in enumerate(get_event_details(scene_id, activities), start=1):
        line = f'action {number}: {event["action"]}, '
        if event["object"] is not None:
            line += f'target: {event["object"]}, '
        line += f'place: {event["place"]}\n'
        event_lines.append(line)
    event_details = "\n" + "".join(event_lines)

    prompt1 = f"""
The following actions describe a sequence of movements of a character.
What might the character be doing?
Please provide a few possible scenarios.
Additionally, determine if there are any signs of health issues based on the actions.

{event_details}
"""
    log(prompt1)
    response = ask_chatgpt(prompt1)
    log(f'ChatGPT Response: [ {response} ]')

    prompt2 = f"""
Please select the options that best describe the character's actions from the following choices.
Choose up to five options in order of relevance, separated by commas.
Format: Choice: {{answer1}}, {{answer2}} ... (No period)

Scenarios: {response}
Character's movement: {event_details}
Choices: {choices}
"""
    log(prompt2)
    answer = ask_chatgpt(prompt2)

    # Parse once and derive both the ranked list and the top answer from it,
    # so the two can never disagree.
    ranked_answers = _parse_ranked_choices(answer)
    top_answer = ranked_answers[0] if ranked_answers else ''
    correct_text = str(correct_answer)
    is_correct = top_answer.lower() == correct_text.lower()

    summary = '\n'.join([
        f'ChatGPT Response: [ {answer} ]',
        f'ChatGPT Answer: {top_answer}',
        f'Correct Answer: {correct_answer}',
        f'Is the answer correct?: {is_correct}',
    ])
    # The log file gets a blank line after each question's summary.
    log(summary, file_suffix='\n\n')

    return ranked_answers, top_answer, correct_text

In [33]:
def evaluate_caption(question_path_list, result_dir_path, scene, print_prompt=True):
    """Run the caption questions for one scene and write a log and a CSV report.

    Each question is answered through ``answer_caption_question_with_chatgpt``.
    The top answer is compared with the correct answer case-insensitively.
    Both output files are created inside ``result_dir_path``; their names
    contain the scene, the global ``model`` name and the start timestamp.

    Args:
        question_path_list: Question paths relative to the global
            ``dataset_path``. May be empty.
        result_dir_path: Directory (``pathlib.Path``) receiving the outputs.
        scene: Scene label used in file names and in the CSV ``scene`` column.
        print_prompt: Forwarded to ``answer_caption_question_with_chatgpt``.

    Returns:
        None. Results are printed and written to disk.
    """
    # Local import keeps this cell self-contained; csv is standard library.
    import csv

    max_ranked = 5  # the CSV has exactly five "ChatGPT Response" columns

    print(f'question path size: {len(question_path_list)}')

    # Timestamped names so repeated runs never overwrite earlier results.
    start_time_str = datetime.now().strftime("%Y%m%d_%H%M%S")
    log_text_path = result_dir_path / f'{scene}_caption_log_{model}_{start_time_str}.txt'
    result_csv_path = result_dir_path / f'{scene}_caption_result_{model}_{start_time_str}.csv'

    success = 0
    failure = 0

    details_list = []
    for question_path in question_path_list:
        with open(log_text_path, 'a') as f:
            f.write(f'question path: {question_path}\n')
        response, answer, correct_answer = answer_caption_question_with_chatgpt(
            dataset_path + question_path,
            print_prompt=print_prompt,
            output_text_path=log_text_path,
        )
        answer = answer.lower()
        correct_answer = correct_answer.lower()
        response = [res.lower() for res in response]

        is_correct = answer == correct_answer
        if is_correct:
            success += 1
        else:
            failure += 1

        details_list.append({
            "path": question_path,
            "ChatGPT Response": response,
            "ChatGPT Answer": answer,
            "Correct Answer": correct_answer,
            "answer==correct_answer": is_correct,
            "correct_answer in response": correct_answer in response,
        })

    # Guard against an empty question list (previously a ZeroDivisionError).
    total = success + failure
    success_rate = success / total if total else 0.0

    print(f'success: {success}')
    print(f'failure: {failure}')
    print(f'success rate: {success_rate}')
    print('results:')
    for details in details_list:
        print(details)
    print()

    with open(log_text_path, 'a') as f:
        f.write(f'success: {success}\n')
        f.write(f'failure: {failure}\n')
        f.write(f'success rate: {success_rate}\n')
        f.write('results:\n')
        for details in details_list:
            for k, v in details.items():
                f.write(f'{k}: {v}, ')
            f.write('\n')

    header = (
        ['scene', 'path']
        + [f'ChatGPT Response{i}' for i in range(1, max_ranked + 1)]
        + ['ChatGPT Answer', 'Correct Answer', 'answer==correct_answer', 'correct_answer in response']
    )
    # csv.writer quotes fields containing commas, so a stray comma inside an
    # answer or path can no longer shift the columns.
    with open(result_csv_path, 'w', newline='') as f:
        writer = csv.writer(f, lineterminator='\n')
        writer.writerow(header)
        for details in details_list:
            # Keep at most five ranked answers and pad shorter lists with
            # blanks (more than five previously raised IndexError).
            ranked = details['ChatGPT Response'][:max_ranked]
            ranked += [''] * (max_ranked - len(ranked))
            writer.writerow([
                scene,
                details["path"],
                *ranked,
                details["ChatGPT Answer"],
                details["Correct Answer"],
                details["answer==correct_answer"],
                details["correct_answer in response"],
            ])


In [34]:
# Evaluate the caption questions scene by scene. Only scene1 is covered by the
# current range, and only the first three listed questions are evaluated.
scene_names = [f'scene{number}' for number in range(1, 2)]
for scene in scene_names:
    # Each scene has a text file listing one question path per line.
    qa_path_txt = qa_list_dir_path / f'{scene}_caption.txt'
    with open(qa_path_txt, 'r') as f:
        qa_paths = [entry.strip() for entry in f]

    evaluate_caption(qa_paths[:3], result_dir_path, scene, print_prompt=False)

question path size: 3
success: 0
failure: 3
success rate: 0.0
results:
{'path': 'QA/MultiChoice/Caption/Abnormal_scene1_movies_Fall_backward_while_walking_and_turning1.json', 'ChatGPT Response': ['pet cat', 'relax on sofa', 'watch television', 'drink juice while watching television', 'have evening beverage'], 'ChatGPT Answer': 'pet cat', 'Correct Answer': 'fall backward while walking and turning', 'answer==correct_answer': False, 'correct_answer in response': False}
{'path': 'QA/MultiChoice/Caption/Abnormal_scene1_movies_Fall_in_bathroom1.json', 'ChatGPT Response': ['use bathroom', 'use toilet', 'fall while sitting down or lowering', 'fall in bathroom', 'fall while standing and reaching'], 'ChatGPT Answer': 'use bathroom', 'Correct Answer': 'fall in bathroom', 'answer==correct_answer': False, 'correct_answer in response': True}
{'path': 'QA/MultiChoice/Caption/Abnormal_scene1_movies_Fall_while_during_getting_up_or_rising1.json', 'ChatGPT Response': ['fall while sitting down', 'fall whi