<a target="_blank" href="https://colab.research.google.com/github/UpstageAI/cookbook/blob/main/cookbooks/upstage/Solar-Full-Stack LLM-101/05_3_OracleDB.ipynb">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

# EWHA bagging _ alpha

In [1]:
# set parameters
#
# Each setting is stored in its own text file under info/.
# The files are read with context managers so the handles are always closed,
# and surrounding whitespace is stripped: a trailing newline left by an editor
# would otherwise become part of the API key or of the path prefixes that are
# concatenated with file names further down.

with open("info/api.txt", "r") as file:
    api_key = file.read().strip()

with open("info/datapath.txt", "r") as file:
    data_path = file.read().strip()

with open("info/resultspath.txt", "r") as file:
    results_path = file.read().strip()

In [2]:
from langchain_upstage import UpstageEmbeddings
import time

# Embedding model dedicated to queries
query_embeddings = UpstageEmbeddings(
    model="solar-embedding-1-large-query",
    api_key=api_key,
)

# Embedding model dedicated to passages
passage_embeddings = UpstageEmbeddings(
    model="solar-embedding-1-large-passage",
    api_key=api_key,
)

In [3]:
# function to extract an answer from a response

import re

def extract_answer(response):
    """
    Extract the chosen option letter from a model response.

    An explicit answer marker is tried first. Both of these forms are accepted
    (the word "answer" is matched case-insensitively, the brackets are optional):
        "[ANSWER]: (A) convolutional networks"
        "Answer : (A) convolutional networks"
    The first such marker in the text wins.

    If no marker is found, the search falls back to extract_again(), which
    returns the last standalone option letter in the text.

    Args:
        response: the response text. Anything that is not a string
            (e.g. None or NaN) is treated as "no answer".

    Returns:
        A single upper-case letter in A-J, or None when nothing can be extracted.
    """
    if not isinstance(response, str):
        return None

    # Option letters go up to J so that this pattern accepts the same set of
    # options as the fallback in extract_again().
    pattern = r"\[?(?i:answer)\]?\s*:\s*\(([A-J])\)"
    match = re.search(pattern, response)

    if match:
        return match.group(1)  # the letter inside the parentheses (e.g., A)
    return extract_again(response)


def extract_again(response):
    """
    Fallback extraction: return the last standalone letter A-J in the text.

    "Standalone" means the letter is delimited by word boundaries, so letters
    inside longer words are ignored. The whole text is scanned, including
    every line of a multi-line response.

    Args:
        response: the response text; non-string values yield None.

    Returns:
        The last standalone letter in A-J, or None if there is none.
    """
    if not isinstance(response, str):
        return None

    letters = re.findall(r"\b[A-J]\b", response)
    return letters[-1] if letters else None

## 1. build DB

In [4]:
from langchain_upstage import UpstageLayoutAnalysisLoader
import os
import numpy as np


# Alias of the API key; not referenced anywhere else in the visible notebook.
UPSTAGE_API_KEY = api_key

# Load the .npy files (NumPy arrays) and convert them to plain Python lists.
# Paths are built with os.path.join so they resolve whether or not data_path
# ends with a path separator.
# textbookDB is later indexed to fetch the retrieved context for each question;
# presumably it holds the textbook passages as text (verify against the file).
textbookDB = np.load(os.path.join(data_path, 'embedding/full_philosophy_textbook.npy'))
textbookDB = textbookDB.tolist()

# Embedding vectors for the entries of textbookDB, presumably in the same order
# (the retrieval step uses positions in this list to index textbookDB).
textbookDB_embed = np.load(os.path.join(data_path, 'embedding/full_philosophy_textbook_embed.npy'))
textbookDB_embed = textbookDB_embed.tolist()

## 3. test set 갖고오기

In [5]:
# read samples.csv file
import pandas as pd

def read_data(data_path):
    """
    Read a CSV file and return its 'prompts' and 'answers' columns.

    Args:
        data_path: anything accepted by pandas.read_csv (path or file-like object).

    Returns:
        A tuple (prompts, answers) of pandas Series taken from the columns
        of the same names.
    """
    frame = pd.read_csv(data_path)
    return frame['prompts'], frame['answers']

In [6]:
# Build the test-set path once with os.path.join so both reads use exactly the
# same file, whether or not data_path ends with a path separator.
test_csv_path = os.path.join(data_path, 'mmlupro_test_philosophy.csv')
prompts, answers = read_data(test_csv_path)
testdata = pd.read_csv(test_csv_path)

In [7]:
# Working frame: one row per test question, filled in step by step by the cells below.
nowtest = pd.DataFrame(columns=['index', 'embed_ques', 'question', 'prompts', 'answers', 'top1', 'top2', 'top3', 'top1_1pred','top1_2pred','top1_3pred', 'top2pred', 'top3pred', 'predict' ])

for index, row in testdata.iterrows():
    #if index == 100 : break # while experimenting, process the data 100 rows at a time
    q = row.prompts
    a = row.answers
    # Text in front of the first '(A)' marker, i.e. the part before the answer choices ...
    question = q.partition('(A)')[0]
    # ... with everything up to and including the first ') ' removed
    # (presumably a leading item-number prefix; verify against the CSV).
    question = question.partition(') ')[2]
    q = q.partition(') ')[2]

    # Embed only the question text. Only the API call is guarded; a failure is
    # reported together with its cause and the question is skipped.
    try:
        embedded_query = query_embeddings.embed_query(question)
    except Exception as e:
        print(f'pass: {index} ({type(e).__name__}: {e})')
        continue

    # NOTE: rows are labelled by the current length of nowtest, so after a skipped
    # question the labels no longer match the row labels of testdata; the original
    # label is kept in the 'index' column.
    nowtest.loc[len(nowtest)] = {'index':index, 'embed_ques' : embedded_query, 'question' : question, 'prompts' : q, 'answers' : a}


In [None]:
# Display the nowtest frame (questions, their embeddings and the still-empty result columns).
nowtest

## 4. Prompt engineering

In [None]:
import numpy as np


# The embedding matrix does not change inside the loop, so convert and transpose
# it once instead of rebuilding it from the Python list for every question.
textbook_matrix_T = np.array(textbookDB_embed).T

for idx, row in nowtest.iterrows():  # iterate over the questions

    embed_ques = row.embed_ques

    # Positions of the textbook entries sorted by dot-product similarity, descending
    sorted_idx = (np.array(embed_ques) @ textbook_matrix_T).argsort()[::-1]

    # Store the three most similar textbook entries as retrieval context
    nowtest.loc[idx, 'top1'] = textbookDB[sorted_idx[0]]
    nowtest.loc[idx, 'top2'] = textbookDB[sorted_idx[1]]
    nowtest.loc[idx, 'top3'] = textbookDB[sorted_idx[2]]


In [10]:
# (Re)create the empty frame that collects the extracted answer of every run.
# Rebinding the name already discards any frame left over from a previous
# execution of this cell, so no explicit `del` is required.
bagging_pred = pd.DataFrame(columns=['questionNum', 'answer', 'top1_1pred', 'top1_2pred', 'top1_3pred', 'top2pred', 'top3pred', 'predict'])

In [None]:
############# first PREDICTION ##########

from langchain_core.prompts import PromptTemplate
from langchain_upstage import ChatUpstage

 
llm = ChatUpstage(api_key = api_key)

# NOTE(review): the prompt text is kept exactly as it was (including the
# 'exlain' typo and the 'choice number' wording) so that this edit does not
# change what is sent to the model.
prompt_template = PromptTemplate.from_template(
    '''
    
    Please provide most correct answer. Let's think step by step.
    
    When translating the answer, DO NOT exlain anything. And you must also include the choice number like :
    Answer : (Number) the answer choice
    ---
    
    Question: {question}

    Context: {context1}

    Answer :
    ---
        
    '''

)
ko_chain1 = prompt_template | llm

max_retries = 3  # maximum number of attempts per API call

# The top-1 context is queried three times. Run i stores the raw response in
# nowtest['top1_{i}pred'] and the extracted letter in bagging_pred['top1_{i}pred'].
for i in range(1, 4):
    pred_col = f'top1_{i}pred'
    for idx, row in nowtest.iterrows():
        #if idx == 100 : break

        # Make sure the question has a row in bagging_pred under the SAME label
        # as in nowtest before any API call is made. A question whose calls all
        # fail then keeps its row (with missing predictions) instead of shifting
        # every later row, and the `bagging_pred.loc[idx, ...]` writes of the
        # other prediction cells land on the right question.
        if idx not in bagging_pred.index:
            bagging_pred.loc[idx] = {'questionNum': row.question, 'answer': row.answers}

        for attempt in range(1, max_retries + 1):
            # Only the API call is retried; errors in the bookkeeping below
            # would be programming errors and are allowed to surface.
            try:
                response = ko_chain1.invoke({"question": row.prompts, "context1": row.top1})
            except Exception as e:  # API call error
                print(f"Error occurred: {e}. Retrying idx:{idx} - {attempt}/{max_retries} after 10 seconds...")
                if attempt == max_retries:
                    print(f"Failed after {max_retries} retries. Skipping this context.")
                else:
                    time.sleep(10)  # wait 10 seconds before the next attempt
                continue

            nowtest.loc[idx, pred_col] = response.content
            bagging_pred.loc[idx, pred_col] = extract_answer(response.content)
            break
            
        


In [None]:
# Display the predictions collected so far.
bagging_pred

In [None]:
############# second PREDICTION ##########

from langchain_core.prompts import PromptTemplate
from langchain_upstage import ChatUpstage
import time
 
llm = ChatUpstage(api_key = api_key)

# NOTE(review): the prompt text is kept exactly as it was (including the
# 'exlain' typo and the 'choice number' wording) so that this edit does not
# change what is sent to the model.
prompt_template = PromptTemplate.from_template(
    '''

    Please provide most correct answer. Let's think step by step.
    
    When translating the answer, DO NOT exlain anything. And you must also include the choice number like :
    Answer : (Number) the answer choice
    ---
    
    Question: {question}

    Context: {context1} {context2}

    Answer :
    ---
    '''

)
ko_chain2 = prompt_template | llm

max_retries = 3  # maximum number of attempts per API call

# One query per question using the top-1 and top-2 contexts together.
# The raw response goes to nowtest['top2pred'], the extracted letter to
# bagging_pred['top2pred'] (same row label as in nowtest).
for idx, row in nowtest.iterrows():
    #if idx == 100 : break
    for attempt in range(1, max_retries + 1):
        # Only the API call is retried; errors in the bookkeeping below
        # would be programming errors and are allowed to surface.
        try:
            response = ko_chain2.invoke({"question": row.prompts, "context1": row.top1, "context2":row.top2})
        except Exception as e:  # API call error
            print(f"Error occurred: {e}. Retrying idx:{idx} - {attempt}/{max_retries} after 10 seconds...")
            if attempt == max_retries:
                print(f"Failed after {max_retries} retries. Skipping this context.")
            else:
                time.sleep(10)  # wait 10 seconds before the next attempt
            continue

        nowtest.loc[idx, 'top2pred'] = response.content
        bagging_pred.loc[idx, 'top2pred'] = extract_answer(response.content)
        break

    


In [None]:
############# third PREDICTION ##########

from langchain_core.prompts import PromptTemplate
from langchain_upstage import ChatUpstage

 
llm = ChatUpstage(api_key = api_key)

# NOTE(review): the prompt text is kept exactly as it was (including the
# 'exlain' typo and the 'choice number' wording) so that this edit does not
# change what is sent to the model.
prompt_template = PromptTemplate.from_template(
    '''
    Please provide most correct answer. Let's think step by step.
    
    When translating the answer, DO NOT exlain anything. And you must also include the choice number like :
    Answer : (Number) the answer choice
    ---
    
    Question: {question}

    Context: {context1} {context2} {context3}

    Answer :
    ---
        
    '''

)
ko_chain3 = prompt_template | llm

max_retries = 3  # maximum number of attempts per API call

# One query per question using the top-1, top-2 and top-3 contexts together.
# The raw response goes to nowtest['top3pred'], the extracted letter to
# bagging_pred['top3pred'] (same row label as in nowtest).
for idx, row in nowtest.iterrows():
    #if idx == 100 : break
    for attempt in range(1, max_retries + 1):
        # Only the API call is retried; errors in the bookkeeping below
        # would be programming errors and are allowed to surface.
        try:
            response = ko_chain3.invoke({"question": row.prompts, "context1": row.top1, "context2":row.top2, "context3":row.top3})
        except Exception as e:  # API call error
            # The message names the question (idx) so a failure can be traced to its row.
            print(f"Error occurred: {e}. Retrying idx:{idx} - {attempt}/{max_retries} after 10 seconds...")
            if attempt == max_retries:
                print(f"Failed after {max_retries} retries. Skipping this context.")
            else:
                time.sleep(10)  # wait 10 seconds before the next attempt
            continue

        nowtest.loc[idx, 'top3pred'] = response.content
        bagging_pred.loc[idx, 'top3pred'] = extract_answer(response.content)
        break



In [None]:
# Display the predictions collected so far.
bagging_pred

# bagging

In [None]:
from collections import Counter


def _majority_vote(votes, default='A'):
    """
    Return the most frequent vote, ignoring missing ones.

    Args:
        votes: iterable of extracted answers; None and NaN both count as
            "no vote" (NaN is what a cell holds when a prediction was never
            written, e.g. after a failed API call).
        default: value returned when no valid vote is left.

    Returns:
        The most common valid vote. Ties go to the vote that appears first,
        which is how Counter.most_common orders equal counts.
    """
    valid = [vote for vote in votes if not pd.isna(vote)]
    if not valid:
        return default
    return Counter(valid).most_common(1)[0][0]


final_pred = []
for idx, row in bagging_pred.iterrows():
    # Five votes per question: three runs with the top-1 context, one run with
    # top-1+2 and one run with top-1+2+3.
    votes = [row.top1_1pred, row.top1_2pred, row.top1_3pred, row.top2pred, row.top3pred]

    prediction = _majority_vote(votes)
    final_pred.append(prediction)
    bagging_pred.loc[idx, 'predict'] = prediction

final_pred

In [None]:
######### check the answers + collect the wrong ones ######

# print accuracy

cnt = 0
wrong = []
for idx, (answer, response) in enumerate(zip(answers, final_pred)):
    print("-"*10)
    # Entries of final_pred are normally option letters. Anything that is not a
    # string (e.g. NaN) is an extraction failure for THIS question; it must not
    # silently reuse the value left over from the previous iteration.
    generated_answer = extract_answer(response) if isinstance(response, str) else None
    print(response)
    # check
    if generated_answer:
        print(f"idx: {idx} | generated answer: {generated_answer}, answer: {answer}")
    else:
        print("extraction fail")

    # NOTE(review): `in` is a substring test, so this assumes `answer` is a
    # string containing the correct letter - confirm against the CSV format.
    if generated_answer is not None and generated_answer in answer:
        cnt += 1
    else:
        wrong.append(idx+1)  # 1-based position of the question
        bagging_pred.loc[idx, 'iswrong'] = 'X'

# Accuracy in percent over all reference answers (0.0 for an empty answer set).
acc = cnt/len(answers)*100 if len(answers) else 0.0
print(f"acc: {acc}%")
print()
print("wrong:", wrong)
# Store the accuracy in an extra row appended after the predictions.
bagging_pred.loc[len(bagging_pred), 'predict'] = acc

In [None]:
######### check using the top-1 prediction only ######

# print accuracy
top1_pred = []
for idx, row in bagging_pred.iterrows():
    top1 = row.top1_1pred
    # A missing prediction may be None or NaN; both fall back to 'A'.
    if pd.isna(top1):
        top1 = 'A'
    top1_pred.append(top1)

print(top1_pred)


cnt = 0
wrong = []
# zip() stops at the shorter input, so any extra rows of bagging_pred beyond
# the number of reference answers are ignored here.
for idx, (answer, response) in enumerate(zip(answers, top1_pred)):
    print("-"*10)
    # Anything that is not a string is an extraction failure for THIS question;
    # it must not silently reuse the value left over from the previous iteration.
    generated_answer = extract_answer(response) if isinstance(response, str) else None
    print(response)
    # check
    if generated_answer:
        print(f"idx: {idx} | generated answer: {generated_answer}, answer: {answer}")
    else:
        print("extraction fail")

    # NOTE(review): `in` is a substring test, so this assumes `answer` is a
    # string containing the correct letter - confirm against the CSV format.
    if generated_answer is not None and generated_answer in answer:
        cnt += 1
    else:
        wrong.append(idx+1)  # 1-based position of the question
        # NOTE(review): this writes to the same 'iswrong' column as the bagging
        # evaluation cell, so marks from both evaluations accumulate there.
        bagging_pred.loc[idx, 'iswrong'] = 'X'

# Accuracy in percent over ALL reference answers: the denominator is the number
# of answers, not the length of the last answer string.
acc = cnt/len(answers)*100 if len(answers) else 0.0
print(f"acc: {acc}%")
print()
print("wrong:", wrong)
# Store the accuracy in an extra row appended after the predictions.
bagging_pred.loc[len(bagging_pred), 'predict'] = acc

In [None]:
# Display the full prediction table, including the appended accuracy rows.
bagging_pred

In [20]:
#bagging_pred.to_csv(results_path+'ewha_bagging01.csv')

In [None]:
# Rows flagged as wrong. `.copy()` makes wdf an independent frame, so adding
# the summary row below neither triggers SettingWithCopyWarning nor depends on
# whether the filter returned a view. Rebinding the name replaces any earlier
# wdf, so no explicit `del` is needed.
wdf = bagging_pred[bagging_pred['iswrong']=='X'].copy()
# Append the most recently computed accuracy as an extra row.
wdf.loc[len(bagging_pred), 'predict'] = acc

wdf