In [1]:
!pip3 install findspark



In [2]:
import findspark

In [3]:
findspark.init()

In [4]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder.appName('anamoly_detection').getOrCreate()

In [6]:
data = spark.read.format('csv').options(header='true',inferschema='true').load("drive_stats_2019_Q1/*.csv")

In [7]:
data.createOrReplaceTempView("query_data")

# Preparing the data : Calculating Annualized failure rate and normalized read error rate , smart attribute 1

In [8]:
failure_bymodel = spark.sql("SELECT model, count(*) AS failures "+
                                "FROM query_data "+
                                "WHERE failure = 1 "+
                                "GROUP BY model")
failure_bymodel.show(5)

+-------------------+--------+
|              model|failures|
+-------------------+--------+
|        ST4000DM000|     107|
|      ST12000NM0007|     180|
|TOSHIBA MQ01ABF050M|       3|
|       ST8000NM0055|      58|
|       WDC WD60EFRX|       1|
+-------------------+--------+
only showing top 5 rows



In [9]:
total_datacount_bymodel = spark.sql("SELECT model, count(*) AS count "+
                                "FROM query_data "+
                                "WHERE date = '2019-03-31' "+
                                "GROUP BY model")
total_datacount_bymodel.show(5)

+-------------------+-----+
|              model|count|
+-------------------+-----+
|        ST4000DM000|19785|
|      ST12000NM0007|34708|
|        ST8000DM005|   25|
|TOSHIBA MQ01ABF050M|  377|
|       ST8000NM0055|14381|
+-------------------+-----+
only showing top 5 rows



In [10]:
drivedays = spark.sql("SELECT model, count(*) AS drive_days "+
                                "FROM query_data "+
                                "GROUP BY model")
drivedays.show(5)


+-------------------+----------+
|              model|drive_days|
+-------------------+----------+
|        ST4000DM000|   1989429|
|      ST12000NM0007|   2955025|
|        ST8000DM005|      2250|
|         ST320LT007|        85|
|TOSHIBA MQ01ABF050M|     32624|
+-------------------+----------+
only showing top 5 rows



In [11]:
total_datacount_bymodel.registerTempTable("model_count")
failure_bymodel.registerTempTable("model_failures")
drivedays.registerTempTable("drivedays")


failure_rates = spark.sql("SELECT drivedays.model AS model, "+ 
                                "drivedays.drive_days AS drivedays, "+
                                "model_failures.failures AS failures, "+
                                "100.0 * (1.0 * failures) / (drive_days / 365.0) AS annual_failure_rate "+
                                "FROM drivedays, model_failures, model_count "+
                                "WHERE drivedays.model = model_failures.model "+
                                "AND model_count.model = model_failures.model "+
                                "ORDER BY model")

failure_rates.show(5)

+--------------------+---------+--------+-------------------+
|               model|drivedays|failures|annual_failure_rate|
+--------------------+---------+--------+-------------------+
|HGST HMS5C4040ALE640|   313383|       2|           0.232942|
|HGST HMS5C4040BLE640|  1172824|      11|           0.342336|
|HGST HUH721212ALE600|    14040|       1|           2.599715|
|HGST HUH721212ALN604|   259749|       4|           0.562081|
|HGST HUH728080ALE600|    93598|       3|           1.169897|
+--------------------+---------+--------+-------------------+
only showing top 5 rows



# Creating Annualized_failure_rate model to detect anamoly

In [12]:
def add_label(value):
    if value > 2:
        return 1
    else:
        return 0
    return 0

In [13]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
udfadd_label = udf(add_label, IntegerType())
df_failure_with_label = failure_rates.withColumn("label", udfadd_label("annual_failure_rate"))
df_failure_with_label.show()

+--------------------+---------+--------+-------------------+-----+
|               model|drivedays|failures|annual_failure_rate|label|
+--------------------+---------+--------+-------------------+-----+
|HGST HMS5C4040ALE640|   313383|       2|           0.232942|    0|
|HGST HMS5C4040BLE640|  1172824|      11|           0.342336|    0|
|HGST HUH721212ALE600|    14040|       1|           2.599715|    1|
|HGST HUH721212ALN604|   259749|       4|           0.562081|    0|
|HGST HUH728080ALE600|    93598|       3|           1.169897|    0|
|       ST10000NM0086|   108555|       3|           1.008705|    0|
|       ST12000NM0007|  2955025|     180|           2.223331|    1|
|         ST4000DM000|  1989429|     107|           1.963126|    0|
|       ST500LM012 HN|    50619|      12|           8.652877|    1|
|          ST500LM030|    14479|       9|          22.688031|    1|
|         ST6000DX000|   135832|       1|           0.268714|    0|
|         ST8000DM002|   888741|      29|       

In [14]:
#converting into features and adding labels :  label = 0 if annual_failure_rate < 2 else 1
from pyspark.ml.feature import VectorAssembler
feature_conv = VectorAssembler(inputCols=["annual_failure_rate"], outputCol="features")
df_failure = feature_conv.transform(df_failure_with_label)
df_failure.show()

+--------------------+---------+--------+-------------------+-----+-----------+
|               model|drivedays|failures|annual_failure_rate|label|   features|
+--------------------+---------+--------+-------------------+-----+-----------+
|HGST HMS5C4040ALE640|   313383|       2|           0.232942|    0| [0.232942]|
|HGST HMS5C4040BLE640|  1172824|      11|           0.342336|    0| [0.342336]|
|HGST HUH721212ALE600|    14040|       1|           2.599715|    1| [2.599715]|
|HGST HUH721212ALN604|   259749|       4|           0.562081|    0| [0.562081]|
|HGST HUH728080ALE600|    93598|       3|           1.169897|    0| [1.169897]|
|       ST10000NM0086|   108555|       3|           1.008705|    0| [1.008705]|
|       ST12000NM0007|  2955025|     180|           2.223331|    1| [2.223331]|
|         ST4000DM000|  1989429|     107|           1.963126|    0| [1.963126]|
|       ST500LM012 HN|    50619|      12|           8.652877|    1| [8.652877]|
|          ST500LM030|    14479|       9

In [15]:
df_failure.printSchema()

root
 |-- model: string (nullable = true)
 |-- drivedays: long (nullable = false)
 |-- failures: long (nullable = false)
 |-- annual_failure_rate: decimal(38,6) (nullable = true)
 |-- label: integer (nullable = true)
 |-- features: vector (nullable = true)



In [16]:
#splitting the data into train , test and validation
train, test, validation = df_failure.randomSplit([0.75, 0.15, 0.10], seed=12345)
train.count()
#test.count()
#validation.count()

18

In [17]:
#Creating the model using Linear SVM

from pyspark.ml.classification import LinearSVC

lsvc = LinearSVC(maxIter=10, regParam=0.1)
# Fit the model
lsvcModel = lsvc.fit(train)

In [18]:
lsvcModel.coefficients


DenseVector([0.0261])

In [19]:
lsvcModel.intercept

-1.0089077960181476

In [20]:
predict = lsvcModel.transform(test)
#predict.select("prediction").show()
predict.select("label","prediction").show()

+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    1|       0.0|
+-----+----------+



# Normalized Read error rate model

In [121]:
normalized_features = data.select("smart_1_normalized","model").filter(data["smart_1_normalized"].isNotNull())
normalized_features.show()



+------------------+--------------------+
|smart_1_normalized|               model|
+------------------+--------------------+
|               117|         ST4000DM000|
|                80|       ST12000NM0007|
|                83|       ST12000NM0007|
|                81|       ST12000NM0007|
|               100|HGST HMS5C4040ALE640|
|                75|        ST8000NM0055|
|                83|       ST12000NM0007|
|                83|       ST12000NM0007|
|                78|       ST12000NM0007|
|                77|        ST8000NM0055|
|               117|         ST4000DM000|
|                81|         ST8000DM002|
|                74|       ST12000NM0007|
|                80|        ST8000NM0055|
|                78|       ST12000NM0007|
|               100|HGST HMS5C4040ALE640|
|               100|HGST HMS5C4040BLE640|
|               100|HGST HMS5C4040BLE640|
|               100| TOSHIBA MG07ACA14TA|
|               100|HGST HMS5C4040BLE640|
+------------------+--------------

In [127]:
def add_label_b(value):
    if value > 100:
        return 1
    else:
        return 0
    return 0

In [132]:
udfadd_label = udf(add_label_b, IntegerType())
df_normalized_rate_with_label = normalized_features.withColumn("label", udfadd_label("smart_1_normalized"))
df_normalized_rate_with_label.show()

+------------------+--------------------+-----+
|smart_1_normalized|               model|label|
+------------------+--------------------+-----+
|               117|         ST4000DM000|    1|
|                80|       ST12000NM0007|    0|
|                83|       ST12000NM0007|    0|
|                81|       ST12000NM0007|    0|
|               100|HGST HMS5C4040ALE640|    0|
|                75|        ST8000NM0055|    0|
|                83|       ST12000NM0007|    0|
|                83|       ST12000NM0007|    0|
|                78|       ST12000NM0007|    0|
|                77|        ST8000NM0055|    0|
|               117|         ST4000DM000|    1|
|                81|         ST8000DM002|    0|
|                74|       ST12000NM0007|    0|
|                80|        ST8000NM0055|    0|
|                78|       ST12000NM0007|    0|
|               100|HGST HMS5C4040ALE640|    0|
|               100|HGST HMS5C4040BLE640|    0|
|               100|HGST HMS5C4040BLE640

In [133]:
vecAssembler = VectorAssembler(inputCols=["smart_1_normalized"], outputCol="features")
normalized_feat_DF = vecAssembler.transform(df_normalized_rate_with_label)
#normalized_feat_DF.printSchema()

In [134]:
train_s, test_s, validation_s = normalized_feat_DF.randomSplit([0.75, 0.15, 0.10], seed=12345)
train_s.count()
#test_s.count()
#validation_s.count()

7181826

In [135]:
lsvc = LinearSVC(maxIter=10, regParam=0.1)
# Fit the model
lsvcModel = lsvc.fit(train_s)

In [136]:
lsvcModel.coefficients

DenseVector([0.004])

In [137]:
lsvcModel.intercept

-1.3827678185296213

In [138]:
predict = lsvcModel.transform(test_s)
#predict.select("prediction").show()
predict.select("label","prediction").show()

+-----+----------+
|label|prediction|
+-----+----------+
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
|    0|       0.0|
+-----+----------+
only showing top 20 rows

