<a href="https://colab.research.google.com/github/matthiasweidlich/promi_course/blob/master/case_labelling/case_labelling_filled.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Simple notebook to run the individual steps of the EM-based approach to case labelling

## Initialisation

First, helper functions to add dedicated symbols for the start and end of a trace. Also, given an input sequence, a function prints the traces based on a given sequence of case identifiers. The latter is useful for understanding the  partitioning of the input sequence based on the current state of the EM-based approach to case labelling. 

In [45]:
CONST_START = "<"
CONST_END = ">"

def add_start_end(seq: list) -> list:
    """ Add dedicated symbols for start and end to the sequence """
    return [CONST_START] + seq + [CONST_END]

def print_cases(input_sequence: list, case_sequence: list):
    """ Extracts the actual traces from the input sequence, based on the given case sequence """
    cases = {}
    for idx, val in enumerate(input_sequence):
        if val == CONST_START or val == CONST_END:
            continue
        cases.setdefault(case_sequence[idx], []).append(val)
    for case in cases.values():
        print(case)

Second, helper functions to initialise the transition matrix.


In [46]:
def get_empty_matrix(symbols: set) -> dict:
    # initialise matrix with 0 values for all pairs
    return {s:{s:0 for s in symbols} for s in symbols}

def init_matrix(input_sequence: list) -> dict:
    """ Initialise transition matrix based on ratio of counts for directly-follows relation """
    symbols = set(input_sequence)
    matrix = get_empty_matrix(symbols)

    # count directly-follows relation entries
    for idx, val in enumerate(input_sequence):
        if idx == 0:
            continue
        first = input_sequence[idx-1]
        second = input_sequence[idx]
        matrix[first][second] += 1

    # normalise counts of directly-follows relation entries
    for first in symbols:
        count = 0.0
        for second in symbols:
            count += matrix[first][second]
        if count == 0.0:
            continue
        for second in symbols:
            matrix[first][second] = matrix[first][second] / count

    return matrix

Let's define the specific example from the exercise sheet. 

In [47]:
# define the input sequence for to which the case labels shall be assigned
input_sequence = ['G','A','G','A','R','B','B','R','B','G','R','G','X','A','B','R']

In [48]:
# pre-processing step: assign dedicated labels to the start and end of the input sequence
input_sequence = add_start_end(input_sequence)

## Definition of the EM-approach

The following code snippets defines the function for the expectation step. It also provides a skeleton for the maximisation step.

**Task:** Complete the code snippet for the maximisation step.

In [49]:
def expectation_step(input_sequence: list, matrix: dict) -> list:
    """ Expectation step: estimate the case sequence based on the input sequence and the transition matrix """
    case_sequence = [-1]

    next_case_id = 0
    last_symbols_open_cases = {}

    for idx, symbol in enumerate(input_sequence):
        if symbol == CONST_START or symbol == CONST_END:
            continue

        max_prob = 0
        max_prob_case_id = -1
        for case_id, last_symbol in last_symbols_open_cases.items():
            if matrix[last_symbol][symbol] > max_prob:
                max_prob = matrix[last_symbol][symbol]
                max_prob_case_id = case_id

        if max_prob_case_id == -1 or matrix[CONST_START][symbol] == max({matrix[s][symbol] for s in matrix.keys()}):
            case_sequence.append(next_case_id)
            last_symbols_open_cases[next_case_id] = symbol
            next_case_id += 1
        else:
            case_sequence.append(max_prob_case_id)
            last_symbols_open_cases[max_prob_case_id] = symbol
            if matrix[symbol][CONST_END] == max({matrix[symbol][s] for s in matrix.keys()}):
                last_symbols_open_cases.pop(max_prob_case_id, None)

    case_sequence.append(-1)

    return case_sequence

def maximisation_step(input_sequence: list, case_sequence: list) -> dict:
    """ Maximisation step: estimate the transition matrix based on the input sequence and the case sequence """

    symbols = set(input_sequence)
    matrix = get_empty_matrix(symbols)
    
    ##########################
    open_cases = set()
    follows_per_symbol = {}

    for idx, val in enumerate(input_sequence):
        if val == CONST_START or val == CONST_END:
            continue
        if case_sequence[idx] not in open_cases:
            # a new case is opened, record its first symbol as one following the dedicated start symbol
            matrix[CONST_START][val] += 1
            follows_per_symbol[CONST_START] = follows_per_symbol.get(CONST_START, 0) + 1
            open_cases.add(case_sequence[idx])

        idx_2 = idx + 1

        if idx_2 == len(input_sequence):
            # the last symbol always closes the current case
            matrix[val][CONST_END] += 1
            follows_per_symbol[val] = follows_per_symbol.get(val, 0) + 1
        else:
            idx_2 = idx + 1
            # determine the next symbol in the trace of the current symbol
            while case_sequence[idx] != case_sequence[idx_2] and idx_2 < len(input_sequence)-1:
                idx_2 += 1

            if idx_2 == len(input_sequence):
                # the last symbol of a case, closes it
                matrix[val][CONST_END] += 1
                follows_per_symbol[val] = follows_per_symbol.get(val, 0) + 1
            else:
                matrix[val][input_sequence[idx_2]] += 1
                follows_per_symbol[val] = follows_per_symbol.get(val, 0) + 1

    follows_per_symbol[CONST_END] = 1
    for first in symbols:
        for second in symbols:
            matrix[first][second] = matrix[first][second] / (1.0 * follows_per_symbol[first])
    ##########################

    return matrix


## Run the approach for the example

Run the EM-approach to case labelling for the given input sequence.

In [50]:
# initialise the transition matrix based on the input sequence
matrix = init_matrix(input_sequence)

In [51]:
# print the initial transition matrix
matrix

{'<': {'<': 0.0, '>': 0.0, 'A': 0.0, 'B': 0.0, 'G': 1.0, 'R': 0.0, 'X': 0.0},
 '>': {'<': 0, '>': 0, 'A': 0, 'B': 0, 'G': 0, 'R': 0, 'X': 0},
 'A': {'<': 0.0,
  '>': 0.0,
  'A': 0.0,
  'B': 0.3333333333333333,
  'G': 0.3333333333333333,
  'R': 0.3333333333333333,
  'X': 0.0},
 'B': {'<': 0.0, '>': 0.0, 'A': 0.0, 'B': 0.25, 'G': 0.25, 'R': 0.5, 'X': 0.0},
 'G': {'<': 0.0, '>': 0.0, 'A': 0.5, 'B': 0.0, 'G': 0.0, 'R': 0.25, 'X': 0.25},
 'R': {'<': 0.0, '>': 0.25, 'A': 0.0, 'B': 0.5, 'G': 0.25, 'R': 0.0, 'X': 0.0},
 'X': {'<': 0.0, '>': 0.0, 'A': 1.0, 'B': 0.0, 'G': 0.0, 'R': 0.0, 'X': 0.0}}

In [52]:
# one iteration of the EM-approach
case_sequence = expectation_step(input_sequence, matrix)
print_cases(input_sequence, case_sequence)
matrix = maximisation_step(input_sequence, case_sequence)
matrix

['G', 'A', 'R', 'B', 'R', 'B', 'R', 'B', 'R']
['G', 'A', 'B']
['G', 'X', 'A']
['G']


{'<': {'<': 0.0, '>': 0.0, 'A': 0.0, 'B': 0.0, 'G': 1.0, 'R': 0.0, 'X': 0.0},
 '>': {'<': 0.0, '>': 0.0, 'A': 0.0, 'B': 0.0, 'G': 0.0, 'R': 0.0, 'X': 0.0},
 'A': {'<': 0.0,
  '>': 0.3333333333333333,
  'A': 0.0,
  'B': 0.3333333333333333,
  'G': 0.0,
  'R': 0.3333333333333333,
  'X': 0.0},
 'B': {'<': 0.0, '>': 0.25, 'A': 0.0, 'B': 0.0, 'G': 0.0, 'R': 0.75, 'X': 0.0},
 'G': {'<': 0.0, '>': 0.25, 'A': 0.5, 'B': 0.0, 'G': 0.0, 'R': 0.0, 'X': 0.25},
 'R': {'<': 0.0, '>': 0.25, 'A': 0.0, 'B': 0.75, 'G': 0.0, 'R': 0.0, 'X': 0.0},
 'X': {'<': 0.0, '>': 0.0, 'A': 1.0, 'B': 0.0, 'G': 0.0, 'R': 0.0, 'X': 0.0}}

In [53]:
# second iteration of the EM-approach
case_sequence = expectation_step(input_sequence, matrix)
print_cases(input_sequence, case_sequence)
matrix = maximisation_step(input_sequence, case_sequence)
matrix

['G', 'A']
['G', 'A']
['R', 'B', 'R', 'B', 'R', 'B', 'R']
['B']
['G', 'X', 'A']
['G']


{'<': {'<': 0.0,
  '>': 0.0,
  'A': 0.0,
  'B': 0.16666666666666666,
  'G': 0.6666666666666666,
  'R': 0.16666666666666666,
  'X': 0.0},
 '>': {'<': 0.0, '>': 0.0, 'A': 0.0, 'B': 0.0, 'G': 0.0, 'R': 0.0, 'X': 0.0},
 'A': {'<': 0.0, '>': 1.0, 'A': 0.0, 'B': 0.0, 'G': 0.0, 'R': 0.0, 'X': 0.0},
 'B': {'<': 0.0, '>': 0.25, 'A': 0.0, 'B': 0.0, 'G': 0.0, 'R': 0.75, 'X': 0.0},
 'G': {'<': 0.0, '>': 0.25, 'A': 0.5, 'B': 0.0, 'G': 0.0, 'R': 0.0, 'X': 0.25},
 'R': {'<': 0.0, '>': 0.25, 'A': 0.0, 'B': 0.75, 'G': 0.0, 'R': 0.0, 'X': 0.0},
 'X': {'<': 0.0, '>': 0.0, 'A': 1.0, 'B': 0.0, 'G': 0.0, 'R': 0.0, 'X': 0.0}}

In [54]:
# third iteration of the EM-approach
case_sequence = expectation_step(input_sequence, matrix)
print_cases(input_sequence, case_sequence)
matrix = maximisation_step(input_sequence, case_sequence)
matrix

['G', 'A']
['G', 'A']
['R', 'B', 'R', 'B', 'R', 'B', 'R']
['B']
['G', 'X', 'A']
['G']


{'<': {'<': 0.0,
  '>': 0.0,
  'A': 0.0,
  'B': 0.16666666666666666,
  'G': 0.6666666666666666,
  'R': 0.16666666666666666,
  'X': 0.0},
 '>': {'<': 0.0, '>': 0.0, 'A': 0.0, 'B': 0.0, 'G': 0.0, 'R': 0.0, 'X': 0.0},
 'A': {'<': 0.0, '>': 1.0, 'A': 0.0, 'B': 0.0, 'G': 0.0, 'R': 0.0, 'X': 0.0},
 'B': {'<': 0.0, '>': 0.25, 'A': 0.0, 'B': 0.0, 'G': 0.0, 'R': 0.75, 'X': 0.0},
 'G': {'<': 0.0, '>': 0.25, 'A': 0.5, 'B': 0.0, 'G': 0.0, 'R': 0.0, 'X': 0.25},
 'R': {'<': 0.0, '>': 0.25, 'A': 0.0, 'B': 0.75, 'G': 0.0, 'R': 0.0, 'X': 0.0},
 'X': {'<': 0.0, '>': 0.0, 'A': 1.0, 'B': 0.0, 'G': 0.0, 'R': 0.0, 'X': 0.0}}

## -- End