Script to annotate data via the GPT API

In [None]:
%load_ext dotenv
%dotenv

# Confirm that the API key from .env reached the environment WITHOUT echoing it:
# anything printed here is stored in the notebook's cell output and would leak
# the secret when the notebook is shared or committed.
import os
print("OPENAI_API_KEY is set:", bool(os.environ.get("OPENAI_API_KEY")))

import random
from io import StringIO

import pandas as pd

from openai import OpenAI

In [9]:
# Shared API client; AreThereValuesInThePost sends its chat-completion requests through it.
# NOTE(review): no key is passed explicitly — presumably the client reads OPENAI_API_KEY
# from the environment populated by the %dotenv cell; confirm.
client = OpenAI()

In [10]:
# Head of the prompt: the annotation task, the ten values with their definitions,
# one worked example and the requested output format. The posts are appended
# right after this text when the prompt is assembled.
# NOTE(review): the text announces "JSON entries", but the posts are inserted via
# str(dict), i.e. as a Python dict repr (single-quoted) — confirm this is intended.
instruction = """You are a professional annotator tasked with categorizing social media posts.
Assign a binary code to each post - whether (code 1) or not (code 0) the post reflects the Values listed below:
Self-direction: Independent thought and action—choosing, creating, and exploring.
Stimulation: Excitement, novelty, and challenge in life.
Hedonism: Pleasure and sensuous gratification for oneself.
Achievement: Personal success through demonstrating competence according to social standards.
Power: Social status and prestige, wealth, control or dominance over people and resources.
Security: Safety, harmony, strong government, and stability of society, relationships, and self.
Conformity: The restraint of actions, inclinations, and impulses that are likely to upset or harm others and violate social expectations or norms.
Tradition: Respect, commitment, and acceptance of the customs and ideas that traditional culture or religion provides.
Benevolence: Preservation and enhancement of the welfare of people with whom one is in frequent personal contact.
Universalism: Understanding, appreciation, tolerance, and protection for the welfare of all people and of nature.
For example, the post "Для меня главное традиции и друзья" expresses Value Tradition and Value Benevolence.
For the following JSON entries of type {post_id:post_text}, return comma separated data frame with post_id as index, Value as columns and binary code in cell.
"""

# Tail of the prompt, placed after the posts: asks for the bare comma-separated
# table and for a self-check that every post_id / Value cell is filled.
instruction_end="""
Print comma separated data frame only without any explanation. Before printing, ensure that each post_id and each Value in the data frame has a corresponding binary code value. Data frame must have all post_id as index and all Values as columns. If errors are found, fix them."""


In [7]:
import pandas as pd

# Load the pipe-separated posts file, discard the 'Unnamed: 0' column, move the
# row index into a regular column and rename a 'level_0' column to 'post_id'.
# NOTE(review): the rename only has an effect if reset_index actually names the
# new column 'level_0' (rather than 'index') — verify against the data file.
df = (
    pd.read_csv('data-to-annotate', sep="|", encoding='utf-8')
    .drop('Unnamed: 0', axis=1)
    .reset_index()
    .rename(columns={'level_0': "post_id"})
)


In [28]:
# Remove every double-quote character from the post text.
# NOTE(review): presumably done so quotes inside a post cannot clash with the
# quoting of the prompt or of the CSV reply — confirm the intent.
df['text'] = df['text'].str.replace('"', '', regex=True)

In [31]:
#calculate tokens number:

import tiktoken

def num_tokens_from_messages(messages, model):  #="gpt-3.5-turbo-0613"
    """Count the tokens a list of chat messages occupies for ``model``.

    Args:
        messages: iterable of dicts (e.g. ``{"role": ..., "content": ...}``);
            every value of every dict is encoded and counted.
        model: model name; selects the tiktoken encoding and the fixed
            per-message / per-name overheads.

    Returns:
        int: encoded length of all values, plus the per-message overhead, the
        per-name adjustment for messages that carry a "name" key, plus 3 tokens
        for the reply priming.

    Raises:
        NotImplementedError: if ``model`` is neither a listed snapshot nor a
            name containing "gpt-3.5-turbo", "gpt-4o" or "gpt-4".
    """
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Model not known to tiktoken: fall back to the cl100k_base encoding.
        encoding = tiktoken.get_encoding("cl100k_base")

    pinned_snapshots = frozenset((
        "gpt-3.5-turbo-0613",
        "gpt-3.5-turbo-16k-0613",
        "gpt-4-0314",
        "gpt-4-32k-0314",
        "gpt-4-0613",
        "gpt-4-32k-0613",
        "gpt-4-turbo",
        "gpt-4-turbo-2024-04-09",
        "gpt-4o-2024-05-13",
    ))

    if model in pinned_snapshots:
        per_message, per_name = 3, 1
    elif model == "gpt-3.5-turbo-0301":
        # every message follows <|start|>{role/name}\n{content}<|end|>\n ;
        # if there's a name, the role is omitted (hence the -1)
        per_message, per_name = 4, -1
    else:
        # Names without a pinned snapshot are counted as a fixed snapshot of
        # the same family; the most specific substring is tested first.
        if "gpt-3.5-turbo" in model:
            print("Warning: gpt-3.5-turbo may update over time. Returning num tokens assuming gpt-3.5-turbo-0613.")
            snapshot = "gpt-3.5-turbo-0613"
        elif "gpt-4o" in model:
            snapshot = "gpt-4o-2024-05-13"
        elif "gpt-4" in model:
            print("Warning: gpt-4 may update over time. Returning num tokens assuming gpt-4-0613.")
            snapshot = "gpt-4-0613"
        else:
            raise NotImplementedError(
                f"""num_tokens_from_messages() is not implemented for model {model}. See https://github.com/openai/openai-python/blob/main/chatml.md for information on how messages are converted to tokens."""
            )
        return num_tokens_from_messages(messages, model=snapshot)

    total = 0
    for message in messages:
        total += per_message
        total += sum(len(encoding.encode(text)) for text in message.values())
        if "name" in message:
            total += per_name
    # every reply is primed with <|start|>assistant<|message|>
    return total + 3

In [32]:
def SelectRandomExcept(df, idx_done, k):
    """Draw up to ``k`` distinct row positions of ``df`` that are not in ``idx_done``.

    Args:
        df: object with a ``shape`` attribute; ``shape[0]`` is the number of rows.
        idx_done: iterable of row positions to exclude.
        k: maximum number of positions to return.

    Returns:
        list[int]: randomly ordered positions from ``range(df.shape[0])`` minus
        ``idx_done``; shorter than ``k`` (possibly empty) when fewer remain.
    """
    remaining = set(range(df.shape[0])) - set(idx_done)
    sample_size = min(k, len(remaining))
    # The population already excludes idx_done, so the sample needs no second
    # membership filter (the old list scan was redundant and O(k * len(idx_done))).
    return random.sample(list(remaining), sample_size)

In [33]:
# Packs as many posts as fit into the input-token budget and returns them as a
# dict keyed 0..n-1, together with the original keys in the same order.
def create_post_list_inline(instruction, instruction_end, post_dict100, model, max_input_tokens=4096):
    """Select the leading posts of ``post_dict100`` that fit one prompt.

    Posts are taken in the dict's iteration order and renumbered 0, 1, 2, ...
    so the prompt carries short ids; ``key_list[j]`` is the original key of the
    post stored under ``j``.

    The cost of each post is measured as a whole user message containing
    ``str({j: post}) + instruction_end``, so the closing instruction and the
    message overhead are counted once per post. This over-estimates the real
    prompt size, i.e. the budget is conservative.

    Args:
        instruction: prompt head; its token count is charged once.
        instruction_end: prompt tail (see the note on counting above).
        post_dict100: mapping ``original_key -> post text``.
        model: model name forwarded to ``num_tokens_from_messages``.
        max_input_tokens: token budget for instruction plus posts
            (1000 was used for gpt-3.5 according to the original note).

    Returns:
        tuple(dict, list): ``({0: text, 1: text, ...}, [original keys])``.
        The first post is always included, even if it alone exceeds the budget,
        so a non-empty input never yields an empty chunk.
    """
    instruction_num_token = num_tokens_from_messages(
        messages=[{"role": "user", "content": instruction}], model=model
    )
    post_num_token = 0
    dict_full = {}
    key_list = []
    for i, (key, post) in enumerate(post_dict100.items()):
        post_dict = {i: post}
        post_num_token += num_tokens_from_messages(
            messages=[{"role": "user", "content": str(post_dict) + instruction_end}],
            model=model,
        )
        # "<=" so that hitting the budget exactly still counts as fitting;
        # previously that case matched no branch and the post was dropped
        # (yielding an empty chunk when it happened on the first post).
        fits = instruction_num_token + post_num_token <= max_input_tokens
        if not fits and i > 0:
            break
        dict_full.update(post_dict)
        key_list.append(key)

    return dict_full, key_list
        

In [202]:
# Output-token budget: a fixed allowance for the CSV header row plus an
# allowance for every labelled post row.
TOKENS_OUTPUT_POST = 30
TOKENS_OUTPUT_HEADER = 100

def AreThereValuesInThePost(instruction, instruction_end, post_list_inline, model="gpt-4o", n=1, post_num:int=None):
    """Ask the chat model to label a chunk of posts and return the raw replies.

    The prompt is ``instruction + str(post_list_inline) + instruction_end``,
    sent as a single user message through the module-level ``client``.

    Args:
        instruction: prompt head.
        instruction_end: prompt tail.
        post_list_inline: the posts to label; inserted via ``str()``.
        model: model name passed to the API.
        n: number of completions to request.
        post_num: number of posts in the chunk. When given, the reply budget is
            ``TOKENS_OUTPUT_HEADER + TOKENS_OUTPUT_POST * post_num``; when
            ``None`` a flat budget of 1000 tokens is used.

    Returns:
        list[str]: the message content of each returned choice (at most ``n``).
    """
    prompt = instruction + str(post_list_inline) + instruction_end
    messages = [{"role": "user", "content": prompt}]

    if post_num is None:
        max_tokens = 1000
    else:
        max_tokens = TOKENS_OUTPUT_HEADER + TOKENS_OUTPUT_POST * post_num

    response = client.chat.completions.create(
        model=model,
        messages=messages,
        # Pass the computed budget; it used to be ignored in favour of a
        # hard-coded 1000, which can cut off the table for larger chunks.
        max_tokens=max_tokens,
        n=n,
        temperature=0.1,
    )
    return [choice.message.content for choice in response.choices[:n]]

# Markdown code-fence markers the model may wrap its reply in.
PLAINTEXT_PREFIX="```plaintext\n"
PREFIX="```\n"
POSTFIX="```"
# Canonical CSV header: post_id followed by the ten value names.
COLUMNS_ROW = "post_id,Self-direction,Stimulation,Hedonism,Achievement,Power,Security,Conformity,Tradition,Benevolence,Universalism"
# Same names as a set ('post_id' included), used for header checks.
columns_ref = set(COLUMNS_ROW.split(","))


def _strip_code_fence(t: str) -> str:
    """Return the content of a reply wrapped in one Markdown code fence, else ``t`` unchanged."""
    stripped = t.strip()
    if not (stripped.startswith(POSTFIX) and stripped.endswith(POSTFIX)):
        return t
    first_newline = stripped.find("\n")
    if first_newline == -1:
        return t
    # Text between the opening backticks and the newline is the optional
    # language tag ("", "plaintext", "csv", ...); it must not contain backticks.
    if "`" in stripped[len(POSTFIX):first_newline]:
        return t
    inner = stripped[first_newline + 1:-len(POSTFIX)]
    # An empty fence is left alone, as before.
    return inner if inner else t


def PostProcText(t: str, post_num: int, remove_plaintext: bool = True, check_columns: bool = True) -> str:
    """Clean a raw model reply so it can be parsed as CSV.

    Args:
        t: raw reply text.
        post_num: number of posts in the chunk (currently unused; kept for
            interface compatibility).
        remove_plaintext: strip a surrounding Markdown code fence. Any language
            tag is accepted and whitespace around the fence is ignored, not
            only the exact "```plaintext" / "```" forms.
        check_columns: make sure the first row is exactly ``COLUMNS_ROW``.

    Returns:
        str: the cleaned text. With ``check_columns`` the first row is
        ``COLUMNS_ROW``: a first row that mentions any known column name is
        replaced by it, otherwise the header is inserted above the data.
    """
    if remove_plaintext:
        t = _strip_code_fence(t)
    if not check_columns:
        return t
    if t.startswith(COLUMNS_ROW + "\n"):
        return t
    rows = t.split("\n")
    # A first row that contains any column name is treated as a (malformed)
    # header and dropped before the canonical one is inserted.
    if any(col in rows[0] for col in columns_ref):
        rows.pop(0)
    rows.insert(0, COLUMNS_ROW)
    return "\n".join(rows)
            

## RUN labeling with GPT

In [10]:
model = "gpt-4"  #"gpt-3.5-turbo-0125"  "gpt-4o"
model_choices_n=1 # number of completions requested per API call
model_try_n=5  # !!! how many times each post should be labelled with the model
chunk_rnd_size=100 #50 - for 3.5 # number of random row positions drawn per chunk
MAX_CONSECUTIVE_ERRORS = 5  # stop labelling once this many chunks in a row fail

# Consecutive failed chunks. Reset to 0 after every successful chunk but NOT
# between tries, so hitting the limit also skips all remaining tries.
errors_num = 0

model_try_df_responses = {}  # try number -> DataFrame of labels, indexed by row position
not_parsed_responses = []    # raw replies that could not be read as CSV

# NOTE(review): `test1` is not defined in any cell visible in this notebook; it
# is used as the DataFrame of posts (needs a `text` column). Define it
# explicitly (e.g. from `df`) before running on a fresh kernel.
for i in range(1, model_try_n+1):
    df_chunks = [] # list of df-s with parsed results for chunk of posts
    idx_done=[]  # row positions that have already been labelled in this try
    while (len(idx_done)<test1.shape[0]) and (errors_num < MAX_CONSECUTIVE_ERRORS):
        chunk_idx=SelectRandomExcept(test1, idx_done, chunk_rnd_size)
        chunk_dict=dict(zip(chunk_idx, test1.iloc[chunk_idx].text.to_list()))
        posts_chunk_inline, chunk_idx_list=create_post_list_inline(instruction,
                                                                   instruction_end,
                                                                   chunk_dict,
                                                                   model)

        # Start every iteration from a clean slate: a failed call must never
        # fall through to the reply or DataFrame left over from an earlier
        # chunk (that frame is already stored in df_chunks and re-indexing it
        # would corrupt accepted results).
        model_responses = None
        df_chunk_response = None

        print(chunk_idx_list)
        print(posts_chunk_inline)
        try:
            model_responses=AreThereValuesInThePost(instruction,
                                                    instruction_end,
                                                    posts_chunk_inline,
                                                    model,
                                                    model_choices_n,
                                                    len(chunk_idx_list),
                                                   )
            print(model_responses)
        except Exception as exc:
            # `Exception`, not a bare except: a kernel interrupt (Ctrl-C) must
            # still be able to stop this long-running loop.
            print('Error while promt model')
            print(repr(exc))
            print(posts_chunk_inline)

        if model_responses:
            try:
                df_chunk_response = pd.read_csv(
                    StringIO(PostProcText(model_responses[0], len(chunk_idx_list))), sep=','
                )
            except Exception:
                not_parsed_responses.append(model_responses[0])

        if df_chunk_response is not None:
            # Usable only if all expected columns are present and there is
            # exactly one row per post sent (rows are matched by position).
            is_usable = (columns_ref.issubset(set(df_chunk_response.columns))
                         and len(df_chunk_response) == len(chunk_idx_list))
            if is_usable:
                df_chunk_response.index = chunk_idx_list
                # Rows with missing cells are dropped; those posts stay out of
                # idx_done and are therefore sent again in a later chunk.
                df_chunk_response = df_chunk_response.drop(['post_id'], axis=1).dropna(axis=0)
                is_usable = not df_chunk_response.empty
            if not is_usable:
                df_chunk_response = None

        if df_chunk_response is None:
            errors_num += 1
            print('Error to parse GPT model response as CSV')
        else:
            errors_num = 0
            df_chunks.append(df_chunk_response)
            print(df_chunk_response)
            idx_done=idx_done+list(df_chunk_response.index)

    if len(df_chunks):
        model_try_df_responses[i]=pd.concat(df_chunks,axis=0).sort_index()
# create multiIndex df for Values columns
if len(model_try_df_responses):
    df_responses =  pd.concat(model_try_df_responses.values(), keys=model_try_df_responses.keys(), names=['try'], axis=1)
else:
    df_responses = None

In [None]:
# Collapse the labels of all tries into one comma-joined string per post and
# value (e.g. "1,0,1"), then attach them to the source posts column-wise.
if df_responses is None:
    raise RuntimeError("No model responses were parsed; there is nothing to merge with the posts.")
# Iterate over the tries that actually produced results: a try with no parsed
# chunks is absent from df_responses, so range(1, model_try_n+1) could raise KeyError.
tries_present = df_responses.columns.get_level_values(0).unique()
labels_joined = (
    pd.concat([df_responses[i] for i in tries_present])
    .astype(str)
    .groupby(level=0)
    .agg(','.join)
)
df_concat = pd.concat([test1, labels_joined], axis=1)
df_concat

In [212]:
# Write the merged posts + labels as a pipe-separated UTF-8 file.
# NOTE(review): both the folder and the file name are empty strings here, so the
# target path is ''; fill them in before running this cell.
res_folder=''
df_concat.to_csv(res_folder+'', sep='|', encoding='utf-8')