<a href="https://colab.research.google.com/github/mustafabozkaya/DeepLearningBootcamp2022/blob/master/classsification_urbansounds_with_cnn_kaggle.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

[<img align="Left" width="100" height="100" src="https://thumbs.dreamstime.com/b/mb-initial-letter-vector-logo-icon-mb-initial-letter-vector-logo-icon-204517753.jpg">](https://github.com/mustafabozkaya)

# Spectrogram Recognition with CNN

---
[<img align="Left" width="800" height="300" src="https://www.researchgate.net/publication/319081627/figure/fig1/AS:534034566004736@1504335170521/Spectrogram-of-a-speech-signal-with-breath-sound-marked-as-Breath-whose-bounds-are.png">](#)



In [None]:
!pwd


/kaggle/working


# Downloading and Extracting the Dataset

In [None]:
!wget https://zenodo.org/record/1203745/files/UrbanSound8K.tar.gz

In [None]:
!ls

__notebook_source__.ipynb


In [None]:
!tar -xzf UrbanSound8K.tar.gz #extract tar file

In [None]:
!ls -la UrbanSound8K/

ls: cannot access 'UrbanSound8K/': No such file or directory


In [None]:
!cat UrbanSound8K/UrbanSound8K_README.txt

cat: UrbanSound8K/UrbanSound8K_README.txt: No such file or directory


In [None]:
import os

# Walk the extracted dataset and print, for every directory, its path, its
# sub-directory names and the number of files it contains.
# The loop names avoid `dir` and `file`: binding `dir` at notebook level would
# hide the built-in dir() for every later cell.
for root, dirnames, filenames in os.walk("../working/UrbanSound8K/"):
    print(root)
    print(dirnames)
    print(len(filenames))

# Creating Spectrograms

In [None]:
df = pd.read_csv("UrbanSound8K/metadata/UrbanSound8K.csv")
df.head()

In [None]:
df.info()

In [None]:
df.describe()

In [None]:
# Audio -> mel spectrogram (dB) helper
def create_spectrogram(y):
    """Convert an audio time series into a mel spectrogram on a decibel scale.

    Args:
        y: Audio time series; passed unchanged to
            ``librosa.feature.melspectrogram``.

    Returns:
        The mel spectrogram converted with ``librosa.power_to_db``, using the
        spectrogram's own maximum as the reference (``ref=np.max``).
    """
    # melspectrogram is called with its defaults, which per the librosa
    # documentation produce a *power* spectrogram, so power_to_db is the
    # matching conversion. The previous version computed this value but then
    # returned amplitude_to_db of the same array, i.e. it treated power values
    # as amplitudes.
    mel_power = librosa.feature.melspectrogram(y=y)
    return librosa.power_to_db(mel_power, ref=np.max)


def save_spectrogram(spect_paths, spectrogram, file_name, classid):
    """Render a spectrogram and save it as ``<spect_paths>/<classid>/<stem>.png``.

    Args:
        spect_paths: Root folder that holds one sub-folder per class id.
        spectrogram: Spectrogram array handed to ``librosa.display.specshow``.
        file_name: Name of the source audio file; its extension is replaced
            by ``.png``.
        classid: Class id; used as the sub-folder name.
    """
    class_dir = f"{spect_paths}/{classid}"
    # exist_ok avoids the listdir-then-mkdir check and also creates
    # spect_paths itself when it does not exist yet.
    os.makedirs(class_dir, exist_ok=True)

    # splitext drops only the final extension, so names that contain extra
    # dots are not truncated at the first one.
    save_name = os.path.splitext(file_name)[0]

    plt.figure()
    try:
        librosa.display.specshow(spectrogram)
        # bbox_inches="tight" with pad_inches=0 trims the surrounding margin.
        plt.savefig(f"{class_dir}/{save_name}.png", bbox_inches="tight", pad_inches=0)
    finally:
        # Always release the figure, even if drawing or saving failed;
        # otherwise figures pile up over thousands of calls.
        plt.close()
# Audio preview helper
def listen_to_random_sound(data, sr):
    """Show an inline audio player for the given samples.

    Despite the name, nothing is picked at random here: the caller supplies
    the samples to play.

    Args:
        data: Audio samples handed to ``IPython.display.Audio``.
        sr: Sampling rate, passed as the player's ``rate``.
    """
    player = IPython.display.Audio(data, rate=sr)
    IPython.display.display(player)

In [None]:
!cd ..

In [None]:
!ls

In [None]:
path_to_folds = "working/UrbanSound8K/audio"
output_path = "./working"
spects_path = f"{output_path}/spectrograms"

# exist_ok makes the cell safe to re-run and also creates output_path itself
# when it is missing (os.listdir on a missing folder would raise).
os.makedirs(spects_path, exist_ok=True)

number_of_files = df.shape[0]  # one metadata row per audio clip
number_of_processed = 0
number_of_errors = 0

# Best-effort conversion: a clip that fails is written to errors.txt and
# skipped, so a single bad file does not abort the whole run.
with open(f"{output_path}/errors.txt", "w") as error_file:
    for index, row in df.iterrows():
        try:
            file_name = row["slice_file_name"]
            fold = row["fold"]
            classid = row["classID"]
            path_to_file = f"{path_to_folds}/fold{fold}/{file_name}"

            # librosa.load returns the audio time series and its sampling rate.
            data, sr = librosa.load(path_to_file)

            spectrogram = create_spectrogram(data)
            save_spectrogram(spects_path, spectrogram, file_name, classid)

        except Exception as e:
            number_of_errors += 1
            # Record which clip failed so the entry can be traced to a file.
            error_file.write(
                f"{number_of_errors}: {row.get('slice_file_name', '<unknown>')}: {e}\n"
            )

        finally:
            number_of_processed += 1

        print(f"\rNumber: {number_of_processed}/{number_of_files} | Errors: {number_of_errors}", end="")
        
        #break

# Load and Save Image Datasets

In [None]:
# Load the spectrogram images together with their class ids
def create_training_data(datasetfolder_path, resize=None, grayscale=False, normalize=False):
    """Load every image below ``datasetfolder_path`` as ``[image, classid]`` pairs.

    The folder is expected to contain one sub-folder per class; the
    sub-folder name is used as the class id (a string).

    Args:
        datasetfolder_path: Root folder with one sub-folder per class.
        resize: Optional size passed to ``cv.resize``; ``None`` keeps the
            original size.
        grayscale: Read images with ``cv.IMREAD_GRAYSCALE`` when true.
        normalize: Divide pixel values by 255.0 when true.

    Returns:
        A list of ``[image_array, classid]`` entries. Images that cannot be
        loaded are counted as errors and left out.
    """
    training_data = []
    number_of_errors = 0
    number_of_processed = 0

    # sorted() makes the result order reproducible across runs and machines.
    for classid in sorted(os.listdir(datasetfolder_path)):
        path_to_class = os.path.join(datasetfolder_path, classid)
        if not os.path.isdir(path_to_class):
            continue  # stray files next to the class folders are not classes

        for img in sorted(os.listdir(path_to_class)):
            path_to_img = os.path.join(path_to_class, img)
            try:
                if grayscale:
                    img_array = cv.imread(path_to_img, cv.IMREAD_GRAYSCALE)
                else:
                    img_array = cv.imread(path_to_img)

                # NOTE(review): assumes `cv` is OpenCV (cv2), whose imread
                # reports an unreadable file by returning None, not by raising.
                if img_array is None:
                    raise ValueError(f"could not read image: {path_to_img}")

                if resize is not None:
                    img_array = cv.resize(img_array, resize)

                if normalize:
                    img_array = img_array / 255.0

                # list.append keeps the (image, label) pair together; the old
                # np.append([...]) call lacked its second argument and raised
                # on every image, which the bare except silently swallowed.
                training_data.append([img_array, classid])
                number_of_processed += 1

            except Exception:
                number_of_errors += 1

        print(f"\rProcessed: {number_of_processed} | Erros: {number_of_errors} | training data size : {len(training_data)} ", end="")
    return training_data

In [None]:
img_datasets=create_training_data(datasetfolder_path=f"{output_path}/spectrograms")
len(img_datasets)

In [None]:
# Print the class id and array shape of every loaded sample.
# Iterates `img_datasets` (the result of create_training_data); the previous
# name `images` is not defined at this point of the notebook.
# `class_id` replaces `id` so the built-in id() is not shadowed.
for image, class_id in img_datasets:
    print(f"\r class id :{class_id}  image :{image.shape}", end="")

In [None]:
# Persist the (image, classid) pairs as a CSV file
def write_csv(training_data, csv_name):
    """Write ``training_data`` to ``csv_name`` and return it as a DataFrame.

    Args:
        training_data: Sequence of two-item rows, ``(image, classid)``.
        csv_name: Destination path of the CSV file (written without index).

    Returns:
        The DataFrame with columns ``image`` and ``classid`` that was written.
    """
    frame = pd.DataFrame(data=training_data, columns=["image", "classid"])
    frame.to_csv(csv_name, index=False)
    print("CSV file created")
    return frame

In [None]:
def create_pickle(path, data):
    """Pickle ``data`` to ``<path>/pickle_img_dataset/allimg8732.pickle``.

    Args:
        path: Folder in which the ``pickle_img_dataset`` sub-folder is created.
        data: Any picklable object.
    """
    pickle_dir = f"{path}/pickle_img_dataset"
    # exist_ok replaces the listdir check and keeps repeated calls working.
    os.makedirs(pickle_dir, exist_ok=True)

    with open(f"{pickle_dir}/allimg8732.pickle", "wb") as f:
        pickle.dump(data, f)

In [None]:
!ls

In [None]:
dataset_dir=f"{output_path}/spectrograms/"


In [None]:
data_dir = pathlib.Path(dataset_dir)
image_count = len(list(data_dir.glob('*/*.png')))
print(image_count)

In [None]:
spec = list(data_dir.glob('1/*'))
PIL.Image.open(str(spec[6]))

In [None]:
# Inspect only the top level of the spectrogram tree (the loop stops after the
# first os.walk entry): the root path, the files directly inside it and its
# sub-directories.
for root, dirs, files in os.walk(dataset_dir, topdown=True):
    print("="*50)
    print(root)
    # These two lines used to report type/length of `files` under a "root" label.
    print(f"root type :{type(root)}")
    print(f"root lenth :{len(root)}")
    print(files)
    print(f"files type :{type(files)}")
    print(f"files lenth :{len(files)}")
    print(dirs)
    print(f"DİR type :{type(dirs)}")
    print(f"DİR lenth :{len(dirs)}")
    print("="*50)
    break
    

In [None]:
# Print one sample path (the first listed file) for every directory in the tree.
for root, dirs, files in os.walk(dataset_dir):
    # Slicing to at most one entry replaces the loop-and-break construct.
    for file in files[:1]:
        print(os.path.join(root, file))
    

# Data Exploration

In [None]:

class_name = "1"  # class folder to inspect

train_files = glob.glob(os.path.join(dataset_dir, str(class_name), "*"))

# Show a handful of paths instead of dumping every file name into the output.
print(*train_files[:5], sep="\n")
print(f"1 spectogram img lenth :{len(train_files)}")

train_imgs = np.array([img_to_array(load_img(img)) for img in train_files])

# Label = file name without its extension. It is taken from the basename
# rather than a fixed '/'-split index, which only worked for one specific
# directory depth and raised IndexError for shorter paths.
train_labels = np.array(
    [os.path.splitext(os.path.basename(fn))[0].strip() for fn in train_files]
)

print('Train dataset shape:', train_imgs.shape)
print('Train labels shape:', train_labels.shape)

In [None]:
# create image datasets 

In [None]:
image_size = (374, 500)
batch_size = 32  # images per batch

# Both subsets MUST share the same seed: with validation_split the files are
# shuffled with this seed before being partitioned, so different seeds give
# two unrelated partitions and training images can reappear in validation.
split_seed = 42

# Training subset (70 % of the files).
train_ds = tf.keras.preprocessing.image_dataset_from_directory(
    directory=dataset_dir,
    labels="inferred",
    validation_split=0.3,
    subset="training",
    seed=split_seed,
    image_size=image_size,
    batch_size=batch_size,
)

# Validation subset (the remaining 30 % of the same partition).
valid_ds = tf.keras.preprocessing.image_dataset_from_directory(
    directory=dataset_dir,
    labels="inferred",
    validation_split=0.3,
    subset="validation",
    seed=split_seed,
    image_size=image_size,
    batch_size=batch_size,
)



In [None]:
print(f"{train_ds.class_names}")

In [None]:
# print how many images are in the training and validation datasets
# len(dataset) counts batches, not images, so the totals are read from the
# file lists that image_dataset_from_directory attaches to its result.
# NOTE(review): relies on the `file_paths` attribute of the returned dataset —
# confirm it is available in the TensorFlow version in use.
print("Number of training images: ", len(train_ds.file_paths))
print("Number of validation images: ", len(valid_ds.file_paths))

In [None]:
# Split the validation dataset into a test part and a (smaller) validation part.
# The datasets were built with a batch_size, so take/skip count batches here,
# not single images.
# NOTE(review): if valid_ds reshuffles on every iteration, the samples that
# land in each part can change between passes — confirm before relying on a
# strict test/validation separation.
test_ds = valid_ds.take(30) # the first 30 batches become the test set
valid_ds = valid_ds.skip(30) # everything after those 30 batches stays as validation data

In [None]:
# how many bachs are in the training and validation datasets
print("Number of training batches: ", len(train_ds))
print("Number of validation batches: ", len(valid_ds))
print("Number of test  batches: ", len(test_ds))

In [None]:
dataset_type=[train_ds,valid_ds,test_ds]

In [None]:
for ds in dataset_type:
  print(type(ds))

In [None]:
#Checking images and labels shapes (amount of images, height, width, color channels)
for image_batch, labels_batch in test_ds.take(2):
  print(image_batch.shape)
  print(labels_batch.shape)
  break

In [None]:
#Checking images and labels shapes (amount of images, height, width, color channels)
for image_batch, labels_batch in valid_ds:
  print(image_batch.shape)
  print(labels_batch.shape)
  break

# Data Visualization

In [None]:
# Show the first training batch as a 4x8 grid; each image is titled with its
# class index.
grid_rows, grid_cols = 4, 8
plt.figure(figsize=(18, 18))
for images, labels in train_ds.take(1):
    # Bound the loop by the images actually in the batch and by the grid size:
    # range(batch_size) indexes past the end of a partially filled batch and
    # past the 32 available subplot slots when batch_size is larger.
    shown = min(len(images), grid_rows * grid_cols)
    for i in range(shown):
        ax = plt.subplot(grid_rows, grid_cols, i + 1)
        plt.imshow(images[i].numpy().astype("uint8"))
        plt.title(int(labels[i]))
        plt.axis("off")

In [None]:
# Show the first validation batch as a 4x8 grid; each image is titled with its
# class index.
grid_rows, grid_cols = 4, 8
plt.figure(figsize=(18, 18))
for images, labels in valid_ds.take(1):
    # Bound the loop by the images actually in the batch and by the grid size:
    # range(batch_size) indexes past the end of a partially filled batch and
    # past the 32 available subplot slots when batch_size is larger.
    shown = min(len(images), grid_rows * grid_cols)
    for i in range(shown):
        ax = plt.subplot(grid_rows, grid_cols, i + 1)
        plt.imshow(images[i].numpy().astype("uint8"))
        plt.title(int(labels[i]))
        plt.axis("off")

In [None]:
# Show the first test batch as a 4x8 grid; each image is titled with its
# class index.
grid_rows, grid_cols = 4, 8
plt.figure(figsize=(18, 18))
for images, labels in test_ds.take(1):
    # Bound the loop by the images actually in the batch and by the grid size:
    # range(batch_size) indexes past the end of a partially filled batch and
    # past the 32 available subplot slots when batch_size is larger.
    shown = min(len(images), grid_rows * grid_cols)
    for i in range(shown):
        ax = plt.subplot(grid_rows, grid_cols, i + 1)
        plt.imshow(images[i].numpy().astype("uint8"))
        plt.title(int(labels[i]))
        plt.axis("off")

# Data Preprocessing

Before we can build our model and start training, we need to apply one simple preprocessing step to the dataset: rescaling. We rescale each input from the [0, 255] range to the [0, 1] range.

In [None]:


# RGB -> single-channel helper
def convert_to_gray_scale(img):
    """Return ``img`` converted with ``tf.image.rgb_to_grayscale``.

    A one-line progress message with the converted shape and type is printed
    (carriage return, no newline) every time the function body runs.

    Args:
        img: RGB image tensor (or batch of images).

    Returns:
        The grayscale result of ``tf.image.rgb_to_grayscale``.
    """
    gray = tf.image.rgb_to_grayscale(img)
    print(f"\r convert gray :  image shape : {gray.shape} | img type: {type(gray)} ", end="")
    return gray



In [None]:
# Function to prepare our datasets for modelling
def prepare(ds, augment=False, batch_size=None, resized_size=None, grayscale=True):
    """Attach the preprocessing steps to a dataset of ``(images, labels)``.

    Steps, in order: optional grayscale conversion, optional resize, rescale
    by 1/255, a printed sanity check of the first batch, optional
    flip/rotation augmentation.

    Args:
        ds: Dataset yielding ``(images, labels)``; every step is added with
            ``ds.map``.
        augment: Add random horizontal/vertical flips and rotations when true.
        batch_size: Unused; kept so existing calls keep working.
        resized_size: ``(height, width)`` target size, or ``None`` to keep the
            incoming size.
        grayscale: Convert images with ``convert_to_gray_scale`` when true.

    Returns:
        The mapped dataset.
    """
    if grayscale:
        ds = ds.map(lambda x, y: (convert_to_gray_scale(x), y))

    if resized_size:
        # The resize layer is built only when a size is given; building it
        # unconditionally indexed into None for the default resized_size.
        resize = tf.keras.Sequential([
            tf.keras.layers.Resizing(
                resized_size[0],
                resized_size[1],
                crop_to_aspect_ratio=True,
            )
        ])
        ds = ds.map(lambda x, y: (resize(x, training=True), y))

    normalization_layer = tf.keras.layers.Rescaling(1./255)
    ds = ds.map(lambda x, y: (normalization_layer(x), y))

    # Sanity check on the first batch: shapes and value range after rescaling
    # (expected within [0, 1] for 0-255 inputs).
    image_batch, labels_batch = next(iter(ds))
    print(f"batch size :{image_batch.shape}")
    print(f"label size :{labels_batch.shape}")
    first_image = image_batch[0]
    print(np.min(first_image), np.max(first_image))

    if augment:
        flip_and_rotate = tf.keras.Sequential([
            tf.keras.layers.RandomFlip("horizontal_and_vertical"),
            tf.keras.layers.RandomRotation(0.2),
        ])
        # training=True keeps the random layers active inside ds.map.
        ds = ds.map(lambda x, y: (flip_and_rotate(x, training=True), y))

    return ds

image_size = (187, 250)
train_dataset = prepare(train_ds, augment=False, batch_size=batch_size, resized_size=image_size)
# Each split keeps its own name: valid_ds -> valid_dataset and
# test_ds -> test_dataset (the two sources were previously crossed).
valid_dataset = prepare(valid_ds, augment=False, batch_size=batch_size, resized_size=image_size)
test_dataset = prepare(test_ds, augment=False, batch_size=batch_size, resized_size=image_size)



In [None]:
for image_batch, labels_batch in valid_dataset:
  print(image_batch.shape)
  print(labels_batch.shape)
  break

In [None]:
#Displaying image samples 
plt.figure(figsize=(18, 12))
for images, labels in train_dataset.take(1):
    for i in range(8):
        ax = plt.subplot(2,4,i+1)
        #print(f"type image {images[i]}")
        plt.imshow(images[i].numpy().squeeze(axis=2),cmap="gray")
        plt.title(int(labels[i]))
        plt.axis("off")

# Modelling

In [None]:
# Work out the model's input shape from the backend's image data format: the
# single (grayscale) channel goes first for "channels_first", last otherwise.
from keras import backend as K

image_size = (187, 250)
img_height, img_width = image_size
if K.image_data_format() != "channels_first":
    input_shape = (img_height, img_width, 1)
else:
    input_shape = (1, img_height, img_width)


In [None]:
def _large_cnn_layers():
    """Layers for ``select_model == 1``: five conv layers and a seven-layer dense head."""
    kl = tf.keras.layers
    conv_stack = [
        kl.Conv2D(96, kernel_size=(3, 3), input_shape=input_shape, strides=(2, 2), activation='relu'),
        kl.BatchNormalization(),
        kl.MaxPooling2D(pool_size=(3, 3), strides=(2, 2)),
        kl.Conv2D(filters=256, kernel_size=(5, 5), strides=(1, 1), activation='relu', padding="same"),
        kl.BatchNormalization(),
        kl.MaxPooling2D(pool_size=(3, 3), strides=(2, 2)),
        kl.Conv2D(filters=384, kernel_size=(3, 3), strides=(1, 1), activation='relu', padding="same"),
        kl.BatchNormalization(),
        kl.Conv2D(filters=384, kernel_size=(3, 3), strides=(1, 1), activation='relu', padding="same"),
        kl.BatchNormalization(),
        kl.Conv2D(filters=256, kernel_size=(3, 3), strides=(1, 1), activation='relu', padding="same"),
        kl.BatchNormalization(),
        kl.MaxPooling2D(pool_size=(3, 3), strides=(2, 2)),
        kl.Flatten(),
    ]
    dense_head = []
    # Every hidden dense layer is followed by 50 % dropout.
    for units in (4096, 4096, 2048, 1024, 512, 256, 128):
        dense_head += [kl.Dense(units, activation='relu'), kl.Dropout(0.5)]
    return conv_stack + dense_head + [kl.Dense(10, activation='softmax')]


def _compact_cnn_layers():
    """Layers for ``select_model`` 2 and 3: three conv blocks and one dense layer."""
    kl = tf.keras.layers
    layers = [kl.Input(shape=input_shape)]
    # Only the first convolution is strided; each block is
    # conv -> batch norm -> 2x2 max pool -> batch norm.
    for filters, strides in ((32, 2), (64, 1), (128, 1)):
        layers += [
            kl.Conv2D(filters, 3, strides=strides, padding='same', activation='relu'),
            kl.BatchNormalization(),
            kl.MaxPooling2D(pool_size=(2, 2)),
            kl.BatchNormalization(),
        ]
    layers += [
        kl.Flatten(),
        kl.Dense(256, activation='relu'),
        kl.BatchNormalization(),
        kl.Dropout(0.5),
        kl.Dense(10, activation='softmax'),
    ]
    return layers


def _baseline_cnn_layers():
    """Layers for any other ``select_model``: a small conv net with built-in rescaling."""
    kl = tf.keras.layers
    # NOTE(review): this variant rescales its input by 1/255 itself. If it is
    # fed data that has already been rescaled upstream (prepare() applies a
    # 1/255 Rescaling), the pixels are scaled twice — confirm the intended input.
    layers = [kl.Rescaling(1./255, input_shape=(input_shape))]
    for filters in (16, 32, 64):
        layers += [
            kl.Conv2D(filters, 3, padding="same", activation="relu"),
            kl.MaxPooling2D(),
        ]
    layers += [
        kl.Dropout(0.5),
        kl.Flatten(),
        kl.Dense(128, activation="relu"),
        kl.Dense(10, activation='softmax'),
    ]
    return layers


def conv_net(select_model=1):
    """Build one of the notebook's CNN variants as an uncompiled Sequential model.

    All variants read the module-level ``input_shape`` and end in a 10-unit
    softmax layer.

    Args:
        select_model: ``1`` selects the large network, ``2`` or ``3`` the
            compact batch-normalised network (both values build the same
            architecture), any other value the small baseline.

    Returns:
        A ``tf.keras.Sequential`` model; compiling is left to the caller.
    """
    if select_model == 1:
        layers = _large_cnn_layers()
    elif select_model in (2, 3):
        # Options 2 and 3 were byte-for-byte duplicates; they now share one builder.
        layers = _compact_cnn_layers()
    else:
        layers = _baseline_cnn_layers()

    model = tf.keras.Sequential()
    for layer in layers:
        model.add(layer)
    return model

In [None]:
model_dl=conv_net(select_model=3)

# Model Visualization

In [None]:
model_dl.summary()


In [None]:


# Compile model
model_dl.compile(
    loss='sparse_categorical_crossentropy',
    optimizer=tf.keras.optimizers.RMSprop(),
    metrics=['accuracy'],
)

# Train for 50 epochs and keep the per-epoch metrics in `history`.
# batch_size is deliberately not passed: train_dataset is already batched, and
# the Keras documentation says not to specify batch_size for dataset inputs.
history = model_dl.fit(
    train_dataset,
    verbose=1,
    epochs=50,
    validation_data=valid_dataset,
)

### 

In [None]:
# model_dl.fit(x_train, y_train,
#           batch_size=batch_size,
#           epochs=epochs,
#           verbose=1,
#           validation_data=(x_test, y_test))

In [None]:
# score = model_dl.evaluate(x_test, y_test, verbose=0)
# score = model_dl.evaluate(test_dataset, verbose=0)
# print(f"Loss: {score[0]} - Accuracy: {score[1]}")


# Model Saving

In [None]:
# Model.save() writes the file and returns None, so its result is not kept
# (the former `model_test` variable was always None). The target folder is
# created up front so the write does not depend on it already existing.
os.makedirs("save_models", exist_ok=True)
model_dl.save('save_models/spectogram_model.h5')

# Model Evaluation

We can compute the final loss and accuracy score on our valid dataset using the evaluate() function.

In [None]:
# Evaluate once on the validation pipeline. The model was compiled with a
# single 'accuracy' metric, so evaluate() yields the loss followed by accuracy.
final_loss, final_acc = model_dl.evaluate(valid_dataset, verbose=0)
summary_template = "Final loss: {0:.6f}, final accuracy: {1:.6f}"
print(summary_template.format(final_loss, final_acc))


In [None]:
# Loss per epoch, training (yellow) against validation (blue).
history_dict = history.history
loss_values = history_dict['loss']
val_loss_values = history_dict['val_loss']
epochs = range(1, len(loss_values)+1)

# Explicit figure/axes interface instead of the pyplot state machine.
fig, ax = plt.subplots(figsize=(18, 12))
ax.plot(epochs, loss_values, 'y', label='Training loss')
ax.plot(epochs, val_loss_values, 'b', label='Validation loss')
ax.set_title('Training and validation loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.legend()
plt.show()


In [None]:
# Accuracy per epoch, training (yellow) against validation (blue).
acc_values = history_dict['accuracy']
val_acc_values = history_dict['val_accuracy']
epochs = range(1, len(acc_values)+1)

# Explicit figure/axes interface instead of the pyplot state machine.
fig, ax = plt.subplots(figsize=(18, 12))
ax.plot(epochs, acc_values, 'y', label='Training accuracy')
ax.plot(epochs, val_acc_values, 'b', label='Validation accuracy')
ax.set_title('Training and validation accuracy')
ax.set_xlabel('Epochs')
ax.set_ylabel('Accuracy')
ax.legend()
plt.show()


In [None]:
next(iter(test_dataset))[1].numpy()[8]

In [None]:
# Take the image AND its label from one and the same batch. Each
# next(iter(test_dataset)) call starts a fresh pass over the dataset, which
# need not return the same batch twice, so a label fetched by a separate call
# may not belong to this image.
sample_images, sample_labels = next(iter(test_dataset))
testimg = sample_images.numpy()[8]
testlabel = sample_labels.numpy()[8]  # ground-truth class index for testimg

In [None]:
testimg.shape

In [None]:
testimg=np.expand_dims(testimg,axis=0)

In [None]:
pred=model_dl.predict(testimg)

In [None]:
pred

In [None]:
np.argmax(pred)