<a href="https://colab.research.google.com/github/zaoshangqichuang/MLnotes/blob/main/HW2_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Logistic回归

## 数据预处理

### 下载数据

In [None]:
# Download the dataset archive from Google Drive by file id and save it as data.zip.
!gdown --id '1HPkcmQmFGu-3OknddKIa5dNDsR05lIQR' --output data.zip
# Extract the archive into the current working directory.
!unzip data.zip
# List the working directory to check what was extracted.
!ls 

### 数据标准化

In [None]:
#数据标准化
import numpy as np
# Fix NumPy's global random seed.
np.random.seed(0)

# Paths of the training features, training labels and test features.
X_train_fpath = './timit_11/train_11.npy'
Y_train_fpath = './timit_11/train_label_11.npy'
X_test_fpath = './timit_11/test_11.npy'

# The inputs are .npy files, so they are read directly with np.load.
# (A dead CSV parser that used to sit here inside a bare string literal was
# removed: it was never executed and does not match the .npy format.)
X_train = np.load(X_train_fpath)
Y_train = np.load(Y_train_fpath)
X_test = np.load(X_test_fpath)
def _normalize(X, train = True, specified_column = None, X_mean = None, X_std = None):
  """Standardize selected columns of ``X`` in place.

  Args:
    X: 2-D array of samples (rows) by features (columns). It is modified
      in place and also returned.
    train: when True, the per-column mean and standard deviation are
      computed from ``X``; when False, the supplied ``X_mean`` and
      ``X_std`` are used instead.
    specified_column: indices of the columns to standardize (list or
      array). ``None`` selects every column.
    X_mean: per-column mean to reuse when ``train`` is False.
    X_std: per-column standard deviation to reuse when ``train`` is False.

  Returns:
    A tuple ``(X, X_mean, X_std)`` where the statistics have shape
    ``(1, number_of_selected_columns)`` when computed here.
  """
  # Use an identity check: comparing an index array with `== None` is
  # element-wise, and the resulting array has no single truth value.
  if specified_column is None:
    specified_column = np.arange(X.shape[1])
  if train:
    X_mean = np.mean(X[:,specified_column],0).reshape(1,-1)
    X_std = np.std(X[:,specified_column],0).reshape(1,-1)
  # The small constant keeps the division defined for zero-variance columns.
  X[:,specified_column] = (X[:,specified_column]-X_mean) / (X_std + 1e-8)
  return X,X_mean, X_std

def _train_dev_split(X,Y,dev_ratio=0.25):
  """Split ``X`` and ``Y`` into a leading training part and a trailing dev part.

  The first ``int(len(X) * (1 - dev_ratio))`` entries form the training
  part and the remaining entries form the development part. Order is kept;
  nothing is shuffled.

  Returns:
    ``(X_train, Y_train, X_dev, Y_dev)``.
  """
  cut = int(len(X) * (1 - dev_ratio))
  X_head, X_tail = X[:cut], X[cut:]
  Y_head, Y_tail = Y[:cut], Y[cut:]
  return X_head, Y_head, X_tail, Y_tail
# Fraction of the training data held out (from its tail) as the development set.
dev_ratio = 0.1
X_train, Y_train, X_dev, Y_dev = _train_dev_split(X_train, Y_train, dev_ratio=dev_ratio)

# Sample counts of each split and the per-sample feature dimension.
train_size, data_dim = X_train.shape[:2]
dev_size = len(X_dev)
test_size = len(X_test)

# Print a short summary, one line per quantity.
for template, value in (
    ('Size of training set:{}', train_size),
    ('Size of development set:{}', dev_size),
    ('Size of test set:{}', test_size),
    ('Dimension of data:{}', data_dim),
):
  print(template.format(value))

### 构建数据集

In [None]:
import torch
from torch.utils.data import Dataset
class TIMIDataset(Dataset):
  """Dataset over a NumPy feature array with optional integer labels.

  Args:
    X: NumPy array of features; stored as a float32 tensor.
    y: optional NumPy array of labels convertible to integers. When
      omitted, items are feature tensors only.

  Indexing returns ``(feature, label)`` when labels were given, otherwise
  just ``feature``. ``len()`` is the number of samples.
  """
  def __init__(self,X,y=None):
    self.data = torch.from_numpy(X).float()
    if y is not None:
      # `np.int` was removed in NumPy 1.24; use an explicit 64-bit integer
      # type, which is also what LongTensor stores.
      y = y.astype(np.int64)
      self.label = torch.LongTensor(y)
    else:
      self.label = None
  def __getitem__(self,idx):
    if self.label is not None:
      return self.data[idx],self.label[idx]
    else:
      return self.data[idx]
  def __len__(self):
    return len(self.data)

# Alias under the dataset's usual spelling (TIMIT) so either name resolves.
TIMITDataset = TIMIDataset

In [None]:
from torch.utils.data import DataLoader
# Mini-batch size shared by the loaders.
BATCH_SIZE = 64

# Wrap the arrays as datasets; the test set is built without labels.
train_set = TIMIDataset(X_train, Y_train)
val_set = TIMIDataset(X_dev, Y_dev)
test_set = TIMIDataset(X_test)

# Only the training loader shuffles; validation keeps its order.
train_loader = DataLoader(dataset=train_set, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(dataset=val_set, batch_size=BATCH_SIZE, shuffle=False)

# Number of distinct label values in the training split (shown as cell output).
len(set(Y_train))

In [None]:
import gc
# Remove the array names that are no longer referenced directly...
del X_train
del Y_train
del X_dev
del Y_dev
# ...then run a garbage-collection pass (its return value is the cell output).
gc.collect()

## 构建模型

In [None]:
import torch
import torch.nn as nn
class Classifier(nn.Module):
  """Fully connected classifier with three sigmoid-activated hidden layers.

  Args:
    input_dim: size of each input feature vector (default 429).
    output_dim: number of output scores per sample (default 39).

  The hidden widths are 1024, 512 and 128. ``forward`` returns the raw
  output of the last linear layer; no activation is applied to it.
  """
  def __init__(self, input_dim=429, output_dim=39):
    super().__init__()
    # Layer names and creation order are kept fixed so that saved
    # checkpoints and seeded initialisation stay unchanged.
    self.layer1 = nn.Linear(input_dim,1024)
    self.layer2 = nn.Linear(1024,512)
    self.layer3 = nn.Linear(512,128)
    self.out = nn.Linear(128,output_dim)
    self.act_fn = nn.Sigmoid()
  def forward(self,x):
    # Each hidden layer is followed by the shared sigmoid activation.
    for hidden in (self.layer1, self.layer2, self.layer3):
      x = self.act_fn(hidden(x))
    return self.out(x)

def get_device():
  """Return 'cuda' when PyTorch reports CUDA as available, otherwise 'cpu'."""
  if torch.cuda.is_available():
    return 'cuda'
  return 'cpu'
get_device()

### 固定随机种子

In [None]:
def same_seeds(seed):
  """Seed PyTorch and NumPy with ``seed`` and switch cuDNN to deterministic mode.

  CUDA generators are seeded only when CUDA is available. cuDNN benchmark
  mode is turned off and deterministic mode is turned on.
  """
  torch.backends.cudnn.deterministic = True
  torch.backends.cudnn.benchmark = False
  np.random.seed(seed)
  torch.manual_seed(seed)
  if not torch.cuda.is_available():
    return
  torch.cuda.manual_seed(seed)
  torch.cuda.manual_seed_all(seed)

In [None]:
# Hyper-parameters and the checkpoint location.
num_epoch = 20
learning_rate = 1e-4
model_path = './model.ckpt'

# Seed the generators before the model is constructed.
same_seeds(0)
device = get_device()

# Model, loss and optimizer used by the training loop.
model = Classifier().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

### 开始训练

In [None]:
# Largest number of correct validation predictions seen in any epoch so far.
best_acc = 0.0
for epoch in range(num_epoch):
  # Per-epoch running totals: correct-prediction counts and summed batch losses.
  train_acc = train_loss = 0.0
  val_acc = val_loss = 0.0

  # Training pass: one optimizer step per mini-batch.
  model.train()
  for inputs, labels in train_loader:
    inputs = inputs.to(device)
    labels = labels.to(device)
    optimizer.zero_grad()
    outputs = model(inputs)
    batch_loss = criterion(outputs, labels.long())
    # Index of the highest score along dimension 1 is the predicted class.
    train_pred = torch.max(outputs, 1)[1]
    batch_loss.backward()
    optimizer.step()
    train_acc += (train_pred.cpu() == labels.cpu()).sum().item()
    train_loss += batch_loss.item()

  # Without validation data, report the training metrics only.
  if len(val_set) == 0:
    print('[{:03d}/{:03d}] Train Acc: {:3.6f} Loss: {:3.6f}'.format(
        epoch + 1, num_epoch, train_acc/len(train_set), train_loss/len(train_loader)
    ))
    continue

  # Validation pass: no gradients, no parameter updates.
  model.eval()
  with torch.no_grad():
    for inputs, labels in val_loader:
      inputs = inputs.to(device)
      labels = labels.to(device)
      outputs = model(inputs)
      batch_loss = criterion(outputs, labels)
      val_pred = torch.max(outputs, 1)[1]
      val_acc += (val_pred.cpu() == labels.cpu()).sum().item()
      val_loss += batch_loss.item()

  print('[{:03d}/{:03d}] Train Acc: {:3.6f} Loss: {:3.6f} | Val Acc: {:3.6f} loss: {:3.6f}'.format(
      epoch + 1, num_epoch, train_acc/len(train_set), train_loss/len(train_loader), val_acc/len(val_set), val_loss/len(val_loader)
  ))

  # Checkpoint whenever the validation result improves on the best so far.
  if val_acc > best_acc:
    best_acc = val_acc
    torch.save(model.state_dict(), model_path)
    print('saving model with acc {:.3f}'.format(best_acc/len(val_set)))

# With no validation set there is no "best" epoch, so keep the final weights.
if len(val_set) == 0:
  torch.save(model.state_dict(), model_path)
  print('saving model at last epoch')
   

## 预测

In [None]:
# Build the unlabeled test dataset. The class defined in this file is named
# `TIMIDataset`; the previous spelling `TIMITDataset` was undefined here.
test_set = TIMIDataset(X_test,None)
test_loader = DataLoader(test_set,batch_size=BATCH_SIZE,shuffle=False)
# Recreate the network and restore the saved weights; map_location places the
# loaded tensors on the current device regardless of where they were saved.
model = Classifier().to(device)
model.load_state_dict(torch.load(model_path, map_location=device))

In [None]:
# Predicted class index for every test sample, in loader order.
predict = []
model.eval()
with torch.no_grad():
  for inputs in test_loader:
    outputs = model(inputs.to(device))
    # Index of the highest score along dimension 1 is the predicted class.
    test_pred = torch.max(outputs, 1)[1]
    predict.extend(test_pred.cpu().numpy())

# Write a header followed by one "row index,predicted class" line per sample.
with open('prediction.csv','w') as f:
  f.write('Id,Class\n')
  f.writelines('{},{}\n'.format(row, label) for row, label in enumerate(predict))