In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import openai
from dotenv import load_dotenv
import warnings
import os
from pydantic import BaseModel
import datetime
warnings.filterwarnings("ignore")

# Set OpenAI API key
openai.api_key = os.getenv("OPENAI_API_KEY")
# Define random number generator
rng = np.random.default_rng(0)

# Define OpenAI client
client = openai.OpenAI()

In [2]:
# Today date

today = datetime.date.today()
today.strftime("%Y-%m-%d")

'2024-02-23'

In [31]:
class GPT3Challenge():    
    def __init__(self):
        self.instruction = """
Sei un giudice indipendente in un concorso per determinare quale delle due risposte è più informativa.
Ti verranno presentate due risposte a una domanda del sondaggio e dovrai determinare quale delle due è più informativa.
Devi rispondere solo a un output in formato JSON. La chiave è 'risultato' e il valore è 'a', 'b' o 'p'.
Formato:
<json>{'risultato': 'a' o 'b' o 'p'}</json>      
            """
        self.template = """
Vorremmo un aiuto nell'analizzare le risposte date ad un sondaggio che è stato somministrato a circa un migliaio di persone. L'obiettivo è raccogliere le opinioni individuali circa il sistema pensionistico e le relative riforme. Questa domanda chiede ai rispondenti di esprimere qual è il momento che loro considerano essere giusto per andare in pensione. Il testo della domanda specifica che la risposta può essere espressa sia in termini di età anagrafica che di anni di contributi versati.

Il testo esatto della domanda è: “Secondo lei, quale dovrebbe essere l'età raggiunta la quale le persone devono poter andare in pensione? Oppure, in alternativa, il numero di anni di contributi, raggiunti i quali, le persone devono poter andare in pensione indipendentemente dall'età che hanno? Dica un numero e spieghi perché le sembra l'età giusta (o il numero di anni di contributi giusto).”

Ti daremo due risposte. Devi dirci quale è più informativo. Precisamente, dovrebbe restituire un JSON in cui il risultato è "a" quando l'opzione a è più informativa, "b" quando l'opzione b è più informativa, o "p" se le due risposte sono informative con un livello pari.

Nota bene: la valutazione che ti chiediamo non è legata al numero di parole o frasi contenute nella risposta, ma alla quantità e la complessità delle informazioni contenute. Ad esempio, la ripetizione di alcune parole e concetti, l'utilizzo di intercalari e l'uso di frasi complesse non implica necessariamente che la risposta contenga più informazioni.

Può bensì capitare che tra due risposte di diversa lunghezza, la più lunga racchiuda meno informazioni paragonata alla seconda risposta che, sebbene più breve, sia densa di informazioni.

Quindi, il livello di informazioni contenuti in una risposta è indipendente dalla lunghezza.

Nota anche che le risposte vanno valutate in relazione al testo della domanda. Una risposta può essere una combinazione di parole con scarso significato se considerate in senso assoluto. Le stesse parole possono assumere significato se messe in relazione al contenuto della domanda che è stata posta.

Di seguito sono riportate le due risposte per l'analisi:
```
a) {answer_1}
b) {answer_2}
```
"""
    
    def game(self,answer_1, answer_2):
        message = []
        message.append({"role": "system", "content": self.instruction})
        message.append({"role": "user", "content": self.template.format(answer_1 = answer_1, answer_2 = answer_2)})
        try: 
            response = client.chat.completions.create(
                model="gpt-3.5-turbo-1106",
                response_format={ "type": "json_object" },
                messages=message,
                temperature = 0,
                seed = 0,
            )                     
            arguments = response.choices[0].message.content
            arguments = arguments.replace("\n","")
            result = eval(arguments).get("risultato")
            if not result:
                return 99
            if result == "a":
                return 1
            elif result == "b":
                return 0
            elif result == "p":
                return 2
            else:             
                return 99
        except:
            return 99

In [4]:
class GamesDFModel(BaseModel):
    IDU_incumbent: str
    response_incumbent: str
    audio_incumbent: str
    IDU_challenger: str
    response_challenger: str
    audio_challenger: str
    game_number: int

class HistoryDFModel(BaseModel):
    IDU_incumbent: str
    response_incumbent: str
    audio_incumbent: str
    IDU_challenger: str
    response_challenger: str
    audio_challenger: str
    game_number: int
    result: int

def validate_dataframe(df: pd.DataFrame, model: BaseModel):
    model_fields = set(model.model_fields.keys())
    df_columns = set(df.columns)
    if not model_fields.issubset(df_columns):
        missing_columns = model_fields - df_columns
        raise ValueError(f"The DataFrame is missing the following columns required by the model: {missing_columns}")

In [32]:
class GameSimulator:
    def __init__(self, df, df_games= None, df_history=None):
        self.df = df
        self.response_dict = {IDU: self.df[self.df['IDU'] == IDU]['d3_'].values[0] for IDU in self.df['IDU'].unique()}
        self.count = 0
        
        if df_games is None:
            self.df_games = pd.DataFrame()  
    
        if df_history is None:
            self.df_history = pd.DataFrame()  
                    
        if isinstance(df_games, pd.DataFrame):
            validate_dataframe(df_games, GamesDFModel)
            self.df_games = df_games
            
        if isinstance(df_history,pd.DataFrame):
            validate_dataframe(df_history, HistoryDFModel)
            self.df_history = df_history
            self.history = self.__create_dict_history()
        else:
            self.history = {IDU: [[],[]] for IDU in self.df['IDU'].unique()}
            
        self.GPT3 = GPT3Challenge()
    
    def __create_gamesdf(self, n_games): 
        # Initialize lists to store results
        rng = np.random.default_rng(0) 
        records = []

        for incumbent_IDU in self.df['IDU'].unique():
            incumbents = self.df[self.df['IDU'] == incumbent_IDU]
            challengers = self.df[self.df['IDU'] != incumbent_IDU]
            
            # Sample challengers
            num_challengers = min(n_games, len(challengers))
            sampled_challengers = challengers.sample(n=num_challengers, replace=False, random_state=rng).reset_index(drop=True)
            
            for _, incumbent_row in incumbents.iterrows():
                for game_num in range(num_challengers):
                    challenger_row = sampled_challengers.iloc[game_num]
                    
                    # Create a record for each game
                    record = {
                        'IDU_incumbent': incumbent_IDU,
                        'response_incumbent': incumbent_row['d3_'],
                        'audio_incumbent': incumbent_row['audio'],
                        'IDU_challenger': challenger_row['IDU'],
                        'response_challenger': challenger_row['d3_'],
                        'audio_challenger': challenger_row['audio'],
                        'game_number': game_num
                    }
                    records.append(record)

        game_df = pd.DataFrame(records)
        
        game_df.sort_values(by=['game_number', 'IDU_incumbent', 'IDU_challenger'], inplace=True)
        
        return game_df

    def __create_dict_history(self):
        history = {
            IDU: [self.df_history[self.df_history['IDU_incumbent']==IDU]["IDU_challenger"].to_list(),
                    self.df_history[self.df_history['IDU_incumbent']==IDU]["result"].to_list()]
            for IDU in self.df_history['IDU_incumbent'].unique()
        }
        return history
    
    
    def __save_game(self, path):
        self.df_games.to_csv(path, index=False)
    
    
    def __check_history_result(self, IDU, c):            
        if c in self.history[IDU][0]:
            index = self.history[IDU][0].index(c)
            last_result = self.history[IDU][1][index]
            return last_result
        else:
            result = self.__play_game(IDU, c)    
            self.history[IDU][0].append(c)
            self.history[IDU][1].append(result)
            self.count += 1
            return result
    
    def __play_game(self, IDU, c):
        answer_1 = self.response_dict[IDU]
        answer_2 = self.response_dict[c]
        result = self.GPT3.game(answer_1, answer_2)
        return result
    
    def export_df_games(self, path):
        self.df_games.to_csv(path, index=False)
    
    def simulate_games(self, n_games=1, rounds=None):
        
        today_str = datetime.date.today().strftime("%Y-%m-%d")

        if not self.df_games.empty:
            print("n_games already defined")
            n_games = self.df_games['game_number'].nunique()
        else:
            self.df_games = self.__create_gamesdf(n_games)
             
        number_rounds = 0
        for i in range(n_games):
            
            incubents = self.df_games[self.df_games["game_number"]==i]['IDU_incumbent'].values
            opponents = self.df_games[self.df_games["game_number"]==i]['IDU_challenger'].values
            game_result = [self.__check_history_result(IDU,c) for IDU,c in zip(incubents,opponents)]
            indices = self.df_games[self.df_games['game_number'] == i].index
            if len(indices) == len(game_result):
                self.df_games.loc[indices, 'result'] = game_result
                self.__save_game(path=f"data/game_history_{i}_{today_str}.csv")
            else:
                print(f"Error: The number of results ({len(game_result)}) does not match the expected number ({len(indices)}) for game {i}.")
            number_rounds += 1
            if rounds:
                if number_rounds >= rounds:
                    break

In [6]:
df_survey = pd.read_csv(r"data\pensioni_cleantranscripts.csv")
df_survey = df_survey.dropna(subset=['d3_'])

In [7]:
game_df = pd.read_csv("game_df.csv")

In [183]:
simulator_1 = GameSimulator(df=df_survey, df_games=game_df)

In [184]:
simulator_1.simulate_games()

n_games already defined


In [64]:
history_df = pd.read_csv(r"data\game_history_16_2024-02-24.csv")
history_df = history_df.dropna(subset=['result'])

In [65]:
simulator_2 = GameSimulator(df=df_survey, df_history=history_df, df_games=game_df)

In [66]:
simulator_2.simulate_games(rounds=26)

n_games already defined


In [67]:
simulator_2.count

9585

In [27]:
simulator_2.count

6390