In [0]:
import pandas as pd
import numpy as np
from sklearn.datasets import load_iris

# Loads data.
iris = load_iris()
print(iris.feature_names)

# Make dataframe
pdf = pd.DataFrame(data=iris.data, columns=iris.feature_names)
#pdf['target'] = iris.target
#pdf['target'] = pdf['target'].map({0:"setosa", 1:"versicolor", 2:"virginica"})

pdf['target'] = iris.target

['sepal length (cm)', 'sepal width (cm)', 'petal length (cm)', 'petal width (cm)']


In [0]:
from pyspark.sql import SparkSession

# Make spark dataframe
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(pdf)

In [0]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorIndexer

def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1])]).toDF(['features'])
data = transData(sdf)

# 1. 각 군집의 Centroid 출력

In [0]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=3) # k의 값 3으로 설정
kmeans.setSeed(1) # 초기값 random을 위한 설정
kmeans.setFeaturesCol("features")
model = kmeans.fit(data)

print("각 군집의 Centroid : ")
print(model.clusterCenters())

df_pred = model.transform(data)

각 군집의 Centroid : 
[array([5.006, 3.428, 1.462, 0.246]), array([6.85384615, 3.07692308, 5.71538462, 2.05384615]), array([5.88360656, 2.74098361, 4.38852459, 1.43442623])]


In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

l_clusters = model.clusterCenters()
# Let's convert the list of centers to a dict, each center is a list of float
d_clusters = {int(i):[float(l_clusters[i][j]) for j in range(len(l_clusters[i]))] 
              for i in range(len(l_clusters))}

# Let's create a dataframe containing the centers and their coordinates
df_centers = spark.sparkContext.parallelize([(k,)+(v,) for k,v in 
d_clusters.items()]).toDF(['prediction','center'])

df_pred = df_pred.withColumn('prediction',F.col('prediction').cast(IntegerType()))
df_pred = df_pred.join(df_centers,on='prediction',how='left')

In [0]:
from pyspark.sql.types import FloatType

get_dist = F.udf(lambda features, center : 
                 float(features.squared_distance(center)),FloatType())
df_pred = df_pred.withColumn('dist',get_dist(F.col('features'),F.col('center')))
display(df_pred)

prediction,features,center,dist
0,"Map(vectorType -> dense, length -> 4, values -> List(5.1, 3.5, 1.4, 0.2))","List(5.006, 3.4280000000000004, 1.462, 0.24600000000000005)",0.01998
0,"Map(vectorType -> dense, length -> 4, values -> List(4.9, 3.0, 1.4, 0.2))","List(5.006, 3.4280000000000004, 1.462, 0.24600000000000005)",0.20038
0,"Map(vectorType -> dense, length -> 4, values -> List(4.7, 3.2, 1.3, 0.2))","List(5.006, 3.4280000000000004, 1.462, 0.24600000000000005)",0.17398
0,"Map(vectorType -> dense, length -> 4, values -> List(4.6, 3.1, 1.5, 0.2))","List(5.006, 3.4280000000000004, 1.462, 0.24600000000000005)",0.27598
0,"Map(vectorType -> dense, length -> 4, values -> List(5.0, 3.6, 1.4, 0.2))","List(5.006, 3.4280000000000004, 1.462, 0.24600000000000005)",0.03558
0,"Map(vectorType -> dense, length -> 4, values -> List(5.4, 3.9, 1.7, 0.4))","List(5.006, 3.4280000000000004, 1.462, 0.24600000000000005)",0.45838
0,"Map(vectorType -> dense, length -> 4, values -> List(4.6, 3.4, 1.4, 0.3))","List(5.006, 3.4280000000000004, 1.462, 0.24600000000000005)",0.17238
0,"Map(vectorType -> dense, length -> 4, values -> List(5.0, 3.4, 1.5, 0.2))","List(5.006, 3.4280000000000004, 1.462, 0.24600000000000005)",0.00438
0,"Map(vectorType -> dense, length -> 4, values -> List(4.4, 2.9, 1.4, 0.2))","List(5.006, 3.4280000000000004, 1.462, 0.24600000000000005)",0.65198
0,"Map(vectorType -> dense, length -> 4, values -> List(4.9, 3.1, 1.5, 0.1))","List(5.006, 3.4280000000000004, 1.462, 0.24600000000000005)",0.14158


# 2. 각 군집의 구성요소 중 centroid와 가장 큰 거리를 갖는 구성요소 데이터

In [0]:
max_dist=df_pred.groupBy("prediction").max("dist")

In [0]:
l=list(max_dist.select('max(dist)').toPandas()['max(dist)'])
df_pred.filter(df_pred['dist'].isin(l)).show()

+----------+-----------------+--------------------+---------+
|prediction|         features|              center|     dist|
+----------+-----------------+--------------------+---------+
|         0|[4.5,2.3,1.3,0.3]|[5.006, 3.4280000...|  1.55758|
|         1|[7.7,2.6,6.9,2.3]|[6.85384615384615...|2.4073372|
|         2|[5.1,2.5,3.0,1.1]|[5.88360655737704...|2.7119539|
+----------+-----------------+--------------------+---------+



In [0]:
pdf = df_pred.select("*").toPandas()
pdf.loc[pdf.groupby(['prediction'])['dist'].idxmax()]

  Unable to convert the field features. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: Unsupported type in conversion to Arrow: VectorUDT
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.


Unnamed: 0,prediction,features,center,dist
41,0,"[4.5, 2.3, 1.3, 0.3]","[5.006, 3.4280000000000004, 1.462, 0.246000000...",1.55758
67,1,"[7.7, 2.6, 6.9, 2.3]","[6.8538461538461535, 3.0769230769230766, 5.715...",2.407337
134,2,"[5.1, 2.5, 3.0, 1.1]","[5.883606557377049, 2.7409836065573776, 4.3885...",2.711954
