In [1]:
import ast
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import random

#from fastDamerauLevenshtein import damerauLevenshtein
from collections import Counter

### Helper functions to read + save data

In [2]:
def read_data(file, use_string_for_seq=False):
    '''
    Read in csv file with sequence + object information.
    
    Parameters
    ----------
    file : csv with sequence + object information

    Raises
    ------
    Exception if input data inconsistent (i.e., length of sequence != length of start_coordinate list,
                                          element in sequence not in coordinates dictionary)

    Returns
    -------
    df : dataframe with sequence + object information

    '''
    df = pd.read_csv(file, header=0)
    
    for row in range(0,len(df)):
        start_coordinates = list(ast.literal_eval(df.at[row, 'start_coordinates']))
        ID = str(df.at[row,'ID'])
        if use_string_for_seq == True:
            sequence = str(df.at[row, 'sequence'])
        else:
            sequence = [elem for elem in df.at[row,'sequence'].split(',')]
        coordinates = {key: ast.literal_eval(value) for key, value in
                       (elem.split(': ') for elem in df.at[row,'coordinates'].split(';'))}
        
        # check if nr. of items matches with nr. of start positions
        if len(sequence) > len(start_coordinates):
            raise Exception('Sequence length >  nr. of start positions for ID {}'.format(ID))
        
        # check if coordinates for all items are given
        for elem in sequence:
            if elem not in coordinates.keys():
                raise Exception('No coordinates for object {} in iD {}'.format(elem, ID))
    
    return df

In [3]:
def read_results(file):
    '''
    Read in previously saved results from main calculate_prediction_error function.
    
    Parameters
    ----------
    file : csv

    Returns
    -------
    results : results as pandas dataframe

    '''
    results = pd.read_csv(file, header=0)
    results = results.T
    results.reset_index(drop=True, inplace=True)

    header = results.iloc[0]
    results = results[1:]
    results.columns = header

    results.drop(results.tail(1).index, inplace=True)
    
    # convert strings to numeric if possible
    results = results.apply(pd.to_numeric, errors='ignore')

    return results

In [4]:
def save_results(file, filepath):
    '''
    Save results dataframe to csv.

    Parameters
    ----------
    file : pandas dataframe
        Results dataframe generated by calculate_prediction_error.
    filepath : str
        Save path for file.

    Returns
    -------
    None.

    '''
    file.T.to_csv(filepath, header=True, index=True)

In [5]:
def check_to_float(x):
    try:
        return float(x)
    except ValueError:
        return x

### Processing data

In [6]:
def generate_distances_dict(data, use_string_for_seq=False, 
                            dimensions=[[1, 'x'], [1, 'y'], [1, 'z'], [2, 'xy'], [2, 'xz'], [2, 'yz'], [3, 'xyz']]):
    '''
    Calculate all object distances in all dimensions (e.g., xy, xyz) to reduce computational effort
    in main optimization function (calculate_prediction_error).
    
    Parameters
    ----------
    data : dataframe with object information
    dimensions : list of dimensions to be considered, optional
                The default is [[1, 'x'], [1, 'y'], [1, 'z'], [2, 'xy'], [2, 'xz'], [2, 'yz'], [3, 'xyz']].

    Returns
    -------
    distances_dict : dictionary of all object distances for all dimensions

    '''
    distances_dict = {}
    
    for dim in dimensions:
        dimension = dim[1]
        distances_dict[dimension] = {}
    
        for row in range(0,len(data)):
            #objects = list(data.at[row,'sequence'])
            
            if use_string_for_seq == True:
                objects = list(data.at[row, 'sequence'])
            else:
                objects = [elem for elem in data.at[row, 'sequence'].split(',')]
            
            ID = str(data.at[row,'ID'])
            start_coordinates = list(ast.literal_eval(data.at[row,'start_coordinates']))
            coordinates = {key: ast.literal_eval(value) for key, value in
                       (elem.split(': ') for elem in data.at[row,'coordinates'].split(';'))}
    
            distances_dict[dimension][ID] = {}
            
            new_coords, new_start_coords = filter_for_dimension(dim, coordinates, start_coordinates)
    
            for pos in new_start_coords:
                try:
                    position = tuple(pos)
                except TypeError:
                    position = str(pos)
                
                distances_dict[dimension][ID][position] = {}
                
                for obj in objects:
                    if obj not in distances_dict[dimension][ID][position]:
                        distances_dict[dimension][ID][position][obj] = np.linalg.norm(np.array(pos) -
                                                                     np.array(new_coords[obj]))
                
    return distances_dict

In [18]:
def calculate_prediction_error(data, distances_dict, error_function, n=10, 
                             dimensions=[[2, 'xy'], [3, 'xyz']],
                             use_categories=False,
                             seqcol='sequence', coords='coordinates', error='error',
                             use_string_for_seq=False):
    '''
    Calculates prediction error for all combinations of parameter values (c, k, dimension).

    Parameters
    ----------
    data : pandas.DataFrame
        Generated with read_data function from csv, contains information on objects and sequence.
    distances_dict : dictionary
        Contains distances between all objects in all possible dimension combinations.
    error_function : function
        Error function to use for prediction error.
        Options: editdist (Damerau-Levenshtein distance), prequential (prequential method)
    dimensions : list
        Dimensions to use. The default is [[2, 'xy'], [3, 'xyz']].  
    use_categories : Boolean
        Set to true if model should include category comparison. Default is false.
    n : int
        Number of iterations for prediction. The default is 10.
    seqcol : str, optional
        Column of dataframe containing sequence. The default is 'sequence'.
    coords : str, optional
        Column of dataframe containing coordinates. The default is 'coordinates'.
    error : str, optional
        Column of dataframe containing error for random samping of sequence
        (only relevant when using editdist prediction). The default is 'error'.   
    use_str_for_seq : Boolean
        Set to true if sequence input is a singular string, set to false if sequence
        input is a list of strings.

    Returns
    -------
    results : pandas.DataFrame
        Median error over all iterations. Column names: parameter values.

    '''

    results = pd.DataFrame()
    
    for row in range(0, len(data)):
        # get episode information from input row
        coordinates = {key: ast.literal_eval(value) for key, value in
                       (elem.split(': ') for elem in data.at[row, coords].split(';'))}

        start_coordinates = list(ast.literal_eval(data.at[row, 'start_coordinates']))
        ID = str(data.at[row,'ID'])
        
        if use_string_for_seq == True:
            seq = str(data.at[row, seqcol])
            objects = list(data.at[row, seqcol])
        else:
            seq = [elem for elem in data.at[row, seqcol].split(',')]
            objects = [elem for elem in data.at[row, seqcol].split(',')]

        # get list of objects that have relational dependencies, if any (else set to empty list)
        try:
            strong_k = list(data.at[row, 'strong_k'].split(','))
        except AttributeError:
            strong_k = []

        try:
            mid_k = list(data.at[row, 'mid_k'].split(','))
        except AttributeError:
            mid_k = []

        try:
            food_k = list(data.at[row, 'food_k'].split(','))
        except AttributeError:
            food_k = []

        

        # go through parameter ranges
        # set k to current param if object has relational dependencies, else 1.0
        for k2 in np.arange(1.1, 2.0, 0.1):
            k_food = round(k2, 2)
            k1 = {obj: k_food if obj in food_k else 1.0 for obj in objects}

            for k in np.arange(0, 0.9, 0.1):
                k_strong = round(k, 2)
                k_mid = round(k + 0.1, 2)
                k1 = {obj: k_strong if obj in strong_k else k_mid if obj in mid_k else round(k1[obj], 2) for obj in
                      objects}

                for c in np.arange(1.0, 2.0, 0.1):
                    c = round(c, 1)
                    # set c to current param if object contained, else 1.0
                    c1 = {obj: c if obj in data.at[row, 'containment'] else 1.0 for obj in objects}

                    for dim in dimensions:
                        # get median error for parameter combination based on error function
                        median = get_median_error(error_function, row, ID, objects, coordinates, start_coordinates, 
                                                  c1, k1, dim,
                                                  seq, distances_dict, n)

                        # save parameter combination as column name in results
                        params = 'c: ' + str(c) + '; k: ' + str(k_strong) + ',' + str(k_mid) + ',' + str(
                            k_food) + '; ' + str(dim[1])

                        results.at[row, params] = median

        #results.at[row, 'sequence'] = seq
        results.at[row, 'error'] = data.at[row, error]
        results.at[row, 'ID'] = ID

    return results

In [8]:
def get_lowest_error(results):
    '''
    Return lowest error in dataframe, index of lowest error, lowest median,
    and dataframe with mean/median.

    Parameters
    ----------
    results : pandas.DataFrame
        Resuts dataframe generated with calculate_prediction_error.

    Returns
    -------
    lowest_mean : float
        Lowest mean error.
    lowest_median : float
        Lowest median error.
    lowest_mean_idx : col index
        Column index where mean error is lowest.
    results : pandas.DataFrame
        Results dataframe with mean/median for each parameter combination calculated.

    '''

    for col in list(results):
        if col != 'sequence' and col != 'error' and col != 'ID':
            results.loc['mean', col] = results[col].mean()
            results.loc['median', col] = results[col].median()
    lowest_mean = min(results.loc['mean'])
    lowest_median = min(results.loc['median'])
    #mean = list(results.loc['mean'])
    lowest_mean_idx = results.columns[(results.loc['mean'] == lowest_mean)]

    return lowest_mean, lowest_mean_idx, lowest_median, results

### Prediction + helper functions

In [9]:
def filter_for_dimension(dimension, coordinates, start_coordinates):
    '''
    Filter coordinates and start coordinates for given dimension (e.g., xyz -> xy).

    Parameters
    ----------
    dimension : list of [int, str]
        Dimension for which to adapt coordinates (default before filtering: 3D).
    coordinates : dictionary
        Coordinates of objects in 3D.
    start_coordinates : list
        List of start coordinates where subject is standing before next picking_up action
        in 3D.

    Returns
    -------
    new_coords : dictionary
        Dictionary with filtered coordinates.
    new_start_coords : list
        List with filtered start coordinates.

    '''
    
    new_coords =  {}
    new_start_coords = []
    
    if dimension[0] == 3:  # no changes if 3D
        new_coords = coordinates
        new_start_coords = start_coordinates

    elif dimension[0] == 2:  # 2D: remove obsolete coordinate
        if dimension[1] == 'xy':
            new_coords = {key: value[:-1] for key, value in coordinates.items()}
            new_start_coords = [x[:-1] for x in start_coordinates]

        elif dimension[1] == 'xz':
            new_start_coords = [[x[0], x[-1]] for x in start_coordinates]

            for key, value in coordinates.items():
                new_value = (value[0], value[-1])
                new_coords[key] = new_value

        elif dimension[1] == 'yz':
            new_coords = {key: value[1:] for key, value in coordinates.items()}
            new_start_coords = [x[1:] for x in start_coordinates]

    elif dimension[0] == 1:  # 1D: choose appropriate coordinate
        if dimension[1] == 'x':
            new_coords = {key: value[0] for key, value in coordinates.items()}
            new_start_coords = [x[0] for x in start_coordinates]

        elif dimension[1] == 'y':
            new_coords = {key: value[1] for key, value in coordinates.items()}
            new_start_coords = [x[1] for x in start_coordinates]

        elif dimension[1] == 'z':
            new_coords = {key: value[2] for key, value in coordinates.items()}
            new_start_coords = [x[2] for x in start_coordinates]
            
    return new_coords, new_start_coords

In [17]:
def predict_prequential(distances_dict, ID, objects, coordinates, start_coordinates, sequence, 
                                 c, k, dimension=[3, ], use_categories=False):
    '''
    Predict sequence based on prequential method (predict one step, compare with observed behavior,
    error measure: 0 if predicted == observed, 1 if predicted != observed).

    Parameters
    ----------
    distances_dict : dictionary
        Dictionary containing distances between objects in all dimensions.
    ID : str
        Identifier for episode.
    objects : list
        Objects in episode.
    coordinates : dictionary
        Coordinates of objects.
    start_coordinates : list
        List of coordinates where subject is standing before each picking-up action.
    sequence : list
        List of objects in sequential order.
    c : dictionary
        Parameter values for containment for all objects.
    k : dictionary
        Parameter values for relational dependencies for all objects.
    dimension : list [int, str], optional
        Dimension in which to consider distances. The default is [3, ].
    use_categories : Boolean
        Set to true if model should include category comparison. Default is false.

    Returns
    -------
    errors : list
        List of error values for observed vs predicted sequence.

    '''
    
    
    i = 0
    errors = []
    possible_items = dict.fromkeys(objects, 0)  # generate dict from object list
    item_count = Counter(objects)
    
    coord_index = 0
    
    new_coords, new_start_coords = filter_for_dimension(dimension, coordinates, start_coordinates)

    while i < len(sequence) - 1:        
        for obj in possible_items.keys():            
            try:
                position = tuple(new_start_coords[coord_index])
            except TypeError:
                position = str(new_start_coords[coord_index])
            
            # calculate weighted cost from current position to next item(s)
            possible_items[obj] = distances_dict[dimension[1]][ID][position][obj] ** k[obj] * c[obj]
            
            if use_categories == True:
                pass
                # TODO: call calculate_cost function that checks categories
                # input -> sequence + possible_items dict with the costs
                # output: dictionary with updated costs
            else:
                continue

        minval = min(possible_items.values())
        minval = [k for k, v in possible_items.items() if v == minval]
        minval = random.choice(minval)  # choose prediction randomly if multiple items have same cost
        
        prediction = minval
        observed = sequence[i]
        
        if prediction == observed:
            error = 0
        else:
            error = 1
        
        errors.append(error)
        
        if item_count[sequence[i]] > 1:
            item_count[sequence[i]] = item_count[sequence[i]] - 1
        else:
            del possible_items[sequence[i]]
        
        coord_index += 1
        i += 1
    
    return errors

In [20]:
def get_median_error(error_function, row, ID, objects, coordinates, 
                     start_coordinates, c, k, dimension, sequence, 
                     distances_dict, n=1, use_categories=False):
    '''
    Return median error for chosen error measure (editdist or prequential) for n trials.

    Parameters
    ----------
    error_function : function
        Error measure to use: editdist or prequential.
    row : int
        Row number in dataframe.
    ID : str
        Identifier for episode.
    objects : list
        Objects in episode.
    coordinates : dictionary
        Coordinates of objects.
    start_coordinates : list
        List of coordinates where subject is standing before each picking-up action.
    c : dictionary
        Parameter values for containment for all objects.
    k : dictionary
        Parameter values for relational dependencies for all objects.
    dimension : list [int, str]
        Dimension in which to consider distances. The default is [3, ].
    sequence : str
        Observed sequence of objects in episode.
    distances_dict : dictionary
        Dictionary containing distances between objects in all dimensions.
    n : int, optional
        Number of iterations. The default is 1.
    use_categories : Boolean
        Set to true if model should include category comparison. Default is false.

    Returns
    -------
    median : float
        Median error value.

    '''

    error_list = []

    for x in range(0, n):
        # get median error using edit distance (predict whole sequence, then compare)
        if error_function == 'editdist':
            # get predicted sequence for list of objects
            prediction = ''.join(predict_editdist(distances_dict, ID, objects, coordinates, 
                                          start_coordinates, sequence, c, k, dimension))

            # calculate normalized error between predicted and given sequence
            #dl = 1 - damerauLevenshtein(sequence, prediction)

            #error_list.append(dl)
        
        # get median summed error using prequential method (predict only for each next step)
        elif error_function == 'prequential':
            errors = predict_prequential(distances_dict, ID, objects, coordinates,
                                         start_coordinates, sequence, c, k, dimension,
                                         use_categories)
            summed = sum(errors)
            error_list.append(summed)
                        
    median = np.nanmedian(error_list)
    return median

### How to use

In [23]:
# read in csv data
#data = pd.read_csv(path-to-file, header=0)
data = pd.read_csv('task_environments_cooking_list_2022-05-10.csv', header=0)

# generate dictionary with distances for all dimensions
distances_dict = generate_distances_dict(data)

# calculate prediction error with prequential error function, 2D distances, 1 iteration (change n for more)
results = calculate_prediction_error(data, distances_dict=distances_dict, 
                                    error_function='prequential',
                                    n=1, dimensions=[[2, 'xy']],
                                    use_categories=False)

# return parameter combination with lowest prediction error (best model)
lowest_mean, lowest_mean_idx, lowest_median, results_median = get_lowest_error(results)

# the code throws warnings at this point, just ignore that

  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, pa

  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, params] = median
  results.at[row, pa

In [24]:
# print best parameter combination & mean/median prediction error
print(lowest_mean, lowest_mean_idx, lowest_median)

2.596774193548387 Index(['c: 1.2; k: 0.5,0.6,1.4; xy'], dtype='object') 2.0


### Plot results

In [14]:
%matplotlib qt

IDs = data['ID']

error = data['error']

#seqs= results_sum['sequence'][:-2].values

res = results[lowest_mean_idx[0]][:-2].values
median = [np.nanmedian(res)] * len(res)

x = [x for x in range (0,len(res))]

# plot scatter + lines for simulations
plt.scatter(x, res, marker='o', s=26, c='darkred', alpha=0.8, 
            label=str('model-generated median: ') + str(round(lowest_median,3)))
plt.plot(x, median, c='darkred', alpha=0.95, linewidth=2)
plt.fill_between(x, res, alpha=0.3, color='darkred')


plt.xticks(x, labels=IDs, rotation=90, fontsize=5)

plt.ylabel('accumulated prediction error', fontsize=22)
plt.xlabel('sequence', fontsize=22)

plt.title('best model: ' + str(lowest_mean_idx[0]), fontsize=24, pad=20)
plt.margins(0.01)

plt.legend(fontsize=20, framealpha=0.8, loc='upper right', markerscale=2.5)

#plt.savefig('plot_median_editdist_individualerrors_diff.png', bbox_inches='tight')
plt.show()