# **A study of anomaly detection**
**I use this notebook to practice my data science skills**

The notebook walks through a workflow for anomaly detection using a histogram, a boxplot, and machine learning.

Its main purpose is to serve as a step-by-step guide that I can review later and reuse as a reference for future cases.

## Workflow stages
The workflow goes through five stages.
1.   Loading the data.
2.   Cleaning the data.
3.   Applying the histogram for anomaly detection.
4.   Applying the Boxplot for anomaly detection.
5.   Applying the K-means algorithm for anomaly detection.

In [0]:
#Import the class that creates the Spark session
from pyspark.sql import SparkSession

In [0]:
#Start the Spark session
spark = SparkSession.builder.appName("Deteccaoanomalies").getOrCreate()

In [0]:
%fs ls /FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/grafoECommerce.txt,grafoECommerce.txt,5387,1666721409000
dbfs:/FileStore/tables/iris_bezdekIris.csv,iris_bezdekIris.csv,4551,1663856964000
dbfs:/FileStore/tables/movies-1.csv,movies-1.csv,494431,1662647401000
dbfs:/FileStore/tables/movies-2.csv,movies-2.csv,494431,1662647447000
dbfs:/FileStore/tables/movies.csv,movies.csv,494431,1662647363000
dbfs:/FileStore/tables/ratings.csv,ratings.csv,2483723,1662647649000
dbfs:/FileStore/tables/temperature.csv,temperature.csv,13971171,1667063993000
dbfs:/FileStore/tables/transport_nodes_7c826.csv,transport_nodes_7c826.csv,465,1666295023000
dbfs:/FileStore/tables/transport_relationships_c2bfc.csv,transport_relationships_c2bfc.csv,550,1666295023000
dbfs:/FileStore/tables/u.data,u.data,1979173,1662474869000


In [0]:
#Path of the file to use
dir="/FileStore/tables/temperature.csv"

# 1) Load the Data

In [0]:
#Read the CSV file with the generic reader, using the header row and inferring the column types
df = spark.read.format("csv").options(header="true", inferschema="true").load(dir)

In [0]:
df.show(5,False)

+-------------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-------------+-----------------+-------------+-----+---------+---------+
|datetime           |Vancouver    |Portland     |San Francisco|Seattle      |Los Angeles  |San Diego    |Las Vegas    |Phoenix      |Albuquerque  |Denver       |San Antonio  |Dallas       |Houston      |Kansas City  |Minneapolis  |Saint Louis  |Chicago      |Nashville    |Indianapolis |Atlanta      |Detroit      |Jacksonville |Charlotte    |Miami        |Pittsburgh   |Toronto      |Philadelphia |New York     |Montreal     |Boston       |Beersheba    |Tel Aviv District|Eilat        

In [0]:
#Show dataset columns
df.columns 

Out[618]: ['datetime',
 'Vancouver',
 'Portland',
 'San Francisco',
 'Seattle',
 'Los Angeles',
 'San Diego',
 'Las Vegas',
 'Phoenix',
 'Albuquerque',
 'Denver',
 'San Antonio',
 'Dallas',
 'Houston',
 'Kansas City',
 'Minneapolis',
 'Saint Louis',
 'Chicago',
 'Nashville',
 'Indianapolis',
 'Atlanta',
 'Detroit',
 'Jacksonville',
 'Charlotte',
 'Miami',
 'Pittsburgh',
 'Toronto',
 'Philadelphia',
 'New York',
 'Montreal',
 'Boston',
 'Beersheba',
 'Tel Aviv District',
 'Eilat',
 'Haifa',
 'Nahariyya',
 'Jerusalem']

In [0]:
df.printSchema()

root
 |-- datetime: timestamp (nullable = true)
 |-- Vancouver: double (nullable = true)
 |-- Portland: double (nullable = true)
 |-- San Francisco: double (nullable = true)
 |-- Seattle: double (nullable = true)
 |-- Los Angeles: double (nullable = true)
 |-- San Diego: double (nullable = true)
 |-- Las Vegas: double (nullable = true)
 |-- Phoenix: double (nullable = true)
 |-- Albuquerque: double (nullable = true)
 |-- Denver: double (nullable = true)
 |-- San Antonio: double (nullable = true)
 |-- Dallas: double (nullable = true)
 |-- Houston: double (nullable = true)
 |-- Kansas City: double (nullable = true)
 |-- Minneapolis: double (nullable = true)
 |-- Saint Louis: double (nullable = true)
 |-- Chicago: double (nullable = true)
 |-- Nashville: double (nullable = true)
 |-- Indianapolis: double (nullable = true)
 |-- Atlanta: double (nullable = true)
 |-- Detroit: double (nullable = true)
 |-- Jacksonville: double (nullable = true)
 |-- Charlotte: double (nullable = true)
 |-- M

In [0]:
df.count()  #Count the number of rows

Out[620]: 45253

In [0]:
#Select only the datetime column and the column for the city of Vancouver
df_analysis=df.select('datetime','Vancouver')

In [0]:
df_analysis.show(5)

+-------------------+-------------+
|           datetime|    Vancouver|
+-------------------+-------------+
|2012-10-01 12:00:00|         null|
|2012-10-01 13:00:00|       284.63|
|2012-10-01 14:00:00| 284.62904131|
|2012-10-01 15:00:00|284.626997923|
|2012-10-01 16:00:00|284.624954535|
+-------------------+-------------+
only showing top 5 rows



# 2) Cleaning the data

In [0]:
#Keep only the rows where the Vancouver value is not null
from pyspark.sql.functions import col
df_notNull=df_analysis.filter(col('Vancouver').isNotNull())

In [0]:
df_notNull.show(5)

+-------------------+-------------+
|           datetime|    Vancouver|
+-------------------+-------------+
|2012-10-01 13:00:00|       284.63|
|2012-10-01 14:00:00| 284.62904131|
|2012-10-01 15:00:00|284.626997923|
|2012-10-01 16:00:00|284.624954535|
|2012-10-01 17:00:00|284.622911147|
+-------------------+-------------+
only showing top 5 rows



In [0]:
from pyspark.sql.functions import monotonically_increasing_id  #Function that generates unique, increasing IDs
df_plots = df_notNull.withColumn("index", monotonically_increasing_id())  #Creates the index column used as the x-axis of the plot

In [0]:
df_plots.show(5)

+-------------------+-------------+-----+
|           datetime|    Vancouver|index|
+-------------------+-------------+-----+
|2012-10-01 13:00:00|       284.63|    0|
|2012-10-01 14:00:00| 284.62904131|    1|
|2012-10-01 15:00:00|284.626997923|    2|
|2012-10-01 16:00:00|284.624954535|    3|
|2012-10-01 17:00:00|284.622911147|    4|
+-------------------+-------------+-----+
only showing top 5 rows
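
The index values happen to be consecutive here, but `monotonically_increasing_id` only guarantees unique, increasing IDs. According to the Spark documentation, the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits, so the IDs jump when the data spans several partitions. The helper `sketch_monotonic_id` below is a hypothetical plain-Python illustration of that layout, not Spark's own code.

```python
def sketch_monotonic_id(partition_id: int, row_in_partition: int) -> int:
    """Illustrative ID layout: partition ID in the upper bits, row number in the lower 33 bits."""
    return (partition_id << 33) | row_in_partition

# IDs for the first three rows of two hypothetical partitions
first_partition_ids = [sketch_monotonic_id(0, i) for i in range(3)]
second_partition_ids = [sketch_monotonic_id(1, i) for i in range(3)]

print(first_partition_ids)   # consecutive within partition 0
print(second_partition_ids)  # starts at 2**33, so there is a large gap
```

If a strictly consecutive index matters for a plot, one option would be to sort by `datetime` and number the rows explicitly instead.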



In [0]:
#Showing the data
display(df_plots.select("index","Vancouver"))

index,Vancouver
0,284.63
1,284.62904131
2,284.626997923
3,284.624954535
4,284.622911147
5,284.620867759
6,284.618824371
7,284.616780983
8,284.614737595
9,284.612694207


Output can only be rendered in Databricks

# 3) Applying the histogram for anomaly detection

In [0]:
#Imports used to find the mean and the standard deviation
import numpy as np  #Library used to handle vectors and matrices
from pyspark.sql.functions import mean, stddev  #Functions to find the mean and stddev

In [0]:
#Collect the mean and the standard deviation into a list of rows
list_stats = df_notNull.select(mean(col('Vancouver')).alias('media'),stddev(col('Vancouver')).alias('desvioPadrao')).collect()

In [0]:
media = list_stats[0]['media']
desvio = list_stats[0]['desvioPadrao']
print("Mean: ", media)
print("Standard Deviation: ", desvio)

Mean:  283.86265374154124
Standard Deviation:  6.640130616770929
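
Spark's `stddev` is an alias for `stddev_samp`, the sample standard deviation, which divides by n - 1 rather than n. The sketch below shows the difference with Python's standard library on a small made-up list (the values are assumptions chosen so the arithmetic is easy to follow, not data from the notebook).

```python
import statistics

sample = [2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0]  # made-up values for illustration

sample_mean = statistics.mean(sample)
sample_std = statistics.stdev(sample)       # sample standard deviation, divides by n - 1
population_std = statistics.pstdev(sample)  # population standard deviation, divides by n

print("Mean:", sample_mean)
print("Sample stddev:", sample_std)
print("Population stddev:", population_std)
```

With 44458 rows the two estimates are practically identical, so the choice has little effect on the thresholds used later.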


In [0]:
#Create the dataset with the mean and standard deviation
df_stats = df_notNull.select(mean(col('Vancouver')).alias('mean'),stddev(col('Vancouver')).alias('stddev'))

In [0]:
df_stats.show()

+------------------+-----------------+
|              mean|           stddev|
+------------------+-----------------+
|283.86265374154124|6.640130616770929|
+------------------+-----------------+



In [0]:
#Using the describe function
df_notNull.describe().show()

+-------+------------------+
|summary|         Vancouver|
+-------+------------------+
|  count|             44458|
|   mean|283.86265374154124|
| stddev| 6.640130616770929|
|    min|            245.15|
|    max|             307.0|
+-------+------------------+
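
The range from 245.15 to 307.0 suggests that the temperatures are stored in kelvin, although the notebook does not state the unit. Under that assumption, the small sketch below converts the summary values to degrees Celsius, which makes it easier to judge whether the extremes are plausible. The function name `kelvin_to_celsius` is introduced here for illustration.

```python
def kelvin_to_celsius(kelvin: float) -> float:
    """Convert a temperature from kelvin to degrees Celsius."""
    return kelvin - 273.15

# Summary values taken from the describe() output above (unit assumed to be kelvin)
summary = [("min", 245.15), ("mean", 283.86265374154124), ("max", 307.0)]
for label, value in summary:
    print(label, round(kelvin_to_celsius(value), 2))
```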



In [0]:
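#Defining the distance function (z-score): how many standard deviations x is from the mean
#The constants are the mean and standard deviation computed above, rounded to four decimals
def distance(x):
  mean=283.8626
  stddev=6.6401
  return ((x - mean)/stddev)

#Defining the function that flags anomalies: absolute distance greater than 2 stddev
#(for normally distributed data, about 95% of the values lie within 2 stddev of the mean)
def anomalies(x):
  if (x>2):
    return 1
  else:
    return 0

#Defining the functions to be used (registering them as UDFs)
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType, IntegerType
distance_udf_double = udf(lambda z: distance(z), DoubleType())
anomaly_udf_int = udf(lambda z: anomalies(z), IntegerType())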
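
As a worked example of the rule above, the plain-Python sketch below applies the same z-score formula to two temperatures from the dataset and checks the "95%" figure against a standard normal distribution. The names `z_score` and `is_anomaly` are introduced here for illustration, and the coverage check assumes the data is roughly normal, which the notebook does not verify.

```python
from statistics import NormalDist

MEAN = 283.8626   # rounded mean used in the notebook
STDDEV = 6.6401   # rounded standard deviation used in the notebook

def z_score(x: float) -> float:
    """Distance from the mean, measured in standard deviations."""
    return (x - MEAN) / STDDEV

def is_anomaly(x: float, threshold: float = 2.0) -> int:
    """Return 1 when x lies more than `threshold` standard deviations from the mean."""
    return 1 if abs(z_score(x)) > threshold else 0

# Share of a normal distribution that lies within 2 standard deviations of the mean
coverage = NormalDist().cdf(2.0) - NormalDist().cdf(-2.0)

print(z_score(284.63), is_anomaly(284.63))
print(z_score(270.42), is_anomaly(270.42))
print(round(coverage, 4))
```

Because the threshold is symmetric, both unusually cold and unusually warm readings are flagged.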

In [0]:
data_new=df_notNull.select('Vancouver',distance_udf_double('Vancouver').alias('distance'))
data_new.show()

+-------------+-------------------+
|    Vancouver|           distance|
+-------------+-------------------+
|       284.63|0.11557054863631709|
| 284.62904131|0.11542616978660031|
|284.626997923|0.11511843541512777|
|284.624954535| 0.1148107008930567|
|284.622911147|0.11450296637099419|
|284.620867759| 0.1141952318489231|
|284.618824371|0.11388749732685204|
|284.616780983|0.11357976280478096|
|284.614737595| 0.1132720282827099|
|284.612694207|0.11296429376063881|
|284.610650819|0.11265655923856774|
|284.608607431|0.11234882471649667|
|284.606564043| 0.1120410901944256|
|284.604520655|0.11173335567235451|
|284.602477267|0.11142562115028344|
|284.600433879|0.11111788662821237|
|284.598390491| 0.1108101521061413|
|284.596347103|0.11050241758407878|
|284.594303716|0.11019468321260625|
|284.592260328|0.10988694869053518|
+-------------+-------------------+
only showing top 20 rows



In [0]:
from pyspark.sql.functions import abs as spark_abs  #Function that calculates the absolute value (aliased so it does not shadow Python's built-in abs)
data_new=data_new.select('Vancouver','distance', spark_abs(col('distance')).alias("distanceABS"))
data_new.show()

+-------------+-------------------+-------------------+
|    Vancouver|           distance|        distanceABS|
+-------------+-------------------+-------------------+
|       284.63|0.11557054863631709|0.11557054863631709|
| 284.62904131|0.11542616978660031|0.11542616978660031|
|284.626997923|0.11511843541512777|0.11511843541512777|
|284.624954535| 0.1148107008930567| 0.1148107008930567|
|284.622911147|0.11450296637099419|0.11450296637099419|
|284.620867759| 0.1141952318489231| 0.1141952318489231|
|284.618824371|0.11388749732685204|0.11388749732685204|
|284.616780983|0.11357976280478096|0.11357976280478096|
|284.614737595| 0.1132720282827099| 0.1132720282827099|
|284.612694207|0.11296429376063881|0.11296429376063881|
|284.610650819|0.11265655923856774|0.11265655923856774|
|284.608607431|0.11234882471649667|0.11234882471649667|
|284.606564043| 0.1120410901944256| 0.1120410901944256|
|284.604520655|0.11173335567235451|0.11173335567235451|
|284.602477267|0.11142562115028344|0.11142562115

In [0]:
data_new=data_new.select('Vancouver','distance', "distanceABS", anomaly_udf_int("distanceABS").alias("isAnomaly"))
data_new.show()

+-------------+-------------------+-------------------+---------+
|    Vancouver|           distance|        distanceABS|isAnomaly|
+-------------+-------------------+-------------------+---------+
|       284.63|0.11557054863631709|0.11557054863631709|        0|
| 284.62904131|0.11542616978660031|0.11542616978660031|        0|
|284.626997923|0.11511843541512777|0.11511843541512777|        0|
|284.624954535| 0.1148107008930567| 0.1148107008930567|        0|
|284.622911147|0.11450296637099419|0.11450296637099419|        0|
|284.620867759| 0.1141952318489231| 0.1141952318489231|        0|
|284.618824371|0.11388749732685204|0.11388749732685204|        0|
|284.616780983|0.11357976280478096|0.11357976280478096|        0|
|284.614737595| 0.1132720282827099| 0.1132720282827099|        0|
|284.612694207|0.11296429376063881|0.11296429376063881|        0|
|284.610650819|0.11265655923856774|0.11265655923856774|        0|
|284.608607431|0.11234882471649667|0.11234882471649667|        0|
|284.60656

In [0]:
data_new.filter(col("isAnomaly")>0).show()

+---------+-------------------+------------------+---------+
|Vancouver|           distance|       distanceABS|isAnomaly|
+---------+-------------------+------------------+---------+
|   270.42|-2.0244574629900107|2.0244574629900107|        1|
|   269.81|-2.1163235493441337|2.1163235493441337|        1|
|   269.65| -2.140419571994399| 2.140419571994399|        1|
|   269.67|-2.1374075691631105|2.1374075691631105|        1|
|   269.67|-2.1374075691631105|2.1374075691631105|        1|
|   270.52|-2.0093974488336026|2.0093974488336026|        1|
|   269.15|-2.2157196427764654|2.2157196427764654|        1|
|   267.15|-2.5169199259047317|2.5169199259047317|        1|
|   266.15| -2.667520067468865| 2.667520067468865|        1|
|   266.15| -2.667520067468865| 2.667520067468865|        1|
|   266.15| -2.667520067468865| 2.667520067468865|        1|
|   266.15| -2.667520067468865| 2.667520067468865|        1|
|   266.15| -2.667520067468865| 2.667520067468865|        1|
|   265.15|-2.8181202090

In [0]:
#Viewing the histogram
display(df_notNull.select("Vancouver"))

Vancouver
284.63
284.62904131
284.626997923
284.624954535
284.622911147
284.620867759
284.618824371
284.616780983
284.614737595
284.612694207


Output can only be rendered in Databricks
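
Since the plot itself is not rendered outside Databricks, the sketch below shows what a histogram computes: it groups the values into fixed-width bins and counts how many fall into each one. The `histogram` helper, the 5-degree bin width, and the short list of temperatures (partly taken from the outputs above, partly made up) are assumptions for illustration.

```python
from collections import Counter

def histogram(values, bin_width):
    """Count values per bin; each key is the lower edge of its bin."""
    return Counter((v // bin_width) * bin_width for v in values)

# A handful of temperatures, partly made up for illustration
temps = [284.63, 270.42, 269.81, 266.15, 283.1, 285.9, 307.0]
counts = histogram(temps, bin_width=5.0)

# Text rendering of the histogram, one row per non-empty bin
for edge in sorted(counts):
    print(f"[{edge:.0f}, {edge + 5:.0f}): {'#' * counts[edge]}")
```

Bins far from the bulk of the data that hold only a few values are the candidates for anomalies.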

# 4) Applying the Boxplot for anomaly detection

In [0]:
#Viewing the Boxplot
display(df_notNull.select("Vancouver"))

Vancouver
284.63
284.62904131
284.626997923
284.624954535
284.622911147
284.620867759
284.618824371
284.616780983
284.614737595
284.612694207
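
The notebook relies on the Databricks plot to show the boxplot. A common convention, assumed here rather than taken from the notebook, is Tukey's rule: points more than 1.5 times the interquartile range (IQR) below the first quartile or above the third quartile are drawn as outliers. The sketch below applies that rule to a small made-up list; `iqr_outliers` is a hypothetical helper name.

```python
import statistics

def iqr_outliers(values, k=1.5):
    """Return the values outside the fences Q1 - k*IQR and Q3 + k*IQR."""
    q1, _, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    lower, upper = q1 - k * iqr, q3 + k * iqr
    return [v for v in values if v < lower or v > upper]

readings = [1, 2, 3, 4, 5, 6, 7, 8, 9, 100]  # made-up values with one obvious outlier
print(iqr_outliers(readings))
```

Unlike the 2-standard-deviation rule, the quartiles are barely affected by the extreme values themselves, which makes this check more robust.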


In [0]:
spark.sparkContext.parallelize( [np.array([1.0, 10.0, 100.0]), np.array([2.0, 20.0, 200.0]), np.array([3.0, 30.0, 300.0])])  #RDD of Vectors

Out[641]: ParallelCollectionRDD[3812] at readRDDFromInputStream at PythonRDD.scala:413

In [0]:
#Statistics with MLlib
from pyspark.mllib.stat import Statistics
column=df_notNull.select("Vancouver")  #Select the column
column1= column.rdd.map(lambda x: [int (x[0])]) #Map each row to a one-element vector (note: int() truncates the decimals)
statistic=Statistics.colStats(column1) #Compute the column statistics
print("Mean: ",statistic.mean())  #Mean
print("Variance: ", statistic.variance())  #Variance
print("Non-zero values: ",statistic.numNonzeros())  #Number of non-zero values

Mean:  [283.36749291]
Variance:  [44.23613606]
Non-zero values:  [44458.]
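
The mean reported here (283.367) is about 0.5 lower than the 283.863 computed earlier. A likely explanation is the `int(x[0])` conversion in the cell above, which drops the decimal part of every temperature before the statistics are computed. The sketch below reproduces the effect on a few values from the notebook output, using only the standard library.

```python
import statistics

# Temperatures that appear in the notebook output
temps = [284.63, 284.62904131, 270.42, 269.81, 266.15]

exact_mean = statistics.mean(temps)
truncated_mean = statistics.mean([int(t) for t in temps])  # int() drops the decimals

print("Exact mean:", exact_mean)
print("Truncated mean:", truncated_mean)
print("Difference:", exact_mean - truncated_mean)
```

Mapping with `float(x[0])` instead of `int(x[0])` would presumably bring the MLlib mean in line with the earlier result.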


# 5) Applying the K-means algorithm for anomaly detection

In [0]:
#Path of the file to use
dir="/FileStore/tables/worldcities.csv"

In [0]:
#Read the dataset
df_cities = spark.read.format("csv").options(header="true", inferschema="true").load(dir)

In [0]:
df_cities.printSchema()

root
 |-- city: string (nullable = true)
 |-- city_ascii: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- lng: double (nullable = true)
 |-- country: string (nullable = true)
 |-- iso2: string (nullable = true)
 |-- iso3: string (nullable = true)
 |-- admin_name: string (nullable = true)
 |-- capital: string (nullable = true)
 |-- population: double (nullable = true)
 |-- id: integer (nullable = true)



In [0]:
#Showing the dataset
df_cities.show()

+--------------+--------------+--------+---------+-------------+----+----+--------------------+-------+-----------+----------+
|          city|    city_ascii|     lat|      lng|      country|iso2|iso3|          admin_name|capital| population|        id|
+--------------+--------------+--------+---------+-------------+----+----+--------------------+-------+-----------+----------+
|         Tokyo|         Tokyo|  35.685| 139.7514|        Japan|  JP| JPN|               Tōkyō|primary|   3.5676E7|1392685764|
|      New York|      New York| 40.6943| -73.9249|United States|  US| USA|            New York|   null|1.9354922E7|1840034016|
|   Mexico City|   Mexico City| 19.4424|  -99.131|       Mexico|  MX| MEX|    Ciudad de México|primary|   1.9028E7|1484247881|
|        Mumbai|        Mumbai|  19.017|   72.857|        India|  IN| IND|         Mahārāshtra|  admin|   1.8978E7|1356226629|
|     São Paulo|     Sao Paulo|-23.5587|  -46.625|       Brazil|  BR| BRA|           São Paulo|  admin|   1.884

In [0]:
#Filtering some cities
cities_BR =df_cities.where(col("country")=="Brazil")
cities_BR.show(5)

+--------------+--------------+--------+--------+-------+----+----+-----------------+-------+----------+----------+
|          city|    city_ascii|     lat|     lng|country|iso2|iso3|       admin_name|capital|population|        id|
+--------------+--------------+--------+--------+-------+----+----+-----------------+-------+----------+----------+
|     São Paulo|     Sao Paulo|-23.5587| -46.625| Brazil|  BR| BRA|        São Paulo|  admin|  1.8845E7|1076532519|
|Rio de Janeiro|Rio de Janeiro| -22.925| -43.225| Brazil|  BR| BRA|   Rio de Janeiro|  admin|  1.1748E7|1076887657|
|Belo Horizonte|Belo Horizonte| -19.915| -43.915| Brazil|  BR| BRA|     Minas Gerais|  admin| 5575000.0|1076967355|
|  Porto Alegre|  Porto Alegre|  -30.05|   -51.2| Brazil|  BR| BRA|Rio Grande do Sul|  admin| 3917000.0|1076658221|
|      Brasília|      Brasilia|-15.7833|-47.9161| Brazil|  BR| BRA| Distrito Federal|primary| 3716996.0|1076144436|
+--------------+--------------+--------+--------+-------+----+----+-----

In [0]:
cities_MX =df_cities.where(col("country")=="Mexico")
cities_MX.show(5)

+-----------+-----------+-------+-------+-------+----+----+----------------+-------+----------+----------+
|       city| city_ascii|    lat|    lng|country|iso2|iso3|      admin_name|capital|population|        id|
+-----------+-----------+-------+-------+-------+----+----+----------------+-------+----------+----------+
|Mexico City|Mexico City|19.4424|-99.131| Mexico|  MX| MEX|Ciudad de México|primary|  1.9028E7|1484247881|
|Guadalajara|Guadalajara|  20.67|-103.33| Mexico|  MX| MEX|         Jalisco|  admin| 4198000.0|1484950208|
|  Monterrey|  Monterrey|  25.67|-100.33| Mexico|  MX| MEX|      Nuevo León|  admin| 3712000.0|1484559591|
|     Puebla|     Puebla|  19.05|  -98.2| Mexico|  MX| MEX|          Puebla|  admin| 2195000.0|1484416394|
|    Tijuana|    Tijuana|   32.5|-117.08| Mexico|  MX| MEX| Baja California|  minor| 1553000.0|1484708778|
+-----------+-----------+-------+-------+-------+----+----+----------------+-------+----------+----------+
only showing top 5 rows



In [0]:
cities_EUA =df_cities.where(col("country")=="United States")
cities_EUA.show(5)

+-----------+-----------+-------+---------+-------------+----+----+----------+-------+-----------+----------+
|       city| city_ascii|    lat|      lng|      country|iso2|iso3|admin_name|capital| population|        id|
+-----------+-----------+-------+---------+-------------+----+----+----------+-------+-----------+----------+
|   New York|   New York|40.6943| -73.9249|United States|  US| USA|  New York|   null|1.9354922E7|1840034016|
|Los Angeles|Los Angeles|34.1139|-118.4068|United States|  US| USA|California|   null|1.2815475E7|1840020491|
|    Chicago|    Chicago|41.8373| -87.6862|United States|  US| USA|  Illinois|   null|  8675982.0|1840000494|
|      Miami|      Miami|25.7839| -80.2102|United States|  US| USA|   Florida|   null|  6381966.0|1840015149|
|     Dallas|     Dallas|32.7936| -96.7662|United States|  US| USA|     Texas|   null|  5733259.0|1840019440|
+-----------+-----------+-------+---------+-------------+----+----+----------+-------+-----------+----------+
only showi

In [0]:
#Creating a new dataset by stacking the rows with the union function
df_concat = cities_BR.union(cities_MX)
df_concat.show(5)

+--------------+--------------+--------+--------+-------+----+----+-----------------+-------+----------+----------+
|          city|    city_ascii|     lat|     lng|country|iso2|iso3|       admin_name|capital|population|        id|
+--------------+--------------+--------+--------+-------+----+----+-----------------+-------+----------+----------+
|     São Paulo|     Sao Paulo|-23.5587| -46.625| Brazil|  BR| BRA|        São Paulo|  admin|  1.8845E7|1076532519|
|Rio de Janeiro|Rio de Janeiro| -22.925| -43.225| Brazil|  BR| BRA|   Rio de Janeiro|  admin|  1.1748E7|1076887657|
|Belo Horizonte|Belo Horizonte| -19.915| -43.915| Brazil|  BR| BRA|     Minas Gerais|  admin| 5575000.0|1076967355|
|  Porto Alegre|  Porto Alegre|  -30.05|   -51.2| Brazil|  BR| BRA|Rio Grande do Sul|  admin| 3917000.0|1076658221|
|      Brasília|      Brasilia|-15.7833|-47.9161| Brazil|  BR| BRA| Distrito Federal|primary| 3716996.0|1076144436|
+--------------+--------------+--------+--------+-------+----+----+-----

In [0]:
#Counting the number of cities per country
df_concat.groupby("country").count().show()

+-------+-----+
|country|count|
+-------+-----+
| Brazil|  387|
| Mexico|  188|
+-------+-----+



In [0]:
#Adding cities in Japan (our anomalies)
cities_JP =df_cities.where((col("city")=="Tokyo") | (col("city")=="Ōsaka"))
cities_JP.show()

+-----+----------+------+--------+-------+----+----+----------+-------+----------+----------+
| city|city_ascii|   lat|     lng|country|iso2|iso3|admin_name|capital|population|        id|
+-----+----------+------+--------+-------+----+----+----------+-------+----------+----------+
|Tokyo|     Tokyo|35.685|139.7514|  Japan|  JP| JPN|     Tōkyō|primary|  3.5676E7|1392685764|
|Ōsaka|     Osaka| 34.75|135.4601|  Japan|  JP| JPN|     Ōsaka|  admin|  1.1294E7|1392419823|
+-----+----------+------+--------+-------+----+----+----------+-------+----------+----------+



In [0]:
#Create the final dataset
df_final = df_concat.union(cities_JP)

In [0]:
df_final.show()

+--------------+--------------+--------+--------+-------+----+----+-------------------+-------+----------+----------+
|          city|    city_ascii|     lat|     lng|country|iso2|iso3|         admin_name|capital|population|        id|
+--------------+--------------+--------+--------+-------+----+----+-------------------+-------+----------+----------+
|     São Paulo|     Sao Paulo|-23.5587| -46.625| Brazil|  BR| BRA|          São Paulo|  admin|  1.8845E7|1076532519|
|Rio de Janeiro|Rio de Janeiro| -22.925| -43.225| Brazil|  BR| BRA|     Rio de Janeiro|  admin|  1.1748E7|1076887657|
|Belo Horizonte|Belo Horizonte| -19.915| -43.915| Brazil|  BR| BRA|       Minas Gerais|  admin| 5575000.0|1076967355|
|  Porto Alegre|  Porto Alegre|  -30.05|   -51.2| Brazil|  BR| BRA|  Rio Grande do Sul|  admin| 3917000.0|1076658221|
|      Brasília|      Brasilia|-15.7833|-47.9161| Brazil|  BR| BRA|   Distrito Federal|primary| 3716996.0|1076144436|
|        Recife|        Recife| -8.0756|-34.9156| Brazil

In [0]:
from pyspark.ml.evaluation import ClusteringEvaluator  #Evaluator used to measure the quality of the clusters
from pyspark.ml.clustering import KMeans #Class used to create the clustering model using K-means

In [0]:
from pyspark.ml.feature import VectorAssembler  #Transforming data into feature vectors

vecAssembler = VectorAssembler(inputCols=["lat","lng"], outputCol="features") #Used to transform the data into a vector (defines the object)
df_new = vecAssembler.transform(df_final) #Apply the transformation
df_new.show()

+--------------+--------------+--------+--------+-------+----+----+-------------------+-------+----------+----------+-------------------+
|          city|    city_ascii|     lat|     lng|country|iso2|iso3|         admin_name|capital|population|        id|           features|
+--------------+--------------+--------+--------+-------+----+----+-------------------+-------+----------+----------+-------------------+
|     São Paulo|     Sao Paulo|-23.5587| -46.625| Brazil|  BR| BRA|          São Paulo|  admin|  1.8845E7|1076532519| [-23.5587,-46.625]|
|Rio de Janeiro|Rio de Janeiro| -22.925| -43.225| Brazil|  BR| BRA|     Rio de Janeiro|  admin|  1.1748E7|1076887657|  [-22.925,-43.225]|
|Belo Horizonte|Belo Horizonte| -19.915| -43.915| Brazil|  BR| BRA|       Minas Gerais|  admin| 5575000.0|1076967355|  [-19.915,-43.915]|
|  Porto Alegre|  Porto Alegre|  -30.05|   -51.2| Brazil|  BR| BRA|  Rio Grande do Sul|  admin| 3917000.0|1076658221|     [-30.05,-51.2]|
|      Brasília|      Brasilia|-15

In [0]:
#Apply the clustering process
kmeans = KMeans(k=3, seed=1)  #Declare object - 3 clusters
model = kmeans.fit(df_new.select('features')) #Apply the training
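
To make the clustering step more concrete, the sketch below shows the assignment step that K-means repeats during training: each point goes to the nearest centroid, here by Euclidean distance on (lat, lng). The centroids are hypothetical values picked by hand, not the ones the fitted model found, and treating latitude and longitude as flat coordinates ignores the curvature of the Earth.

```python
import math

def nearest_centroid(point, centroids):
    """Return the index of the centroid closest to `point` (Euclidean distance)."""
    return min(range(len(centroids)), key=lambda i: math.dist(point, centroids[i]))

# Hypothetical centroids (lat, lng), chosen by hand for illustration
centroids = [(-15.0, -47.0), (22.0, -100.0), (35.0, 137.0)]

# Coordinates taken from the dataset shown above
cities = {
    "Sao Paulo": (-23.5587, -46.625),
    "Mexico City": (19.4424, -99.131),
    "Tokyo": (35.685, 139.7514),
}

assignments = {name: nearest_centroid(coords, centroids) for name, coords in cities.items()}
print(assignments)
```

The full algorithm alternates this step with recomputing each centroid as the mean of its assigned points until the assignments stop changing.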

In [0]:
#Creates the dataset indicating which cluster each row was assigned to
df_final = model.transform(df_new)
df_final.show() 

+--------------+--------------+--------+--------+-------+----+----+-------------------+-------+----------+----------+-------------------+----------+
|          city|    city_ascii|     lat|     lng|country|iso2|iso3|         admin_name|capital|population|        id|           features|prediction|
+--------------+--------------+--------+--------+-------+----+----+-------------------+-------+----------+----------+-------------------+----------+
|     São Paulo|     Sao Paulo|-23.5587| -46.625| Brazil|  BR| BRA|          São Paulo|  admin|  1.8845E7|1076532519| [-23.5587,-46.625]|         0|
|Rio de Janeiro|Rio de Janeiro| -22.925| -43.225| Brazil|  BR| BRA|     Rio de Janeiro|  admin|  1.1748E7|1076887657|  [-22.925,-43.225]|         0|
|Belo Horizonte|Belo Horizonte| -19.915| -43.915| Brazil|  BR| BRA|       Minas Gerais|  admin| 5575000.0|1076967355|  [-19.915,-43.915]|         0|
|  Porto Alegre|  Porto Alegre|  -30.05|   -51.2| Brazil|  BR| BRA|  Rio Grande do Sul|  admin| 3917000.0|

In [0]:
df_final.groupby('prediction').count().show()

+----------+-----+
|prediction|count|
+----------+-----+
|         0|  387|
|         1|  188|
|         2|    2|
+----------+-----+
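
One way to turn these counts into an anomaly rule is to flag any cluster that holds only a tiny share of the rows. The sketch below does that in plain Python with the counts shown above; the helper `small_clusters` and the 1% threshold are assumptions for illustration and would need tuning on other data.

```python
# Cluster sizes copied from the groupby output above
cluster_sizes = {0: 387, 1: 188, 2: 2}

def small_clusters(sizes, max_share=0.01):
    """Return the cluster IDs whose share of all rows is at most `max_share`."""
    total = sum(sizes.values())
    return [cluster for cluster, n in sizes.items() if n / total <= max_share]

print(small_clusters(cluster_sizes))
```

Here only cluster 2 qualifies, which matches the two Japanese cities that were added as anomalies.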



In [0]:
df_final.where(col("prediction")==2).show()

+-----+----------+------+--------+-------+----+----+----------+-------+----------+----------+-----------------+----------+
| city|city_ascii|   lat|     lng|country|iso2|iso3|admin_name|capital|population|        id|         features|prediction|
+-----+----------+------+--------+-------+----+----+----------+-------+----------+----------+-----------------+----------+
|Tokyo|     Tokyo|35.685|139.7514|  Japan|  JP| JPN|     Tōkyō|primary|  3.5676E7|1392685764|[35.685,139.7514]|         2|
|Ōsaka|     Osaka| 34.75|135.4601|  Japan|  JP| JPN|     Ōsaka|  admin|  1.1294E7|1392419823| [34.75,135.4601]|         2|
+-----+----------+------+--------+-------+----+----+----------+-------+----------+----------+-----------------+----------+

