In [1]:
import os
os.getcwd()
os.chdir("drive/My Drive/STAT212/DeepZip_code/src")

In [2]:
from sklearn.preprocessing import OneHotEncoder
import numpy as np
import torch
from torch import nn
from torch.utils.data import TensorDataset, DataLoader
import torchvision.datasets as dsets
import torchvision.transforms as transforms
import torch.nn.functional as F
import torch.nn.utils.prune as prune
import torch.quantization
import copy 
import os
import zipfile
import tempfile
import shutil

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
torch.manual_seed(1)

<torch._C.Generator at 0x7f7f50e6c250>

In [3]:
file_path="../data/processed_files/text8_sub2000000.npy"
batch_size=128
time_steps= 64

def strided_app(a, L, S):  # Window len = L, Stride len/stepsize = S
        nrows = ((a.size - L) // S) + 1
        n = a.strides[0]
        return np.lib.stride_tricks.as_strided(a, shape=(nrows, L), strides=(S * n, n), writeable=False)

In [4]:
# load the preprocessed data
series = np.load(file_path)
series = series.reshape(-1, 1)

onehot_encoder = OneHotEncoder(sparse=False)
onehot_encoded = onehot_encoder.fit(series)
series = series.reshape(-1)

data = strided_app(series, time_steps+1, 1)
l = int(len(data)/batch_size) * batch_size

data = data[:l] 
X = data[:, :-1]
Y = data[:, -1:].reshape([-1,])
Y_hot = onehot_encoder.transform(data[:, -1:])

In [5]:
class LSTM(nn.Module):
    def __init__(self, num_classes, hidden_size1=32, hidden_size2=32, num_layers=2):
        super(LSTM, self).__init__()
        self.hidden_size1 = hidden_size1
        self.hidden_size2 = hidden_size2
        self.num_layers = num_layers
        self.embedding= nn.Embedding(num_classes, 32).to(device)
        self.lstm = nn.LSTM(32, hidden_size1, num_layers, batch_first=True).to(device)
        self.fc1 = nn.Linear(hidden_size1, hidden_size2).to(device)
        self.fc2 = nn.Linear(hidden_size2, num_classes).to(device)
        

    def forward(self, x):
        # initialize
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size1).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size1).to(device)
        
        out=self.embedding(x[:,:,0].long())
        out, (h_n, h_c) = self.lstm(out, (h0, c0))
        out =self.fc1(out[:, -1, :])
        out =self.fc2(nn.ReLU()(out))

        return out


class GRU(nn.Module):
    def __init__(self, num_classes, hidden_size1=32, hidden_size2=32, num_layers=2):
        super(GRU, self).__init__()
        self.hidden_size1 = hidden_size1
        self.hidden_size2 = hidden_size2
        self.num_layers = num_layers
        self.embedding= nn.Embedding(num_classes, 32).to(device)
        self.lstm = nn.GRU(32, hidden_size1, num_layers, batch_first=True).to(device)
        self.fc1 = nn.Linear(hidden_size1, hidden_size2).to(device)
        self.fc2 = nn.Linear(hidden_size2, num_classes).to(device)
        

    def forward(self, x):
        # initialize
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size1).to(device)
        
        out=self.embedding(x[:,:,0].long())
        out, _ = self.lstm(out, h0)
        out =self.fc1(out[:, -1, :])
        out =self.fc2(nn.ReLU()(out))

        return out

class biLSTM(nn.Module):
    def __init__(self, num_classes, hidden_size1=32, hidden_size2=32, num_layers=2):
        super(biLSTM, self).__init__()
        self.hidden_size1 = hidden_size1
        self.hidden_size2 = hidden_size2
        self.num_layers = num_layers
        self.embedding= nn.Embedding(num_classes, 32).to(device)
        self.lstm = nn.LSTM(32, hidden_size1, num_layers, batch_first=True,bidirectional=True).to(device)
        self.fc1 = nn.Linear(hidden_size1*2, hidden_size2*2).to(device)
        self.fc2 = nn.Linear(hidden_size2*2, num_classes).to(device)
        

    def forward(self, x):
        # initialize
        h0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size1).to(device)
        c0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size1).to(device)
        
        out=self.embedding(x[:,:,0].long())
        out, (h_n, h_c) = self.lstm(out, (h0, c0))
        out =self.fc1(out[:, -1, :])
        out =self.fc2(nn.ReLU()(out))

        return out
        
class biGRU(nn.Module):
    def __init__(self, num_classes, hidden_size1=32, hidden_size2=32, num_layers=2):
        super(biGRU, self).__init__()
        self.hidden_size1 = hidden_size1
        self.hidden_size2 = hidden_size2
        self.num_layers = num_layers
        self.embedding= nn.Embedding(num_classes, 32).to(device)
        self.lstm = nn.GRU(32, hidden_size1, num_layers, batch_first=True,bidirectional=True).to(device)
        self.fc1 = nn.Linear(hidden_size1*2, hidden_size2*2).to(device)
        self.fc2 = nn.Linear(hidden_size2*2, num_classes).to(device)
        

    def forward(self, x):
        # initialize
        h0 = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size1).to(device)
        
        out=self.embedding(x[:,:,0].long())
        out, _ = self.lstm(out, h0)
        out =self.fc1(out[:, -1, :])
        out =self.fc2(nn.ReLU()(out))

        return out

In [6]:
# Hyper Parameters
input_size = 1   
num_epochs=10        
hidden_size1 = 32
hidden_size2 = 32
num_layers = 2
num_classes = Y_hot.shape[1]
lr = 0.001 

model = LSTM(num_classes,hidden_size1, hidden_size2, num_layers)
# model = GRU(num_classes,hidden_size1, hidden_size2, num_layers)
# model = biLSTM(num_classes,hidden_size1, hidden_size2, num_layers)
# model = biGRU(num_classes,hidden_size1, hidden_size2, num_layers)

# loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr)

In [None]:
# load training data
train_data = TensorDataset(torch.Tensor(X),torch.Tensor(Y).long())
train_loader = DataLoader(dataset=train_data,batch_size=batch_size,shuffle=True) 
test_data = TensorDataset(torch.Tensor(X),torch.Tensor(Y).long())
test_loader = DataLoader(dataset=test_data,batch_size=batch_size,shuffle=True) 

# train
total_step = len(train_loader)
model_ls=[]
accuracy=[]
for epoch in range(num_epochs):
    for i, (x, y) in enumerate(train_loader):
        x = x.reshape(-1, time_steps, input_size).to(device)
        y = y.to(device)

        # forward pass
        outputs = model(x)
        loss = criterion(outputs, y)

        # backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if i % 1000 == 0:
            print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
                   .format(epoch+1, num_epochs, i+1, total_step, loss.item()))
            
            with torch.no_grad():
                correct = 0
                total = 0
                times=0
                for x, y in test_loader:
                    x = x.reshape(-1, time_steps, input_size).to(device)
                    y = y.to(device)
                    outputs = model(x)
                    prob=F.softmax(outputs)
                    _, predicted = torch.max(prob, 1)
                    total += y.size(0)
                    correct += (predicted == y).sum().item()
                    times=times+1
                    if times > 100:
                        break

                print('Test Accuracy of the model on the 10000 test x: {} %'.format(100 * correct / total))
            model_ls.append(model)
            accuracy.append(100 * correct / total)

In [8]:
model=model_ls[np.argmax(accuracy)]

In [9]:
# test accuary
test_data = TensorDataset(torch.Tensor(X),torch.Tensor(Y).long())
test_loader = DataLoader(dataset=test_data,batch_size=batch_size,shuffle=True) 
with torch.no_grad():
    correct = 0
    total = 0
    times=0
    for x, y in test_loader:
        x = x.reshape(-1, time_steps, input_size).to(device)
        y = y.to(device)
        outputs = model(x)
        prob=F.softmax(outputs)
        _, predicted = torch.max(prob, 1)
        total += y.size(0)
        correct += (predicted == y).sum().item()
        times=times+1
        if times > 100:
            break

    print('Test Accuracy of the model on the 10000 test x: {} %'.format(100 * correct / total))

  if sys.path[0] == '':


Test Accuracy of the model on the 10000 test x: 53.44987623762376 %


In [10]:
# save the original model
torch.save(model.state_dict(), "../data/trained_models/text8/temptory/lstm_weights")

In [11]:
# copy a model0 for pruning
model0=copy.deepcopy(model)

for name, module in model0.named_modules():
    # prune 20% of connections in all lstm layers
    if isinstance(module, torch.nn.LSTM):
        prune.l1_unstructured(module, name='weight_hh_l0', amount=0.2)
        prune.l1_unstructured(module, name='weight_ih_l0', amount=0.2)
    # prune 40% of connections in all linear layers
    elif isinstance(module, torch.nn.Linear):
        prune.l1_unstructured(module, name='weight', amount=0.4)

for name, module in model0.named_modules():
    # prune 20% of connections in all lstm layers
    if isinstance(module, torch.nn.LSTM):
        prune.remove(module, 'weight_hh_l0')
        prune.remove(module, 'weight_ih_l0')
    # prune 40% of connections in all linear layers
    elif isinstance(module, torch.nn.Linear):
        prune.remove(module, 'weight')

torch.save(model0.state_dict(), "../data/trained_models/text8/temptory/lstm_pruned_weights")

In [12]:
# quantize the unpruned model
quantized_model = torch.quantization.quantize_dynamic(
    model.to('cpu'), {nn.LSTM, nn.Linear}, dtype=torch.qint8
)
print(quantized_model)

# quantize the pruned model
quantized_model0 = torch.quantization.quantize_dynamic(
    model0.to('cpu'), {nn.LSTM, nn.Linear}, dtype=torch.qint8
)
print(quantized_model0)

torch.save(quantized_model.state_dict(), "../data/trained_models/text8/temptory/lstm_quantization_weights")
torch.save(quantized_model0.state_dict(), "../data/trained_models/text8/temptory/lstm_quantization_pruned_weights")

LSTM(
  (embedding): Embedding(27, 32)
  (lstm): DynamicQuantizedLSTM(32, 32, num_layers=2, batch_first=True)
  (fc1): DynamicQuantizedLinear(in_features=32, out_features=32, dtype=torch.qint8, qscheme=torch.per_tensor_affine)
  (fc2): DynamicQuantizedLinear(in_features=32, out_features=27, dtype=torch.qint8, qscheme=torch.per_tensor_affine)
)
LSTM(
  (embedding): Embedding(27, 32)
  (lstm): DynamicQuantizedLSTM(32, 32, num_layers=2, batch_first=True)
  (fc1): DynamicQuantizedLinear(in_features=32, out_features=32, dtype=torch.qint8, qscheme=torch.per_tensor_affine)
  (fc2): DynamicQuantizedLinear(in_features=32, out_features=27, dtype=torch.qint8, qscheme=torch.per_tensor_affine)
)


In [13]:
# define a gzip function
def get_gzipped_model_size(file,path):
  # Returns size of gzipped model, in bytes.
  _, zipped_file = tempfile.mkstemp('.zip')
  with zipfile.ZipFile(zipped_file, 'w', compression=zipfile.ZIP_DEFLATED) as f:
    f.write(file)
  shutil.copy(zipped_file, path)
  return os.path.getsize(zipped_file)

os.chdir("../data/trained_models/text8/temptory")

In [14]:
file_path="lstm_weights"
save_path="../lstm_weights.zip"
print("Size of gzipped baseline model: %.2f KB" % (get_gzipped_model_size(file_path,save_path)/1024))

file_path="lstm_pruned_weights"
save_path="../lstm_pruned_weights.zip"
print("Size of gzipped pruned model: %.2f KB" % (get_gzipped_model_size(file_path,save_path)/1024))

file_path="lstm_quantization_weights"
save_path="../lstm_quantization_weights.zip"
print("Size of gzipped quantized baseline model: %.2f KB" % (get_gzipped_model_size(file_path,save_path)/1024))

file_path="lstm_quantization_pruned_weights"
save_path="../lstm_quantization_pruned_weights.zip"
print("Size of gzipped quantized and pruned model: %.2f KB" % (get_gzipped_model_size(file_path,save_path)/1024))

os.chdir("../../../../src")

Size of gzipped baseline model: 73.02 KB
Size of gzipped pruned model: 67.45 KB
Size of gzipped quantized baseline model: 23.83 KB
Size of gzipped quantized and pruned model: 23.13 KB
