In [55]:
import numpy as np
from sklearn.preprocessing import PolynomialFeatures

class LogisticRegression:
    def __init__(self, learning_rate=0.01, iterations=5000, degree=1):
        self.learning_rate = learning_rate
        self.iterations = iterations
        self.degree = degree
        self.poly = PolynomialFeatures(degree=self.degree)
    
    def sigmoid(self, z):
        return 1 / (1 + np.exp(-z))
    
    def fit(self, X, y):
        X_poly = self.poly.fit_transform(X)
        self.weights = np.zeros(X_poly.shape[1])
        self.bias = 0
        
        for _ in range(self.iterations):
            model = np.dot(X_poly, self.weights) + self.bias
            predictions = self.sigmoid(model)
            
            dw = np.dot(X_poly.T, (predictions - y)) / len(y)
            db = np.sum(predictions - y) / len(y)
            
            self.weights -= self.learning_rate * dw
            self.bias -= self.learning_rate * db
    
    def predict(self, X):
        X_poly = self.poly.transform(X)
        model = np.dot(X_poly, self.weights) + self.bias
        predictions = self.sigmoid(model)
        return [1 if i > 0.5 else 0 for i in predictions]
    
    def get_params(self, deep=True):
        return {'learning_rate': self.learning_rate, 'iterations': self.iterations, 'degree': self.degree}
    
    def set_params(self, **parameters):
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self


In [2]:
from sklearn.tree import DecisionTreeClassifier

class RandomForest:
    def __init__(self, n_estimators=100, max_depth=None):
        self.n_estimators = n_estimators
        self.max_depth = max_depth
        self.trees = []
    
    def fit(self, X, y):
        # 确保X和y都是numpy数组以便于索引
        X = np.array(X)
        y = np.array(y)
        for _ in range(self.n_estimators):
            tree = DecisionTreeClassifier(max_depth=self.max_depth)
            bootstrap_indices = np.random.randint(low=0, high=len(X), size=len(X))
            X_bootstrap = X[bootstrap_indices]
            y_bootstrap = y[bootstrap_indices]
            tree.fit(X_bootstrap, y_bootstrap)
            self.trees.append(tree)
        return self
    
    def predict(self, X):
        tree_preds = np.array([tree.predict(X) for tree in self.trees])
        tree_preds = np.swapaxes(tree_preds, 0, 1)
        majority_votes = np.array([np.bincount(tree_pred).argmax() for tree_pred in tree_preds])
        return majority_votes
    
    def score(self, X, y):
        from sklearn.metrics import accuracy_score
        predictions = self.predict(X)
        return accuracy_score(y, predictions)
    
    def get_params(self, deep=True):
        return {'n_estimators': self.n_estimators, 'max_depth': self.max_depth}
    
    def set_params(self, **parameters):
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self


In [8]:
from scikeras.wrappers import KerasClassifier
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
#from tensorflow.compat.v1.losses import sparse_softmax_cross_entropy
from sklearn.base import BaseEstimator, ClassifierMixin

def create_model(n_features, n_classes):
    model = Sequential()
    model.add(Dense(128, input_dim=n_features, activation='relu'))
    model.add(Dense(64, activation='relu'))
    model.add(Dense(n_classes, activation='softmax'))
    model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
    return model

class FCNNClassifier(BaseEstimator, ClassifierMixin):
    def __init__(self, n_features, n_classes):
        self.n_features = n_features
        self.n_classes = n_classes
        self.build_fn = create_model
        self.keras_model = self.build_fn(n_features, n_classes)
        self.classifier = KerasClassifier(model=self.keras_model)

    def fit(self, X, y, epochs=100, batch_size=10):
        self.model.fit(X, y, epochs=epochs, batch_size=batch_size)

    def predict(self, X):
        y_pred = self.model.predict(X)
        return np.argmax(y_pred, axis=1)


In [53]:
import numpy as np
from sklearn.tree import DecisionTreeClassifier

# XGBoost 简化版
class XGBoostSimple:
    def __init__(self, n_estimators=5):
        self.n_estimators = n_estimators
        self.stumps = []
        self.stump_weights = []

    def fit(self, X, y):
        sample_weights = np.full(X.shape[0], (1 / X.shape[0]))
        for _ in range(self.n_estimators):
            stump = DecisionTreeClassifier(max_depth=1)
            stump.fit(X, y, sample_weight=sample_weights)
            stump_pred = stump.predict(X)
            incorrect = (stump_pred != y)
            error = np.mean(np.average(incorrect, weights=sample_weights, axis=0))
            stump_weight = 0.5 * np.log((1 - error) / (error + 1e-10))
            sample_weights *= np.exp(-stump_weight * y * stump_pred)
            sample_weights /= np.sum(sample_weights)
            self.stumps.append(stump)
            self.stump_weights.append(stump_weight)

    def predict(self, X):
        stump_preds = np.array([stump_weight * stump.predict(X) for stump, stump_weight in zip(self.stumps, self.stump_weights)])
        return np.sign(np.sum(stump_preds, axis=0))

    def set_params(self, **parameters):
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self



In [57]:
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV,cross_val_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import f1_score,make_scorer
from sklearn.preprocessing import LabelEncoder
from xgboost import XGBClassifier

# 加载数据
train_df = pd.read_csv(r'C:\Users\mooncell\project3\train.csv')
test_df = pd.read_csv(r'C:\Users\mooncell\project3\test.csv')

# 数据预处理
# 分离特征和标签
X_train = train_df.drop(['id', 'label'], axis=1)
y_train = train_df['label']
X_test = test_df.drop('id', axis=1)

# 使用 LabelEncoder 对 y_train 进行编码
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)

# 定义数值和分类特征
numeric_features = ['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar',
                    'chlorides','free sulfur dioxide','total sulfur dioxide','density','pH','sulphates','alcohol']
categorical_features = []

# 将分类特征转换为字符串
X_train[categorical_features] = X_train[categorical_features].astype(str)
X_test[categorical_features] = X_test[categorical_features].astype(str)


# 创建预处理转换器
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())])

categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))])

# 定义评分函数
f1_scorer = make_scorer(f1_score, average='macro')

# 合并预处理步骤
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)])

# 创建模型字典
models = {
    'LogisticRegression': LogisticRegression(),
    'RandomForestClassifier': RandomForest(),
    #'GradientBoostingClassifier': GradientBoostingClassifier(),
    #'KNNClassifier': KNNClassifier(),
    'KerasClassifier':KerasClassifier(model=create_model, epochs=100, batch_size=10, verbose=0),
    'XGBClassifier': XGBClassifier()
}

# 创建参数网格字典
param_grids = {
    'LogisticRegression': { 'classifier__learning_rate': [0.001, 0.01, 0.1], 'classifier__iterations': [100, 1000, 10000],'classifier__degree':[3]},
    'RandomForestClassifier': {'classifier__n_estimators': [100, 200], 'classifier__max_depth': [None, 10, 20]},
    #'GradientBoostingClassifier': {'classifier__n_estimators': [100, 200], 'classifier__learning_rate': [0.01, 0.1, 1]},
    #'KNNClassifier': {'classifier__k': [3, 5, 7, 9]},
    'KerasClassifier': {'classifier__model__n_features': [X_train.shape[1]],'classifier__model__n_classes': [len(np.unique(y_train))],
        'classifier__batch_size': [10, 20, 50],'classifier__epochs': [10, 50, 100]},
    'XGBClassifier': { 'classifier__n_estimators': [100, 200, 300], 'classifier__learning_rate': [0.01, 0.1, 0.3], 'classifier__max_depth': [3, 4, 5]}
}

# 比较模型性能
best_models = {}
for model_name, model in models.items():
    pipeline = Pipeline(steps=[('preprocessor', preprocessor),
                               ('classifier', model)])
    param_grid = param_grids[model_name]
    grid_search = GridSearchCV(pipeline, param_grid, scoring=f1_scorer, cv=5)
    grid_search.fit(X_train, y_train_encoded)
    best_models[model_name] = grid_search.best_estimator_
    print(f'{model_name} best score: {grid_search.best_score_}')

# 使用最佳模型进行预测
for model_name, model in best_models.items():
    y_test_pred = model.predict(X_test)
    # 将预测的编码标签转换回原始标签
    y_test_pred_original = label_encoder.inverse_transform(y_test_pred)
    # 生成提交文件
    submission = pd.DataFrame({'id': test_df['id'], 'label': y_test_pred})
    submission.to_csv(fr'C:\Users\mooncell\project3\sample_submission_{model_name}.csv', index=False)

LogisticRegression best score: 0.011861771257788728
RandomForestClassifier best score: 0.2794113393301014
KerasClassifier best score: 0.31985710072255036
XGBClassifier best score: 0.3228908081976596


In [54]:
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV,cross_val_score
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.metrics import f1_score,make_scorer
from sklearn.preprocessing import LabelEncoder
from xgboost import XGBClassifier

# 加载数据
train_df = pd.read_csv(r'C:\Users\mooncell\project3\train.csv')
test_df = pd.read_csv(r'C:\Users\mooncell\project3\test.csv')

# 数据预处理
# 分离特征和标签
X_train = train_df.drop(['id', 'label'], axis=1)
y_train = train_df['label']
X_test = test_df.drop('id', axis=1)

# 使用 LabelEncoder 对 y_train 进行编码
label_encoder = LabelEncoder()
y_train_encoded = label_encoder.fit_transform(y_train)

# 定义数值和分类特征
numeric_features = ['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar',
                    'chlorides','free sulfur dioxide','total sulfur dioxide','density','pH','sulphates','alcohol']
categorical_features = []

# 将分类特征转换为字符串
X_train[categorical_features] = X_train[categorical_features].astype(str)
X_test[categorical_features] = X_test[categorical_features].astype(str)


# 创建预处理转换器
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())])

categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore'))])

# 定义评分函数
f1_scorer = make_scorer(f1_score, average='macro')

# 合并预处理步骤
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features)])

# 创建模型字典
models = {
    'XGBClassifier': XGBoostSimple()
}

# 创建参数网格字典
param_grids = {
    'XGBClassifier': { 'classifier__n_estimators': [100, 200, 300], 'classifier__learning_rate': [0.01, 0.1, 0.3], 'classifier__reg_lambda': [3, 4, 5]}
}

# 比较模型性能
best_models = {}
for model_name, model in models.items():
    pipeline = Pipeline(steps=[('preprocessor', preprocessor),
                               ('classifier', model)])
    param_grid = param_grids[model_name]
    grid_search = GridSearchCV(pipeline, param_grid, scoring=f1_scorer, cv=5)#,error_score='raise')
    grid_search.fit(X_train, y_train_encoded)
    best_models[model_name] = grid_search.best_estimator_
    print(f'{model_name} best score: {grid_search.best_score_}')

# 使用最佳模型进行预测
for model_name, model in best_models.items():
    y_test_pred = model.predict(X_test)
    # 将浮点数预测转换为整数标签
    y_test_pred_int = np.argmax(y_test_pred, axis=1) if y_test_pred.ndim > 1 else y_test_pred.astype(int)
    
    # 将预测的编码标签转换回原始标签
    y_test_pred_original = label_encoder.inverse_transform(y_test_pred_int)
    # 生成提交文件
    submission = pd.DataFrame({'id': test_df['id'], 'label': y_test_pred_original})
    submission.to_csv(fr'C:\Users\mooncell\project3\sample_submission_{model_name}.csv', index=False)

XGBClassifier best score: 0.011861771257788728


In [48]:
print(y_train_encoded.shape)
print(y_test_pred_original.shape)

(1125,)
(474,)


In [3]:
from collections import Counter

class KNNClassifier:
    def __init__(self, k=3):
        self.k = k

    def fit(self, X, y):
        self.X_train = X
        self.y_train = y

    def predict(self, X):
        y_pred = [self._predict(x) for x in X]
        return np.array(y_pred)

    def _predict(self, x):
        # 计算距离
        distances = [np.sqrt(np.sum((x_train - x) ** 2)) for x_train in self.X_train]
        # 获取k个最近样本的索引
        k_indices = np.argsort(distances)[:self.k]
        # 获取这些样本的标签
        k_nearest_labels = [self.y_train[i] for i in k_indices]
        # 多数投票
        most_common = Counter(k_nearest_labels).most_common(1)
        return most_common[0][0]
    
    def get_params(self, deep=True):
        return {'k': self.k}

    def set_params(self, **parameters):
        for parameter, value in parameters.items():
            setattr(self, parameter, value)
        return self