## Import Utilities

In [None]:
import google.generativeai as genai
import pandas as pd
import json
import time
import os
import sys
sys.path.append('..')  # Add the parent directory of LLM_Evaluations to the Python path
from Utils.llm_evaluation_utils import load_responses_df, \
                        check_and_store_response,    \
                        build_question_prompt,      \
                        QUESTION_SETS

# Authenticate the Gemini client with the key read from the environment.
api_key = os.environ.get('GOOGLE_API_KEY')
genai.configure(api_key=api_key)

# Every listed harm category uses the same 'BLOCK_NONE' threshold, so the
# settings list is generated from the category names.
safety_settings = [
    {'category': category, 'threshold': 'BLOCK_NONE'}
    for category in (
        'HARM_CATEGORY_DANGEROUS',
        'HARM_CATEGORY_HARASSMENT',
        'HARM_CATEGORY_HATE_SPEECH',
        'HARM_CATEGORY_SEXUALLY_EXPLICIT',
        'HARM_CATEGORY_DANGEROUS_CONTENT',
    )
]

# Sampling parameters handed to the model.
generation_config = dict(
    temperature=0.8,
    top_p=1,
    top_k=8,
    max_output_tokens=250,
)

model_name = 'gemini-1.0-pro-latest'
# NOTE: `system_instruction=system` is only available in 1.5-pro, so it is not passed here.
model = genai.GenerativeModel(
    model_name=model_name,
    safety_settings=safety_settings,
    generation_config=generation_config,
)

# Question set used for the prompts in this notebook.
question_type = 'GS'
QUESTIONS = QUESTION_SETS[question_type]['QUESTIONS']

## **Gemini Evaluation**

### Defined Functions

In [None]:
def prepare_errors_dict(dir, model_name):
    '''Load the IDs and question numbers that hit Blocked errors from a JSON file.

    Args:
        dir: Directory that contains the errors JSON file.
        model_name: Gemini model name. It must contain '1.0' or '1.5', which
            selects the file to read.

    Returns:
        A dict with the keys of the JSON object, each mapped to a set built
        from the stored list. An empty dict is returned when the file does
        not exist.

    Raises:
        ValueError: If model_name contains neither '1.0' nor '1.5'.
    '''
    if '1.0' in model_name:
        errors_file_name = 'ids_nums_with_Blocked_error-Gemini-1_0.json'
    elif '1.5' in model_name:
        errors_file_name = 'ids_nums_with_Blocked_error-Gemini-1_5.json'
    else:
        # Fail with a clear message instead of an unbound-variable error below.
        raise ValueError(
            f"Unsupported model name {model_name!r}: expected it to contain '1.0' or '1.5'"
        )

    file_path = os.path.join(dir, errors_file_name)
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            stored_errors = json.load(f)
    except FileNotFoundError:
        # No errors have been recorded yet for this model.
        return {}

    # JSON has no set type, so the question numbers are stored as lists.
    return {key: set(value) for key, value in stored_errors.items()}


### Load Data

In [None]:
# Alternative dataset configuration (disabled; kept for reference):
# transcripts_dir = '../../Getting_Transcripts'
# transcripts_file_name = 'merged_filtered_videos_transcripts.csv'
# responses_dir = '../LLMs_Responses'
# topics_to_include = ['Spina Bifida', 'Flat Feet', 'Cluster Headache', 'Trigger Finger', 'Pudendal Nerve']

# Active dataset configuration: input transcripts, output directory and the topics to keep.
transcripts_dir = '../../../ISA_Paper/Data'
transcripts_file_name = 'diabetes_videos_transcripts.csv'
responses_dir = '../../../ISA_Paper/Data/Results'
topics_to_include = ['Insulin Self-Administration']

# Base name of the results file, built from the model, topic set and prompt type.
prompt_type = 'GS_prompting'
topics = 'diabetes'
results_file_name = f'{model_name}-{topics}-{prompt_type}'

# Build the responses DataFrame with the project helper
# (presumably resumes from a previously saved results file -- TODO confirm in Utils).
responses_df = load_responses_df(transcripts_dir, transcripts_file_name, responses_dir, results_file_name, question_type)

# Load the previously recorded Blocked errors from the current directory.
ids_questions_with_error = prepare_errors_dict('.', model_name)

print('responses_df shape:', responses_df.shape)
responses_df.head(2)

Filter the responses to include only the selected topics.

In [None]:
# Attach each video's topic when the column is missing, then keep only the
# rows whose topic is in topics_to_include.
if 'Topic' not in responses_df.columns:
    experts_file = '../../../Videos_and_DISCERN_data/filtered_experts_scores.csv'
    experts_df = pd.read_csv(experts_file)

    # Left merge: every response row is kept, matched on 'Video ID'.
    video_topics = experts_df[['Video ID', 'Topic']]
    responses_df = responses_df.merge(video_topics, on='Video ID', how='left')

    # Move the merged 'Topic' column to position 2.
    topic_values = responses_df.pop('Topic')
    responses_df.insert(2, 'Topic', topic_values)

    # Drop rows of other topics and renumber the index from 0.
    selected_rows = responses_df['Topic'].isin(topics_to_include)
    responses_df = responses_df.loc[selected_rows].reset_index(drop=True)

print('responses_df shape:', responses_df.shape)
responses_df.head(2)

### Get Gemini API Responses

In [None]:
def generate_content_with_backoff(model, prompt, max_retries=3, base_delay=2):
    '''
    Calls model.generate_content() with exponential backoff on rate limit errors.

    A response whose text is empty counts as a failed attempt and is retried
    immediately, without a delay.

    Args:
        model: The model object used for content generation.
        prompt: The prompt string for content generation.
        max_retries: Maximum number of attempts before giving up.
        base_delay: Base delay (in seconds) for exponential backoff; the wait
            doubles after every rate-limited attempt.

    Returns:
        The response object from model.generate_content() on successful generation,
        or None if all attempts fail.

    Raises:
        Exception: Any exception raised during an attempt whose `code`
            attribute is not 429 is re-raised unchanged.
    '''
    for attempt in range(1, max_retries + 1):
        try:
            response = model.generate_content(prompt)
            if response.text:
                return response
        except Exception as error:
            if getattr(error, 'code', None) != 429:
                raise  # Not a rate limit error: leave it to the caller

            print(f'Rate limit exceeded. Attempt {attempt}/{max_retries}...')
            if attempt < max_retries:
                # Wait only when another attempt follows; sleeping after the
                # last one would just delay the failure report.
                delay = base_delay * 2 ** (attempt - 1)  # Exponential backoff calculation
                time.sleep(delay)

    print(f'Failed to generate content after {max_retries} retries.')
    return None

In [None]:
# Forwarded to check_and_store_response through gemini_inference
# (presumably toggles printing of each response -- TODO confirm in Utils).
print_response = False

# Calculate the delay based on your rate limit
requests_limit_per_minute = 15
# Seconds to wait between requests so the per-minute limit is respected.
base_delay = 60.0 / requests_limit_per_minute

def gemini_inference(responses_df, QUESTIONS, base_delay, print_response):
    '''
    Query the model for every still-empty response cell of every transcript.

    Relies on the notebook globals `model`, `question_type` and
    `ids_questions_with_error`. Questions recorded in
    `ids_questions_with_error` are not skipped; they are requested again.

    Args:
        responses_df: DataFrame with 'Video ID', 'Transcript' and one
            'Response_<n>' column per question.
        QUESTIONS: The question collection; only its length is used here.
        base_delay: Seconds to sleep after a request, also used as the base
            delay of the backoff helper.
        print_response: Forwarded to check_and_store_response.

    Returns:
        None. The function stops early as soon as the backoff helper gives
        up (returns None) for a question.
    '''
    blocked_markers = ('candidate.safety_ratings', 'response.prompt_feedback')
    question_numbers = range(1, len(QUESTIONS) + 1)

    for index, row in responses_df.iterrows():
        video_id = row['Video ID']
        transcript = row['Transcript']
        print(f'Started with video ID: {video_id} | Index: {index}')

        for question_num in question_numbers:
            # Only cells that are still empty need a request.
            if row[f'Response_{question_num}'] != '':
                continue

            prompt = build_question_prompt(transcript, question_num, question_type)

            try:
                response = generate_content_with_backoff(
                    model, prompt, max_retries=3, base_delay=base_delay
                )
                if not response:
                    # All retries failed: abort the whole run.
                    return
                check_and_store_response(response.text, responses_df, video_id, question_num,
                                         print_response=print_response)

            except Exception as e:
                print(f'Error with video ID {video_id}, index {index}, Q{question_num}: {e}')
                message = str(e)
                if any(marker in message for marker in blocked_markers):    # BlockedPromptException
                    # Remember which question of which video was blocked, then
                    # move on to the next question without the usual pause.
                    ids_questions_with_error.setdefault(video_id, set()).add(question_num)
                    continue

            time.sleep(base_delay)

# Run the inference over all rows of responses_df with the rate-limit settings defined above.
gemini_inference(responses_df, QUESTIONS, base_delay, print_response)

### Explore Results

In [None]:
# Row position from which the preview starts.
display_from_index = 0
# Integer position of the 'Q1' column.
index_of_q1 = responses_df.columns.get_loc("Q1")

# Preview the first rows (from display_from_index on) of the 15 columns starting at 'Q1'.
responses_df.iloc[display_from_index:, index_of_q1:index_of_q1+15].head()

In [None]:
# len() counts the dict keys (video IDs with at least one recorded error),
# not the individual question numbers stored under them.
print('Number of errors: ', len(ids_questions_with_error))
ids_questions_with_error

In [None]:
# Per column, count the cells that are either NaN or an empty string.
columns_with_none = (responses_df.isna() | (responses_df == '')).sum()
columns_with_none

In [None]:
# Rows that contain at least one NaN value (empty strings are not considered here).
rows_with_none = responses_df[responses_df.isna().any(axis=1)]
rows_with_none

In [None]:
# Index labels of the rows whose 'Problem' entry is non-empty (len > 0).
indices_with_problems = responses_df[responses_df['Problem'].apply(lambda x: len(x) > 0)].index.tolist()
print(indices_with_problems)

In [None]:
import html

from IPython.display import display, HTML

# Show the first problematic response of one row, keeping its line breaks.
if indices_with_problems:
    # Inspect row 29 when it has a problem; otherwise fall back to the first
    # problematic row so the cell does not fail on an empty 'Problem' entry.
    index_with_problem = 29 if 29 in indices_with_problems else indices_with_problems[0]
    responses_with_problem_list = list(responses_df.loc[index_with_problem, 'Problem'])
    print("List of questions with problem:", responses_with_problem_list)

    response_with_problem = responses_with_problem_list[0]
    text = responses_df.loc[index_with_problem, f'Response_{response_with_problem}']
    # Escape the model output so characters such as '<' or '&' are shown
    # literally instead of being interpreted as HTML.
    display(HTML("<div style='white-space: pre-wrap;'>{}</div>".format(html.escape(str(text)))))

In [None]:
# display the full responses for a specific transcript
index_to_display = 51
# Iterate over every question of the active set instead of a fixed count of 15.
for question_num in range(1, len(QUESTIONS) + 1):
    print(f'Q{question_num}:', responses_df.at[index_to_display,f'Response_{question_num}'])

### Store Errors to JSON File

In [None]:
def set_encoder(obj):
    '''
    `default` hook for json.dump()/json.dumps() that serializes sets as lists.

    Args:
        obj: The object the JSON encoder could not serialize by itself.

    Returns:
        A list with the elements of the set, sorted when the elements are
        mutually comparable so the written file is deterministic.

    Raises:
        TypeError: If obj is not a set or frozenset.
    '''
    if isinstance(obj, (set, frozenset)):
        try:
            return sorted(obj)      # stable order for comparable elements
        except TypeError:
            return list(obj)        # mixed element types: keep iteration order
    # Report the actual type of the offending object.
    raise TypeError(f'Object of type {type(obj).__name__} is not JSON serializable')

# save ids and questions number that encountered errors
if '1.0' in model_name:
    errors_file_name = 'ids_nums_with_Blocked_error-Gemini-1_0.json'
elif '1.5' in model_name:
    errors_file_name = 'ids_nums_with_Blocked_error-Gemini-1_5.json'
else:
    # Without this branch a stale errors_file_name left over from an earlier
    # run could be reused, writing the errors of this model to the wrong file.
    raise ValueError(
        f"Unsupported model name {model_name!r}: expected it to contain '1.0' or '1.5'"
    )

# Sets are not JSON serializable, so set_encoder converts them to lists.
with open(errors_file_name, 'w', encoding='utf-8') as f:
    json.dump(ids_questions_with_error, f, default=set_encoder)

### Store Results in a CSV File

In [None]:
# Results are written to '<responses_dir>/<results_file_name>-response.csv'.
csv_output_file = os.path.join(responses_dir, f'{results_file_name}-response.csv')

# Make sure the output directory exists so the collected responses are not
# lost to a missing-directory error at save time.
os.makedirs(responses_dir, exist_ok=True)
responses_df.to_csv(csv_output_file, index=False, encoding='utf-8')