In [None]:
import keras

import numpy as np
import pandas as pd
import seaborn as sn
import matplotlib.pyplot as plt
import matplotlib.cm as cm
from matplotlib.colors import Normalize

def encode(series):
    """Return a one-hot (indicator) DataFrame for ``series``.

    Every value is cast to ``str`` first, so each distinct value becomes one
    column named by its string form.

    Parameters
    ----------
    series : object exposing ``astype`` (e.g. a pandas Series or NumPy array)

    Returns
    -------
    pandas.DataFrame
        One row per input element, one indicator column per distinct value.
    """
    as_text = series.astype(str)
    return pd.get_dummies(as_text)
def mean_absolute_error(y_true, y_pred):
    """Mean of the element-wise absolute difference between two inputs.

    Both inputs are squeezed (singleton axes removed) before subtraction, so
    an ``(N, 1)`` array can be compared against an ``(N,)`` array.

    Parameters
    ----------
    y_true, y_pred : array-like

    Returns
    -------
    NumPy floating scalar: ``mean(|y_true - y_pred|)``.
    """
    truth = np.array(np.squeeze(y_true))
    estimate = np.array(np.squeeze(y_pred))
    residual = truth - estimate
    return np.mean(np.abs(residual))

In [None]:
# Load the saved Keras model from 'cnnc_iter3.h5' (path is relative to the
# working directory) and print its layer-by-layer summary. The layer indices
# used in later cells refer to positions in this model's `layers` list.
model = keras.models.load_model('cnnc_iter3.h5')
model.summary()

In [None]:
# Flat list of every weight array in the loaded model.
# NOTE(review): `weights` is not read by any other cell visible in this
# notebook — confirm whether it is still needed.
weights = model.get_weights()

In [None]:
# First weight array (`get_weights()[0]`) of layers 0, 3 and 6. The ranking
# cell below indexes each of these as a 4-D array (`weight[:,:,:,j]`), so they
# are expected to be convolution kernels with the filter axis last.
# NOTE(review): the pruning cell selects model.layers[1], [4] and [7] instead
# of [0], [3] and [6] — confirm against model.summary() that both sets of
# indices address the same convolutional layers.
w1 = model.layers[0].get_weights()[0]
w2 = model.layers[3].get_weights()[0]
w3 = model.layers[6].get_weights()[0]
#w4 = model.layers[6].get_weights()[0]
#w5 = model.layers[9].get_weights()[0]
#w6 = model.layers[10].get_weights()[0]
#w4 = model.layers[12].get_weights()[0]

# Ordered list consumed by the L1-norm ranking loop (row i of `filt_vec`
# corresponds to entry i of this list).
conv_layer_weights = [w1,w2,w3]

In [None]:
# Sort the absolute weights of the fully connected layer and plot them in
# ascending order.
#
# `w4` was previously assigned only in commented-out code, so this cell raised
# NameError after Restart & Run All. It is now loaded explicitly.
# NOTE(review): index 12 is taken from the commented-out line
# `#w4 = model.layers[12].get_weights()[0]` — confirm against model.summary()
# that this is the intended layer. The squeeze/argsort below assumes its
# kernel collapses to a 1-D vector (e.g. shape (n, 1)).
FC_LAYER_INDEX = 12
w4 = model.layers[FC_LAYER_INDEX].get_weights()[0]

w4 = np.abs(np.squeeze(w4))
fc_inds = np.argsort(w4)      # indices that order the weights by magnitude
sorted_w4 = w4[fc_inds]

plt.plot(sorted_w4)

In [None]:
# Rank the filters of every convolutional layer by the L1 norm of their
# kernels (sum of absolute weights) and plot the sorted norms per layer.
#
# Output: `filt_vec[i, r]` holds the index of the filter with the r-th
# smallest L1 norm in conv layer i (stored as float; unused entries stay 0).
# The buffer keeps its historical minimum size of (6, 256) and grows if a
# layer has more filters or more layers are listed, instead of raising
# IndexError.
n_rows = max(6, len(conv_layer_weights))
n_cols = max([256] + [w.shape[-1] for w in conv_layer_weights])
filt_vec = np.zeros((n_rows, n_cols))

for i, weight in enumerate(conv_layer_weights):
    # Filters live on the last axis of the 4-D kernel array.
    num_filters = weight.shape[-1]

    # Map filter index (as a string key) -> L1 norm of that filter.
    weights_dict = {
        '{}'.format(j): np.sum(np.abs(weight[:, :, :, j]))
        for j in range(num_filters)
    }

    # Ascending by L1 norm; ties keep their original filter order.
    weights_dict_sort = sorted(weights_dict.items(), key=lambda kv: kv[1])
    # Fixed: the separator was the literal text '/n' instead of a newline.
    print('L1 norm conv layer{}\n'.format(i+1), weights_dict_sort)
    # 10% of the largest L1 norm in this layer.
    print(0.1*float(weights_dict_sort[-1][1]))

    weights_value = [norm for _, norm in weights_dict_sort]
    filt_vec[i, :num_filters] = [int(name) for name, _ in weights_dict_sort]

    xc = range(num_filters)

    plt.figure(i+1, figsize=(8, 5))
    plt.plot(xc, weights_value)
    plt.xlabel('Filter Number')
    plt.ylabel('L1 Norm')
    plt.title('CNN-c Convolutional Layer {}'.format(i+1))
    plt.grid(True)

In [None]:
from kerassurgeon import identify, Surgeon
from kerassurgeon.operations import delete_channels, delete_layer
# Compile settings for the pruned model: these three names are passed to
# `model_new.compile(optimizer, loss, ...)` in the surgery cell.
lr = 0.0001
loss='mean_squared_error'
optimizer = keras.optimizers.Adam(lr)

In [None]:
# Filter indices selected for pruning in each conv layer: the leading entries
# of each `filt_vec` row (the filters with the smallest L1 norms), converted
# from the float storage back to Python ints.
# Counts taken per layer: 2 (layer 1), 10 (layer 2), 50 (layer 3).
li1 = [int(filter_idx) for filter_idx in filt_vec[0, 0:2]]

li2 = [int(filter_idx) for filter_idx in filt_vec[1, 0:10]]

li3 = [int(filter_idx) for filter_idx in filt_vec[2, 0:50]]
#li4 = []
#for i in range(len(filt_vec[3,0:4])):
#    li4.append(int(filt_vec[3,i]))

#li5 = []
#for i in range(len(filt_vec[4,0:6])):
#    li5.append(int(filt_vec[4,i]))
#li6 = []
#for i in range(len(filt_vec[5,0:4])):
#    li6.append(int(filt_vec[5,i]))


In [None]:
# Prune channels from the loaded model with keras-surgeon and compile the
# resulting network as `model_new`.
#
# NOTE(review): the L1-norm ranking was computed from model.layers[0], [3]
# and [6], whereas the layers selected here are model.layers[1], [4] and [7].
# Confirm against model.summary() that these refer to the same conv layers.
layer1 = model.layers[1]
layer2 = model.layers[4]
layer3 = model.layers[7]
#layer4 = model.layers[6]
#layer5 = model.layers[9]
#layer6 = model.layers[10]

# Only the `layer2` job is active; the other jobs are commented out, so
# `li1`, `li3`, `layer1` and `layer3` are currently unused by the surgery.
surgeon = Surgeon(model)
#surgeon.add_job('delete_channels',layer1,channels=li1)
surgeon.add_job('delete_channels',layer2,channels=li2)
#surgeon.add_job('delete_channels',layer3,channels=li3)
#surgeon.add_job('delete_channels',layer4,channels=li4)
#surgeon.add_job('delete_channels',layer5,channels=li5)
#surgeon.add_job('delete_channels',layer6,channels=li6)

# Apply the queued jobs; `optimizer` and `loss` come from the settings cell.
# The extra metric reported by evaluate() is mean absolute error.
model_new = surgeon.operate()
model_new.compile(optimizer,loss, metrics=['mean_absolute_error'])
model_new.summary()

In [None]:
#CNN Classification testing
#
# Evaluates the network as a classifier: the arg-max output column is mapped
# back to a numeric class label and compared with the regression labels.
# NOTE(review): this cell predicts with `model` (the unpruned network), while
# the regression cells use `model_new` — confirm that is intended.

#Import label data
# The class-label values are recovered from the training labels so that the
# prediction columns can be mapped back to numeric values.
temp_labels = np.loadtxt('vec_mat_clabels_icex_src_0.01train.csv',delimiter=",")
labels_t = np.array([str(value) for value in temp_labels]).ravel()

# `encode` is the one-hot helper defined in the first cell; the duplicate
# definition that used to live here has been removed.
y_train = encode(labels_t)
labels = list(y_train.columns.values)

#Import test data
features_test = np.loadtxt('vec_mat_features_icex_src_test2_norm_m10db.csv',delimiter=",")
temp_ytest = np.loadtxt('vec_mat_rlabels_icex_src_test2.csv',delimiter=",")

# Columns alternate real, imaginary; each pair is one upper-triangle entry
# (row-major, diagonal included) of a 32x32 matrix.
real_test = features_test[:,0::2]
imag_test = features_test[:,1::2]
X_test = np.zeros((features_test.shape[0],32,32,2))

# Vectorised equivalent of the former triple loop. np.triu_indices yields the
# upper-triangle positions in the same row-major order the loop used.
upper_i, upper_j = np.triu_indices(32)
n_upper = upper_i.size
real_plane = X_test[..., 0]   # views: writes go straight into X_test
imag_plane = X_test[..., 1]
# Mirrored half first (real copied, imaginary negated) ...
real_plane[:, upper_j, upper_i] = real_test[:, :n_upper]
imag_plane[:, upper_j, upper_i] = -imag_test[:, :n_upper]
# ... then the upper triangle, so the diagonal keeps its un-negated value.
real_plane[:, upper_i, upper_j] = real_test[:, :n_upper]
imag_plane[:, upper_i, upper_j] = imag_test[:, :n_upper]

temp_ytest = temp_ytest.ravel()
y_test = temp_ytest

predictions = model.predict(X_test)

# Map each arg-max column index back to its numeric class label.
pred_labels = [float(labels[col]) for col in np.argmax(predictions, axis=1)]

#Classification acc
mae = mean_absolute_error(y_test, pred_labels)
diff = abs((np.transpose(pred_labels))-(y_test))
error = diff[diff>1]          # predictions that miss by more than 1
percent_correct = (len(y_test)-len(error))/len(y_test)

print('Test mae:', mae)
print('Testing percent within 1km:', percent_correct*100)

In [None]:
#CNN Regression Testing
#
# Evaluates the pruned network `model_new` on the test set: mean absolute
# error and the share of predictions within 1 of the true label.

features_test = np.loadtxt('vec_mat_features_icex_src_test2_norm.csv',delimiter=",")
temp_ytest = np.loadtxt('vec_mat_rlabels_icex_src_test2.csv',delimiter=",")

# Columns alternate real, imaginary; each pair is one upper-triangle entry
# (row-major, diagonal included) of a 32x32 matrix.
real_test = features_test[:,0::2]
imag_test = features_test[:,1::2]
X_test = np.zeros((features_test.shape[0],32,32,2))

# Vectorised equivalent of the former triple loop. np.triu_indices yields the
# upper-triangle positions in the same row-major order the loop used.
upper_i, upper_j = np.triu_indices(32)
n_upper = upper_i.size
real_plane = X_test[..., 0]   # views: writes go straight into X_test
imag_plane = X_test[..., 1]
# Mirrored half first (real copied, imaginary negated) ...
real_plane[:, upper_j, upper_i] = real_test[:, :n_upper]
imag_plane[:, upper_j, upper_i] = -imag_test[:, :n_upper]
# ... then the upper triangle, so the diagonal keeps its un-negated value.
real_plane[:, upper_i, upper_j] = real_test[:, :n_upper]
imag_plane[:, upper_i, upper_j] = imag_test[:, :n_upper]

temp_ytest = temp_ytest.ravel()
y_test = temp_ytest

predictions = model_new.predict(X_test)
mae = mean_absolute_error(y_test,predictions)
print('Test mae:', mae)

# The transpose turns the predictions into a row so the subtraction lines up
# element-wise with the 1-D label vector.
diff = abs((np.transpose(predictions))-(y_test))
error = diff[diff>1]          # predictions that miss by more than 1
percent_correct = (len(y_test)-len(error))/len(y_test)
print('Percent within 1km:', percent_correct*100)

In [None]:
# Mean absolute error of the latest predictions, displayed as the cell output.
# Uses the helper from the first cell, which performs the same
# squeeze / subtract / abs / mean sequence.
mean_absolute_error(y_test, predictions)

In [None]:
from keras.initializers import glorot_uniform  # Or your initializer of choice
import keras.backend as K

# Re-initialise the pruned model: every array returned by get_weights() is
# replaced by a fresh Glorot-uniform draw of the same shape.
# NOTE(review): this re-draws all arrays get_weights() returns, not only the
# kernels — confirm that is intended for any bias/normalisation parameters.
initial_weights = model_new.get_weights()


def _eval_with_tf_session(placeholder):
    """Evaluate a backend tensor inside the current TensorFlow session."""
    return placeholder.eval(session=K.get_session())


def _eval_theano(placeholder):
    """Evaluate a Theano backend tensor."""
    return placeholder.eval()


backend_name = K.backend()
if backend_name not in ('tensorflow', 'theano'):
    raise ValueError("Unsupported backend")
k_eval = _eval_with_tf_session if backend_name == 'tensorflow' else _eval_theano

new_weights = [
    k_eval(glorot_uniform()(old_array.shape))
    for old_array in initial_weights
]

model_new.set_weights(new_weights)

In [None]:
# Persist the pruned model (in its current state) to 'iter2.h5', relative to
# the working directory.
model_new.save('iter2.h5')

In [None]:
import h5py

def mean_absolute_percentage_error(y_true, y_pred):
    """Mean absolute percentage error, in percent.

    Computes ``mean(|(y_true - y_pred) / y_true|) * 100``. Inputs are
    converted with ``np.array`` and are not squeezed, so their shapes must be
    compatible for element-wise arithmetic. Entries of ``y_true`` equal to
    zero lead to a division by zero.
    """
    truth = np.array(y_true)
    estimate = np.array(y_pred)
    relative_error = (truth - estimate) / truth
    return np.mean(np.abs(relative_error)) * 100

class traininghist(keras.callbacks.Callback):
    """Keras callback that evaluates the model on a fixed dataset every epoch.

    After each epoch the model is evaluated on ``test_data`` and the two
    returned values (the loss and the model's single compiled metric) are
    appended to ``trainingloss`` and ``trainingmape`` and printed.

    Parameters
    ----------
    test_data : tuple
        ``(x, y)`` pair passed to ``self.model.evaluate``.

    Notes
    -----
    ``evaluate`` must return exactly two values, i.e. the model must be
    compiled with exactly one metric.
    NOTE(review): the names and the printed label say "mape", but the second
    value is whatever metric the model was compiled with; in this notebook
    ``model_new`` is compiled with 'mean_absolute_error' — confirm the label.
    """

    def __init__(self, test_data):
        # Initialise the base Callback so its own attributes exist before
        # Keras attaches the model.
        super().__init__()
        self.test_data = test_data
        self.trainingloss = []   # per-epoch loss on `test_data`
        self.trainingmape = []   # per-epoch metric value on `test_data`

    def on_epoch_end(self, epoch, logs=None):
        # `logs` defaults to None rather than a shared mutable dict; it is
        # not read here.
        x, y = self.test_data
        loss, mape = self.model.evaluate(x, y, verbose=0)
        self.trainingloss.append(loss)
        self.trainingmape.append(mape)
        print('Training loss: {}, mape: {}\n'.format(loss, mape))

In [None]:
#Import training data
#
# Builds X_train with shape (n_samples, 32, 32, 2) — channel 0 real part,
# channel 1 imaginary part — and y_train, the unstandardised labels.

features = np.loadtxt('vec_mat_features_icex_src_0.01train_norm.csv',delimiter=",")
labels_unstand = np.loadtxt('vec_mat_rlabels_icex_src_0.01train.csv',delimiter=",")

# Columns alternate real, imaginary; each pair is one upper-triangle entry
# (row-major, diagonal included) of a 32x32 matrix.
real = features[:,0::2]
imag = features[:,1::2]
X_train = np.zeros((features.shape[0],32,32,2))

# Vectorised equivalent of the former triple loop. np.triu_indices yields the
# upper-triangle positions in the same row-major order the loop used.
upper_i, upper_j = np.triu_indices(32)
n_upper = upper_i.size
real_plane = X_train[..., 0]   # views: writes go straight into X_train
imag_plane = X_train[..., 1]
# Mirrored half first (real copied, imaginary negated) ...
real_plane[:, upper_j, upper_i] = real[:, :n_upper]
imag_plane[:, upper_j, upper_i] = -imag[:, :n_upper]
# ... then the upper triangle, so the diagonal keeps its un-negated value.
real_plane[:, upper_i, upper_j] = real[:, :n_upper]
imag_plane[:, upper_i, upper_j] = imag[:, :n_upper]

# Labels are used as loaded (no standardisation is applied).
labels_unstand = labels_unstand.ravel()
y_train = labels_unstand


In [None]:
#Import test data
#
# Builds X_test with shape (n_samples, 32, 32, 2) — channel 0 real part,
# channel 1 imaginary part — and y_test, the unstandardised labels.

features_test = np.loadtxt('vec_mat_features_icex_src_test2_norm.csv',delimiter=",")
temp_ytest = np.loadtxt('vec_mat_rlabels_icex_src_test2.csv',delimiter=",")

# Columns alternate real, imaginary; each pair is one upper-triangle entry
# (row-major, diagonal included) of a 32x32 matrix.
real_test = features_test[:,0::2]
imag_test = features_test[:,1::2]
X_test = np.zeros((features_test.shape[0],32,32,2))

# Vectorised equivalent of the former triple loop. np.triu_indices yields the
# upper-triangle positions in the same row-major order the loop used.
upper_i, upper_j = np.triu_indices(32)
n_upper = upper_i.size
real_plane = X_test[..., 0]   # views: writes go straight into X_test
imag_plane = X_test[..., 1]
# Mirrored half first (real copied, imaginary negated) ...
real_plane[:, upper_j, upper_i] = real_test[:, :n_upper]
imag_plane[:, upper_j, upper_i] = -imag_test[:, :n_upper]
# ... then the upper triangle, so the diagonal keeps its un-negated value.
real_plane[:, upper_i, upper_j] = real_test[:, :n_upper]
imag_plane[:, upper_i, upper_j] = imag_test[:, :n_upper]

# Labels are used as loaded (no standardisation is applied).
temp_ytest = temp_ytest.ravel()
y_test = temp_ytest

# Sanity check of the array shapes (X_train / y_train come from the
# training-data cell).
print(X_train.shape)
print(y_train.shape)
print(X_test.shape)
print(y_test.shape)

In [None]:
#Training
#
# Retrains the pruned network `model_new` with checkpointing, learning-rate
# reduction on plateau and early stopping, then reports train/test metrics.

# Shuffle the samples once so that `validation_split` (which takes the tail
# of the arrays) does not depend on the file order.
index=np.arange(X_train.shape[0])
np.random.shuffle(index)

X_train=X_train[index,:,:,:]
y_train=y_train[index]

drate = 0.25      # not used in this cell
n_node = 256      # not used in this cell
batch_size = 128
loss='mean_squared_error'

# NOTE(review): `model_new` was already compiled in the surgery cell with the
# optimizer created there (lr = 0.0001). The optimizer and loss defined here
# are never passed to compile(), so this learning rate has no effect unless
# the model is recompiled — confirm which learning rate is intended.
lr = 0.001
optimizer = keras.optimizers.Adam(lr)

filepath = "temp.h5"
# The deprecated `period=1` argument was dropped: saving once per epoch is
# the default, and newer Keras versions no longer accept `period`.
checkpoint = keras.callbacks.ModelCheckpoint(filepath, monitor='val_loss', save_best_only=True, mode='auto')
reduce = keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=25, mode='auto')
early = keras.callbacks.EarlyStopping(monitor='val_loss', min_delta=1e-4, patience=40, mode='auto',restore_best_weights=True)
traininghistory = traininghist((X_train,y_train))
callbacks_list = [checkpoint,reduce,traininghistory,early]

infdb = model_new.fit(X_train, y_train, batch_size,verbose = True, epochs=1000, validation_split=0.2, shuffle=True,callbacks=callbacks_list)

#Testing

# evaluate() returns [loss, metric]; the metric compiled into `model_new` is
# mean absolute error, so the second value is an MAE (the variable names are
# kept for compatibility).
test_loss, test_acc = model_new.evaluate(X_test,y_test)
train_loss, train_acc = model_new.evaluate(X_train,y_train)

predictions = model_new.predict(X_test)
diff = abs((np.transpose(predictions))-(y_test))
error = diff[diff>1]          # predictions that miss by more than 1
percent_correct = (len(y_test)-len(error))/len(y_test)

predictions_train = model_new.predict(X_train)
diff = abs((np.transpose(predictions_train))-(y_train))
error = diff[diff>1]
percent_correct_train = (len(y_train)-len(error))/len(y_train)

# Fixed labels: these values are mean absolute errors, not percentage errors.
print('Training mae:', train_acc)
print('Test mae:', test_acc)
print('Training percent within 1km:',percent_correct_train*100)
print('Testing percent within 1km:', percent_correct*100)