feat(spark): model quality and drift for current - multiclass classification #74

Merged · 6 commits · Jul 2, 2024
22 changes: 10 additions & 12 deletions spark/jobs/current_job.py
@@ -9,7 +9,6 @@
from metrics.statistics import calculate_statistics_current
from models.current_dataset import CurrentDataset
from models.reference_dataset import ReferenceDataset
from utils.reference_regression import ReferenceMetricsRegressionService
from utils.current_binary import CurrentMetricsService
from utils.current_multiclass import CurrentMetricsMulticlassService
from utils.models import JobStatus, ModelOut, ModelType
@@ -57,9 +56,8 @@ def main(
case ModelType.BINARY:
metrics_service = CurrentMetricsService(
spark_session=spark_session,
current=current_dataset.current,
reference=reference_dataset.reference,
model=model,
current=current_dataset,
reference=reference_dataset,
)
statistics = calculate_statistics_current(current_dataset)
data_quality = metrics_service.calculate_data_quality()
@@ -80,25 +78,25 @@
case ModelType.MULTI_CLASS:
metrics_service = CurrentMetricsMulticlassService(
spark_session=spark_session,
current=current_dataset.current,
reference=reference_dataset.reference,
model=model,
current=current_dataset,
reference=reference_dataset,
)
statistics = calculate_statistics_current(current_dataset)
data_quality = metrics_service.calculate_data_quality()
model_quality = metrics_service.calculate_model_quality()
drift = metrics_service.calculate_drift()
complete_record["STATISTICS"] = statistics.model_dump_json(
serialize_as_any=True
)
complete_record["DATA_QUALITY"] = data_quality.model_dump_json(
serialize_as_any=True
)
case ModelType.REGRESSION:
metrics_service = ReferenceMetricsRegressionService(
reference=reference_dataset
complete_record["MODEL_QUALITY"] = orjson.dumps(model_quality).decode(
"utf-8"
)
complete_record["DRIFT"] = orjson.dumps(drift).decode("utf-8")
case ModelType.REGRESSION:
statistics = calculate_statistics_current(current_dataset)
data_quality = metrics_service.calculate_data_quality()

complete_record["STATISTICS"] = statistics.model_dump_json(
serialize_as_any=True
)
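A note on the serialization pattern in the MULTI_CLASS branch above: statistics and data quality are Pydantic models and serialize themselves via `model_dump_json`, while model quality and drift come back as plain dicts and are dumped with `orjson`. A minimal sketch of that pattern; the `Statistics` model and the sample drift payload below are hypothetical stand-ins, not the project's actual classes:

```python
import orjson
from pydantic import BaseModel


class Statistics(BaseModel):
    # Hypothetical stand-in for the job's statistics model.
    n_observations: int
    missing_cells: int


complete_record: dict = {}

statistics = Statistics(n_observations=1000, missing_cells=3)
drift = {
    "feature_metrics": [
        {"feature_name": "age", "drift_calc": {"type": "KS", "value": 0.12, "has_drift": False}}
    ]
}

# Pydantic models serialize themselves; serialize_as_any keeps subclass fields.
complete_record["STATISTICS"] = statistics.model_dump_json(serialize_as_any=True)
# The drift calculator returns a plain dict, so it goes through orjson
# (which returns bytes, hence the decode).
complete_record["DRIFT"] = orjson.dumps(drift).decode("utf-8")

print(complete_record["DRIFT"])
```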
File renamed without changes.
79 changes: 79 additions & 0 deletions spark/jobs/metrics/drift_calculator.py
@@ -0,0 +1,79 @@
from pyspark.sql import SparkSession

from metrics.chi2 import Chi2Test
from metrics.ks import KolmogorovSmirnovTest
from models.current_dataset import CurrentDataset
from models.reference_dataset import ReferenceDataset


class DriftCalculator:
@staticmethod
def calculate_drift(
spark_session: SparkSession,
reference_dataset: ReferenceDataset,
current_dataset: CurrentDataset,
):
drift_result = dict()
drift_result["feature_metrics"] = []

categorical_features = [
categorical.name
for categorical in reference_dataset.model.get_categorical_features()
]
chi2 = Chi2Test(
spark_session=spark_session,
reference_data=reference_dataset.reference,
current_data=current_dataset.current,
)

for column in categorical_features:
feature_dict_to_append = {
"feature_name": column,
"drift_calc": {
"type": "CHI2",
},
}
if (
reference_dataset.reference_count > 5
and current_dataset.current_count > 5
):
result_tmp = chi2.test(column, column)
feature_dict_to_append["drift_calc"]["value"] = float(
result_tmp["pValue"]
)
feature_dict_to_append["drift_calc"]["has_drift"] = bool(
result_tmp["pValue"] <= 0.05
)
else:
feature_dict_to_append["drift_calc"]["value"] = None
feature_dict_to_append["drift_calc"]["has_drift"] = False
drift_result["feature_metrics"].append(feature_dict_to_append)

numerical_features = [
numerical.name
for numerical in reference_dataset.model.get_numerical_features()
]
ks = KolmogorovSmirnovTest(
reference_data=reference_dataset.reference,
current_data=current_dataset.current,
alpha=0.05,
phi=0.004,
)

for column in numerical_features:
feature_dict_to_append = {
"feature_name": column,
"drift_calc": {
"type": "KS",
},
}
result_tmp = ks.test(column, column)
feature_dict_to_append["drift_calc"]["value"] = float(
result_tmp["ks_statistic"]
)
feature_dict_to_append["drift_calc"]["has_drift"] = bool(
result_tmp["ks_statistic"] > result_tmp["critical_value"]
)
drift_result["feature_metrics"].append(feature_dict_to_append)

return drift_result
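The decision rules in `DriftCalculator` are: for categorical features, a chi-square p-value at or below 0.05 flags drift (falling back to "no drift" when either dataset has 5 or fewer rows); for numerical features, drift is flagged when the KS statistic exceeds its critical value. Below is a small self-contained sketch of the same rules using SciPy on plain Python lists. It is not the Spark implementation in this PR, and the numerical test here uses SciPy's p-value rather than the explicit critical-value comparison above:

```python
from scipy import stats


def categorical_drift(reference: list, current: list, alpha: float = 0.05) -> bool:
    """Chi-square test on the two samples' category counts; drift when p <= alpha."""
    categories = sorted(set(reference) | set(current))
    ref_counts = [reference.count(c) for c in categories]
    cur_counts = [current.count(c) for c in categories]
    _, p_value, _, _ = stats.chi2_contingency([ref_counts, cur_counts])
    return p_value <= alpha


def numerical_drift(reference: list, current: list, alpha: float = 0.05) -> bool:
    """Two-sample Kolmogorov-Smirnov test; drift when p < alpha."""
    _, p_value = stats.ks_2samp(reference, current)
    return p_value < alpha


print(categorical_drift(["a", "a", "a", "b", "b", "b"], ["a", "b", "b", "b", "b", "b"]))  # False
print(numerical_drift([1.0, 1.2, 0.9, 1.1, 1.3, 1.0], [3.0, 3.2, 2.9, 3.1, 3.3, 3.0]))   # True
```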
File renamed without changes.
46 changes: 46 additions & 0 deletions spark/jobs/models/current_dataset.py
@@ -1,8 +1,10 @@
from typing import List

from pyspark.ml.feature import StringIndexer
from pyspark.sql import DataFrame
from pyspark.sql.types import DoubleType, StructField, StructType

from models.reference_dataset import ReferenceDataset
from utils.models import ModelOut, ModelType, ColumnDefinition
from utils.spark import apply_schema_to_dataframe

@@ -95,3 +97,47 @@ def get_all_variables(self) -> List[ColumnDefinition]:
+ [self.model.timestamp]
+ self.model.outputs.output
)

def get_string_indexed_dataframe(self, reference: ReferenceDataset):
Member Author commented: the index is calculated from both the reference and current datasets.
"""
Source: https://stackoverflow.com/questions/65911146/how-to-transform-multiple-categorical-columns-to-integers-maintaining-shared-val
The current dataset is indexed using class values from both the reference and current datasets, so the index covers every class.
"""
predictions_df_current = self.current.select(
self.model.outputs.prediction.name
).withColumnRenamed(self.model.outputs.prediction.name, "classes")
target_df_current = self.current.select(
self.model.target.name
).withColumnRenamed(self.model.target.name, "classes")
predictions_df_reference = reference.reference.select(
self.model.outputs.prediction.name
).withColumnRenamed(self.model.outputs.prediction.name, "classes")
target_df_reference = reference.reference.select(
self.model.target.name
).withColumnRenamed(self.model.target.name, "classes")
prediction_target_df = (
predictions_df_current.union(target_df_current)
.union(predictions_df_reference)
.union(target_df_reference)
)
indexer = StringIndexer(
inputCol="classes",
outputCol="classes_index",
stringOrderType="alphabetAsc",
handleInvalid="skip",
)
indexer_model = indexer.fit(prediction_target_df)
indexer_prediction = indexer_model.setInputCol(
self.model.outputs.prediction.name
).setOutputCol(f"{self.model.outputs.prediction.name}-idx")
indexed_prediction_df = indexer_prediction.transform(self.current)
indexer_target = indexer_model.setInputCol(self.model.target.name).setOutputCol(
f"{self.model.target.name}-idx"
)
indexed_target_df = indexer_target.transform(indexed_prediction_df)

index_label_map = {
str(float(index)): str(label)
for index, label in enumerate(indexer_model.labelsArray[0])
}
return index_label_map, indexed_target_df
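The shared-index trick in `get_string_indexed_dataframe` can be summarized in isolation: a single `StringIndexer` is fitted on the union of all prediction and target values, then reused for each column, so identical class labels always map to the same numeric index. A minimal local PySpark sketch with invented column and class names:

```python
from pyspark.ml.feature import StringIndexer
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("shared-index-sketch").getOrCreate()

df = spark.createDataFrame(
    [("cat", "cat"), ("dog", "bird"), ("bird", "dog")],
    ["prediction", "ground_truth"],
)

# Fit one indexer on the union of all class values so that
# prediction and ground truth share the same label-to-index mapping.
classes = (
    df.select("prediction").withColumnRenamed("prediction", "classes")
    .union(df.select("ground_truth").withColumnRenamed("ground_truth", "classes"))
)
indexer_model = StringIndexer(
    inputCol="classes", outputCol="classes_index", stringOrderType="alphabetAsc"
).fit(classes)

# Reuse the fitted model on each column by swapping its input/output columns.
indexed = indexer_model.setInputCol("prediction").setOutputCol("prediction-idx").transform(df)
indexed = indexer_model.setInputCol("ground_truth").setOutputCol("ground_truth-idx").transform(indexed)

index_label_map = {
    str(float(i)): label for i, label in enumerate(indexer_model.labelsArray[0])
}
print(index_label_map)  # {'0.0': 'bird', '1.0': 'cat', '2.0': 'dog'}
indexed.show()
```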
4 changes: 1 addition & 3 deletions spark/jobs/reference_job.py
@@ -53,9 +53,7 @@ def main(

match model.model_type:
case ModelType.BINARY:
metrics_service = ReferenceMetricsService(
reference_dataset.reference, model=model
)
metrics_service = ReferenceMetricsService(reference=reference_dataset)
model_quality = metrics_service.calculate_model_quality()
statistics = calculate_statistics_reference(reference_dataset)
data_quality = metrics_service.calculate_data_quality()