In [0]:
# TODO:

# - M-step sin loops
# - Adjust batch size in the GD for the weights
# - sample VS takeSample (rdd, probabilistico VS lista, tamaño fijo)

# - Investigar umbral para las responsibilities
# - Eficiencia de withColumn vs select
# - Eficiencia RDD vs DF para recalcular parámetros
# - Producto matricial en M-step para D > 1
# - Lista de componentes VS arrays (comp[i].w VS x[i,:,:])
# - Pesos con intercepto o intercepto a parte (w * x_ext VS w0 + w * x)

In [0]:
import numpy as np
from scipy.stats import norm, multivariate_normal
from scipy.special import logsumexp # previously from scipy.misc import logsumexp
import matplotlib.pyplot as plt
import seaborn as sb
sb.set()

from pyspark.ml.clustering import GaussianMixture
from pyspark.ml.clustering import KMeans
from pyspark.ml.regression import LinearRegression
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import DenseVector, SparseVector, Vectors, VectorUDT
from pyspark.sql.functions import udf, mean as _mean
from pyspark.sql.types import *

#Clase componente para facilitar la manipulación del modelo
class Component:

    def __init__(self, pi, mu, sigma, w, betaInv):
        self.pi = pi
        self.mu = mu
        self.sigma = sigma
        self.w = w
        self.betaInv = betaInv

In [0]:
# Block with the util functions (utils.py)

def _computeResp(features, target, components):
      
        logPdfCompList = list()

        for component in components:
          
            pi = component.pi
            mu = component.mu
            sigma = component.sigma
            w = component.w
            betaInv = component.betaInv

            logPdfComp = (np.log(pi) + 
                          multivariate_normal.logpdf(features[1:], mean=mu, cov=sigma) + 
                          norm.logpdf(target, loc=np.dot(w,features), scale=np.sqrt(betaInv)))

            logPdfCompList.append(logPdfComp)

        logPdfComp = np.array(logPdfCompList)

        logPdf = logsumexp(logPdfComp)
        resp = np.exp(logPdfComp - logPdf)

        # TODO: probar si este bloque afecta. Normalizar L1??
        resp = resp * (resp > 1e-10)
        resp = resp / sum(resp)
        
        results = np.insert(resp, 0, logPdf)

        return Vectors.dense(results)
      
# UDFs block
pdfUdf = udf(lambda x: float(x[0]),  FloatType())
respUdf = udf(lambda x: Vectors.dense(x[1:]),  VectorUDT())

In [0]:
class ClusterwiseLinearModel:
  
    def __init__(self, K):
      
        self.path2data = None # With a normal dataset, change this
        self.components = None
        self.dataTr = None
        self.dataVal = None
        self.dataTst = None
        self.dim = None
        self.K = K
        self.logLikelihoodEvol = list()
        
        return
      
    def loadData(self):
      
        # In this case, no file with the data. We would have something like:
        #data = loadmat(path2data)['data_mat']

        # For this case with an artificial dataset
        nPoints = 100
        
        betaInvTrue = 1
        X = np.random.uniform(-10,10,nPoints)[:,np.newaxis]
        Y = (10 / (1 + np.exp(X)) + np.random.normal(0,np.sqrt(betaInvTrue),(nPoints,1)))
        # A partir de aquí, compartido ##########################################################
        
        # Añadimos los IDs
        ids = np.arange(nPoints)[:,np.newaxis]

        # Paralelización de los datos (sólo train)
        data = np.hstack([ids,X,Y]).astype(float)
        dataRDD = sc.parallelize(data)
        
        # Get the dimensionality of the dataset
        self.dim = X.shape[1]

        dataDF = (dataRDD.map(lambda x: x.tolist()).toDF()
                         .withColumnRenamed('_1', 'ID')
                         .withColumnRenamed('_'+str(self.dim+2), 'target'))
        
        # Get all the columns to generate a unique feature column
        col_names = list(range(2, self.dim+2))
        col_names = ['_' + str(col) for col in col_names]

        # Generate the features column
        vectorAssembler = VectorAssembler(inputCols = col_names, outputCol = 'features')
        dataDF = vectorAssembler.transform(dataDF)
        dataDF = dataDF.select(['ID', 'features', 'target'])
        
        # Train-Val-Test split
        #[dataTrDF, dataValDF, dataTstDF] = dataDF.randomSplit([0.98,0.01,0.01])

        self.dataTr = dataDF
        self.dataVal = None
        self.dataTst = None
        
        return
      
    def initModel(self):
      
        # Get the data
        dataDF = self.dataTr
        
        # Cluster the data
        gmm = (GaussianMixture().setK(self.K)
                                .setFeaturesCol('features'))
        
        modelGMM = gmm.fit(dataDF)
        gmmDF = modelGMM.transform(dataDF)
        
        # Get the parameters for the linear regression
        lr = (LinearRegression().setMaxIter(10)
                                .setRegParam(0.3)
                                .setElasticNetParam(0.8)
                                .setFeaturesCol('features')
                                .setLabelCol('target'))
        
        
        # List to save the components
        compList = list() 
        
        # Get the pi from the GMM
        pi = modelGMM.weights
        
        for i in range(self.K):

            # Filter the DF by cluster
            dataFiltDF = gmmDF.filter(gmmDF['prediction'] == i).select('ID','features','target')

            # Fit the model
            modelLr = lr.fit(dataFiltDF)
            
            # Component parameters
            mu = modelGMM.gaussians[i].mean
            sigma = modelGMM.gaussians[i].cov.toArray()
            #w0 = modelLr.intercept
            w = np.hstack([np.array([modelLr.intercept]), modelLr.coefficients])
            betaInv = modelLr.summary.objectiveHistory[-1]
            
            # Append component
            compList.append(Component(pi[i], mu, sigma, w, betaInv))

        self.components = compList
        
        # Add the ones columns for the bias
        self.addBiasColumn()
        
        return
      
    def addBiasColumn(self):
        
        """Add a ones column for simplicity of the code"""

        addOnesColumnUdf = udf(lambda features: Vectors.dense(np.insert(features, 0, 1.0)), VectorUDT())
        
        # For train data
        df = self.dataTr
        df = df.withColumn('features', addOnesColumnUdf(df['features']))
        self.dataTr = df

        # For validation data
        #df = self.dataVal
        #df = df.withColumn('features', addOnesColumnUdf(df['features']))
        #self.dataVal = df

        # For test data
        #df = self.dataTst
        #df = df.withColumn('features', addOnesColumnUdf(df['features']))
        #self.dataTst = df
        
        return
        
    def E_step(self):
      
        df = self.dataTr
        components = self.components
        
        # We have to redefine the UDF in each iteration so the components are updated
        computeRespUdf = udf(lambda features, target: _computeResp(features, target, components),  VectorUDT())
        
        df = df.withColumn('responsibilities', computeRespUdf(df['features'], df['target']))
        
        df = df.withColumn('logPDF', pdfUdf(df['responsibilities']))
        df = df.withColumn('responsibilities', respUdf(df['responsibilities']))
        
        self.dataTr = df
        
        df_stats = df.select(
            _mean(df['logPDF']).alias('loglikelihood'),
        ).collect()

        loglikelihood = df_stats[0]['loglikelihood']
        
        self.logLikelihoodEvol.append(loglikelihood)
        
        return
      
    def M_step(self):
      
        df = self.dataTr
      
        Nk = (df.select('responsibilities')
                .rdd
                .map(lambda x: np.array(x))
                .reduce(lambda x,y: x+y)
             )[0]


        piNew = Nk / sum(Nk)
        muNew = (df.select('features', 'responsibilities')
                   .rdd
                   .map(lambda x: x[1] * x[0][1:])
                   .reduce(lambda x,y: x+y)
                ) / Nk

        sigmaNew = (df.select('features', 'responsibilities')
                      .rdd
                      .map(lambda x: np.array(x[1]) * np.square(x[0][1:] - muNew))
                      .reduce(lambda x,y: x+y)
                   )[0] / Nk
        
        wNew = list()

        for i in range(mrgr.K):

            counter = 1
            gradient = np.inf
            w = components[i].w
            batchProp = 0.1  
            threshold = 0.001
            nMaxIter = 30
            alpha = 0.0001


            while np.linalg.norm(gradient) > threshold and counter <= nMaxIter:

                batchRDD = (df.select('features', 'target', 'responsibilities')
                              .rdd
                              .sample(False, batchProp))

                batchSize = batchRDD.count()

                gradient = (batchRDD.map(lambda data: (data[2][i] * np.dot(data[0], w) - data[1]) * data[0])
                                    .reduce(lambda x,y : x + y) 
                            / (batchSize * np.sqrt(counter)))

                w = w - alpha * gradient

                counter += 1

            wNew.append(w)
        
        
        # Update components
        compsNew = list()
        
        for i in range(self.K):
        
            compsNew.append(Component(piNew[i], muNew[i], sigmaNew[i], wNew[i], components[i].betaInv))

        self.components = compsNew
        
        return
      
    def plotComponents(self):
      
        color = ['r-','g-','b-','k-']
    
        x_axis = np.linspace(-10,10,5)

        fig = plt.figure()
        
        x = self.dataTr.rdd.map(lambda r: r[1][1]).collect()
        y = self.dataTr.rdd.map(lambda r: r[2]).collect()
        
        plt.plot(x, y, 'k.')

        for i in range(self.K):

            regr = self.components[i].w[0] + self.components[i].w[1] * x_axis      
            plt.plot(x_axis,regr, color[i])

        plt.xlabel('features')
        plt.ylabel('target')    

        axes = plt.gca()
        axes.set_ylim([-3,13])

        display(fig)
        
        return
        
    def plotLogLikelihood(self):
      
        llevol = self.logLikelihoodEvol
        nIter = list(range(1,len(llevol)+1))
        
        fig = plt.figure()
        plt.plot(nIter,llevol)
        
        plt.xlabel('# of iterations')
        plt.ylabel('Log-likelihood')
        
        display(fig)
        
        return
      
    def printComponents(self):
        
        print('-------------------------------------')
        print()
        
        i = 0
        for component in self.components:
  
            print(f'* Component #{i}:')
            print('  |--> pi: ', component.pi)
            print('  |--> mu: ', component.mu)
            print('  |--> sigma: ', component.sigma)
            print('  |--> w: ', component.w)
            print('  |--> betaInv: ', component.betaInv)
            
            i += 1
            
        print()
        
        return

In [0]:
for i in range(10):

    mrgr = ClusterwiseLinearModel(3)
    mrgr.loadData()
    mrgr.initModel()


    #mrgr.plotComponents()

    for i in range(4):

        components = mrgr.components

        mrgr.E_step()
        mrgr.M_step()

        #mrgr.plotComponents()

    mrgr.plotLogLikelihood()

In [0]:
cacahuete

mrgr = ClusterwiseLinearModel(3)
mrgr.loadData()
mrgr.initModel()


mrgr.E_step()
#mrgr.M_step()
mrgr.dataTr.show(5,False)

mrgr.E_step()
#mrgr.M_step()
mrgr.dataTr.show(5,False)

In [0]:
mrgr = ClusterwiseLinearModel(3)
mrgr.loadData()
mrgr.initModel()

#for component in mrgr.components:
  
#    print('pi: ', component.pi)
#    print('mu: ', component.mu)
#    print('sigma: ', component.sigma)
#    print('w: ', component.w)
#    print('betaInv: ', component.betaInv)

#mrgr.plotComps()


#mrgr.dataTr.show(10,False)
mrgr.printComponents()

components = mrgr.components # Have to make it a global variable really
mrgr.E_step()
mrgr.M_step()

#mrgr.dataTr.show(10,False)
mrgr.printComponents()

#mrgr.plotComps()
components = mrgr.components
mrgr.E_step()

#mrgr.dataTr.show(10,False)
mrgr.printComponents()
#mrgr.plotLogLikelihood()