# **다수결 투표를 위한 분류 앙상블**

In [1]:
from sklearn.base import BaseEstimator
from sklearn.base import ClassifierMixin
from sklearn.preprocessing import LabelEncoder
from sklearn.base import clone
from sklearn.pipeline import _name_estimators
import numpy as np
import operator

In [8]:
class MajorityVoteClassifier(BaseEstimator, ClassifierMixin):
  def __init__(self, classifiers, vote= 'classlabel', weights=None):
    self.classifiers = classifiers
    self.named_classifiers = {key: value for key, value in _name_estimators(classifiers)}
    self.vote = vote
    self.weights = weights

  def fit(self, X, y):
    if self.vote not in ('probability', 'classlabel'):
      raise ValueError("vote는 'probability' 또는 'classlabel'이어야 합니다"
      "; (vote=%r)이 입력되었습니다." % self.vote)

    if self.weights and len(self.weights) != len(self.classifiers):
      raise ValueError('분류기와 가중치 개수는 같아야 합니다'
      '; 가중치 %d 개, 분류기 %d 개' % (len(self.weights), len(self.classifiers)))

    self.lablenc_ = LabelEncoder()
    self.lablenc_.fit(y)
    self.classes_ = self.lablenc_.classes_
    self.classifiers_ = []
    for clf in self.classifiers:
      fitted_clf = clone(clf).fit(X, self.lablenc_.transform(y))
      self.classifiers_.append(fitted_clf)
    return self

  def predict(self, X):
    if self.vote == 'probability':
      maj_vote = np.argmax(self.predict_proba(X), axis=1)
    else: # 'classlabel' 투표

    # clf.predict 메서드를 사용하여 결과를 모읍니다
      predictions = np.asarray([clf.predict(X)
      for clf in self.classifiers_]).T

      maj_vote = np.apply_along_axis(
          lambda x:
          np.argmax(np.bincount(x,
                                weights=self.weights)),
          axis=1,
          arr=predictions)

    maj_vote = self.lablenc_.inverse_transform(maj_vote)

    return maj_vote

  def predict_proba(self, X):
    probas = np.asarray([clf.predict_proba(X)
    for clf in self.classifiers_])

    avg_proba = np.average(probas, axis=0, weights=self.weights)
    return avg_proba

  def get_params(self, deep=True):
    if not deep:
      return super(MajorityVoteClassifier, self).et_params(deep=False)
    else:
      out = self.named_classifiers.copy()
      for name, step in self.named_classifiers.items():
        for key, value in step.get_params(deep=True).items():
          out['%s__%s' % (name, key)] = value

    return out

In [9]:
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

In [10]:
iris = datasets.load_iris()
X, y = iris.data[50:, [1, 2]], iris.target[50:]
le = LabelEncoder()
y = le.fit_transform(y)

X_train, X_test, y_train, y_test =\
train_test_split(X, y,
                 test_size=0.5,
                 random_state=1,
                 stratify=y)

In [11]:
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score

In [12]:
clf1 = LogisticRegression(penalty='l2', C=0.001, random_state=1)
clf2 = DecisionTreeClassifier(max_depth=1, criterion='entropy', random_state=0)
clf3 = KNeighborsClassifier(n_neighbors=1, p=2, metric='minkowski')

In [13]:
pipe1 = Pipeline([['sc', StandardScaler()],
 ['clf', clf1]])
pipe3 = Pipeline([['sc', StandardScaler()],
 ['clf', clf3]])

clf_labels = ['Logistic regression', 'Decision tree', 'KNN']

print('10-겹 교차 검증:\n')
for clf, label in zip([pipe1, clf2, pipe3], clf_labels):
  scores = cross_val_score(estimator=clf,
                           X=X_train,
                           y=y_train,
                           cv=10,
                           scoring='roc_auc')
  print("ROC AUC: %0.2f (+/- %0.2f) [%s]"
  % (scores.mean(), scores.std(), label))

10-겹 교차 검증:

ROC AUC: 0.92 (+/- 0.15) [Logistic regression]
ROC AUC: 0.87 (+/- 0.18) [Decision tree]
ROC AUC: 0.85 (+/- 0.13) [KNN]


In [15]:
# 다수결 (하드) 투표

mv_clf = MajorityVoteClassifier(classifiers=[pipe1, clf2, pipe3])

clf_labels += ['Majority voting']
all_clf = [pipe1, clf2, pipe3, mv_clf]

for clf, label in zip(all_clf, clf_labels):
  scores = cross_val_score(estimator=clf,
                           X=X_train,
                           y=y_train,
                           cv=10,
                           scoring='roc_auc')

  print("ROC AUC: %0.2f (+/- %0.2f) [%s]"
  % (scores.mean(), scores.std(), label))

ROC AUC: 0.92 (+/- 0.15) [Logistic regression]
ROC AUC: 0.87 (+/- 0.18) [Decision tree]
ROC AUC: 0.85 (+/- 0.13) [KNN]
ROC AUC: 0.98 (+/- 0.05) [Majority voting]


In [None]:
from sklearn.metrics import roc_curve
from sklearn.metrics import auc

colors = ['black', 'orange', 'blue', 'green']
linestyles = [':'
,
'--'
,
'-.',
'-']

for clf, label, clr, ls in zip(all_clf, clf_labels, colors, linestyles):

# 양성 클래스의 레이블이 1이라고 가정합니다
y_pred = clf.fit(X_train, y_train).predict_proba(X_test)[:, 1]
fpr, tpr, thresholds = roc_curve(y_true=y_test, y_score=y_pred)
roc_auc = auc(x=fpr, y=tpr)
plt.plot(fpr, tpr, color=clr, linestyle=ls, label='%s (auc = %0.2f)' % (label, roc_auc))

In [25]:
from sklearn.datasets import load_wine
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score
from sklearn.ensemble import VotingClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier

# 와인 데이터 로드
wine = load_wine()
X, y = wine.data, wine.target

# 이진 분류 문제로 변환 (클래스 0과 나머지로 분류)
y = (y == 0).astype(int)

# 데이터 분할 (train/test)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 특성 스케일링
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [39]:
class MajorityVoteClassifier(BaseEstimator, ClassifierMixin):
  def __init__(self, classifiers, vote= 'classlabel', weights=None):
    self.classifiers = classifiers
    self.named_classifiers = {key: value for key, value in _name_estimators(classifiers)}
    self.vote = vote
    self.weights = weights

  def fit(self, X, y):
    if self.vote not in ('probability', 'classlabel'):
      raise ValueError("vote는 'probability' 또는 'classlabel'이어야 합니다"
      "; (vote=%r)이 입력되었습니다." % self.vote)

    if self.weights and len(self.weights) != len(self.classifiers):
      raise ValueError('분류기와 가중치 개수는 같아야 합니다'
      '; 가중치 %d 개, 분류기 %d 개' % (len(self.weights), len(self.classifiers)))

    self.lablenc_ = LabelEncoder()
    self.lablenc_.fit(y)
    self.classes_ = self.lablenc_.classes_
    self.classifiers_ = []
    for clf in self.classifiers:
      fitted_clf = clone(clf).fit(X, self.lablenc_.transform(y))
      self.classifiers_.append(fitted_clf)
    return self

  def predict(self, X):
    if self.vote == 'probability':
      maj_vote = np.argmax(self.predict_proba(X), axis=1)
    else: # 'classlabel' 투표

    # clf.predict 메서드를 사용하여 결과를 모읍니다
      predictions = np.asarray([clf.predict(X)
      for clf in self.classifiers_]).T

      maj_vote = np.apply_along_axis(
          lambda x:
          np.argmax(np.bincount(x,
                                weights=self.weights)),
          axis=1,
          arr=predictions)

    maj_vote = self.lablenc_.inverse_transform(maj_vote)

    return maj_vote

  def predict_proba(self, X):
    probas = np.asarray([clf.predict_proba(X)
    for clf in self.classifiers_])

    avg_proba = np.average(probas, axis=0, weights=self.weights)
    return avg_proba

  def get_params(self, deep=True):
    if not deep:
      return super(MajorityVoteClassifier, self).et_params(deep=False)
    else:
      out = self.named_classifiers.copy()
      for name, step in self.named_classifiers.items():
        for key, value in step.get_params(deep=True).items():
          out['%s__%s' % (name, key)] = value

    return out

In [27]:
from sklearn import datasets
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

In [28]:
wine = datasets.load_iris()
X, y = wine.data[50:, [1, 2]], iris.target[50:]
le = LabelEncoder()
y = le.fit_transform(y)

X_train, X_test, y_train, y_test =\
train_test_split(X, y,
                 test_size=0.5,
                 random_state=1,
                 stratify=y)

In [33]:
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_score

In [34]:
clf1 = LogisticRegression(penalty='l2', C=0.001, random_state=1)
clf2 = DecisionTreeClassifier(max_depth=1, criterion='entropy', random_state=0)
clf3 = KNeighborsClassifier(n_neighbors=1, p=2, metric='minkowski')

In [37]:
pipe1 = Pipeline([['sc', StandardScaler()],
 ['clf', clf1]])
pipe3 = Pipeline([['sc', StandardScaler()],
 ['clf', clf3]])

clf_labels = ['Logistic regression', 'Decision tree', 'KNN']

print('10-겹 교차 검증:\n')
for clf, label in zip([pipe1, clf2, pipe3], clf_labels):
  scores = cross_val_score(estimator=clf,
                           X=X_train,
                           y=y_train,
                           cv=10,
                           scoring='roc_auc')
  print("ROC AUC: %0.2f (+/- %0.2f) [%s]"
  % (scores.mean(), scores.std(), label))

10-겹 교차 검증:

ROC AUC: 0.92 (+/- 0.15) [Logistic regression]
ROC AUC: 0.87 (+/- 0.18) [Decision tree]
ROC AUC: 0.85 (+/- 0.13) [KNN]


In [41]:
# 다수결 (하드) 투표

mv_clf = MajorityVoteClassifier(classifiers=[pipe1, clf2, pipe3])

clf_labels += ['Majority voting']
all_clf = [pipe1, clf2, pipe3, mv_clf]

for clf, label in zip(all_clf, clf_labels):
  scores = cross_val_score(estimator=clf,
                           X=X_train,
                           y=y_train,
                           cv=10,
                           scoring='roc_auc')

  print("ROC AUC: %0.2f (+/- %0.2f) [%s]"
  % (scores.mean(), scores.std(), label))

ROC AUC: 0.92 (+/- 0.15) [Logistic regression]
ROC AUC: 0.87 (+/- 0.18) [Decision tree]
ROC AUC: 0.85 (+/- 0.13) [KNN]


AttributeError: 'super' object has no attribute 'et_params'

In [17]:
svm_clf = SVC(probability=True, random_state=42)
svm_clf.fit(X_train, y_train)
svm_probs = svm_clf.predict_proba(X_test)[:, 1]
svm_auc = roc_auc_score(y_test, svm_probs)

In [18]:
knn_clf = KNeighborsClassifier()
knn_clf.fit(X_train, y_train)
knn_probs = knn_clf.predict_proba(X_test)[:, 1]
knn_auc = roc_auc_score(y_test, knn_probs)

In [19]:
log_clf = LogisticRegression(random_state=42)
log_clf.fit(X_train, y_train)
log_probs = log_clf.predict_proba(X_test)[:, 1]
log_auc = roc_auc_score(y_test, log_probs)

In [20]:
tree_clf = DecisionTreeClassifier(random_state=42)
tree_clf.fit(X_train, y_train)
tree_probs = tree_clf.predict_proba(X_test)[:, 1]
tree_auc = roc_auc_score(y_test, tree_probs)

In [21]:
# 앙상블 모델
voting_clf = VotingClassifier(
    estimators=[('svm', svm_clf), ('knn', knn_clf), ('log', log_clf), ('tree', tree_clf)],
    voting='soft'  # 확률 기반 앙상블
)
voting_clf.fit(X_train, y_train)
voting_probs = voting_clf.predict_proba(X_test)[:, 1]
voting_auc = roc_auc_score(y_test, voting_probs)

In [23]:
print(f"SVM ROC AUC: {svm_auc:.4f}")
print(f"KNN ROC AUC: {knn_auc:.4f}")
print(f"로지스틱 회귀 ROC AUC: {log_auc:.4f}")
print(f"의사결정트리 ROC AUC: {tree_auc:.4f}")
print(f"앙상블 ROC AUC: {voting_auc:.4f}")

SVM ROC AUC: 1.0000
KNN ROC AUC: 1.0000
로지스틱 회귀 ROC AUC: 1.0000
의사결정트리 ROC AUC: 0.9773
앙상블 ROC AUC: 1.0000
