# Clustering USArrests Dataset using pyspark

📌  In this section, we will cluster the USArrests dataset using pyspark.

# Business Problem

📌 The task is to divide the 50 US states into clusters by examining the crimes recorded in each state. The separation is expected to be done with the k-means algorithm using PySpark.

# Dataset Story

📌 This data set contains statistics, in arrests per 100,000 residents, for assault, murder, and rape in each of the 50 US states in 1973. Also given is the percentage of the population living in urban areas. The USArrests dataset supports a systematic approach to identifying and analyzing patterns and trends in crime.

# Create Session in Spark

In [1]:
!pip install findspark
import findspark
findspark.init("C:\spark")
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark import SparkContext
spark = SparkSession.builder \
    .master("local") \
    .appName("Clustering") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()
sc = spark.sparkContext



In [2]:
# Display the SparkContext as the cell output.
sc

# Import Necessary Libraries

In [29]:
import pandas as pd
# Pandas display settings: no column/row truncation, a display width of 500
# and floats rendered with four decimals.
for _option_name, _option_value in (
    ("display.max_columns", None),
    ("display.max_rows", None),
    ("display.width", 500),
    ("display.float_format", lambda x: '%.4f' % x),
):
    pd.set_option(_option_name, _option_value)
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

# Import Dataset

In [4]:
# Read the CSV with a header row and let Spark infer the column types.
df = spark.read.csv("USArrests.csv", header=True, inferSchema=True)
# Mark the DataFrame for caching; the call is the cell's last expression,
# so its result is displayed.
df.persist()

DataFrame[_c0: string, Murder: double, Assault: int, UrbanPop: int, Rape: double]

In [5]:
# Preview the first five rows as a pandas DataFrame.
df.limit(5).toPandas()

Unnamed: 0,_c0,Murder,Assault,UrbanPop,Rape
0,Alabama,13.2,236,58,21.2
1,Alaska,10.0,263,48,44.5
2,Arizona,8.1,294,80,31.0
3,Arkansas,8.8,190,50,19.5
4,California,9.0,276,91,40.6


In [6]:
# Print the column names and types of the loaded DataFrame.
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- Murder: double (nullable = true)
 |-- Assault: integer (nullable = true)
 |-- UrbanPop: integer (nullable = true)
 |-- Rape: double (nullable = true)



In [7]:
# The first CSV column has no header and is loaded as "_c0"; rename it to "index".
df = df.withColumnRenamed("_c0", "index")
# Show the first five rows to confirm the rename.
df.show(5)

+----------+------+-------+--------+----+
|     index|Murder|Assault|UrbanPop|Rape|
+----------+------+-------+--------+----+
|   Alabama|  13.2|    236|      58|21.2|
|    Alaska|  10.0|    263|      48|44.5|
|   Arizona|   8.1|    294|      80|31.0|
|  Arkansas|   8.8|    190|      50|19.5|
|California|   9.0|    276|      91|40.6|
+----------+------+-------+--------+----+
only showing top 5 rows



In [8]:
# Print the dataset shape as (row count, column count).
print((df.count(), len(df.columns)))

(50, 5)


# Missing Value Analysis

In [9]:
# Count the NaN entries of every column and show them as a one-row pandas frame.
df.select(
    *(
        F.count(F.when(F.isnan(column_name), column_name)).alias(column_name)
        for column_name in df.columns
    )
).toPandas()

Unnamed: 0,index,Murder,Assault,UrbanPop,Rape
0,0,0,0,0,0


In [10]:
def null_count(dataframe, col_name):
    """Return the number of rows whose `col_name` value is missing.

    A value counts as missing when it equals the string "NA", equals the
    empty string, or is null.
    """
    column = F.col(col_name)
    is_missing = column.isNull() | (column == "") | (column == "NA")
    return dataframe.select(col_name).where(is_missing).count()

In [11]:
# Count the missing values in the "Rape" column.
null_count(df, "Rape")

0

In [12]:
def show_all_null(dataframe):
    """Print the missing-value count and percentage for each affected column.

    Every column is checked with `null_count`. For each column with at least
    one missing value a line "<column> ===> <count> , Ratio: <percent>" is
    printed. "There is no null value" is printed only when no column at all
    contains a missing value (including a frame with no columns).
    """
    total_rows = None  # computed lazily: only needed when something is reported
    found_missing = False
    for col_name, _dtype in dataframe.dtypes:
        nc = null_count(dataframe, col_name)
        if nc > 0:
            found_missing = True
            if total_rows is None:
                # Count the rows once instead of once per reported column.
                total_rows = dataframe.count()
            print("{} ===> {} , Ratio: {:.2f}".format(col_name, nc, (nc/total_rows)*100))
    # Decide from all columns, not just from the last one inspected.
    if not found_missing:
        print("There is no null value")

In [13]:
# Report the missing values of every column of the dataset.
show_all_null(df)

There is no null value


# Analysis of Categorical and Numerical Variables

In [14]:
# Module-level lists filled by grab_cat_num_cols.
categorical_cols = []
numerical_cols = []
# Columns left out of both lists ("index" holds the state names).
discarted_cols = ["index"]

In [15]:
def grab_cat_num_cols(dataframe):
    """Split the columns of `dataframe` into categorical and numerical names.

    Columns listed in the module-level `discarted_cols` are ignored. A column
    whose dtype is "string" is categorical; every other column is numerical.

    The module-level `categorical_cols` and `numerical_cols` lists are
    overwritten in place (not appended to), so calling this function again,
    e.g. when the notebook cell is re-run, does not duplicate column names.

    Returns:
        The tuple `(categorical_cols, numerical_cols)`.
    """
    kept = [
        (name, dtype)
        for name, dtype in dataframe.dtypes
        if name not in discarted_cols
    ]
    # Slice assignment keeps the same list objects that other code may hold.
    categorical_cols[:] = [name for name, dtype in kept if dtype == "string"]
    numerical_cols[:] = [name for name, dtype in kept if dtype != "string"]
    return categorical_cols, numerical_cols

In [16]:
# Classify the columns of the dataset into categorical and numerical ones.
categorical_cols, numerical_cols = grab_cat_num_cols(df)

# Print the number of rows, the number of columns and the size of each column group.
print(f"Observations: {df.count()}")
print(f"Variables: {len(df.columns)}")
print(f"Cat_cols: {len(categorical_cols)}")
print(f"Num_cols: {len(numerical_cols)}")

Observations: 50
Variables: 5
Cat_cols: 0
Num_cols: 4


In [17]:
# Column check: every column must be discarded, categorical or numerical.
classified_total = len(discarted_cols) + len(categorical_cols) + len(numerical_cols)
print(
    "column check is True"
    if len(df.columns) == classified_total
    else "There is a problem for column check"
)

column check is True


# Encoding Scaling

In [18]:
def find_binary_cols(dataframe, cat_cols):
    """Return `dataframe` restricted to its binary string columns.

    A column from `cat_cols` is kept when its dtype is "string" and it has
    exactly two distinct values. The result is the projection of
    `dataframe` onto the kept columns.
    """
    binary_names = []
    for name in cat_cols:
        single_column = dataframe.select(name)
        if single_column.dtypes[0][1] != "string":
            continue
        if single_column.distinct().count() == 2:
            binary_names.append(name)
    return dataframe.select(binary_names)

In [19]:
# Project the dataset onto its binary categorical columns and list their names.
binary_cols = find_binary_cols(df, categorical_cols)
print(binary_cols.columns)

[]


In [20]:
# Bookkeeping for the encoding stages:
#   my_dict                      "<col>_index_obj" -> StringIndexer for that column
#   string_indexer_objs          the same indexers, in column order
#   string_indexer_output_names  "<col>_indexed" output column of each indexer
#   ohe_input_names / ohe_output_names  columns fed to / produced by one-hot encoding
my_dict = {}
string_indexer_objs = []
string_indexer_output_names = []
ohe_input_names = []
ohe_output_names = []

for col_name in categorical_cols:
    indexed_name = col_name+"_indexed"
    indexer = StringIndexer(
        inputCol=col_name,
        outputCol=indexed_name,
        handleInvalid="skip",
    )
    my_dict[col_name+"_index_obj"] = indexer

    string_indexer_objs.append(indexer)
    string_indexer_output_names.append(indexed_name)

    # Binary columns stay as a single indexed column; only the others are one-hot encoded.
    if col_name in binary_cols.columns:
        continue
    ohe_input_names.append(indexed_name)
    ohe_output_names.append(col_name+"_ohe")

In [21]:
# Indexed columns that are not one-hot encoded. They are filtered in their
# original order (duplicates dropped) instead of via a set difference, whose
# iteration order is not stable between runs and would make the order of the
# assembled feature vector non-reproducible.
_ohe_inputs = set(ohe_input_names)
not_to_hot_coded = list(
    dict.fromkeys(
        name for name in string_indexer_output_names if name not in _ohe_inputs
    )
)
print(not_to_hot_coded)

[]


In [22]:
# One-hot encode the non-binary indexed columns.
encoder = OneHotEncoder(inputCols=ohe_input_names, outputCols=ohe_output_names)
# Combine numerical, indexed-only and one-hot columns into one vector column;
# handleInvalid is set to "skip".
assembler = VectorAssembler(
    inputCols=numerical_cols + not_to_hot_coded + ohe_output_names,
    outputCol="unscaled_features",
    handleInvalid="skip",
)
# Scale the assembled vector into the "features" column used for clustering.
scaler = StandardScaler(inputCol="unscaled_features", outputCol="features")

# Create Model

In [23]:
# Chain the string indexers, the encoder, the assembler and the scaler into one pipeline.
pipeline_obj = Pipeline(stages=[*string_indexer_objs, encoder, assembler, scaler])

In [24]:
# Fit the preprocessing pipeline on the dataset.
pipeline_model = pipeline_obj.fit(df)

In [26]:
# Apply the fitted pipeline (adds "unscaled_features" and "features") and preview five rows.
transform_df = pipeline_model.transform(df)
transform_df.limit(5).toPandas()

Unnamed: 0,index,Murder,Assault,UrbanPop,Rape,unscaled_features,features
0,Alabama,13.2,236,58,21.2,"[13.2, 236.0, 58.0, 21.2]","[3.030644107027128, 2.8318529416496117, 4.0069..."
1,Alaska,10.0,263,48,44.5,"[10.0, 263.0, 48.0, 44.5]","[2.2959425053235822, 3.155836117177321, 3.3161..."
2,Arizona,8.1,294,80,31.0,"[8.1, 294.0, 80.0, 31.0]","[1.8597134293121014, 3.527816800190618, 5.5268..."
3,Arkansas,8.8,190,50,19.5,"[8.8, 190.0, 50.0, 19.5]","[2.0204294046847524, 2.279881605565365, 3.4542..."
4,California,9.0,276,91,40.6,"[9.0, 276.0, 91.0, 40.6]","[2.0663482547912237, 3.3118280165054776, 6.286..."


In [28]:
# Fit a k-means model with k clusters.
def compute_kmeans_model(dataframe, k):
    """Fit a KMeans estimator with `k` clusters on `dataframe`.

    The seed is fixed to 123. Returns the fitted model produced by
    `KMeans.fit`.
    """
    estimator = KMeans(k=k, seed=123)
    return estimator.fit(dataframe)

In [30]:
# Create a ClusteringEvaluator with its default settings; it is used below to score clusterings.
evaluator = ClusteringEvaluator()

In [33]:
# Find the Silhouette score for every k from 2 to 10.
# Note: each iteration rebinds the module-level kmeans_model and transformed_df.
for k in range(2, 11):
    kmeans_model = compute_kmeans_model(transform_df, k)
    # Assign every row to a cluster, then score the resulting clustering.
    transformed_df = kmeans_model.transform(transform_df)
    score_k = evaluator.evaluate(transformed_df)
    print("k: {}, score: {:.3f}".format(k, score_k))

k: 2, score: 0.609
k: 3, score: 0.473
k: 4, score: 0.533
k: 5, score: 0.391
k: 6, score: 0.393
k: 7, score: 0.429
k: 8, score: 0.323
k: 9, score: 0.377
k: 10, score: 0.421


In [60]:
# The Silhouette score is highest for 2 clusters, but 4 clusters are chosen here
# based on the author's knowledge of the data.
kmeans_model = compute_kmeans_model(transform_df, 4)

In [61]:
# Prediction: add the cluster label of each state as the "prediction" column and preview five rows.
transformed_df = kmeans_model.transform(transform_df)
transformed_df.limit(5).toPandas()

Unnamed: 0,index,Murder,Assault,UrbanPop,Rape,unscaled_features,features,prediction
0,Alabama,13.2,236,58,21.2,"[13.2, 236.0, 58.0, 21.2]","[3.030644107027128, 2.8318529416496117, 4.0069...",3
1,Alaska,10.0,263,48,44.5,"[10.0, 263.0, 48.0, 44.5]","[2.2959425053235822, 3.155836117177321, 3.3161...",0
2,Arizona,8.1,294,80,31.0,"[8.1, 294.0, 80.0, 31.0]","[1.8597134293121014, 3.527816800190618, 5.5268...",0
3,Arkansas,8.8,190,50,19.5,"[8.8, 190.0, 50.0, 19.5]","[2.0204294046847524, 2.279881605565365, 3.4542...",3
4,California,9.0,276,91,40.6,"[9.0, 276.0, 91.0, 40.6]","[2.0663482547912237, 3.3118280165054776, 6.286...",0


In [62]:
# Shift the cluster labels by one so that they start at 1 instead of 0.
transformed_df = transformed_df.withColumn("prediction", F.col("prediction") + 1)

In [68]:
# Preview five rows with the shifted cluster labels.
transformed_df.limit(5).toPandas()

Unnamed: 0,index,Murder,Assault,UrbanPop,Rape,unscaled_features,features,prediction
0,Alabama,13.2,236,58,21.2,"[13.2, 236.0, 58.0, 21.2]","[3.030644107027128, 2.8318529416496117, 4.0069...",4
1,Alaska,10.0,263,48,44.5,"[10.0, 263.0, 48.0, 44.5]","[2.2959425053235822, 3.155836117177321, 3.3161...",1
2,Arizona,8.1,294,80,31.0,"[8.1, 294.0, 80.0, 31.0]","[1.8597134293121014, 3.527816800190618, 5.5268...",1
3,Arkansas,8.8,190,50,19.5,"[8.8, 190.0, 50.0, 19.5]","[2.0204294046847524, 2.279881605565365, 3.4542...",4
4,California,9.0,276,91,40.6,"[9.0, 276.0, 91.0, 40.6]","[2.0663482547912237, 3.3118280165054776, 6.286...",1


In [67]:
# Show only the state name and its assigned cluster for the first five rows.
transformed_df.select("index", "prediction").limit(5).toPandas()

Unnamed: 0,index,prediction
0,Alabama,4
1,Alaska,1
2,Arizona,1
3,Arkansas,4
4,California,1
