In [None]:
import matplotlib.pyplot as plt
import jax.random as jr
import optax

import jax.numpy as np
import cma
import pandas as pd
from memo import memo
import jax
import jax.numpy as np
from enum import IntEnum, auto
# read in files
# Read UK_df.csv as pandas dataframe
original_UK_dialogue = pd.read_csv('UK_df.csv')
original_UK_politeness = pd.read_csv('UK_direct_df.csv')
original_UK_narrator = pd.read_csv('UK_narrator_df.csv')
original_US_dialogue = pd.read_csv('US_df.csv')
original_US_politeness = pd.read_csv('US_direct_df.csv')
original_US_narrator = pd.read_csv('US_narrator_df.csv')
dataframes = [original_UK_dialogue, original_UK_politeness, original_UK_narrator, original_US_dialogue, original_US_politeness, original_US_narrator]
def elim_outliers(df):
    # dropped Unnamed: 0 column
    df.drop(columns=['Unnamed: 0'], inplace=True)
    filtered_df = df.loc[(df['response'] > 95) | (df['response'] < 5)]
    for id in df['person_id'].unique():
        if len(filtered_df[filtered_df['person_id'] == id])/len(df[df['person_id'] == id])>0.8:
            df.drop(df[df['person_id'] == id].index, inplace=True)
    df['predicate Z-score'] = df.groupby(['person_id','predicate'])['response'].transform(lambda x: (x - x.mean()) / x.std())
    # if has_intensifier = no then change 'intensifier' to 'none'
    df.loc[df['has intensifier?'] == 'no', 'intensifier'] = 'none'
    return df
for i in range(len(dataframes)):
    dataframes[i] = elim_outliers(dataframes[i])
dialogue = pd.concat([dataframes[0], dataframes[3]])
politeness = pd.concat([dataframes[1], dataframes[4]])
UK_dialogue = dataframes[0]
US_dialogue = dataframes[3]
UK_politeness = dataframes[1]
US_politeness = dataframes[4]

# end of reading in data
#-------------------------------------------------------------------------------

# compute U_soc (social Utility)
U_soc_data = politeness.groupby(['intensifier','predicate'])['predicate Z-score'].mean().to_dict()
UK_U_soc_data = UK_politeness.groupby(['intensifier','predicate'])['predicate Z-score'].mean().to_dict()
US_U_soc_data = US_politeness.groupby(['intensifier','predicate'])['predicate Z-score'].mean().to_dict()
class W(IntEnum):  # utterance space
    # intensifiers
    none = auto(0)
    slightly= auto()
    kind_of = auto()
    quite = auto()
    very= auto()
    extremely= auto()

class P(IntEnum):
    # predicates
    boring = auto(0)
    concerned = auto()
    difficult = auto()
    exhausted = auto()
    helpful = auto()
    impressive = auto()
    understandable = auto()
epsilon = 0.01
infty = 10000000
utterences =list(U_soc_data.keys())
easy_S = np.arange(-2.8,2.8,0.1)

# Create a list of JAX arrays
UK_measured_values = []
for p in P:
    for w in W:
        intensifier = w.name.replace('_'," ")
        predicate = p.name
        raw_values = UK_dialogue[((UK_dialogue['intensifier'] == intensifier) & (UK_dialogue['predicate'] == predicate))]['predicate Z-score'].values
        # measured_values.append(np.array([int(r/0.28)+10 for r in raw_values]))
        z = [int(r*10)+28 for r in raw_values]
        x = [0]*len(easy_S)
        for i in z:
           x[i] += 1
        UK_measured_values.append(x)
UK_measured_values = np.array(UK_measured_values)

@jax.jit
def state_prior(s):
    return np.exp(-s**2/2) # we assume s is roughly a gaussian distribution

@jax.jit
def UK_U_soc(intensifier,predicate):
    arr = np.array([
        [UK_U_soc_data[(w.name.replace('_'," "),p.name)] for p in P] 
        for w in W
    ])
    return arr[intensifier,predicate]

@jax.jit
def is_costly(w):
    arr = [0, 1, 1, 1, 1, 1]
    return np.array(arr)[w]

In [None]:
@jax.jit
def up_down_L(w, s,t0,t1,t2,t3,t4,t5,v0,v1, v2, v3, v4, v5):  # literal likelihood L(w | s)
    low_t = np.array([t0,t1,t2,t3,t4,t5])[w]
    high_t = np.array([v0,v1, v2, v3, v4, v5])[w]  # Variance parameters for each intensifier
    s1 = jax.nn.sigmoid(((s - low_t) * 20).astype(float))
    s2 = jax.nn.sigmoid(((high_t - s) * 20).astype(float))
    return s1 * s2
@memo
def up_down_UKL1[s: easy_S, w: W](inf_term, soc_term, cost,t0,t1,t2,t3,t4,t5,v0,v1, v2, v3, v4, v5,p):
    listener: thinks[
        speaker: given(s in easy_S, wpp=state_prior(s)),
        speaker: chooses(w in W, wpp=
            imagine[
                listener: knows(w),
                listener: chooses(s in easy_S, wpp=up_down_L(w, s,t0,t1,t2,t3,t4,t5,v0, v1, v2, v3, v4, v5)), # L(w|s) = literal likelihood,
                exp(0.3*inf_term * log(Pr[listener.s == s]+0.0000001) +
                0.5*soc_term * UK_U_soc(w,p) - # U_soc = listener's EU
                10*cost*is_costly(w)) # U_inf = listener's surprisal       
            ]
        )
    ]
    listener: observes[speaker.w] is w
    listener: chooses(s in easy_S, wpp=Pr[speaker.s == s])
    return Pr[listener.s == s]
def up_down_UK_logloss(*params):
    thetas = params[:6]
    cost = params[6]
    inf_term = params[7]
    soc_term = params[8]
    var = params[9:15]  # Extract the variance parameters
    P_l1 = np.concatenate([up_down_UKL1(inf_term=inf_term, soc_term=soc_term, cost=cost, t0 = thetas[0],t1=thetas[1], t2= thetas[2], t3 = thetas[3], t4 = thetas[4], t5 = thetas[5],v0 = var[0], v1=var[1], v2= var[2], v3 = var[3], v4 = var[4], v5 = var[5], p=p) for p in P],axis = 1)
    return np.sum(np.log(P_l1)*UK_measured_values.T)
def up_down_wrapped_loss(x):
    result = up_down_UK_logloss(*tuple(x))  # Unpack the parameters from the tuple
    return -result.item()
best_params_arr = []
best_values_arr = []
key = jr.PRNGKey(42)  # Random seed
keys = jr.split(key, 100)  # Generate 10 random keys
for m in range(100):
    initial_val = list(jr.uniform(keys[m], shape=(9,), minval=-3, maxval=3))+ list(jr.uniform(keys[m], shape=(6,), minval=0, maxval= 3))  # Generate 6 random numbers between 0 and 3
    print(m,"initial value:",initial_val) 
    for sigma in [0.1, 0.5,1]:
        best_params_list = []
        best_values_list = []
        for _ in range(3):
            # Get best solution
            es = cma.CMAEvolutionStrategy(
                initial_val,    # initial guess
                sigma, # sigma
                {"bounds": [None, None]},
            )
            es.optimize(up_down_wrapped_loss)
            best_params = es.result.xbest
            best_value = -es.result.fbest  # Negate to get the original loss value
            print(es.stop())
            print("Best value found:", best_value)
            print("Best parameters found:", best_params)
            best_params_list.append(best_params)
            best_values_list.append(best_value)
        best_params_arr.append(best_params_list)
        best_values_arr.append(best_values_list)
    # if m%10==0:
    #     onp.save(f'cma_best_params.npy', best_params_arr)
    #     onp.save(f'cma_best_values.npy', best_values_arr)

normal cdf

In [None]:
from jax.scipy.special import ndtr

@jax.jit
def ndtr_L(w, s,t0,t1,t2,t3,t4,t5,v0,v1, v2, v3, v4, v5):  # literal likelihood L(w | s)
    t = np.array([t0,t1,t2,t3,t4,t5])[w]
    var = np.array([v0,v1,v2,v3,v4,v5])[w]  # Variance parameters for each intensifier
    return ndtr((s-t)/np.sqrt(var))
@memo
def ndtr_UKL1[s: easy_S, w: W](inf_term, soc_term, cost,t0,t1,t2,t3,t4,t5,v0,v1, v2, v3, v4, v5,p):
    listener: thinks[
        speaker: given(s in easy_S, wpp=state_prior(s)),
        speaker: chooses(w in W, wpp=
            imagine[
                listener: knows(w),
                listener: chooses(s in easy_S, wpp=ndtr_L(w, s,t0,t1,t2,t3,t4,t5,v0, v1, v2, v3, v4, v5)), # L(w|s) = literal likelihood,
                exp(0.3*inf_term * log(Pr[listener.s == s]+0.0000001) +
                0.5*soc_term * UK_U_soc(w,p) - # U_soc = listener's EU
                10*cost*is_costly(w)) # U_inf = listener's surprisal       
            ]
        )
    ]
    listener: observes[speaker.w] is w
    listener: chooses(s in easy_S, wpp=Pr[speaker.s == s])
    return Pr[listener.s == s]
def ndtr_UK_logloss(*params):
    thetas = params[:6]
    cost = params[6]
    inf_term = params[7]
    soc_term = params[8]
    var = params[9:15]  # Extract the variance parameters
    P_l1 = np.concatenate([ndtr_UKL1(inf_term=inf_term, soc_term=soc_term, cost=cost, t0 = thetas[0],t1=thetas[1], t2= thetas[2], t3 = thetas[3], t4 = thetas[4], t5 = thetas[5],v0 = var[0], v1=var[1], v2= var[2], v3 = var[3], v4 = var[4], v5 = var[5], p=p) for p in P],axis = 1)
    return np.sum(np.log(P_l1)*UK_measured_values.T)
def ndtr_wrapped_loss(x):
    result = ndtr_UK_logloss(*tuple(x))  # Unpack the parameters from the tuple
    return -result.item()
best_params_arr = []
best_values_arr = []
key = jr.PRNGKey(42)  # Random seed
keys = jr.split(key, 100)  # Generate 10 random keys
for m in range(100):
    initial_val = list(jr.uniform(keys[m], shape=(9,), minval=-3, maxval=3))+ list(jr.uniform(keys[m], shape=(6,), minval=0, maxval= 3))  # Generate 6 random numbers between 0 and 3
    print(m,"initial value:",initial_val) 
    for sigma in [0.1, 0.5,1]:
        best_params_list = []
        best_values_list = []
        for _ in range(3):
            # Get best solution
            es = cma.CMAEvolutionStrategy(
                initial_val,    # initial guess
                sigma, # sigma
                {"bounds": [None, None]},
            )
            es.optimize(ndtr_wrapped_loss)
            best_params = es.result.xbest
            best_value = -es.result.fbest  # Negate to get the original loss value
            print(es.stop())
            print("Best value found:", best_value)
            print("Best parameters found:", best_params)
            best_params_list.append(best_params)
            best_values_list.append(best_value)
        best_params_arr.append(best_params_list)
        best_values_arr.append(best_values_list)

step function

In [None]:
@jax.jit
def step_L(w, s,t0,t1,t2,t3,t4,t5):  # literal likelihood L(w | s)
    t = np.array([t0,t1,t2,t3,t4,t5])[w]
    sigt = 1 / (1 + np.exp(-t))
    threshold = sigt * 5.6 - 2.8
    return jax.lax.cond(
        threshold> s,
        lambda: epsilon,  # If condition (t > s) is True
        lambda: (5.6 - epsilon * (threshold+2.8)) / (2.8 - threshold)  # s and threshold can range from -2.8 to 2.8
    )

@memo
def step_UKL1[s: easy_S, w: W](inf_term, soc_term, cost,t0,t1,t2,t3,t4,t5,p):
    listener: thinks[
        speaker: given(s in easy_S, wpp=state_prior(s)),
        speaker: chooses(w in W, wpp=
            imagine[
                listener: knows(w),
                listener: chooses(s in easy_S, wpp=step_L(w, s,t0,t1,t2,t3,t4,t5)) ,
                exp(inf_term * log(Pr[listener.s == s]) + 
                soc_term * UK_U_soc(w,p) - # U_soc = listener's EU
                cost*is_costly(w)) # U_inf = listener's surprisal       
            ]
        )
    ]
    listener: observes[speaker.w] is w
    listener: chooses(s in easy_S, wpp=Pr[speaker.s == s])
    return Pr[listener.s == s]
def step_UK_logloss(*params):
    thetas = params[:6]
    cost = params[6]
    inf_term = params[7]
    soc_term = params[8]
    P_l1 = np.concatenate([step_UKL1(inf_term=inf_term, soc_term=soc_term, cost=cost, t0 = thetas[0],t1=thetas[1], t2= thetas[2], t3 = thetas[3], t4 = thetas[4], t5 = thetas[5],p=p) for p in P],axis = 1)
    return np.sum(np.log(P_l1)**UK_measured_values.T)

gaussian literal

In [None]:
@jax.jit
def gaussian_L(w, s,t0,t1,t2,t3,t4,t5,v0,v1, v2, v3, v4, v5):  # literal likelihood L(w | s)
    t = np.array([t0,t1,t2,t3,t4,t5])[w]
    v = np.array([v0,v1, v2, v3, v4, v5])[w]  # Variance parameters for each intensifier
    return np.exp(-(t-s)**2/v)

@memo
def gaussian_UKL1[s: easy_S, w: W](inf_term, soc_term, cost,t0,t1,t2,t3,t4,t5,v0,v1, v2, v3, v4, v5,p):
    listener: thinks[
        speaker: given(s in easy_S, wpp=state_prior(s)),
        speaker: chooses(w in W, wpp=
            imagine[
                listener: knows(w),
                listener: chooses(s in easy_S, wpp=gaussian_L(w, s,t0,t1,t2,t3,t4,t5,v0, v1, v2, v3, v4, v5)), # L(w|s) = literal likelihood,
                exp(0.3*inf_term * log(Pr[listener.s == s]+0.0000001) +
                0.5*soc_term * UK_U_soc(w,p) - # U_soc = listener's EU
                10*cost*is_costly(w)) # U_inf = listener's surprisal       
            ]
        )
    ]
    listener: observes[speaker.w] is w
    listener: chooses(s in easy_S, wpp=Pr[speaker.s == s])
    return Pr[listener.s == s]


def gaussian_UK_logloss(*params):
    thetas = params[:6]
    cost = params[6]
    inf_term = params[7]
    soc_term = params[8]
    var = params[9:15]  # Extract the variance parameters
    P_l1 = np.concatenate([gaussian_UKL1(inf_term=inf_term, soc_term=soc_term, cost=cost, t0 = thetas[0],t1=thetas[1], t2= thetas[2], t3 = thetas[3], t4 = thetas[4], t5 = thetas[5],v0 = var[0], v1=var[1], v2= var[2], v3 = var[3], v4 = var[4], v5 = var[5], p=p) for p in P],axis = 1)
    return np.sum(np.log(P_l1)*UK_measured_values.T)

def gaussian_wrapped_loss(x):
    result = gaussian_UK_logloss(*tuple(x))  # Unpack the parameters from the tuple
    return -result.item()
best_params_arr = []
best_values_arr = []
key = jr.PRNGKey(2)  # Random seed
keys = jr.split(key, 100)  # Generate 10 random keys
for m in range(100):
    initial_val = list(jr.uniform(keys[m], shape=(9,), minval=-3, maxval=3))+ list(jr.uniform(keys[m], shape=(6,), minval=0, maxval= 3))  # Generate 6 random numbers between 0 and 3
    print(m,"initial value:",initial_val) 
    for sigma in [0.1, 0.5,1]:
        best_params_list = []
        best_values_list = []
        for _ in range(3):
            # Get best solution
            es = cma.CMAEvolutionStrategy(
                initial_val,    # initial guess
                sigma, # sigma
                {"bounds": [None, None]},
            )
            es.optimize(gaussian_wrapped_loss)
            best_params = es.result.xbest
            best_value = -es.result.fbest  # Negate to get the original loss value
            print(es.stop())
            print("Best value found:", best_value)
            print("Best parameters found:", best_params)
            best_params_list.append(best_params)
            best_values_list.append(best_value)
        best_params_arr.append(best_params_list)
        best_values_arr.append(best_values_list)
    # if m%10==0:
    #     onp.save(f'cma_best_params.npy', best_params_arr)
    #     onp.save(f'cma_best_values.npy', best_values_arr)



fitting literal listener

In [None]:
@memo
def literal_UKL1[s: easy_S, w: W](t0,t1,t2,t3,t4,t5,v0,v1, v2, v3, v4, v5,p):
    listener: thinks[
        speaker: given(s in easy_S, wpp=state_prior(s)),
        speaker: chooses(w in W, wpp=gaussian_L(w, s,t0,t1,t2,t3,t4,t5,v0, v1, v2, v3, v4, v5))
    ]
    listener: observes[speaker.w] is w
    listener: chooses(s in easy_S, wpp=Pr[speaker.s == s])
    return Pr[listener.s == s]

def literal_UK_logloss(*params):
    thetas = params[:6]
    var = params[6:12]  # Extract the variance parameters
    P_l1 = np.concatenate([literal_UKL1(t0 = thetas[0],t1=thetas[1], t2= thetas[2], t3 = thetas[3], t4 = thetas[4], t5 = thetas[5],v0 = var[0], v1=var[1], v2= var[2], v3 = var[3], v4 = var[4], v5 = var[5], p=p) for p in P],axis = 1)
    return np.sum(np.log(P_l1)*UK_measured_values.T)

def easy_wrapped_loss(x):
    result = literal_UK_logloss(*tuple(x))  # Unpack the parameters from the tuple
    return -result.item()

best_params_arr = []
best_values_arr = []
key = jr.PRNGKey(42)  # Random seed
keys = jr.split(key, 1000)  # Generate 10 random keys
for m in range(1000):
    initial_val = list(jr.uniform(keys[m], shape=(6,), minval=-3, maxval=3))+ list(jr.uniform(keys[m], shape=(6,), minval=0, maxval= 3))  # Generate 6 random numbers between 0 and 3
    print("initial value:",initial_val) 
    for sigma in [0.1, 0.5,1]:
        best_params_list = []
        best_values_list = []
        for _ in range(3):
            # Get best solution
            es = cma.CMAEvolutionStrategy(
                initial_val,    # initial guess
                sigma, # sigma
                {"bounds": [[None]*6+[0]*6, None],'tolfunhist': 1e-16,'tolflatfitness': 10,'tolfun': 1e-12},
            )
            es.optimize(easy_wrapped_loss)
            best_params = es.result.xbest
            best_value = -es.result.fbest  # Negate to get the original loss value
            print(es.stop())
            print("Best value found:", best_value)
            print("Best parameters found:", best_params)
            best_params_list.append(best_params)
            best_values_list.append(best_value)
        best_params_arr.append(best_params_list)
        best_values_arr.append(best_values_list)

one and two thresholds (defined by thresholds of amplifiers being inf)

In [33]:
@jax.jit
def up_down_L(w, s,t0,t1,t2,t3,t4,t5,v0,v1, v2, v3, v4, v5):  # literal likelihood L(w | s)
    low_t = np.array([t0,t1,t2,t3,t4,t5])[w]
    high_t = np.array([v0,v1, v2, v3, v4, v5])[w]  # Variance parameters for each intensifier
    s1 = jax.nn.sigmoid(((s - low_t) * 20).astype(float))
    s2 = jax.nn.sigmoid(((high_t - s) * 20).astype(float))
    return s1 * s2
@memo
def up_down_UKL1[s: easy_S, w: W](inf_term, soc_term, cost,t0,t1,t2,t3,t4,t5,v0,v1, v2, v3, v4, v5,p):
    listener: thinks[
        speaker: given(s in easy_S, wpp=state_prior(s)),
        speaker: chooses(w in W, wpp=
            imagine[
                listener: knows(w),
                listener: chooses(s in easy_S, wpp=up_down_L(w, s,t0,t1,t2,t3,t4,t5,v0, v1, v2, v3, v4, v5)), # L(w|s) = literal likelihood,
                exp(0.3*inf_term * log(Pr[listener.s == s]+0.0000001) +
                0.5*soc_term * UK_U_soc(w,p) - # U_soc = listener's EU
                10*cost*is_costly(w)) # U_inf = listener's surprisal       
            ]
        )
    ]
    listener: observes[speaker.w] is w
    listener: chooses(s in easy_S, wpp=Pr[speaker.s == s])
    return Pr[listener.s == s]
def up_down_UK_logloss(*params):
    thetas = params[:6]
    cost = params[6]
    inf_term = params[7]
    soc_term = params[8]
    var = params[9:15]  # Extract the variance parameters
    P_l1 = np.concatenate([up_down_UKL1(inf_term=inf_term, soc_term=soc_term, cost=cost, t0 = thetas[0],t1=thetas[1], t2= thetas[2], t3 = thetas[3], t4 = thetas[4], t5 = thetas[5],v0 = var[0], v1=var[1], v2= var[2], v3 = var[3], v4 = var[4], v5 = var[5], p=p) for p in P],axis = 1)
    return np.sum(np.log(P_l1)*UK_measured_values.T)
def up_down_wrapped_loss(x):
    result = up_down_UK_logloss(*tuple(x))  # Unpack the parameters from the tuple
    return -result.item()
best_params_arr = []
best_values_arr = []
inf = 1000000
key = jr.PRNGKey(42)  # Random seed
keys = jr.split(key, 100)  # Generate 10 random keys
for m in range(100):
    initial_val = list(jr.uniform(keys[m], shape=(9,), minval=-3, maxval=3))+ list(jr.uniform(keys[m], shape=(6,), minval=0, maxval= 3))  # Generate 6 random numbers between 0 and 3
    print(m,"initial value:",initial_val) 
    initial_val[-2]=inf
    initial_val[-1]=inf
    for sigma in [0.1, 0.5,1]:
        best_params_list = []
        best_values_list = []
        for _ in range(3):
            # Get best solution
            es = cma.CMAEvolutionStrategy(
                initial_val,    # initial guess
                sigma, # sigma
                {"bounds": [[None]*13+[inf-0.1]*2, [None]*13+[inf]*2]},
            )
            es.optimize(up_down_wrapped_loss)
            best_params = es.result.xbest
            best_value = -es.result.fbest  # Negate to get the original loss value
            print(es.stop())
            print("Best value found:", best_value)
            print("Best parameters found:", best_params)
            best_params_list.append(best_params)
            best_values_list.append(best_value)
        best_params_arr.append(best_params_list)
        best_values_arr.append(best_values_list)

0 initial value: [Array(0.18156481, dtype=float32), Array(-1.1198273, dtype=float32), Array(2.4091816, dtype=float32), Array(1.1899974, dtype=float32), Array(0.7083936, dtype=float32), Array(0.73111224, dtype=float32), Array(1.3064804, dtype=float32), Array(-1.7417994, dtype=float32), Array(-2.8596718, dtype=float32), Array(1.5907824, dtype=float32), Array(0.94008636, dtype=float32), Array(2.7045908, dtype=float32), Array(2.0949988, dtype=float32), Array(1.8541968, dtype=float32), Array(1.8655561, dtype=float32)]
(6_w,12)-aCMA-ES (mu_w=3.7,w_1=40%) in dimension 15 (seed=688331, Mon Apr 14 20:03:40 2025)
Iterat #Fevals   function value  axis ratio  sigma  min&max std  t[m:s]
    1     12 7.589663085937500e+03 1.0e+00 9.63e-02  3e-02  1e-01 0:00.1
    2     24 7.070872070312500e+03 1.1e+00 1.02e-01  3e-02  1e-01 0:00.2
    3     36 6.772757324218750e+03 1.2e+00 1.11e-01  3e-02  1e-01 0:00.2
  100   1200 5.171135742187500e+03 6.2e+00 1.03e-01  7e-03  1e-01 0:01.4
  200   2400 5.1706220703

KeyboardInterrupt: 