<h1>Question 3</h1>
<br>In Spark ML given the hard drive logs for 2019 Q1, implement a point anomaly detector for: 
<br>a) Annualized Failure Rate (by model) 
<br>b) Normalized Read Error Rate, SMART attribute 1. 
<br>- For generating training labels, use        a) 2%           b) 100 

<h3>Question 3(a):</h3>

In [1]:
#Starting the spark session
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import numpy as np
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from scipy.spatial import distance

#starting the spark session
spark = SparkSession.builder.getOrCreate()

In [2]:
#Load the data from different csv files to a single csv and making a datarame
pre_final= spark.read.format("csv").option("header","true").load(r"C:\Users\myste\Documents\MSCS\Spring 2020\Big Data\data_Q2_SP20\*.csv")
pre_final.createOrReplaceTempView("drive_stats")

In [3]:
#creating a table for drive days
drive_days_df=spark.sql("SELECT model, count(*) AS drive_days FROM drive_stats GROUP BY model")
drive_days_df.createOrReplaceTempView("drive_days")
drive_days_df.show()

+--------------------+----------+
|               model|drive_days|
+--------------------+----------+
|         ST4000DM000|   1989429|
|       ST12000NM0007|   2955025|
|         ST8000DM005|      2250|
|          ST320LT007|        85|
| TOSHIBA MQ01ABF050M|     32624|
|        ST8000NM0055|   1294451|
|Seagate BarraCuda...|       265|
| TOSHIBA MG07ACA14TA|    109404|
|        WDC WD60EFRX|     30523|
|         ST8000DM002|    888741|
|         ST4000DM005|      4848|
|         DELLBOSS VD|       540|
|HGST HUS726040ALE610|      2598|
|     TOSHIBA HDWF180|      1798|
|HGST HMS5C4040ALE640|    313383|
|HGST HUH721010ALE600|      1245|
| TOSHIBA MD04ABA500V|      4050|
| TOSHIBA MD04ABA400V|     12662|
|       ST10000NM0086|    108555|
|      WDC WD2500AAJS|        88|
+--------------------+----------+
only showing top 20 rows



In [4]:
#creating a table for hard drive failures
failures_df=spark.sql("SELECT model, count(*) AS failures FROM drive_stats WHERE failure = 1 GROUP BY model")
failures_df.createOrReplaceTempView("failures")
failures_df.show()

+--------------------+--------+
|               model|failures|
+--------------------+--------+
|         ST4000DM000|     107|
|       ST12000NM0007|     180|
| TOSHIBA MQ01ABF050M|       3|
|        ST8000NM0055|      58|
|        WDC WD60EFRX|       1|
| TOSHIBA MG07ACA14TA|       1|
|         ST8000DM002|      29|
|HGST HMS5C4040ALE640|       2|
|       ST10000NM0086|       3|
|  TOSHIBA MQ01ABF050|      14|
|HGST HMS5C4040BLE640|      11|
|         ST6000DX000|       1|
|      WDC WD5000LPVX|       2|
|          ST500LM030|       9|
|       ST500LM012 HN|      12|
|HGST HUH728080ALE600|       3|
|HGST HUH721212ALE600|       1|
|      WDC WD5000LPCX|       2|
|         ST8000DM004|       1|
|HGST HUH721212ALN604|       4|
+--------------------+--------+



In [5]:
#creating a table for model count
model_count_df=spark.sql("SELECT model, count(*) AS count FROM drive_stats WHERE date = '2019-03-31' GROUP BY model")
model_count_df.createOrReplaceTempView("model_count")
model_count_df.show()

+--------------------+-----+
|               model|count|
+--------------------+-----+
|         ST4000DM000|19785|
|       ST12000NM0007|34708|
|         ST8000DM005|   25|
| TOSHIBA MQ01ABF050M|  377|
|        ST8000NM0055|14381|
|Seagate BarraCuda...|    3|
| TOSHIBA MG07ACA14TA| 1220|
|        WDC WD60EFRX|   89|
|         ST8000DM002| 9874|
|         ST4000DM005|   43|
|         DELLBOSS VD|   20|
|HGST HUS726040ALE610|   28|
|     TOSHIBA HDWF180|   20|
|HGST HMS5C4040ALE640| 2557|
|HGST HUH721010ALE600|   17|
| TOSHIBA MD04ABA500V|   45|
| TOSHIBA MD04ABA400V|   99|
|       ST10000NM0086| 1203|
|  TOSHIBA MQ01ABF050|  515|
|          ST500LM021|   14|
+--------------------+-----+
only showing top 20 rows



In [6]:
#creating a table for failure rates that shows the annual failures rates for the drives
failure_rates_df=spark.sql("SELECT drive_days.model AS model, drive_days.drive_days AS drive_days, failures.failures AS failures, 100.0 * (1.0 * failures) / (drive_days / 365.0) AS annual_failure_rate FROM drive_days, failures, model_count WHERE drive_days.model = failures.model AND model_count.model = failures.model ORDER BY model").na.drop()
failure_rates_df.createOrReplaceTempView("failure_rates")
failure_rates_df.show()

+--------------------+----------+--------+-------------------+
|               model|drive_days|failures|annual_failure_rate|
+--------------------+----------+--------+-------------------+
|HGST HMS5C4040ALE640|    313383|       2|           0.232942|
|HGST HMS5C4040BLE640|   1172824|      11|           0.342336|
|HGST HUH721212ALE600|     14040|       1|           2.599715|
|HGST HUH721212ALN604|    259749|       4|           0.562081|
|HGST HUH728080ALE600|     93598|       3|           1.169897|
|       ST10000NM0086|    108555|       3|           1.008705|
|       ST12000NM0007|   2955025|     180|           2.223331|
|         ST4000DM000|   1989429|     107|           1.963126|
|       ST500LM012 HN|     50619|      12|           8.652877|
|          ST500LM030|     14479|       9|          22.688031|
|         ST6000DX000|    135832|       1|           0.268714|
|         ST8000DM002|    888741|      29|           1.191011|
|         ST8000DM004|       273|       1|         133.

In [7]:
#making a vector from the dataframe given
failure_vector= VectorAssembler(inputCols=["annual_failure_rate"], outputCol="AFR")
failure_predict = failure_vector.transform(failure_rates_df)
failure_predict.show()

+--------------------+----------+--------+-------------------+-----------+
|               model|drive_days|failures|annual_failure_rate|        AFR|
+--------------------+----------+--------+-------------------+-----------+
|HGST HMS5C4040ALE640|    313383|       2|           0.232942| [0.232942]|
|HGST HMS5C4040BLE640|   1172824|      11|           0.342336| [0.342336]|
|HGST HUH721212ALE600|     14040|       1|           2.599715| [2.599715]|
|HGST HUH721212ALN604|    259749|       4|           0.562081| [0.562081]|
|HGST HUH728080ALE600|     93598|       3|           1.169897| [1.169897]|
|       ST10000NM0086|    108555|       3|           1.008705| [1.008705]|
|       ST12000NM0007|   2955025|     180|           2.223331| [2.223331]|
|         ST4000DM000|   1989429|     107|           1.963126| [1.963126]|
|       ST500LM012 HN|     50619|      12|           8.652877| [8.652877]|
|          ST500LM030|     14479|       9|          22.688031|[22.688031]|
|         ST6000DX000|   

In [8]:
#making the prediction using k means
kmeans = KMeans().setK(4).setSeed(1).setFeaturesCol("AFR")
model = kmeans.fit(failure_predict)

In [9]:
#finding and printing the cluster centres
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Cluster Centers: 
[1.42660613]
[133.69967]
[22.688031]
[11.456597]


In [10]:
#displaying the results of the predicted clusters
transformed_df = model.transform(failure_predict).orderBy("annual_failure_rate")
transformed_df.show()

+--------------------+----------+--------+-------------------+-----------+----------+
|               model|drive_days|failures|annual_failure_rate|        AFR|prediction|
+--------------------+----------+--------+-------------------+-----------+----------+
|HGST HMS5C4040ALE640|    313383|       2|           0.232942| [0.232942]|         0|
|         ST6000DX000|    135832|       1|           0.268714| [0.268714]|         0|
| TOSHIBA MG07ACA14TA|    109404|       1|           0.333626| [0.333626]|         0|
|HGST HMS5C4040BLE640|   1172824|      11|           0.342336| [0.342336]|         0|
|HGST HUH721212ALN604|    259749|       4|           0.562081| [0.562081]|         0|
|       ST10000NM0086|    108555|       3|           1.008705| [1.008705]|         0|
|HGST HUH728080ALE600|     93598|       3|           1.169897| [1.169897]|         0|
|         ST8000DM002|    888741|      29|           1.191011| [1.191011]|         0|
|        WDC WD60EFRX|     30523|       1|           1

In [11]:
#Calculating the distance from the cluster centers
distance_udf = udf(lambda x,y: float(distance.euclidean(x, centers[y])), FloatType())
transformed_df = transformed_df.withColumn("distance", distance_udf(col("AFR"),col("prediction")))
transformed_df.show()

+--------------------+----------+--------+-------------------+-----------+----------+----------+
|               model|drive_days|failures|annual_failure_rate|        AFR|prediction|  distance|
+--------------------+----------+--------+-------------------+-----------+----------+----------+
|HGST HMS5C4040ALE640|    313383|       2|           0.232942| [0.232942]|         0| 1.1936641|
|         ST6000DX000|    135832|       1|           0.268714| [0.268714]|         0| 1.1578921|
| TOSHIBA MG07ACA14TA|    109404|       1|           0.333626| [0.333626]|         0| 1.0929801|
|HGST HMS5C4040BLE640|   1172824|      11|           0.342336| [0.342336]|         0| 1.0842701|
|HGST HUH721212ALN604|    259749|       4|           0.562081| [0.562081]|         0|0.86452514|
|       ST10000NM0086|    108555|       3|           1.008705| [1.008705]|         0|0.41790113|
|HGST HUH728080ALE600|     93598|       3|           1.169897| [1.169897]|         0|0.25670913|
|         ST8000DM002|    8887

In [12]:
#shows that all the values that are over the threshold. (outliars)
filter_udf = udf(lambda x: float(centers[x]+0.02*centers[x]), FloatType())
transformed_df.filter(col("distance")>=filter_udf(col("prediction"))).show()

+-------------------+----------+--------+-------------------+----------+----------+---------+
|              model|drive_days|failures|annual_failure_rate|       AFR|prediction| distance|
+-------------------+----------+--------+-------------------+----------+----------+---------+
|     WDC WD5000LPVX|     22015|       2|           3.315921|[3.315921]|         0|1.8893149|
|TOSHIBA MQ01ABF050M|     32624|       3|           3.356425|[3.356425]|         0|1.9298189|
+-------------------+----------+--------+-------------------+----------+----------+---------+



<h3>Question 3(b):</h3>

In [13]:
#making the table with the values 
smart_df=pre_final.select("smart_1_normalized").na.drop()
smart_df=smart_df.withColumn("smart_1_normalized",smart_df["smart_1_normalized"].cast("float"))
smart_df.createOrReplaceTempView("smart")
smart_df.show()

+------------------+
|smart_1_normalized|
+------------------+
|             117.0|
|              80.0|
|              83.0|
|              81.0|
|             100.0|
|              75.0|
|              83.0|
|              83.0|
|              78.0|
|              77.0|
|             117.0|
|              81.0|
|              74.0|
|              80.0|
|              78.0|
|             100.0|
|             100.0|
|             100.0|
|             100.0|
|             100.0|
+------------------+
only showing top 20 rows



In [14]:
#Vectorized values of the column that has to be evaluated 
smart_vector= VectorAssembler(inputCols=["smart_1_normalized"], outputCol="smart_value")
smart_predict = smart_vector.transform(smart_df)
smart_predict.show()

+------------------+-----------+
|smart_1_normalized|smart_value|
+------------------+-----------+
|             117.0|    [117.0]|
|              80.0|     [80.0]|
|              83.0|     [83.0]|
|              81.0|     [81.0]|
|             100.0|    [100.0]|
|              75.0|     [75.0]|
|              83.0|     [83.0]|
|              83.0|     [83.0]|
|              78.0|     [78.0]|
|              77.0|     [77.0]|
|             117.0|    [117.0]|
|              81.0|     [81.0]|
|              74.0|     [74.0]|
|              80.0|     [80.0]|
|              78.0|     [78.0]|
|             100.0|    [100.0]|
|             100.0|    [100.0]|
|             100.0|    [100.0]|
|             100.0|    [100.0]|
|             100.0|    [100.0]|
+------------------+-----------+
only showing top 20 rows



In [15]:
#prediction by clustering algorithm k-means
kmeans_3b = KMeans().setK(4).setSeed(1).setFeaturesCol("smart_value")
model_3b = kmeans_3b.fit(smart_predict)

In [16]:
#finding and printing the cluster centres
centers_3b = model_3b.clusterCenters()
centers_3b.sort()
print("Cluster Centers: ")
for center in centers_3b:
    print(center)

Cluster Centers: 
[79.80602865]
[100.38512109]
[116.09234413]
[199.99904928]


In [17]:
#showing the predicted cluster values
smart_transformed = model_3b.transform(smart_predict)
smart_transformed.show()

+------------------+-----------+----------+
|smart_1_normalized|smart_value|prediction|
+------------------+-----------+----------+
|             117.0|    [117.0]|         2|
|              80.0|     [80.0]|         1|
|              83.0|     [83.0]|         1|
|              81.0|     [81.0]|         1|
|             100.0|    [100.0]|         0|
|              75.0|     [75.0]|         1|
|              83.0|     [83.0]|         1|
|              83.0|     [83.0]|         1|
|              78.0|     [78.0]|         1|
|              77.0|     [77.0]|         1|
|             117.0|    [117.0]|         2|
|              81.0|     [81.0]|         1|
|              74.0|     [74.0]|         1|
|              80.0|     [80.0]|         1|
|              78.0|     [78.0]|         1|
|             100.0|    [100.0]|         0|
|             100.0|    [100.0]|         0|
|             100.0|    [100.0]|         0|
|             100.0|    [100.0]|         0|
|             100.0|    [100.0]|

In [18]:
#finding the distances from the cluster centers
distance_udf = udf(lambda x,y: float(distance.euclidean(x, centers_3b[y])), FloatType())
smart_transformed = smart_transformed.withColumn("distance", distance_udf(col("smart_value"),col("prediction")))
smart_transformed.show()

+------------------+-----------+----------+---------+
|smart_1_normalized|smart_value|prediction| distance|
+------------------+-----------+----------+---------+
|             117.0|    [117.0]|         2|0.9076559|
|              80.0|     [80.0]|         1| 20.38512|
|              83.0|     [83.0]|         1| 17.38512|
|              81.0|     [81.0]|         1| 19.38512|
|             100.0|    [100.0]|         0|20.193972|
|              75.0|     [75.0]|         1| 25.38512|
|              83.0|     [83.0]|         1| 17.38512|
|              83.0|     [83.0]|         1| 17.38512|
|              78.0|     [78.0]|         1| 22.38512|
|              77.0|     [77.0]|         1| 23.38512|
|             117.0|    [117.0]|         2|0.9076559|
|              81.0|     [81.0]|         1| 19.38512|
|              74.0|     [74.0]|         1| 26.38512|
|              80.0|     [80.0]|         1| 20.38512|
|              78.0|     [78.0]|         1| 22.38512|
|             100.0|    [100

In [19]:
#shows that all the values that are over the threshold. (outliars)
filter_udf = udf(lambda x: float(centers_3b[x]+100.0), FloatType())
smart_transformed = smart_transformed.filter(col("distance")>=filter_udf(col("prediction")))

In [20]:
'''The filter function is running but the show function is showing an error due to memory 
problems in jupyter and databricks.com. But the logic is same as 3a which seems to be correct.'''
smart_transformed.show(5)

Py4JJavaError: An error occurred while calling o282.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 194.0 failed 1 times, most recent failure: Lost task 1.0 in stage 194.0 (TID 5529, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 362, in main
  File "C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 722, in read_int
    length = stream.read(4)
  File "C:\Users\myste\AppData\Local\Programs\Python\Python37\lib\socket.py", line 589, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\worker.py", line 362, in main
  File "C:\spark\spark-2.4.5-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\serializers.py", line 722, in read_int
    length = stream.read(4)
  File "C:\Users\myste\AppData\Local\Programs\Python\Python37\lib\socket.py", line 589, in readinto
    return self._sock.recv_into(b)
socket.timeout: timed out

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
