In [1]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

In [2]:
# Load bank.csv with a header row and inferred column types.
# `spark` is predefined on Databricks, but nothing in this notebook creates it
# (SparkSession is imported yet unused). getOrCreate() returns the active
# session when one already exists, so this is harmless on Databricks and lets
# a fresh kernel elsewhere run the notebook.
spark = SparkSession.builder.getOrCreate()
# NOTE(review): looks like a Databricks DBFS path — adjust when running elsewhere.
DATA_PATH = "/FileStore/tables/bank.csv"
df = spark.read.csv(DATA_PATH, header=True, inferSchema=True)
df.printSchema()

In [3]:
# Quick exploratory look at the raw data.
# Only a cell's last expression is auto-displayed, so bare expressions in the
# middle of the cell (df.head(5), .count()) would be computed and silently
# discarded — print them explicitly so their results are actually shown.
print(df.head(5))
df.select('age', 'balance', 'deposit').show(5)
df.groupBy("age").count().sort("count", ascending=True).show()
df.describe().show()
df.describe('balance').show()
print(df.filter(df.age > 40).count())
df.groupBy("marital").agg({'balance': 'mean'}).show()

In [4]:
# Number of rows whose age is strictly greater than 40.
rows_over_40 = df.filter(df['age'] > 40).count()
print(rows_over_40)

In [5]:
# Summary statistics of the per-age row counts. Only `age` feeds the groupBy,
# so the other selected columns have no effect on the result.
age_counts = df.groupBy("age").count().sort("count", ascending=True)
age_counts.describe().show()

In [6]:
# First five rows, transposed so each column becomes a row (easier to read wide records).
first_rows = df.take(5)
pd.DataFrame(first_rows, columns=df.columns).T

Unnamed: 0,0,1,2,3,4
age,59,56,41,55,54
job,admin.,admin.,technician,services,admin.
marital,married,married,married,married,married
education,secondary,secondary,secondary,secondary,tertiary
default,no,no,no,no,no
balance,2343,45,1270,2476,184
housing,yes,no,yes,yes,no
loan,no,no,no,no,no
contact,unknown,unknown,unknown,unknown,unknown
day,5,5,5,5,5


In [7]:
# describe() statistics for every column whose Spark dtype string is 'int',
# transposed so there is one row per column.
numeric_features = [name for name, dtype in df.dtypes if dtype == 'int']
int_summary_pdf = df.select(numeric_features).describe().toPandas()
int_summary_pdf.T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
age,11162,41.231947679627304,11.913369192215518,18,95
balance,11162,1528.5385235620856,3225.413325946149,-6847,81204
day,11162,15.658036194230425,8.420739541006462,1,31
duration,11162,371.99381831213043,347.12838571630687,2,3881
campaign,11162,2.508421429851281,2.7220771816614824,1,63
pdays,11162,51.33040673714388,108.75828197197717,-1,854
previous,11162,0.8325568894463358,2.292007218670508,0,58


In [8]:
# Feature-engineering stages, in order:
#   * for each categorical column: StringIndexer -> <col>Index, then
#     OneHotEncoderEstimator -> <col>classVec
#   * StringIndexer on 'deposit' -> 'label'
#   * VectorAssembler of all <col>classVec vectors plus numericCols -> 'features'
categoricalColumns = [
  'job', 'marital', 'education', 'default',
  'housing', 'loan', 'contact', 'poutcome',
]
# NOTE(review): the int column 'day' is not included here — confirm intentional.
numericCols = ['age', 'balance', 'duration', 'campaign', 'pdays', 'previous']

stages = []
for categoricalCol in categoricalColumns:
  stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index')
  encoder = OneHotEncoderEstimator(
    inputCols=[stringIndexer.getOutputCol()],
    outputCols=[categoricalCol + "classVec"]
  )
  stages.extend([stringIndexer, encoder])

label_stringIdx = StringIndexer(inputCol='deposit', outputCol='label')
stages.append(label_stringIdx)

assemblerInputs = [name + "classVec" for name in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
stages.append(assembler)

In [9]:
# Show the list of configured pipeline stages.
print(str(stages))

In [10]:
# Fit the feature pipeline and append its output columns (the indexed /
# one-hot columns, 'label' and 'features') to df.
# df is overwritten in place, so running this cell a second time would feed
# the already-transformed frame back into the StringIndexer stages, which
# reject an input whose output column (e.g. jobIndex) already exists.
# Guard on the assembler's 'features' column so the cell is safe to re-run.
pipeline = Pipeline(stages=stages)
if 'features' not in df.columns:
  pipelineModel = pipeline.fit(df)
  df = pipelineModel.transform(df)
df.printSchema()

In [11]:
# First five rows after the pipeline, transposed: one row per (original or derived) column.
sample_rows = df.take(5)
pd.DataFrame(sample_rows, columns=df.columns).T

Unnamed: 0,0,1,2,3,4
age,59,56,41,55,54
job,admin.,admin.,technician,services,admin.
marital,married,married,married,married,married
education,secondary,secondary,secondary,secondary,tertiary
default,no,no,no,no,no
balance,2343,45,1270,2476,184
housing,yes,no,yes,yes,no
loan,no,no,no,no,no
contact,unknown,unknown,unknown,unknown,unknown
day,5,5,5,5,5


In [12]:
from pyspark.ml.clustering import KMeans
import numpy as np

# 80/20 train/test split with a fixed seed.
train, test = df.randomSplit([0.8, 0.2], seed=99999)

# Elbow sweep: cost[k] is the KMeansModel.computeCost value on the training
# split for k = 2..9. Slots 0 and 1 are never written and stay 0.0.
cost = np.zeros(10)
for k in range(2, 10):
  kmeans = KMeans(k=k, seed=1)
  model = kmeans.fit(train)
  cost[k] = model.computeCost(train)

In [13]:
# Raw cost array from the elbow sweep (indices 0 and 1 unused).
print(str(cost))

In [14]:
import matplotlib.mlab as mlab
import matplotlib.pyplot as plt

import seaborn as sns
from matplotlib.ticker import MaxNLocator


# Elbow plot of K-means cost against k.
# cost[k] was filled for k = 2..9, so cost[2:10] must be plotted against
# x = 2..9. The previous x = range(1, 9) shifted every point one k to the left,
# which mislabels where the elbow is.
fig, ax = plt.subplots(1, 1, figsize=(8, 6))
k_values = range(2, 10)
ax.plot(k_values, cost[2:10], marker='o')
ax.set_title('K-means cost vs. number of clusters')
ax.set_xlabel('k')
ax.set_ylabel('cost')
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
display(fig)

In [15]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# K-means with 8 clusters, trained on the training split.
kmeans = KMeans(k=8, seed=999)
model = kmeans.fit(train)

In [16]:
# Show the fitted model's string representation.
print(str(model))


In [17]:
# Silhouette (ClusteringEvaluator defaults) of `model` on the training split.
predictions = model.transform(train)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Training Dataset Performance =" + str(silhouette))

In [18]:
# Print each learned centroid (clusterCenters() returns one array per cluster).
print("Cluster Centers: ")
centers = model.clusterCenters()
for center in centers:
  print(center)

In [19]:

# Model to be scored on the held-out test split in the next cell.
# It is fitted on `train` with the same k and seed as the training run.
# Fitting a fresh model on `test` (as before) would score a *different*
# clustering, not how the trained one generalizes to unseen rows.
kmeans = KMeans().setK(8).setSeed(999)
model = kmeans.fit(train)

In [20]:
# Show the fitted model's string representation.
print(str(model))

In [21]:
# Silhouette (ClusteringEvaluator defaults) of the previous cell's `model` on the test split.
predictions = model.transform(test)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("TESTING Dataset Performance =" + str(silhouette))

In [22]:
#FOR 3 CLUSTERS

from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# K-means with 3 clusters, trained on the training split.
kmeans = KMeans(k=3, seed=999)
model = kmeans.fit(train)

In [23]:
# Show the fitted model's string representation.
print(str(model))

In [24]:
# Silhouette (ClusteringEvaluator defaults) of `model` on the training split.
predictions = model.transform(train)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Training Dataset Performance =" + str(silhouette))

In [25]:
# Model to be scored on the held-out test split in the next cells.
# It is fitted on `train` with the same k and seed as the training run.
# Refitting on `test` would score a different clustering instead of measuring
# how the trained one generalizes.
kmeans = KMeans().setK(3).setSeed(999)
model = kmeans.fit(train)

In [26]:
# Show the fitted model's string representation.
print(str(model))

In [27]:
# Silhouette (ClusteringEvaluator defaults) of the current `model` on the test split.
predictions = model.transform(test)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("TESTING Dataset Performance =" + str(silhouette))

In [28]:
#for 4 clusters

# K-means with 4 clusters, trained on the training split.
kmeans = KMeans(k=4, seed=999)
model = kmeans.fit(train)
print(str(model))


In [29]:
# Silhouette (ClusteringEvaluator defaults) of `model` on the training split.
predictions = model.transform(train)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Training Dataset Performance =" + str(silhouette))


In [30]:
# Model to be scored on the held-out test split in the next cell.
# It is fitted on `train` with the same k and seed as the training run.
# Refitting on `test` would score a different clustering instead of measuring
# how the trained one generalizes.
kmeans = KMeans().setK(4).setSeed(999)
model = kmeans.fit(train)
print(model)


In [31]:
# Silhouette (ClusteringEvaluator defaults) of the current `model` on the test split.
predictions = model.transform(test)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("TESTING Dataset Performance =" + str(silhouette))


In [32]:
#for 5 clusters

# K-means with 5 clusters, trained on the training split.
kmeans = KMeans(k=5, seed=999)
model = kmeans.fit(train)
print(str(model))


In [33]:
# Silhouette (ClusteringEvaluator defaults) of `model` on the training split.
predictions = model.transform(train)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Training Dataset Performance =" + str(silhouette))


In [34]:
# Model to be scored on the held-out test split in the next cell.
# It is fitted on `train` with the same k and seed as the training run.
# Refitting on `test` would score a different clustering instead of measuring
# how the trained one generalizes.
kmeans = KMeans().setK(5).setSeed(999)
model = kmeans.fit(train)
print(model)


In [35]:
# Silhouette (ClusteringEvaluator defaults) of the current `model` on the test split.
predictions = model.transform(test)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("TESTING Dataset Performance =" + str(silhouette))


In [36]:
# for 6 clusters
# K-means with 6 clusters, trained on the training split.
kmeans = KMeans(k=6, seed=999)
model = kmeans.fit(train)
print(str(model))

In [37]:
# Silhouette (ClusteringEvaluator defaults) of `model` on the training split.
predictions = model.transform(train)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Training Dataset Performance =" + str(silhouette))

In [38]:
# Model to be scored on the held-out test split in the next cell.
# It is fitted on `train` with the same k and seed as the training run.
# Refitting on `test` would score a different clustering instead of measuring
# how the trained one generalizes.
kmeans = KMeans().setK(6).setSeed(999)
model = kmeans.fit(train)
print(model)

In [39]:
# Silhouette (ClusteringEvaluator defaults) of the current `model` on the test split.
predictions = model.transform(test)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("TESTING Dataset Performance =" + str(silhouette))

In [40]:
#for 7 clusters

# K-means with 7 clusters, trained on the training split.
kmeans = KMeans(k=7, seed=999)
model = kmeans.fit(train)
print(str(model))

In [41]:
# Silhouette (ClusteringEvaluator defaults) of `model` on the training split.
predictions = model.transform(train)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Training Dataset Performance =" + str(silhouette))

In [42]:
# Model to be scored on the held-out test split in the next cell.
# It is fitted on `train` with the same k and seed as the training run.
# Refitting on `test` would score a different clustering instead of measuring
# how the trained one generalizes.
kmeans = KMeans().setK(7).setSeed(999)
model = kmeans.fit(train)
print(model)

In [43]:
# Silhouette (ClusteringEvaluator defaults) of the current `model` on the test split.
predictions = model.transform(test)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("TESTING Dataset Performance =" + str(silhouette))

In [44]:
#for 2 clusters


# K-means with 2 clusters, trained on the training split.
kmeans = KMeans(k=2, seed=999)
model = kmeans.fit(train)
print(str(model))

In [45]:
# Silhouette (ClusteringEvaluator defaults) of `model` on the training split.
predictions = model.transform(train)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Training Dataset Performance =" + str(silhouette))

In [46]:
# Model to be scored on the held-out test split in the next cell.
# It is fitted on `train` with the same k and seed as the training run.
# Refitting on `test` would score a different clustering instead of measuring
# how the trained one generalizes.
kmeans = KMeans().setK(2).setSeed(999)
model = kmeans.fit(train)
print(model)

In [47]:
# Silhouette (ClusteringEvaluator defaults) of the current `model` on the test split.
predictions = model.transform(test)
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("TESTING Dataset Performance =" + str(silhouette))