In [1]:
import findspark
findspark.init("/usr/local/Cellar/apache-spark/2.4.4/libexec")

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import CountVectorizer, IDF, Tokenizer, RegexTokenizer, StopWordsRemover, IDF, MinHashLSH
from pyspark.sql import SQLContext, Row
from pyspark.mllib.linalg import Vector, Vectors
from pyspark.sql.functions import explode, col
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
import numpy as np

from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession\
        .builder\
        .appName("test_new")\
        .getOrCreate()

spark

In [3]:
#load all the drive stats csv files into the spark dataframe
df = spark.read.option("header", "true").csv("/Users/bharathsurianarayanan/Downloads/drive_stats_2019_Q1/*.csv")


In [4]:
#display the schema of the loaded dataframe
df.printSchema()

root
 |-- date: string (nullable = true)
 |-- serial_number: string (nullable = true)
 |-- model: string (nullable = true)
 |-- capacity_bytes: string (nullable = true)
 |-- failure: string (nullable = true)
 |-- smart_1_normalized: string (nullable = true)
 |-- smart_1_raw: string (nullable = true)
 |-- smart_2_normalized: string (nullable = true)
 |-- smart_2_raw: string (nullable = true)
 |-- smart_3_normalized: string (nullable = true)
 |-- smart_3_raw: string (nullable = true)
 |-- smart_4_normalized: string (nullable = true)
 |-- smart_4_raw: string (nullable = true)
 |-- smart_5_normalized: string (nullable = true)
 |-- smart_5_raw: string (nullable = true)
 |-- smart_7_normalized: string (nullable = true)
 |-- smart_7_raw: string (nullable = true)
 |-- smart_8_normalized: string (nullable = true)
 |-- smart_8_raw: string (nullable = true)
 |-- smart_9_normalized: string (nullable = true)
 |-- smart_9_raw: string (nullable = true)
 |-- smart_10_normalized: string (nullable = tru

In [5]:
from pyspark.ml.feature import VectorAssembler
#convert smart_1_normalized from string to Integer type 
df = df.withColumn("smart_1_normalized", df["smart_1_normalized"].cast(IntegerType()))
# vectorize the column smart_1_normalized and store the vector in a column named features
vecAssembler=VectorAssembler(inputCols=["smart_1_normalized"],outputCol="features")
# skip invalid values
new_df=vecAssembler.setHandleInvalid('skip').transform(df)
# lets take only the columns necessary for part a) i.e kmeans with respect to smart_1_normalized column
temp_df=new_df.select('date','serial_number','model','smart_1_normalized','features')
temp_df.show()
temp_df.printSchema()

temp_df=temp_df.where(F.col('features').isNotNull())
# temp_df=temp_df.select('features')
# temp_df.printSchema()

+----------+--------------+--------------------+------------------+--------+
|      date| serial_number|               model|smart_1_normalized|features|
+----------+--------------+--------------------+------------------+--------+
|2019-03-05|      Z305B2QN|         ST4000DM000|               117| [117.0]|
|2019-03-05|      ZJV0XJQ4|       ST12000NM0007|                80|  [80.0]|
|2019-03-05|      ZJV0XJQ3|       ST12000NM0007|                83|  [83.0]|
|2019-03-05|      ZJV0XJQ0|       ST12000NM0007|                81|  [81.0]|
|2019-03-05|PL1331LAHG1S4H|HGST HMS5C4040ALE640|               100| [100.0]|
|2019-03-05|      ZA16NQJR|        ST8000NM0055|                75|  [75.0]|
|2019-03-05|      ZJV02XWG|       ST12000NM0007|                83|  [83.0]|
|2019-03-05|      ZJV1CSVX|       ST12000NM0007|                83|  [83.0]|
|2019-03-05|      ZJV02XWA|       ST12000NM0007|                78|  [78.0]|
|2019-03-05|      ZA18CEBS|        ST8000NM0055|                77|  [77.0]|

In [6]:
from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=2, seed=1)  # 2 clusters here
model = kmeans.fit(temp_df.select('features'))

In [7]:
transformed=model.transform(temp_df)
transformed.show()

+----------+--------------+--------------------+------------------+--------+----------+
|      date| serial_number|               model|smart_1_normalized|features|prediction|
+----------+--------------+--------------------+------------------+--------+----------+
|2019-03-05|      Z305B2QN|         ST4000DM000|               117| [117.0]|         1|
|2019-03-05|      ZJV0XJQ4|       ST12000NM0007|                80|  [80.0]|         0|
|2019-03-05|      ZJV0XJQ3|       ST12000NM0007|                83|  [83.0]|         0|
|2019-03-05|      ZJV0XJQ0|       ST12000NM0007|                81|  [81.0]|         0|
|2019-03-05|PL1331LAHG1S4H|HGST HMS5C4040ALE640|               100| [100.0]|         1|
|2019-03-05|      ZA16NQJR|        ST8000NM0055|                75|  [75.0]|         0|
|2019-03-05|      ZJV02XWG|       ST12000NM0007|                83|  [83.0]|         0|
|2019-03-05|      ZJV1CSVX|       ST12000NM0007|                83|  [83.0]|         0|
|2019-03-05|      ZJV02XWA|     

In [8]:
from pyspark.ml.evaluation import ClusteringEvaluator
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(transformed)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.801856063520587


In [9]:
from pyspark.sql import functions as F

#generating labels based on the predicted values, if predicted value is 1 then it is an Anomaly else it is not an Anomaly
# this 
df_temp = transformed.withColumn(
    'label',
    F.when((F.col("prediction") == 1), 'Anomaly')\
    .otherwise("Not Anomaly")
)

df_temp.show()

+----------+--------------+--------------------+------------------+--------+----------+-----------+
|      date| serial_number|               model|smart_1_normalized|features|prediction|      label|
+----------+--------------+--------------------+------------------+--------+----------+-----------+
|2019-03-05|      Z305B2QN|         ST4000DM000|               117| [117.0]|         1|    Anomaly|
|2019-03-05|      ZJV0XJQ4|       ST12000NM0007|                80|  [80.0]|         0|Not Anomaly|
|2019-03-05|      ZJV0XJQ3|       ST12000NM0007|                83|  [83.0]|         0|Not Anomaly|
|2019-03-05|      ZJV0XJQ0|       ST12000NM0007|                81|  [81.0]|         0|Not Anomaly|
|2019-03-05|PL1331LAHG1S4H|HGST HMS5C4040ALE640|               100| [100.0]|         1|    Anomaly|
|2019-03-05|      ZA16NQJR|        ST8000NM0055|                75|  [75.0]|         0|Not Anomaly|
|2019-03-05|      ZJV02XWG|       ST12000NM0007|                83|  [83.0]|         0|Not Anomaly|


In [28]:
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# part a) k means using smart_1_normalized column
#scaling the features, to the range [0,1] so as to improve the accuracy of k means

scaler = MinMaxScaler(inputCol="features",\
         outputCol="scaledFeatures")
scalerModel =  scaler.fit(temp_df.select("features"))
scaledData = scalerModel.transform(temp_df)

In [29]:
scaledData.printSchema()

root
 |-- date: string (nullable = true)
 |-- serial_number: string (nullable = true)
 |-- model: string (nullable = true)
 |-- smart_1_normalized: integer (nullable = true)
 |-- features: vector (nullable = true)
 |-- scaledFeatures: vector (nullable = true)



In [30]:
# show the scaled data
scaledData.show()

+----------+--------------+--------------------+------------------+--------+--------------------+
|      date| serial_number|               model|smart_1_normalized|features|      scaledFeatures|
+----------+--------------+--------------------+------------------+--------+--------------------+
|2019-03-05|      Z305B2QN|         ST4000DM000|               117| [117.0]|[0.4503311258278146]|
|2019-03-05|      ZJV0XJQ4|       ST12000NM0007|                80|  [80.0]|[0.2052980132450331]|
|2019-03-05|      ZJV0XJQ3|       ST12000NM0007|                83|  [83.0]|[0.2251655629139073]|
|2019-03-05|      ZJV0XJQ0|       ST12000NM0007|                81|  [81.0]|[0.2119205298013245]|
|2019-03-05|PL1331LAHG1S4H|HGST HMS5C4040ALE640|               100| [100.0]|[0.33774834437086...|
|2019-03-05|      ZA16NQJR|        ST8000NM0055|                75|  [75.0]|[0.17218543046357...|
|2019-03-05|      ZJV02XWG|       ST12000NM0007|                83|  [83.0]|[0.2251655629139073]|
|2019-03-05|      ZJ

In [65]:
from pyspark.ml.clustering import KMeans
#build the k means model, k specifies the number of clusters while seed is a pseudorandom number
#for different values of "seed" differnt initial centroids are chosen
#the value of k was chosen because for different values of k the clustering silhouettte was calculated
#the silhouette measure was highest for k=49 , which is also the number of unique models, thus this k was settled upoon
kmeans = KMeans(k=49, seed=1)  # 2 clusters here
model = kmeans.fit(scaledData.select('features'))

In [66]:
#transform the data based on the generated model
scaled_transformed=model.transform(scaledData)
scaled_transformed.show()

+----------+--------------+--------------------+------------------+--------+--------------------+----------+
|      date| serial_number|               model|smart_1_normalized|features|      scaledFeatures|prediction|
+----------+--------------+--------------------+------------------+--------+--------------------+----------+
|2019-03-05|      Z305B2QN|         ST4000DM000|               117| [117.0]|[0.4503311258278146]|        21|
|2019-03-05|      ZJV0XJQ4|       ST12000NM0007|                80|  [80.0]|[0.2052980132450331]|         7|
|2019-03-05|      ZJV0XJQ3|       ST12000NM0007|                83|  [83.0]|[0.2251655629139073]|        11|
|2019-03-05|      ZJV0XJQ0|       ST12000NM0007|                81|  [81.0]|[0.2119205298013245]|         4|
|2019-03-05|PL1331LAHG1S4H|HGST HMS5C4040ALE640|               100| [100.0]|[0.33774834437086...|         1|
|2019-03-05|      ZA16NQJR|        ST8000NM0055|                75|  [75.0]|[0.17218543046357...|         8|
|2019-03-05|      Z

In [67]:
from pyspark.ml.evaluation import ClusteringEvaluator
#evaluate the generated model using the CLustering evaluator which finds the silhouette with the squared 
# euclidian distance

evaluator = ClusteringEvaluator()
#the silouette measure ranges from -1 to 1 , the closer it is to 1, the more similar are points within the cluster
# the closer it is to -1 the more dissimilar are the points within the cluster
silhouette = evaluator.evaluate(scaled_transformed)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.9683050263733727


In [111]:
# get the number of points in each cluster
# sparse number of points in a cluster means they're probably outliers
# from the results we see that the first cluster has > 99% of the data, hence the data in  the cluster with size =779
# is the outlier data cluster
summary=model.summary
summary.clusterSizes

[363763,
 2194364,
 442079,
 58903,
 616376,
 387173,
 97029,
 486657,
 153994,
 244531,
 777905,
 1055399,
 138611,
 43465,
 36550,
 123168,
 48910,
 307408,
 38903,
 194125,
 779,
 438859,
 97006,
 291628,
 22885,
 294298,
 36401,
 146362,
 109119,
 145712,
 55447,
 36522,
 54930,
 36251,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0]

In [93]:
# part b) k means with the annualized failure rate column

from pyspark.ml.feature import VectorAssembler
#convert failure from string to Integer type 
df_failures = df.withColumn("failure", df["failure"].cast(IntegerType()))
# vectorize the column smart_1_normalized and store the vector in a column named features
vecAssembler=VectorAssembler(inputCols=["failure"],outputCol="features")
# skip invalid values
new_df_failures=vecAssembler.setHandleInvalid('skip').transform(df_failures)
# lets take only the columns necessary for part a) i.e kmeans with respect to smart_1_normalized column
temp_df_failures=new_df_failures.select('date','serial_number','model','failure','features')
temp_df_failures.show()
temp_df_failures.printSchema()

temp_df_failures=temp_df_failures.where(F.col('features').isNotNull())
# temp_df=temp_df.select('features')
# temp_df.printSchema()

+----------+--------------+--------------------+-------+--------+
|      date| serial_number|               model|failure|features|
+----------+--------------+--------------------+-------+--------+
|2019-03-05|      Z305B2QN|         ST4000DM000|      0|   [0.0]|
|2019-03-05|      ZJV0XJQ4|       ST12000NM0007|      0|   [0.0]|
|2019-03-05|      ZJV0XJQ3|       ST12000NM0007|      0|   [0.0]|
|2019-03-05|      ZJV0XJQ0|       ST12000NM0007|      0|   [0.0]|
|2019-03-05|PL1331LAHG1S4H|HGST HMS5C4040ALE640|      0|   [0.0]|
|2019-03-05|      ZA16NQJR|        ST8000NM0055|      0|   [0.0]|
|2019-03-05|      ZJV02XWG|       ST12000NM0007|      0|   [0.0]|
|2019-03-05|      ZJV1CSVX|       ST12000NM0007|      0|   [0.0]|
|2019-03-05|      ZJV02XWA|       ST12000NM0007|      0|   [0.0]|
|2019-03-05|      ZA18CEBS|        ST8000NM0055|      0|   [0.0]|
|2019-03-05|      Z305DEMG|         ST4000DM000|      0|   [0.0]|
|2019-03-05|      ZA130TTW|         ST8000DM002|      0|   [0.0]|
|2019-03-0

In [94]:
# from pyspark.ml.clustering import KMeans

kmeans_failures = KMeans(k=49, seed=1)  # 2 clusters here
model_failures = kmeans_failures.fit(temp_df_failures.select('features'))
temp=100


In [97]:
transformed_failures=model_failures.transform(temp_df_failures)
transformed_failures.show()

+----------+--------------+--------------------+-------+--------+----------+
|      date| serial_number|               model|failure|features|prediction|
+----------+--------------+--------------------+-------+--------+----------+
|2019-03-05|      Z305B2QN|         ST4000DM000|      0|   [0.0]|         0|
|2019-03-05|      ZJV0XJQ4|       ST12000NM0007|      0|   [0.0]|         0|
|2019-03-05|      ZJV0XJQ3|       ST12000NM0007|      0|   [0.0]|         0|
|2019-03-05|      ZJV0XJQ0|       ST12000NM0007|      0|   [0.0]|         0|
|2019-03-05|PL1331LAHG1S4H|HGST HMS5C4040ALE640|      0|   [0.0]|         0|
|2019-03-05|      ZA16NQJR|        ST8000NM0055|      0|   [0.0]|         0|
|2019-03-05|      ZJV02XWG|       ST12000NM0007|      0|   [0.0]|         0|
|2019-03-05|      ZJV1CSVX|       ST12000NM0007|      0|   [0.0]|         0|
|2019-03-05|      ZJV02XWA|       ST12000NM0007|      0|   [0.0]|         0|
|2019-03-05|      ZA18CEBS|        ST8000NM0055|      0|   [0.0]|         0|

In [98]:
evaluator = ClusteringEvaluator()
#the silouette measure ranges from -1 to 1 , the closer it is to 1, the more similar are points within the cluster
# the closer it is to -1 the more dissimilar are the points within the cluster
silhouette = evaluator.evaluate(transformed_failures)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 1.0


In [99]:
# get the summary for a model
summary=model_failures.summary

In [102]:
# get the number of points in each cluster
# sparse number of points in a cluster means they're probably outliers
# from the results we see that the first cluster has > 99% of the data, hence the data in all the clusters other than
# the first cluster are outliers, infact no other cluster has any other element which shows how tight each of the points
# are
summary.clusterSizes


[9576602,
 444,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0,
 0]