In [1]:
from pyspark.sql.session import SparkSession
from pyspark.mllib.regression import LabeledPoint
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans, KMeansModel
from pyspark.mllib.clustering import StreamingKMeans
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.ml.evaluation import ClusteringEvaluator
from numpy import array
from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint
import scipy.sparse
from pyspark.ml.linalg import Vectors, _convert_to_vector, VectorUDT
from pyspark.context import SparkContext
from pyspark.ml.linalg import SparseVector


In [20]:
spark = SparkSession.builder.appName('parquet').getOrCreate()

In [21]:
df = spark.read.format("text").option("header", "true").option("inferSchema", "true").load("Qualitative_Bankruptcy.txt")
df.show()

+--------------+
|         value|
+--------------+
|P,P,A,A,A,P,NB|
|N,N,A,A,A,N,NB|
|A,A,A,A,A,A,NB|
|P,P,P,P,P,P,NB|
|N,N,P,P,P,N,NB|
|A,A,P,P,P,A,NB|
|P,P,A,P,P,P,NB|
|P,P,P,A,A,P,NB|
|P,P,A,P,A,P,NB|
|P,P,A,A,P,P,NB|
|P,P,P,P,A,P,NB|
|P,P,P,A,P,P,NB|
|N,N,A,P,P,N,NB|
|N,N,P,A,A,N,NB|
|N,N,A,P,A,N,NB|
|N,N,A,P,A,N,NB|
|N,N,A,A,P,N,NB|
|N,N,P,P,A,N,NB|
|N,N,P,A,P,N,NB|
|A,A,A,P,P,A,NB|
+--------------+
only showing top 20 rows



In [22]:
def dense_to_sparse(vector):
    return _convert_to_vector(scipy.sparse.csc_matrix(array(vector)).T)

def getDoubleValue(d):
    if d == 'A':
        return 2.0
    elif d == 'P':
        return 3.0
    elif d == 'N':
        return 1.0
    elif d == 'B':
        return 0.0
    elif d == 'NB':
        return 1.0
    
    
def doTheThing(parts):
    label_point = getDoubleValue(parts[6])
    parts[0:6] = map(getDoubleValue ,parts[0:6])

    parts = parts[0:6]
    parts.insert(0, label_point)
    
    parts = Row(label_point = parts[0], features = dense_to_sparse(parts[1:6]))
    return parts

df2 = df.rdd.map(lambda d: doTheThing(d[0].split(',')))

df2 = df2.toDF()

df2.take(3)
    

[Row(label_point=1.0, features=SparseVector(5, {0: 3.0, 1: 3.0, 2: 2.0, 3: 2.0, 4: 2.0})),
 Row(label_point=1.0, features=SparseVector(5, {0: 1.0, 1: 1.0, 2: 2.0, 3: 2.0, 4: 2.0})),
 Row(label_point=1.0, features=SparseVector(5, {0: 2.0, 1: 2.0, 2: 2.0, 3: 2.0, 4: 2.0}))]

In [23]:
km = KMeans().setK(2).setSeed(10)
model = km.fit(df2)

In [24]:
cts = model.clusterCenters()
cts

[array([1.75454545, 1.4       , 1.06363636, 1.21818182, 1.08181818]),
 array([2.12857143, 2.06428571, 2.29285714, 2.50714286, 2.63571429])]

In [25]:
trans = model.transform(df2)
trans.show()

+-----------+--------------------+----------+
|label_point|            features|prediction|
+-----------+--------------------+----------+
|        1.0|(5,[0,1,2,3,4],[3...|         1|
|        1.0|(5,[0,1,2,3,4],[1...|         0|
|        1.0|(5,[0,1,2,3,4],[2...|         1|
|        1.0|(5,[0,1,2,3,4],[3...|         1|
|        1.0|(5,[0,1,2,3,4],[1...|         1|
|        1.0|(5,[0,1,2,3,4],[2...|         1|
|        1.0|(5,[0,1,2,3,4],[3...|         1|
|        1.0|(5,[0,1,2,3,4],[3...|         1|
|        1.0|(5,[0,1,2,3,4],[3...|         1|
|        1.0|(5,[0,1,2,3,4],[3...|         1|
|        1.0|(5,[0,1,2,3,4],[3...|         1|
|        1.0|(5,[0,1,2,3,4],[3...|         1|
|        1.0|(5,[0,1,2,3,4],[1...|         1|
|        1.0|(5,[0,1,2,3,4],[1...|         1|
|        1.0|(5,[0,1,2,3,4],[1...|         1|
|        1.0|(5,[0,1,2,3,4],[1...|         1|
|        1.0|(5,[0,1,2,3,4],[1...|         1|
|        1.0|(5,[0,1,2,3,4],[1...|         1|
|        1.0|(5,[0,1,2,3,4],[1...|

In [30]:
def correctPredict(label_point, prediction):
    if label_point == prediction:
        return 1
    else:
        return 0
    
correctPredict = udf(correctPredict, IntegerType())

trans = trans.withColumn('correctPredict', correctPredict('label_point', 'prediction'))
#Accuracy  of the implementation
trans.agg((sum('correctPredict')/count('label_point')).alias("accuracy")).show()

+--------+
|accuracy|
+--------+
|    0.98|
+--------+



In [28]:
# results
cluster_centers = model.clusterCenters()
print("Cluster Centers: ")
for center in cluster_centers:
    print(center)

Cluster Centers: 
[1.75454545 1.4        1.06363636 1.21818182 1.08181818]
[2.12857143 2.06428571 2.29285714 2.50714286 2.63571429]
