In [111]:
import json
import os
import pickle

import pandas
from prefect import flow, task
from pymongo import MongoClient
import pyarrow.parquet as pq

from evidently import ColumnMapping

from evidently.dashboard import Dashboard
from evidently.dashboard.tabs import DataDriftTab,RegressionPerformanceTab,ClassificationPerformanceTab

from evidently.model_profile import Profile
from evidently.model_profile.sections import DataDriftProfileSection, RegressionPerformanceProfileSection,ClassificationPerformanceProfileSection


from this import d
import pandas as pd
import pickle
import os

from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import mean_squared_error

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

import xgboost as xgb

from hyperopt import fmin, tpe, hp, STATUS_OK, Trials,space_eval
from hyperopt.pyll import scope

import mlflow

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner



RUN_ID="f1ba7bfd01624384bb4f2ac6f2741d98"
logged_model = f's3://jai-mlops-data/1/{RUN_ID}/artifacts/models_mlflow'
logged_artifact=f'1/{RUN_ID}/artifacts/encoder/'
model = mlflow.pyfunc.load_model(logged_model)



In [112]:
@task
def fetch_data():
    client = MongoClient("mongodb://localhost:27018/")
    data = client.get_database("prediction_service").get_collection("credit_card").find()
    df = pandas.DataFrame(list(data))
    return df

In [113]:
@task
def upload_target(filename):
    df=pd.read_csv(filename)
    client = MongoClient("mongodb://localhost:27018/")
    collection = client.get_database("prediction_service").get_collection("credit_card")
    collection.insert_many(df.to_dict('records'))
    client.close()

In [114]:
@task
def load_reference_data(filename):
    df=pd.read_csv(filename)
    reference_data= df.drop(['Unnamed: 0.1', 'Unnamed: 0','target'], axis=1)
    #print(df.columns)
    df['prediction'] = model.predict(pd.DataFrame(reference_data))
    print(df.prediction)
    return df

In [115]:
@task
def save_report(result):
    client = MongoClient("mongodb://localhost:27018/")
    client.get_database("prediction_service").get_collection("report_creditcard").insert_one(result[0])


@task
def save_html_report(result):
    result[1].save("evidently_report_credit_card_card.html")

In [116]:
@flow
def batch_analyze(data_path):
    upload_target("batch_monitoring_data/target.csv")
    ref_data = load_reference_data("batch_monitoring_data/refrence_data.csv")
    data = fetch_data()
    result = run_evidently(ref_data, data)
    save_report(result)
    save_html_report(result)
 
batch_analyze(data_path="../output/")

09:17:58.248 | INFO    | prefect.engine - Created flow run 'optimal-butterfly' for flow 'batch-analyze'
09:17:58.249 | INFO    | Flow run 'optimal-butterfly' - Using task runner 'ConcurrentTaskRunner'
09:17:58.322 | INFO    | Flow run 'optimal-butterfly' - Created task run 'upload_target-87ba957f-0' for task 'upload_target'
09:17:58.369 | INFO    | Flow run 'optimal-butterfly' - Created task run 'load_reference_data-21f5ee13-0' for task 'load_reference_data'
09:17:58.496 | INFO    | Flow run 'optimal-butterfly' - Created task run 'fetch_data-bfb2ad7b-0' for task 'fetch_data'
09:17:58.502 | ERROR   | Flow run 'optimal-butterfly' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/envs/mlops5/lib/python3.8/site-packages/prefect/engine.py", line 468, in orchestrate_flow_run
    result = await run_sync_in_worker_thread(flow_call)
  File "/home/ubuntu/anaconda3/envs/mlops5/lib/python3.8/site-packages/prefect/utilities/asyncio.py", line

0       Existing Customer
1       Attrited Customer
2       Existing Customer
3       Existing Customer
4       Existing Customer
              ...        
1008    Existing Customer
1009    Existing Customer
1010    Existing Customer
1011    Existing Customer
1012    Existing Customer
Name: prediction, Length: 1013, dtype: object


09:17:58.914 | INFO    | Task run 'upload_target-87ba957f-0' - Finished in state Completed()
09:18:00.185 | INFO    | Task run 'fetch_data-bfb2ad7b-0' - Finished in state Completed()
09:18:00.186 | ERROR   | Flow run 'optimal-butterfly' - Finished in state Failed('Flow run encountered an exception.')


Failed(message='Flow run encountered an exception.', type=FAILED, result=AttributeError("'PrefectFuture' object has no attribute 'copy'"), flow_run_id=937b5892-bfff-4409-9fa6-5ab053288ff2)

In [93]:
@flow
def run_evidently(ref_data, data):

    numerical_features=["Customer_Age","Dependent_count","Months_on_book","Total_Relationship_Count",\
        "Months_Inactive_12_mon","Contacts_Count_12_mon","Credit_Limit","Total_Revolving_Bal",\
            "Avg_Open_To_Buy","Total_Amt_Chng_Q4_Q1","Total_Trans_Amt","Total_Trans_Ct","Total_Ct_Chng_Q4_Q1"]
    categorical_features=["Avg_Utilization_Ratio","Gender_n","Education_Level_n",\
        "Marital_Status_n","Income_Category_n","Card_Category_n"]

    profile = Profile(sections=[DataDriftProfileSection(), ClassificationPerformanceProfileSection()])
    mapping = ColumnMapping(prediction="prediction", numerical_features=numerical_features,
                            categorical_features=categorical_features,
                            datetime_features=[])
    profile.calculate(ref_data, data, mapping)

    dashboard = Dashboard(tabs=[DataDriftTab(), ClassificationPerformanceTab(verbose_level=0)])
    dashboard.calculate(ref_data, data, mapping)
    return json.loads(profile.json()), dashboard


In [120]:
upload_target("batch_monitoring_data/target1.csv")
ref_data = load_reference_data("batch_monitoring_data/refrence_data.csv")
data = fetch_data()

ref_data.drop(['Unnamed: 0.1', 'Unnamed: 0'], axis=1, inplace=True)
data.drop(['Unnamed: 0.1', 'Unnamed: 0'], axis=1, inplace=True)

print("ref_data.columns",ref_data.columns)
print("data.columns",ref_data.prediction)

result = run_evidently(ref_data, data)
#run_evidently(ref_data, data)

save_report(result)
save_html_report(result)

ref_data.columns Index(['Customer_Age', 'Dependent_count', 'Months_on_book',
       'Total_Relationship_Count', 'Months_Inactive_12_mon',
       'Contacts_Count_12_mon', 'Credit_Limit', 'Total_Revolving_Bal',
       'Avg_Open_To_Buy', 'Total_Amt_Chng_Q4_Q1', 'Total_Trans_Amt',
       'Total_Trans_Ct', 'Total_Ct_Chng_Q4_Q1', 'Avg_Utilization_Ratio',
       'Gender_n', 'Education_Level_n', 'Marital_Status_n',
       'Income_Category_n', 'Card_Category_n', 'target', 'prediction'],
      dtype='object')
data.columns 0       Existing Customer
1       Attrited Customer
2       Existing Customer
3       Existing Customer
4       Existing Customer
              ...        
1008    Existing Customer
1009    Existing Customer
1010    Existing Customer
1011    Existing Customer
1012    Existing Customer
Name: prediction, Length: 1013, dtype: object


RuntimeError: Tasks cannot be called outside of a flow.

In [None]:
Avg_Utilization_Ratio,Gender_n,Education_Level_n,Marital_Status_n,Income_Category_n,Card_Category_n,target

NameError: name 'Avg_Utilization_Ratio' is not defined

In [117]:

def load_reference_data(filename):
    df=pd.read_csv(filename)
    reference_data= df.drop(['Unnamed: 0.1', 'Unnamed: 0','target'], axis=1)
    #print(df.columns)
    df['prediction'] = model.predict(pd.DataFrame(reference_data))
    return df

In [109]:
data = load_reference_data("batch_monitoring_data/target.csv")
data.to_csv("batch_monitoring_data/target1.csv")


In [None]:
def load_pickle(filename):
    with open(filename, "rb") as f_in:
        return pickle.load(f_in)

X_valid, y_valid = load_pickle(os.path.join("../output/", "valid.pkl"))
X_test, y_test = load_pickle(os.path.join("../output/", "test.pkl"))

In [None]:
X_valid["target"]=y_valid
X_valid.to_csv("batch_monitoring_data/target.csv")


In [None]:
X_test["target"]=y_test
X_test.to_csv("batch_monitoring_data/refrence_data.csv")

In [118]:
def upload_target(filename):
    df=pd.read_csv(filename)
    client = MongoClient("mongodb://localhost:27018/")
    collection = client.get_database("prediction_service").get_collection("credit_card")
    collection.insert_many(df.to_dict('records'))
    client.close()

In [None]:
upload_target("batch_monitoring_data/refrence_data.csv")


In [119]:
def fetch_data():
    client = MongoClient("mongodb://localhost:27018/")
    data = client.get_database("prediction_service").get_collection("credit_card").find()
    df = pandas.DataFrame(list(data))
    return df

In [None]:
fetch_data()

Unnamed: 0.2,_id,Unnamed: 0.1,Unnamed: 0,Customer_Age,Dependent_count,Months_on_book,Total_Relationship_Count,Months_Inactive_12_mon,Contacts_Count_12_mon,Credit_Limit,...,Total_Trans_Amt,Total_Trans_Ct,Total_Ct_Chng_Q4_Q1,Avg_Utilization_Ratio,Gender_n,Education_Level_n,Marital_Status_n,Income_Category_n,Card_Category_n,target
0,62fdde8d659abc8f990b0b5d,7215,7215,57,3,36,5,1,1,1678.0,...,4402,87,0.851,0.692,0,1,2,5,0,Existing Customer
1,62fdde8d659abc8f990b0b5e,2284,2284,26,0,13,1,2,4,6152.0,...,968,23,0.533,0.000,0,3,2,5,0,Attrited Customer
2,62fdde8d659abc8f990b0b5f,4968,4968,47,3,43,4,2,4,5062.0,...,4535,68,0.417,0.281,0,2,2,4,0,Existing Customer
3,62fdde8d659abc8f990b0b60,5308,5308,47,3,27,3,3,0,1451.0,...,3866,72,0.714,0.000,0,2,1,4,0,Existing Customer
4,62fdde8d659abc8f990b0b61,9713,9713,53,1,43,2,2,2,3629.0,...,14205,102,0.759,0.694,0,6,3,4,0,Existing Customer
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2021,62fddee9659abc8f990b1344,8134,8134,51,1,36,2,1,1,1438.3,...,4425,81,1.077,0.325,0,6,1,1,0,Existing Customer
2022,62fddee9659abc8f990b1345,6377,6377,42,4,19,5,3,2,8374.0,...,4493,59,1.034,0.000,1,2,1,1,0,Existing Customer
2023,62fddee9659abc8f990b1346,3619,3619,48,1,43,5,3,3,7199.0,...,4041,73,0.698,0.143,1,2,3,4,0,Existing Customer
2024,62fddee9659abc8f990b1347,8262,8262,44,3,32,2,2,1,1438.3,...,4687,85,0.932,0.622,0,5,2,4,0,Existing Customer
