# Data Mining on Sentiment Analysis

---

## Preliminaries

Import libraries. **You can add other libraries if necessary.**

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer  # Data transformation
from sklearn.model_selection import train_test_split  # Data testing
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score  # Comparison between real and predicted
import re  # Regular expressions
import nltk
from nltk import word_tokenize
nltk.download('punkt')
nltk.download('stopwords')
from collections import Counter
from sklearn.model_selection import KFold
from sklearn.model_selection import GridSearchCV  # Cross-validated grid search
import os
import time

Load the data and add column keys.

In [None]:
train_data = pd.read_csv("data/sentiment_train.csv", header=None)
test_data = pd.read_csv("data/sentiment_test.csv", header=None)
train_data.columns = ['id', 'information', 'type', 'text']
test_data.columns = ['id', 'information', 'type', 'text']

Take a glance at the provided data.

In [None]:
train_data.info()
train_data.head(40)

Each item consists of 4 columns, where Columns ID and Information are almost task-irrelevant. **Column Type holds the users' sentiments, which are the labels we should predict from Column Text.**
Now let's take a look at the possible values of Column Type.

In [None]:
train_data["type"].value_counts()

In [None]:
test_data["type"].value_counts()

There are 4 possible values, and ***our goal is to perform four-class classification over the texts.***

---

## Data Processing

In [None]:
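def preprocess(text):
    # TODO
    # Convert the input to a string (non-string values such as NaN become text)
    text = str(text)
    # Lowercase all letters
    text = text.lower()
    # Remove punctuation: drop every character that is neither a word character nor whitespace
    text = re.sub(r'[^\w\s]', '', text)
    return text  # modify this line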

Perform preprocessing, and compare the raw text with the preprocessed text.

In [None]:
train_data["clean"] = train_data.text.apply(preprocess)
test_data["clean"] = test_data.text.apply(preprocess)
train_data.head(40)
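
As a quick sanity check on the cleaning step, the sketch below restates the same three operations in a standalone function (named `preprocess_demo` here only to avoid clashing with the notebook's `preprocess`) and runs it on a few made-up inputs. The inputs are illustrative and not taken from the dataset.

```python
import re


def preprocess_demo(text):
    """Standalone copy of the cleaning steps: stringify, lowercase, strip punctuation."""
    text = str(text)
    text = text.lower()
    return re.sub(r'[^\w\s]', '', text)


samples = ["I LOVE this game!!!", "Worst. Update. Ever?", float("nan")]
cleaned = [preprocess_demo(s) for s in samples]
for raw, clean in zip(samples, cleaned):
    print(repr(raw), "->", repr(clean))
```

Note that `str(...)` turns a missing value into the literal string `nan`, so an empty row still produces one token. Whether such rows should be dropped instead is a separate design decision.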

## Feature Engineering

Feature engineering on the clean texts starts with tokenization, i.e., splitting each text into word tokens. Let's see what tokenization does: it turns a text into a list of its words.

In [None]:
print(train_data.text[6])
print(train_data.clean[6])
print(word_tokenize(train_data.clean[6]))

We can count the number of distinct tokens (the vocabulary size) in the training data.

In [None]:
len(set(token for text in train_data.clean for token in word_tokenize(text)))
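
Wrapping the generator in `set(...)` counts distinct tokens, not the total number of token occurrences. The toy example below shows the difference; `str.split` stands in for `word_tokenize` so the sketch runs without NLTK data, and the sentences are made up.

```python
from collections import Counter

toy_corpus = ["the game is good", "the game is bad", "good good game"]

# Flatten the corpus into one token stream (whitespace split as a stand-in tokenizer)
all_tokens = [token for text in toy_corpus for token in text.split()]

total_tokens = len(all_tokens)      # every occurrence counts
vocab_size = len(set(all_tokens))   # each distinct token counts once
most_common = Counter(all_tokens).most_common(2)

print(total_tokens, vocab_size, most_common)
```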

Stop words are words on a stop list that are filtered out (i.e., stopped) before or after processing natural language data (text) because they carry little meaning. We can use `nltk.corpus.stopwords` to obtain a stop list.

In [None]:
stopwords_nltk = nltk.corpus.stopwords
stop_words = stopwords_nltk.words('english')
print(len(stop_words), stop_words[:10])
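
Filtering then amounts to dropping every token that appears in the stop list. A minimal sketch, assuming a tiny hand-written stop list (`toy_stop_words`, hypothetical and far shorter than NLTK's English list) and a whitespace split in place of `word_tokenize`:

```python
toy_stop_words = {"the", "is", "a", "of", "and"}  # illustrative only, not NLTK's list


def remove_stop_words(text, stop_words):
    """Return the tokens of `text` that are not in `stop_words`."""
    return [token for token in text.split() if token not in stop_words]


filtered = remove_stop_words("the plot of the game is a mess", toy_stop_words)
print(filtered)
```

Storing the stop list as a `set` keeps each membership test constant-time, which matters when the same list is checked for every token in the corpus.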

With text tokenization tools, we can conduct feature engineering on texts with stop words filtered out.

For simplification, we mainly consider two possible features:
- **Word count**: a vector whose dimension is the number of distinct tokens; the value in each dimension is the number of occurrences of the corresponding word token.
- **TF-IDF (Term Frequency-Inverse Document Frequency)**: a "weighted version" of word count; each value is the term frequency (normalized by the total number of tokens) multiplied by the inverse of the fraction of documents containing this word token (usually log-scaled). For details, please refer to https://en.wikipedia.org/wiki/Tf%E2%80%93idf.
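
The two features can be worked through by hand on a toy corpus. The sketch below uses the plain form tf × log(N / df) on made-up, pre-tokenized documents. It is an illustration under that assumption only: sklearn's `TfidfVectorizer` smooths the IDF and L2-normalizes each row by default, so its numbers will differ.

```python
import math
from collections import Counter

docs = [["good", "game", "good"], ["bad", "game"], ["good", "story"]]
vocab = sorted({token for doc in docs for token in doc})  # ['bad', 'game', 'good', 'story']

# Document frequency: number of documents that contain each token
df = Counter(token for doc in docs for token in set(doc))
n_docs = len(docs)


def word_count_vector(doc):
    """Raw occurrence count of every vocabulary token in `doc`."""
    counts = Counter(doc)
    return [counts[token] for token in vocab]


def tfidf_vector(doc):
    """tf = count / document length, idf = log(N / df)."""
    counts = Counter(doc)
    return [(counts[t] / len(doc)) * math.log(n_docs / df[t]) for t in vocab]


count_vec = word_count_vector(docs[0])
tfidf_vec = tfidf_vector(docs[0])
print(count_vec)
print([round(v, 3) for v in tfidf_vec])
```

In the first document "good" occurs twice and "game" once. Both appear in two of the three documents, so they share the same IDF, and the TF-IDF value of "good" is exactly twice that of "game".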

> **TODO**
Sklearn provides implementations of both feature extraction methods; their interface is shown below. **You should manually re-implement at least one of them in our provided framework.**

*Hint: if you are concerned that the feature dimension is too large and might slow down the model, there are three possible solutions you can try:*
- *Hashing: hash the vector into a lower dimension with a function that maps each high-dimensional index to a low-dimensional one. We provide a simple hash function, and you can also implement a more complex one.*
- *Random projection: project the vector into a low-dimensional space with a fixed random projection. We provide a simple projection method.*
- *Sparsity: use sparse matrices instead of dense ones, which is used in sklearn's implementation. Please refer to https://docs.scipy.org/doc/scipy/reference/sparse.html for more details.*
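
To illustrate the sparsity option without depending on SciPy, the sketch below stores each feature vector as a dict from column index to non-zero value and compares its size with the dense form. The vocabulary and texts are made up, and the dict layout is only a stand-in for a real sparse matrix format such as CSR.

```python
from collections import Counter

toy_vocab = {"good": 0, "bad": 1, "game": 2, "story": 3, "music": 4, "bug": 5}
toy_texts = ["good game good", "bad bug", "story"]


def to_sparse_row(text, vocab):
    """Map a text to {column index: count}, keeping only non-zero entries."""
    counts = Counter(token for token in text.split() if token in vocab)
    return {vocab[token]: count for token, count in counts.items()}


def to_dense_row(sparse_row, dim):
    """Expand a sparse row back into a dense list of length `dim`."""
    return [sparse_row.get(idx, 0) for idx in range(dim)]


sparse_rows = [to_sparse_row(text, toy_vocab) for text in toy_texts]
dense_rows = [to_dense_row(row, len(toy_vocab)) for row in sparse_rows]

stored_sparse = sum(len(row) for row in sparse_rows)
stored_dense = sum(len(row) for row in dense_rows)
print(sparse_rows)
print(stored_sparse, "non-zero entries vs", stored_dense, "dense cells")
```

With a realistic vocabulary of thousands of words and short texts, the gap between the two counts grows much larger than in this toy case.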

In [None]:
class ManualVectorizer(object):
    """
    Manual vectorizer.
    """
    def __init__(self, tokenizer, stop_words):
        """
        Initialize the vectorizer.
        You can add additional attributes.
        """
        self.tokenizer = tokenizer
        self.stop_words = stop_words
        # TODO
        self.vocab = None
        self.idf = None
    def fit_transform(self, texts):
        """
        Fit the dictionary and other attributes (such as IDF) with the texts, and then perform feature extraction.
        This method is used on training data.

        Parameters:
            texts (List[str]): a list of untokenized texts.

        Return:
            np.array: a 2-D array where each row refers to the feature vector of the corresponding text.
        """
        # TODO
        # Count the number of texts in which each word appears (document frequency)
        word_doc_count = Counter()
        stop_set = set(self.stop_words)
        for text in texts:
            # A set keeps each word once per text; stop words are excluded
            words = set(self.tokenizer(text)) - stop_set
            word_doc_count.update(words)
        # Build the vocabulary from the n words that appear in the most texts
        n = 1000
        self.vocab = {word: idx for idx, (word, _) in enumerate(word_doc_count.most_common(n))}
        # Compute the inverse document frequency (IDF) of each word
        doc_count = len(texts)
        self.idf = {word: np.log(doc_count / count) for word, count in word_doc_count.items() if count > 0}
        # Extract the features of each text
        features = []
        for text in texts:
            # Count the occurrences of each word in this text
            words = self.tokenizer(text)
            word_counts = Counter(words)
            # Build the feature vector of this text (raw count weighted by IDF)
            feature_vector = np.zeros(len(self.vocab))
            for word, count in word_counts.items():
                if word in self.vocab:
                    idx = self.vocab[word]
                    feature_vector[idx] = count * self.idf[word]
            features.append(feature_vector)
        # Convert the feature vectors to a NumPy array and return it
        return np.array(features)
    
    def transform(self, texts):
        """
        Perform feature extraction with the learned dictionary and other attributes.
        This method is used on test data.
        Note: if a word token does not appear in training data, it will not be counted as the test feature.

        Parameters:
            texts (List[str]): a list of untokenized texts.

        Return:
            np.array or np.matrix: a 2-D array where each row refers to the feature vector of the corresponding text.
        """
        # TODO
        features = []
        for text in texts:
            words = self.tokenizer(text)
            word_counts = Counter(words)
            feature_vector = np.zeros(len(self.vocab))
            for word, count in word_counts.items():
                if word in self.vocab:
                    idx = self.vocab[word]
                    feature_vector[idx] = count * self.idf[word]
            features.append(feature_vector)
        return np.array(features)
        

In [None]:
wordcount_extractor = CountVectorizer(
    tokenizer=word_tokenize,
    stop_words=stop_words,
)

tfidf_extractor = TfidfVectorizer(
    tokenizer=word_tokenize,
    stop_words=stop_words,
)

manual_extractor = ManualVectorizer(
    tokenizer=word_tokenize,
    stop_words=stop_words,
)
extractor = manual_extractor  # TODO: you can modify here

In [None]:
class Identity(object):
    """
    Do nothing.
    """
    def __init__(self):
        pass

    def __call__(self, xs):
        return xs


class VectorHasher(object):
    """
    Vector hasher for dimension reduction.
    """
    def __init__(self, target_length=100, hash_func=None):
        self.target_length = target_length
        if hash_func is None:
            self.hash_func = lambda x: x % self.target_length
        else:
            self.hash_func = hash_func
    
    def __call__(self, xs):
        hashed_xs = np.zeros(xs.shape[:-1] + (self.target_length, ))
        for idx in range(xs.shape[-1]):
            hashed_idx = self.hash_func(idx)
            hashed_xs[:, hashed_idx] += xs[:, idx]
        return hashed_xs


class VectorProjector(object):
    """
    Vector projector for dimension reduction.
    """
    def __init__(self, source_length, target_length=30):
        self.projector = np.random.normal(size=(source_length, target_length))
    
    def __call__(self, xs):
        return xs @ self.projector

In [None]:
identity = Identity()
hasher = VectorHasher()
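# NOTE: `source_length` must equal the feature dimension (the number of columns produced by
# the extractor), not the number of texts; adjust this argument before using `projector`.
projector = VectorProjector(len(train_data.clean))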

vector_post_process = VectorHasher()  # TODO: you can modify here

In [None]:
X_train_raw = vector_post_process(extractor.fit_transform(train_data.clean))
X_test = vector_post_process(extractor.transform(test_data.clean))
# print(X_train_raw.shape, X_test.shape)

For the labels, it is natural to assign each label name an index.

In [None]:
name_to_index = {
    "Positive": 0,
    "Negative": 1,
    "Neutral": 2,
    "Irrelevant": 3,
}
y_train_raw = np.asarray(train_data.type.apply(lambda x: name_to_index[x]))
y_test = np.asarray(test_data.type.apply(lambda x: name_to_index[x]))
y_train_raw[:120]

---

## Model Selection

In order to train a model and perform model selection, we should split the raw training data into *training data* and *validation data*.
> **TODO**

**Split the data into training data and validation data with proper ratio. You can use `train_test_split` function.**

In [None]:
# TODO
X_train, X_val, y_train, y_val = train_test_split(X_train_raw, y_train_raw, test_size=0.2, random_state=42)  # modify this line
print(X_train.shape, X_val.shape)
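
With four classes of unequal size, a purely random split can shift the class proportions between the two parts. One common remedy is a stratified split, which `train_test_split` supports through its `stratify` argument. The dependency-free sketch below shows what stratification does on made-up labels; the function name `stratified_split_indices` is hypothetical.

```python
import random
from collections import defaultdict


def stratified_split_indices(labels, val_ratio=0.2, seed=42):
    """Split sample indices so each class keeps roughly `val_ratio` in validation."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for idx, label in enumerate(labels):
        by_class[label].append(idx)

    train_idx, val_idx = [], []
    for indices in by_class.values():
        rng.shuffle(indices)
        n_val = round(len(indices) * val_ratio)
        val_idx.extend(indices[:n_val])
        train_idx.extend(indices[n_val:])
    return sorted(train_idx), sorted(val_idx)


toy_labels = [0] * 50 + [1] * 30 + [2] * 20
train_idx, val_idx = stratified_split_indices(toy_labels)
print(len(train_idx), len(val_idx))
```

Each class contributes the same fraction of its samples to the validation part, so validation accuracy is measured on a class mix that mirrors the training data.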

Now it's time to train your models and select the best ones.

> **TODO**

**Train your model on `X_train` and `y_train`, and select your model on `X_val` and `y_val`.**

We provide an example of `DecisionTreeClassifier`. Now it's time for you to choose the models you would like to use for classification.

> Requirements
- Select at least **three** other Machine Learning classification models and train them on the training split. You should implement at least **one** of them yourself using our provided interface.

In [None]:
class ManualModel(object):
    """
    Manual model with sklearn-style interface.
    """

    def __init__(self, n_neighbors=3, metric='manhattan'):
        """
        Initialize the model with some hyperparameters. You can modify the arguments.
        """
        # TODO
        self.k = n_neighbors  # number of neighbors that vote
        self.distance_metric = metric  # 'euclidean', 'manhattan', or 'minkowski'
        self.X_train = None
        self.y_train = None

    def fit(self, X, y):
        """
        Fit the model on training set.

        Parameters:
            X: inputs of training data.
            y: labels of training data.
        """
        # TODO
        self.X_train = np.array(X)
        self.y_train = np.array(y)

    def predict(self, X):
        """
        Predict the labels of inputs X with the trained model.

        Parameters:
            X: inputs of test data

        Return:
            The predicted labels of test data.
        """
        # TODO
        X_test = np.array(X)
        predictions = []
        for x_test in X_test:
            # Distance from this sample to every training sample
            distances = self.calculate_distances(x_test)
            # Indices of the k closest training samples
            nearest_indices = np.argsort(distances)[:self.k]
            nearest_labels = self.y_train[nearest_indices]
            # Majority vote among the k nearest labels
            prediction = Counter(nearest_labels).most_common(1)[0][0]
            predictions.append(prediction)
        return np.array(predictions)
    
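    def calculate_distances(self, x_test):
        """
        Compute the distances between a test sample and all training samples.

        Parameters:
            x_test: a single test sample.

        Return:
            distances: a 1-D array of distances between x_test and each training sample.
        """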
        if self.distance_metric == 'euclidean':
            distances = np.linalg.norm(self.X_train - x_test, axis=1)
        elif self.distance_metric == 'manhattan':
            distances = np.sum(np.abs(self.X_train - x_test), axis=1)
        elif self.distance_metric == 'minkowski':
            p = 2  # Power parameter; with p = 2 this coincides with the Euclidean distance
            distances = np.power(
                np.sum(np.power(np.abs(self.X_train - x_test), p), axis=1), 1/p)
        else:
            raise ValueError(
                "Invalid distance metric. Please choose from 'euclidean', 'manhattan', 'minkowski', etc.")
        return distances


In [None]:
name_acc_list = {
    "name": [],
    "acc": []
}

def model_assess(model, name='Default'):
    model.fit(X_train, y_train)
    prds = model.predict(X_val)
    acc = 100 * accuracy_score(y_val, prds)
    name_acc_list["name"].append(name)
    name_acc_list["acc"].append(acc)
    print(f'Model: {name}, Accuracy: {acc}%')

In [None]:
model_0 = DecisionTreeClassifier(max_depth=None, criterion='entropy')
start_time = time.time()
model_assess(model_0, "DT")
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")
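
Since the same start/stop timing pattern is repeated for every model, it could be wrapped in a small helper. A minimal sketch, assuming a hypothetical helper named `timed` and using `time.perf_counter`, which is generally better suited to measuring elapsed intervals than `time.time`:

```python
import time


def timed(func, *args, **kwargs):
    """Call `func` and return (result, elapsed seconds)."""
    start = time.perf_counter()
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


# Stand-in workload; in the notebook this could be `timed(model_assess, model_0, "DT")`
result, elapsed = timed(sum, range(1_000_000))
print(f"result={result}, elapsed={elapsed:.4f} s")
```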

In [None]:
# TODO: add your models here. At least one of them should be your manual model.
model_1 = ManualModel(n_neighbors=3, metric='manhattan')  # manual k-nearest neighbors
start_time = time.time()
model_assess(model_1, "KNN")
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

model_2 = LogisticRegression(C=0.001, solver='liblinear', penalty='l2')
start_time = time.time()
model_assess(model_2, "LR")
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

model_3 = RandomForestClassifier(max_depth=None, n_estimators=300)
start_time = time.time()
model_assess(model_3, "RF")
end_time = time.time()
execution_time = end_time - start_time
print(f"Execution time: {execution_time} seconds")

Visualize the model accuracies.

In [None]:
def plot_acc():
    plt.rcParams['figure.figsize']=4,4
    sns.set_style('darkgrid')
    ax = sns.barplot(x=name_acc_list["name"], y=name_acc_list["acc"], palette="coolwarm", saturation=2.0)
    plt.xlabel('Classifier Models', fontsize=12)
    plt.ylabel('% of Accuracy', fontsize=12)
    plt.title('Accuracy of different Classifier Models', fontsize=16)
    plt.xticks(fontsize=12, horizontalalignment='center')
    plt.yticks(fontsize=12)
    for i in ax.patches:
        width, height = i.get_width(), i.get_height()
        x, y = i.get_xy() 
        ax.annotate(f'{round(height,2)}%', (x + width/2, y + height), ha='center', fontsize='x-large')
    plt.show()

In [None]:
# plot_acc()

> **TODO**

**Tune model for good performance on validation set.**

**Note:** you should only tune your model on the validation set, and keep the test data **unseen until the model is selected**.

Now it is your time to provide the final solution.

> Requirements
- Tune the hyperparameters of all your selected models and evaluate them. Give a detailed report on performance and computational efficiency.
- Evaluate your final model on test set, and report the final result.
- It is appreciated if other machine learning techniques that help to improve performance are employed.
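
For a model that does not implement the full sklearn estimator interface, K-fold cross-validation can be written by hand: partition the indices into K folds and let each fold serve once as the validation set. The sketch below only generates the index splits; `kfold_indices` is a hypothetical helper, and sklearn's `KFold` (imported at the top of this notebook) provides the same functionality.

```python
import random


def kfold_indices(n_samples, n_splits=5, seed=42):
    """Yield (train_indices, val_indices) pairs for K-fold cross-validation."""
    indices = list(range(n_samples))
    random.Random(seed).shuffle(indices)
    # Spread the remainder over the first folds so sizes differ by at most one
    fold_sizes = [n_samples // n_splits + (1 if i < n_samples % n_splits else 0)
                  for i in range(n_splits)]
    start = 0
    for size in fold_sizes:
        val = indices[start:start + size]
        train = indices[:start] + indices[start + size:]
        yield train, val
        start += size


folds = list(kfold_indices(n_samples=23, n_splits=5))
for train, val in folds:
    print(len(train), len(val))
```

Averaging the validation accuracy over the folds gives a less noisy estimate than a single hold-out split, at roughly K times the cost.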

In [None]:
# TODO: your code here
def perform_grid_search(model, param_grid, X_train, y_train, X_test, y_test, save_path, cv=5):
    """Run grid search with cross-validation for an sklearn model.

    Args:
        model: classification model (sklearn estimator)
        param_grid (dict): hyperparameter search space
        X_train: training features
        y_train: training labels
        X_test: test features
        y_test: test labels
        save_path (str): where to save the plot, relative to the working directory
        cv (int, optional): number of cross-validation folds
    """
    # Run the grid search with GridSearchCV
    grid_search = GridSearchCV(estimator=model, param_grid=param_grid, cv=cv, scoring='accuracy')

    grid_search.fit(X_train, y_train)

    # Get the best parameter combination and the best model
    best_params = grid_search.best_params_
    best_model = grid_search.best_estimator_

    # Print the mean cross-validation accuracy of each parameter combination
    means = grid_search.cv_results_['mean_test_score']
    params = grid_search.cv_results_['params']
    for mean, param in zip(means, params):
        mean_acc = 100 * mean
        print(f"Parameters: {param}, Accuracy: {mean_acc}%")
    param_labels = [str(param) for param in params]
    mean_accs = [100 * mean for mean in means]
    plt.figure(figsize=(10, 6))
    plt.bar(param_labels, mean_accs)
    plt.xticks(rotation=45)
    plt.xlabel('Parameter Combination')
    plt.ylabel('Accuracy')
    plt.title('Accuracy for Each Parameter Combination')
    plt.tight_layout()

    # Resolve the save path against the working directory and make sure the folder exists
    save_path = os.path.join(os.getcwd(), save_path)
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    plt.savefig(save_path)
    plt.show()
    plt.close()

    print(f"Best Parameters: {best_params}")

    # Refit the best model on the training data
    # (GridSearchCV already does this when refit=True, which is the default)
    best_model.fit(X_train, y_train)

    # Evaluate on the test set
    y_pred = best_model.predict(X_test)
    accuracy = 100 * accuracy_score(y_test, y_pred)
    print(f"Accuracy on test set: {accuracy}%")
    
def knn_grid_search(X, y, param_grid, save_path):
    """Run grid search for the manually implemented model.

    Args:
        X: training features
        y: training labels
        param_grid (dict): hyperparameter search space
        save_path (str): where to save the plot, relative to the working directory
    """
    params = param_grid['n_neighbors']
    metrics = param_grid['metric']

    # Accuracy of each parameter combination
    accuracies = np.zeros((len(params), len(metrics)))

    # Use a single hold-out split (80% train / 20% validation) rather than K-fold cross-validation
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

    for i, n_neighbors in enumerate(params):
        for j, metric in enumerate(metrics):
            print(n_neighbors, metric)
            knn = ManualModel(n_neighbors=n_neighbors, metric=metric)
            knn.fit(X_train, y_train)
            y_pred = knn.predict(X_val)
            accuracies[i, j] = 100 * accuracy_score(y_val, y_pred)

    # Plot accuracy against the metric, one line per value of n_neighbors
    fig = plt.figure()
    ax = fig.add_subplot(111)
    for i in range(len(params)):
        ax.plot(metrics, accuracies[i], marker='o', label=f'n_neighbors={params[i]}')
    ax.set_xlabel('Metric')
    ax.set_ylabel('Accuracy')
    ax.legend()
    # Resolve the save path against the working directory and make sure the folder exists
    save_path = os.path.join(os.getcwd(), save_path)
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    plt.savefig(save_path)
    plt.show()
    print(accuracies)


In [None]:
"""
    Set the hyperparameter ranges of each model and run cross-validation.
"""
dt = DecisionTreeClassifier()
param_grid_dt = {
    'max_depth': [2, 3, 4, None],
    'criterion': ['gini', 'entropy']}
# Run the cross-validated grid search
perform_grid_search(dt, param_grid_dt, X_train_raw, y_train_raw, X_test, y_test, 'plots/plot1.png', cv=5)

lr = LogisticRegression()
param_grid_lr = {
    'penalty': ['l1', 'l2'],
    'C': [0.001, 0.01, 0.1, 1, 10, 100],
    'solver': ['liblinear', 'saga']
}
perform_grid_search(lr, param_grid_lr, X_train_raw, y_train_raw, X_test, y_test, 'plots/plot2.png', cv=5)

rf = RandomForestClassifier()
param_grid_rf = {
    'n_estimators': [100, 200, 300],
    'max_depth': [5, 10, None]}
perform_grid_search(rf, param_grid_rf, X_train_raw, y_train_raw, X_test, y_test, 'plots/plot3.png', cv=5)  

param_grid_knn = {
    'n_neighbors': [3, 5, 7],
    'metric': ['euclidean', 'manhattan', 'minkowski']}
knn_grid_search(X_train_raw, y_train_raw, param_grid_knn, 'plots/plot4.png')

# Evaluate the manual KNN on the test set
best_model = ManualModel(n_neighbors=3, metric='manhattan')
best_model.fit(X_train, y_train)
prds = best_model.predict(X_test)
acc = 100 * accuracy_score(y_test, prds)
print(f'Accuracy: {acc}%')