## Imports

In [None]:
import soundfile as sf
from rich import print
import numpy as np
import matplotlib.pyplot as plt
import librosa
import librosa.display
from IPython.display import Audio as ipy_audio
from collections import defaultdict, Counter
from itertools import permutations, product
from typing import Dict
from copy import deepcopy

In [None]:
from midiutils.read import get_audio, get_symbol_string
from mogra.datatypes import Swar

## Read MIDI Sample

In [None]:
# midi_file = "midiutils/samples/Bahar.mid"  # root 49
midi_file = "midiutils/samples/MiyankiMalhar.mid"  # root 61
audio, sr = get_audio(midi_file)

In [None]:
ipy_audio(data=audio[:1500000], rate=sr)

In [None]:
root = int(input("set root note (C = 60)"))

In [None]:
syms = get_symbol_string(midi_file, root)
plt.hist(syms)

In [None]:
allowed = input("type list of allowed symbols (empty = include all)")
if allowed == "": allowed = "".join(set(syms))
syms = [ss for ss in syms if ss in allowed]
syms_list = [syms]

## Read Transcribed Sample

In [None]:
transcription_file = "transcriptions/KaushikDhwani_[oQuyV_tsNs].txt"

In [None]:
with open(transcription_file, "r") as fp:
    data = fp.readlines()

syms_list = [ll.strip().replace(" ","") for ll in data if (ll[0]!="#" and len(ll)>1)]

## Markov Analysis

#### Functions

In [None]:
MARKOV_UNSEEN_SMOOTHING = np.log(1e-10)
MARKOV_EVAL_FRACTION = 0.2

In [None]:
class SimpleMarkov:
    def __init__(self, order: int) -> None:
        self.order = order
        self.transition_counts = defaultdict(Counter)
        self.total_counts = Counter()
        self.transition_probs = {}
    
    def compute_probs(self):
        self.states = set([s[0] for s in self.total_counts.keys()]).union(set(["!", "|"]))
        for state, next_states in self.transition_counts.items():
            total = self.total_counts[state]
            self.transition_probs[state] = {k: v / total for k, v in next_states.items()}
        
    def fit(self, train_sequence: list):
        for ii in range(len(train_sequence) - self.order):
            state = tuple(train_sequence[ii:ii+self.order])
            self.total_counts[state] += 1
            next_state = train_sequence[ii+self.order]
            self.transition_counts[state][next_state] += 1
        
        self.compute_probs()
    
    def calculate_log_likelihood(self, eval_sequences) -> Dict:
        """ Function to calculate log-likelihood of the sequence under the fitted model
        """
        all_log_likelihoods = []
        for eval_sequence in eval_sequences:
            log_likelihood_array = np.zeros(len(eval_sequence) - self.order)
            for ii in range(self.order, len(eval_sequence)):
                state = tuple(eval_sequence[ii-self.order:ii])
                next_state = eval_sequence[ii]
                if (state in self.transition_probs) and (next_state in self.transition_probs[state]):
                    log_likelihood_array[ii-self.order] = np.log(self.transition_probs[state][next_state])
                else:
                    log_likelihood_array[ii-self.order] = MARKOV_UNSEEN_SMOOTHING  # Smoothing for unseen transitions
            all_log_likelihoods.extend(log_likelihood_array)
        
        return {
            "log_likelihood": sum(all_log_likelihoods),
            "perplexity": np.exp(-np.mean(all_log_likelihoods))
        }


In [None]:
# each subsequent order carries this fraction of the weight of the current order
# if this = 0.5 and order = 3, then the weightages of jump matrices of order 1, 2, 3, = np.array([1, 0.5, 0.25])/1.75
ORDER_DISCOUNTING_FACTOR = 0.5

class JumpMarkov:
    def __init__(self, order: int) -> None:
        self.order = order
        self.transition_counts = {oo: defaultdict(Counter) for oo in range(1, order+1)}
        self.total_counts = {oo: Counter() for oo in range(1, order+1)}
        self.discounting_normalizer = (1-ORDER_DISCOUNTING_FACTOR**self.order)/(1-ORDER_DISCOUNTING_FACTOR)
        self.transition_probs = {oo: defaultdict(Counter) for oo in range(1, order+1)}
    
    def compute_probs(self):
        self.states = set(self.total_counts[1].keys()).union(set(["!", "|"]))
        for state in self.states:
            for oo in range(1, self.order+1):
                total = self.total_counts[oo][state]
                self.transition_probs[oo][state] += {k: v / total for k, v in self.transition_counts[oo][state].items()}
    
    def fit(self, train_sequence: list):
        for oo in range(1, self.order+1):
            for ii in range(self.order, len(train_sequence)):
                state = train_sequence[ii-oo]
                self.total_counts[oo][state] += 1
                next_state = train_sequence[ii]
                self.transition_counts[oo][state][next_state] += 1
        
        self.compute_probs()
    
    def calculate_log_likelihood(self, eval_sequences):
        """ Function to calculate log-likelihood of the sequence under the fitted model
        """
        # TODO(neeraja): something's wrong here!
        all_log_likelihoods = []
        for eval_sequence in eval_sequences:
            log_likelihood_array = np.zeros(len(eval_sequence) - self.order)
            mult = 1
            for oo in range(1, self.order+1):
                for ii in range(self.order, len(eval_sequence)):
                    state = eval_sequence[ii-oo]
                    next_state = eval_sequence[ii]
                    if (state in self.transition_probs[oo]) and (next_state in self.transition_probs[oo][state]):
                        log_likelihood_array[ii-self.order] += mult * np.log(self.transition_probs[oo][state][next_state])
                    else:
                        log_likelihood_array[ii-self.order] += mult * MARKOV_UNSEEN_SMOOTHING  # Smoothing for unseen transitions
            
                mult *= ORDER_DISCOUNTING_FACTOR
            
            log_likelihood_array = log_likelihood_array/self.discounting_normalizer
            all_log_likelihoods.extend(log_likelihood_array)
        
        return {
            "log_likelihood": sum(all_log_likelihoods),
            "perplexity": np.exp(-np.mean(all_log_likelihoods))
        }

In [None]:
# Calculate AIC and BIC for model comparison
def calculate_aic_bic(log_likelihood, num_params, n):
    aic = 2 * num_params - 2 * log_likelihood
    bic = np.log(n) * num_params - 2 * log_likelihood
    return aic, bic

In [None]:
# Unit tests

strain = "abababababababab"
stest1 = ["aaaaaaaaaaaaaaaa"]
stest2 = ["babababababababa"]

sm = SimpleMarkov(1)
sm.fit(strain)
p1 = sm.calculate_log_likelihood(stest1)["perplexity"]
sm = SimpleMarkov(2)
sm.fit(strain)
p2 = sm.calculate_log_likelihood(stest1)["perplexity"]
assert int(p1) == int(p2)

jm = JumpMarkov(1)
jm.fit(strain)
p3 = jm.calculate_log_likelihood(stest1)["perplexity"]
jm = JumpMarkov(2)
jm.fit(strain)
p4 = jm.calculate_log_likelihood(stest1)["perplexity"]
assert p4 < p3

#### Fit

In [None]:
# Train val split
if len(syms_list) == 1:
    syms = syms_list[0]
    eval_begin = int(len(syms)*(1-MARKOV_EVAL_FRACTION))
    sequences_train = [syms[:eval_begin]]
    sequences_eval = [syms[eval_begin:]]
else:
    np.random.seed(256)
    shuffled_list = deepcopy(syms_list)
    np.random.shuffle(shuffled_list)
    eval_begin = int(len(shuffled_list)*(1-MARKOV_EVAL_FRACTION))
    sequences_train = shuffled_list[:eval_begin]
    sequences_eval = shuffled_list[eval_begin:]

In [None]:
# Fit Markov models of order 1, 2, and 3
models = {}
log_likelihoods = {}
perplexities = {}
orders = ["1s", "2s", "3s", "1j", "2j", "3j"]
for order in orders:
    if order[1] == "s":
        sm = SimpleMarkov(order=int(order[0]))
        for strain in sequences_train:
            sm.fit(strain)
        models[order] = sm
    elif order[1] == "j":
        jm = JumpMarkov(order=int(order[0]))
        for strain in sequences_train:
            jm.fit(strain)
        models[order] = jm
    eval = models[order].calculate_log_likelihood(sequences_eval)
    log_likelihoods[order] = eval["log_likelihood"]
    perplexities[order] = eval["perplexity"]


In [None]:
# Evaluate them
aic = {}
bic = {}
n = len(syms)
for order in orders:
    num_params = sum(len(next_states) for next_states in models[order].transition_probs.values())
    aic[order], bic[order] = calculate_aic_bic(log_likelihoods[order], num_params, n)

In [None]:
# Results
print(f"Order of Model:\t\tLog-Likelihood\tPerplexity\tAIC\tBIC")
for order in orders:
    print(f"{order}\t{log_likelihoods[order]}\t{perplexities[order]}\t{aic[order]}\t{aic[order]}")

In [None]:
def ordering(state):
    if state == "!":
        return 0
    elif state == "|":
        return 100
    else:
        return Swar[state].value

In [None]:
# Transition matrix for the 1st order model
sm1 = models["1s"]
sorted_states = sorted(sm1.states, key=ordering)
sm1_transition_map = np.zeros((len(sorted_states), len(sorted_states)))
for tup in sm1.transition_probs:
    sm1_transition_map[sorted_states.index(tup[0])] = [
        sm1.transition_probs[tup].get(ss, np.exp(MARKOV_UNSEEN_SMOOTHING))
        for ss in sorted_states
    ]

In [None]:
plt.matshow(sm1_transition_map)
# plt.xticks(ticks=np.arange(len(sorted_states)), labels=sorted_states, rotation=90)
# plt.yticks(ticks=np.arange(len(sorted_states)), labels=sorted_states)
plt.clim(0, 1)
plt.colorbar()

In [None]:
sorted_states