In [None]:
#******************************************************************************************************************************************************#
"""
    @author Shaela Khan, Created 20th August,2022,  Last Updated 23rd August, Tuesday 2022.

#  song_recognizer2.ipynb  -> Using pre-trained models
#  Bird Recognition from birdsongs using Deep learning -> We are working on building a model using Deep learning techniques to identify birds from their songs.
#  DataSource : - https://www.kaggle.com/datasets/rtatman/british-birdsong-dataset  (Aggregated from the original Xeno-Canto Dataset.)
#  Provided dataset has a directory with - ./small Xeno-Canto/songs --> Original audio dataset.
#                                        -  ./small Xeno-Canto/birdsong_metadata.csv  --> Original Dataset with labels.
#                                        -  ./img_data-test --> audio converted image files for testing
                                         -   ./img_data --> audio files converted into image files --> contains spectrographs of audio data.
#                                        - traincsv.csv file with labels
#                                        - testcsv.csv - for testing model performance
#
#  We then create a CNN model with possible usage of pre-trained models, that can identify the difference classes defined - this is a supervised learning
#  problem.
#  Input: birdsong_metadata.csv + songs
#  Output : The file should have
#          : Classify image from testing provided.
"""
#*******************************************************************************************************************************************************#

In [None]:

# COMMON LIBRARIES
import os,sys,random,math
from random import seed, random, randint
from tqdm import tqdm
import pathlib
from pathlib import Path
# import import_ipynb,import nbimporter , import helper-functions


# FILE PROCESSING
import csv
import pandas as pd
import shutil
from shutil import move
from itertools import groupby

# Plotting AND Visuals.
import matplotlib.pyplot as plt
import numpy as np, shutil, itertools, pickle,seaborn as sn, math, time,scipy

# Stats
from scipy.spatial import distance
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import normalize
from mpl_toolkits.mplot3d import Axes3D
from scipy import spatial


# Image processing libraries
#!pip install Pillow
from PIL import Image


# EXTRA -
%matplotlib inline

# ML, statistics
import warnings
warnings.filterwarnings('ignore')
import scipy
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.metrics import confusion_matrix, roc_curve, auc, roc_auc_score
from sklearn.decomposition import PCA
#from sklearn.preprocessing import OneHotEncoder


# TENSORFLOW LIBRARIES. BACK-END SUPPORT.
# Creating Model and model Evaluation
import tensorflow as tf
import tensorboard,tensorflow_estimator
from tensorflow import keras
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model, load_model
from tensorflow.keras import layers

from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import Dense, Dropout, Flatten, Activation
from tensorflow.keras.layers import GlobalMaxPooling2D, GlobalAveragePooling1D, AveragePooling2D, Input, Add
from tensorflow.keras.optimizers import Adam,SGD,RMSprop
from tensorflow.keras.callbacks import EarlyStopping,ReduceLROnPlateau,ModelCheckpoint

import tensorflow.keras.utils
import tensorflow.keras.activations
import tensorflow.keras.applications
import tensorflow.keras.callbacks
import tensorflow.keras.initializers
from tensorflow.keras import losses,metrics


# Audio file processing
import librosa
import librosa.display
import soundfile as sf


# EXTRA EXTRA - because they don't work upstairs?
import pydub
from pydub import AudioSegment
import IPython
from IPython.display import Audio
warnings.filterwarnings('ignore')


# Start Code Here.
print("Hello World ! ")
print("**********  Sweet Caroline ba ba ba !!**********")
print("\n Tensorflow version: ", tf.__version__)

# Prints current Python version
print("Python  ", sys.version)
#print("\n Pillow " )

In [None]:
# Data preprocessing.****************************************************
# Read metadata file
metadata = pd.read_csv("./small Xeno-Canto/birdsong_metadata.csv") # metadata pandas object Holds all of the data from the csv file.
header = list(metadata.head())


# Get bird names
bird_name = metadata['english_cname'].values
u, f = np.unique(bird_name, return_counts=True)
print("Unique Bird Names  \n",u) # Dont touch this part - works
print("data structure.")
print(metadata.head())
print(metadata.info(verbose=True))
#print("\n", bird_name)

# helper function, to sort in alphabetical order.
#guest_birds = bird_name
#guest_birds.sort()
#print("Hoopla over nothing. ",guest_birds)


In [None]:
from tokenize import String
# ADDING labels now , from the csv files - pandas provides the df object.
from keras_preprocessing.image import ImageDataGenerator
tdf = pd.read_csv('./small Xeno-Canto/birdsong_metadata.csv')
#print(tdf.head())
#print(tdf.info(verbose=True))


# Cleanup tdf  tdf  ----> According to proper processing procedures.
tdf = tdf.drop(columns='who_provided_recording')
tdf = tdf.drop(columns='country')
tdf = tdf.drop(columns='latitude')
tdf = tdf.drop(columns='longitute')
tdf = tdf.drop(columns='license')
#print("After dropping the redundant columns.")  #print(tdf.head())
# Find a better way to drop multiple columns drop dataframe columns.


# For brevity and clarity in processing.
tdf['file_id'] = tdf['file_id'].apply(str)


# train_df = The one here is a manual hack. Cleaned up the metadata.csv manually.
train_df = pd.read_csv('./small Xeno-Canto/birdsong_metadata_modified.csv')
train_df.fillna(0, inplace=True)

def extension_train_data(x):
    return "xc"+str(x)+".png"

# CHANGING the File extensions from .wav to png --> If you remember the originals were audio files.
train_df['file_id'] = train_df['file_id'].apply(extension_train_data)
print("Checking training dataset dataframe : ......................")
print(train_df.head())

In [None]:
#Pre-processing test-dataset and creating a pandas dataframe for ease of IMPLEMENTATION.
print("\n Pre-processing test-dataset and creating a pandas dataframe for ease of IMPLEMENTATION. ")
sdir = "./img_data-test"
filepaths=[]
labels=[]
flist=os.listdir(sdir)

for f in flist:
    fpath=os.path.join(sdir,f)
    filepaths.append(fpath)
    index1=f.rfind('-')+1
    index2=f.rfind('.')
    klass=f[index1:index2]
    labels.append(klass)
Fseries=pd.Series(filepaths, name='filepaths')
Lseries=pd.Series(labels, name='labels')
test_df=pd.concat([Fseries, Lseries], axis=1)
#print("\n ", labels)
#print("\nChecking TEST dataset dataframe:  ......................")
#print(test_df.head())
print("\n-------------------------------------------------------------------")
#print ("train_df length:", len(train_df), ' test_df length: ', len(test_df))


In [None]:
# Creating data generators.
# rescale all pixel values from 0-255, so after this step all our pixel values are in range (0,1)
train_df['labels'] = train_df['english_cname'].astype(str)
test_df['labels'] = test_df['labels'].astype(str)
print("Create DATA generators for the model.\n")

datagen=ImageDataGenerator(rescale=1./255,validation_split=0.3) # Training and Validation split 70/30
test_datagen = ImageDataGenerator(rescale=1./255)


train_generator =datagen.flow_from_dataframe(dataframe=train_df,directory='./img_data/',
                                             x_col="file_id",
                                             y_col="english_cname",
                                             class_mode="categorical",
                                             target_size=(254,254), #target_size=(64,64)
                                             validation_split=0.2,
                                             subset="training",
                                             seed=1337,batch_size=32,shuffle=True)

print('Train generator created')
val_generator =datagen.flow_from_dataframe(dataframe=train_df,directory='./img_data/',
                                           x_col="file_id",
                                           y_col="english_cname",
                                           class_mode="categorical",
                                           target_size=(254,254), #target_size=(64,64)
                                           subset="validation",
                                           seed=42,batch_size=32,shuffle=True)

# We are creating a copy of the val-generator to use as TEST_SET. Reason why we are not changing the name is so that, the
# The other models and things in motion already don't break.
test_generator1 = val_generator
print('Validation generator created')
# test_set =test_datagen.flow_from_directory(test_df,target_size=(64, 64),
                                          # batch_size=32, class_mode='categorical', shuffle = False)

test_generator=test_datagen.flow_from_dataframe(test_df,
                                                x_col='filepaths',
                                                y_col='labels',
                                                target_size=(254,254), #target_size=(64,64)
                                                class_mode='categorical',
                                                color_mode='rgb',
                                                shuffle=False, batch_size=32)

# test_gen=tvgen.flow_from_dataframe(test_df, x_col='filepaths', y_col='labels',target_size=img_size,
                                  # class_mode='categorical', color_mode='rgb', shuffle=False, batch_size=2)

print('Test generator created')
print("\n Sanity check Line.----------")
#print("Printing labels of the validation dataset: \n",val_generator.labels)
#print("Printing label- names  of the validation dataset: \n",val_generator.class_indices.keys())
#print("\n These names are needed for report and Presentation.")


In [None]:
# Helper function- to be moved to helper-functions.ipynb
from datetime import datetime
def timer():
    # datetime object containing current date and time
    now = datetime.now()
    print("now =", now)
    # dd/mm/YY H:M:S
    dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
    print("date and time =", dt_string)

print("\n DONE.")

In [None]:
# Plot the ImageGenerated data
import cv2
train_files=train_generator.filenames
classes=list(train_generator.class_indices.keys())
class_count=len(classes)     # WE NEED THIS TO TRAIN OUR MODEL. IT COUNTS THE NUMBER OF CLASSES FROM which to classify.
labels=train_generator.labels
images, labels=next(train_generator)
## convert image to RGB
image_rgb = cv2.cvtColor(images, cv2.COLOR_BGR2RGB)
#image_rgb = cv2.cvtColor(images, cv2.COLOR_RGB2YCrCb)
plt.figure(figsize=(20, 12))

for i in range (len(image_rgb)):
    plt.subplot(4,4, i+1)
    index=np.argmax(labels[i])
    class_name=classes[index]
    plt.title(class_name, color='yellow', fontsize=18)
    plt.axis('off')
    plt.imshow(image_rgb[i]/255)
plt.tight_layout()
plt.show()


In [None]:
train_image_data, train_labels = train_generator.next()
test_image_data, test_labels = test_generator1.next()
#train_image_data[0]
print("\n ImageGenerator created data trainset:  ",train_image_data.shape)
print("\n ImageGenerator created data testset:  ",test_image_data.shape)

In [None]:

# Model IMPLEMENTATIOM
print("Creating simple CNN model \n")
model_simple = tf.keras.models.Sequential()
#model_tn = Sequential()

input_shape=(64, 64, 3)#1st hidden layer
model_simple.add(Conv2D(32, (3, 3), strides=(2, 2), input_shape=input_shape))
model_simple.add(AveragePooling2D((2, 2), strides=(2,2)))
model_simple.add(Activation('relu'))#2nd hidden layer
model_simple.add(Conv2D(64, (3, 3), padding="same"))
model_simple.add(AveragePooling2D((2, 2), strides=(2,2)))
model_simple.add(Activation('relu'))#3rd hidden layer
model_simple.add(Conv2D(64, (3, 3), padding="same"))
model_simple.add(AveragePooling2D((2, 2), strides=(2,2)))
model_simple.add(Activation('relu'))#Flatten
model_simple.add(Flatten())
model_simple.add(Dropout(rate=0.5))#Add fully connected layer.
model_simple.add(Dense(64))
model_simple.add(Activation('relu'))
model_simple.add(Dropout(rate=0.5))#Output layer
model_simple.add(Dense(88))   # NUMBER OF CLASSES TO PREDICT.
model_simple.add(Activation('softmax'))
model_simple.summary()




In [None]:

# MODEL Specifications SETUP.
""" Model specifications notes = these specs epochs= 200, batch_size=8 learning rate=0.01, momentum= 0.9 SGD--> accuracy was bad."""
EPOCHS = 5
BATCH_SIZE = 8
l1_learning_rate = 0.01
decay_rate = l1_learning_rate / EPOCHS
momentum = 0.9
sgd = SGD(lr=l1_learning_rate, momentum=momentum, decay=decay_rate, nesterov=False)
adam = Adam(learning_rate=l1_learning_rate)
model_simple.compile(optimizer="adam", loss="categorical_crossentropy", metrics=['accuracy'])
print("\n Done. ")

# Fit the training-set to the model and Train.
train_set = train_generator
validation_set = val_generator
test_set  = test_generator

history = model_simple.fit( train_set,validation_data=validation_set,epochs=EPOCHS,batch_size=BATCH_SIZE)
# model._reset_compile_cache()  #print(test_set)
#model_simple.fit_generator(train_set, steps_per_epoch=100,epochs=50,validation_data=validation_set,validation_steps=50)
print("\n Training Done. ")
print("\n Metrics: ",history.history)


In [None]:
#Model Evaluation
print("Evaluate on test data")
results = model_simple.evaluate(train_set,test_set, batch_size=128)#OUTPUT [1.704445120342617, 0.33798882681564246]
print("test loss, test acc:", results)

test_set.reset()
pred = model_simple.predict(test_set, steps=50, verbose=1)

In [None]:
# ANOTHER RESNET50 model--> Think I was doing it wrong?
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input

print("------------Second Resnet50 model------------------\n")
# Pre-process for resnet
from tensorflow.keras.preprocessing.image import ImageDataGenerator

batch_size = 30

def generators(shape, preprocessing):
    #Create the training and validation datasets for
    #a given image shape.
    imgdatagen = ImageDataGenerator(
        preprocessing_function = preprocessing,
        horizontal_flip = True,
        validation_split = 0.2,
    )
    height, width = shape

    restrain_generator =imgdatagen.flow_from_dataframe(dataframe=train_df,directory='./img_data/',
                                             x_col="file_id",
                                             y_col="english_cname",
                                             class_mode="categorical",
                                             target_size=(height,width), #target_size=(64,64)
                                             validation_split=0.2,
                                             subset="training",
                                             seed=1337,batch_size=30,shuffle=True)

    print('Train generator created')

    resval_generator =imgdatagen.flow_from_dataframe(dataframe=train_df,directory='./img_data/',
                                           x_col="file_id",
                                           y_col="english_cname",
                                           class_mode="categorical",
                                           target_size=(height,width), #target_size=(64,64)
                                           subset="validation",
                                           seed=42,batch_size=30,shuffle=True)

    print('Validation generator created')
    return restrain_generator, resval_generator


resnet50 = keras.applications.resnet50
restrain_dataset, resval_dataset = generators( (224,224), preprocessing=resnet50.preprocess_input)

restrain_image_data, restrain_labels = restrain_dataset.next()
restest_image_data, restest_labels = resval_dataset.next()
#train_image_data[0]
print(restrain_image_data.shape)
print(restest_image_data.shape)

# Model Creation
dropout_dense_layer = 0.3
conv_model = resnet50.ResNet50(weights='imagenet', include_top=False, input_shape=(224,224,3))
for layer in conv_model.layers:
    layer.trainable = False
x = tf.keras.layers.Flatten()(conv_model.output)
x = tf.keras.layers.Dense(100, activation='relu')(x)
x = tf.keras.layers.Dense(100, activation='relu')(x)
x = tf.keras.layers.Dense(100, activation='relu')(x)
x = tf.keras.layers.Dropout(dropout_dense_layer)(x)
predictions = tf.keras.layers.Dense(88, activation='softmax')(x)
full_model = tf.keras.models.Model(inputs=conv_model.input, outputs=predictions)
full_model.summary()


In [None]:
# Step : 2 Training and running the model.  RESNET50 ---> The right way??
rcallback = [ModelCheckpoint(filepath='./models/bestie.hdf5', monitor='val_loss', save_best_only=True, mode='auto')]

full_model.compile(loss='categorical_crossentropy',
                  optimizer=keras.optimizers.Adamax(lr=0.001),
                  metrics=['acc'])


history__ = full_model.fit(restrain_dataset,
                           validation_data = resval_dataset,
                           callbacks=rcallback,
                           epochs=1000)
timer()
full_model.save_weights('resnet50.h5')

In [None]:
#
print("----------------------------------------------- \n")
def plot_history(history, yrange):
    #Plot loss and accuracy as a function of the epoch,
    #for the training and validation datasets.
    #
    accr = history.history['acc']
    val_acc = history.history['val_acc']
    lossr = history.history['loss']
    val_loss = history.history['val_loss']

    # Get number of epochs
    epochs = range(len(accr))

    # Plot training and validation accuracy per epoch
    plt.plot(epochs, accr)
    plt.plot(epochs, val_acc)
    plt.title('Training and validation accuracy')
    plt.ylim(yrange)

    # Plot training and validation loss per epoch
    plt.figure()
    plt.plot(epochs, lossr)
    plt.plot(epochs, val_loss)
    plt.title('Training and validation loss')

    plt.show()


plot_history(history__, yrange=(0.9,1))
#history=model.fit(x=train_generator, epochs=150,verbose=1,validation_data=val_generator,validation_steps=None,batch_size=32,shuffle=False,callbacks=[cb_checkpointer],initial_epoch=0)


In [None]:
# Prediction and plotting results.
print("----------------------------------------------- \n")

#Declare variables required for prediction case.
classes_to_predict = sorted(train_df.english_cname.unique())


# Create dataframe of the predictions --taking in fraction of the data slice.
preds = full_model.predict(resval_dataset)
validation_df = pd.DataFrame(columns=["prediction", "groundtruth", "correct_prediction"])


for pred, groundtruth in zip(preds[:20], resval_dataset.__getitem__(0)[1]):
    validation_df = validation_df.append({"prediction":classes_to_predict[np.argmax(pred)],
                                       "groundtruth":classes_to_predict[np.argmax(groundtruth)],
                                       "correct_prediction":np.argmax(pred)==np.argmax(groundtruth)}, ignore_index=True)

print("Predictions: \n")
print(validation_df)


# Evaluations Metric
#print("Evaluation metric:... \n")
#model.load_weights("./models/best.hdf5")
#trainloss, train_acc = model.evaluate(train_generator,verbose=1)
#loss, acc = model.evaluate(val_generator,verbose=1)
#print ("-------Accuracy on test set is: ", acc* 100, "%","Loss: ",loss,"-----------")
#print ("-------Accuracy on train set is: ", train_acc* 100, "%","Loss: ",trainloss*100,"-----------")
  # from keras.models import load_model , model = load_model('./vgg16_1.h5')



In [None]:
# CLASSIFICATION REPORT----------------------------------------
from sklearn.metrics import confusion_matrix,classification_report,accuracy_score

print('Classification Report')
predicted_class_idx=classes_to_predict[np.argmax(pred)]

#target_names = classes_to_predict
#print(classification_report(train_generator.classes, predicted_class_idx, target_names=target_names))



# Confusion Matrix.
#classes_to_predict = np.array(classes_to_predict)
#predicted_class_idx = np.array(predicted_class_idx)
#cm = confusion_matrix(classes_to_predict, predicted_class_idx)
#print('Confusion Matrix')
#print(cm)

print("Show me this is done.")

In [None]:


timer()
print("***********************************  End of Processing. !!************************************************")