In [2]:
import pywt
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, Counter
import keras
from keras.layers import Dense, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.models import Sequential
from keras.callbacks import History 
from sklearn.model_selection import train_test_split
history = History()

Using TensorFlow backend.


## 加载数据文件

In [3]:
typeDescription = {
    1: 'normal',
    2: 'crack',
    3: 'manhole',
    4: 'hole',
    5: 'patch',
    6: 'bump',
    7: 'junction',
    8: 'rough',
    9: 'bridge'
}


def readFile(filename):
    return np.loadtxt(filename)

def randomize(dataset, labels):
    permutation = np.random.permutation(labels.shape[0])
    shuffled_dataset = dataset[permutation, :, :]
    shuffled_labels = labels[permutation]
    return shuffled_dataset, shuffled_labels

In [8]:
folderPath ='../data/7-12/camera/April/datasets/'
dataFile = ['dataX.txt', 'dataY.txt', 'dataZ.txt']
labelFile = ['labelSeverity.txt', 'labelType.txt','isAnomaly.txt','anomalyPercent.txt']

signals = []
for file in dataFile:
    dataPath = folderPath+file
    signals.append(np.loadtxt(dataPath))
signals = np.transpose(np.array(signals), (1, 2, 0))
print('数据集：',signals.shape)

labelTypePath = folderPath+labelFile[1]
isAnomalyPath = folderPath+labelFile[2]
labelType = np.loadtxt(labelTypePath)
isAnomaly = np.loadtxt(isAnomalyPath)

dic = {}
temp = Counter(labelType)
for key in temp.keys():
    dic[typeDescription[key]] = temp[key]
print(dic)

数据集： (5717, 128, 3)
{'normal': 4994, 'crack': 40, 'manhole': 100, 'hole': 102, 'patch': 142, 'bridge': 5, 'bump': 15, 'junction': 31, 'rough': 288}


In [9]:
shuffled_signals, shuffled_labelType  = randomize(signals, labelType)

## Applying CWT and extracting features

In [10]:
scales = range(1,128)
waveletname = 'morl'
num =shuffled_signals.shape[0]
data_cwt = np.ndarray(shape=(num, 127, 127, 3),dtype='float32')

for ii in range(0,num):
    if ii % 1000 == 0:
        print(ii)
    for jj in range(0,3):
        signal = shuffled_signals[ii, :, jj]
        coeff, freq = pywt.cwt(signal, scales, waveletname, 1)
        #print(coeff.shape)
        coeff_ = coeff[:,:127] #coeff.shape = (127,128), coeff_.shape = (127,127)
        #print(coeff_.shape)
        data_cwt[ii, :, :, jj] = coeff_


0
1000
2000
3000
4000
5000


## Training a CNN 

In [13]:
x_train,x_test,y_train,y_test = train_test_split(data_cwt,shuffled_labelType,test_size=0.28,random_state=0)
print(x_train.shape)
print(x_test.shape)

img_x = 127
img_y = 127
img_z = 3
num_classes = 10 #类别

batch_size = 16
epochs = 10

# reshape the data into a 4D tensor - (sample_number, x_img_size, y_img_size, num_channels)
# because the MNIST is greyscale, we only have a single channel - RGB colour images would have 3
input_shape = (img_x, img_y, img_z)

# convert the data to the right type
x_train = x_train.reshape(x_train.shape[0], img_x, img_y, img_z)
x_test = x_test.reshape(x_test.shape[0], img_x, img_y, img_z)
x_train = x_train.astype('float32')
x_test = x_test.astype('float32')

print('x_train shape:', x_train.shape)
print(x_train.shape[0], 'train samples')
print(x_test.shape[0], 'test samples')

# convert class vectors to binary class matrices - this is for use in the
# categorical_crossentropy loss below
y_train = keras.utils.to_categorical(y_train, num_classes)
y_test = keras.utils.to_categorical(y_test, num_classes)


(4116, 127, 127, 3)
(1601, 127, 127, 3)


"\nimg_x = 127\nimg_y = 127\nimg_z = 3\nnum_classes = 10 #类别\n\nbatch_size = 16\nepochs = 10\n\n# reshape the data into a 4D tensor - (sample_number, x_img_size, y_img_size, num_channels)\n# because the MNIST is greyscale, we only have a single channel - RGB colour images would have 3\ninput_shape = (img_x, img_y, img_z)\n\n# convert the data to the right type\nx_train = x_train.reshape(x_train.shape[0], img_x, img_y, img_z)\nx_test = x_test.reshape(x_test.shape[0], img_x, img_y, img_z)\nx_train = x_train.astype('float32')\nx_test = x_test.astype('float32')\n\nprint('x_train shape:', x_train.shape)\nprint(x_train.shape[0], 'train samples')\nprint(x_test.shape[0], 'test samples')\n\n# convert class vectors to binary class matrices - this is for use in the\n# categorical_crossentropy loss below\ny_train = keras.utils.to_categorical(y_train, num_classes)\ny_test = keras.utils.to_categorical(y_test, num_classes)\n"

In [None]:
model = Sequential()
model.add(Conv2D(32, kernel_size=(5, 5), strides=(1, 1), activation='relu', input_shape=input_shape)) 
model.add(MaxPooling2D(pool_size=(2, 2), strides=(2, 2)))
model.add(Conv2D(64, (5, 5), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Flatten())
model.add(Dense(1000, activation='relu'))
model.add(Dense(num_classes, activation='softmax'))

model.compile(loss=keras.losses.categorical_crossentropy, 
              optimizer=keras.optimizers.Adam(), 
              metrics=['accuracy'])

model.fit(x_train, y_train, batch_size=batch_size, 
          epochs=epochs, verbose=1, 
          validation_data=(x_test, y_test), 
          callbacks=[history])

train_score = model.evaluate(x_train, y_train, verbose=0)
print('Train loss: {}, Train accuracy: {}'.format(train_score[0], train_score[1]))
test_score = model.evaluate(x_test, y_test, verbose=0)
print('Test loss: {}, Test accuracy: {}'.format(test_score[0], test_score[1]))

Instructions for updating:
If using Keras pass *_constraint arguments to layers.


Train on 4116 samples, validate on 1601 samples
Epoch 1/10


In [None]:
class FocalLoss(nn.Module):
    def __init__(self):
        super(FocalLoss, self).__init__()
    def forward(self, predictions, target, cuda_device, alpha=0.25, gamma=2):
        zeros = torch.zeros(predictions.shape, dtype=predictions.dtype)
        if cuda_device != -1:
            zeros = zeros.cuda()
        pos_corr = torch.where(target > zeros, target - predictions, zeros)
        neg_corr = torch.where(target > zeros, zeros, predictions)
        fl_loss = - alpha * (pos_corr ** gamma) * torch.log(torch.clamp(predictions, 1e-8, 1.0)) \
                    - (1 - alpha) * (neg_corr ** gamma) * torch.log(torch.clamp(1.0 - predictions, 1e-8, 1.0))
        return torch.sum(fl_loss)

In [None]:
fig, axarr = plt.subplots(figsize=(12,6), ncols=2)
axarr[0].plot(range(1, 11), history.history['acc'], label='train score')
axarr[0].plot(range(1, 11), history.history['val_acc'], label='test score')
axarr[0].set_xlabel('Number of Epochs', fontsize=18)
axarr[0].set_ylabel('Accuracy', fontsize=18)
axarr[0].set_ylim([0,1])
axarr[1].plot(range(1, 11), history.history['acc'], label='train score')
axarr[1].plot(range(1, 11), history.history['val_acc'], label='test score')
axarr[1].set_xlabel('Number of Epochs', fontsize=18)
axarr[1].set_ylabel('Accuracy', fontsize=18)
axarr[1].set_ylim([0.9,1])
plt.legend()
plt.show()