In [1]:
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# %matplotlib inline

ModuleNotFoundError: No module named 'torch'

In [None]:
dataset = pd.read_csv('./chap02/data/car_evaluation.csv')
# dataset = pd.read_csv('./data/car_evaluation.csv')

: 

In [None]:
print(dataset.head())
print()
print(dataset.info())
print()
print(dataset.describe())

: 

In [None]:
# fig_size = plt.rcParams["figure.figsize"]
# fig_size[0] = 8
# fig_size[1] = 6
# plt.rcParams["figure.figsize"] = fig_size

plt.figure(figsize=(8 ,6))
dataset.output.value_counts().plot(kind='pie', 
                                   autopct='%0.02f%%', 
                                   colors=['lightblue', 'lightgreen', 'orange', 'pink'],
                                   explode=(0.05, 0.05, 0.05, 0.05))
plt.show()

: 

In [None]:
categorical_columns = ['price', 'maint', 'doors', 'persons', 'lug_capacity', 'safety']

for category in categorical_columns:
    dataset[category] = dataset[category].astype('category')

: 

In [None]:
# print(dataset.head())
# print()
# print(dataset.info())
# print()
# print(dataset.describe())

: 

In [None]:
price = dataset['price'].cat.codes.values
maint = dataset['maint'].cat.codes.values
doors = dataset['doors'].cat.codes.values
persons = dataset['persons'].cat.codes.values
lug_capacity = dataset['lug_capacity'].cat.codes.values
safety = dataset['safety'].cat.codes.values

categorical_data = np.stack([price, maint, doors, persons, lug_capacity, safety], 1)

: 

In [None]:
print(f"price.shape: {price.shape}")
print()
print(f"categorical_data.shape: {categorical_data.shape}")
print()
print(pd.DataFrame(categorical_data).head())
print()
print(pd.DataFrame(categorical_data).info())
print()
print(pd.DataFrame(categorical_data).describe())

: 

In [None]:
categorical_data = torch.tensor(categorical_data, dtype=torch.int64)
print(categorical_data[:5])

: 

In [None]:
outputs = pd.get_dummies(dataset.output)
outputs = outputs.values
outputs = torch.tensor(outputs).flatten()

print(f"categorical_data.shape: {categorical_data.shape}")
print(f"outputs.shape: {outputs.shape}")

: 

In [None]:
categorical_column_sizes = [len(dataset[column].cat.categories) for column in categorical_columns]
categorical_embedding_sizes = [(col_size, min(50, (col_size + 1) // 2)) for col_size in categorical_column_sizes]

# TODO embedding and dimension size 
# 임베딩 크기에 대한 정확한 규칙은 없지만, 칼럼의 고유 값 수를 2로 나누는 것을 많이 사용합니다.
# (모든 범주형 칼럼의 고유 값 수, 차원의 크기)
# (모든 범주형 칼럼의 고유 값 수, 임베딩 크기)
print(categorical_embedding_sizes)

: 

In [None]:
total_records = 1728
test_records = int(total_records * 0.2)

categorical_train_data = categorical_data[:(total_records - test_records)]
categorical_test_data = categorical_data[(total_records - test_records):total_records]
train_outputs = outputs[:(total_records - test_records)]
test_outputs = outputs[(total_records - test_records):total_records]

: 

In [None]:
print(f"categorical train: {len(categorical_train_data)}")
print(f"outputs train    : {len(train_outputs)}")
print(f"categorical test : {len(categorical_test_data)}")
print(f"outputs test     : {len(test_outputs)}")

: 

In [None]:
class Model(nn.Module):
    def __init__(self, embedding_size, output_size, layers, p=0.4):
        super().__init__()
        self.all_embeddings = nn.ModuleList([nn.Embedding(ni, nf) for ni, nf in embedding_size])
        self.embedding_dropout = nn.Dropout(p)

        # DONE all_layers.append 
        # Unpacking
        # all_layers = [Linear(in_features=1...bias=True), ReLU(inplace=True), ...]
        # *all_layers = Linear(in_features=1...bias=True), ReLU(inplace=True), ...
        all_layers = []
        num_categorical_cols = sum((nf for ni, nf in embedding_size))
        input_size = num_categorical_cols 

        for i in layers:
            all_layers.append(nn.Linear(input_size, i))
            all_layers.append(nn.ReLU(inplace=True))
            all_layers.append(nn.BatchNorm1d(i))
            all_layers.append(nn.Dropout(p))
            input_size = i

        all_layers.append(nn.Linear(layers[-1], output_size))
        self.layers = nn.Sequential(*all_layers)

    def forward(self, x_categorical):
        embeddings = []
        for i,e in enumerate(self.all_embeddings):
            embeddings.append(e(x_categorical[:,i]))
        x = torch.cat(embeddings, 1)
        x = self.embedding_dropout(x)
        x = self.layers(x)
        return x

: 

In [None]:
model = Model(categorical_embedding_sizes, 4, [200, 100, 50], p=0.4)
print(model)

: 

In [None]:
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

: 

In [None]:
if torch.backends.mps.is_available():
    print("device: mps")
    device = torch.device('mps')
else:
    print("device: cpu")
    device = torch.device('cpu')

: 

In [None]:
epochs = 500
aggregated_losses = []
train_outputs = train_outputs.to(device=device, dtype=torch.int64)
for i in range(epochs):
    i += 1
    y_pred = model(categorical_train_data).to(device)
    single_loss = loss_function(y_pred, train_outputs)
    aggregated_losses.append(single_loss)

    if i%25 == 1:
        print(f'epoch: {i:3} loss: {single_loss.item():10.8f}')

    optimizer.zero_grad()
    single_loss.backward()
    optimizer.step()

print(f'epoch: {i:3} loss: {single_loss.item():10.2f}')

: 

In [None]:
# DONE mps error
if device == torch.device('mps'):
    device = torch.device('cpu')

test_outputs = test_outputs.to(device=device, dtype=torch.int64)
# DONE with torch.no_grad():
# https://pytorch.org/docs/stable/generated/torch.no_grad.html
# Context-manager that disabled gradient calculation.
# Disabling gradient calculation is useful for inference, 
# when you are sure that you will not call Tensor.backward(). 
# It will reduce memory consumption for computations 
# that would otherwise have requires_grad=True.
with torch.no_grad():
    y_val = model(categorical_test_data).to(device)
    loss = loss_function(y_val, test_outputs)

print(f"Loss: {loss:.8f}")

: 

In [None]:
y_val = np.argmax(y_val.cpu().numpy(), axis=1)

: 

In [None]:
import warnings
warnings.filterwarnings('ignore') 
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score

test_outputs = test_outputs.cpu().numpy()
print(confusion_matrix(test_outputs, y_val))
print(classification_report(test_outputs, y_val))
print(accuracy_score(test_outputs, y_val))

: 