In [28]:
import kfp
from kfp import dsl
from kfp.dsl import *
from typing import *


In [29]:
@dsl.component(
    packages_to_install=["scikit-learn", "pandas", "numpy", "minio"],
    base_image="python:3.8"
)
def data_preparation(storage_bucket: str, data_path: str) -> str:
    """Build a synthetic crop-disease dataset, preprocess it, and upload the artifacts to MinIO.

    Steps: build a 12-row synthetic DataFrame, label-encode ``crop_name`` and the
    ``disease_risk`` target, split 80/20 (``random_state=42``), standardize the
    features (scaler fitted on the training split only), then upload the
    train/test arrays plus the fitted encoders and scaler.

    Args:
        storage_bucket: Name of the MinIO bucket to upload into.
        data_path: Object-key prefix; ``"/1"`` is appended to it before use.

    Returns:
        The prefix the artifacts were uploaded under (``f"{data_path}/1"``), so
        downstream components can locate them.

    Raises:
        minio.error.S3Error: If any upload fails. The error is printed and then
            re-raised, so this step fails here instead of reporting success and
            leaving the training step to hit a missing object.
    """
    # Lightweight KFP components are serialized from this function's source only,
    # so every import (and any helper) must live inside the function body.
    import os

    import joblib
    import numpy as np
    import pandas as pd
    from minio import Minio
    from minio.error import S3Error
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import LabelEncoder, StandardScaler

    # NOTE(review): "/1" looks like a hard-coded dataset version suffix - confirm intent.
    data_path = f"{data_path}/1"

    # Connection settings can be overridden via environment variables (e.g. injected
    # from a Kubernetes Secret). TODO: remove the literal fallbacks and rotate these
    # keys - they are committed in source.
    minio_client = Minio(
        os.environ.get("MINIO_ENDPOINT", "172.20.16.117:9000"),
        access_key=os.environ.get("MINIO_ACCESS_KEY", "pTNMJ884sHchwenM2yOE"),
        secret_key=os.environ.get("MINIO_SECRET_KEY", "Vp97YHJRnHjgiOt492rWIKjJgzC5An3RfZK0VJ10"),
        secure=False,
    )

    # Create a synthetic dataset
    data = {
        'crop_name': ['wheat', 'rice', 'maize', 'wheat', 'rice', 'maize',
                      'wheat', 'rice', 'maize', 'wheat', 'rice', 'maize'],
        'temperature': [20, 25, 22, 21, 24, 23, 19, 26, 21, 20, 25, 22],
        'humidity': [30, 50, 45, 32, 48, 47, 31, 52, 44, 30, 50, 45],
        'soil_moisture': [40, 60, 55, 42, 58, 57, 41, 62, 54, 40, 60, 55],
        'disease_risk': ['low', 'high', 'medium', 'low', 'high', 'medium',
                         'low', 'high', 'medium', 'low', 'high', 'medium']
    }
    df = pd.DataFrame(data)

    # Encode the categorical feature and the target. The fitted encoders are kept
    # so the predict step can apply (and invert) exactly the same mapping.
    crop_label_encoder = LabelEncoder()
    df['crop_name'] = crop_label_encoder.fit_transform(df['crop_name'])
    risk_label_encoder = LabelEncoder()
    df['disease_risk'] = risk_label_encoder.fit_transform(df['disease_risk'])

    # Features and target variable
    X = df[['crop_name', 'temperature', 'humidity', 'soil_moisture']]
    y = df['disease_risk']

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Fit the scaler on the training split only so test statistics do not leak in.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)

    # Map MinIO object name -> local staging path (upload order is preserved).
    local_dir = "/tmp"
    files = {
        name: os.path.join(local_dir, name)
        for name in (
            "X_train.npy",
            "X_test.npy",
            "y_train.npy",
            "y_test.npy",
            "crop_label_encoder.pkl",
            "risk_label_encoder.pkl",
            "scaler.pkl",
        )
    }

    # Save label encoders, scaler and the train/test splits locally.
    joblib.dump(crop_label_encoder, files["crop_label_encoder.pkl"])
    joblib.dump(risk_label_encoder, files["risk_label_encoder.pkl"])
    joblib.dump(scaler, files["scaler.pkl"])
    np.save(files["X_train.npy"], X_train)
    np.save(files["X_test.npy"], X_test)
    np.save(files["y_train.npy"], y_train)
    np.save(files["y_test.npy"], y_test)

    # Upload to MinIO. Failures are re-raised: a partially uploaded prefix would
    # otherwise be handed to the training step as if it were complete.
    for artifact_name, file_path in files.items():
        try:
            minio_client.fput_object(storage_bucket, f"{data_path}/{artifact_name}", file_path)
        except S3Error as err:
            print(f"Error occurred: {err}")
            raise
        print(f"{artifact_name} object successfully uploaded.")

    return data_path

In [30]:
@dsl.component(
    packages_to_install=["scikit-learn", "pandas", "numpy", "minio"],
    base_image="python:3.8"
)
def model_building_training(storage_bucket: str, data_path: str) -> str:
    """Train a random-forest classifier on the prepared training split and upload it.

    Downloads ``X_train.npy`` and ``y_train.npy`` from ``{data_path}/`` in
    ``storage_bucket``, fits ``RandomForestClassifier(n_estimators=100,
    random_state=42)``, and uploads the joblib-pickled model as
    ``{data_path}/model.pkl``.

    Args:
        storage_bucket: MinIO bucket holding the prepared data.
        data_path: Object-key prefix returned by the data-preparation step.

    Returns:
        ``data_path`` unchanged, so the predict step reads from the same prefix.

    Raises:
        minio.error.S3Error: If a download or the model upload fails (propagated).
    """
    # Lightweight KFP components are serialized from this function's source only,
    # so imports and helpers must be defined inside the body.
    import os

    import joblib
    import numpy as np
    from minio import Minio
    from sklearn.ensemble import RandomForestClassifier

    # Connection settings can be overridden via environment variables (e.g. injected
    # from a Kubernetes Secret). TODO: remove the literal fallbacks and rotate these
    # keys - they are committed in source.
    minio_client = Minio(
        os.environ.get("MINIO_ENDPOINT", "172.20.16.117:9000"),
        access_key=os.environ.get("MINIO_ACCESS_KEY", "pTNMJ884sHchwenM2yOE"),
        secret_key=os.environ.get("MINIO_SECRET_KEY", "Vp97YHJRnHjgiOt492rWIKjJgzC5An3RfZK0VJ10"),
        secure=False,
    )

    def _download_array(artifact_name):
        """Fetch ``{data_path}/{artifact_name}`` into /tmp and load it with numpy."""
        local_path = os.path.join("/tmp", artifact_name)
        minio_client.fget_object(storage_bucket, f"{data_path}/{artifact_name}", local_path)
        return np.load(local_path)

    X_train = _download_array("X_train.npy")
    y_train = _download_array("y_train.npy")

    # Fixed seed so repeated pipeline runs produce the same model.
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Persist locally, then upload next to the data it was trained on.
    model_path = '/tmp/model.pkl'
    joblib.dump(model, model_path)
    minio_client.fput_object(storage_bucket, f"{data_path}/model.pkl", model_path)

    return data_path

In [31]:
@dsl.component(
    packages_to_install=["scikit-learn", "pandas", "numpy", "minio"],
    base_image="python:3.8"
)
def predict(
    crop_name: str,
    temperature: float,
    humidity: float,
    soil_moisture: float,
    storage_bucket: str,
    data_path: str
) -> str:
    """Predict the disease-risk label for a single crop observation.

    Downloads the fitted scaler, both label encoders and the trained model from
    ``{data_path}/`` in ``storage_bucket``. It then encodes and scales the single
    input row and decodes the model's prediction back to the original risk label.

    Args:
        crop_name: Crop label; must be one the crop encoder was fitted on.
        temperature: Raw (unscaled) temperature feature.
        humidity: Raw (unscaled) humidity feature.
        soil_moisture: Raw (unscaled) soil-moisture feature.
        storage_bucket: MinIO bucket holding the artifacts.
        data_path: Object-key prefix the artifacts were uploaded under.

    Returns:
        A sentence of the form ``"The disease risk for <crop> is <risk>."``.

    Raises:
        ValueError: If ``crop_name`` was not seen during training. The message lists
            the known crops instead of scikit-learn's generic unseen-label error.
        minio.error.S3Error: If any artifact cannot be downloaded.
    """
    # Lightweight KFP components are serialized from this function's source only,
    # so imports and helpers must be defined inside the body.
    import os

    import joblib
    import pandas as pd
    from minio import Minio

    # Connection settings can be overridden via environment variables (e.g. injected
    # from a Kubernetes Secret). TODO: remove the literal fallbacks and rotate these
    # keys - they are committed in source.
    minio_client = Minio(
        os.environ.get("MINIO_ENDPOINT", "172.20.16.117:9000"),
        access_key=os.environ.get("MINIO_ACCESS_KEY", "pTNMJ884sHchwenM2yOE"),
        secret_key=os.environ.get("MINIO_SECRET_KEY", "Vp97YHJRnHjgiOt492rWIKjJgzC5An3RfZK0VJ10"),
        secure=False,
    )

    def _load_artifact(artifact_name):
        """Fetch ``{data_path}/{artifact_name}`` into /tmp and unpickle it with joblib."""
        # joblib.load unpickles, which can execute arbitrary code: only point this
        # at a bucket/prefix written by this pipeline.
        local_path = os.path.join("/tmp", artifact_name)
        minio_client.fget_object(storage_bucket, f"{data_path}/{artifact_name}", local_path)
        return joblib.load(local_path)

    scaler = _load_artifact("scaler.pkl")
    crop_label_encoder = _load_artifact("crop_label_encoder.pkl")
    risk_label_encoder = _load_artifact("risk_label_encoder.pkl")
    model = _load_artifact("model.pkl")

    # Fail with an actionable message for crops the encoder never saw.
    known_crops = [str(label) for label in crop_label_encoder.classes_]
    if crop_name not in known_crops:
        raise ValueError(
            f"Unknown crop_name {crop_name!r}; expected one of {known_crops}"
        )
    crop_name_encoded = crop_label_encoder.transform([crop_name])[0]

    # Use the same column names and order as training. The scaler was fitted on a
    # DataFrame, so a bare array would trigger scikit-learn's feature-name warning.
    features = pd.DataFrame(
        [[crop_name_encoded, temperature, humidity, soil_moisture]],
        columns=["crop_name", "temperature", "humidity", "soil_moisture"],
    )
    features_scaled = scaler.transform(features)

    # Predict, then map the encoded class back to its original label.
    risk_encoded = model.predict(features_scaled)[0]
    risk = risk_label_encoder.inverse_transform([risk_encoded])[0]

    return f'The disease risk for {crop_name} is {risk}.'

In [32]:


@dsl.pipeline(
    name='disease-risk-pipeline',
    description='Crop disease risk prediction'
)
def disease_risk_pipeline(
    crop_name: str,
    temperature: float,
    humidity: float,
    soil_moisture: float,
    storage_bucket: str = "kubeflow-pipelines",
    data_path_prefix: str = "disease-risk",
) -> str:
    """Prepare data, train the model and predict disease risk for one observation.

    The three steps are chained through their ``.output`` values: each step returns
    the MinIO object prefix the next one should read from.

    Args:
        crop_name: Crop to predict for.
        temperature: Raw temperature feature for the prediction.
        humidity: Raw humidity feature for the prediction.
        soil_moisture: Raw soil-moisture feature for the prediction.
        storage_bucket: MinIO bucket used by all steps (previously hard-coded;
            the default preserves the old value).
        data_path_prefix: Base object-key prefix handed to data preparation
            (previously hard-coded; the default preserves the old value).

    Returns:
        The prediction sentence produced by the ``predict`` component.
    """
    prep_task = data_preparation(
        storage_bucket=storage_bucket,
        data_path=data_path_prefix,
    )
    train_task = model_building_training(
        storage_bucket=storage_bucket,
        data_path=prep_task.output,
    )
    predict_task = predict(
        crop_name=crop_name,
        temperature=temperature,
        humidity=humidity,
        soil_moisture=soil_moisture,
        storage_bucket=storage_bucket,
        data_path=train_task.output,
    )
    return predict_task.output
    


from kfp import compiler

# Serialize the pipeline definition into a YAML package that can be uploaded to
# Kubeflow Pipelines; the file is written relative to the current working directory.
compiler.Compiler().compile(
    pipeline_func=disease_risk_pipeline,
    package_path='disease_risk_pipeline.yaml',
)