In [1]:
from pyspark.sql import SparkSession

In [2]:
import pyspark

In [3]:
spark=SparkSession.builder.appName('Big Data').getOrCreate()

In [4]:
spark

In [5]:
df=spark.read.csv('credit_card.csv',header=True,inferSchema=True)

In [6]:
df.show(5)

+-------+-----------+---------+----------------------+------------------+------------+-----------------+------------------+------+
|CUST_ID|    BALANCE|PURCHASES|INSTALLMENTS_PURCHASES|      CASH_ADVANCE|CREDIT_LIMIT|         PAYMENTS|  MINIMUM_PAYMENTS|TENURE|
+-------+-----------+---------+----------------------+------------------+------------+-----------------+------------------+------+
| C10001|  40.900749|     95.4|                  95.4|               0.0|      1000.0|       201.802084|139.50978700000002|    12|
| C10002|3202.467416|      0.0|                   0.0|6442.9454829999995|      7000.0|4103.032596999999|       1072.340217|    12|
| C10003|2495.148862|   773.17|                   0.0|               0.0|      7500.0|       622.066742|        627.284787|    12|
| C10004|1666.670542|   1499.0|                   0.0|        205.788017|      7500.0|              0.0|        312.343947|    12|
| C10005| 817.714335|     16.0|                   0.0|               0.0|      1200

In [7]:
df.printSchema()

root
 |-- CUST_ID: string (nullable = true)
 |-- BALANCE: double (nullable = true)
 |-- PURCHASES: double (nullable = true)
 |-- INSTALLMENTS_PURCHASES: double (nullable = true)
 |-- CASH_ADVANCE: double (nullable = true)
 |-- CREDIT_LIMIT: double (nullable = true)
 |-- PAYMENTS: double (nullable = true)
 |-- MINIMUM_PAYMENTS: double (nullable = true)
 |-- TENURE: integer (nullable = true)



In [8]:
type(df)

pyspark.sql.dataframe.DataFrame

In [9]:
df.toPandas()['TENURE'].unique()

array([12,  8, 11,  9, 10,  7,  6])

In [10]:
df.describe().show()

+-------+-------+------------------+------------------+----------------------+------------------+-----------------+------------------+------------------+-----------------+
|summary|CUST_ID|           BALANCE|         PURCHASES|INSTALLMENTS_PURCHASES|      CASH_ADVANCE|     CREDIT_LIMIT|          PAYMENTS|  MINIMUM_PAYMENTS|           TENURE|
+-------+-------+------------------+------------------+----------------------+------------------+-----------------+------------------+------------------+-----------------+
|  count|   8950|              8950|              8950|                  8950|              8950|             8950|              8950|              8950|             8950|
|   mean|   null|1564.4748276781038|1003.2048335195564|    411.06764469273713| 978.8711124654749|4494.282472772402|1733.1438520248041| 844.9067666256948|11.51731843575419|
| stddev|   null| 2081.531879456551|2136.6347818728905|     904.3381151753807|2097.1638766432347|3638.646701631524| 2895.063756904571|2332.7

In [11]:
### Get count of both null and missing values in pyspark
from pyspark.sql.functions import isnan, when, count, col
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+-------+-------+---------+----------------------+------------+------------+--------+----------------+------+
|CUST_ID|BALANCE|PURCHASES|INSTALLMENTS_PURCHASES|CASH_ADVANCE|CREDIT_LIMIT|PAYMENTS|MINIMUM_PAYMENTS|TENURE|
+-------+-------+---------+----------------------+------------+------------+--------+----------------+------+
|      0|      0|        0|                     0|           0|           0|       0|               0|     0|
+-------+-------+---------+----------------------+------------+------------+--------+----------------+------+



In [12]:
df= df.drop('CUST_ID')

In [13]:
df.columns

['BALANCE',
 'PURCHASES',
 'INSTALLMENTS_PURCHASES',
 'CASH_ADVANCE',
 'CREDIT_LIMIT',
 'PAYMENTS',
 'MINIMUM_PAYMENTS',
 'TENURE']

In [14]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

In [15]:
vec_assembler = VectorAssembler(inputCols = df.columns, outputCol='features')

In [16]:
final_data = vec_assembler.transform(df)
final_data.head(2)

[Row(BALANCE=40.900749, PURCHASES=95.4, INSTALLMENTS_PURCHASES=95.4, CASH_ADVANCE=0.0, CREDIT_LIMIT=1000.0, PAYMENTS=201.802084, MINIMUM_PAYMENTS=139.50978700000002, TENURE=12, features=DenseVector([40.9007, 95.4, 95.4, 0.0, 1000.0, 201.8021, 139.5098, 12.0])),
 Row(BALANCE=3202.467416, PURCHASES=0.0, INSTALLMENTS_PURCHASES=0.0, CASH_ADVANCE=6442.9454829999995, CREDIT_LIMIT=7000.0, PAYMENTS=4103.032596999999, MINIMUM_PAYMENTS=1072.340217, TENURE=12, features=DenseVector([3202.4674, 0.0, 0.0, 6442.9455, 7000.0, 4103.0326, 1072.3402, 12.0]))]

In [17]:
from pyspark.ml.feature import MinMaxScaler

In [18]:
scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

In [19]:
scalerModel = scaler.fit(final_data)
scalerModel

MinMaxScalerModel: uid=MinMaxScaler_a773e2b21994, numFeatures=8, min=0.0, max=1.0

In [20]:
scaledData = scalerModel.transform(final_data)
scaledData.head(2)

[Row(BALANCE=40.900749, PURCHASES=95.4, INSTALLMENTS_PURCHASES=95.4, CASH_ADVANCE=0.0, CREDIT_LIMIT=1000.0, PAYMENTS=201.802084, MINIMUM_PAYMENTS=139.50978700000002, TENURE=12, features=DenseVector([40.9007, 95.4, 95.4, 0.0, 1000.0, 201.8021, 139.5098, 12.0]), scaledFeatures=DenseVector([0.0021, 0.0019, 0.0042, 0.0, 0.0317, 0.004, 0.0018, 1.0])),
 Row(BALANCE=3202.467416, PURCHASES=0.0, INSTALLMENTS_PURCHASES=0.0, CASH_ADVANCE=6442.9454829999995, CREDIT_LIMIT=7000.0, PAYMENTS=4103.032596999999, MINIMUM_PAYMENTS=1072.340217, TENURE=12, features=DenseVector([3202.4674, 0.0, 0.0, 6442.9455, 7000.0, 4103.0326, 1072.3402, 12.0]), scaledFeatures=DenseVector([0.1682, 0.0, 0.0, 0.1367, 0.2321, 0.0809, 0.014, 1.0]))]

In [21]:
print("Features scaled to range: [%f, %f]" % (scaler.getMin(), scaler.getMax()))
scaledData.select("features", "scaledFeatures").show(5)

Features scaled to range: [0.000000, 1.000000]
+--------------------+--------------------+
|            features|      scaledFeatures|
+--------------------+--------------------+
|[40.900749,95.4,9...|[0.00214779453875...|
|[3202.467416,0.0,...|[0.16816909701674...|
|[2495.148862,773....|[0.13102613595644...|
|[1666.670542,1499...|[0.08752079058547...|
|[817.714335,16.0,...|[0.04294010319903...|
+--------------------+--------------------+
only showing top 5 rows



In [22]:
from pyspark.ml.feature import MinMaxScaler

In [23]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

In [24]:
# Training a k-means model.
kmeans = KMeans().setK(5).setSeed(1)
model = kmeans.fit(scaledData)

In [25]:
predictions = model.transform(scaledData)

In [26]:
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.5375354557729843


In [27]:
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[ 806.62838121  525.29393723  262.29082917  493.25881477 2270.41498256
  920.12987355  506.15622914   11.38445078]
[1867.61638811 1508.32737725  560.51289022  905.6168306  7361.99056433
 2021.17857714  696.09494464   11.75568862]
[4.05814769e+03 1.02737875e+03 9.08989286e+02 9.22757849e+02
 4.26785714e+03 1.62493914e+03 2.27600316e+04 1.19107143e+01]
[ 5902.45319597  1801.67384932   650.87416438  4665.85826833
 11054.59526775  5395.0440797   2110.7271893     11.65890411]
[5.17748216e+03 1.72886208e+04 5.45102196e+03 5.26235073e+03
 1.45235294e+04 2.46885890e+04 3.22119562e+03 1.19607843e+01]


In [28]:
preds=model.transform(scaledData).select('prediction')
preds.show()

+----------+
|prediction|
+----------+
|         0|
|         3|
|         1|
|         1|
|         0|
|         0|
|         3|
|         0|
|         1|
|         1|
|         0|
|         0|
|         0|
|         1|
|         0|
|         3|
|         0|
|         0|
|         1|
|         2|
+----------+
only showing top 20 rows



In [37]:
import pandas as pd

In [38]:
prediction = model.transform(scaledData).select('prediction').collect()
labels = [p.prediction for p in prediction ]
labels

[0,
 3,
 1,
 1,
 0,
 0,
 3,
 0,
 1,
 1,
 0,
 0,
 0,
 1,
 0,
 3,
 0,
 0,
 1,
 2,
 2,
 3,
 0,
 3,
 1,
 0,
 0,
 0,
 3,
 1,
 3,
 1,
 3,
 0,
 2,
 0,
 3,
 1,
 3,
 3,
 0,
 0,
 0,
 0,
 1,
 0,
 0,
 0,
 0,
 0,
 3,
 0,
 0,
 0,
 0,
 1,
 0,
 1,
 0,
 1,
 1,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 1,
 0,
 1,
 4,
 1,
 0,
 1,
 1,
 1,
 0,
 0,
 3,
 0,
 1,
 1,
 0,
 1,
 1,
 3,
 3,
 0,
 1,
 3,
 0,
 0,
 0,
 0,
 0,
 1,
 0,
 1,
 0,
 0,
 0,
 1,
 0,
 0,
 0,
 0,
 3,
 0,
 3,
 1,
 0,
 1,
 1,
 1,
 0,
 1,
 0,
 1,
 0,
 3,
 1,
 1,
 0,
 3,
 1,
 0,
 1,
 3,
 1,
 0,
 1,
 0,
 3,
 0,
 0,
 3,
 0,
 4,
 0,
 0,
 1,
 1,
 3,
 1,
 1,
 1,
 1,
 0,
 1,
 2,
 1,
 1,
 3,
 0,
 3,
 1,
 0,
 3,
 0,
 0,
 1,
 0,
 0,
 0,
 1,
 1,
 3,
 0,
 0,
 0,
 3,
 1,
 0,
 3,
 0,
 0,
 0,
 3,
 0,
 0,
 0,
 3,
 1,
 1,
 0,
 1,
 0,
 0,
 0,
 1,
 1,
 0,
 3,
 0,
 1,
 1,
 1,
 1,
 1,
 3,
 0,
 0,
 3,
 3,
 0,
 0,
 3,
 3,
 0,
 3,
 1,
 1,
 1,
 0,
 1,
 0,
 1,
 0,
 1,
 3,
 1,
 0,
 3,
 3,
 0,
 1,
 1,
 0,
 3,
 0,
 0,
 1,
 0,
 0,
 1,
 3,
 1,
 0,
 0,
 1,
 0,
 1,
 1,
 0,
 0,
 1,
 0,
 3,
 3,


In [52]:
pred = df

In [53]:
pred_df = pred.toPandas()

In [55]:
pred_df['kmeans']= labels
pred_df.head(5)

Unnamed: 0,BALANCE,PURCHASES,INSTALLMENTS_PURCHASES,CASH_ADVANCE,CREDIT_LIMIT,PAYMENTS,MINIMUM_PAYMENTS,TENURE,kmeans
0,40.900749,95.4,95.4,0.0,1000.0,201.802084,139.509787,12,0
1,3202.467416,0.0,0.0,6442.945483,7000.0,4103.032597,1072.340217,12,3
2,2495.148862,773.17,0.0,0.0,7500.0,622.066742,627.284787,12,1
3,1666.670542,1499.0,0.0,205.788017,7500.0,0.0,312.343947,12,1
4,817.714335,16.0,0.0,0.0,1200.0,678.334763,244.791237,12,0


In [58]:
pivoted = pred_df.groupby('kmeans')["BALANCE", "PURCHASES","INSTALLMENTS_PURCHASES","CASH_ADVANCE","CREDIT_LIMIT", "PAYMENTS", "MINIMUM_PAYMENTS","TENURE"].median().reset_index()
pivoted

  pivoted = pred_df.groupby('kmeans')["BALANCE", "PURCHASES","INSTALLMENTS_PURCHASES","CASH_ADVANCE","CREDIT_LIMIT", "PAYMENTS", "MINIMUM_PAYMENTS","TENURE"].median().reset_index()


Unnamed: 0,kmeans,BALANCE,PURCHASES,INSTALLMENTS_PURCHASES,CASH_ADVANCE,CREDIT_LIMIT,PAYMENTS,MINIMUM_PAYMENTS,TENURE
0,0,543.517587,256.5,66.655,0.0,2000.0,559.656772,257.414743,12.0
1,1,1305.883623,835.31,175.0,0.0,7000.0,1355.483086,367.503207,12.0
2,2,3401.198831,297.085,143.025,67.988935,3250.0,518.528972,17777.81839,12.0
3,3,5873.435088,486.545,43.75,4074.374739,10500.0,3728.585387,1737.87555,12.0
4,4,4010.621974,15108.47,4000.7,0.0,14500.0,24199.11108,1193.03103,12.0
