REDUCED ORDER MODELLING CLASS

In [None]:
class ROM:

    def __init__(self,directory,dir_test,freq):
        # initializing manual rank, dataset frame per second rate and dataset directory address
        self.directory = directory
        self.dir_test = dir_test
        self.freq      = freq
        #self.rank      = rank
        image_list = []
        # importing libraries 
        from PIL import Image
        import glob
        import numpy as np
        from skimage.color import rgb2gray 
        import matplotlib.pyplot as plt
        import matplotlib.cm as cm
        from scipy.fftpack import fft
        # appending files with .jpg format in the directory 
        self.directory = self.directory + '/*.jpg'
        for filename in glob.glob(self.directory):
            im = Image.open(filename)
            image_list.append(im)

        total_images1 = len(image_list)
        for n in range(0,total_images1):
            image_list[n] = np.asarray(image_list[n])
            image_list[n] = rgb2gray(image_list[n])
        
        (r,c) = image_list[0].shape # image resolution of the dataset 
        # loading the data matrix X 
        X =[]
        for i in range(0,total_images1):
            X.append(np.reshape(image_list[i].T, (r*c,-1)))
        # converting X data as a list to a Numpy Array 
        X = np.array(X)
        X = np.reshape(X.T,(r*c,total_images1))
        
        # loading test set 
        image_list = []
        self.dir_test = self.dir_test + '/*.jpg'
        for filename in glob.glob(self.dir_test):
            im = Image.open(filename)
            image_list.append(im)

        total_images2 = len(image_list)
        for n in range(0,total_images2):
            image_list[n] = np.asarray(image_list[n])
            image_list[n] = rgb2gray(image_list[n])
        
        (r,c) = image_list[0].shape # image resolution of the dataset 
        # loading the data matrix X 
        X_test =[]
        for i in range(0,total_images2):
            X_test.append(np.reshape(image_list[i].T, (r*c,-1)))
        # converting X data as a list to a Numpy Array 
        X_test = np.array(X_test)
        X_test = np.reshape(X_test.T,(r*c,total_images2))
        
        self.X    = X
        self.X_test = X_test
        self.N    = total_images1

    
    def pod(self):
        import numpy as np
        # economy SVD for calculating POD 
        U,S,VT = np.linalg.svd(self.X, full_matrices=False)
        V = VT.T
        X = self.X
        return U, S, V, X
    
    def pca(self):
        import numpy as np
        # average substraction for PCA 
        Xavg = np.mean(self.X,axis=1)
        B = self.X - np.tile(Xavg, (self.N,1)).T
        # applying SVD to the mean-substracted data PCA
        Up,Sp,VpT = np.linalg.svd(B, full_matrices=False)
        Vp = VpT.T
        Up = Up
        Sp = Sp 
        Vp = Vp
        return Up, Sp, Vp


    # PSD 
    def dft(self,V):  #discrete fourier transform
        import numpy as np
        X = np.asarray(V, dtype=float)
        N = X.shape[0]
        n = np.arange(N)
        k = n.reshape((N, 1))
        M = np.exp(-2j * np.pi * k * n / N)
        C = np.dot(M, X)
        return C

    def nextPowerOf2(self): # specifying next number that is a power of 2 
        N = self.N
        count = 0
        if (N and not(N & (N - 1))):
            return N
        while( N != 0): 
            N >>= 1
            count += 1
        return 1 << count
             

    def PSD(self,C,NFFT): # Power Spectral Density Calculation of POD time dynamics coefficient V 
        import numpy as np
        P2 = abs(C/NFFT)
        P1 = []
        for i in range(0,int((NFFT/2)+1)):
            if i==0:
                P1.append(P2[i])
            else:
                P1.append(2*P2[i])
        P1 = np.array(P1)
        f = []
        for i in range(0,int((NFFT/2)+1)):
            f.append(self.freq*(i/NFFT))
        f = np.array(f)
        max_freq = np.array(f[np.argmax(P1[1:len(P1)])])
        return f,P1, max_freq

    def ROM_projected(self,Up,U,Phi,rank): # dataset projection onto the PCA, POD & DMD modes 
        import numpy as np
        line = []
        for i in range(0,rank):
            line.append(i)
        line = np.asarray(line)
        line = np.reshape(line,(1,rank))
        coeff_PCA =  np.dot(self.X.T,Up[:,0:rank])
        coeff_PCA = abs(coeff_PCA)
        coeff_PCA = np.append(line,coeff_PCA,axis=0)
        
        coeff_PCA_test =  np.dot(self.X_test.T,Up[:,0:rank])
        coeff_PCA_test = abs(coeff_PCA_test)
        coeff_PCA_test = np.append(line,coeff_PCA_test,axis=0)
        
        coeff_POD =  np.dot(self.X.T,U[:,0:rank])
        coeff_POD = abs(coeff_POD)
        coeff_POD = np.append(line,coeff_POD,axis=0)
        
        coeff_POD_test =  np.dot(self.X_test.T,U[:,0:rank])
        coeff_POD_test = abs(coeff_POD_test)
        coeff_POD_test = np.append(line,coeff_POD_test,axis=0)
        
        coeff_DMD =  np.dot(self.X.T,Phi)
        coeff_DMD = abs(coeff_DMD)
        coeff_DMD = np.append(line,coeff_DMD,axis=0)
        
        coeff_DMD_test =  np.dot(self.X_test.T,Phi)
        coeff_DMD_test = abs(coeff_DMD_test)
        coeff_DMD_test = np.append(line,coeff_DMD_test,axis=0)
        
        return coeff_PCA, coeff_POD, coeff_DMD, coeff_PCA_test, coeff_POD_test, coeff_DMD_test

    def rank_trunc(self,S): # automatic SVD rank selection scheme based on (Gavish 2014) 
        import numpy as np
        beta = np.size(self.X,1)/np.size(self.X,0)
        fac  = 0.56*beta**3 - 0.95*beta**2 + 1.82*beta + 1.43
        med  = fac*np.median(S)
        for i in range(0,self.N):
            if np.array(S[i]) < med:
                rank = i-1
                break
        return rank
        
    def DMD(self,rank):
        import numpy as np
        X1 = np.array(self.X[:,0:self.N-1])
        X2 = np.array(self.X[:,1:self.N])
        U,S,VT = np.linalg.svd(X1, full_matrices=False)
        V = VT.T
        Ur = np.array(U[:,0:rank])
        Sr = np.array(S[0:rank])
        Vr = np.array(V[:,0:rank])
        # Calculation of the linear Opearator A 
        Atilde = np.dot(Ur.T,np.dot(X2,np.dot(Vr,np.linalg.inv(np.diag(Sr)))))
        # eigenvalues & eigenvectors of the linear operator A
        D, W = np.linalg.eig(Atilde)
        # DMD modes calculation 
        Phi = np.dot(X2,np.dot(Vr,np.dot(np.linalg.inv(np.diag(Sr)), W)))
        # logaritmic eigenvalues for frequency and decay rate estimation 
        omega = np.log10(D)*self.freq
        # data reconstruction using DMD reduced order model 
        x1 = np.flip((self.X)[:,[0]].T,0) # initial condition 
        b = np.dot(np.linalg.pinv(Phi),x1.T) 
        t = np.linspace(0, (self.N)/self.freq, num=self.N)
        t_dyn = np.multiply(b,np.exp(np.dot(np.reshape(omega,(rank,1)),np.reshape(t,(1,self.N)))))
        X_dmd = np.dot(Phi,t_dyn) # reconstructed data 
        D = np.reshape(D,(rank,1)) 
        q = np.multiply(D,b) # defines the DMD modes' energy 
        ind = np.argsort(-abs(q),axis=None) # orders the DMD modes based on their energy 
        q = -np.sort(-abs(q),axis=None) 
        Phi = Phi.T.tolist()
        Phi = [Phi[i] for i in ind] # reordering the DMD modes based on their energy 
        Phi = np.asarray(Phi).T 
        return Ur, Vr, Sr, Phi, D, omega, X_dmd,b,q,ind,t_dyn
    
    def Error(self,X_dmd): # error calculation 
        import numpy as np
        w = []
        for i in range(0,self.N):
            w.append(np.sum(self.X[:,i]))
        w = np.array(w)
        w = np.sqrt(abs(w))
        z = []
        for i in range(0,self.N):
            z.append(np.sum(X_dmd[:,i]))
        z = np.array(z)
        z = np.sqrt(abs(z))
        error = abs((w-z)/w)*100
        error1 = []
        for i in range(0,self.N):
            error1.append(np.sqrt(np.sum((abs(self.X[:,i]-X_dmd[:,i]))**2)))
        error1 = np.array(error1)
        return error,error1
    
    