In [15]:
import asyncio
import json
import os
import random
import time
from functools import partial
from typing import Dict, List, Tuple

from google.genai import errors
from deepeval import assert_test
from deepeval.metrics import GEval, RoleAdherenceMetric
from deepeval.models import DeepEvalBaseLLM
from deepeval.test_case import LLMTestCase, LLMTestCaseParams
from dotenv import load_dotenv
from google import genai
from google.genai.types import GenerateContentConfig
from tqdm import tqdm

from gemini_prompt import MODEL_ID, get_chat_config, get_evaluator_config

load_dotenv()

# Candidate DeepEval metrics for this evaluation:
#   Conversational Completeness, Conversational Relevancy, Conversational G-Eval,
#   Knowledge Retention, Role Adherence, Bias, Toxicity

API_KEY = os.getenv("GEMINI_API_KEY")

# gemini_api_keys.txt holds one API key per line. Surrounding whitespace (including a
# Windows "\r") and blank lines are dropped so a trailing newline cannot yield an empty key.
with open("gemini_api_keys.txt", "r", encoding="utf8") as fhandle:
    API_KEYS = [line.strip() for line in fhandle if line.strip()]

In [16]:
class GeminiLLM(DeepEvalBaseLLM):
    """DeepEval-compatible wrapper around a single-key Gemini (google-genai) client.

    Every request targets ``model_name`` and forwards the optional ``config``;
    the sync and async entry points both return only the response text.
    """

    def __init__(
        self,
        api_key: str,
        model_name: str,
        config: GenerateContentConfig = None,
    ):
        # The base-class initializer is not invoked; all state is set here.
        self.client = genai.Client(api_key=api_key)
        self.model_name, self.config = model_name, config

    def load_model(self):
        """Return the underlying ``genai.Client``."""
        return self.client

    def generate(self, prompt: str) -> str:
        """Synchronously send ``prompt`` to the model and return the response text."""
        response = self.load_model().models.generate_content(
            model=self.model_name, contents=prompt, config=self.config
        )
        return response.text

    async def a_generate(self, prompt: str) -> str:
        """Async counterpart of ``generate`` using the client's ``aio`` interface."""
        response = await self.load_model().aio.models.generate_content(
            model=self.model_name, contents=prompt, config=self.config
        )
        return response.text

    def get_model_name(self):
        """Return the Gemini model id this wrapper targets."""
        return self.model_name


class GeminiLLMManager(GeminiLLM):
    """Gemini model for DeepEval that spreads requests over several API keys.

    Every key gets its own ``genai.Client`` plus a small budget record. A key may
    serve at most ``rpm`` requests per 60-second window; when all keys are
    exhausted, callers block until the earliest window expires. Failed calls are
    retried (sync: Fibonacci back-off; async: fixed short delay).
    """

    # Length of the per-key rate-limit window, in seconds.
    _WINDOW_S = 60.0

    def __init__(
        self,
        api_keys: List[str],
        model_name: str,
        config: GenerateContentConfig = None,
        rpm: int = 15,
    ):
        """
        Args:
            api_keys: Candidate API keys; blank entries are ignored and keys that
                fail validation are skipped (see ``init_clients``).
            model_name: Gemini model id used both for validation and generation.
            config: Optional ``GenerateContentConfig`` sent with every request.
            rpm: Requests-per-minute budget applied to each key.
        """
        # Must be set before init_clients(), which validates each key against it.
        self.model_name = model_name
        self.config = config
        self.rpm = rpm
        self.clients = self.init_clients(api_keys)

    def init_clients(self, api_keys) -> List:
        """Create and validate one client per non-blank key.

        A key is kept only if looking up the configured model succeeds with it.

        Returns:
            List of records ``{"client": genai.Client, "time": window start
            (``time.monotonic()``), "rpm": budget, "rate": requests counted in the
            current window}``.
        """
        model = getattr(self, "model_name", MODEL_ID)
        rpm = getattr(self, "rpm", 15)
        keys = [key.strip() for key in api_keys if key and key.strip()]

        clients = []
        for api_key in keys:
            try:
                client = genai.Client(api_key=api_key)
                client.models.get(model=model)
            except Exception as e:
                print(f"Invalid API key ...{api_key[-4:]}. Reason: {e}.\nSkipping...")
                continue
            clients.append(
                {
                    "client": client,
                    "time": time.monotonic(),
                    "rpm": rpm,
                    "rate": 0,
                }
            )

        print(f"Finished initializing with {len(clients)}/{len(keys)} client(s).")
        return clients

    def _pick_client(self) -> Tuple:
        """Make one attempt to select a client with budget left in its window.

        Returns:
            ``(client, 0.0)`` on success, or ``(None, wait_s)`` when every client is
            exhausted, where ``wait_s`` is the time until the earliest window expires.

        Raises:
            RuntimeError: If no valid client was initialized (waiting would never end).
        """
        if not self.clients:
            raise RuntimeError("No valid Gemini API client available; check the API keys.")
        # Shuffle so load is spread across keys instead of always draining the first one.
        random.shuffle(self.clients)
        now = time.monotonic()
        min_wait_time = self._WINDOW_S
        for client in self.clients:
            elapsed = now - client["time"]
            if elapsed > self._WINDOW_S:
                # This client's window has expired: start a fresh one.
                client["time"] = now
                client["rate"] = 0
                return client, 0.0
            if client["rate"] < client["rpm"]:
                return client, 0.0
            min_wait_time = min(min_wait_time, self._WINDOW_S - elapsed)
        return None, max(min_wait_time, 0.0)

    def _record_failure(self, client: Dict, error: Exception) -> None:
        """Mark ``client`` exhausted for its current window when the API returned HTTP 429."""
        if isinstance(error, errors.ClientError) and getattr(error, "code", None) == 429:
            client["rate"] = max(client["rate"], client["rpm"])

    def load_model(self):
        """Return a client record with budget left, blocking (with a progress bar) until one frees up."""
        while True:
            client, wait_time = self._pick_client()
            if client is not None:
                return client
            print(f"Out of RPM, waiting {wait_time:.2f}s")
            for _ in tqdm(range(int(wait_time + 1))):
                time.sleep(1)

    async def a_load_model(self):
        """Async counterpart of ``load_model``; waits with ``asyncio.sleep``."""
        while True:
            client, wait_time = self._pick_client()
            if client is not None:
                return client
            print(f"Out of RPM, waiting {wait_time:.2f}s")
            await asyncio.sleep(wait_time)

    def generate(self, prompt: str) -> str:
        """Generate a completion, rotating keys and retrying with Fibonacci back-off.

        Every attempt (successful or not) counts against the chosen key's budget.

        Raises:
            RuntimeError: If no valid client exists.
            TimeoutError: After ``retry_count`` failed attempts.
        """
        retry_count = 100
        retry_delay_max = 60
        fibo_delays = [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

        # Kept in a separate name: the ``except ... as`` target is deleted when the clause ends.
        last_error = None
        for t in range(retry_count):
            client = self.load_model()
            client["rate"] += 1
            try:
                res = client["client"].models.generate_content(
                    model=self.model_name,
                    contents=prompt,
                    config=self.config,
                )
                return res.text
            except Exception as e:
                last_error = e
                self._record_failure(client, e)
                print(f"Failed to call gemini API. Reason: {e}.")
                print(f"Retry {t+1}/{retry_count}...")
                if t + 1 < retry_count:
                    retry_idx = min(len(fibo_delays) - 1, t)
                    time.sleep(min(fibo_delays[retry_idx], retry_delay_max))

        print("Failed to generate. Skipping...")

        raise TimeoutError(
            f"Failed to generate content after {retry_count} retries. Last exception: {last_error}"
        )

    async def a_generate(self, prompt: str) -> str:
        """Async ``generate``: rotates keys and retries with a short fixed delay.

        Raises:
            RuntimeError: If no valid client exists.
            TimeoutError: After ``retry_count`` failed attempts.
        """
        retry_count = 100
        retry_delay = 0.2

        last_error = None
        for t in range(retry_count):
            client = await self.a_load_model()
            # Count the request before the first await so concurrent coroutines see the
            # reduced budget and cannot all claim the same remaining slot.
            client["rate"] += 1
            try:
                res = await client["client"].aio.models.generate_content(
                    model=self.model_name,
                    contents=prompt,
                    config=self.config,
                )
                return res.text
            except Exception as e:
                last_error = e
                self._record_failure(client, e)
                print(f"Failed to call gemini API. Reason: {e}.")
                print(f"Retry {t+1}/{retry_count}...")
                await asyncio.sleep(retry_delay)

        print("Failed to generate. Skipping...")

        raise TimeoutError(
            f"Failed to generate content after {retry_count} retries. Last exception: {last_error}"
        )


# Single-key models: the judge used for scoring (evaluator config) and the
# model that answers the test questions (chat config).
eval_llm = GeminiLLM(api_key=API_KEY, model_name=MODEL_ID, config=get_evaluator_config())
gemini_llm = GeminiLLM(api_key=API_KEY, model_name=MODEL_ID, config=get_chat_config())

# Key-pool counterparts that rotate across the keys read from gemini_api_keys.txt.
eval_llm_manager = GeminiLLMManager(
    api_keys=API_KEYS, model_name=MODEL_ID, config=get_evaluator_config()
)
gemini_llm_manager = GeminiLLMManager(
    api_keys=API_KEYS, model_name=MODEL_ID, config=get_chat_config()
)

Finished initializing with 4/4 client(s).
Finished initializing with 4/4 client(s).


In [17]:
def parse_metric(test_info: Dict, eval_llm=None):
    """Build a synchronous G-Eval metric from a test-case dict.

    The metric is named after ``test_info["title"]``, judges only the actual output
    against ``test_info["criteria"]``, and uses ``eval_llm`` as its model.
    """
    metric_kwargs = dict(
        name=test_info["title"],
        criteria=test_info["criteria"],
        evaluation_params=[LLMTestCaseParams.ACTUAL_OUTPUT],
        model=eval_llm,
        async_mode=False,
    )
    return GEval(**metric_kwargs)


def build_test_cases_info(
    json_filepath: str,
    llm: "GeminiLLM" = None,
    retry: int = 3,
    retry_wait: float = 0.5,
    wait_time: int = 60,
    rate_limit: int = 15,
    overwrite: bool = False,
):
    """Load test-case definitions from JSON and optionally fill in model answers.

    Every item's ``criteria`` is normalized to a string (lists are joined with
    spaces). When ``llm`` is given, each item's ``question`` is sent to
    ``llm.generate`` and the answer stored in ``actual_output``. Items that already
    have an answer are skipped unless ``overwrite`` is set.

    Args:
        json_filepath: UTF-8 JSON file containing a list of test-case dicts.
        llm: Object with a ``generate(question) -> str`` method; ``None`` skips generation.
        retry: Attempts per item; after the last failure ``actual_output`` is ``None``.
        retry_wait: Seconds to sleep between failed attempts.
        wait_time: Seconds to pause once ``rate_limit`` successful calls were made.
        rate_limit: Successful calls allowed before each pause.
        overwrite: Regenerate answers even when ``actual_output`` is already set.

    Returns:
        The list loaded from ``json_filepath``, with the items updated in place.
    """
    with open(json_filepath, "r", encoding="utf8") as fhandle:
        contents = json.load(fhandle)

    request_count = 0
    for i, item in enumerate(contents):
        item: Dict
        if isinstance(item["criteria"], list):
            item["criteria"] = " ".join(item["criteria"])

        if not llm:
            continue

        # Check before the throttle so cached items never trigger a rate-limit wait.
        if item.get("actual_output") is not None and not overwrite:
            print(f"{i} already exists, continue")
            continue

        for t in range(retry):
            if request_count >= rate_limit:
                request_count = 0
                print(f"rate limit hit, waiting {wait_time}s")
                time.sleep(wait_time)

            try:
                item["actual_output"] = llm.generate(item["question"])
                request_count += 1
                print(f"{i} Successfully generated output.")
                break
            except Exception as e:
                item["actual_output"] = None
                print(f"Failed to generate output for index ({i}). Reason: {e}")
                print(f"Retry {t+1}/{retry} ...")
                # No point sleeping after the final attempt.
                if t + 1 < retry:
                    time.sleep(retry_wait)
    print("Done!!!")

    return contents

def load_test_case_infos(json_path: str):
    """Read and return the parsed JSON document stored at ``json_path`` (UTF-8)."""
    with open(json_path, encoding="utf8") as source:
        return json.load(source)

In [None]:
# Generate answers for suite 1 and persist them for the evaluation step.
test_infos = build_test_cases_info("test1.json", llm=gemini_llm)
with open("test_suite_1.json", mode="w", encoding="utf8") as fhandle:
    json.dump(test_infos, fhandle, indent=4, ensure_ascii=False)

In [18]:
# Generate answers for suite 2 and persist them for the evaluation step.
test_infos = build_test_cases_info("test2.json", llm=gemini_llm)
with open("test_suite_2.json", mode="w", encoding="utf8") as fhandle:
    json.dump(test_infos, fhandle, indent=4, ensure_ascii=False)

0 Successfully generated output.
1 Successfully generated output.
2 Successfully generated output.
3 Successfully generated output.
4 Successfully generated output.
5 Successfully generated output.
6 Successfully generated output.
7 Successfully generated output.
8 Successfully generated output.
9 Successfully generated output.
10 Successfully generated output.
11 Successfully generated output.
12 Successfully generated output.
13 Successfully generated output.
14 Successfully generated output.
rate limit hit, waiting 60s
15 Successfully generated output.
16 Successfully generated output.
17 Successfully generated output.
18 Successfully generated output.
19 Successfully generated output.
20 Successfully generated output.
21 Successfully generated output.
22 Successfully generated output.
23 Successfully generated output.
24 Successfully generated output.
25 Successfully generated output.
26 Successfully generated output.
27 Successfully generated output.
28 Successfully generated outp

In [None]:
# Generate answers for suite 3 and persist them for the evaluation step.
test_infos = build_test_cases_info("test3.json", llm=gemini_llm)
with open("test_suite_3.json", mode="w", encoding="utf8") as fhandle:
    json.dump(test_infos, fhandle, indent=4, ensure_ascii=False)

In [4]:
def deepeval_test_case(
    test_info: Dict,
    eval_llm: GeminiLLM,
    llm: GeminiLLM = None,
) -> Dict:
    """Score one test case with a G-Eval metric and return a JSON-serializable record.

    Args:
        test_info: Dict with ``title``, ``question``, ``criteria`` and, unless ``llm``
            is given, ``actual_output``.
        eval_llm: Judge model passed to ``parse_metric``.
        llm: If given, a fresh answer is generated for ``question`` instead of
            using the stored ``actual_output``.

    Returns:
        Dict with ``title``, ``input``, ``output``, ``criteria`` and the metric's
        ``score``, ``reason``, ``evaluation_steps`` and ``evaluation_model``. The
        metric fields stay ``None`` if measuring raised.
    """
    # Only read the stored answer when it is not being regenerated, so items without
    # an ``actual_output`` key can still be evaluated with ``llm``.
    if llm:
        actual_output = llm.generate(test_info["question"])
    else:
        actual_output = test_info["actual_output"]

    test_case = LLMTestCase(input=test_info["question"], actual_output=actual_output)
    metric = parse_metric(test_info, eval_llm)

    result = {
        "title": test_info["title"],
        "input": test_case.input,
        "output": test_case.actual_output,
        "criteria": metric.criteria,
        "score": None,
        "reason": None,
        "evaluation_steps": None,
        "evaluation_model": None,
    }

    try:
        # NOTE(review): the positional False is presumably GEval.measure's
        # progress-indicator flag; confirm against the installed deepeval version.
        score = metric.measure(test_case, False)
        result["score"] = score
        result["reason"] = metric.reason
        result["evaluation_steps"] = metric.evaluation_steps
        result["evaluation_model"] = metric.evaluation_model
    except Exception as e:
        print(f"Failed to eval test case: {test_info['title']}")
        print(e)

    return result


def deepeval_test_cases(
    test_infos: List[Dict],
    eval_llm: GeminiLLM,
    llm: GeminiLLM = None,
):
    """Lazily evaluate ``test_infos`` one at a time, yielding each result record.

    ``eval_llm`` is the judge. ``llm`` is forwarded to ``deepeval_test_case``; when
    given, answers are regenerated instead of read from ``actual_output``.
    """
    for test_info in test_infos:
        yield deepeval_test_case(test_info, eval_llm, llm)


def evaluate_tests(
    output_json_path: str,
    test_infos: List[Dict],
    eval_llm: GeminiLLM,
    overwrite: bool = False,
):
    """Evaluate ``test_infos`` with ``eval_llm`` and stream the results to a JSON array file.

    Each record is written and flushed as soon as it is produced. The closing
    bracket is written in a ``finally`` clause. As a result, a run that stops
    between records (exception or KeyboardInterrupt) still leaves a parseable
    JSON file with every finished record.

    Args:
        output_json_path: Destination file; it is truncated at the start.
        test_infos: Test-case dicts as accepted by ``deepeval_test_case``.
        eval_llm: Judge model.
        overwrite: Currently unused; kept for backward compatibility.
            TODO: resume from an existing output file (re-running only records
            whose ``score`` is ``None``) when this is False.
    """
    results = deepeval_test_cases(test_infos, eval_llm)
    with open(output_json_path, "w", encoding="utf8") as fhandle:
        fhandle.write("[\n")
        try:
            for index, result in enumerate(results):
                if index:
                    fhandle.write(",\n")
                json.dump(result, fhandle, ensure_ascii=False, indent=4)
                # Keep progress on disk in case the long-running evaluation dies.
                fhandle.flush()
        finally:
            fhandle.write("\n]")

In [19]:
# Score the generated answers of suite 2 with the key-pool judge.
test_infos = load_test_case_infos(json_path="test_suite_2.json")
evaluate_tests(
    output_json_path="test_output_2.json",
    test_infos=test_infos,
    eval_llm=eval_llm_manager,
)

In [9]:
import pandas as pd

In [20]:
def export_json_to_csv(json_paths: List[str]) -> List[str]:
    """Convert each existing JSON results file to a CSV with the same base name.

    Paths that do not exist (e.g. a suite whose evaluation cell was not run) are
    reported and skipped instead of aborting the whole export.

    Returns:
        The CSV paths that were written, in input order.
    """
    written = []
    for json_path in json_paths:
        if not os.path.exists(json_path):
            print(f"{json_path} not found, skipping")
            continue
        results_df = pd.read_json(json_path)
        csv_path = f"{os.path.splitext(json_path)[0]}.csv"
        results_df.to_csv(csv_path)
        written.append(csv_path)
    return written


test_output_paths = [
    "test_output_1.json",
    "test_output_2.json",
    "test_output_3.json",
]

export_json_to_csv(test_output_paths)