In [None]:
%tensorflow_version 2.x
import tensorflow as tf
import numpy as np
from numpy import newaxis
from google.colab import drive

# Mount Google Drive under /content/gdrive/ (Colab-only call); the training
# arrays loaded later in this notebook are read from that mount point.
drive.mount('/content/gdrive/')

class DualStudent():
    """Pair of LSTM classifiers ("students") trained jointly.

    Each student is trained with a classification loss, a consistency loss
    between two noisy views of the same batch, and a stabilization loss that
    pulls the less stable student towards the more stable one on unlabeled
    samples. The equation numbers in the comments refer to the paper the
    original author followed (presumably the "Dual Student" paper -- confirm).

    Args:
        nr_of_units: Units per LSTM layer.
        nr_of_layers: Total layer count; the model has ``nr_of_layers - 1``
            recurrent layers (at least two) followed by one Dense layer.
        nr_of_classes: Size of the softmax output.
        student_version: ``"Mono_directional"`` (two unidirectional students),
            ``"Imbalanced"`` (student1 unidirectional, student2
            bidirectional); any other value builds two bidirectional students.
        show_summary: Print ``model.summary()`` for every model built.
        epsilon: Confidence threshold used in the stability test.
        lambda_1: Weight of the consistency loss.
        lambda_2: Weight of the stabilization loss.
        data_path: File to load the training arrays from; defaults to
            ``DEFAULT_DATA_PATH``.
    """

    # Location of the training arrays used when no explicit path is given.
    DEFAULT_DATA_PATH = '/content/gdrive/My Drive/train_xspeech.npy'
    # Number of leading labeled samples reused as stand-in "unlabeled" data
    # when the caller of train() does not supply any.
    DEFAULT_UNLABELED_SIZE = 100

    def __init__(self, nr_of_units=768, nr_of_layers=6, nr_of_classes=61, student_version="Mono_directional", show_summary=True, epsilon=0.016395, lambda_1=1, lambda_2=10000, data_path=None):
        self.nr_of_units = nr_of_units
        self.nr_of_layers = nr_of_layers
        self.nr_of_classes = nr_of_classes
        self.student_version = student_version
        self.x = None
        self.y = None
        self.lambda_1 = lambda_1
        self.lambda_2 = lambda_2
        self.epsilon = epsilon
        self.cce = tf.keras.losses.CategoricalCrossentropy()
        self.mse = tf.keras.losses.MeanSquaredError()
        self.show_summary = show_summary
        self.data_path = self.DEFAULT_DATA_PATH if data_path is None else data_path

        # The data must be loaded first: get_model() reads the input shape
        # from self.x.
        self.get_data()

        if self.student_version == "Mono_directional":
            lstm_versions = ("Mono_directional", "Mono_directional")
        elif self.student_version == "Imbalanced":
            lstm_versions = ("Mono_directional", "Bi_directional")
        else:
            # Any other value falls back to two bidirectional students.
            lstm_versions = ("Bi_directional", "Bi_directional")

        self.student1 = self.get_model("student1", lstm_version=lstm_versions[0])
        self.student2 = self.get_model("student2", lstm_version=lstm_versions[1])
        self.models = {"student1": self.student1, "student2": self.student2}

    def get_data(self, path=None):
        """Load features and labels into ``self.x`` and ``self.y``.

        The file must contain two arrays written back to back with
        ``np.save``: first the features, then the labels. A length-1 axis is
        inserted after the first feature axis, so 2-D features of shape
        ``(samples, features)`` become ``(samples, 1, features)``.

        Args:
            path: File to read; defaults to ``self.data_path``.
        """
        if path is None:
            path = self.data_path

        with open(path, 'rb') as f:
            train_x = np.load(f)
            self.y = np.load(f)

        # Indexing with newaxis already yields the final shape; no further
        # reshape is required.
        self.x = train_x[:, newaxis, :]

    def get_model(self, name_="", lstm_version="Mono_directional"):
        """Build and compile one student network.

        Args:
            name_: Suffix of the Keras model name (``lstm_version + "_" + name_``).
            lstm_version: ``"Bi_directional"`` wraps every LSTM in a
                ``Bidirectional`` layer; any other value uses plain LSTMs.

        Returns:
            A ``tf.keras.Model`` compiled with SGD (learning rate 0.01),
            categorical cross-entropy and an accuracy metric.
        """
        bidirectional = lstm_version == "Bi_directional"

        def recurrent_layer(return_sequences):
            # One LSTM layer, optionally wrapped to run in both directions.
            lstm = tf.keras.layers.LSTM(units=self.nr_of_units, return_sequences=return_sequences)
            if bidirectional:
                return tf.keras.layers.Bidirectional(lstm)
            return lstm

        inputs = tf.keras.Input(shape=np.shape(self.x)[1:])

        # First layer, (nr_of_layers - 3) middle layers and a last recurrent
        # layer that collapses the sequence axis.
        x = recurrent_layer(True)(inputs)
        for _ in range(self.nr_of_layers - 3):
            x = recurrent_layer(True)(x)
        x = recurrent_layer(False)(x)
        outputs = tf.keras.layers.Dense(units=self.nr_of_classes, activation="softmax")(x)

        model = tf.keras.Model(inputs=inputs, outputs=outputs, name=lstm_version + "_" + name_)
        if self.show_summary:
            model.summary()
            print("\n\n")
        optimizer = tf.keras.optimizers.SGD(learning_rate=0.01, name='SGD')
        model.compile(optimizer=optimizer, loss="categorical_crossentropy", metrics=["accuracy"])
        return model

    def _stability(self, student, unlabeled_x, noisy_augmentation):
        """Return (stable_mask, clean_pred, noisy_pred) for one student (Eq. 3).

        A sample is stable when the clean and the noisy prediction pick the
        same class and at least one of them is more confident than epsilon.
        """
        U_pred = student(unlabeled_x)
        noisy_pred = student(noisy_augmentation)

        same_class = tf.equal(tf.argmax(U_pred, axis=1), tf.argmax(noisy_pred, axis=1))
        confident = tf.logical_or(
            tf.math.reduce_max(U_pred, axis=1) > self.epsilon,
            tf.math.reduce_max(noisy_pred, axis=1) > self.epsilon,
        )
        return tf.logical_and(same_class, confident), U_pred, noisy_pred

    def _stabilization_loss(self, pred_1, pred_2, other_only, both_1, both_2, less_stable):
        """MSE between the students on the samples one student must follow.

        ``other_only`` selects (from all unlabeled samples) those where only
        the other student is stable; ``less_stable`` selects (from the
        samples where both are stable) those where this student is the less
        consistent one.
        """
        picked_1 = tf.concat([tf.boolean_mask(pred_1, other_only), tf.boolean_mask(both_1, less_stable)], axis=0)
        picked_2 = tf.concat([tf.boolean_mask(pred_2, other_only), tf.boolean_mask(both_2, less_stable)], axis=0)
        return self.mse(picked_1, picked_2)

    def train(self, x=None, y=None, nr_epochs=100, batch_size=100, unlabeled_x=None):
        """Train both students with a custom gradient loop.

        Args:
            x: Labeled inputs; defaults to the data loaded by ``get_data``.
            y: Labels for ``x`` (presumably one-hot, since categorical
                cross-entropy is used -- confirm); must be given together
                with ``x``.
            nr_epochs: Number of passes over ``x``.
            batch_size: Labeled samples per step.
            unlabeled_x: Unlabeled inputs for the stability / stabilization
                terms. When omitted, the first ``DEFAULT_UNLABELED_SIZE``
                samples of ``x`` are used as a stand-in.

        Raises:
            ValueError: If only one of ``x`` and ``y`` is provided.
        """
        # Identity checks: `x == None` on an array is element-wise and cannot
        # be used as a truth value.
        if x is None and y is None:
            x, y = self.x, self.y
        elif x is None or y is None:
            raise ValueError("x and y must be provided together (or both omitted)")

        if unlabeled_x is None:
            # No real unlabeled data yet: reuse the head of the labeled set.
            unlabeled_x = x[0:self.DEFAULT_UNLABELED_SIZE]

        self.epochs = nr_epochs
        nr_batches = int(np.ceil(np.size(x, 0) / batch_size))

        for epoch in range(1, self.epochs + 1):
            for i in range(nr_batches):
                start = i * batch_size
                x_batch = x[start:start + batch_size]
                y_true = y[start:start + batch_size]

                # Two independently perturbed views of the batch, and a
                # (more strongly) perturbed copy of the unlabeled samples.
                B_1 = x_batch + np.random.random(np.shape(x_batch)) * 0.1
                B_2 = x_batch + np.random.random(np.shape(x_batch)) * 0.1
                noisy_augmentation = unlabeled_x + np.random.random(np.shape(unlabeled_x))

                losses = {}
                stable = {}
                clean_preds = {}
                noisy_preds = {}

                # persistent=True: the tape is queried once per student below.
                with tf.GradientTape(persistent=True) as tape:
                    for model in self.models:
                        student = self.models[model]

                        # L_cls on the labeled samples.
                        loss_cls = self.cce(y_true, student(x_batch))

                        # L_con (Eq. 1) between B1 and B2, weighted by
                        # lambda_1 exactly once.
                        loss_con = self.mse(student(B_1), student(B_2))
                        losses[model + "_loss"] = loss_cls + self.lambda_1 * loss_con

                        # Stability of every unlabeled sample (Eq. 3).
                        stable[model], clean_preds[model], noisy_preds[model] = self._stability(
                            student, unlabeled_x, noisy_augmentation)

                    # R_1, R_2, R_i, R_j and R_12 do not mean the same thing
                    # as in the paper:
                    #   R_1  - only student1 is stable
                    #   R_2  - only student2 is stable
                    #   R_12 - both students are stable
                    R_1 = tf.logical_and(stable["student1"], tf.logical_not(stable["student2"]))
                    R_2 = tf.logical_and(stable["student2"], tf.logical_not(stable["student1"]))
                    R_12 = tf.logical_and(stable["student1"], stable["student2"])

                    pred_1 = clean_preds["student1"]
                    pred_2 = clean_preds["student2"]
                    both_1 = tf.boolean_mask(pred_1, R_12)
                    both_2 = tf.boolean_mask(pred_2, R_12)

                    # Where both students are stable, measure prediction
                    # consistency with the Euclidean distance between the
                    # clean and the noisy prediction.
                    epsilon_i = tf.math.reduce_euclidean_norm(
                        both_1 - tf.boolean_mask(noisy_preds["student1"], R_12), axis=1)
                    epsilon_j = tf.math.reduce_euclidean_norm(
                        both_2 - tf.boolean_mask(noisy_preds["student2"], R_12), axis=1)
                    R_i = epsilon_i > epsilon_j
                    R_j = epsilon_i <= epsilon_j

                    # loss_sta for student 1: samples where only student2 is
                    # stable, or both are and student1 is less consistent.
                    loss_sta = self._stabilization_loss(pred_1, pred_2, R_2, both_1, both_2, R_i)
                    losses["student1_loss"] = losses["student1_loss"] + self.lambda_2 * loss_sta

                    # loss_sta for student 2: the mirrored selection.
                    loss_sta = self._stabilization_loss(pred_1, pred_2, R_1, both_1, both_2, R_j)
                    losses["student2_loss"] = losses["student2_loss"] + self.lambda_2 * loss_sta

                # Update the model parameters. Each loss is differentiated
                # only w.r.t. its own student's variables, so the other
                # student's predictions act as a fixed target.
                for model in self.models:
                    trainable_vars = self.models[model].trainable_variables
                    gradients = tape.gradient(losses[model + "_loss"], trainable_vars)
                    self.models[model].optimizer.apply_gradients(zip(gradients, trainable_vars))
                del tape

# Build one DualStudent (i.e. one pair of student networks) per supported
# configuration, keyed by the configuration name.
models = dict()
for version_ in ("Mono_directional", "Imbalanced", "Bi_directional"):
    models.update({version_: DualStudent(student_version=version_)})
    print("\n\n\n")

# Short training run of the unidirectional pair on the default data.
mono_directional_students = models["Mono_directional"]
mono_directional_students.train(nr_epochs=5)

Drive already mounted at /content/gdrive/; to attempt to forcibly remount, call drive.mount("/content/gdrive/", force_remount=True).
Model: "Mono_directional_student1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_7 (InputLayer)         [(None, 1, 91)]           0         
_________________________________________________________________
lstm_30 (LSTM)               (None, 1, 768)            2641920   
_________________________________________________________________
lstm_31 (LSTM)               (None, 1, 768)            4721664   
_________________________________________________________________
lstm_32 (LSTM)               (None, 1, 768)            4721664   
_________________________________________________________________
lstm_33 (LSTM)               (None, 1, 768)            4721664   
_________________________________________________________________
lstm_34 (LSTM)               (None, 768)