# Anomaly Detection in PySpark

Using SVM model and Logit model to detect anomalies in the 15 minutes candles of Eur/Usd over 10 years of data

In [1]:
#Create enviroment variables to find PySpark
%env SPARK_HOME=C:\Users\HP\Desktop\Spark\spark-2.4.5-bin-hadoop2.7 
%env HADOOP_HOME=C:\Users\HP\Desktop\Spark\spark-2.4.5-bin-hadoop2.7

env: SPARK_HOME=C:\Users\HP\Desktop\Spark\spark-2.4.5-bin-hadoop2.7
env: HADOOP_HOME=C:\Users\HP\Desktop\Spark\spark-2.4.5-bin-hadoop2.7


In [2]:
!pip install pyarrow
!pip install py4j



In [2]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [3]:
# Create a spark session
conf = pyspark.SparkConf().setMaster("local[*]")
sc = pyspark.SparkContext(conf = conf)
spark = SparkSession(sc)

spark.sparkContext

In [49]:
#Alternative way to create a Session and a Context

#spark = SparkSession.builder\
#        .master("local[*]") \
#        .getOrCreate()

In [4]:
# Different ways of loading a csv file

spark_eurusd = spark.read.format('csv').options(header='true', inferSchema='true') \
                .load('C://Users//HP//Desktop//DataScience//AlgoTrading//eurusd15min.csv')

#spark_eurusd = spark.read.csv('C://Users//HP//Desktop//DataScience//AlgoTrading//eurusd5min.csv', header=True)

In [5]:
# Different ways of renaming columns

#spark_eurusd = spark_eurusd.select(F.col("_c0").alias("id"),
#                                   F.col("Time").alias("Time"),
#                                   F.col("Open").alias("Open"),
#                                   F.col("High").alias("High"),
#                                   F.col("Low").alias("Low"),
#                                   F.col("Close").alias("Close"))

#spark_eurusd = spark_eurusd.toDF("id", "Time", "Open", "High", "Low", "Close")

# A way to reneame only a single column
spark_eurusd = spark_eurusd.withColumnRenamed("_c0", "id")

In [6]:
spark_eurusd.show(5)

+---+-------------------+-------+-------+-------+-------+
| id|               Time|   Open|   High|    Low|  Close|
+---+-------------------+-------+-------+-------+-------+
|  0|2010-01-03 17:45:00| 1.4312|1.43172| 1.4312|1.43172|
|  1|2010-01-03 18:00:00|1.43172|1.43425|1.43105| 1.4311|
|  2|2010-01-03 18:15:00| 1.4315|1.43155| 1.4313|1.43155|
|  3|2010-01-03 18:30:00|1.43175| 1.4324|1.43106|1.43106|
|  4|2010-01-03 18:45:00|1.43111|1.43157|1.43106|1.43157|
+---+-------------------+-------+-------+-------+-------+
only showing top 5 rows



In [7]:
spark_eurusd.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Time: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)



In [8]:
# To drop columns
spark_eurusd = spark_eurusd.drop("id")

In Pyspark there is no concept of axis like in pandas (e.g. pd.df.drop("column_name", axis=1)

In [9]:
# Add a column that calculate the absolute value of the body of the 5 minutes candle

body = F.udf(lambda x,y: (x-y)*10000, DoubleType())

spark_eurusd = spark_eurusd.withColumn("body", F.round(F.abs(body(spark_eurusd["Open"], spark_eurusd["Close"])), 2))
spark_eurusd = spark_eurusd.withColumn("higher_wick", F.round(F.abs(body(spark_eurusd["High"], spark_eurusd["Open"])), 2))
spark_eurusd = spark_eurusd.withColumn("lower_wick", F.round(F.abs(body(spark_eurusd["Close"], spark_eurusd["Low"])), 2))

spark_eurusd.show(5)

+-------------------+-------+-------+-------+-------+----+-----------+----------+
|               Time|   Open|   High|    Low|  Close|body|higher_wick|lower_wick|
+-------------------+-------+-------+-------+-------+----+-----------+----------+
|2010-01-03 17:45:00| 1.4312|1.43172| 1.4312|1.43172| 5.2|        5.2|       5.2|
|2010-01-03 18:00:00|1.43172|1.43425|1.43105| 1.4311| 6.2|       25.3|       0.5|
|2010-01-03 18:15:00| 1.4315|1.43155| 1.4313|1.43155| 0.5|        0.5|       2.5|
|2010-01-03 18:30:00|1.43175| 1.4324|1.43106|1.43106| 6.9|        6.5|       0.0|
|2010-01-03 18:45:00|1.43111|1.43157|1.43106|1.43157| 4.6|        4.6|       5.1|
+-------------------+-------+-------+-------+-------+----+-----------+----------+
only showing top 5 rows



In [45]:
spark_eurusd.select("body").describe().show()
spark_eurusd.selectExpr("percentile_approx(body, array(.25, .5, .75)) as body_percentile").show()

+-------+-----------------+
|summary|             body|
+-------+-----------------+
|  count|           260000|
|   mean|4.449321153846233|
| stddev|5.470685714503136|
|    min|              0.0|
|    max|            223.0|
+-------+-----------------+

+---------------+
|body_percentile|
+---------------+
|[1.2, 2.8, 5.7]|
+---------------+



In [22]:
# Statistics of higher_wick
spark_eurusd.select("higher_wick").describe().show()
spark_eurusd.selectExpr("percentile_approx(higher_wick, array(.25, .5, .75)) as higher_wick_percentile").show()

+-------+------------------+
|summary|       higher_wick|
+-------+------------------+
|  count|            260000|
|   mean|4.4696776923077355|
| stddev| 5.756607936679121|
|    min|               0.0|
|    max|             223.1|
+-------+------------------+

+----------------------+
|higher_wick_percentile|
+----------------------+
|       [1.0, 2.8, 5.8]|
+----------------------+



In [23]:
# Statistics of lower_wick
spark_eurusd.select("lower_wick").describe().show()
spark_eurusd.selectExpr("percentile_approx(lower_wick, array(.25, .5, .75)) as lower_wick_percentile").show()

+-------+-----------------+
|summary|       lower_wick|
+-------+-----------------+
|  count|           260000|
|   mean|4.438610769230832|
| stddev|5.308289298585862|
|    min|              0.0|
|    max|            243.7|
+-------+-----------------+

+---------------------+
|lower_wick_percentile|
+---------------------+
|      [1.2, 2.9, 5.8]|
+---------------------+



In [10]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols= ["higher_wick", "body", "lower_wick"],
    outputCol= "features")

transformed = assembler.transform(spark_eurusd).withColumn('label', 
                                                            F.when((F.col("body") > 120) | (F.col("body") < 5) |
                                                            (F.col("higher_wick") > 100) | (F.col("higher_wick") < 0.5) | 
                                                            (F.col("lower_wick") > 100) | (F.col("lower_wick") < 0.5), 1.0)
                                                            .otherwise(0.0))

In [11]:
transformed.printSchema()

root
 |-- Time: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- body: double (nullable = true)
 |-- higher_wick: double (nullable = true)
 |-- lower_wick: double (nullable = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)



In [12]:
transformed.show(10, truncate=False)

+-------------------+-------+-------+-------+-------+----+-----------+----------+--------------+-----+
|Time               |Open   |High   |Low    |Close  |body|higher_wick|lower_wick|features      |label|
+-------------------+-------+-------+-------+-------+----+-----------+----------+--------------+-----+
|2010-01-03 17:45:00|1.4312 |1.43172|1.4312 |1.43172|5.2 |5.2        |5.2       |[5.2,5.2,5.2] |0.0  |
|2010-01-03 18:00:00|1.43172|1.43425|1.43105|1.4311 |6.2 |25.3       |0.5       |[25.3,6.2,0.5]|0.0  |
|2010-01-03 18:15:00|1.4315 |1.43155|1.4313 |1.43155|0.5 |0.5        |2.5       |[0.5,0.5,2.5] |1.0  |
|2010-01-03 18:30:00|1.43175|1.4324 |1.43106|1.43106|6.9 |6.5        |0.0       |[6.5,6.9,0.0] |1.0  |
|2010-01-03 18:45:00|1.43111|1.43157|1.43106|1.43157|4.6 |4.6        |5.1       |[4.6,4.6,5.1] |1.0  |
|2010-01-03 19:00:00|1.43154|1.43192|1.43136|1.43192|3.8 |3.8        |5.6       |[3.8,3.8,5.6] |1.0  |
|2010-01-03 19:15:00|1.43199|1.43212|1.43081|1.43126|7.3 |1.3        |4.5

In [13]:
from pyspark.mllib.regression import LabeledPoint

labeled_point = transformed.rdd.map(lambda row: LabeledPoint(row['label'], row['features'].toArray()))
labeled_point.take(5)

[LabeledPoint(0.0, [5.2,5.2,5.2]),
 LabeledPoint(0.0, [25.3,6.2,0.5]),
 LabeledPoint(1.0, [0.5,0.5,2.5]),
 LabeledPoint(1.0, [6.5,6.9,0.0]),
 LabeledPoint(1.0, [4.6,4.6,5.1])]

In [14]:
#Split dataset in train set and test set
train_data, test_data = labeled_point.randomSplit([0.8,0.2], seed=111)

In [36]:
# Build the model
from pyspark.mllib.classification import SVMWithSGD

SVMmodel = SVMWithSGD.train(train_data, iterations=100, regType="l2")

In [37]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

LogitModel = LogisticRegressionWithLBFGS.train(train_data)

In [38]:
# Evaluating the model on test data

SVMpredsAndLabels = test_data.map(lambda p: (float(SVMmodel.predict(p.features)), p.label))
SVMtrainErr = SVMpredsAndLabels.filter(lambda lp: lp[0] != lp[1]).count() / float(test_data.count())
print("Training Error of the SVM model = " + str(SVMtrainErr))

LogitPredsAndLabels = test_data.map(lambda p: (float(LogitModel.predict(p.features)), p.label))
LogitTrainErr = LogitPredsAndLabels.filter(lambda lp: lp[0] != lp[1]).count() / float(test_data.count())
print("Training Error of the Logit model = " + str(LogitTrainErr))

Training Error of the SVM model = 0.6554819692213459
Training Error of the Logit model = 0.43930403491310005


In [44]:
from pyspark.mllib.evaluation import BinaryClassificationMetrics
# Instantiate metrics object
SVMmetrics = BinaryClassificationMetrics(SVMpredsAndLabels)
Logitmetrics = BinaryClassificationMetrics(LogitPredsAndLabels)


# Area under precision-recall curve
print("Area under PR of the SVM model = %s" % SVMmetrics.areaUnderPR)

# Area under ROC curve
print("Area under ROC of the SVM model = %s" % SVMmetrics.areaUnderROC)

# Area under precision-recall curve
print("Area under PR of the Logit model = %s" % Logitmetrics.areaUnderPR)

# Area under ROC curve
print("Area under ROC of the Logit model = %s" % Logitmetrics.areaUnderROC)

Area under PR of the SVM model = 0.8955503065377077
Area under ROC of the SVM model = 0.5707993044276799
Area under PR of the Logit model = 0.8376653060737242
Area under ROC of the Logit model = 0.6200217156960575


#### Extra fun with PySpark: Calculate Moving Average

In [41]:
# Check if something is RDD o DataFrame
from pyspark.sql import DataFrame
from pyspark.rdd import RDD

def foo(x):
    if isinstance(x, RDD):
        return "RDD"
    if isinstance(x, DataFrame):
        return "DataFrame"
    
foo(transformed)

'DataFrame'

In [47]:
# How to do an aggregation
body_aggr = spark_eurusd.groupby(["body"]).agg({"body": "count"}).orderBy("body")
body_aggr = body_aggr.withColumnRenamed("count(body)", "count")

body_aggr.describe().show()
body_aggr.selectExpr("percentile_approx(body, array(.25, .5, .75)) as body_percentile").show()

+-------+-----------------+------------------+
|summary|             body|             count|
+-------+-----------------+------------------+
|  count|              673|               673|
|   mean|36.77265973254087| 386.3298662704309|
| stddev|26.44344304300913|1068.2407536192316|
|    min|              0.0|                 1|
|    max|            223.0|              7538|
+-------+-----------------+------------------+

+------------------+
|   body_percentile|
+------------------+
|[16.8, 33.6, 50.8]|
+------------------+



In [30]:
data1 = spark_eurusd

In [31]:
#function to calculate number of 5 minutes from number of minutes:
minutes = lambda i: i * 300

In [32]:
# Create a Window and WindowSpec (in this case we need a time frame, e.g. 40 5 minutes) with rangeBetween():
windowSpec = Window.orderBy(F.col("Time").cast('long')).rangeBetween(-minutes(40), 0)

# Note the OVER clause added to AVG(), to define a windowing column.
data1 = data1.withColumn('ma40', F.avg("Close").over(windowSpec))

In [33]:
data1.show(5, truncate= True)

+-------------------+-------+-------+-------+-------+----+-----------+----------+------------------+
|               Time|   Open|   High|    Low|  Close|body|higher_wick|lower_wick|              ma40|
+-------------------+-------+-------+-------+-------+----+-----------+----------+------------------+
|2010-01-03 17:45:00| 1.4312|1.43172| 1.4312|1.43172| 5.2|        5.2|       5.2|           1.43172|
|2010-01-03 18:00:00|1.43172|1.43425|1.43105| 1.4311| 6.2|       25.3|       0.5|           1.43141|
|2010-01-03 18:15:00| 1.4315|1.43155| 1.4313|1.43155| 0.5|        0.5|       2.5|1.4314566666666668|
|2010-01-03 18:30:00|1.43175| 1.4324|1.43106|1.43106| 6.9|        6.5|       0.0|1.4313575000000003|
|2010-01-03 18:45:00|1.43111|1.43157|1.43106|1.43157| 4.6|        4.6|       5.1|1.4314000000000002|
+-------------------+-------+-------+-------+-------+----+-----------+----------+------------------+
only showing top 5 rows



In [50]:
spark.stop()