In [21]:
import findspark
import os
os.environ['PATH'].split(';')


findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import pyspark.sql.functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StandardScaler
from pyspark.ml.clustering import KMeans


spark = SparkSession.builder.appName("midterm").getOrCreate()

df = spark.read .format("csv").option("header", "true").load("drive_stats_2019_Q1/*.csv")

drive_days = df.select('model').groupBy('model').count().withColumnRenamed('count', 'drive_days')

failures = df.select('model').filter(df.failure == 1).groupBy('model').count().withColumnRenamed('count', 'failures')

model_count = df.select('model').filter(df.date=='2019-03-31').groupBy('model').count().alias("count")

result = drive_days.join(failures, ["model"]).join(model_count, ["model"])
result = result.withColumn("annual_failure_rate", 100.0 * (1.0 * F.col("failures")) / ( (F.col("drive_days") / 365.0)))
df_temp = result.select("annual_failure_rate")
print(df_temp.columns)

feat_cols = [ 'annual_failure_rate']

for column in feat_cols:
    df_temp = df_temp.withColumn(column,df_temp[column].cast(IntegerType()))

# df_temp.show(100)
df_temp.dropna()
vec_assembler = VectorAssembler(inputCols = feat_cols, outputCol='features')

final_data = vec_assembler.setHandleInvalid("skip").transform(df_temp)
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
scalerModel = scaler.fit(final_data)
cluster_final_data = scalerModel.transform(final_data)
kmeans2 = KMeans(featuresCol='scaledFeatures',k=3)
model_k2 = kmeans2.fit(cluster_final_data)

centers = model_k2.clusterCenters()
centers = [center.tolist() for center in centers]
print(centers)

['annual_failure_rate']
[[0.45804560998893407], [4.512597491002092], [0.03619125807319973]]


In [28]:
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from scipy.spatial import distance

df_preds2 = model_k2.transform(cluster_final_data).orderBy("prediction")

distance_udf = F.udf(lambda x,y: float(distance.euclidean(x, centers[y])), FloatType())
df_preds2 = df_preds2.withColumn('distance', distance_udf(F.col('scaledFeatures'),F.col('prediction')))
df_preds2 = df_preds2.orderBy("prediction").orderBy(F.desc("distance"))
# df_preds2.show()
# q = df_preds2.approxQuantile("distance", [0.5], 1)
# q

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col

window = Window.partitionBy(df_preds2['prediction']).orderBy(df_preds2['distance'].desc())
threshold = 10
df_preds2.select('annual_failure_rate','prediction','distance', rank().over(window).alias('rank')) .filter(F.col('rank') <= threshold) .show(50) 

+-------------------+----------+------------+----+
|annual_failure_rate|prediction|    distance|rank|
+-------------------+----------+------------+----+
|                133|         1|         0.0|   1|
|                  3|         2| 0.065596655|   1|
|                  3|         2| 0.065596655|   1|
|                  0|         2|  0.03619126|   3|
|                  0|         2|  0.03619126|   3|
|                  0|         2|  0.03619126|   3|
|                  0|         2|  0.03619126|   3|
|                  0|         2|  0.03619126|   3|
|                  2|         2|  0.03166735|   8|
|                  2|         2|  0.03166735|   8|
|                  1|         2|0.0022619537|  10|
|                  1|         2|0.0022619537|  10|
|                  1|         2|0.0022619537|  10|
|                  1|         2|0.0022619537|  10|
|                  1|         2|0.0022619537|  10|
|                  1|         2|0.0022619537|  10|
|                 22|         0