In [2]:
import os

import numpy as np
import pandas as pd
import pyspark
from pyspark.ml.feature import Normalizer
from pyspark.mllib.linalg.distributed import *
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.functions import collect_list, pandas_udf, PandasUDFType, udf
from pyspark.sql.types import *
from sklearn import datasets
# Ask pyarrow to use its pre-0.15 IPC format for the pandas UDFs in this notebook.
# NOTE(review): this is the documented workaround for Spark 2.3/2.4 with pyarrow >= 0.15;
# confirm it is still needed for the Spark/pyarrow versions in use.
os.environ['ARROW_PRE_0_15_IPC_FORMAT']='1'

In [3]:
# Note: `sc` holds a SparkSession (not a SparkContext); later cells use it under this name.
sc = SparkSession.builder.getOrCreate()
# Turn on the Arrow execution setting for this session.
sc.conf.set("spark.sql.execution.arrow.enabled", "true")

In [4]:
# Load iris, gather features + label + row id into one pandas frame,
# then hand it to Spark as `dfclust`.
iris = datasets.load_iris()
X_class = pd.DataFrame(data=iris.data, columns=iris.feature_names)
y_class = pd.DataFrame({'label': iris.target})
classs = X_class.join(y_class)
# The positional index doubles as a stable per-observation id.
classs = classs.assign(id=classs.index)
# Short column names: four features, then label and id.
classs.columns = list('ABCD') + ['label', 'id']
num_obs = len(classs)
dfclust = sc.createDataFrame(classs)

In [8]:
# Output schema for the per-label similarity matrix: 50 nullable float
# columns named "0" .. "49".
schema_list1 = []
for col_idx in range(50):
    schema_list1.append(StructField(str(col_idx), FloatType(), True))
#schema_list1.append(StructField('label', DoubleType(),True))
cos_schema = StructType(schema_list1)

As a prototype, I have two versions of the cosine similarity computation written as pandas UDFs. The first one returns the whole cosine similarity matrix for each label in the group-by (assuming every label has the same number of observations).

In [8]:
#pandas udf to row normalize data
@pandas_udf("label long, A double, B double, C double, D double, id long", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    """Scale each observation's feature vector (A, B, C, D) to unit L2 norm.

    Parameters
    ----------
    pdf : pandas.DataFrame
        One group of rows with columns A, B, C, D, label and id.

    Returns
    -------
    pandas.DataFrame
        Same rows, with A-D divided by the row's Euclidean norm, and columns
        ordered as in the declared output schema (label, A, B, C, D, id).
        Rows whose features are all zero are returned unchanged (all zeros)
        instead of producing NaN/inf.
    """
    feature_cols = ['A', 'B', 'C', 'D']
    features = pdf.loc[:, feature_cols].astype(float)
    # One norm per row (axis=1): dot products of unit-length rows are cosines.
    row_norms = np.sqrt((features ** 2).sum(axis=1))
    # Avoid dividing by zero for all-zero rows; 0 / 1 keeps them at zero.
    row_norms = row_norms.replace(0.0, 1.0)
    normed = features.div(row_norms, axis=0)
    normed['label'] = pdf['label']
    normed['id'] = pdf['id']
    # Return columns in schema order so the result is correct whether the
    # columns are matched by name or by position.
    return normed[['label', 'A', 'B', 'C', 'D', 'id']]
dfclust.groupby("label").apply(normalize)

DataFrame[label: bigint, A: double, B: double, C: double, D: double, id: bigint]

In [10]:
#pandas udf that takes the normalized data and finds the dot product (cosine) matrix
#could be used with a different scaling preprocess to create a correlation or covariance matrix
@pandas_udf(cos_schema, PandasUDFType.GROUPED_MAP)
def dot_prods(pdf):
    """Return the matrix of pairwise dot products between the rows of one group.

    pdf is a pandas.DataFrame holding one group; only columns A, B, C, D are
    used. The result is a square frame (one row and one column per input
    row) with default integer column labels.
    NOTE(review): cos_schema declares 50 columns, so this presumably requires
    every group to contain exactly 50 rows -- confirm before using other data.
    """
    features = pdf[['A', 'B', 'C', 'D']].to_numpy()
    gram = np.matmul(features, features.T)
    #return_df['label'] = pdf['label']
    return pd.DataFrame(gram)

# Normalize within each label, then build each label's dot-product matrix.
normed = dfclust.groupby("label").apply(normalize)
dots = normed.groupby("label").apply(dot_prods)

In [11]:
# Count the rows of the per-label dot-product matrices.
dots.count()

150

In [None]:
# similarity_df = sc.createDataFrame(classs)
# rdd_df=similarity_df.rdd.map(list)
# df_matrix = RowMatrix(rdd_df)

In [5]:
# Build a distributed matrix with one indexed row (id, [A, B, C, D]) per observation.
indexed_rows = dfclust.rdd.map(lambda rec: (rec.id, [rec.A, rec.B, rec.C, rec.D]))
irm = IndexedRowMatrix(indexed_rows)
# columnSimilarities() compares columns, so transpose first to put the
# observations on the column axis.
irmt = irm.toCoordinateMatrix().transpose()
cosines = irmt.toRowMatrix().columnSimilarities()

In [9]:
# Convert the similarity matrix to a RowMatrix and keep its `rows` attribute.
rows = cosines.toRowMatrix().rows

In [None]:
def _mean_pairwise_cosine(a, b, c, d):
    """Average cosine similarity over all unordered pairs of observations.

    a, b, c, d are equal-length sequences holding the four feature values of
    each observation (position i in every sequence describes observation i).
    Returns a Python float, or None when fewer than two observations are given.
    """
    features = np.column_stack([a, b, c, d]).astype(float)
    n_obs = features.shape[0]
    if n_obs < 2:
        # No pairs to average over.
        return None
    norms = np.linalg.norm(features, axis=1, keepdims=True)
    # Keep all-zero observations at zero rather than dividing by zero.
    norms[norms == 0] = 1.0
    unit_rows = features / norms
    similarities = unit_rows @ unit_rows.T
    # Strict upper triangle = each unordered pair once, diagonal excluded,
    # i.e. n_obs * (n_obs - 1) / 2 entries (1225 when n_obs == 50).
    pair_sims = similarities[np.triu_indices(n_obs, k=1)]
    # Convert to a plain float; Spark does not accept numpy scalars for DoubleType.
    return float(pair_sims.mean())


@udf(returnType=DoubleType())
def cosine_sim_udf(a,b,c,d):
    """Spark UDF: mean pairwise cosine similarity of one group's observations.

    Each argument is an array column (e.g. produced by collect_list) with the
    values of one feature for every observation in the group. Inside a UDF
    these arrive as plain Python lists, so the computation is done with NumPy;
    Spark DataFrame/RDD APIs cannot be used here.
    NOTE(review): this assumes the four lists are aligned element-by-element;
    confirm that holds for the way they are collected upstream.
    """
    return _mean_pairwise_cosine(a, b, c, d)


In [None]:
# Collect each feature's values per label into an array column of the same name.
feature_lists = [collect_list(dfclust[col]).alias(col) for col in ('A', 'B', 'C', 'D')]
grouped = dfclust.groupBy("label").agg(*feature_lists)


In [None]:
# Apply the similarity UDF to each label's collected feature arrays and display the result.
feature_cols = [grouped[name] for name in ('A', 'B', 'C', 'D')]
grouped.select(grouped['label'], cosine_sim_udf(*feature_cols)).show()