In [None]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

# Load the dataset
df = pd.read_csv('feature_fraud.csv')

# get object columns
object_col=df.select_dtypes(include='object').columns.to_list()

# Apply LabelEncoder
encoder = LabelEncoder()
for i in object_col:
    df[i] = encoder.fit_transform(df[i])
    
df.to_csv('encoded_fraud.csv')

['merchant', 'category', 'city', 'state', 'job', 'dob', 'trans_num']

In [67]:
from pyspark.sql import SparkSession

# Initialize the Spark session
spark = SparkSession.builder.appName("Spark_BigData").getOrCreate()

# Load data (CSV, Parquet, etc.)
data = spark.read.csv("encoded_fraud.csv", header=True, inferSchema=True)

In [52]:
data.show()

+---+--------+--------+-------+----+-----+-------+---------+--------+---+---+---------+---------+-----------+--------+---------+-----------+----------+----------+------------+--------+---+-----------+
|_c0|merchant|category|    amt|city|state|    lat|     long|city_pop|job|dob|trans_num|merch_lat| merch_long|is_fraud|trans_day|trans_month|trans_year|trans_hour|trans_minute|dob_year|age|distance_km|
+---+--------+--------+-------+----+-----+-------+---------+--------+---+---+---------+---------+-----------+--------+---------+-----------+----------+----------+------------+--------+---+-----------+
|  0|     600|       3|  14.37| 166|    0|64.7556|-165.6723|     145|  1| 47|     9134|65.654142|-164.722603|       1|        4|          1|      2019|         0|          58|    1939| 80|   109.2856|
|  1|     486|      11| 966.11| 166|    0|64.7556|-165.6723|     145|  1| 47|     9247|65.468863|-165.473127|       1|        4|          1|      2019|        15|           6|    1939| 80|    79.8

In [86]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Feature selection: Select features for prediction (exclude target column)
feature_columns = [col for col in data.columns if col != "is_fraud"]

# Assemble all features into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Set up the pipeline
pipeline = Pipeline(stages=[assembler])

# Fit the model
pipeline_model = pipeline.fit(data)

# Make Predictions on the Test Data
processed_data = pipeline_model.transform(data)

# Split the data into training and test datasets: 70% train, 30%
train_data, test_data = processed_data.randomSplit([0.7, 0.3], seed=50)

In [87]:
train_data.show()

+---+--------+--------+-------+----+-----+-------+---------+--------+---+---+---------+---------+-----------+--------+---------+-----------+----------+----------+------------+--------+---+-----------+--------------------+
|_c0|merchant|category|    amt|city|state|    lat|     long|city_pop|job|dob|trans_num|merch_lat| merch_long|is_fraud|trans_day|trans_month|trans_year|trans_hour|trans_minute|dob_year|age|distance_km|            features|
+---+--------+--------+-------+----+-----+-------+---------+--------+---+---+---------+---------+-----------+--------+---------+-----------+----------+----------+------------+--------+---+-----------+--------------------+
|  0|     600|       3|  14.37| 166|    0|64.7556|-165.6723|     145|  1| 47|     9134|65.654142|-164.722603|       1|        4|          1|      2019|         0|          58|    1939| 80|   109.2856|[0.0,600.0,3.0,14...|
|  1|     486|      11| 966.11| 166|    0|64.7556|-165.6723|     145|  1| 47|     9247|65.468863|-165.473127|   

# Random Forest

In [100]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import time

#get the timestamp before inference in seconds
start_ts = time.time()

# Train the Random Forest Model on the Training Data
rf = RandomForestClassifier(featuresCol="features", labelCol="is_fraud", numTrees=100)

# Fit the model on the training data
rf_model = rf.fit(train_data)

# Make Predictions on the Test Data
predictions = rf_model.transform(test_data)

# Show some predictions
predictions.select("features", "is_fraud", "prediction").show(10)

# Step 8: Evaluate the Model
evaluator = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Accuracy: {(accuracy*100):.3f}")

#get the timestamp after the inference in second
end_ts = time.time()

# print the time difference in between start and end timestamps in seconds
print(f"Prediction Time [s]: {(end_ts-start_ts):.3f}")

+--------------------+--------+----------+
|            features|is_fraud|prediction|
+--------------------+--------+----------+
|[9.0,142.0,8.0,84...|       1|       1.0|
|[14.0,200.0,11.0,...|       1|       1.0|
|[17.0,22.0,12.0,7...|       1|       1.0|
|[23.0,311.0,11.0,...|       1|       1.0|
|[29.0,531.0,12.0,...|       1|       1.0|
|[31.0,376.0,0.0,5...|       1|       1.0|
|[36.0,474.0,2.0,1...|       1|       1.0|
|[37.0,459.0,4.0,3...|       1|       1.0|
|[38.0,88.0,3.0,16...|       1|       1.0|
|[42.0,285.0,2.0,8...|       1|       1.0|
+--------------------+--------+----------+
only showing top 10 rows

Accuracy: 99.703
Prediction Time [s]: 2.865


# Logistic regression

In [None]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col
import time

#get the timestamp before inference in seconds
start_ts = time.time()

# Train the Logistic Regression Model
lr = LogisticRegression(featuresCol="features", labelCol="is_fraud", maxIter=10, regParam=0.1)

# Fit the model on the training data
lr_model = lr.fit(train_data)

# Step 6: Make Predictions on the Test Data
predictions = lr_model.transform(test_data)

# Show some predictions
predictions.select("features", "is_fraud", "prediction").show(5)

# Evaluate the Model
evaluator_bin = BinaryClassificationEvaluator(labelCol="is_fraud", rawPredictionCol="prediction", metricName="areaUnderROC")
evaluator_class = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="accuracy")

#Confusion Matrix Calculation
# Create a confusion matrix by counting the number of True Positives, False Positives, 
# True Negatives, and False Negatives
tp = predictions.filter((col("prediction") == 1) & (col("is_fraud") == 1)).count()
tn = predictions.filter((col("prediction") == 0) & (col("is_fraud") == 0)).count()
fp = predictions.filter((col("prediction") == 1) & (col("is_fraud") == 0)).count()
fn = predictions.filter((col("prediction") == 0) & (col("is_fraud") == 1)).count()

# Print the confusion matrix
print("Confusion Matrix:")
print(f"True Positive (TP): {tp}")
print(f"False Negative (FN): {fn}")
print(f"False Positive (FP): {fp}")
print(f"True Negative (TN): {tn}")

roc_auc = evaluator_bin.evaluate(predictions)
accuracy = evaluator_class.evaluate(predictions)

print(f"Area Under ROC Curve: {(roc_auc*100):.3f}")
print(f"Accuracy: {(accuracy*100):.3f}")

# get the timestamp after the inference in second
end_ts = time.time()

# print the time difference in between start and end timestamps in seconds
print(f"Prediction Time [s]: {(end_ts-start_ts):.3f}")

+--------------------+--------+----------+
|            features|is_fraud|prediction|
+--------------------+--------+----------+
|[9.0,142.0,8.0,84...|       1|       1.0|
|[14.0,200.0,11.0,...|       1|       1.0|
|[17.0,22.0,12.0,7...|       1|       1.0|
|[23.0,311.0,11.0,...|       1|       1.0|
|[29.0,531.0,12.0,...|       1|       1.0|
+--------------------+--------+----------+
only showing top 5 rows

Confusion Matrix:
True Positive (TP): 274
False Negative (FN): 285
False Positive (FP): 10
True Negative (TN): 3813
Area Under ROC Curve: 74.377
Accuracy: 93.268
Prediction Time [s]: 2.312


# SVM

In [115]:
from pyspark.ml.classification import LinearSVC
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col
import time

#get the timestamp before inference in seconds
start_ts = time.time()

# Train the LinearSVC (Support Vector Machine) Model
svm = LinearSVC(featuresCol="features", labelCol="is_fraud")

# Fit the model on the training data
svm_model = svm.fit(train_data)

# Make Predictions on the Test Data
predictions = svm_model.transform(test_data)

# Show some predictions
predictions.select("features", "is_fraud", "prediction").show(5)

# Evaluate the Model
# Using BinaryClassificationEvaluator to evaluate the model performance
evaluator = BinaryClassificationEvaluator(labelCol="is_fraud", rawPredictionCol="prediction", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)

print(f"Area Under ROC Curve: {(roc_auc*100):.3f}")

# Confusion Matrix Calculation
# Create a confusion matrix by counting the number of True Positives, False Positives, 
# True Negatives, and False Negatives
tp = predictions.filter((col("prediction") == 1) & (col("is_fraud") == 1)).count()
tn = predictions.filter((col("prediction") == 0) & (col("is_fraud") == 0)).count()
fp = predictions.filter((col("prediction") == 1) & (col("is_fraud") == 0)).count()
fn = predictions.filter((col("prediction") == 0) & (col("is_fraud") == 1)).count()

# Print the confusion matrix
print("Confusion Matrix:")
print(f"True Positive (TP): {tp}")
print(f"True Negative (TN): {tn}")
print(f"False Positive (FP): {fp}")
print(f"False Negative (FN): {fn}")

# Optionally, you can calculate accuracy, precision, recall, and F1 score from the confusion matrix
accuracy = (tp + tn) / (tp + tn + fp + fn)

print(f"Accuracy: {(accuracy*100):.3f}%")

# get the timestamp after the inference in second
end_ts = time.time()

# print the time difference in between start and end timestamps in seconds
print(f"Prediction Time [s]: {(end_ts-start_ts):.3f}")

+--------------------+--------+----------+
|            features|is_fraud|prediction|
+--------------------+--------+----------+
|[0.0,600.0,3.0,14...|       1|       1.0|
|[1.0,486.0,11.0,9...|       1|       1.0|
|[4.0,180.0,5.0,18...|       1|       1.0|
|[21.0,310.0,12.0,...|       1|       1.0|
|[24.0,309.0,4.0,3...|       1|       1.0|
+--------------------+--------+----------+
only showing top 5 rows

Area Under ROC Curve: 97.785
Confusion Matrix:
True Positive (TP): 509
True Negative (TN): 3765
False Positive (FP): 4
False Negative (FN): 23
Accuracy: 99.372%
Prediction Time [s]: 8.602


# Naive Bayes

In [109]:
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col
import time

#get the timestamp before inference in seconds
start_ts = time.time()

# Train the Naive Bayes Model
nb = NaiveBayes(featuresCol="features", labelCol="is_fraud", modelType="multinomial")

# Fit the model on the training data
nb_model = nb.fit(train_data)

# Step 6: Make Predictions on the Test Data
predictions = nb_model.transform(test_data)

# Show some predictions
predictions.select("features", "is_fraud", "prediction").show(5)

# Step 7: Evaluate the Model
evaluator = BinaryClassificationEvaluator(labelCol="is_fraud", rawPredictionCol="prediction", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)

print(f"Area Under ROC Curve: {roc_auc}")

# Optional: You can also calculate accuracy, precision, recall, and F1 score if needed
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Calculate accuracy
evaluator_acc = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_acc.evaluate(predictions)
print(f"Accuracy: {(accuracy*100):.3f}")

# get the timestamp after the inference in second
end_ts = time.time()

# print the time difference in between start and end timestamps in seconds
print(f"Prediction Time [s]: {(end_ts-start_ts):.3f}")

Py4JJavaError: An error occurred while calling o4279.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 460.0 failed 1 times, most recent failure: Lost task 0.0 in stage 460.0 (TID 459) (WinniePC22 executor driver): java.lang.RuntimeException: Vector values MUST NOT be Negative, NaN or Infinity, but got [0.0,600.0,3.0,14.37,166.0,0.0,64.7556,-165.6723,145.0,1.0,47.0,9134.0,65.654142,-164.722603,4.0,1.0,2019.0,0.0,58.0,1939.0,80.0,109.2856]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:880)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:880)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.RuntimeException: Vector values MUST NOT be Negative, NaN or Infinity, but got [0.0,600.0,3.0,14.37,166.0,0.0,64.7556,-165.6723,145.0,1.0,47.0,9134.0,65.654142,-164.722603,4.0,1.0,2019.0,0.0,58.0,1939.0,80.0,109.2856]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:880)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:880)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)


In [110]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col

# Step 1: Set up SparkSession
spark = SparkSession.builder.appName("NaiveBayesExample").getOrCreate()

# Step 2: Load the data (Example: CSV file)
data = spark.read.csv("encoded_fraud.csv", header=True, inferSchema=True)

# Step 3: Preprocessing
# Assuming 'is_fraud' is the target column (fraud = 1, non-fraud = 0)
# Select feature columns (excluding the target column)
feature_columns = [col for col in data.columns if col != "is_fraud"]

# Assemble features into a single vector column 'features'
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data = assembler.transform(data)

# Step 4: Split the data into training and test sets
train_data, test_data = data.randomSplit([0.7, 0.3], seed=1234)

# Step 5: Train the Naive Bayes Model
nb = NaiveBayes(featuresCol="features", labelCol="is_fraud", modelType="multinomial")

# Fit the model on the training data
nb_model = nb.fit(train_data)

# Step 6: Make Predictions on the Test Data
predictions = nb_model.transform(test_data)

# Show some predictions
predictions.select("features", "is_fraud", "prediction").show(5)

# Step 7: Evaluate the Model
evaluator = BinaryClassificationEvaluator(labelCol="is_fraud", rawPredictionCol="prediction", metricName="areaUnderROC")
roc_auc = evaluator.evaluate(predictions)

print(f"Area Under ROC Curve: {roc_auc}")

# Optional: You can also calculate accuracy, precision, recall, and F1 score if needed
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Calculate accuracy
evaluator_acc = MulticlassClassificationEvaluator(labelCol="is_fraud", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator_acc.evaluate(predictions)
print(f"Accuracy: {accuracy}")


Py4JJavaError: An error occurred while calling o4338.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 463.0 failed 1 times, most recent failure: Lost task 0.0 in stage 463.0 (TID 462) (WinniePC22 executor driver): java.lang.RuntimeException: Vector values MUST NOT be Negative, NaN or Infinity, but got [2.0,674.0,9.0,49.61,166.0,0.0,64.7556,-165.6723,145.0,1.0,47.0,7511.0,65.347667,-165.914542,4.0,1.0,2019.0,22.0,37.0,1939.0,80.0,66.8079]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:880)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:880)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.RuntimeException: Vector values MUST NOT be Negative, NaN or Infinity, but got [2.0,674.0,9.0,49.61,166.0,0.0,64.7556,-165.6723,145.0,1.0,47.0,7511.0,65.347667,-165.914542,4.0,1.0,2019.0,22.0,37.0,1939.0,80.0,66.8079]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:92)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:880)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:880)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
