In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from xgboost.spark import SparkXGBClassifier
# from xgboost import XGBoostClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import col
from sklearn.metrics import f1_score, recall_score, classification_report

In [2]:
def init_spark():
    spark = SparkSession.builder.appName("Python Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
    return spark
spark_object = init_spark()

In [3]:
train_spark = spark_object.read.option("delimiter", ",").csv("./data/trainbalanced.csv", header='true', inferSchema='true')

In [4]:
for i in train_spark.columns:
    oldname = i
    if "." in oldname:
        newname = oldname.replace(".","")
        train_spark = train_spark.withColumnRenamed(oldname, newname)

In [5]:
train_data, test_data = train_spark.randomSplit([0.7, 0.3], seed=123)

In [6]:
input_list =[]
for i in train_spark.columns:
    if i != 'isFraud':
        input_list.append(i)

In [7]:
assembler = VectorAssembler(inputCols=input_list,outputCol='features')

In [8]:
train_features = assembler.transform(train_data)
test_features = assembler.transform(test_data)

In [9]:
xgb_models=[]
num_rounds=[10, 50, 100]
max_depths = [3, 10, 20]
for nr in num_rounds:
    for md in max_depths:
        print("-"*25)
        print("num_rounds: " + str(nr))
        print("max_depth: " + str(md))
        dt = SparkXGBClassifier(
            features_col="features",
            label_col="isFraud",
            num_round=nr,
            max_depth=md,
            eta=0.1,
            subsample=0.8,
            colsample_bytree=0.8,
            seed=42
        )
        model = dt.fit(train_features)
        predictions = model.transform(train_features)
        y_train_pred=predictions.select("prediction").collect()
        y_train_orig=predictions.select("isFraud").collect()
        print('Training Report')
        print(classification_report(y_train_orig, y_train_pred))
        
        predictions = model.transform(test_features)
        y_test_pred=predictions.select("prediction").collect()
        y_test_orig=predictions.select("isFraud").collect()
        print('Test Report')
        print(classification_report(y_test_orig, y_test_pred))
        xgb_models.append(model)


-------------------------
num_rounds: 10
max_depth: 3


Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Could not recover from a failed barrier ResultStage. Most recent failure reason: Stage failed because barrier task ResultTask(7, 0) finished unsuccessfully.
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\spark\python\lib\pyspark.zip\pyspark\worker.py", line 666, in main
  File "C:\spark\python\lib\pyspark.zip\pyspark\serializers.py", line 593, in read_int
    length = stream.read(4)
  File "C:\Users\amrit\anaconda3\envs\soen6111\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
TimeoutError: timed out

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:552)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:758)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:740)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:505)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2278)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:2111)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2857)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:180)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [None]:
# for model in xgb_models:
#     try:
#         print("-"*25)
#         print("Depth: " + str(md))
#         print("numTrees: " + str(nt))
#         model = model.fit(train_features)
#         predictions = model.transform(test_features)
#         y_pred=predictions.select("prediction").collect()
#         y_orig=predictions.select("isFraud").collect()
#         classification_report(y_orig, y_pred)
#     except Exception as e:
#         print(e)

In [None]:
# xgb = SparkXGBClassifier(
#     features_col="features",
#     label_col="isFraud",
#     num_round=10,
#     max_depth=3,
#     eta=0.1,
#     subsample=0.8,
#     colsample_bytree=0.8,
#     seed=42
# )

In [None]:
# xgb = SparkXGBClassifier(label_col="cnt",max_depth=5, missing=0.0,
#     validation_indicator_col='isVal', weight_col='weight',
#     early_stopping_rounds=1, eval_metric='logloss')

# new_xgb = SparkXGBClassifier(max_depth=5, missing=0.0,
# validation_indicator_col='isVal', weight_col='weight',
# early_stopping_rounds=1, eval_metric='logloss')

In [None]:
try:
    model = xgb.fit(train_features)
    predictions = model.transform(test_features)
except Exception as e:
    print(e)

In [None]:
predictions = model.transform(test_features)

In [None]:
y_pred=predictions.select("prediction").collect()
y_orig=predictions.select("isFraud").collect()

In [None]:
print(f1_score(y_orig, y_pred))
print(recall_score(y_orig, y_pred))

In [None]:
# evaluator = BinaryClassificationEvaluator(labelCol="isFraud")
# accuracy = evaluator.evaluate(predictions)

# print(f"Accuracy: {accuracy:.4f}")