# Orthogonal Matching Pursuit (OMP)

## 3rd Attempt - Parallelize the Signals

### Imports

In [12]:
# Preparing the Spark Environment
from pyspark.sql import SparkSession
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

# Mllib items
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix
from pyspark.mllib.regression import *

# System, Numpy and WAV reading
import os
from scipy.io import wavfile
import numpy as np

# Plots
import matplotlib.pyplot as plt
import matplotlib as mpl
mpl.rcParams['savefig.dpi'] = 80
mpl.rcParams['figure.dpi'] = 80
mpl.rcParams['figure.figsize'] = (16,9)

# Normalization and Metrics
from sklearn.preprocessing import normalize
from sklearn.metrics import mean_squared_error

### Creating the Dictionary (matrix)

In [13]:
# Setting the directories and Listing files

violaoWaveDir = 'audio/violao/used/'
violaoWaveList = sorted(os.listdir(violaoWaveDir))

pianoWaveDir = 'audio/piano/used/'
pianoWaveList = sorted(os.listdir(pianoWaveDir))

In [14]:
# Importing WAV files

violaoDict = []
for audioFile in violaoWaveList:
    violaoDict.append(wavfile.read(violaoWaveDir + audioFile)[1])
    
pianoDict = []
for audioFile in pianoWaveList:
    pianoDict.append(wavfile.read(pianoWaveDir + audioFile)[1])

In [15]:
# Creating the basis for the Dictionary

Dict = []
Dict.extend(violaoDict)
Dict.extend(pianoDict)

Dict = np.asarray(Dict)
Dict.shape

(20000, 4410)

In [16]:
# Function to remove the null signals

def clearSilence (signals, threshold=0.1):    
    s = []
    for i in range(signals.shape[0]):
        if any(elem > threshold for elem in signals[i]):
            s.append(signals[i])
    s = np.asarray(s)
    
    return s

In [17]:
# Creating the Dictionary (D) with "clean" signals only

D = np.transpose(clearSilence(Dict))

### Preparing the Test Data

In [18]:
# Setting the directories and listing files

testWaveDir = 'audio/TEST/'
testWaveList = sorted(os.listdir(testWaveDir))

In [19]:
# Importing WAV files

testData = []
for audioFile in testWaveList:
    testData.append(wavfile.read(testWaveDir + audioFile)[1])

In [20]:
# Normalizing all data

testData = normalize(testData, axis=1, norm='l2')
D = normalize(D, axis=0, norm='l2')

### The OMP function

In [26]:
# OMP function

def omp(D,s,k):
    """
    Orthogonal Matching Pursuit (OMP)
        
    Inputs
        D: dictionary (matrix)
        s: signal
        k: sparsity level
        
    Output
        x: coeff vector for sparse representation
    """
    
    [l, c] = D.shape
    
    x = np.zeros([c, 1]) # coefficient (output)
    r = s # residual of s
    omega = np.zeros([k, 1]) # selected support
    D_omega = np.zeros([l, 1]) # correspondng columns of D
    
    for cnt in range(k): # choose k atoms
        #print("Iteration: ", cnt)
        x_tmp = np.zeros([c, 1])
        inds = np.setdiff1d(np.arange(0,c-1), omega)
        
        for i in inds:
            t = np.transpose(D[:, i])
            n = np.linalg.norm(D[:, i])
            x_tmp[i] = np.matmul(t,r) / n
        
        ichosen = np.argmax(abs(x_tmp))
        omega[cnt] = ichosen
        D_omega = np.column_stack([D_omega, D[:, ichosen]])
        if (cnt == 0):
            D_omega = np.delete(D_omega, 0, 1)
        x_ls = np.linalg.lstsq(D_omega, s, rcond=None)[0]
        r = s - np.matmul(D_omega, x_ls)
    
    for a in range(k):
        x[int(omega[a])] = x_ls[a]

    return x

### Tests

#### Non-Parallel

In [None]:
%%time
RES_SER = [0] * testData.shape[0]
for i in range(testData.shape[0]):
    RES_SER[i] = omp(D[:,:], testData[i], 20)

#### Parallel

In [None]:
# Parallelize the data

rdd_data = sc.parallelize(list(enumerate(testData)), 32)

In [None]:
%%time
RES = (
    rdd_data
    .map(lambda x: omp(D[:,:], x[1], 20))
    .collect()
)