In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Python Spark Clustering") \
    .getOrCreate()

# Import Data

In [2]:
df = spark.read.csv("D:/Spark/tugas cluster/driver-license-permit-and-non-driver-identification-cards-issued-as-of-august-30-2017.csv", header=True, inferSchema=True)

In [4]:
df.schema

StructType(List(StructField(Year of Birth,IntegerType,true),StructField(Sex,StringType,true),StructField(City,StringType,true),StructField(State,StringType,true),StructField(Zip,StringType,true),StructField(Residence County,StringType,true),StructField(License Class,StringType,true),StructField(Status,StringType,true),StructField(Privilege,StringType,true),StructField(Year of Expiration,IntegerType,true)))

In [5]:
df.show()

+-------------+---+---------------+-----+-----+----------------+-------------+---------+---------+------------------+
|Year of Birth|Sex|           City|State|  Zip|Residence County|License Class|   Status|Privilege|Year of Expiration|
+-------------+---+---------------+-----+-----+----------------+-------------+---------+---------+------------------+
|         1950|  F|       GLENMONT|   NY|12077|          ALBANY|            D|    VALID|     FULL|              2020|
|         1977|  F| WEST HEMPSTEAD|   NY|11552|          NASSAU|            D|    VALID|     FULL|              2019|
|         1967|  M|      SMITHTOWN|   NY|11787|         SUFFOLK|            D|    VALID|     FULL|              2019|
|         1989|  M|       BROOKLYN|   NY|11213|           KINGS|            D|    VALID|     FULL|              2020|
|         1994|  F|        BXVILLE|   NY|10708|     WESTCHESTER|            D|    VALID|     FULL|              2023|
|         1960|  M|       NEW YORK|   NY|10025|        N

In [6]:
df.createOrReplaceTempView("License")

# Cluster 1

In [7]:
query = spark.sql("SELECT `City`, `Year of Birth`, `Year of Expiration`, `License Class` \
                    FROM License \
                    WHERE `Year of Birth` BETWEEN 1980 AND 2000 \
                    AND Sex='M'")

In [8]:
query.show()

+------------+-------------+------------------+-------------+
|        City|Year of Birth|Year of Expiration|License Class|
+------------+-------------+------------------+-------------+
|    BROOKLYN|         1989|              2020|            D|
|    NEW YORK|         1981|              2020|            I|
|       BRONX|         1996|              2020|     D PERMIT|
|   EAST OTTO|         2000|              2021|    DJ PERMIT|
|LONG IS CITY|         1994|              2019|            D|
|   BELLEROSE|         1986|              2023|            D|
|     MAHOPAC|         1995|              2024|            D|
| RANSOMVILLE|         1993|              2022|            D|
| STEPHENTOWN|         1984|              2021|           DM|
|    BROOKLYN|         1980|              2022|            D|
|    BROOKLYN|         1982|              2024|            D|
|   HAUPPAUGE|         1991|              2017|     D PERMIT|
|WATKINS GLEN|         1993|              2022|            D|
|       

In [9]:
query.count()

2774999

#### Vector Assembler

In [10]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=["Year of Birth", "Year of Expiration"],
    outputCol='features')

query = assembler.transform(query)

In [11]:
query.show()

+------------+-------------+------------------+-------------+---------------+
|        City|Year of Birth|Year of Expiration|License Class|       features|
+------------+-------------+------------------+-------------+---------------+
|    BROOKLYN|         1989|              2020|            D|[1989.0,2020.0]|
|    NEW YORK|         1981|              2020|            I|[1981.0,2020.0]|
|       BRONX|         1996|              2020|     D PERMIT|[1996.0,2020.0]|
|   EAST OTTO|         2000|              2021|    DJ PERMIT|[2000.0,2021.0]|
|LONG IS CITY|         1994|              2019|            D|[1994.0,2019.0]|
|   BELLEROSE|         1986|              2023|            D|[1986.0,2023.0]|
|     MAHOPAC|         1995|              2024|            D|[1995.0,2024.0]|
| RANSOMVILLE|         1993|              2022|            D|[1993.0,2022.0]|
| STEPHENTOWN|         1984|              2021|           DM|[1984.0,2021.0]|
|    BROOKLYN|         1980|              2022|            D|[19

#### kMeans

In [12]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans().setK(5).setSeed(1)
model = kmeans.fit(query)

#### Prediction

In [13]:
prediction = model.transform(query)
prediction.show()

+------------+-------------+------------------+-------------+---------------+----------+
|        City|Year of Birth|Year of Expiration|License Class|       features|prediction|
+------------+-------------+------------------+-------------+---------------+----------+
|    BROOKLYN|         1989|              2020|            D|[1989.0,2020.0]|         3|
|    NEW YORK|         1981|              2020|            I|[1981.0,2020.0]|         4|
|       BRONX|         1996|              2020|     D PERMIT|[1996.0,2020.0]|         1|
|   EAST OTTO|         2000|              2021|    DJ PERMIT|[2000.0,2021.0]|         1|
|LONG IS CITY|         1994|              2019|            D|[1994.0,2019.0]|         1|
|   BELLEROSE|         1986|              2023|            D|[1986.0,2023.0]|         0|
|     MAHOPAC|         1995|              2024|            D|[1995.0,2024.0]|         2|
| RANSOMVILLE|         1993|              2022|            D|[1993.0,2022.0]|         2|
| STEPHENTOWN|       

#### Evaluate

In [14]:
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(prediction)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.6000210625334547


#### Cluster Center

In [15]:
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[1985.94496334 2022.76302178]
[1997.47876262 2019.53306976]
[1993.9084316  2023.17896686]
[1989.92296319 2019.39982705]
[1982.03307073 2020.15344342]


#### Vizualization

In [16]:
import pixiedust

Pixiedust database opened successfully


In [17]:
display(prediction)

![cluster](img/img1.png)

![cluster](img/img2.png)

# Cluster 2

In [19]:
#Cluster jumlah License CLass yang expired diatas tahun 2018 yang berisikan seluruhnya laki-laki
query2 = spark.sql("SELECT `License Class`, COUNT(`License Class`) AS Jumlah\
                    FROM License \
                    WHERE `Year of Expiration`>2018 AND Sex='M'\
                    GROUP BY `License Class`")

In [20]:
query2.show()

+----------------+------+
|   License Class|Jumlah|
+----------------+------+
|M WITH DJ PERMIT|     2|
| E WITH A PERMIT|   654|
|AM WITH A PERMIT|    42|
|AM WITH M PERMIT|     1|
|              AM| 49148|
| A WITH M PERMIT|  1079|
| B WITH B PERMIT|   364|
|N WITH MJ PERMIT|     4|
| M WITH D PERMIT|   164|
| D WITH M PERMIT| 15111|
|        E PERMIT|   224|
|               E|439776|
| N WITH N PERMIT|     4|
|DM WITH D PERMIT|     1|
|EM WITH B PERMIT|    49|
|               B|141839|
| N WITH M PERMIT|   339|
| D WITH A PERMIT|  1743|
|DM WITH N PERMIT|     3|
|CM WITH A PERMIT|    13|
+----------------+------+
only showing top 20 rows



#### Vector Assembler

In [21]:
from pyspark.ml.feature import VectorAssembler


assembler = VectorAssembler(
    inputCols=["Jumlah"],
    outputCol='features')

query2 = assembler.transform(query2)
query2.show()

+----------------+------+----------+
|   License Class|Jumlah|  features|
+----------------+------+----------+
|M WITH DJ PERMIT|     2|     [2.0]|
| E WITH A PERMIT|   654|   [654.0]|
|AM WITH A PERMIT|    42|    [42.0]|
|AM WITH M PERMIT|     1|     [1.0]|
|              AM| 49148| [49148.0]|
| A WITH M PERMIT|  1079|  [1079.0]|
| B WITH B PERMIT|   364|   [364.0]|
|N WITH MJ PERMIT|     4|     [4.0]|
| M WITH D PERMIT|   164|   [164.0]|
| D WITH M PERMIT| 15111| [15111.0]|
|        E PERMIT|   224|   [224.0]|
|               E|439776|[439776.0]|
| N WITH N PERMIT|     4|     [4.0]|
|DM WITH D PERMIT|     1|     [1.0]|
|EM WITH B PERMIT|    49|    [49.0]|
|               B|141839|[141839.0]|
| N WITH M PERMIT|   339|   [339.0]|
| D WITH A PERMIT|  1743|  [1743.0]|
|DM WITH N PERMIT|     3|     [3.0]|
|CM WITH A PERMIT|    13|    [13.0]|
+----------------+------+----------+
only showing top 20 rows



#### kMeans

In [22]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans().setK(5).setSeed(1)
model = kmeans.fit(query2)

#### Prediction

In [23]:
predictions = model.transform(query2)
predictions.show()

+----------------+------+----------+----------+
|   License Class|Jumlah|  features|prediction|
+----------------+------+----------+----------+
|M WITH DJ PERMIT|     2|     [2.0]|         0|
| E WITH A PERMIT|   654|   [654.0]|         0|
|AM WITH A PERMIT|    42|    [42.0]|         0|
|AM WITH M PERMIT|     1|     [1.0]|         0|
|              AM| 49148| [49148.0]|         0|
| A WITH M PERMIT|  1079|  [1079.0]|         0|
| B WITH B PERMIT|   364|   [364.0]|         0|
|N WITH MJ PERMIT|     4|     [4.0]|         0|
| M WITH D PERMIT|   164|   [164.0]|         0|
| D WITH M PERMIT| 15111| [15111.0]|         0|
|        E PERMIT|   224|   [224.0]|         0|
|               E|439776|[439776.0]|         4|
| N WITH N PERMIT|     4|     [4.0]|         0|
|DM WITH D PERMIT|     1|     [1.0]|         0|
|EM WITH B PERMIT|    49|    [49.0]|         0|
|               B|141839|[141839.0]|         3|
| N WITH M PERMIT|   339|   [339.0]|         0|
| D WITH A PERMIT|  1743|  [1743.0]|    

#### Evaluate

In [24]:
from pyspark.ml.evaluation import ClusteringEvaluator

evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.9514091715557769


#### Cluster Center

In [25]:
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[3750.72580645]
[3732935.]
[768974.]
[134915.]
[414304.5]


#### Vizualization

In [26]:
import pixiedust

In [None]:
display(predictions)

![cluster](img/img3.png)

![cluster](img/img4.png)