In [1]:
import numpy as np
import random
import itertools
import math
import tqdm
from tqdm import trange

In [2]:
key_mapping={
    'C':0,
    'D':2,
    'E':4,
    'F':5,
    'G':7,
    'A':9,
    'B':11
}
def key2num(key):  
  key=key.upper()
  num=key_mapping[key[0]]
  modifier=len(key)
  if modifier==1:
    return num
  elif key[1]=='#':
    return (num+(modifier-1))%12
  elif key[1]=='B' or key[1]=='-':
    return (num-(modifier-1))%12
  elif key[1]=='X':
    return (num+(modifier-1)*2)%12

# key_list to number_list
def keys2num(keys):
    return [key2num(key) for key in keys]

In [3]:
# function to distingish whether notes in given timestamp and chord are within or outside the chord (0=outside,1=within)
import json
with open('../modules/json_files/keychorddict.json') as json_file:
    chord_notes = json.load(json_file)
  
    def note_2_class(chord,notes_at_t,chord_notes=chord_notes):
        note_in_chord=chord_notes[chord]['idx']
        return [int(note in note_in_chord) for note in notes_at_t]

In [14]:
### I threat the total length of time as the length of the observered list

class HMM:
    def __init__(self,no_of_state,no_of_value,states,values):
        #randomize all matrix, each row should sum to 1
        #self.emssion_matrix=np.array([ran/ran.sum() for ran in np.array([np.random.rand(no_of_value) for i in range(no_of_state)])]) #b[i][O]  --> probability to emit values[O] from states[i]
        

        #Modification: 2 parameter for all chord  [note inside chord & note outside chord]
        self.emssion_matrix=np.random.rand(no_of_value)
        self.emssion_matrix=self.emssion_matrix/self.emssion_matrix.sum()
        self.emssion_matrix=np.array([self.emssion_matrix,]*len(states)) # assume same probability to emit "note within chord"  for all chords
        
        self.initial_matrix= np.random.rand(no_of_state) #π[i] --> probability to start at states[i]
        self.initial_matrix/= self.initial_matrix.sum()
        
        self.transition_matrix=np.array([ran/ran.sum() for ran in np.array([np.random.rand(no_of_state) for i in range(no_of_state)])])  #a[i][j] --> probability from states[i] transit to states[j]
        
        self.states=states
        self.values=values
        self.observered=None
        
        self.probit_at_i_table=None
        self.probit_transit_i_j_table=None
        self.forward_table=None
        self.backward_table=None
        
        #self.prev_initial_matrix=self.initial_matrix
        #self.prev_emssion_matrix=self.emssion_matrix
        #self.prev_transition_matrix=self.transition_matrix
    def update_state_name(self,name):
        self.states=name
        
    def debug(self):
        print('initial_matrix\n',self.initial_matrix)
        print('transition_matrix\n',self.transition_matrix)
        print('emission_matrix\n',self.emssion_matrix)
        
    def likelihood(self,state,ob_t):
        
        #Modification: convert observation[t] from notes to 2_classes
        chord=self.states[state]  #numeric to chord name
        ob_t=note_2_class(chord,ob_t) #2 class
        
        prob=1
        for x in ob_t:
            prob*=(self.emssion_matrix[state][x])
        return prob
 
    def forward(self,t,j,ob=None,mode=False):
        if ob is None:
            ob=self.observered
        if np.isnan(self.forward_table[t][j]).any():
            if t==0:
                if mode==True:
                    self.forward_table[t][j]=[self.initial_matrix[j]*self.likelihood(j,ob[t]),0]
                    return self.forward_table[t][j][0],self.forward_table[t][j][1]
                else:
                    self.forward_table[t][j]=self.initial_matrix[j]*self.likelihood(j,ob[t])
                    return self.forward_table[t][j]
            else:          
                if mode==True:
                    result=np.array([self.forward(t-1,i,ob,mode)[0]*self.transition_matrix[i][j] for i in range(len(self.states))]) * self.likelihood(j,ob[t])
                    self.forward_table[t][j]=[np.max(result),np.argmax(result)]
                    return self.forward_table[t][j][0],self.forward_table[t][j][1]
                else:
                    self.forward_table[t][j]=sum([self.forward(t-1,i,ob,mode)*self.transition_matrix[i][j] for i in range(len(self.states))]) * self.likelihood(j,ob[t])
                    return self.forward_table[t][j]
        else:
            return self.forward_table[t][j]

    def backward(self,t,i,ob=None,mode=False):
        if ob is None:
            ob=self.observered
        if np.isnan(self.backward_table[t][i]) or mode==True:
            if t==len(ob)-1:
                self.backward_table[t][i]=1
                return self.backward_table[t][i]
            else:
                if mode==True:
                    self.backward_table[t][i]=max([self.transition_matrix[i][j]*self.likelihood(j,ob[t+1])*self.backward(t+1,j,ob,mode) for j in range(len(self.states))])
                    return self.backward_table[t][i]
                else:
                    self.backward_table[t][i]=sum([self.transition_matrix[i][j]*self.likelihood(j,ob[t+1])*self.backward(t+1,j,ob,mode) for j in range(len(self.states))])
                    return self.backward_table[t][i]
        else:
            return self.backward_table[t][i]
            
           
    def probit_at_i(self,t,i,ob=None):#Gamma γt(i) = P(qt = i|O,λ)      
        if ob is None:
            ob=self.observered
        if np.isnan(self.probit_at_i_table[t][i]):
            numerator=self.forward(t,i,ob)*self.backward(t,i,ob)#sum probability of all path passing through state[i] at time t
            denominator=sum([self.forward(t,j,ob)*self.backward(t,j,ob) for j in range(len(self.states))]) #prob of passing through  ALL_state at time t
            self.probit_at_i_table[t][i]=numerator/denominator
            return self.probit_at_i_table[t][i]
        else:
            return self.probit_at_i_table[t][i]
    
    def probit_transit_i_j(self,t,i,j,ob=None):#epsilon ξt(i, j) = P(qt = i,qt+1 = j|O,λ)
        if ob is None:
            ob=self.observered
        if np.isnan(self.probit_transit_i_j_table[t][i][j]):
            numerator=self.forward(t,i,ob)*self.transition_matrix[i][j]*self.likelihood(j,ob[t+1])*self.backward(t+1,j,ob)#sum probability of all path transit from state[i] to state[j] at time t
            denominator=sum([sum([self.forward(t,m,ob)*self.transition_matrix[m][n]*self.likelihood(n,ob[t+1])*self.backward(t+1,n,ob) for n in range(len(self.states))]) for m in range(len(self.states))]) #prob of ALL transition combination at time t
            self.probit_transit_i_j_table[t][i][j]=(numerator/denominator)
            return self.probit_transit_i_j_table[t][i][j]
        else:
            return self.probit_transit_i_j_table[t][i][j]
    def predict(self,ob):
        self.update_state_name([test_key_name.lower()[:-1]+states for states in h_states])
        self.probit_at_i_table=np.empty((len(ob),len(self.states)))
        self.probit_transit_i_j_table=np.empty((len(ob),len(self.states),len(self.states)))
        self.forward_table=np.empty((len(ob),len(self.states),2))
        self.backward_table=np.empty((len(ob),len(self.states),2))
        self.probit_at_i_table[:]= np.NaN
        self.probit_transit_i_j_table[:]=np.NaN
        self.forward_table[:]= np.NaN
        self.backward_table[:]=np.NaN
        T1=np.empty((len(self.states),len(ob)),'d')
        T2=np.empty((len(self.states),len(ob)),'B')
        for idx in range(len(self.states)):
            T1[idx,0]=self.forward(0,idx,ob,True)[0]
        T2[:,0]=0
        
        for i in range(1,len(ob)):
            for idx in range(len(self.states)):
                T1[idx,i],T2[idx,i]=self.forward(i,idx,ob,True)
        x = np.empty(len(ob), 'B')
        x[-1] = np.argmax(T1[:, len(ob) - 1])
        for i in reversed(range(1, len(ob))):
            x[i - 1] = T2[x[i], i]
        return x
        
        
    def train(self,obs=None,epochs=2):
        #O:observed values
        #λ:model parameters
         
        if obs is None:
            obs=self.observered
        
        for epoch in range(epochs):
            for idxx,ob in enumerate(obs):
                self.update_state_name([key_name[idxx].lower()[:-1]+states for states in h_states])
                print('running',idxx,key_name[idxx].lower()[:-1])
                self.probit_at_i_table=np.empty((len(ob),len(self.states)))
                self.probit_transit_i_j_table=np.empty((len(ob),len(self.states),len(self.states)))
                self.forward_table=np.empty((len(ob),len(self.states)))
                self.backward_table=np.empty((len(ob),len(self.states)))
                self.probit_at_i_table[:]= np.NaN
                self.probit_transit_i_j_table[:]=np.NaN
                self.forward_table[:]= np.NaN
                self.backward_table[:]=np.NaN
                
                #initalize DP table
                for t, i in itertools.product(range(len(ob)),range(len(self.states))):
                    self.forward(t,i,ob)
                    self.backward(t,i,ob)
                    self.probit_at_i(t,i,ob)
                    for j in range(len(self.states)):
                        if t!=len(ob)-1:
                            self.probit_transit_i_j(t,i,j,ob)
                
                
                
                #initial matrix
                for i in range(len(self.states)):
                    self.initial_matrix[i]=self.probit_at_i(0,i,ob)
                    if self.initial_matrix[i]==0:
                        self.initial_matrix[i]=0.000000001
                #transition matrix
                for i, j in itertools.product(range(len(self.states)),range(len(self.states))):
                    self.transition_matrix[i][j]=sum([self.probit_transit_i_j(t,i,j,ob) for t in range(len(ob)-1)])/sum([self.probit_at_i(t,i,ob) for t in range(len(ob)-1)])
                    if self.transition_matrix[i][j]==0:
                        self.transition_matrix[i][j]=0.000000001
                #emission matrix
                for j, k in itertools.product(range(len(self.states)),range(len(self.values))):   
                    total=0
                    
                    #Modification: convert notes to 2 classes (outside or inside given chord)
                    chord=self.states[j]  #numeric to chord name
                    ob_2_class=[note_2_class(chord,ob_t) for ob_t in ob] #2 class

                    for t in range(len(ob)):
                        if k in ob_2_class[t]:
                            #Modification: multiple by how many times do k appear at timestamp t
                            total+=self.probit_at_i(t,j,ob)*ob_2_class[t].count(k)  
                            
                    #Modification: multiple by len(ob[t]), which is the total length of notes at timestamp t                    
                    self.emssion_matrix[j][k]=total/sum([self.probit_at_i(t,j,ob)*len(ob[t]) for t in range(len(ob))])  
                    
                    #smoothing
                    if self.emssion_matrix[j][k]==0:
                        self.emssion_matrix[j][k]=0.000000001

In [15]:
import json
with open('../data/training_data.json') as json_file:
    data = json.load(json_file)

In [16]:
h_states=['MinorI', 'MinorI+',
          'MinorbII', 'MinorII', 'MinorII7',
          'MinorIII',
          'MinorIV', 'MinorIV+',
          'MinorV', 'MinorV+', 'MinorV+7',
          'MinorVI', 'MinorGerVI', 'MinorFreVI', 'MinorItaVI',
          'MinorVII', 'MinorDimVII', 'MinorDimVII7',
]

In [17]:
score_name=['Etude_in_C_Minor',
'Etude_in_F_Minor',
'Menuet_in_G_Minor',
'Nocturne_in_C_Minor',
'Nocturne_in_E_Minor',
'Nocturne_in_F_Minor',
'Nocturne_No._20_in_C_Minor',
'Prelude_in_B_Minor',
'Prelude_in_C_Minor',
'Waltz_in_A_Minor',
           ]
key_name=['Cm','Fm','Gm','C#m','Em','Fm','C#m','Bm','Cm','Am']

In [18]:
all_training=[]

In [19]:
for idx_root,name in enumerate(score_name):
    select_idx=[]
    for idx,chord in enumerate(data[name+'.mxl']['chord_seq']):
        if chord.startswith(key_name[idx_root]):
            select_idx.append(idx)
    training=[]
    training_label=[]
    for idx in select_idx:
        training.append(data[name+'.mxl']['note_seq'][idx])
        training_label.append(data[name+'.mxl']['chord_seq'][idx])
    training=[keys2num(segment)for segment in training]
    all_training.append(training)

In [20]:
test=HMM(len(h_states),2,h_states,["outside chord","inside chord"])

In [21]:
import time
start=time.time()
test.train(all_training,1)
print('timetaken',time.time()-start)

running 0 c
running 1 f
running 2 g
running 3 c#
running 4 e
running 5 f
running 6 c#
running 7 b
running 8 c
running 9 a
timetaken 1063.6339321136475


In [22]:
#prepare test data
test_sample=[]
select_idx=[]
testing_label=[]
for idx,chord in enumerate(data[name+'.mxl']['chord_seq']):
    if chord.startswith(key_name[idx_root]):
        select_idx.append(idx)
training=[]
training_label=[]
for idx in select_idx:
    training.append(data[name+'.mxl']['note_seq'][idx])
    training_label.append(data[name+'.mxl']['chord_seq'][idx])
training=[keys2num(segment)for segment in training]
test_sample=training
testing_label=training_label

test_key_name='Em'
test_states=[test_key_name.lower()[:-1]+states for states in h_states]

In [23]:
start=time.time()
prediction=test.predict(test_sample)
prediction=[test_states[idx] for idx in prediction]
print('timetaken',time.time()-start)
(prediction,testing_label)

timetaken 0.24178290367126465


(['eMinorbII',
  'eMinorV+',
  'eMinorI+',
  'eMinorbII',
  'eMinorIII',
  'eMinorDimVII',
  'eMinorV+',
  'eMinorItaVI',
  'eMinorV+',
  'eMinorI+',
  'eMinorbII',
  'eMinorDimVII',
  'eMinorIII',
  'eMinorbII',
  'eMinorDimVII',
  'eMinorIII',
  'eMinorV+',
  'eMinorI+',
  'eMinorbII',
  'eMinorIII',
  'eMinorbII',
  'eMinorDimVII',
  'eMinorI+',
  'eMinorbII',
  'eMinorV+',
  'eMinorV+',
  'eMinorI+',
  'eMinorbII',
  'eMinorIII',
  'eMinorDimVII',
  'eMinorV',
  'eMinorItaVI',
  'eMinorV',
  'eMinorItaVI',
  'eMinorbII',
  'eMinorDimVII',
  'eMinorV+',
  'eMinorIV+',
  'eMinorbII',
  'eMinorV+',
  'eMinorItaVI',
  'eMinorV',
  'eMinorItaVI',
  'eMinorDimVII',
  'eMinorIII',
  'eMinorDimVII',
  'eMinorI+',
  'eMinorbII',
  'eMinorIII',
  'eMinorbII',
  'eMinorV+',
  'eMinorI+',
  'eMinorbII',
  'eMinorIII',
  'eMinorbII',
  'eMinorIII',
  'eMinorbII',
  'eMinorDimVII',
  'eMinorV+',
  'eMinorI+',
  'eMinorbII',
  'eMinorIII',
  'eMinorDimVII',
  'eMinorV+',
  'eMinorIII',
  'eMinorb

In [None]:
test.debug()