In [1]:
import os
import pandas as pd
from pyspark.sql import SparkSession

In [2]:
data_path = "../data/rules/rules.csv"

In [3]:
pd_df = pd.read_csv(data_path)

### Creating a Spark Session

In [4]:
os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jre1.8.0_341"

spark = SparkSession.builder.appName("Practice").getOrCreate()

## Spark DataFrame

### Reading the dataset

In [5]:
sp_df = spark.read.csv(data_path, header=True, inferSchema=True)

In [6]:
sp_df.printSchema()

root
 |-- RuleDefinitionId: integer (nullable = true)
 |-- LastModifiedDateTime: string (nullable = true)
 |-- FieldName: string (nullable = true)
 |-- ShortName: string (nullable = true)
 |-- ClientName: string (nullable = true)
 |-- FundName: string (nullable = true)
 |-- SubVectorId: integer (nullable = true)
 |-- Formula: string (nullable = true)
 |-- Severity: string (nullable = true)
 |-- ExecutionOrder: integer (nullable = true)
 |-- Dimension: string (nullable = true)
 |-- IsSLA: integer (nullable = true)
 |-- CutOffTime: timestamp (nullable = true)
 |-- ErrorMessage: string (nullable = true)
 |-- CronExpression: string (nullable = true)
 |-- CobDate: integer (nullable = true)
 |-- EmailIds: string (nullable = true)
 |-- CalculationFormula: string (nullable = true)



In [7]:
sp_df.describe().show()

+-------+-----------------+--------------------+--------------------+------------+--------------------+---------+------------------+--------------------+--------+------------------+------------+-------------------+--------------------+-------------------+-------------------+--------------------+--------------------+
|summary| RuleDefinitionId|LastModifiedDateTime|           FieldName|   ShortName|          ClientName| FundName|       SubVectorId|             Formula|Severity|    ExecutionOrder|   Dimension|              IsSLA|        ErrorMessage|     CronExpression|            CobDate|            EmailIds|  CalculationFormula|
+-------+-----------------+--------------------+--------------------+------------+--------------------+---------+------------------+--------------------+--------+------------------+------------+-------------------+--------------------+-------------------+-------------------+--------------------+--------------------+
|  count|              335|                 33

In [8]:
sp_df.show(2)

+----------------+--------------------+--------------------+--------------------+----------+--------+-----------+--------------------+--------+--------------+------------+-----+----------+--------------------+-------------------+-------+--------------------+------------------+
|RuleDefinitionId|LastModifiedDateTime|           FieldName|           ShortName|ClientName|FundName|SubVectorId|             Formula|Severity|ExecutionOrder|   Dimension|IsSLA|CutOffTime|        ErrorMessage|     CronExpression|CobDate|            EmailIds|CalculationFormula|
+----------------+--------------------+--------------------+--------------------+----------+--------+-----------+--------------------+--------+--------------+------------+-----+----------+--------------------+-------------------+-------+--------------------+------------------+
|             707|             00:53.5|      FileCompletion|          EodPEDFile|   ACADIAN|     0X0|         11|/home/upload/eodp...|   Error|             0|Complete

In [9]:
sp_df.count()

335

In [10]:
type(sp_df)

pyspark.sql.dataframe.DataFrame

In [11]:
sp_df.dtypes

[('RuleDefinitionId', 'int'),
 ('LastModifiedDateTime', 'string'),
 ('FieldName', 'string'),
 ('ShortName', 'string'),
 ('ClientName', 'string'),
 ('FundName', 'string'),
 ('SubVectorId', 'int'),
 ('Formula', 'string'),
 ('Severity', 'string'),
 ('ExecutionOrder', 'int'),
 ('Dimension', 'string'),
 ('IsSLA', 'int'),
 ('CutOffTime', 'timestamp'),
 ('ErrorMessage', 'string'),
 ('CronExpression', 'string'),
 ('CobDate', 'int'),
 ('EmailIds', 'string'),
 ('CalculationFormula', 'string')]

In [12]:
cols = sp_df.columns
cols

['RuleDefinitionId',
 'LastModifiedDateTime',
 'FieldName',
 'ShortName',
 'ClientName',
 'FundName',
 'SubVectorId',
 'Formula',
 'Severity',
 'ExecutionOrder',
 'Dimension',
 'IsSLA',
 'CutOffTime',
 'ErrorMessage',
 'CronExpression',
 'CobDate',
 'EmailIds',
 'CalculationFormula']

### Selecting columns

In [13]:
sp_df.select(cols[:2]).show()

+----------------+--------------------+
|RuleDefinitionId|LastModifiedDateTime|
+----------------+--------------------+
|             707|             00:53.5|
|             484|             05:51.9|
|             487|             05:51.9|
|             486|             05:51.9|
|             485|             05:51.9|
|             477|             41:53.0|
|             475|             41:53.0|
|             473|             41:53.0|
|             471|             41:53.0|
|             472|             41:53.0|
|             474|             41:53.0|
|             470|             41:53.0|
|             469|             13:27.0|
|             467|             41:53.0|
|             476|             41:53.0|
|             468|             41:53.0|
|             466|             41:53.0|
|             483|             43:12.3|
|             706|             00:53.5|
|             705|             00:53.5|
+----------------+--------------------+
only showing top 20 rows



In [14]:
sp_df[cols[:2]].show()

+----------------+--------------------+
|RuleDefinitionId|LastModifiedDateTime|
+----------------+--------------------+
|             707|             00:53.5|
|             484|             05:51.9|
|             487|             05:51.9|
|             486|             05:51.9|
|             485|             05:51.9|
|             477|             41:53.0|
|             475|             41:53.0|
|             473|             41:53.0|
|             471|             41:53.0|
|             472|             41:53.0|
|             474|             41:53.0|
|             470|             41:53.0|
|             469|             13:27.0|
|             467|             41:53.0|
|             476|             41:53.0|
|             468|             41:53.0|
|             466|             41:53.0|
|             483|             43:12.3|
|             706|             00:53.5|
|             705|             00:53.5|
+----------------+--------------------+
only showing top 20 rows



### Adding Columns

In [15]:
sp_df = sp_df.withColumn("RuleDefinitionId+2", sp_df["RuleDefinitionId"] + 2)

In [16]:
sp_df.select(["RuleDefinitionId", "RuleDefinitionId+2"]).show()

+----------------+------------------+
|RuleDefinitionId|RuleDefinitionId+2|
+----------------+------------------+
|             707|               709|
|             484|               486|
|             487|               489|
|             486|               488|
|             485|               487|
|             477|               479|
|             475|               477|
|             473|               475|
|             471|               473|
|             472|               474|
|             474|               476|
|             470|               472|
|             469|               471|
|             467|               469|
|             476|               478|
|             468|               470|
|             466|               468|
|             483|               485|
|             706|               708|
|             705|               707|
+----------------+------------------+
only showing top 20 rows



### Dropping columns

In [17]:
sp_df = sp_df.drop("RuleDefinitionId+2")

In [18]:
sp_df

DataFrame[RuleDefinitionId: int, LastModifiedDateTime: string, FieldName: string, ShortName: string, ClientName: string, FundName: string, SubVectorId: int, Formula: string, Severity: string, ExecutionOrder: int, Dimension: string, IsSLA: int, CutOffTime: timestamp, ErrorMessage: string, CronExpression: string, CobDate: int, EmailIds: string, CalculationFormula: string]

### Dropping NA Values

In [19]:
sp_df.na.drop().show()

+----------------+--------------------+---------+---------+----------+--------+-----------+-------+--------+--------------+---------+-----+----------+------------+--------------+-------+--------+------------------+
|RuleDefinitionId|LastModifiedDateTime|FieldName|ShortName|ClientName|FundName|SubVectorId|Formula|Severity|ExecutionOrder|Dimension|IsSLA|CutOffTime|ErrorMessage|CronExpression|CobDate|EmailIds|CalculationFormula|
+----------------+--------------------+---------+---------+----------+--------+-----------+-------+--------+--------------+---------+-----+----------+------------+--------------+-------+--------+------------------+
+----------------+--------------------+---------+---------+----------+--------+-----------+-------+--------+--------------+---------+-----+----------+------------+--------------+-------+--------+------------------+



In [20]:
sp_df.na.drop(
    how="all"
).show(2)

+----------------+--------------------+--------------------+--------------------+----------+--------+-----------+--------------------+--------+--------------+------------+-----+----------+--------------------+-------------------+-------+--------------------+------------------+
|RuleDefinitionId|LastModifiedDateTime|           FieldName|           ShortName|ClientName|FundName|SubVectorId|             Formula|Severity|ExecutionOrder|   Dimension|IsSLA|CutOffTime|        ErrorMessage|     CronExpression|CobDate|            EmailIds|CalculationFormula|
+----------------+--------------------+--------------------+--------------------+----------+--------+-----------+--------------------+--------+--------------+------------+-----+----------+--------------------+-------------------+-------+--------------------+------------------+
|             707|             00:53.5|      FileCompletion|          EodPEDFile|   ACADIAN|     0X0|         11|/home/upload/eodp...|   Error|             0|Complete

In [21]:
sp_df.na.drop(
    how="any"
).show()

+----------------+--------------------+---------+---------+----------+--------+-----------+-------+--------+--------------+---------+-----+----------+------------+--------------+-------+--------+------------------+
|RuleDefinitionId|LastModifiedDateTime|FieldName|ShortName|ClientName|FundName|SubVectorId|Formula|Severity|ExecutionOrder|Dimension|IsSLA|CutOffTime|ErrorMessage|CronExpression|CobDate|EmailIds|CalculationFormula|
+----------------+--------------------+---------+---------+----------+--------+-----------+-------+--------+--------------+---------+-----+----------+------------+--------------+-------+--------+------------------+
+----------------+--------------------+---------+---------+----------+--------+-----------+-------+--------+--------------+---------+-----+----------+------------+--------------+-------+--------+------------------+



In [22]:
sp_df.na.drop(
    how="any",
    thresh=2  # At least two non-null values, else drop
).show(2)

+----------------+--------------------+--------------------+--------------------+----------+--------+-----------+--------------------+--------+--------------+------------+-----+----------+--------------------+-------------------+-------+--------------------+------------------+
|RuleDefinitionId|LastModifiedDateTime|           FieldName|           ShortName|ClientName|FundName|SubVectorId|             Formula|Severity|ExecutionOrder|   Dimension|IsSLA|CutOffTime|        ErrorMessage|     CronExpression|CobDate|            EmailIds|CalculationFormula|
+----------------+--------------------+--------------------+--------------------+----------+--------+-----------+--------------------+--------+--------------+------------+-----+----------+--------------------+-------------------+-------+--------------------+------------------+
|             707|             00:53.5|      FileCompletion|          EodPEDFile|   ACADIAN|     0X0|         11|/home/upload/eodp...|   Error|             0|Complete

In [23]:
sp_df.na.drop(
    how="any",
    # thresh=2  # At least two non-null values, else drop,
    subset=[
        "RuleDefinitionId",
        "LastModifiedDateTime",
        "FieldName",
        "ShortName"
    ]  # If any of these values are null, drop the row
).show(2)

+----------------+--------------------+--------------------+--------------------+----------+--------+-----------+--------------------+--------+--------------+------------+-----+----------+--------------------+-------------------+-------+--------------------+------------------+
|RuleDefinitionId|LastModifiedDateTime|           FieldName|           ShortName|ClientName|FundName|SubVectorId|             Formula|Severity|ExecutionOrder|   Dimension|IsSLA|CutOffTime|        ErrorMessage|     CronExpression|CobDate|            EmailIds|CalculationFormula|
+----------------+--------------------+--------------------+--------------------+----------+--------+-----------+--------------------+--------+--------------+------------+-----+----------+--------------------+-------------------+-------+--------------------+------------------+
|             707|             00:53.5|      FileCompletion|          EodPEDFile|   ACADIAN|     0X0|         11|/home/upload/eodp...|   Error|             0|Complete

In [24]:
sp_df.na.fill("Missing Values").show(2)

+----------------+--------------------+--------------------+--------------------+----------+--------+-----------+--------------------+--------+--------------+------------+-----+----------+--------------------+-------------------+-------+--------------------+------------------+
|RuleDefinitionId|LastModifiedDateTime|           FieldName|           ShortName|ClientName|FundName|SubVectorId|             Formula|Severity|ExecutionOrder|   Dimension|IsSLA|CutOffTime|        ErrorMessage|     CronExpression|CobDate|            EmailIds|CalculationFormula|
+----------------+--------------------+--------------------+--------------------+----------+--------+-----------+--------------------+--------+--------------+------------+-----+----------+--------------------+-------------------+-------+--------------------+------------------+
|             707|             00:53.5|      FileCompletion|          EodPEDFile|   ACADIAN|     0X0|         11|/home/upload/eodp...|   Error|             0|Complete

In [25]:
sp_df.na.fill("Missing Values", [
        "RuleDefinitionId",
        "LastModifiedDateTime",
        "FieldName",
        "ShortName"
    ]).show(2)

+----------------+--------------------+--------------------+--------------------+----------+--------+-----------+--------------------+--------+--------------+------------+-----+----------+--------------------+-------------------+-------+--------------------+------------------+
|RuleDefinitionId|LastModifiedDateTime|           FieldName|           ShortName|ClientName|FundName|SubVectorId|             Formula|Severity|ExecutionOrder|   Dimension|IsSLA|CutOffTime|        ErrorMessage|     CronExpression|CobDate|            EmailIds|CalculationFormula|
+----------------+--------------------+--------------------+--------------------+----------+--------+-----------+--------------------+--------+--------------+------------+-----+----------+--------------------+-------------------+-------+--------------------+------------------+
|             707|             00:53.5|      FileCompletion|          EodPEDFile|   ACADIAN|     0X0|         11|/home/upload/eodp...|   Error|             0|Complete

### Imputing Null Values

In [26]:
from pyspark.ml.feature import Imputer



In [27]:
input_cols = [
    "RuleDefinitionId",
    "SubVectorId",
    "ExecutionOrder",
    "CobDate"
]

output_cols = [f"{ip_col}_inputed" for ip_col in input_cols]

imputer = Imputer(
    inputCols=input_cols,
    outputCols=output_cols
).setStrategy("mean")

In [28]:
imputer.fit(sp_df).transform(sp_df)[
    "RuleDefinitionId",
    "RuleDefinitionId_inputed",
    "SubVectorId",
    "SubVectorId_inputed",
    "ExecutionOrder",
    "ExecutionOrder_inputed",
    "CobDate",
    "CobDate_inputed"
].show(5)

+----------------+------------------------+-----------+-------------------+--------------+----------------------+-------+---------------+
|RuleDefinitionId|RuleDefinitionId_inputed|SubVectorId|SubVectorId_inputed|ExecutionOrder|ExecutionOrder_inputed|CobDate|CobDate_inputed|
+----------------+------------------------+-----------+-------------------+--------------+----------------------+-------+---------------+
|             707|                     707|         11|                 11|             0|                     0|     -1|             -1|
|             484|                     484|          8|                  8|             1|                     1|   null|              0|
|             487|                     487|          8|                  8|          null|                     1|   null|              0|
|             486|                     486|          8|                  8|             2|                     2|   null|              0|
|             485|                

In [29]:
sp_df.dtypes

[('RuleDefinitionId', 'int'),
 ('LastModifiedDateTime', 'string'),
 ('FieldName', 'string'),
 ('ShortName', 'string'),
 ('ClientName', 'string'),
 ('FundName', 'string'),
 ('SubVectorId', 'int'),
 ('Formula', 'string'),
 ('Severity', 'string'),
 ('ExecutionOrder', 'int'),
 ('Dimension', 'string'),
 ('IsSLA', 'int'),
 ('CutOffTime', 'timestamp'),
 ('ErrorMessage', 'string'),
 ('CronExpression', 'string'),
 ('CobDate', 'int'),
 ('EmailIds', 'string'),
 ('CalculationFormula', 'string')]

### Filter Operations

In [30]:
sp_df.filter("RuleDefinitionId > 700").select(["RuleDefinitionId", "FieldName", "SubVectorId"]).show()

+----------------+--------------------+-----------+
|RuleDefinitionId|           FieldName|SubVectorId|
+----------------+--------------------+-----------+
|             707|      FileCompletion|         11|
|             706|      FileCompletion|         11|
|             705|      FileCompletion|         11|
|             704|      FileCompletion|         11|
|             701|      FileCompletion|         11|
|             702|      FileCompletion|         11|
|             703|      FileCompletion|         11|
|             750|CustAcctName PnlC...|         11|
|             751|Fund PnlCheck Exc...|         11|
|             752|Fund PnlCheck Exc...|         11|
|             753|Fund PnlCheck Exc...|         11|
|             754|Fund PnlCheck Exc...|         11|
|             755|Fund PnlCheck Exc...|         11|
|             756|Fund PnlCheck Exc...|         11|
|             757|Fund PnlCheck Exc...|         11|
|             758| PnlCheck Exceptions|         11|
|           

In [31]:
sp_df.filter(sp_df["RuleDefinitionId"] > 700).select(["RuleDefinitionId", "FieldName", "SubVectorId"]).show()

+----------------+--------------------+-----------+
|RuleDefinitionId|           FieldName|SubVectorId|
+----------------+--------------------+-----------+
|             707|      FileCompletion|         11|
|             706|      FileCompletion|         11|
|             705|      FileCompletion|         11|
|             704|      FileCompletion|         11|
|             701|      FileCompletion|         11|
|             702|      FileCompletion|         11|
|             703|      FileCompletion|         11|
|             750|CustAcctName PnlC...|         11|
|             751|Fund PnlCheck Exc...|         11|
|             752|Fund PnlCheck Exc...|         11|
|             753|Fund PnlCheck Exc...|         11|
|             754|Fund PnlCheck Exc...|         11|
|             755|Fund PnlCheck Exc...|         11|
|             756|Fund PnlCheck Exc...|         11|
|             757|Fund PnlCheck Exc...|         11|
|             758| PnlCheck Exceptions|         11|
|           

In [32]:
sp_df.filter(
    (sp_df["RuleDefinitionId"] > 700) &
    (sp_df["SubVectorId"] > 11)
).select(["RuleDefinitionId", "FieldName", "SubVectorId"]).show()

+----------------+--------------------+-----------+
|RuleDefinitionId|           FieldName|SubVectorId|
+----------------+--------------------+-----------+
|             766|ReturnsCheck Exce...|         35|
|             867|Fund Admin NAV Co...|         98|
|             828|ClientSignOffComp...|         72|
|             837|Fund Admin NAV Co...|         98|
|             727|ReturnsCheck Exce...|         35|
|             711|ClientSignOffComp...|         72|
|             740|ReturnsCheck Exce...|         35|
|             710|Account Validatio...|         98|
|             773|Account Validatio...|         98|
|             776|ClientSignOffComp...|         72|
+----------------+--------------------+-----------+



In [33]:
sp_df.filter(~(sp_df["RuleDefinitionId"] > 700)).select(["RuleDefinitionId", "FieldName", "SubVectorId"]).show()

+----------------+--------------------+-----------+
|RuleDefinitionId|           FieldName|SubVectorId|
+----------------+--------------------+-----------+
|             484|Valuation Exceptions|          8|
|             487|ValuationComplete...|          8|
|             486|ValuationSummary ...|          8|
|             485|ValuationSummary ...|          8|
|             477|CustAcctName PnlC...|         11|
|             475|Fund PnlCheck Exc...|         11|
|             473|Fund PnlCheck Exc...|         11|
|             471|Fund PnlCheck Exc...|         11|
|             472|Fund PnlCheck Exc...|         11|
|             474|Fund PnlCheck Exc...|         11|
|             470|Fund PnlCheck Exc...|         11|
|             469|Fund PnlCheck Exc...|         11|
|             467| PnlCheck Exceptions|         11|
|             476| PnlCheck Exceptions|         11|
|             468| PnlCheck Exceptions|         11|
|             466| PnlCheck Exceptions|         11|
|           

### Grouping

In [34]:
sp_df[["RuleDefinitionId", "ExecutionOrder", "Dimension", "IsSLA"]].groupBy("ExecutionOrder").mean().show()

+--------------+---------------------+-------------------+--------------------+
|ExecutionOrder|avg(RuleDefinitionId)|avg(ExecutionOrder)|          avg(IsSLA)|
+--------------+---------------------+-------------------+--------------------+
|          null|   495.35849056603774|               null|                 1.0|
|             1|    514.0555555555555|                1.0|0.018518518518518517|
|             3|    634.0666666666667|                3.0|                 0.0|
|             5|    619.9333333333333|                5.0|                 0.0|
|             4|    641.4444444444445|                4.0|                 0.0|
|             2|    600.4868421052631|                2.0|                 0.0|
|             0|    731.5486725663717|                0.0| 0.19469026548672566|
+--------------+---------------------+-------------------+--------------------+



In [35]:
sp_df[["RuleDefinitionId", "ExecutionOrder", "Dimension", "IsSLA"]].groupBy("ExecutionOrder").sum().show()

+--------------+---------------------+-------------------+----------+
|ExecutionOrder|sum(RuleDefinitionId)|sum(ExecutionOrder)|sum(IsSLA)|
+--------------+---------------------+-------------------+----------+
|          null|                26254|               null|        53|
|             1|                27759|                 54|         1|
|             3|                 9511|                 45|         0|
|             5|                 9299|                 75|         0|
|             4|                 5773|                 36|         0|
|             2|                45637|                152|         0|
|             0|                82665|                  0|        22|
+--------------+---------------------+-------------------+----------+



In [36]:
sp_df[["RuleDefinitionId", "ExecutionOrder", "Dimension", "IsSLA"]].groupBy("ExecutionOrder").count().show()

+--------------+-----+
|ExecutionOrder|count|
+--------------+-----+
|          null|   53|
|             1|   54|
|             3|   15|
|             5|   15|
|             4|    9|
|             2|   76|
|             0|  113|
+--------------+-----+



In [37]:
sp_df[
    ["RuleDefinitionId", "ExecutionOrder", "Dimension", "IsSLA"]
].groupBy("ExecutionOrder").agg({"RuleDefinitionId": "max", "IsSLA": "sum"}).show()

+--------------+----------+---------------------+
|ExecutionOrder|sum(IsSLA)|max(RuleDefinitionId)|
+--------------+----------+---------------------+
|          null|        53|                  765|
|             1|         1|                  934|
|             3|         0|                  887|
|             5|         0|                  898|
|             4|         0|                  888|
|             2|         0|                  897|
|             0|        22|                  928|
+--------------+----------+---------------------+



## MLib

In [38]:
from pyspark.ml.feature import VectorAssembler

In [39]:
sp_df.dtypes

[('RuleDefinitionId', 'int'),
 ('LastModifiedDateTime', 'string'),
 ('FieldName', 'string'),
 ('ShortName', 'string'),
 ('ClientName', 'string'),
 ('FundName', 'string'),
 ('SubVectorId', 'int'),
 ('Formula', 'string'),
 ('Severity', 'string'),
 ('ExecutionOrder', 'int'),
 ('Dimension', 'string'),
 ('IsSLA', 'int'),
 ('CutOffTime', 'timestamp'),
 ('ErrorMessage', 'string'),
 ('CronExpression', 'string'),
 ('CobDate', 'int'),
 ('EmailIds', 'string'),
 ('CalculationFormula', 'string')]

In [40]:
input_cols = ["RuleDefinitionId", "SubVectorId", "ExecutionOrder"]

featureAssembler = VectorAssembler(
    inputCols=input_cols,
    outputCol="Features"
)

In [41]:
data = featureAssembler.transform(
    sp_df.dropna(subset=["ExecutionOrder"])  # Because ExecutionOrder has null values
).select(["Features", "IsSLA"])

In [42]:
data.show(10)

+----------------+-----+
|        Features|IsSLA|
+----------------+-----+
|[707.0,11.0,0.0]|    1|
| [484.0,8.0,1.0]|    0|
| [486.0,8.0,2.0]|    0|
| [485.0,8.0,3.0]|    0|
|[477.0,11.0,0.0]|    0|
|[475.0,11.0,2.0]|    0|
|[473.0,11.0,2.0]|    0|
|[471.0,11.0,2.0]|    0|
|[472.0,11.0,2.0]|    0|
|[474.0,11.0,2.0]|    0|
+----------------+-----+
only showing top 10 rows



In [43]:
# from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression

In [44]:
train_data, test_data = data.randomSplit([0.75, 0.25])
classifier = LogisticRegression(featuresCol="Features", labelCol="IsSLA")
classifier = classifier.fit(train_data)

In [45]:
classifier.coefficients

DenseVector([-0.0053, -0.0005, -22.1322])

In [46]:
classifier.intercept

2.381690379244546

In [47]:
preds = classifier.evaluate(test_data)

In [48]:
preds.predictions.show()

+----------------+-----+--------------------+--------------------+----------+
|        Features|IsSLA|       rawPrediction|         probability|prediction|
+----------------+-----+--------------------+--------------------+----------+
| [102.0,3.0,1.0]|    0|[20.2909713862644...|[0.99999999845921...|       0.0|
| [108.0,3.0,1.0]|    0|[20.3226803196769...|[0.99999999850730...|       0.0|
| [110.0,0.0,1.0]|    0|[20.3318780189910...|[0.99999999852096...|       0.0|
| [111.0,0.0,1.0]|    0|[20.3371628412264...|[0.99999999852876...|       0.0|
| [204.0,2.0,1.0]|    0|[20.8295659392242...|[0.99999999910084...|       0.0|
| [206.0,2.0,1.0]|    0|[20.8401355836951...|[0.99999999911029...|       0.0|
| [207.0,2.0,1.0]|    1|[20.8454204059305...|[0.99999999911498...|       0.0|
|[301.0,11.0,1.0]|    0|[21.3463095315294...|[0.99999999946369...|       0.0|
|[306.0,11.0,2.0]|    0|[43.5049715950466...|           [1.0,0.0]|       0.0|
|[341.0,11.0,2.0]|    0|[43.6899403732861...|           [1.0,0.0

In [49]:
preds.accuracy

0.9375

### Handling Categorical Features

In [50]:
from pyspark.ml.feature import StringIndexer 

In [51]:
indexer = StringIndexer(
    inputCols=["ClientName"],
    outputCols=["ClientName_indexed"]
)

In [52]:
data = indexer.fit(sp_df).transform(sp_df)

In [53]:
input_cols = ["RuleDefinitionId", "SubVectorId", "ExecutionOrder", "ClientName_indexed"]

In [54]:
data[input_cols].show(10)

+----------------+-----------+--------------+------------------+
|RuleDefinitionId|SubVectorId|ExecutionOrder|ClientName_indexed|
+----------------+-----------+--------------+------------------+
|             707|         11|             0|               2.0|
|             484|          8|             1|               2.0|
|             487|          8|          null|               2.0|
|             486|          8|             2|               2.0|
|             485|          8|             3|               2.0|
|             477|         11|             0|               2.0|
|             475|         11|             2|               2.0|
|             473|         11|             2|               2.0|
|             471|         11|             2|               2.0|
|             472|         11|             2|               2.0|
+----------------+-----------+--------------+------------------+
only showing top 10 rows



In [55]:
featureAssembler = VectorAssembler(
    inputCols=input_cols,
    outputCol="Features"
)

In [56]:
data = featureAssembler.transform(
    data.dropna(subset=["ExecutionOrder"])  # Because ExecutionOrder has null values
).select(["Features", "IsSLA"])

In [57]:
data.show(10)

+--------------------+-----+
|            Features|IsSLA|
+--------------------+-----+
|[707.0,11.0,0.0,2.0]|    1|
| [484.0,8.0,1.0,2.0]|    0|
| [486.0,8.0,2.0,2.0]|    0|
| [485.0,8.0,3.0,2.0]|    0|
|[477.0,11.0,0.0,2.0]|    0|
|[475.0,11.0,2.0,2.0]|    0|
|[473.0,11.0,2.0,2.0]|    0|
|[471.0,11.0,2.0,2.0]|    0|
|[472.0,11.0,2.0,2.0]|    0|
|[474.0,11.0,2.0,2.0]|    0|
+--------------------+-----+
only showing top 10 rows



In [58]:
# from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression

In [59]:
train_data, test_data = data.randomSplit([0.75, 0.25])
classifier = LogisticRegression(featuresCol="Features", labelCol="IsSLA")
classifier = classifier.fit(train_data)

In [60]:
classifier.coefficients

DenseVector([-0.0034, -0.0095, -4.4352, -0.3138])

In [61]:
classifier.intercept

2.803305252625486

In [62]:
preds = classifier.evaluate(test_data)

In [63]:
preds.predictions.show()

+--------------------+-----+--------------------+--------------------+----------+
|            Features|IsSLA|       rawPrediction|         probability|prediction|
+--------------------+-----+--------------------+--------------------+----------+
| [44.0,11.0,4.0,0.0]|    0|[15.1918330697317...|[0.99999974749463...|       0.0|
|[108.0,3.0,1.0,13.0]|    0|[6.10632072022987...|[0.99777622032872...|       0.0|
| [201.0,2.0,1.0,1.0]|    0|[2.64725230045529...|[0.93384143543764...|       0.0|
| [204.0,2.0,1.0,1.0]|    0|[2.65743892919028...|[0.93446800727193...|       0.0|
|[299.0,11.0,2.0,0.0]|    0|[7.18730813867343...|[0.99924444952975...|       0.0|
|[306.0,11.0,2.0,0.0]|    0|[7.21107693905507...|[0.99926218321881...|       0.0|
|[307.0,11.0,2.0,2...|    0|[13.4899929863565...|[0.99999861525475...|       0.0|
|[342.0,11.0,2.0,0.0]|    0|[7.33331648387495...|[0.99934702366742...|       0.0|
|[343.0,11.0,2.0,2...|    0|[14.5535606068348...|[0.99999952195505...|       0.0|
|[345.0,11.0,2.0

In [64]:
preds.accuracy

0.9333333333333333