In [32]:
# # 安装MindSpore库
# !pip install mindspore

# 导入所需库
import mindspore as ms
from mindspore import context, Tensor
from mindspore.dataset import GeneratorDataset
import numpy as np
import pandas as pd


In [33]:
# 设置MindSpore的运行环境
context.set_context(mode=context.GRAPH_MODE, device_target="CPU")

# 自定义数据加载函数
def load_data(file_path, has_label=True):
    data = pd.read_csv(file_path)
    if has_label:
        X = data.iloc[:, :-1].values
        y = data.iloc[:, -1].values
        return X, y
    else:
        X = data.values
        return X

# 加载数据
X_train, y_train = load_data('data/train.csv')
X_val, y_val = load_data('data/val.csv')
X_test = load_data('data/test_data.csv', has_label=False)

# 数据标准化
def standardize_data(X_train, X_val, X_test):
    mean = np.mean(X_train, axis=0)
    std = np.std(X_train, axis=0)
    X_train = (X_train - mean) / std
    X_val = (X_val - mean) / std
    X_test = (X_test - mean) / std
    return X_train, X_val, X_test

X_train, X_val, X_test = standardize_data(X_train, X_val, X_test)

# 创建MindSpore数据集
def generator_train():
    for i in range(len(X_train)):
        yield X_train[i], y_train[i]

def generator_val():
    for i in range(len(X_val)):
        yield X_val[i], y_val[i]

def generator_test():
    for i in range(len(X_test)):
        yield X_test[i]

train_dataset = GeneratorDataset(generator_train, ["data", "label"])
val_dataset = GeneratorDataset(generator_val, ["data", "label"])
test_dataset = GeneratorDataset(generator_test, ["data"])


In [34]:
class KNN:
    def __init__(self, k=3):
        self.k = k

    def fit(self, X, y):
        self.X_train = X
        self.y_train = y

    def predict(self, X):
        distances = self._compute_distances(X)
        return self._predict_labels(distances)

    def _compute_distances(self, X):
        distances = []
        for x_test in X:
            distance = np.linalg.norm(self.X_train - x_test, axis=1)
            distances.append(distance)
        return np.array(distances)

    def _predict_labels(self, distances):
        y_pred = []
        for distance in distances:
            k_nearest = np.argsort(distance)[:self.k]
            k_nearest_labels = self.y_train[k_nearest]
            most_common = np.argmax(np.bincount(k_nearest_labels))
            y_pred.append(most_common)
        return np.array(y_pred)


In [35]:
# 训练KNN模型
knn = KNN(k=3)
knn.fit(X_train, y_train)

# 在验证集上评估模型
val_predictions = knn.predict(X_val)
val_accuracy = np.mean(val_predictions == y_val)
print(f'Validation Accuracy: {val_accuracy}')


Validation Accuracy: 0.9333333333333333


In [36]:
# 使用训练好的模型对测试集进行预测
test_predictions = knn.predict(X_test)

# 转换为DataFrame并保存预测结果
test_predictions = test_predictions.astype(int)
test_predictions_df = pd.DataFrame(test_predictions, columns=['label'])
test_predictions_df.to_csv('task3_test_prediction.csv', index=False)

# 显示前几行预测结果
# test_predictions_df.head().style.background_gradient(sns.color_palette("YlOrBr", as_cmap=True))


In [39]:
import numpy as np
from mindspore import Tensor, ops

class KNN:
    def __init__(self, k=3):
        self.k = k

    def fit(self, X, y):
        self.X_train = Tensor(X, ms.float32)
        self.y_train = Tensor(y, ms.int32)

    def predict(self, X):
        distances = self._compute_distances(Tensor(X, ms.float32))
        return self._predict_labels(distances).asnumpy()

    def _compute_distances(self, X):
        dists = []
        for x_test in X:
            distance = ops.sqrt(ops.reduce_sum((self.X_train - x_test) ** 2, 1))
            dists.append(distance)
        return ops.stack(dists)

    def _predict_labels(self, distances):
        y_pred = []
        for distance in distances:
            k_nearest = ops.TopK(-distance, self.k)[1]
            k_nearest_labels = ops.gather(self.y_train, k_nearest, 0)
            most_common = ops.mode(k_nearest_labels)[0]
            y_pred.append(most_common)
        return ops.stack(y_pred)

# 示例代码
# 加载数据并标准化
def load_data(file_path, has_label=True):
    data = pd.read_csv(file_path)
    if has_label:
        X = data.iloc[:, :-1].values
        y = data.iloc[:, -1].values
        return X, y
    else:
        X = data.values
        return X

X_train, y_train = load_data('data/train.csv')
X_val, y_val = load_data('data/val.csv')
X_test = load_data('data/test_data.csv', has_label=False)

def standardize_data(X_train, X_val, X_test):
    X_train = Tensor(X_train, ms.float32)
    X_val = Tensor(X_val, ms.float32)
    X_test = Tensor(X_test, ms.float32)

    mean = ops.ReduceMean(keep_dims=True)(X_train, 0)
    variance = ops.ReduceMean(keep_dims=True)((X_train - mean) ** 2, 0)
    std = ops.sqrt(variance)
    
    X_train = (X_train - mean) / std
    X_val = (X_val - mean) / std
    X_test = (X_test - mean) / std
    
    return X_train.asnumpy(), X_val.asnumpy(), X_test.asnumpy()

X_train, X_val, X_test = standardize_data(X_train, X_val, X_test)

# 训练KNN模型
knn = KNN(k=3)
knn.fit(X_train, y_train)

# 在验证集上评估模型
val_predictions = knn.predict(X_val)
val_accuracy = np.mean(val_predictions == y_val)
print(f'Validation Accuracy: {val_accuracy}')

# 使用训练好的模型对测试集进行预测
test_predictions = knn.predict(X_test)

# 转换为DataFrame并保存预测结果
test_predictions_df = pd.DataFrame(test_predictions, columns=['label'])
test_predictions_df.to_csv('task3_test_prediction.csv', index=False)

# 显示前几行预测结果
print(test_predictions_df.head())


TypeError: too many positional arguments