## Why PySpark

PySpark is an interface to Apache Spark in Python. It allows you to analyze data interactively in a distributed environment. If your data volume is large, such as transaction detail and execution data, you need to find a stable storage to store these daily data, and also you need to have a cheap and powerful computing tool to process and analyze this data. hadoop HDFS and Jupyter with pyspark is a good choice for you. PySpark provides some features such as Spark SQL, DataFrame and MLlib (machine learning), which is is really BA friendly.

## Data

The daily order execution data is stored in daily partition in hdfs as parquet format files. As a data scientist, we can extract some and save in our user dedicated folder as csv format file. The data schema is as follws: 
```
Field name        Field type
Id                String
Symbol            String
LastMkt           String
Datetime          String
Brokerid          String
Traderid          String
Clientid          String
Account           String
Country           String
effectiveTime     String
expireTime        String
timeInForce       String
exposureDuration  String
tradingSession    String
tradingSessionSub String
settlType         String
settlDate         String
Currency          String
currencyFXRate    Float
execType          String
ExDestination     String
trdType           String
matchType         String
Side              String
OrdStatus         String
OrdType           String
orderQty          Int
Price             Float
exchangeCode      String
refQuoteId        String
refOrderId        String
trdPlatform       String
```
Please refer to https://www.onixs.biz/fix-dictionary/4.2/fields_by_name.html for the meaning of detailed data fields.

### Creating Spark Session

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Trade Execution Anomaly Detection').getOrCreate()

### Loading Execution Data

Execution.csv is located in my hdfs home folder

In [3]:
df = spark.read.csv('Execution.csv', inferSchema=True, header=True)

In [4]:
df.show(5)

+------------+-------------+-------+---------+-------+--------+--------+--------------+---------+-------+----+------+--------+-----------+--------------+-------------------+------------+----------------+---------+
|          Id|ExDestination|LastMkt|  Account|Country|Currency|execType|leaf_exec_flag|OrdStatus|OrdType|Side|Symbol|TraderId|trdPlatform|drv_event_type|drv_cross_exec_flag|drv_1ast_mkt|account_mnemonic|region_cd|
+------------+-------------+-------+---------+-------+--------+--------+--------------+---------+-------+----+------+--------+-----------+--------------+-------------------+------------+----------------+---------+
|565 98267758|          QFF|   XNAs| RENTADMA|    USA|     USD|       F|             N|        1|      2|   2|    cc| be14462|     RIOQFF|   FILLED_CONF|                  N|         OTC|        00301215|      NAM|
| 56584040534|     CITISMRT|   CDED|  AQRTEST|    USA|     USD|       F|             N|        1|      2|   1|   FLS| md42277|     RIOQFF|   FIL

## Data Preprocessing

##### Feature selection
Some columns have no obvious contribution to the classification results, we can drop them directly, and some other features have multicollinearity but we can't find the closely linear relationship directly, we can use pair plot to check, or a better way is use correlation matrix to check how closely related the features are. For latter one, we can do like this:
```
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import VectorAssembler

vector_col = "corr_features"
assembler = VectorAssembler(inputCols=df.columns, outputCol=vector_col)
df_vector = assembler.transform(df).select(vector_col)
matrix = Correlation.corr(df_vector, vector_col)
cor_np = matrix.collect()[0][matrix.columns[0]].toArray()
```

In [5]:
df = df.drop('Id', 'Symbol', 'Datetime', 'effectiveTime', 'expireTime')

#### handling missing values

In [6]:
df = df.fillna('Unknown')

#### Importing all necessary ML modules

In [7]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import IndexToString

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

from pyspark.ml import Pipeline

from pyspark.ml.classification import MultilayerPerceptronClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

#### Splitting Dataset Into train, and test

In [8]:
train, test = df.randomSplit([0.8, 0.2], 4321)

In [9]:
# df.show(5)

#### Encoding categorical variables

In [10]:
label_indexer = StringIndexer(inputCol="trdPlatform", outputCol="label", handleInvalid="keep").fit(train)
train = label_indexer.transform(train)
test = label_indexer.transform(test)

In [11]:
categorical_columns = [dtype[0] for dtype in df.dtypes if dtype[1].startswith('string') and dtype[0] != 'trdPlatform']
indexers = [StringIndexer(inputCol=column, outputCol='{0}_index'.format(column), handleInvalid="keep") for column in categorical_columns]

#### Merging columns into a vector features

In [12]:
numeric_columns = [dtype[0] for dtype in train.dtypes if dtype[1].startswith('double')]
assembler = VectorAssembler(inputCols=[indexer.getOutputCol() for indexer in indexers] + numeric_columns, outputCol='features')

#### Standardizing features

In [13]:
scaler = StandardScaler(inputCol="features", outputCol="features_scaled", withStd=True, withMean=False)

#### Creating ML Model

In [14]:
layers = [len(assembler.getInputCols()), 120, 60, len(label_indexer.labels)+1]
classifier = MultilayerPerceptronClassifier(labelCol='label',
                                            featuresCol='features_scaled',
                                            maxIter=100,
                                            layers=layers,
                                            blockSize=128,
                                            seed=4321)

#### Training model

In [15]:
pipeline = Pipeline(stages=indexers + [assembler, scaler, classifier])
model = pipeline.fit(train)

#### Testing model

In [16]:
train_preds = model.transform(train)
test_preds = model.transform(test)

In [17]:
train_pred_labels = train_preds.select('prediction', 'label')
test_pred_labels = test_preds.select('prediction', 'label')

In [18]:
metrics = ['weightedPrecision', 'weightedRecall', 'accuracy']
for metric in metrics:
    evaluator = MulticlassClassificationEvaluator(metricName=metric)
    print('Train ' + metric + ' = ' + str(evaluator.evaluate(train_pred_labels)))
    print('Test ' + metric + ' = ' + str(evaluator.evaluate(test_pred_labels)))

Train weightedPrecision = 0.9947885690601138
Test weightedPrecision = 0.9934785942232548
Train weightedRecall = 0.994486634551594
Test weightedRecall = 0.9934157584971533
Train accuracy = 994486634551594
Test accuracy = 0.9934157584971532
