In [76]:
import numpy as np
import scipy.io.wavfile as wvf

class ICA:
    def __init__(self,data):
        self.data = (np.matrix(data)).astype(float)
        self.epsilon = 0.0000001 #収束判定用
        
    def center(self): #平均を0にシフト
        self.data -= np.average(self.data)
            
    def whiten(self): #白色化
        sigma = np.cov(self.data, bias=True)
        d,E = np.linalg.eigh(sigma) #エルミート行列対角化
        D = np.matrix((d**-0.5)*np.eye(len(d)))
        V = E * D * E.transpose()
        z = V * self.data
        return z
    
    def convergence(self,v,w): #収束判定
        if np.linalg.norm(v - w) < self.epsilon:
            return True
        else:
            return False
        
    def normalize(self,w): #正規化
        if w.sum() < 0: #向きが逆のとき
            w = -w
        return w / np.linalg.norm(w)
    
    def orthogonalize(self,w,matrix_W):
        new_matrix = np.matrix(np.vstack((matrix_W,w.transpose())))
        Q,R = np.linalg.qr(new_matrix.transpose())
        w_t = Q.transpose()[-1]
        return w_t.transpose()
    
    def optimize(self,z): #ベクトルwの探索
        matrix_W = np.empty((0,len(self.data)))
        for i in range(len(self.data)):
            w = np.random.rand(len(self.data),1)
            w = self.normalize(w)
            while True:
                tot_vector = np.asarray(z) * np.asarray(np.asarray(np.dot(w.transpose(),z))**3)
                new_w = np.asmatrix(np.average(tot_vector,axis=1)).transpose() - 3*w
                new_w = self.orthogonalize(new_w,matrix_W)#直交化
                new_w = self.normalize(new_w) #正規化
                if self.convergence(new_w,w): #収束時
                    w = new_w
                    break
                w = new_w
            matrix_W = np.vstack((matrix_W,w.transpose())) #ベクトルwを積む
        y = matrix_W*z
        return y
        
    def main(self): #独立成分分析の本体
        self.center()
        z = self.whiten()
        y = self.optimize(z)
        return y
  
#音声ファイル用
def ica_wav(filenames):
    #ファイル読み込み
    sample_rate = np.array([0 for i in range(len(filenames))])
    for i in range(len(filenames)):
        sample_rate[i],data_tem = wvf.read(filenames[i])
        if i == 0:
            data = np.array([data_tem])
        else:
            data = np.vstack((data,data_tem))
    
    #独立成分分析
    data_ica = ICA(data).main()
    data_ica = [(data_i * 32767 / max(np.absolute(data_i))).astype('int16') for data_i in np.asarray(data_ica)]
    
    #ファイル出力
    for i in range(len(filenames)):
        wvf.write('ica_'+filenames[i],sample_rate[i],data_ica[i])

In [77]:
filenames = ['music1.wav','music2.wav']
ica_wav(filenames)

In [78]:
filenames = ['speechA1.wav','speechA2.wav']
ica_wav(filenames)

In [80]:
filenames = ['speechB1.wav','speechB2.wav','speechB3.wav']
ica_wav(filenames)