# NMF (Non-Negative Matrix Factorization; 非負値行列因子分解)

## NMFとは


## scikit-learnを使った実験

In [1]:
import numpy as np
import pandas as pd 
import plotly.express as px 
from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.decomposition import NMF 
from sklearn.exceptions import NotFittedError
from tqdm.auto import trange
import plotly.express as px 

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
# Load the 20 Newsgroups train/test splits and convert them to term-count matrices.
news_train = fetch_20newsgroups(subset="train")
news_test = fetch_20newsgroups(subset="test")
vectorizer = CountVectorizer(
    lowercase=True,
    stop_words="english",
    min_df=2,
    max_df=0.5,
    max_features=1000,
)
# The vocabulary is learned from the training split only and reused for the test split.
X_train = vectorizer.fit_transform(news_train.data)
X_test = vectorizer.transform(news_test.data)

In [7]:
# Build index <-> word lookup tables from the fitted vocabulary.
# get_feature_names() was deprecated in scikit-learn 1.0 and removed in 1.2,
# so prefer get_feature_names_out() and fall back only on older releases.
if hasattr(vectorizer, "get_feature_names_out"):
    _feature_names = vectorizer.get_feature_names_out()
else:
    _feature_names = vectorizer.get_feature_names()
id2word = dict(enumerate(_feature_names))
word2id = {word: idx for idx, word in id2word.items()}



## NumPyを使って実装する

In [34]:
def update_Vt_by_euclid(X, U, Vt):
    """Apply one multiplicative update to ``Vt`` for the Euclidean objective.

    Computes ``Vt <- Vt * (U.T @ X) / (U.T @ U @ Vt)`` element-wise.

    Parameters
    ----------
    X : matrix of shape (D, F)
        Data being factorized as ``U @ Vt``.
    U : np.ndarray of shape (D, K)
        Current left factor (read only).
    Vt : np.ndarray of shape (K, F)
        Current right factor. It is updated IN PLACE.

    Returns
    -------
    np.ndarray
        The same ``Vt`` object that was passed in, after the update.
    """
    numerator = np.asarray(U.T @ X)  # (K,F) = (D,K).T @ (D,F)
    # (U.T @ U) @ Vt equals U.T @ (U @ Vt) but avoids building the dense
    # (D,F) reconstruction just to project it back down to (K,F).
    denominator = (U.T @ U) @ Vt  # (K,F) = (K,K) @ (K,F)
    with np.errstate(divide="ignore", invalid="ignore"):
        ratio = numerator / denominator
    # 0/0 gives NaN and x/0 gives inf; both would poison Vt (0 * inf is NaN),
    # so entries with an undefined ratio are zeroed instead.
    ratio[~np.isfinite(ratio)] = 0.0
    Vt *= ratio  # (K,F) = (K,F) * (K,F)
    return Vt

def update_U_by_euclid(X, U, Vt):
    """Apply one multiplicative update to ``U`` for the Euclidean objective.

    Computes ``U <- U * (X @ Vt.T) / (U @ Vt @ Vt.T)`` element-wise.

    Parameters
    ----------
    X : matrix of shape (D, F)
        Data being factorized as ``U @ Vt``.
    U : np.ndarray of shape (D, K)
        Current left factor. It is updated IN PLACE.
    Vt : np.ndarray of shape (K, F)
        Current right factor (read only).

    Returns
    -------
    np.ndarray
        The same ``U`` object that was passed in, after the update.
    """
    numerator = np.asarray(X @ Vt.T)  # (D,K) = (D,F) @ (K,F).T
    # U @ (Vt @ Vt.T) equals (U @ Vt) @ Vt.T but avoids building the dense
    # (D,F) reconstruction as an intermediate.
    denominator = U @ (Vt @ Vt.T)  # (D,K) = (D,K) @ (K,K)
    with np.errstate(divide="ignore", invalid="ignore"):
        ratio = numerator / denominator
    # 0/0 gives NaN and x/0 gives inf; both would poison U (0 * inf is NaN),
    # so entries with an undefined ratio are zeroed instead.
    ratio[~np.isfinite(ratio)] = 0.0
    U *= ratio  # (D,K) = (D,K) * (D,K)
    return U

def cost_fn_by_euclid(X, _X):
    """Return the mean over rows of the L2 norm of the residual ``X - _X``.

    ``X`` is the original data and ``_X`` its reconstruction; both must have
    the same 2-D shape.
    """
    residual = X - _X
    per_row_error = np.linalg.norm(residual, axis=1)
    return per_row_error.mean()

class MyNMF():
    """Non-negative matrix factorization ``X ~= U @ Vt`` via multiplicative updates.

    Attributes set by ``fit`` / ``fit_transform``:
        components_: the learned right factor ``Vt`` of shape (K, F).
        cost_: list with one cost value per iteration of the most recent fit.
        is_fitted: True once a fit has completed.
    """

    def __init__(self, n_components:int=2, max_iter:int=100,
                 rng:"np.random.Generator | int | None"=None, divergence="euclid"):
        """Configure the factorizer.

        Args:
            n_components: number of components K.
            max_iter: number of update iterations run by fit and transform.
            rng: source of randomness for the initial factors. A generator
                object is used as is, an integer is used as a seed, and None
                selects a fixed default seed so runs are repeatable.
            divergence: name of the objective; only "euclid" is implemented.

        Raises:
            NotImplementedError: if ``divergence`` is not a supported name.
        """
        # Validate first so an unsupported objective fails at construction
        # time instead of leaving an object without update functions.
        if divergence != "euclid":
            raise NotImplementedError('divergenceは["euclid",]から選択')
        self.n_components = n_components
        self.max_iter = max_iter
        if rng is None:
            rng = np.random.default_rng(2**1000)
        elif isinstance(rng, (int, np.integer)):
            rng = np.random.default_rng(rng)
        self.rng_ = rng
        self.divergence = divergence
        self.is_fitted = False
        self.cost_ = []
        self.update_Vt = update_Vt_by_euclid
        self.update_U = update_U_by_euclid
        self.cost_fn = cost_fn_by_euclid

    def fit_transform(self, X:np.ndarray,y=None):
        """Factorize ``X`` and return the left factor ``U`` of shape (D, K).

        ``y`` is accepted and ignored. The right factor is stored in
        ``components_`` and the per-iteration cost in ``cost_``.
        """
        X = X.astype(np.float64)
        n_samples, self._n_features = X.shape
        # Start a fresh history so repeated fits do not concatenate their curves.
        self.cost_ = []

        # Initialize two small matrices from a uniform distribution
        _U = self.rng_.uniform(0, 1,
                               size=[n_samples, self.n_components],
                               ).astype(X.dtype) # (D,K)
        _Vt = self.rng_.uniform(0, 1,
                                size=[self.n_components, self._n_features],
                                ).astype(X.dtype) # (K,F)

        # Alternate the two updates and record the cost after each iteration
        for _ in trange(self.max_iter):
            _Vt = self.update_Vt(X, _U, _Vt)
            _U = self.update_U(X, _U, _Vt)
            self.cost_.append(self.cost_fn(X, _U @ _Vt))

        # Expose the learned right factor and mark the model as fitted
        self.components_ = _Vt
        self.is_fitted = True
        return _U

    def fit(self,X,y=None):
        """Fit the model on ``X`` and return ``self``; ``y`` is ignored."""
        self.fit_transform(X)
        return self

    def transform(self, X):
        """Compute a left factor for ``X`` while keeping ``components_`` fixed.

        Raises:
            NotFittedError: if called before a fit has completed.
            ValueError: if ``X`` has a different number of columns than the
                training data.
        """
        if not self.is_fitted:
            raise NotFittedError(f"{self.__class__.__name__}.transformはfit後にのみ利用できる")
        if self.components_.shape[1] != X.shape[1]:
            raise ValueError("Xと訓練データの特徴数が異なっている")
        X = X.astype(np.float64)

        # Initialize U from a uniform distribution
        U = self.rng_.uniform(0, 1,
                              size=[X.shape[0], self.n_components],
                              ).astype(X.dtype) # (D,K)

        # Only U is updated; the learned components are left untouched
        for _ in trange(self.max_iter):
            U = self.update_U(X, U, self.components_)
        return U

In [49]:
# Factorize the training counts into 20 components using 50 update iterations.
nmf = MyNMF(n_components=20, max_iter=50)
U = nmf.fit_transform(X_train)

  0%|          | 0/50 [00:00<?, ?it/s]

In [50]:
# Plot the cost value recorded after each training iteration.
px.line(nmf.cost_, title="訓練中のコスト関数の値", width=600, height=600)