In [13]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [14]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from numpy import exp
from math import log
numpy_rng = np.random.RandomState(1234)

In [15]:
train_df = pd.read_csv('/content/drive/MyDrive/Fashion_MNIST/fashion-mnist_train.csv')
test_df = pd.read_csv('/content/drive/MyDrive/Fashion_MNIST/fashion-mnist_test.csv')

In [16]:
train_data_df = train_df.drop('label', axis=1)
train_data_np = train_data_df.to_numpy()
train_bin = np.where(train_data_np > 127, 1, 0)
train_data = train_bin.reshape(train_bin.shape[0], 784)
train_labels = train_df['label']
train_data,val_data,train_labels,val_labels = train_test_split(train_data, train_labels, test_size=0.1, random_state=42)

In [None]:
val_data.shape

(6000, 784)

In [17]:
test_data_df = test_df.drop('label', axis=1)
test_data_np = test_data_df.to_numpy()
test_bin = np.where(test_data_np > 127, 1, 0)
test_data = test_bin.reshape(test_bin.shape[0], 784)
test_labels = test_df['label']

In [18]:
def sigmoid(x):
    return 1. / (1 + np.exp(-x))

In [None]:
def eval(data,W,b):
  return softmax(np.reshape(np.matmul(W.T,data),(10,1)) + b)

In [None]:
def one_hot_encoding(l, L):
  e = []
  for i in range(L):
    if i == l:
      e.append(1)
    else:
      e.append(0)
  e_y = np.asarray(e)
  return np.reshape(e_y,(10,1))

In [None]:
def softmax(x):
    e_x = np.exp(x - np.max(x))
    return e_x / e_x.sum()

In [None]:
def train(data,labels,W,b,lr = 0.1):

  t = []
  
  for j in range(data.shape[0]):
    t.append(one_hot_encoding(labels[j],10))
    #print(o[j])
    #print(o)
  for i in range(50):
    grad_w = np.zeros([W.shape[0],W.shape[1]])
    grad_b = np.zeros([b.shape[0],1])
    o = []
    for j in range(data.shape[0]):
      o.append(eval(data[j],W,b))
      mul2 = np.reshape((o[j]-t[j]),[10,1])
      mul1 = np.reshape(data[j],[data[j].shape[0],1])
      grad_w += np.matmul(mul1,mul2.T)
      grad_b += mul2
      W -= lr*grad_w
      b -= lr*grad_b
      grad_w = np.zeros([W.shape[0],W.shape[1]])
      grad_b = np.zeros([b.shape[0],1])
  return W,b

In [None]:
def test(data,labels,W,b):
  cross_entropy = 0;
  count = 0;
  t = []
  o = []
  for j in range(data.shape[0]):
    o.append(eval(data[j],W,b))
    t.append(one_hot_encoding(labels[j],10))
    
    index_t = np.argmax(t[j],axis = 0)
    index_o = np.argmax(o[j],axis = 0)
    #print(index,o[j],t[j])
   # print(index, o[j])
    if index_t == index_o:
      count += 1
  #  print(t[j][index],o[j][index])
   # print(o[j][index_t])
    cross_entropy -= log(o[j][index_t]+1e-300)
  accuracy = count
  cross_entropy = cross_entropy/data.shape[0]

  return cross_entropy, accuracy

In [19]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score,log_loss
def train_test(x_train, y_train, x_test,y_test):
  logisticRegr = LogisticRegression(max_iter = 1000,multi_class='multinomial')
  logisticRegr.fit(x_train, y_train)
  #print(logisticRegr.classes_)
  y_pred = logisticRegr.predict(x_test)
  pred = logisticRegr.predict_proba(x_test)
  #print(y_pred)
  #print(y_test)
  loss = log_loss(y_test,pred)
  accuracy = accuracy_score(y_test, y_pred)
  return loss, accuracy

In [22]:
class RBM(object):
    def __init__(self, input=None, n_visible=2, n_hidden=3, W=None, hbias=None, vbias=None):
        
        if W is None:
            initial_W = np.random.randn(n_visible, n_hidden) * 0.01
            W = initial_W

        if hbias is None:
            hbias = np.random.randn(n_hidden) * 0.01  # initialize h bias 0

        if vbias is None:
            vbias = np.random.randn(n_visible) * 0.01 # initialize v bias 0

        self.n_visible = n_visible 
        self.n_hidden = n_hidden   
        self.input = input
        self.W = W
        self.hbias = hbias
        self.vbias = vbias

def contrastive_divergence(rbm, lr=0.1, k=1, input=None):
    if input is not None:
        rbm.input = input
    

    ph_mean, ph_sample = sample_h_given_v(rbm,rbm.input)

    chain_start = ph_sample

    for step in range(k):
        if step == 0:
            nv_means, nv_samples,\
            nh_means, nh_samples = gibbs_hvh(rbm,chain_start)
        else:
            nv_means, nv_samples,\
            nh_means, nh_samples = gibbs_hvh(rbm,nh_samples)

    rbm.W += lr * (np.matmul(rbm.input.T, ph_mean) - np.matmul(nv_samples.T,nh_means))/rbm.input.shape[0]
    rbm.vbias += lr * np.mean(rbm.input - nv_samples, axis=0)
    rbm.hbias += lr * np.mean(ph_mean - nh_means, axis=0)

def sample_h_given_v(rbm, v0_sample):
    h1_mean = propup(rbm,v0_sample)
    h1_sample = numpy_rng.binomial(size=h1_mean.shape, n=1, p=h1_mean)
    return [h1_mean, h1_sample]


def sample_v_given_h(rbm, h0_sample):
    v1_mean = propdown(rbm,h0_sample)
    v1_sample = numpy_rng.binomial(size=v1_mean.shape, n=1, p=v1_mean) 
    return [v1_mean, v1_sample]

def propup(rbm, v):
    pre_sigmoid_activation = np.dot(v, rbm.W) + rbm.hbias
    return sigmoid(pre_sigmoid_activation)

def propdown(rbm, h):
    pre_sigmoid_activation = np.dot(h, rbm.W.T) + rbm.vbias
    return sigmoid(pre_sigmoid_activation)


def gibbs_hvh(rbm, h0_sample):

    v1_mean, v1_sample = sample_v_given_h(rbm,h0_sample)
    h1_mean, h1_sample = sample_h_given_v(rbm,v1_sample)

    return [v1_mean, v1_sample,
            h1_mean, h1_sample]

def get_hidden_reps(rbm, v,t):
    h_v = sigmoid(np.dot(v, rbm.W) + rbm.hbias)
    h_t = sigmoid(np.dot(t, rbm.W) + rbm.hbias)
    #reconstructed_v = sigmoid(np.dot(h, rbm.W.T) + rbm.vbias)
    return h_v,h_t
        

def train_rbm(rbm, v,t,learning_rate=0.05, k = 1):
    contrastive_divergence(rbm,lr=learning_rate,k=k)
    return get_hidden_reps(rbm,v,t)


In [26]:
def swp():
    hyperparameter_defaults = dict(
      n_hidden=64,
      k = 5
    )

    wandb.init(project="Assignment - 04", config=hyperparameter_defaults)
    config = wandb.config
    wandb.run.name = "{}_h_layers_{}_steps".format(config.n_hidden, config.k)
    n_visible=784
    rbm = RBM(input=train_data, n_visible=n_visible, n_hidden=config.n_hidden)

    for epoch in range(25):

      hidden_rep,hidden_rep_test= train_rbm(rbm, val_data,test_data,k=config.k)
      print('Training epoch: %d' %epoch)
      
      #W_l, b_l = train(hidden_rep,val_labels,W_l,b_l,lr=0.1)
      loss, accuracy = train_test(hidden_rep, val_labels, hidden_rep_test,test_labels)
      print('Loss:{} Accuracy:{}'.format(loss,accuracy))
      metrics = {'epoch':epoch, 'accuracy': val_accuracy, 'loss': loss}
      wandb.log(metrics)
    wandb.run.finish()

In [27]:
swp()

NameError: ignored