In [1]:
import os
import joblib
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import NearMiss
from keras.utils import to_categorical
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier  # 분류분석
from sklearn.metrics import classification_report, precision_score, recall_score, f1_score, fbeta_score
from sklearn.metrics import adjusted_rand_score, homogeneity_score, completeness_score, v_measure_score, mutual_info_score
from sklearn.neural_network import MLPClassifier
from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, VotingClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier

In [4]:
filepath=r'D:\ai\Downloads\Data\paintings\data'

# data load

In [5]:
VGG_vectors = np.load(os.path.join(filepath,'VGG_vectors.npy'))
VGG_vectors.shape

FileNotFoundError: [Errno 2] No such file or directory: 'D:\\ai\\Downloads\\Data\\VGG_vectors.npy'

In [None]:
painting = pd.read_csv(os.path.join(filepath,'painting.csv'))
painting.head()

In [None]:
X = VGG_vectors
y = painting['artist']

In [None]:
# y 라벨 인코딩 (문자형 → 숫자로 변환)
label_encoder = LabelEncoder()
y_labels = label_encoder.fit_transform(y)  # 장르를 숫자로 변환

# y 원핫 인코딩 
onehot_encoder = OneHotEncoder(sparse=False)
y_onehot = onehot_encoder.fit_transform(y_labels.reshape(-1, 1))
y_labels[:5],  y_onehot[:5]

In [None]:
# 장르별 데이터 개수 확인
artist_counts = y.value_counts()
artist_ratios = artist_counts / artist_counts.sum()  # 장르별 비율 유지
artist_counts.head(), artist_ratios.head()

In [None]:
# 각 장르에서 샘플링할 개수 설정
total_samples = 80000  # 사용할 총 샘플 수
samples_per_artist = (artist_ratios * total_samples).astype(int)

# 최소 10개 이상 유지
min_samples_per_class = 10
samples_per_artist[samples_per_artist < min_samples_per_class] = min_samples_per_class

In [None]:
# 샘플링 수행 (각 장르에서 비율 유지)
selected_indices = []

for artist, sample_count in samples_per_artist.items():
    artist_indices = y[y == artist].index.tolist()
    sampled_indices = np.random.choice(artist_indices, sample_count)
    selected_indices.extend(sampled_indices)

In [None]:
X_sampled = X[selected_indices]  # 이미지 데이터 샘플링
y_sampled = y_onehot[selected_indices]  # 레이블 샘플링

In [None]:
train_X, test_X, train_y, test_y = train_test_split(X_sampled, y_sampled, 
                                                    test_size=0.3, 
                                                    shuffle=True,  # 분할 전 데이터 섞기
                                                    stratify=y_sampled     # 층화추출
                                                   )
train_X.shape, test_X.shape, train_y.shape, test_y.shape

# 분류분석

## DecisionTreeClassifier

In [None]:
# 의사결정나무를 구현한 DecisionTreeClassifier를 통한 모형
dt_model = DecisionTreeClassifier(random_state=1)
dt_model = dt_model.fit(train_X, train_y)  # 머신러닝에서는 fit시 독립변수에 array
dt_model

In [None]:
dt_model.score(test_X, test_y)  # accuracy 3장에서 더 많은 평가 방법 확인

In [None]:
dt_pred_y = dt_model.predict(test_X)
pd.crosstab(test_y, dt_pred_y)  # 실제값, 예측값

In [None]:
# y_test는 실제 정답 (원-핫 인코딩된 경우 argmax로 변환)
y_test_labels = test_y
y_pred_labels = dt_pred_y  # 모델 예측값도 동일하게 변환

# ✅ 상세한 성능 지표 출력
print("🔹 Classification Report:")
print(classification_report(y_test_labels, y_pred_labels, digits=4))

# ✅ 개별 지표 계산 (매크로 평균 사용)
precision = precision_score(y_test_labels, y_pred_labels, average='macro')
recall = recall_score(y_test_labels, y_pred_labels, average='macro')
f1 = f1_score(y_test_labels, y_pred_labels, average='macro')

print(f"🎯 Precision: {precision:.4f}")
print(f"🔄 Recall: {recall:.4f}")
print(f"🔥 F1-score: {f1:.4f}")

In [None]:
# fbeta score의 beta=1 : f1 score
# fbeta score의 2>=beta>1 : recall의 가중치가 높게 조정된 f1 score
# fbeta score의 0<=beta<1 : precision의 가중치가 높게 조정된 f1 score
print(fbeta_score(test_y, dt_pred_y, beta=0.5, average='macro'))
print(fbeta_score(test_y, dt_pred_y, beta=1, average='macro'))
print(fbeta_score(test_y, dt_pred_y, beta=2, average='macro'))

## MLPClassifier

In [None]:
# 다중신경망 모형
mlp_model = MLPClassifier(hidden_layer_sizes=(38, 64, 32), max_iter=500, random_state=1)
mlp_model = mlp_model.fit(train_X, train_y)
mlp_model

In [None]:
mlp_model.score(test_X, test_y)

In [None]:
mlp_pred_y = mlp_model.predict(test_X)
mlp_result = pd.crosstab(test_y, mlp_pred_y)
mlp_result

In [None]:
# y_test는 실제 정답 (원-핫 인코딩된 경우 argmax로 변환)
y_test_labels = test_y
y_pred_labels = mlp_pred_y  # 모델 예측값도 동일하게 변환

# ✅ 상세한 성능 지표 출력
print("🔹 Classification Report:")
print(classification_report(y_test_labels, y_pred_labels, digits=4))

# ✅ 개별 지표 계산 (매크로 평균 사용)
precision = precision_score(y_test_labels, y_pred_labels, average='macro')
recall = recall_score(y_test_labels, y_pred_labels, average='macro')
f1 = f1_score(y_test_labels, y_pred_labels, average='macro')

print(f"🎯 Precision: {precision:.4f}")
print(f"🔄 Recall: {recall:.4f}")
print(f"🔥 F1-score: {f1:.4f}")

In [None]:
# fbeta score의 beta=1 : f1 score
# fbeta score의 2>=beta>1 : recall의 가중치가 높게 조정된 f1 score
# fbeta score의 0<=beta<1 : precision의 가중치가 높게 조정된 f1 score
print(fbeta_score(test_y, mlp_pred_y, beta=0.5, average='macro'))
print(fbeta_score(test_y, mlp_pred_y, beta=1, average='macro'))
print(fbeta_score(test_y, mlp_pred_y, beta=2, average='macro'))

# 군집분석

In [None]:
os.environ['OMP_NUM_THREADS'] = '1'

In [None]:
# 그래프 저장 시 해상도 높게
%config InlineBackend.figure_format = 'retina'

# 한글설정
# plt.rc('font', family='Malgun Gothic')  # 윈도우즈
# plt.rcParams['font.family'] = 'Malgun Gothic'
plt.rc('font', family='AppleGothic') # Mac 운영체제
plt.rc('axes', unicode_minus=False)  # 축의 -(마이너스) 깨짐 방지

## KMeans 클러스터링

In [None]:
len(set(y))

In [None]:
kmeans_model = KMeans(n_clusters=len(set(y)),  # 클러스터 갯수:2개의 그룹으로 나눔
               init='random', # random(중심초기점이 random), k-means++(멀리 떨어진 초기점)
               n_init=5,
               max_iter=300)  # 300번 중심점 이동
kmeans_model.fit(X)

In [None]:
kmeans_model.cluster_centers_  # 최종 두 클러스터의 중심점

In [None]:
len(kmeans_model.cluster_centers_)

In [None]:
pred = kmeans_model.predict(X)
print(' 예 측 값 : ', pred)
print('modelLabel: ', kmeans_model.labels_)
print(' 실제 y값 : ', y)

In [None]:
pd.crosstab(y, kmeans_model.labels_, rownames=['실제값'], colnames=['k-means값'])

## 군집모형 성능평가

In [None]:
VGG_vectors = np.load(os.path.join(filepath,'VGG_vectros.npy'))
painting = pd.read_csv(os.path.join(filepath,'painting.csv')

X = VGG_vectors
y = painting['artist']

le = LabelEncoder()
y_encoded = le.fit_transform(y)  # 문자열 라벨을 숫자로 변환

In [None]:
len(le.classes_)

In [None]:
model = KMeans(n_clusters=len(le.classes_), random_state=1, n_init=10)
model.fit(X)

In [None]:
pred = model.predict(X)
all(pred == model.labels_)

In [None]:
# 예측된 클러스터 라벨
pred = model.labels_

# 각 클러스터에서 가장 많이 등장하는 실제 라벨 찾기
mapping = {}
for cluster in range(len(le.classes_)):
    mask = (pred == cluster)
    if np.any(mask):  # 해당 클러스터에 속한 샘플이 존재하는 경우만 실행
        most_common_label = np.bincount(y_encoded[mask]).argmax()
        mapping[cluster] = most_common_label

# 클러스터 인덱스를 실제 라벨과 매핑
adjusted_pred = np.vectorize(mapping.get)(pred)

# 교차표 생성
cross_tab = pd.crosstab(y_encoded, adjusted_pred, rownames=['실제값'], colnames=['예측값'])

# 결과 출력
cross_tab

In [None]:
pred_str = le.inverse_transform(adjusted_pred)
pd.crosstab(y, pred_str)

## 조정된 rand지수 외 성능평가 기준 함수

In [None]:
adjusted_rand_score(labels_true=y, labels_pred=pred_str)

In [None]:
adjusted_rand_score(labels_true=y_encoded, labels_pred=pred)

In [None]:
homogeneity_score(y, pred_str)

In [None]:
completeness_score(y, pred_str)

In [None]:
v_measure_score(y, pred_str)

In [None]:
mutual_info_score(y, pred_str)

# 앙상블모형

In [None]:
train_X.shape, train_y.shape, test_X.shape, test_y.shape

In [None]:
le = LabelEncoder()
train_y = le.fit_transform(train_y)
test_y = le.fit_transform(test_y)

def model_measure(model, train_X=train_X, train_y=train_y, test_X=test_X, test_y=test_y):
    model.fit(train_X, train_y)
    pred = model.predict(test_X)
    accuracy  = model.score(test_X, test_y)
    precision = precision_score(test_y, pred, average="macro")
    recall    = recall_score(test_y, pred, average="macro")
    f1score  = f1_score(test_y, pred, average="macro")
    return '정확도:{:.3f}, 정밀도:{:.3f}, 재현율:{:.3f}, f1_score:{:.3f}'.format(accuracy, precision, recall, f1score)

# 의사결정 방식

## 배깅 방식

In [None]:
# # 배깅 알고리즘
# from sklearn.ensemble import BaggingClassifier
# bag = BaggingClassifier(estimator=dt_model, 
#                         n_estimators=500,         # 500그루
#                         bootstrap=True,           # 복원추출, 중복추출 허용
#                         bootstrap_features=False, # 모든 특성을 사용해서 학습
#                         random_state=1)
# bag.fit(train_X, train_y)
# bag.score(test_X, test_y)

In [None]:
rf = RandomForestClassifier(n_estimators=100,         # 의사결정나무 100개
                            max_features=len(set(y)), # 2개 특징으로 나눔
                            random_state=42).fit(train_X, train_y)
rf.score(test_X, test_y)

In [None]:
# rf_model = model_measure(RandomForestClassifier())
# rf_model

## 부스팅 방식

In [None]:
xgb = model_measure(XGBClassifier())
xgb

In [None]:
lgb = model_measure(LGBMClassifier(force_col_wise=True))
lgb

## 투표를 이용한 앙상블

In [None]:
# # voting 알고리즘 - hard 방식
# rf_model = RandomForestClassifier(n_estimators=100, # 의사결정나무 100개
#                                   max_features=2,     # 2개 특징으로 나눔
#                                   random_state=42)
# xgb_model = XGBClassifier(max_depth=10,          # 트리의 최대 깊이
#                           n_estimators=100,      # 트리 갯수
#                           learning_rate=0.01,    # 학습률
#                           eval_metric='logloss') # 평가지표(이진분류)
# lgb_model = LGBMClassifier(force_col_wise=True, verbose=-1)
# print(model_measure(rf_model))
# print(model_measure(xgb_model))
# print(model_measure(lgb_model))

In [None]:
# ✅ 경량화된 랜덤포레스트 모델
rf_model = RandomForestClassifier(
    n_estimators=50,      # 트리 개수 줄이기
    max_depth=5,          # 트리 깊이 제한
    max_features='sqrt',  # 최적의 특징 개수 자동 선택
    random_state=42
)

# ✅ 경량화된 XGBoost 모델
xgb_model = XGBClassifier(
    max_depth=4,          # 트리 깊이 제한
    n_estimators=50,      # 트리 개수 줄이기
    learning_rate=0.1,    # 학습 속도 증가
    subsample=0.8,        # 데이터 일부 샘플링
    colsample_bytree=0.8, # 일부 특성만 사용
    tree_method='hist',   # 히스토그램 기반 트리 (메모리 절약)
    eval_metric='logloss',
#     use_label_encoder=True,
    random_state=42
)

# ✅ 경량화된 LightGBM 모델
lgb_model = LGBMClassifier(
    n_estimators=50,      # 트리 개수 줄이기
    max_depth=4,          # 트리 깊이 제한
    num_leaves=16,        # 리프 개수 줄이기
    subsample=0.8,        # 데이터 일부 샘플링
    colsample_bytree=0.8, # 일부 특성만 사용
    verbose=-1,           # 불필요한 출력 제거
    random_state=42
)
print(model_measure(rf_model))
print(model_measure(xgb_model))
print(model_measure(lgb_model))

In [None]:
voting_model_hard = VotingClassifier(estimators=[('rfm', rf_model),
                                                 ('xgb', xgb_model),
                                                 ('lgb', lgb_model)],
                                     voting='hard') # voting='hard' 기본값
voting_model_hard.fit(X_sampled, y_sampled)

In [None]:
voting_model_hard.predict(test_X[0].reshape(1, -1))

In [None]:
# voting 알고리즘 - soft 방식
voting_model_soft = VotingClassifier(estimators=[('rfm', rf_model),
                                                 ('xgb', xgb_model),
                                                 ('lgb', lgb_model)],
                                     voting='soft') # voting='hard' 기본값
voting_model_soft.fit(X_sampled, y_sampled)

In [None]:
voting_model_soft.predict(test_X[0].reshape(1, -1))

In [None]:
test_y[0]

In [None]:
inverse_test_y = le.inverse_transform(test_y)
inverse_test_y[0]

In [None]:
model_measure(voting_model_hard)

In [None]:
model_measure(voting_model_soft)

In [None]:
# voting_model_hard의 개별 모델들 딕셔너리 형태로 반환
voting_model_hard.named_estimators_

In [None]:
# voting_model_soft의 개별 모델들 딕셔너리 형태로 반환
voting_model_soft.named_estimators_

In [None]:
print(model_measure(voting_model_hard.named_estimators_['rfm']))  # 단일모델로 개별모델 access 가능
print(model_measure(voting_model_hard.named_estimators_['xgb']))
print(model_measure(voting_model_hard.named_estimators_['lgb']))

In [None]:
print(model_measure(voting_model_soft.named_estimators_['rfm']))  # 단일모델로 개별모델 access 가능
print(model_measure(voting_model_soft.named_estimators_['xgb']))
print(model_measure(voting_model_soft.named_estimators_['lgb']))

In [None]:
voting_model_hard.get_params()  # 모델 내의 모든 파라미터(하이퍼파라미터)

In [None]:
voting_model_soft.get_params()  # 모델 내의 모든 파라미터(하이퍼파라미터)

## 머신러닝 모형 저장

In [None]:
# 대용량 모형일 때 : joblib 파일로 저장 (joblib 라이브러리 사용)
print(joblib.dump(voting_model_hard, os.path.join(filepath,'voting_model_hard.joblib')))
print(joblib.dump(voting_model_soft, os.path.join(filepath,'voting_model_soft.joblib')))

In [None]:
# 모델 load
loaded_voting_model_hard = joblib.load(os.path.join(filepath,'voting_model_hard.joblib'))
model_measure(loaded_voting_model_hard)

In [None]:
# 모델 load
loaded_voting_model_soft = joblib.load(os.path.join(filepath,'voting_model_soft.joblib'))
model_measure(loaded_voting_model_soft)