In [4]:
import os
import glob
import numpy as np
import scipy.io
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from keras.layers import Dense
from keras.callbacks import ModelCheckpoint, EarlyStopping
from keras.layers import Dropout
from keras import backend as K
from sklearn import svm
from sklearn.preprocessing import StandardScaler
from tensorflow.keras.models import Sequential
import tensorflow_addons as tfa
import pandas as pd
import matplotlib.pyplot as plt

# Module-level Adam optimizer with a step size of 0.0005.
# NOTE(review): no visible cell references `adam`; the model-building code
# creates its own keras.optimizers.Adam instances — confirm it is still needed.
adam = tf.keras.optimizers.Adam(learning_rate=0.0005)

In [5]:
# pip install -r requirements.txt

In [6]:
def dataloader(path, featType):
    """Read a feature ``.mat`` file and split it into inputs, labels and ``w``.

    The ``AF`` matrix is sliced by rows: the last row is returned as ``w``,
    the second-to-last row as the labels, and every remaining row forms the
    feature block. When ``featType`` is anything other than 1, the rows of
    ``CF`` are stacked below that feature block.

    Args:
        path: Location of the ``.mat`` file handed to ``scipy.io.loadmat``.
        featType: 1 selects the ``AF`` feature rows only; any other value
            appends the ``CF`` rows as well.

    Returns:
        A 4-tuple ``(x, y, w, cf_info)`` where ``x`` is the transposed
        feature block, ``y`` and ``w`` are the two trailing ``AF`` rows and
        ``cf_info`` is the file's ``CF_info`` entry.
    """
    mat = scipy.io.loadmat(path)
    print(mat.keys())

    acoustic = mat['AF']
    feature_rows = acoustic[:-2]
    labels, word_row = acoustic[-2], acoustic[-1]

    if featType != 1:
        feature_rows = np.concatenate((feature_rows, mat['CF']), axis=0)

    return feature_rows.T, labels.T, word_row.T, mat['CF_info']

def calculate_accuracy(arr1, arr2):
    """Return the fraction of positions at which two label sequences agree.

    Args:
        arr1: Sized sequence of labels (list, tuple or 1-D array).
        arr2: Sized sequence of labels with the same length as ``arr1``.

    Returns:
        ``matches / len(arr1)`` as a float; ``0.0`` when both sequences are
        empty (instead of failing with ``ZeroDivisionError``).

    Raises:
        ValueError: If the two sequences differ in length. ``zip`` would
            otherwise drop the surplus items while the denominator still
            counted them, yielding a silently wrong accuracy.
    """
    total = len(arr1)
    if total != len(arr2):
        raise ValueError(
            f"calculate_accuracy expects sequences of equal length, got {total} and {len(arr2)}"
        )
    if total == 0:
        return 0.0

    matches = sum(1 for left, right in zip(arr1, arr2) if left == right)
    return matches / total

def normalization(feats):
  """Standardise each feature column with a freshly fitted StandardScaler.

  The scaler is created and fitted inside the call, so the scaling
  statistics always come from ``feats`` itself and are not shared between
  calls.

  Args:
      feats: 2-D table of features (anything ``pd.DataFrame`` accepts).

  Returns:
      A ``pd.DataFrame`` holding the scaled values, carrying the same column
      labels as ``pd.DataFrame(feats)``.
  """
  frame = pd.DataFrame(feats)
  scaled_values = StandardScaler().fit_transform(frame)
  return pd.DataFrame(scaled_values, columns=frame.columns)

#Flag

In [7]:
def make_partitions(arr_words, arr_labels):
    """Keep only the top-scoring item(s) inside every run of equal word ids.

    Consecutive positions that share the same value in ``arr_words`` form one
    group. Within a group, every position whose score equals the group's
    maximum is flagged 1 and all others 0 (ties therefore yield several 1s).

    Compared with the previous implementation:
      * a final element that starts a new word now forms its own group
        instead of being merged into the preceding word;
      * a single-element input returns ``[1]`` instead of ``[]``;
      * scores are compared with the group maximum directly, so a group whose
        maximum is 0 no longer triggers a division by zero;
      * the result is accumulated in a list rather than re-concatenating an
        array for every group.

    Args:
        arr_words: Indexable sequence of word identifiers, one per position.
        arr_labels: Sliceable sequence of numeric scores aligned with
            ``arr_words``.

    Returns:
        A list of Python ints (0 or 1) with one entry per position.
    """
    total = len(arr_words)
    flags = []
    group_start = 0

    for idx in range(total):
        # A group closes at the last position or right before the word id changes.
        closes_group = idx == total - 1 or arr_words[idx] != arr_words[idx + 1]
        if not closes_group:
            continue

        group = np.asarray(arr_labels[group_start:idx + 1])
        peak = group.max()
        flags.extend(1 if score == peak else 0 for score in group)
        group_start = idx + 1

    return flags

In [8]:
# Folder (under finalData/) and file name of the training feature file.
fatyp = 'TypicalFA_comb1'
drivepath = 'finalData/' + fatyp + '/'
# Alternative left by the author: glob.glob(drivepath + '*train*')
filee = drivepath + 'GER_train_fisher-2000_FA_GT_ESTphnTrans_estStress.mat'

# 1 -> acoustic features only; any other value -> acoustic + context features.
featType = 1
original_dim = 19 if featType == 1 else 38

In [9]:
# The test file name is derived by swapping 'train' for 'test' in the path.
# (str.replace substitutes every occurrence of 'train' in the whole path.)
train_path = filee
test_path = filee.replace('train', 'test')

# Each split is loaded twice: once with the notebook-wide featType and once
# with featType=2, which also stacks the 'CF' rows onto the features.
xtrain, ytrain, wtrain, info_train = dataloader(train_path, featType)
xtrain1, ytrain1, wtrain1, info_train1 = dataloader(train_path, featType=2)

xtest, ytest, wtest, info_test = dataloader(test_path, featType)
xtest1, ytest1, wtest1, info_test1 = dataloader(test_path, featType=2)

dict_keys(['__header__', '__version__', '__globals__', 'AF', 'AF_info', 'CF', 'CF_info'])
dict_keys(['__header__', '__version__', '__globals__', 'AF', 'AF_info', 'CF', 'CF_info'])
dict_keys(['__header__', '__version__', '__globals__', 'AF', 'AF_info', 'CF', 'CF_info'])
dict_keys(['__header__', '__version__', '__globals__', 'AF', 'AF_info', 'CF', 'CF_info'])


In [10]:
# Standardise all four feature matrices. Every normalization() call fits its
# own scaler, so each matrix is scaled with its own statistics.
# NOTE(review): the test matrices are therefore not scaled with the training
# statistics — confirm this is intended.
# Note: xtrain / xtrain1 are overwritten with their scaled versions.
xtrain = normalization(xtrain)
xtrain1 = normalization(xtrain1)

xtest_a = normalization(xtest)
xtest_ac = normalization(xtest1)

In [11]:
print(xtrain.shape)

# Empty result lists; the names presumably stand for "without" / "with"
# post-processing — TODO confirm, nothing in the visible cells fills them.
woPP = []
wPP = []

# Per-sample input shapes of the two feature streams fed to the models.
input_shape1, input_shape2 = (19,), (38,)

# Temperature for the contrastive loss and the optimiser step size.
temperature = 0.03
learning_rate = 0.001

(5891, 19)


In [12]:
class SupervisedContrastiveLoss(keras.losses.Loss):
    """Supervised contrastive loss built on ``tfa.losses.npairs_loss``.

    The feature vectors of a batch are L2-normalised, their pairwise dot
    products (cosine similarities) are divided by ``temperature`` and the
    resulting similarity matrix is scored against the labels with the
    n-pairs loss. The result is multiplied by a fixed factor of 0.35.

    Args:
        temperature: Divisor applied to the similarity matrix. The default is
            the module-level ``temperature`` value at class-definition time.
        name: Optional name forwarded to ``keras.losses.Loss``.
    """

    def __init__(self, temperature=temperature, name=None):
        super().__init__(name=name)
        self.temperature = temperature

    def __call__(self, labels, feature_vectors, sample_weight=None):
        """Compute the scaled n-pairs loss for one batch.

        Args:
            labels: Class labels of the batch, one per sample.
            feature_vectors: 2-D tensor with one embedding per row.
            sample_weight: Accepted for API compatibility; not used.

        Returns:
            The n-pairs loss of the batch multiplied by 0.35.
        """
        # Unit-length embeddings make the dot products cosine similarities.
        feature_vectors_normalized = tf.math.l2_normalize(feature_vectors, axis=1)

        # Pairwise similarity matrix, sharpened by the temperature.
        logits = tf.divide(
            tf.matmul(feature_vectors_normalized, tf.transpose(feature_vectors_normalized)),
            self.temperature,
        )

        # Flatten to a 1-D label vector. tf.squeeze would also drop the batch
        # axis when a batch holds a single sample, handing npairs_loss a scalar.
        flat_labels = tf.reshape(labels, [-1])

        # NOTE(review): the 0.35 factor is unexplained in the notebook (the
        # author left a reminder to find out why it is used); kept unchanged.
        return 0.35 * tfa.losses.npairs_loss(flat_labels, logits)

In [13]:
from keras.models import Sequential
from keras.layers import Activation
from keras.layers import Input
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv1D, MaxPooling1D, Flatten, Dense
import tensorflow as tf
import tensorflow_addons as tfa
import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
import scipy.io as sio
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras.models import Model
from keras.layers import Reshape

In [25]:
def create_encoder1(latent_dim):
    """Build the convolutional encoder for inputs of shape ``(19, 1)``.

    The network is two Conv1D blocks (64 then 32 filters, kernel size 2,
    ReLU), each followed by a MaxPooling1D with pool size 1, with a 0.5
    dropout between them, then a Flatten and a GELU Dense layer.

    Args:
        latent_dim: Number of units of the final Dense layer.

    Returns:
        A Keras ``Model`` mapping the ``(19, 1)`` input to a ``latent_dim``
        vector.
    """
    encoder_input = Input(shape=(19, 1))
    stack = [
        Conv1D(filters=64, kernel_size=2, activation='relu'),
        MaxPooling1D(pool_size=1),
        Dropout(0.5),
        Conv1D(filters=32, kernel_size=2, activation='relu'),
        MaxPooling1D(pool_size=1),
        Flatten(),
        Dense(latent_dim, activation='gelu'),
    ]

    tensor = encoder_input
    for layer in stack:
        tensor = layer(tensor)

    return Model(encoder_input, tensor)

In [26]:
def create_encoder2(latent_dim):
    """Build the convolutional encoder for inputs of shape ``(38, 1)``.

    Same layer stack as the 19-step encoder: two Conv1D blocks (64 then 32
    filters, kernel size 2, ReLU), each followed by a MaxPooling1D with pool
    size 1, a 0.5 dropout between them, then Flatten and a GELU Dense layer.

    Args:
        latent_dim: Number of units of the final Dense layer.

    Returns:
        A Keras ``Model`` mapping the ``(38, 1)`` input to a ``latent_dim``
        vector.
    """
    encoder_input = Input(shape=(38, 1))
    stack = [
        Conv1D(filters=64, kernel_size=2, activation='relu'),
        MaxPooling1D(pool_size=1),
        Dropout(0.5),
        Conv1D(filters=32, kernel_size=2, activation='relu'),
        MaxPooling1D(pool_size=1),
        Flatten(),
        Dense(latent_dim, activation='gelu'),
    ]

    tensor = encoder_input
    for layer in stack:
        tensor = layer(tensor)

    return Model(encoder_input, tensor)

In [27]:
def add_projection_head1(Encoder1, Encoder2):
    """Join two encoders and put a 16-unit ReLU projection layer on top.

    Two inputs are created from the module-level ``input_shape1`` and
    ``input_shape2``; each is passed through its encoder, the two outputs are
    concatenated along axis 1 and projected by a Dense(16, relu) layer.

    Args:
        Encoder1: Model applied to the first input.
        Encoder2: Model applied to the second input.

    Returns:
        A two-input Keras model whose output is the 16-dimensional projection.
    """
    inp1 = keras.Input(shape=input_shape1)
    inp2 = keras.Input(shape=input_shape2)

    branch_outputs = [Encoder1(inp1), Encoder2(inp2)]
    merged = tf.keras.layers.Concatenate(axis=1)(branch_outputs)
    projected = layers.Dense(16, activation="relu")(merged)

    return keras.Model(
        inputs=[inp1, inp2],
        outputs=projected,
        name="cifar-encoder_with_projection-head",
    )

In [28]:
def create_classifier(encoder, trainable):
    """Build and compile the two-input classifier on top of ``encoder``'s branches.

    Every layer of ``encoder`` gets its ``trainable`` flag set, then the two
    nested models contained in ``encoder`` (its branch encoders) are reused
    as feature extractors for a small dense classification head.

    The branch encoders are now taken from the ``encoder`` argument itself.
    Previously the function froze ``encoder`` but built its features from the
    global names ``encoder1`` / ``encoder2``, so it only worked when those
    globals happened to be the models wrapped by ``encoder``.

    The function still reads these module-level names at call time:
    ``input_shape1``, ``input_shape2``, ``hidden_units``, ``dropout_rate``,
    ``num_classes`` and ``learning_rate``.

    Args:
        encoder: Model containing exactly two nested Keras models, in the
            order (first-input branch, second-input branch).
        trainable: Value assigned to ``layer.trainable`` for every layer of
            ``encoder``.

    Returns:
        A compiled Keras model with inputs ``[inputs1, inputs2]`` and a
        softmax output over ``num_classes`` classes.

    Raises:
        ValueError: If ``encoder`` does not contain exactly two nested models.
    """
    for layer in encoder.layers:
        layer.trainable = trainable

    # Nested models inside `encoder` are its branch encoders, in layer order.
    branches = [layer for layer in encoder.layers if isinstance(layer, keras.Model)]
    if len(branches) != 2:
        raise ValueError(
            f"create_classifier expects an encoder wrapping exactly two sub-models, found {len(branches)}"
        )
    branch1, branch2 = branches

    inputs1 = keras.Input(shape=input_shape1)
    inputs2 = keras.Input(shape=input_shape2)
    features1 = branch1(inputs1)
    features2 = branch2(inputs2)
    features = tf.keras.layers.Concatenate(axis=1)([features1, features2])

    # Dense head: hidden_units -> 32 -> 16 -> 4 -> num_classes.
    features = layers.Dense(hidden_units, activation="relu")(features)
    features = layers.BatchNormalization()(features)
    features = layers.Dropout(dropout_rate)(features)
    features = layers.Dense(32, activation="relu")(features)
    features = layers.Dropout(0.2)(features)
    features = layers.Dense(16, activation="relu")(features)
    features = layers.Dense(4, activation="relu")(features)
    outputs = layers.Dense(num_classes, activation="softmax")(features)

    model = keras.Model(inputs=[inputs1, inputs2], outputs=outputs)
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate),
        loss=keras.losses.SparseCategoricalCrossentropy(),
        metrics=[keras.metrics.SparseCategoricalAccuracy()],
    )
    return model

In [29]:
from sklearn.metrics import f1_score

# Number of training samples (rows of xtrain); used to compute fold boundaries.
train_size = len(xtrain)

In [30]:
# Hyperparameters shared by all folds, set once before the loop.
# Previously `learning_rate` was overwritten inside the loop body, so on a
# top-to-bottom run the contrastive stage used 0.001 in the first fold and
# 0.0005 in every later fold. The encoder step size is now pinned explicitly.
encoder_learning_rate = 0.001  # contrastive pre-training of the encoders
learning_rate = 0.0005         # classifier stage; read as a global by create_classifier
batch_size = 16
hidden_units = 64
projection_units = 128
num_epochs = 200
dropout_rate = 0.3
num_classes = 2
input_shape1 = (19,)
input_shape2 = (38,)
n_folds = 5

for j in range(0, n_folds): # folds

  # Row range held out for validation in this fold.
  val_start = (train_size * j) // n_folds
  val_stop = (train_size * (j + 1)) // n_folds

  # First feature stream (xtrain / ytrain).
  xval_a = xtrain[val_start:val_stop]
  print(xval_a.shape)
  yval_a = ytrain[val_start:val_stop]
  xtra_a = np.concatenate((xtrain[:val_start], xtrain[val_stop:]), axis=0)
  ytra_a = np.concatenate((ytrain[:val_start], ytrain[val_stop:]), axis=0)

  # Second feature stream (xtrain1 / ytrain1), split on the same rows.
  xval_ac = xtrain1[val_start:val_stop]
  print(xval_ac.shape)
  yval_ac = ytrain1[val_start:val_stop]
  xtra_ac = np.concatenate((xtrain1[:val_start], xtrain1[val_stop:]), axis=0)
  ytra_ac = np.concatenate((ytrain1[:val_start], ytrain1[val_stop:]), axis=0)

  # NOTE(review): `y` and `yv` (labels duplicated back to back) are not used
  # anywhere in this cell; kept only in case other cells rely on them.
  y = np.concatenate((ytra_a, ytra_a), axis=0)
  yv = np.concatenate((yval_a, yval_a), axis=0)
  print(yval_ac.shape)

  # Stage 1: train both encoders plus projection head with the contrastive loss.
  encoder1 = create_encoder1(latent_dim=10)
  encoder2 = create_encoder2(10)
  encoder_with_projection_head = add_projection_head1(encoder1, encoder2)
  encoder_with_projection_head.compile(
      optimizer=keras.optimizers.Adam(encoder_learning_rate),
      loss=SupervisedContrastiveLoss(temperature),
  )
  encoder_with_projection_head.summary()
  history = encoder_with_projection_head.fit(
      [xtra_a, xtra_ac],
      ytra_ac,
      validation_data=([xval_a, xval_ac], yval_ac),
      batch_size=32,
      epochs=100,
      callbacks=[EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=10)],
  )

  # Stage 2: train the classifier head on top of the frozen encoders.
  classifier = create_classifier(encoder_with_projection_head, trainable=False)
  classifier.summary()
  history = classifier.fit(
      x=[xtra_a, xtra_ac],
      y=ytra_ac,
      validation_data=([xval_a, xval_ac], yval_ac),
      batch_size=batch_size,
      epochs=num_epochs,
      callbacks=[EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=10)],
  )

  # Index 1 of evaluate() is the compiled accuracy metric (index 0 is the loss).
  accuracy = classifier.evaluate([xtest_a, xtest_ac], ytest)[1]

  ##  Accuracy on post processed labels (each word should contain only one stressed syllable)
  pred_output = classifier.predict([xtest_a, xtest_ac])
  # Column 1 of the softmax output is the score of class 1.
  pred1_labels = pred_output[:, 1]
  post_labels = make_partitions(wtest, pred1_labels)
  post_accuracy = calculate_accuracy(post_labels, ytest)

  F1_score_WoPP = f1_score(ytest, pred_output.argmax(axis=1))
  F1_score_WPP = f1_score(ytest, post_labels)

  print(f"Test accuracy: {round(accuracy * 100, 3)}%")
  print(f"Postprocessing Test accuracy: {round(post_accuracy * 100, 3)}%")
  print(f"Test F1_score: {round(F1_score_WoPP * 100, 3)}%")
  print(f"Postprocessing F1_score: {round(F1_score_WPP * 100, 3)}%")


(1178, 19)
(1178, 38)
(1178,)
Model: "cifar-encoder_with_projection-head"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_51 (InputLayer)          [(None, 19)]         0           []                               
                                                                                                  
 input_52 (InputLayer)          [(None, 38)]         0           []                               
                                                                                                  
 model_24 (Functional)          (None, 10)           9770        ['input_51[0][0]']               
                                                                                                  
 model_25 (Functional)          (None, 10)           15850       ['input_52[0][0]']               
                                   