In [1]:
# PCA
from sklearn.decomposition import PCA
import numpy as np
from scipy import linalg
# generate random data(10000, 1000)
data = np.random.rand(10000, 1000)

# generate data, 4 distributions cluster

data_cluster = np.concatenate([
    np.random.normal(loc=0, scale=1, size=(1000, 100)),
    np.random.normal(loc=5, scale=1, size=(1000, 100)),
    np.random.normal(loc=10, scale=1, size=(1000, 100)),
    np.random.normal(loc=15, scale=1, size=(1000, 100)),
])


# PCAの一般的な実装

In [2]:
def pca_sklearn(data, n_components=2):
    pca = PCA(n_components=n_components)
    return pca.fit_transform(data)

In [12]:
class PCA_Scratch:
    def __init__(self, n_components=2):
        self.n_components = n_components
        self.mean_ = None
        self.components_ = None
        self.explained_variance_ = None
        self.explained_variance_ratio_ = None
        self.singular_values_ = None
        self.n_samples_ = None
        self.noise_variance_ = None

    def fit(self, X):
        n_samples, n_features = X.shape
        self.n_samples_ = n_samples

        # データの平均
        self.mean_ = np.mean(X, axis=0)

        # 共分散行列を計算（平均中心化は間接的に）
        C = X.T @ X
        C -= n_samples * np.outer(self.mean_, self.mean_)
        C /= (n_samples - 1)

        # 固有値分解
        eigenvals, eigenvecs = np.linalg.eigh(C)

        # 降順に並べ替え
        eigenvals = eigenvals[::-1]
        eigenvecs = eigenvecs[:, ::-1]

        # 数値誤差で負の値があれば0に
        eigenvals = np.clip(eigenvals, 0, None)

        # 主成分の数だけ切り出し
        self.components_ = eigenvecs[:, :self.n_components].T  # (n_components, n_features)
        self.explained_variance_ = eigenvals[:self.n_components]

        total_var = np.sum(eigenvals)
        self.explained_variance_ratio_ = self.explained_variance_ / total_var

        self.singular_values_ = np.sqrt(self.explained_variance_ * (n_samples - 1))

        # ノイズ分散（主成分以外の固有値の平均）
        if self.n_components < min(n_samples, n_features):
            self.noise_variance_ = np.mean(eigenvals[self.n_components:])
        else:
            self.noise_variance_ = 0.0

        # データ中心化
        X_centered = X - self.mean_

        # 主成分スコアを計算
        U = X_centered @ self.components_.T  # (n_samples, n_components)

        return U, self.singular_values_, self.components_, X_centered, True, np

    def fit_transform(self, X):
        U, S, _, X_centered, x_is_centered, xp = self.fit(X)
        # 主成分スコアに特異値を掛ける（scikit-learn準拠）
        # U = U * S

        return U

In [13]:
# test: 実装したPCA_Scratchクラスを使って　変換されたデータが一致するか
import plotly.express as px
pca_scratch = PCA_Scratch(n_components=2)
pca_result = pca_scratch.fit_transform(data_cluster)
print(pca_scratch.explained_variance_ratio_)

pca_sklearn = PCA(n_components=2)
pca_sklearn_result = pca_sklearn.fit_transform(data_cluster)
print(pca_sklearn.explained_variance_ratio_)

fig = px.scatter(
    x=pca_result[:, 0],
    y=pca_result[:, 1],
    title="PCA Scratch Result",
    labels={'x': 'PC1', 'y': 'PC2'}
)
fig.show()
fig = px.scatter(
    x=pca_sklearn_result[:, 0],
    y=pca_sklearn_result[:, 1],
    title="PCA Sklearn Result",
    labels={'x': 'PC1', 'y': 'PC2'}
)
fig.show()

[9.69324767e-01 4.06730971e-04]
[9.69324767e-01 3.94369337e-04]


In [33]:
# 計算時間比較

import time
start_time = time.time()
pca_scratch.fit_transform(data_cluster)
end_time = time.time()
print(f"PCA_Scratch time: {end_time - start_time:.4f} seconds")
start_time = time.time()
pca_sklearn.fit_transform(data_cluster)
end_time = time.time()
print(f"PCA_Sklearn time: {end_time - start_time:.4f} seconds")


PCA_Scratch time: 0.0051 seconds
PCA_Sklearn time: 0.0100 seconds


# 共分散行列の作成

In [37]:
import numpy as np

class WeightedPCA:
    def __init__(self, n_components=2):
        self.n_components = n_components
        self.components_ = None
        self.mean_ = None
        self.explained_variance_ = None

    def fit(self, X, sample_weights=None):
        if sample_weights is None:
            sample_weights = np.ones(X.shape[0])

        # Normalize weights
        sample_weights = sample_weights / np.sum(sample_weights)

        # Compute weighted mean
        self.mean_ = np.average(X, axis=0, weights=sample_weights)

        # Centered data
        X_centered = X - self.mean_

        # Compute weighted covariance matrix
        weighted_cov = (X_centered.T * sample_weights) @ X_centered

        # SVD
        U, S, Vt = np.linalg.svd(weighted_cov)

        # Select top components
        self.components_ = Vt[:self.n_components]
        self.explained_variance_ = S[:self.n_components]

    def transform(self, X):
        X_centered = X - self.mean_
        return X_centered @ self.components_.T

    def fit_transform(self, X, sample_weights=None):
        self.fit(X, sample_weights)
        return self.transform(X)


In [45]:
# 重み: 最初の10個を強調
weights = np.ones(1000)
weights[:10] = 10
# ダミーデータ
np.random.seed(42)
X = np.random.rand(1000, 500)
# 実行
wpca = WeightedPCA(n_components=2)
X_proj = wpca.fit_transform(X, sample_weights=weights)

In [94]:
import numpy as np

def covariance(X):
    """普通の共分散行列計算"""
    X_centered = X - np.mean(X, axis=0)
    return (X_centered.T @ X_centered) / (X.shape[0] - 1)

# def weighted_covariance(X, w):
#     """重み付き共分散行列計算"""
#     w = np.array(w)
#     w_sum = np.sum(w)
#     x_mean = np.sum(X * w[:, None], axis=0) / w_sum
#     X_centered = X - x_mean
   

#     cov = np.zeros((X.shape[1], X.shape[1]))
#     for i in range(len(X)):
#         cov += w[i] * np.outer(X_centered[i], X_centered[i])
#     cov /= (w_sum - 1)
#     return cov

def weighted_covariance(X, w):
    """重み付き共分散行列の高速版（ループ無し）"""
    w = np.asarray(w)
    w_sum = np.sum(w)
    x_mean = np.average(X, axis=0, weights=w)
    X_centered = X - x_mean  # shape: (n, d)

    # 各行に対応する重みをかける (w[:, None] によって (n, 1) → (n, d) へブロードキャスト)
    X_weighted = X_centered * np.sqrt(w[:, None])  # 安定性のため平方根
    cov = (X_weighted.T @ X_weighted) / (w_sum - 1)

    return cov

def weighted_covariance_time(X, w):
    """重み付き共分散行列の高速版（ループ無し）"""
    _start = time.time()
    w = np.asarray(w)
    _end = time.time()
    print(f"Time to convert weights: {_end - _start:.6f} seconds")
    _start = time.time()
    w_sum = np.sum(w)
    _end = time.time()
    print(f"Time to sum weights: {_end - _start:.6f} seconds")
    _start = time.time()
    # x_mean = np.average(X, axis=0, weights=w)
    w = w / np.sum(w)
    x_mean = (X.T @ w).T
    _end = time.time()
    print(f"Time to compute weighted mean: {_end - _start:.6f} seconds")
    _start = time.time()
    X_centered = X - x_mean  # shape: (n, d)
    _end = time.time()
    print(f"Time to center data: {_end - _start:.6f} seconds")
    # 各行に対応する重みをかける (w[:, None] によって (n, 1) → (n, d) へブロードキャスト)
    _start = time.time()
    X_weighted = X_centered * np.sqrt(w[:, None])  # 安定性のため平方根
    _end = time.time()
    print(f"Time to weight data: {_end - _start:.6f} seconds")
    _start = time.time()
    cov = (X_weighted.T @ X_weighted) / (w_sum - 1)
    _end = time.time()
    print(f"Time to compute covariance: {_end - _start:.6f} seconds")
    return cov
# 元データ
X = np.array([
    [2, 0],
    [0, 4],
    [3, 3]
])

# 一つのデータ点を複製（例えば最初のデータ点を2回追加 = 計3回出現）
weight_factor = 2
dup_index = 0  # 最初のデータ点を複製
X_dup = np.vstack([X] + [X[dup_index:dup_index+1]] * (weight_factor - 1))

# 重みベクトル（最初のデータ点だけ重みを大きく）
w = np.array([weight_factor if i == dup_index else 1 for i in range(len(X))])
print("重み:", w)

# 共分散行列の計算
cov_dup = covariance(X_dup)
cov_w = weighted_covariance(X, w)

# 結果表示
print("X_dup（複製後のデータ）:")
print(X_dup)

print("\n共分散行列（複製データ）:")
print(cov_dup)

print("\n共分散行列（重み付き）:")
print(cov_w)

print("\n共分散行列（元データ）:")
print(covariance(X))

# 差分のノルム
diff = np.linalg.norm(cov_dup - cov_w)
print(f"\n差のノルム: {diff:.6f}")


重み: [2 1 1]
X_dup（複製後のデータ）:
[[2 0]
 [0 4]
 [3 3]
 [2 0]]

共分散行列（複製データ）:
[[ 1.58333333 -1.08333333]
 [-1.08333333  4.25      ]]

共分散行列（重み付き）:
[[ 1.58333333 -1.08333333]
 [-1.08333333  4.25      ]]

共分散行列（元データ）:
[[ 2.33333333 -1.33333333]
 [-1.33333333  4.33333333]]

差のノルム: 0.000000


In [95]:
# 大きいデータでどれくらい差が出るか計測

import numpy as np
# ダミーデータ
np.random.seed(42)
X = np.random.rand(10000, 500)

# 重み: 最初の1000個を強調
weights = np.ones(10000)
weights[:1000] = 10

# 重複データ（複製）
weight_factor = 100

# 最初の1000個を複製
X_dup = np.vstack([X] + [X[:1000]] * (weight_factor - 1))

start_time = time.time()
covariance(X_dup)
end_time = time.time()
print(f"普通の共分散計算時間: {end_time - start_time:.4f}秒")

start_time = time.time()
weighted_covariance_time(X, weights)
end_time = time.time()
print(f"重み付き共分散計算時間: {end_time - start_time:.4f}秒")



普通の共分散計算時間: 0.2367秒
Time to convert weights: 0.000000 seconds
Time to sum weights: 0.001001 seconds
Time to compute weighted mean: 0.001522 seconds
Time to center data: 0.009517 seconds
Time to weight data: 0.008013 seconds
Time to compute covariance: 0.011027 seconds
重み付き共分散計算時間: 0.0347秒


In [2]:
# pca 実装

class PCA_Weighted:
    def __init__(self, n_components=2):
        self.n_components = n_components
        self.mean_ = None
        self.components_ = None
        self.singular_values_ = None
        self.explained_variance_ = None
        self.explained_variance_ratio_ = None
        self.n_samples_ = None


    def fit(self, X, weights=None):
        n_samples, n_features = X.shape

        w = np.asarray(weights)
        w_sum = np.sum(w)
        w = w / w_sum
        # データの平均
        x_mean = (X.T @ w).T  # 重み付き平均
        # 中心化
        X_centered = X - x_mean
        # 共分散行列
        X_weighted = X_centered * np.sqrt(w[:, None])  # 各行に重みを掛ける
        cov = (X_weighted.T @ X_weighted) / (w_sum - 1)
        # 固有値分解
        eigenvals, eigenvecs = np.linalg.eigh(cov)  
        # 降順に並べ替え
        eigenvals = eigenvals[::-1]
        eigenvecs = eigenvecs[:, ::-1]

        # 主成分
        self.components_ = eigenvecs[:, :self.n_components].T  # (n_components, n_features)

        # explained variance
        self.explained_variance_ = eigenvals[:self.n_components]
        total_var = np.sum(eigenvals)
        self.explained_variance_ratio_ = self.explained_variance_ / total_var
        
        self.singular_values_ = np.sqrt(self.explained_variance_ * (n_samples - 1))

        # 主成分スコアを計算
        U = X_centered @ self.components_.T
        
        return U, self.singular_values_, self.components_, X_centered, True, np


    def fit_transform(self, X, weights=None):
        U, S, _, X_centered, x_is_centered, xp = self.fit(X, weights)
        # 主成分スコアに特異値を掛ける（scikit-learn準拠）
        # U = U * S

        return U

In [7]:

# PCA_Weightedのテスト
pca_weighted = PCA_Weighted(n_components=2)
weights = np.ones(data_cluster.shape[0])
weights[:10] = 10  # 最初の10個のデータ点に大きな重みを設定
pca_weighted_result = pca_weighted.fit_transform(data_cluster, weights=weights) 
# 結果の確認
import plotly.express as px
fig = px.scatter(
    x=pca_weighted_result[:, 0],
    y=pca_weighted_result[:, 1],
    title="PCA Weighted Result",
    labels={'x': 'PC1', 'y': 'PC2'}
)
fig.show()

In [None]:
# 計算速度
import time
start_time = time.time()
pca_weighted.fit_transform(data_cluster, weights=weights)
end_time = time.time()
print(f"PCA_Weighted time: {end_time - start_time:.4f} seconds")

pca = PCA(n_components=2)
start_time = time.time()
pca.fit_transform(data_cluster)
end_time = time.time()
print(f"PCA Sklearn time: {end_time - start_time:.4f} seconds")


PCA_Weighted time: 0.0077 seconds
PCA Sklearn time: 0.0337 seconds
