<a href="https://colab.research.google.com/github/mathewsrc/machine-learning-monitoring-with-evidently/blob/main/evidently_capstone.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install -q pyspark==3.5.0
!pip install -q duckdb==0.8
!pip install -q prefect
!pip install -q mlflow==2.8.1
!pip install -q evidently==0.4.9
!pip install -q soda-core-duckdb==3.0.45
!pip install -q polars

!pip install kaleido
!pip install python-multipart



In [121]:
import certifi
import json
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import sqlite3
import urllib3
from urllib3 import request
from unicodedata import normalize
from datetime import timedelta
import asyncio

import duckdb

from prefect import task, flow

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import (when, col, to_date, unix_timestamp,
                                   to_timestamp, year, month, day, dayofweek,
                                   regexp_like)
from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, FloatType, TimestampType, BooleanType)

# Pyspark Machine Learning
from pyspark.ml.feature import VectorAssembler, Imputer, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, GBTRegressor
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import RegressionEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# MLflow
import mlflow
import mlflow.spark
from mlflow import MlflowClient


In [3]:
import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')

In [17]:
!git clone https://github.com/mathewsrc/machine-learning-monitoring-with-evidently.git

fatal: destination path 'machine-learning-monitoring-with-evidently' already exists and is not an empty directory.
/content


In [4]:
%cd /content/machine-learning-monitoring-with-evidently
%ls

/content/machine-learning-monitoring-with-evidently
evidently_capstone.ipynb  LICENSE  README.md  [0m[01;34msoda[0m/  supplier.db


## **Supplier Contracts: Extract, Transform, Load from API**

In [4]:
!mkdir soda
!mkdir soda/checks
!touch soda/checks/sources/raw_check.yml
!touch soda/checks/staging/transfom_check.yml

touch: cannot touch 'soda/checks/sources/raw_check.yml': No such file or directory
touch: cannot touch 'soda/checks/staging/transfom_check.yml': No such file or directory


In [5]:
#%%writefile soda/check_function.py
def check(scan_name, duckdb_conn, data_source, checks_subpath=None):
    from soda.scan import Scan

    print('Running Soda Scan ...')
    checks_path = f'soda/checks'

    if checks_subpath:
        checks_path += f'/{checks_subpath}'

    scan = Scan()
    scan.set_verbose()
    scan.add_duckdb_connection(duckdb_conn)
    scan.set_data_source_name(data_source)
    scan.add_sodacl_yaml_files(checks_path)
    scan.set_scan_definition_name(scan_name)
    result = scan.execute()
    print(scan.get_logs_text())

    if result != 0:
        raise ValueError('Soda Scan failed')

    return result

In [6]:
spark = SparkSession.builder.appName("spark").getOrCreate()
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [164]:
@task(name="Extract Supplier Contract API data",
      description="Get API data from URL and read it as Spark DataFrame" ,
      tags=['supplier','vendor','contract'],
      cache_expiration=timedelta(days=1),
      retries=3,
      retry_delay_seconds=5)
def extract():
  # get api data from url
  #url = 'https://data.cityofnewyork.us/resource/h9gi-nx95.json?$limit=500'
  url = 'https://data.sfgov.org/resource/cqi5-hm2d.json?$limit=800'

  # Check if API is available to retrive the
  http = urllib3.PoolManager(cert_reqs='CERT_REQUIRED',ca_certs=certifi.where())
  apt_status = http.request('GET', url).status
  print(apt_status)
  if apt_status == 200:
      # Sometimes we get certificate error . We shoul never silence this error as this may cause a securirty threat.
      # Create a Pool manager that can be used to read the API response
      data = json.loads(http.request('GET', url).data.decode('utf-8'))
      df_api = pd.json_normalize(data)
      with duckdb.connect("supplier.db") as conn:
        conn.sql("""CREATE OR REPLACE TABLE supplier_raw_table AS
                        SELECT * FROM df_api""")
        conn.sql("SELECT COUNT(*) FROM supplier_raw_table").show()
  else:
      raise Exception('Error to extract data from API!')


In [8]:
@task(name="Extract Supplier Contract API data",
      description="Get API data from URL and read it as Spark DataFrame" ,
      tags=['supplier','vendor','contract'],
      cache_expiration=timedelta(days=1),
      retries=3,
      retry_delay_seconds=5)
def check_raw():
  with duckdb.connect("supplier.db") as conn:
    check(scan_name="raw_check", duckdb_conn=conn, data_source="duckdb", checks_subpath="sources")

In [165]:
@task(name="Extract Supplier Contract API data",
      description="Get API data from URL and read it as Spark DataFrame" ,
      tags=['supplier','vendor','contract'],
      cache_expiration=timedelta(days=1),
      retries=3,
      retry_delay_seconds=5)
def transform():

  with duckdb.connect("supplier.db") as conn:
    df_pandas = conn.sql("SELECT * FROM supplier_raw_table").df()

    df_spark = spark.createDataFrame(df_pandas)

    df_spark = (
        df_spark
          .withColumnRenamed("contract_no", "contract_number")
          .withColumnRenamed("consumed_amt", "purchase_orders_outstanding")
          .withColumnRenamed("agreed_amt", "contract_awarded_amount")
          .withColumnRenamed("pmt_amt", "payments_made")
          .withColumnRenamed("remaining_amt", "remaining_contract_award_amount")
          .withColumnRenamed("project_team_constituent", "supplier_type")
          .withColumnRenamed("project_team_supplier", "supplier_name")
          .withColumnRenamed("prime_contractor", "supplier_name_prime_contractor")
    )

    df_spark = df_spark.dropDuplicates(subset=['contract_number'])


    df_spark = df_spark.na.drop(subset=["contract_number"])

    df_spark = (
      df_spark
        .withColumn("purchase_orders_outstanding", col("purchase_orders_outstanding").cast("float"))
        .withColumn("contract_awarded_amount", col("contract_awarded_amount").cast("float"))
        .withColumn("payments_made", col("payments_made").cast("float"))
        .withColumn("remaining_contract_award_amount", col("remaining_contract_award_amount").cast("float"))
    )

    df_spark = df_spark.withColumn("start_date",
                              to_date(
                                  unix_timestamp(
                                      "term_start_date",
                                      "yyyy-MM-dd'T'HH:mm:ss.SSS").cast("timestamp"), "yyyy/mm/dd"))\
      .withColumn("end_date", to_date(
                                  unix_timestamp(
                                      "term_end_date",
                                      "yyyy-MM-dd'T'HH:mm:ss.SSS").cast("timestamp"), "yyyy/mm/dd"))



    df_spark.createOrReplaceTempView("supplier_contracts")

    df_spark = spark.sql("""
          SELECT
              *,
              EXTRACT(DAY FROM start_date) AS start_day,
              EXTRACT(MONTH FROM start_date) AS start_month,
              EXTRACT(YEAR FROM start_date) AS start_year,
              EXTRACT(DAY FROM end_date) AS end_day,
              EXTRACT(MONTH FROM end_date) AS end_month,
              EXTRACT(YEAR FROM end_date) AS end_year
          FROM supplier_contracts
          WHERE (start_date IS NOT NULL) AND (end_date IS NOT NULL)
          """)

    df_spark.createOrReplaceTempView("supplier_contracts")

    df_spark = spark.sql("""
              SELECT *,
                  CASE WHEN sole_source_flg IS NULL THEN 1
                  ELSE 0 END AS is_sole_source
              FROM supplier_contracts
                """)

    df_spark = df_spark.drop("sole_source_flg")

    df_spark.createOrReplaceTempView("supplier_contracts")

    df_spark = spark.sql("""
                         SELECT *,
                            CASE WHEN non_profit IS NULL THEN 1
                            ELSE 0 END AS is_non_profit
                         FROM supplier_contracts
                         """)

    df_spark = df_spark.withColumn("has_outstanding_orders",
                                   when(
                                      col('purchase_orders_outstanding') > 0,
                                      1).otherwise(0))



    df_spark = df_spark.filter("payments_made >= 0")
    df_spark = df_spark.filter("payments_made <= contract_awarded_amount")
    df_spark = df_spark.filter("remaining_contract_award_amount >= 0")
    df_spark = df_spark.filter("contract_awarded_amount >= 0")
    df_spark = df_spark.filter("remaining_contract_award_amount <= contract_awarded_amount")
    #df_spark = df_spark.filter("payments_made + remaining_contract_award_amount <= contract_awarded_amount")
    #df_spark - df_spark.filter("purchase_orders_outstanding >= 0")



    df_pandas = df_spark.toPandas()

    conn.sql("""CREATE OR REPLACE TABLE supplier_transformed_table AS
                    SELECT * FROM df_pandas""")
    return df_spark

In [10]:
@task(name="Extract Supplier Contract API data",
      description="Get API data from URL and read it as Spark DataFrame" ,
      tags=['supplier','vendor','contract'],
      cache_expiration=timedelta(days=1),
      retries=3,
      retry_delay_seconds=5)
def check_staging():
  with duckdb.connect("supplier.db") as conn:
    check(scan_name="transformation_check", duckdb_conn=conn, data_source="duckdb", checks_subpath="staging")

In [11]:
@task(name="Extract Supplier Contract API data",
      description="Get API data from URL and read it as Spark DataFrame" ,
      tags=['supplier','vendor','contract'],
      cache_expiration=timedelta(days=1),
      retries=3,
      retry_delay_seconds=5)
def load(df_spark):
  with duckdb.connect("supplier.db") as conn:
    df_pandas = df_spark.toPandas()
    conn.sql("CREATE OR REPLACE TABLE supplier_transformed_table AS SELECT * FROM df_pandas")

In [12]:
@flow(name="ETL Supplier Contract API", log_prints=True)
def etl():
  extract()
  check_raw()
  df_spark = transform()
  check_staging()
  load(df_spark)

In [None]:
etl()

In [14]:
with duckdb.connect("supplier.db") as conn:
 print(conn.sql("""SELECT
                    COUNT(*),
                    EXTRACT(YEAR FROM start_date) as year
                   FROM supplier_transformed_table GROUP BY year""").df().head(20))
 print(conn.sql("DESCRIBE supplier_transformed_table"))
 print(conn.sql("SELECT * FROM supplier_transformed_table").pl().columns)

    count_star()  year
0              1  2003
1              1  2006
2              4  2007
3              5  2008
4              9  2009
5             12  2010
6             15  2011
7             25  2012
8              1  2013
9              1  2015
10             3  2016
11             4  2017
12             2  2018
┌────────────────────────────────┬─────────────┬─────────┬─────────┬─────────┬───────┐
│          column_name           │ column_type │  null   │   key   │ default │ extra │
│            varchar             │   varchar   │ varchar │ varchar │ varchar │ int32 │
├────────────────────────────────┼─────────────┼─────────┼─────────┼─────────┼───────┤
│ contract_title                 │ VARCHAR     │ YES     │ NULL    │ NULL    │  NULL │
│ term_start_date                │ VARCHAR     │ YES     │ NULL    │ NULL    │  NULL │
│ term_end_date                  │ VARCHAR     │ YES     │ NULL    │ NULL    │  NULL │
│ contract_type                  │ VARCHAR     │ YES     │ NULL    │ 

## **ML monitoring with Evidentlyai**

In [15]:
from evidently.spark.engine import SparkEngine

from evidently import ColumnMapping

from evidently.report import Report
from evidently.metrics import ColumnDriftMetric, DataDriftTable, DatasetDriftMetric
from evidently.metric_preset import DataDriftPreset

from evidently.test_suite import TestSuite
from evidently.tests import TestColumnDrift, TestShareOfDriftedColumns, TestNumberOfDriftedColumns
from evidently.test_preset import DataDriftTestPreset

In [16]:
with duckdb.connect("supplier.db") as conn:
  df_pandas = conn.sql("SELECT * FROM supplier_transformed_table").df()

  df_spark = spark.createDataFrame(df_pandas)
# Simulate current and reference data using randomSplit
current_data, reference_data = df_spark.randomSplit(weights=[0.5,0.5], seed=200)

In [17]:
column_mapping = ColumnMapping()

#column_mapping.prediction = "prediction"
#column_mapping.target = "target"
#column_mapping.target_names = ['churn', 'not_churn']
column_mapping.id = "contract_number"

column_mapping.datetime_features = ['start_date',
                                    'end_date',
                                    'term_start_date',
                                    'term_end_date',]

column_mapping.text_features = ['contract_title',
                                'supplier_name',
                                'supplier_name_prime_contractor',
                                'department',
                                'purchasing_authority',]

column_mapping.categorical_features= ['contract_type',
                                      'department_code',
                                      'project_team_lbe_status',
                                      'scope_of_work',
                                      'non_profit',
                                      'is_sole_source',
                                      'has_outstanding_orders',]

column_mapping.numerical_features = ['contract_awarded_amount',
                                     'purchase_orders_outstanding',
                                     'payments_made',
                                     'remaining_contract_award_amount',
                                     'start_day', 'start_month',
                                     'start_year', 'end_day',
                                     'end_month', 'end_year',]

In [18]:
def eval_drift(reference, production, column_mapping):

    data_drift_report = Report(metrics=[DataDriftPreset()])
    data_drift_report.run(reference_data=reference, current_data=production, column_mapping=column_mapping)
    report = data_drift_report.as_dict()

    drifts = []

    for feature in column_mapping.numerical_features + column_mapping.categorical_features:
        drifts.append((feature, report["metrics"][1]["result"]["drift_by_columns"][feature]["drift_score"]))

    return drifts

In [19]:
with duckdb.connect("supplier.db") as conn:
  start_date = conn.sql("""SELECT MIN(start_date)
              FROM supplier_transformed_table
              """).fetchone()
  start_date_0 = start_date[0].strftime("%Y-%m-%d %H:%M:%S")

  end_date = conn.sql("""SELECT DATE_ADD(MIN(start_date), INTERVAL 5 YEAR) AS end_date
              FROM supplier_transformed_table
              """).fetchone()
  end_date_0 = end_date[0].strftime("%Y-%m-%d %H:%M:%S")

print(start_date_0)
print(end_date_0)


2003-09-11 00:00:00
2008-09-11 00:00:00


In [94]:
with duckdb.connect("supplier.db") as conn:
 experiment_batches_sql = conn.sql(
    """
    WITH numbered_rows AS (
        SELECT
            start_date,
            ROW_NUMBER() OVER (ORDER BY start_date) AS row_num
        FROM
            supplier_transformed_table
        WHERE EXTRACT(YEAR FROM start_date) > 2008
    )

    SELECT
      MIN(start_date) AS start_date,
      MAX(start_date) + 30 AS end_date
    FROM
      numbered_rows
    WHERE
      row_num BETWEEN 1 AND 6
    GROUP BY
      CASE
        WHEN row_num BETWEEN 1 AND 1 THEN 1
        WHEN row_num BETWEEN 2 AND 2 THEN 2
        WHEN row_num BETWEEN 3 AND 3 THEN 3
        WHEN row_num BETWEEN 4 AND 4 THEN 4
        WHEN row_num BETWEEN 5 AND 5 THEN 5
        WHEN row_num BETWEEN 6 AND 6 THEN 6
    END
    ORDER BY
      start_date;
   """
  )
 print(experiment_batches_sql.show())
 experiment_batches=[(start.strftime("%Y-%m-%d"), end.strftime("%Y-%m-%d")) for start, end in experiment_batches_sql.fetchall() ]


┌────────────┬────────────┐
│ start_date │  end_date  │
│    date    │    date    │
├────────────┼────────────┤
│ 2009-02-17 │ 2009-03-19 │
│ 2009-03-20 │ 2009-04-19 │
│ 2009-03-25 │ 2009-04-24 │
│ 2009-04-10 │ 2009-05-10 │
│ 2009-05-04 │ 2009-06-03 │
│ 2009-05-29 │ 2009-06-28 │
└────────────┴────────────┘

None


In [21]:
with duckdb.connect("supplier.db") as conn:
  df_pandas = conn.sql("SELECT * FROM supplier_transformed_table").df()
  df_spark = spark.createDataFrame(df_pandas)

  reference_data = df_spark.filter((col("start_date") >= start_date_0) & (col("start_date") <= end_date_0))
print(reference_data.agg({"start_date": "max"}).show())
print(reference_data.agg({"start_date": "min"}).show())

+-------------------+
|    max(start_date)|
+-------------------+
|2008-01-11 00:00:00|
+-------------------+

None
+-------------------+
|    min(start_date)|
+-------------------+
|2003-09-11 00:00:00|
+-------------------+

None


In [22]:
reference_data.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+---------------+--------------------+------------------------------+--------------------+-----------------------+----------------+--------------------+-----------------------+---------------------------+-------------+-------------------------------+---------------+----------+-------------------+-------------------+---------+-----------+----------+-------+---------+--------+--------------+-------------+----------------------+
|      contract_title|     term_start_date|       term_end_date|       contract_type|purchasing_authority|department_code|          department|supplier_name_prime_contractor|       supplier_name|project_team_lbe_status|   supplier_type|       scope_of_work|contract_awarded_amount|purchase_orders_outstanding|payments_made|remaining_contract_award_amount|contract_number|non_profit|         start_date|           end_date|start_day|start_month|start_year|end_day|en

In [73]:
with duckdb.connect("supplier.db") as conn:
  conn.sql(
      """
      CREATE OR REPLACE TABLE reference_table AS
      SELECT *
      FROM supplier_transformed_table
      WHERE start_year BETWEEN 2003 AND 2008
      """)

  conn.sql(
      """
      CREATE OR REPLACE TABLE current_table AS
      SELECT *
      FROM supplier_transformed_table
      WHERE start_year > 2008;
      """)

In [79]:
with duckdb.connect("supplier.db") as conn:
  reference_data = conn.sql("SELECT * FROM reference_table")
  reference_data.show()
  reference_data = spark.createDataFrame(reference_data.df())

┌──────────────────────┬──────────────────────┬───┬────────────────┬───────────────┬──────────────────────┐
│    contract_title    │   term_start_date    │ … │ is_sole_source │ is_non_profit │ has_outstanding_or…  │
│       varchar        │       varchar        │   │     int32      │     int32     │        int32         │
├──────────────────────┼──────────────────────┼───┼────────────────┼───────────────┼──────────────────────┤
│ PUC DESIGN FOR CAL…  │ 2003-09-11T00:00:0…  │ … │              1 │             1 │                    1 │
│ DESIGN FOR NEW IRV…  │ 2006-06-27T00:00:0…  │ … │              1 │             1 │                    0 │
│ ASSOCIATION OF BAY…  │ 2007-03-23T00:00:0…  │ … │              0 │             1 │                    0 │
│ ENGINEERING SERVIC…  │ 2007-10-03T00:00:0…  │ … │              1 │             1 │                    1 │
│ ENGINEERING POOL     │ 2007-12-06T00:00:0…  │ … │              1 │             1 │                    0 │
│ PUC ENGINEERING POOL │ 200

In [104]:
with duckdb.connect("supplier.db") as conn:
  current_data = conn.sql("SELECT * FROM current_table")
  current_data.show()
  current_data = spark.createDataFrame(current_data.df())

┌──────────────────────┬──────────────────────┬───┬────────────────┬───────────────┬──────────────────────┐
│    contract_title    │   term_start_date    │ … │ is_sole_source │ is_non_profit │ has_outstanding_or…  │
│       varchar        │       varchar        │   │     int32      │     int32     │        int32         │
├──────────────────────┼──────────────────────┼───┼────────────────┼───────────────┼──────────────────────┤
│ CHF-GA-Mission YMC…  │ 2018-07-01T00:00:0…  │ … │              1 │             0 │                    1 │
│ FEES FOR ENERGY TR…  │ 2016-08-01T00:00:0…  │ … │              0 │             1 │                    0 │
│ ADMINISTRATIVE CHA…  │ 2016-08-03T00:00:0…  │ … │              1 │             1 │                    0 │
│ ELECTRIC ENERGY AN…  │ 2016-08-17T00:00:0…  │ … │              1 │             1 │                    0 │
│ CLP - RESOURCE ADE…  │ 2017-02-24T00:00:0…  │ … │              1 │             1 │                    0 │
│ CLP - RESOURCE ADE…  │ 201

In [115]:
def train_and_evaluate(model, model_name):

  for k, test_dates in enumerate(experiment_batches):

    print(f"Batch: {k}")
    print(f"Test dates: {test_dates}")

    label_col = 'has_outstanding_orders'
    numeric_cols = ['contract_awarded_amount',
                    'purchase_orders_outstanding',
                    'payments_made',
                    'remaining_contract_award_amount',]

    categorical_cols = ['contract_type',
                        'department_code',
                        'project_team_lbe_status',
                        'scope_of_work',
                        'is_non_profit',
                        'is_sole_source',
                        'has_outstanding_orders',]

    indexers = [StringIndexer(inputCol=col, outputCol=f"{col}_indexed") for col in categorical_cols]
    encoders = [OneHotEncoder(inputCol=f"{col}_indexed", outputCol=f"{col}_encoded") for col in categorical_cols]

    # Assemble Features
    assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features") #numeric_cols + [f"{col}_encoded" for col in categorical_cols]

    # Impute Missing Values for Numeric and Categorical Columns
    imputer_numeric = Imputer(strategy="mean", inputCols=numeric_cols,
                              outputCols=[f"{col}_imputed" for col in numeric_cols])
    imputer_categorical = Imputer(strategy="most_frequent", inputCols=[f"{col}_encoded" for col in categorical_cols], outputCols=[f"{col}_imputed" for col in categorical_cols])

    # Scaling Features
    scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

    # Random Forest Classifier
    rf = RandomForestClassifier(featuresCol="scaled_features", labelCol=label_col)

    # Create the Pipeline
    pipeline = Pipeline(stages=[assembler, imputer_numeric, scaler, rf])#indexers + encoders +

    # Fit the Pipeline on the Training Data
    pipeline_model = pipeline.fit(reference_data)

    predictions_reference = pipeline_model.transform(reference_data)

    current_dataa = current_data.filter(
            (col("start_date") >= test_dates[0]) & (col("start_date") <= test_dates[1]))

    print(f"Number of observations: {current_dataa.count()}")

    predictions_current = pipeline_model.transform(current_dataa)

    evaluator = BinaryClassificationEvaluator(labelCol=label_col,
                                              rawPredictionCol="prediction",
                                              metricName="areaUnderROC")

    auc_reference = evaluator.evaluate(predictions_reference)
    print(f"{model_name} AUC (reference): {auc_reference}")

    auc_current = evaluator.evaluate(predictions_current)
    print(f"{model_name} AUC (current): {auc_current}")

    mlflow.set_experiment_tag("model", model_name)

    mlflow.log_metric(f"{model_name}_AUC_reference", auc_reference)
    mlflow.log_metric(f"{model_name}_AUC_current", auc_current)

    mlflow.spark.log_model(pipeline_model,
                          artifact_path="spark-model",
                          registered_model_name=f"{model_name}_Model")


In [116]:
mlflow.set_tracking_uri("sqlite:///mlruns.db") # for local experiment, change to "http://127.0.0.1:5000" in vscode to access MLflow UI

EXPERIMENT_NAME = "supplier_contract_experiment"
mlflow.set_experiment(EXPERIMENT_NAME)

models = {
    #"Linear Regression": LinearRegression(),
    #"Random Forest Regressor": RandomForestRegressor(),
    #"GBT Regressor": GBTRegressor(),
    "Logistic_Regression": LogisticRegression(),
    "Random_Forest_Classifier": RandomForestClassifier(),
    "GBT_Classifier": GBTClassifier()

}

for model_name, model in models.items():
  with mlflow.start_run(run_name=model_name):
      print(f"Training and evaluating {model_name}")
      train_and_evaluate(model, model_name)



Training and evaluating Logistic_Regression
Batch: 0
Test dates: ('2009-02-17', '2009-03-19')
Number of observations: 1
Logistic_Regression AUC (reference): 0.8333333333333333
Logistic_Regression AUC (current): 0.0


KeyboardInterrupt: ignored

In [34]:
mlflow.search_runs(experiment_names=EXPERIMENT_NAME)

Unnamed: 0,run_id,experiment_id,status,artifact_uri,start_time,end_time,metrics.Random Forest Classifier_AUC,tags.mlflow.source.name,tags.mlflow.log-model.history,tags.mlflow.user,tags.mlflow.runName,tags.mlflow.source.type
0,c08de055cbe4438d836358dc24300830,1,FINISHED,/content/machine-learning-monitoring-with-evid...,2023-11-28 22:12:36.953000+00:00,2023-11-28 22:13:23.967000+00:00,0.5,/usr/local/lib/python3.10/dist-packages/colab_...,"[{""run_id"": ""c08de055cbe4438d836358dc24300830""...",root,Random Forest Classifier,LOCAL
1,ae41eb9f63514dc7bc88a9604c5e8ca7,1,FINISHED,/content/machine-learning-monitoring-with-evid...,2023-11-28 22:08:32.745000+00:00,2023-11-28 22:09:11.229000+00:00,0.5,/usr/local/lib/python3.10/dist-packages/colab_...,"[{""run_id"": ""ae41eb9f63514dc7bc88a9604c5e8ca7""...",root,Random Forest Classifier,LOCAL
2,a6b7b90173e4488d87625ca5a642982d,1,FAILED,/content/machine-learning-monitoring-with-evid...,2023-11-28 22:07:15.760000+00:00,2023-11-28 22:07:23.562000+00:00,0.5,/usr/local/lib/python3.10/dist-packages/colab_...,,root,Random Forest Classifier,LOCAL


In [None]:

columns = ['contract_awarded_amount', 'purchase_orders_outstanding',
           'payments_made', 'has_outstanding_orders',
           'scope_of_work', 'contract_type']

tests = TestSuite(tests=[
    TestNumberOfDriftedColumns(columns=columns),
    TestShareOfDriftedColumns(columns=columns),
])

tests.run(current_data=current_data,
          reference_data=reference_data,
          column_mapping=column_mapping,
          engine=SparkEngine)
tests.show(mode='inline')

## **Data Integrity**

In [None]:
from evidently.metrics import DatasetSummaryMetric

data_integrity_report = Report(metrics=[
    DatasetSummaryMetric()
])

data_integrity_report.run(current_data=current_data.toPandas(),
           reference_data=reference_data.toPandas())
data_integrity_report.show(mode="inline")

## **Data Quality**

In [161]:
from evidently.metric_preset import DataQualityPreset

data_quality_report = Report(
    metrics=[DataQualityPreset()]
)

data_quality_report.run(current_data=current_data.toPandas(),
                        reference_data=reference_data.toPandas())

data_quality_report.show(mode="inline")

{"version": "0.4.9", "metrics": [{"metric": "DatasetSummaryMetric", "result": {"almost_duplicated_threshold": 0.95, "current": {"target": null, "prediction": null, "date_column": null, "id_column": null, "number_of_columns": 28, "number_of_rows": 43, "number_of_missing_values": 40, "number_of_categorical_columns": 16, "number_of_numeric_columns": 10, "number_of_text_columns": 0, "number_of_datetime_columns": 2, "number_of_constant_columns": 1, "number_of_almost_constant_columns": 2, "number_of_duplicated_columns": 0, "number_of_almost_duplicated_columns": 0, "number_of_empty_rows": 0, "number_of_empty_columns": 0, "number_of_duplicated_rows": 0, "nans_by_columns": {"contract_title": 0, "term_start_date": 0, "term_end_date": 0, "contract_type": 0, "purchasing_authority": 0, "department_code": 0, "department": 0, "supplier_name_prime_contractor": 0, "supplier_name": 0, "project_team_lbe_status": 0, "supplier_type": 0, "scope_of_work": 0, "contract_awarded_amount": 0, "purchase_orders_out

GitHub

In [None]:
# create a new ssh key
! ssh-keygen -t rsa -b 4096
# add github.com to our known hosts
! ssh-keyscan -t rsa github.com >> ~/.ssh/known_hosts
# restrict the key permissions, or else SSH will complain.
! chmod go-rwx /root/.ssh/rsa

In [None]:
# copy the output and add it to Github
!cat /root/.ssh/id_rsa.pub

In [None]:
# replace with your email and user name that you uses in your Github account
!git config --global user.email ""
!git config --global user.name ""

In [None]:
# test your authentication
!ssh -T git@github.com

In [None]:
!git status

In [None]:
!git push origin main