In [None]:
import ast
import json
import os
import pickle
from pathlib import Path

import openai
import pandas as pd
from dotenv import load_dotenv
from openai import OpenAI

In [None]:
# トークン数を記録する変数を初期化
total_input_tokens = 0
total_output_tokens = 0

In [None]:
# 使用するモデルの設定
MODEL_NAME = "gpt-3.5-turbo"

# .envファイルのロード
load_dotenv(dotenv_path=Path("./data/.env"))

# OpenAI APIキーの設定
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
openai.api_key = OPENAI_API_KEY

# モデルのコスト情報のロード
with open("./data/model_costs.json", "r") as f:
    model_costs = json.load(f)

# ラベルのロード
with open("./data/labels.json", "r") as f:
    labels = json.load(f)["labels"]

# 元データのデータフレームのロード
with open("data/df_batches.pkl", "rb") as f:
    df_batches = pickle.load(f)

# 元データの辞書のロード
with open("data/original_data_dict.pkl", "rb") as f:
    original_data_dict = pickle.load(f)

In [None]:
def classify_and_evaluate(texts_dict: dict[str], labels: list[str], model: str) -> dict:
    """
    文章を指定されたラベルに分類し、満足か不満かを判断する関数

    Parameters
    ----------
    texts_dict : dict[str]
        分類する文章の辞書
    labels : list[str]
        分類するラベル
    model : str
        使用するモデル

    Returns
    -------
    dict
        分類結果と満足度の判定結果
    """

    global total_input_tokens, total_output_tokens

    try:
        # OpenAI APIリクエストの作成
        client = OpenAI()
        response = client.chat.completions.create(
            model=model,
            messages=[
                # {
                #     "role": "system",
                #     "content": "あなたは、テキストを事前定義されたラベルに分類し、満足度を評価するアシスタントです。",
                # },
                {
                    "role": "user",
                    "content": f"与えられた文章を次のようなラベルの1つに分類してください: {labels}。"
                    + "\n次に、その文章が満足か不満のどちらであるかを判断してください。\n出力の形式は次のような辞書型にしてください: {text_id : {'label': 'ラベル名', 'sentiment': '満足' または '不満'}, text_id: ...}。"
                    + f"\n文章: {texts_dict}",
                },
            ],
            temperature=1.0,
            # max_tokens=100,
        )

        # レスポンスからトークン数を取得
        input_tokens = response.usage.prompt_tokens
        output_tokens = response.usage.completion_tokens
        total_input_tokens += input_tokens
        total_output_tokens += output_tokens

        # 結果の抽出
        result_text = response.choices[0].message.content
        # 文字列を辞書型に変換
        result_dict = ast.literal_eval(result_text)

        return {
            "result_dict": result_dict,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
        }

    except Exception as e:
        return {"error": str(e)}


# def evaluate_result(result_dict: dict[str], labels: list[str]) -> dict:


def calculate_cost(input_tokens: int, output_tokens: int, model_name: str) -> float:
    """
    使用したトークン数に基づいてコストを計算する関数

    Parameters
    ----------
    input_tokens : int
        入力トークン数
    output_tokens : int
        出力トークン数
    model_name : str
        使用したモデルの名前

    Returns
    -------
    float
        トークン数に基づいて計算されたコスト(単位: USD)
    """

    input_cost = input_tokens / 1e6 * model_costs[model_name]["input"]
    output_cost = output_tokens / 1e6 * model_costs[model_name]["output"]
    return input_cost + output_cost

In [None]:
# len(df_batches)

In [None]:
result_dict = {}

for df_batch in df_batches:
    # インデックスをキー、文章を値とする辞書に変換
    texts_dict = df_batch["文章"].to_dict()

    # 分類と評価を実行
    result = classify_and_evaluate(texts_dict, labels, MODEL_NAME)
    result_dict.update(result["result_dict"])

In [None]:
len(result_dict)

In [None]:
result_dict

In [None]:
df_compare = pd.DataFrame(
    {
        "original": {
            key: {
                inner_key: inner_val
                for inner_key, inner_val in val.items()
                if inner_key != "text"
            }
            for key, val in original_data_dict.items()
        },
        "predicted": result_dict,
    }
)

# text列を追加
df_compare["text"] = {key: val["text"] for key, val in original_data_dict.items()}

# NaNを削除
df_compare = df_compare.dropna()

# 一致しているか
df_compare["is_match_all"] = df_compare["original"] == df_compare["predicted"]

In [None]:
df_compare

In [None]:
with open("data/df_compare.pkl", "wb") as f:
    pickle.dump(df_compare, f)

In [None]:
# 正解率
accuracy = df_compare["is_match"].sum() / len(df_compare)
accuracy

In [None]:
# predicted_dict
# original_data_dict

In [None]:
cost = calculate_cost(total_input_tokens, total_output_tokens, MODEL_NAME)

print(f"Total input tokens: {total_input_tokens}")
print(f"Total output tokens: {total_output_tokens}")
print(f"Total cost: ${cost}")