# Korean Hate Speech Classifier
## Written by: [Jehwan Kim](https://github.com/kreimben)
## Date: 19th Feb 2024
## Referenced Papers:
* [Convolutional Neural Networks for Sentence Classification](https://arxiv.org/abs/1408.5882)
* [Cyclical Learning Rates for Training Neural Networks](https://arxiv.org/abs/1506.01186)

### Load the word2vec model first,

In [None]:


import lightning as L
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn.functional as F


%matplotlib inline

### And then, load labeled data using pandas

In [None]:
# Load the labeled dev and train splits; both files are tab-separated.
dev_df = pd.read_csv('./labeled/dev.tsv', sep='\t')
train_df = pd.read_csv('./labeled/train.tsv', sep='\t')

In [None]:
# Preview the first rows of the training split.
train_df.head()

In [None]:
# Preview the first rows of the dev split.
dev_df.head()

### The `hate` column takes the values `offensive`, `none` and `hate`.
### The `contain_gender_bias` column takes the values `True` and `False`.
### The `bias` column takes the values `none`, `gender` and `others`.

In [None]:
# Combine the dev and train splits into a single frame with a fresh 0..n-1 index;
# new train/validation/test splits are drawn from it later with train_test_split.
df = pd.concat([dev_df, train_df], ignore_index=True)
# Show five random rows as a sanity check.
df.sample(5)

In [None]:
# Class distribution of every label column over the combined (dev + train) frame.
# All three counts are taken from `df` so they describe the same set of rows.
df.hate.value_counts(), df.contain_gender_bias.value_counts(), df.bias.value_counts()

### Load the word data and tokeniser from a past project.

In [None]:
import pickle

# Restore the tokenizer and the `words` object that were pickled to disk.
# NOTE(review): pickle.load can execute arbitrary code while unpickling, so these
# files must come from a trusted source.
with open('tokenizer.pkl', 'rb') as handle:
    tokenizer = pickle.load(handle)

# `words` is used further down to derive the vocabulary size.
with open('words.pkl', 'rb') as handle:
    words = pickle.load(handle)

In [None]:
# Smoke-test the restored tokenizer on one sample comment.
tokenizer.tokenize('혼전임신은 미리 조심하지 못한 여자 잘못이 크다')

In [None]:
from utils.stopwords import STOP_WORDS


def tokenize(sentence):
    """Tokenize *sentence* and drop stop words.

    Args:
        sentence: Text handed to ``tokenizer.tokenize``.

    Returns:
        A list with the tokens that are not in ``STOP_WORDS``, in their
        original order.
    """
    return [
        token
        for token in tokenizer.tokenize(sentence)
        if token not in STOP_WORDS
    ]

In [None]:
df['tokens'] = df['comments'].apply(tokenize)
# Offensive comments are also treated as hate speech, so the task becomes binary.
# An explicit mapping is used instead of Series.replace: turning strings into ints
# through replace relies on implicit dtype downcasting, which pandas deprecated in 2.2.
# The mapping expects the raw labels; any value missing from it becomes NaN.
df['hate'] = df['hate'].map({'none': 0, 'offensive': 1, 'hate': 1})
df['contain_gender_bias'] = df['contain_gender_bias'].map({True: 1, False: 0})
df.head()

In [None]:
# Vocabulary size = number of keys in `words`; later passed to the model as the
# size of its embedding table.
vocab_size = len(words.keys())
vocab_size

### Vectorise

In [None]:
# Restore the pickled vectorizer used to encode comments.
# NOTE(review): pickle.load can execute arbitrary code while unpickling, so this
# file must come from a trusted source.
with open('vectorizer.pkl', 'rb') as handle:
    vectorizer = pickle.load(handle)

### Encoding

In [None]:
# Encode every raw comment with the vectorizer.
# NOTE(review): presumably this yields a list of integer token ids per comment — confirm.
# NOTE(review): the raw `comments` column is encoded here rather than the `tokens`
# column computed earlier — confirm the vectorizer does its own tokenization.
df['encoding'] = df['comments'].apply(vectorizer.encode_a_doc_to_list)
df.encoding

In [None]:
# Features are the encoded comments; the target is the `hate` label column.
X_data = df['encoding']
y_data = df['hate']
print(y_data.value_counts())
# Sanity check: features and labels must have the same number of rows.
len(X_data), len(y_data), len(X_data) == len(y_data)

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 10% of the data as the test set, then 10% of the remainder as the
# validation set (roughly 81% / 9% / 10% overall). Both splits are stratified on
# the label and use a fixed random_state so they are reproducible.
X_train, X_test, y_train, y_test = train_test_split(X_data, y_data, test_size=.1, random_state=42, stratify=y_data)
X_train, X_valid, y_train, y_valid = train_test_split(X_train, y_train, test_size=.1, random_state=42, stratify=y_train)

# Class counts of the final training split.
y_train.value_counts()

In [None]:
# Report the share of hate (label 1) and normal (label 0) comments in each split.
print('공격적인(offensive) 댓글 또한 혐오 데이터 셋으로 분류함.')
for header, labels in (
    ('--------훈련 데이터의 비율-----------', y_train),
    ('--------검증 데이터의 비율-----------', y_valid),
    ('--------테스트 데이터의 비율-----------', y_test),
):
    counts = labels.value_counts()
    total = len(labels)
    print(header)
    print(f'혐오 댓글 = {round(counts[1] / total * 100, 3)}%')
    print(f'일반 댓글 = {round(counts[0] / total * 100, 3)}%')

### Padding

In [None]:
# Inspect the distribution of encoded comment lengths in the training split.
sample_lengths = [len(review) for review in X_train]
print('댓글의 최대 길이 :', max(sample_lengths))
print('댓글의 평균 길이 :', sum(sample_lengths) / len(X_train))
plt.hist(sample_lengths, bins=50)
plt.xlabel('length of samples')
plt.ylabel('number of samples')
plt.show()

In [None]:
# Pad to the longest encoded comment found in the training split.
max_len = max(map(len, X_train))

### 길이가 길지 않아서 그냥 최대값 그대로 진행합니다. (패딩)

In [None]:
def pad_sequences(sentences: [[int]], max_len: int) -> np.ndarray:
    """Right-pad (or truncate) integer sequences to a fixed width.

    Args:
        sentences: Sequences of integer ids, one per sample.
        max_len: Width of the returned array.

    Returns:
        An integer array of shape ``(len(sentences), max_len)``. Each row holds
        the first ``max_len`` ids of the matching sequence followed by zeros;
        empty sequences give an all-zero row.
    """
    padded = np.zeros((len(sentences), max_len), dtype=int)
    for row, token_ids in zip(padded, sentences):
        if len(token_ids) == 0:
            continue
        clipped = np.array(token_ids)[:max_len]
        # `row` is a view into `padded`, so this writes into the result in place.
        row[:clipped.shape[0]] = clipped
    return padded

In [None]:
# Pad (or truncate) every split to the same width, max_len, which was taken from
# the training split.
padded_X_train = pad_sequences(X_train, max_len=max_len)
padded_X_valid = pad_sequences(X_valid, max_len=max_len)
padded_X_test = pad_sequences(X_test, max_len=max_len)

# Report the resulting array shapes for the train / validation / test splits.
print('훈련 데이터의 크기 :', padded_X_train.shape)
print('검증 데이터의 크기 :', padded_X_valid.shape)
print('테스트 데이터의 크기 :', padded_X_test.shape)

In [None]:
# Look at the first five padded test rows.
padded_X_test[:5, :]

### Check the available device (I use both an M2 Mac and a PC with an RTX 4080)

In [None]:
from utils.device import get_device

# Pick the compute device via the project helper.
# NOTE(review): `device` is not referenced again in the visible code; the Lightning
# Trainer is configured with accelerator="auto" — confirm this variable is still needed.
device = get_device()

### Modeling

In [None]:
# Convert the labels of each split to tensors; they are paired with the padded
# inputs in the TensorDatasets built by the model's dataloader hooks.
train_label_tensor = torch.tensor(np.array(y_train))
valid_label_tensor = torch.tensor(np.array(y_valid))
test_label_tensor = torch.tensor(np.array(y_test))
# Peek at the first five training labels.
print(train_label_tensor[:5])

In [None]:
class TextCNN(torch.nn.Module):
    """Minimal TextCNN sentence classifier with a single convolution window size.

    Pipeline: embedding -> Conv1d (window 5) -> max-over-time pooling -> ReLU ->
    dropout -> linear layer that emits one logit per label.
    """

    def __init__(self, vocab_size, num_labels):
        super().__init__()

        # Only one kind of filter is used: a single window of width 5.
        self.num_filter_sizes = 1
        self.num_filters = 256

        self.word_embed = torch.nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=128,
            padding_idx=0,
        )
        self.conv1 = torch.nn.Conv1d(
            in_channels=128,
            out_channels=self.num_filters,
            kernel_size=5,
            stride=1,
        )
        self.dropout = torch.nn.Dropout(p=0.5)
        self.fc1 = torch.nn.Linear(
            in_features=self.num_filters,
            out_features=num_labels,
            bias=True,
        )

    def forward(self, inputs):
        # (batch, sentence length) -> (batch, sentence length, embedding dim)
        # -> (batch, embedding dim, sentence length): Conv1d wants channels first.
        channels_first = self.word_embed(inputs).transpose(1, 2)

        # (batch, number of filters, convolution output length)
        feature_maps = self.conv1(channels_first)

        # Max-over-time pooling: keep the strongest response of every filter,
        # giving (batch, number of filters), then apply ReLU.
        pooled = feature_maps.max(dim=2).values
        activated = F.relu(pooled)

        # (batch, number of labels)
        return self.fc1(self.dropout(activated))

In [None]:

import torch


class TextCNNLightning(L.LightningModule):
    """Single-window TextCNN sentence classifier packaged as a LightningModule.

    Network: embedding -> Conv1d (window 5, 256 filters) -> max-over-time
    pooling -> ReLU -> dropout -> linear classifier.

    The dataloader hooks read the notebook-level arrays ``padded_X_train`` /
    ``padded_X_valid`` / ``padded_X_test`` and the matching ``*_label_tensor``
    tensors, so those must exist before ``Trainer.fit`` / ``Trainer.test`` run.

    Args:
        vocab_size: Number of rows in the embedding table; index 0 is the
            padding index.
        num_labels: Number of output classes.
    """

    def __init__(self, vocab_size, num_labels):
        super().__init__()

        # Parameters
        self.num_filter_sizes = 1  # Only one kind of filter (window size 5) is used
        self.num_filters = 256

        # Layers
        self.word_embed = torch.nn.Embedding(num_embeddings=vocab_size, embedding_dim=128, padding_idx=0)
        self.conv1 = torch.nn.Conv1d(128, self.num_filters, 5, stride=1)
        self.dropout = torch.nn.Dropout(0.5)
        self.fc1 = torch.nn.Linear(1 * self.num_filters, num_labels, bias=True)

    def forward(self, inputs):
        """Return one logit per label for a batch of padded token-id sequences."""
        # Embedding output is moved to channels-first layout for Conv1d.
        embedded = self.word_embed(inputs).permute(0, 2, 1)

        # Convolution, then max pooling over the sequence axis, then ReLU.
        x = F.relu(self.conv1(embedded).permute(0, 2, 1).max(1)[0])

        # Dropout and fully connected layer.
        return self.fc1(self.dropout(x))

    def configure_optimizers(self):
        """Use Adam with a learning rate of 1e-3."""
        return torch.optim.Adam(self.parameters(), lr=1e-3)

    def _loss_and_accuracy(self, batch):
        """Run a forward pass on ``(inputs, labels)`` and return ``(loss, accuracy)``."""
        inputs, labels = batch
        logits = self(inputs)
        loss = F.cross_entropy(logits, labels)
        # Kept as a tensor: self.log accepts tensors, and skipping .item()
        # avoids a host/device synchronisation on every step.
        accuracy = (logits.argmax(dim=1) == labels).float().mean()
        return loss, accuracy

    def training_step(self, batch, batch_idx):
        """Compute and log the cross-entropy loss for one training batch."""
        loss, _ = self._loss_and_accuracy(batch)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log loss and accuracy for one validation batch."""
        # Lightning already moves the batch to the module's device, so no
        # manual .to(self.device) is needed here.
        loss, accuracy = self._loss_and_accuracy(batch)
        self.log("val_loss", loss)
        self.log("val_acc", accuracy)
        return loss

    def test_step(self, batch, batch_idx):
        """Log loss and accuracy for one test batch.

        Trainer.test() requires this hook; without it the test run cannot start.
        """
        loss, accuracy = self._loss_and_accuracy(batch)
        self.log("test_loss", loss)
        self.log("test_acc", accuracy)
        return loss

    @staticmethod
    def _make_loader(features, labels, **loader_kwargs):
        """Pair padded token ids (cast to int32) with labels in a DataLoader."""
        token_ids = torch.tensor(features).to(torch.int32)
        dataset = torch.utils.data.TensorDataset(token_ids, labels)
        return torch.utils.data.DataLoader(dataset, **loader_kwargs)

    def train_dataloader(self):
        # NOTE(review): no batch_size is passed, so DataLoader falls back to its
        # default of 1 sample per batch — confirm this is intended.
        return self._make_loader(padded_X_train, train_label_tensor, shuffle=True, num_workers=7)

    def val_dataloader(self):
        return self._make_loader(
            padded_X_valid, valid_label_tensor, shuffle=False, batch_size=1, num_workers=7
        )

    def test_dataloader(self):
        return self._make_loader(padded_X_test, test_label_tensor, shuffle=False, batch_size=1)


In [None]:
# Build the classifier; the number of output classes is the count of distinct
# training labels.
# NOTE(review): vocab_size is the number of keys in `words`; this assumes every
# encoded token id is smaller than vocab_size (id 0 is the embedding's padding
# index) — confirm against the vectorizer's id range.
model = TextCNNLightning(vocab_size=vocab_size, num_labels=len(set(y_train)))

### Train

In [None]:
# Train for a single epoch, letting Lightning pick the accelerator, devices and
# strategy. No dataloaders are passed to fit(); the module defines
# train_dataloader() and val_dataloader() itself.
trainer = L.Trainer(max_epochs=1, accelerator="auto", devices="auto", strategy="auto")
trainer.fit(model=model)

In [None]:
# Run the test loop. No model or dataloader is passed here; the module defines
# test_dataloader(). Lightning also requires the module to implement
# `test_step` for this call.
trainer.test()