In [1]:
from sklearn.ensemble import VotingClassifier

from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

from sklearn import preprocessing
import pandas as pd

df_train = pd.read_csv("./stat679final/data/metadata/train_metadata.csv")
df_test = pd.read_csv("./stat679final/data/metadata/test_metadata.csv")
X_train, y_train = df_train.iloc[:, 0:6], df_train["class"]
# X, y = df.iloc[:, 0:6], df["class"]
X_test, y_test = df_test.iloc[:, 0:6], df_test["class"]

scaler = preprocessing.StandardScaler().fit(X_train)
X_train = scaler.transform(X_train)
X_test = scaler.transform(X_test)

# Instantiate the individual classifiers
knn_clf = KNeighborsClassifier(n_neighbors=3)
tree_clf = DecisionTreeClassifier()
lr_clf = LogisticRegression(max_iter=3000)

In [2]:
# svm_clf = SVC(probability=True)  # Enable probability for soft voting

# Create a voting classifier
voting_clf_hard = VotingClassifier(
    estimators=[('knn', knn_clf), ('dt', tree_clf)],
    voting='hard'
)
voting_clf_soft = VotingClassifier(
    estimators=[('knn', knn_clf), ('dt', tree_clf)],
    voting='soft'
)

# Train the classifiers
voting_clf_hard.fit(X_train, y_train)
print("hard")
voting_clf_soft.fit(X_train, y_train)
print("soft")
# Make predictions and evaluate
y_pred_hard = voting_clf_hard.predict(X_test)
y_pred_soft = voting_clf_soft.predict(X_test)

print("Hard Voting Accuracy:", accuracy_score(y_test, y_pred_hard))
print("Soft Voting Accuracy:", accuracy_score(y_test, y_pred_soft))

hard
soft
Hard Voting Accuracy: 0.9785478547854786
Soft Voting Accuracy: 0.9807480748074807


In [3]:
import numpy as np
%matplotlib inline
import torch
import torch.nn as nn
import torch.nn.functional as F
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("Using device:", device)
from torchvision import transforms
from PIL import Image


class PyTorchClassifierWrapper:
    def __init__(self, model, device='cpu'):
        self.model = model
        self.model.to(device)
        self.device = device

    def predict_proba(self, dataloader):
        # Ensure the model is in evaluation mode
        self.model.eval()
        
        # List to hold all batch probabilities
        all_probabilities = []

        # Loop over all batches in the dataloader
        for X_batch in dataloader:
            images, label = X_batch
            # Transfer batch to the device (GPU or CPU)
            images = images.to(self.device)

            # Disable gradient computation
            with torch.no_grad():
                if torch.all(images.eq(-1)).item():
                    probabilities = torch.tensor([[ 0, 0, 0]])
                    #print('un')
                else: 
                    outputs = self.model(images)

                    # Apply softmax to compute probabilities
                    probabilities = F.softmax(outputs, dim=1)
            
            # Move probabilities to CPU and convert to numpy
            probabilities = probabilities.cpu().numpy()
            
            # Append batch probabilities to list
            all_probabilities.append(probabilities)

        # Concatenate all batch probabilities into a single array
        all_probabilities = np.concatenate(all_probabilities, axis=0)
        
        return all_probabilities
    def predict(self,dataloader):
        all_predictions = []
        for X_batch in dataloader:
            images, label = X_batch
            # Transfer batch to the device (GPU or CPU)
            images = images.to(self.device)

            # Disable gradient computation
            with torch.no_grad():
                if torch.all(images.eq(-1)).item():
                    predictions = torch.tensor([[0, 0, 0]])
                    print(1)
                else: 
                    outputs = self.model(images)
                    predictions = outputs.argmax(dim=-1)

                    # Convert indices to one-hot encoded tensor
                    #predictions = F.one_hot(max_indices, num_classes=outputs.shape[-1]).cpu().numpy()
                    #print(1)
                    # Move probabilities to CPU and convert to numpy

                    # Append batch probabilities to list
            all_predictions.append(predictions)

        # Concatenate all batch probabilities into a single array
        all_predictions = np.concatenate(all_predictions, axis=0)
        
        return all_predictions

    
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        # Input channels = 3 (RGB), 6 output channels, 5x5 square convolution kernel
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        # an affine operation: y = Wx + b
        self.fc1 = nn.Linear(16 * 13 * 13, 120)  # Adjusted for 64x64 input images
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 3)  # Output 3 classes: STAR, QSO, GALAXY

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 16 * 13 * 13)  # Flatten the tensor for the fully connected layer
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


  from .autonotebook import tqdm as notebook_tqdm


Using device: cuda:0


In [4]:
model = SimpleCNN()
model.load_state_dict(torch.load('./stat679final/model/CNN/SimpleCNN_state_dict.pth'))
model.eval()  
print(model)


# Define the same transformation as during the training
transform = transforms.Compose([
    transforms.Resize((64, 64)),  # Example resize, adjust to your model's input
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Load the image
image = Image.open('./stat679final/data/GALAXY/GALAXY_1.jpg')
image = transform(image).unsqueeze(0)  # Add batch dimension
with torch.no_grad():  # Turn off gradients for inference
    output = model(image)
    # Assuming classification: get the predicted class (the index of the max log-probability)
    predicted_class = torch.argmax(output, dim=1)
    if predicted_class.item()+1 == 1:
      print("it is a galaxy")
    print(output)

SimpleCNN(
  (conv1): Conv2d(3, 6, kernel_size=(5, 5), stride=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
  (fc1): Linear(in_features=2704, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=84, bias=True)
  (fc3): Linear(in_features=84, out_features=3, bias=True)
)
it is a galaxy
tensor([[ 3.7507, -0.3346, -4.1034]])


In [5]:
model_spec = SimpleCNN()
model_spec.load_state_dict(torch.load('./stat679final/model/CNN/spec_state_dict.pth'))
model_spec.eval()  
print(model_spec)


# Define the same transformation as during the training
transform = transforms.Compose([
    transforms.Resize((64, 64)),  # Example resize, adjust to your model's input
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

# Load the image
image = Image.open('./data_spec/GALAXY_spec/GALAXY_1.jpg').convert('RGB')
image = transform(image).unsqueeze(0)  # Add batch dimension
with torch.no_grad():  # Turn off gradients for inference
    output = model_spec(image)
    # Assuming classification: get the predicted class (the index of the max log-probability)
    predicted_class = torch.argmax(output, dim=1)
    if predicted_class.item()+1 == 1:
        print("it is a galaxy")
    print(output.argmax(dim=-1))
    print(output)

SimpleCNN(
  (conv1): Conv2d(3, 6, kernel_size=(5, 5), stride=(1, 1))
  (pool): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
  (fc1): Linear(in_features=2704, out_features=120, bias=True)
  (fc2): Linear(in_features=120, out_features=84, bias=True)
  (fc3): Linear(in_features=84, out_features=3, bias=True)
)
it is a galaxy
tensor([0])
tensor([[ 2.8297, -1.9385, -1.7206]])


In [6]:
def soft_voting(models, datasets):
    """
    对多个模型的预测概率进行平均，返回最终预测类别。
    :param models: 包含四个模型的列表
    :param datasets: 每个模型对应的测试数据集列表
    :return: 最终的预测类别数组
    """
    # 收集所有模型的预测概率
    proba_lists = [model.predict_proba(data) for model, data in zip(models, datasets)]

    # 计算平均预测概率
    avg_proba = np.mean(proba_lists, axis=0)

    # 选择平均概率最高的类别作为最终预测
    final_predictions = np.argmax(avg_proba, axis=1)
    return final_predictions


In [7]:
import numpy as np

def weighted_hard_voting(models, datasets, weights):
    """
    对多个模型的预测类别进行加权硬投票，返回最终预测类别。
    :param models: 包含四个模型的列表
    :param datasets: 每个模型对应的测试数据集列表
    :param weights: 每个模型在训练集上的准确率作为权重，列表形式
    :return: 最终的预测类别数组
    """
    # 收集所有模型的预测类别
    predictions = [model.predict(data) for model, data in zip(models, datasets)]
    #print(predictions.shape)
    #print(datasets[3].shape[0])
    # 转换预测为权重计票矩阵
    weighted_votes = np.zeros((datasets[3].shape[0], np.max(predictions) + 1))  # 假设类别从0开始

    # 累加每个模型的预测权重到对应类别
    for prediction, weight in zip(predictions, weights):
        for i, pred in enumerate(prediction):
            weighted_votes[i, pred] += weight

    # 选择权重计票最高的类别作为最终预测
    final_predictions = np.argmax(weighted_votes, axis=1)
    return final_predictions

In [8]:
pytorch_model1 = PyTorchClassifierWrapper(model, device='cpu')
pytorch_model2 = PyTorchClassifierWrapper(model_spec, device='cpu')

In [9]:
# from sklearn.model_selection import GridSearchCV
# knn = KNeighborsClassifier()

# # Create a dictionary with the values of n_neighbors
# param_grid = {'n_neighbors': range(1, 21)}

# # Setup the grid search
# grid_search = GridSearchCV(knn, param_grid, cv=5)  # 5-fold cross-validation

# # Fit the grid search to the data
# grid_search.fit(X_train, y_train)

# # Best parameter and score
# print("Best parameters:", grid_search.best_params_)
# print("Best cross-validation score: {:.2f}".format(grid_search.best_score_))

In [10]:
# from sklearn.linear_model import LogisticRegression
# from sklearn.model_selection import GridSearchCV

# # Define the model
# log_reg = LogisticRegression()

# # Define a grid of parameters
# param_grid = {
#     'C': [0.01, 0.1, 1, 10],
#     'penalty': ['none', 'l2'],
#     'max_iter': [2000, 3000, 4000]  # Adding different values for max_iter
# }

# # Setup the grid search
# grid_search = GridSearchCV(log_reg, param_grid, cv=5, scoring='accuracy')

# # Fit grid search
# grid_search.fit(X_train, y_train)

# # Print the best parameters and the best score
# print("Best parameters:", grid_search.best_params_)
# print("Best cross-validation score: {:.2f}".format(grid_search.best_score_))


In [11]:
knn_clf = KNeighborsClassifier(n_neighbors=3)
tree_clf = DecisionTreeClassifier(max_depth=4)
lr_clf = LogisticRegression(max_iter=2000)

knn_clf.fit(X_train, y_train)
tree_clf.fit(X_train, y_train)
lr_clf.fit(X_train, y_train)

In [12]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
test_image = pd.read_csv("./stat679final/data/rnk_test.csv")
#test_image

In [13]:
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split

transform = transforms.Compose([
    transforms.Resize((64, 64)),  # Resize to ensure all images are the same size
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),  # Normalize images
])
from PIL import Image
from torch.utils.data import Dataset
class CustomImageDataset(Dataset):
    def __init__(self, img_paths, labels, transform=None):
        self.img_paths = img_paths
        self.labels = labels  # Store the labels
        self.transform = transform

    def __len__(self):
        return len(self.img_paths)

    def __getitem__(self, idx):
        img_path = self.img_paths[idx]
        label = self.labels[idx]  # Get the label for the current image
        if self.transform:
            try:
                image = Image.open(img_path).convert('RGB')
                image = self.transform(image)
                return image, label  # Return both the image and the label
            except IOError:
                return torch.zeros([3,64,64]), label
# Assuming 'class' column in image_paths_df contains labels
img_paths = test_image['image']
labels = test_image['class']  # Load labels from the dataframe

# Initialize dataset with paths and labels
custom_dataset = CustomImageDataset(img_paths, labels, transform=transform)
custom_data_loader = DataLoader(custom_dataset, batch_size=64, shuffle=False)

img_paths_spec = test_image['spec']
labels = test_image['class']  # Load labels from the dataframe

# Initialize dataset with paths and labels
custom_dataset_spec = CustomImageDataset(img_paths_spec, labels, transform=transform)
custom_data_loader_spec = DataLoader(custom_dataset_spec, batch_size=64, shuffle=False)

In [14]:
models = [pytorch_model1,pytorch_model2, knn_clf,tree_clf,lr_clf]
datasets = [custom_data_loader,custom_data_loader_spec,X_test,X_test,X_test]
# do soft voting
soft_predictions = soft_voting(models, datasets)

In [15]:
weights = [0.918, 0.9914,0.968,0.9770,0.9699]
models = [pytorch_model1,pytorch_model2, knn_clf, tree_clf,lr_clf]
datasets = [custom_data_loader,custom_data_loader_spec,X_test,X_test,X_test]
# hard voting
hard_predictions = weighted_hard_voting(models, datasets,weights)
#predictions = [model.predict(data) for model, data in zip(models, datasets)]

In [16]:
CNN_image = pytorch_model1.predict(dataloader=custom_data_loader)
CNN_spec = pytorch_model2.predict(dataloader=custom_data_loader_spec)
knn_pred = knn_clf.predict(X_test)
tree_pred = tree_clf.predict(X_test)
lr_pred = lr_clf.predict(X_test)

In [17]:
print('The accuracy of CNN image classifier',(CNN_image == y_test).sum()/len(y_test))
print('The accuracy of CNN spec classifier',(CNN_image == y_test).sum()/len(y_test))
print('The accuracy of knn classifier',(knn_pred == y_test).sum()/len(y_test))
print('The accuracy of tree classifier',(tree_pred == y_test).sum()/len(y_test))
print('The accuracy of logistic regression classifier',(lr_pred == y_test).sum()/len(y_test))
print('The accuracy of Soft Voting classifier',(soft_predictions == y_test).sum()/len(y_test))
print('The accuracy of Hard Voting classifier',(hard_predictions == y_test).sum()/len(y_test))

The accuracy of CNN image classifier 0.9173417341734174
The accuracy of CNN spec classifier 0.9173417341734174
The accuracy of knn classifier 0.968046804680468
The accuracy of tree classifier 0.9743974397439744
The accuracy of logistic regression classifier 0.96999699969997
The accuracy of Soft Voting classifier 0.9897489748974897
The accuracy of Hard Voting classifier 0.9862986298629863


In [18]:
df_predictions = pd.DataFrame({
    'y_test':y_test,
    'KNN': knn_pred,
    'Decision Tree': tree_pred,
    'Logistic Regression': lr_pred,
    'CNN_image':CNN_image,
    'CNN_spec':CNN_spec,
    'Soft Voting': soft_predictions
})
df_predictions.to_csv("pred_results.csv")

In [19]:
lr_clf.coef_

array([[ 1.11069454e+00, -1.69765980e+00, -1.52708694e-01,
         6.14388340e-01, -2.38249174e-02,  2.33575804e+01],
       [-2.88333134e+00,  5.21275341e+00,  7.95731972e-01,
        -1.22133450e+00, -2.14097172e+00,  3.25077789e+01],
       [ 1.77263680e+00, -3.51509361e+00, -6.43023278e-01,
         6.06946165e-01,  2.16479664e+00, -5.58653593e+01]])

In [20]:
from sklearn.tree import export_graphviz

# Assuming 'tree_clf' is your trained DecisionTreeClassifier
export_graphviz(
    tree_clf,
    out_file="tree.dot",
    #feature_names=['feature1', 'feature2', 'feature3', 'etc'],  # replace with real feature names
    #class_names=['class1', 'class2', 'class3'],  # replace with real class names
    rounded=True,
    filled=True
)


In [21]:
tree_clf.feature_importances_

array([2.96993384e-03, 7.95374525e-05, 0.00000000e+00, 3.70286625e-08,
       8.03605042e-04, 9.96146887e-01])

In [22]:
import graphviz

# Load dot file
with open("tree.dot") as f:
    dot_graph = f.read()

# Create a graph from dot file and render it to a file
graph = graphviz.Source(dot_graph)
graph.render("tree", format='png', cleanup=True)


'tree.png'