# Crop Yield Prediction Pipeline in Vertex AI

This notebook demonstrates the deployment of a tabular model for crop yield prediction using Vertex AI.

In [None]:
# Clear pip cache and uninstall existing packages
%pip cache purge
%pip uninstall -y numpy pandas google-cloud-aiplatform kfp kfp-server-api kfp-pipeline-spec
%pip install numpy==1.24.3
%pip install pandas==2.1.4
%pip install "kfp==2.6.0" 
%pip install "google-cloud-aiplatform>=1.35.0"
%pip install "google-cloud-pipeline-components>=2.0.0"

In [14]:
import os
os.environ['PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION'] = 'python'

# Import required packages
from kfp import compiler
from kfp.dsl import component, Input, Output, Dataset, Artifact,Model,pipeline
from google.cloud import aiplatform
from google.cloud import storage
from google.auth import default
from datetime import datetime
from google.cloud.aiplatform import pipeline_jobs
import pandas as pd
import numpy as np
# Get default credentials and project
credentials, project_id = default()

# Configuration
REGION = "us-central1"
bucket_name = "agrifingcpflow-465809-bucket"
PIPELINE_ROOT = f"gs://{bucket_name}/pipeline_root"

In [16]:
def create_sample_data():
    """Create sample tabular dataset for crop yield prediction."""
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    
    # Create sample tabular data
    df = pd.DataFrame({
        'field_size': np.random.uniform(1, 100, 100),
        'temperature': np.random.normal(25, 5, 100),
        'rainfall': np.random.normal(50, 10, 100),
        'soil_quality': np.random.choice(['good', 'medium', 'poor'], 100),
        'yield': np.random.normal(75, 15, 100)
    })
    
    # Upload to GCS
    blob = bucket.blob('sample_tabular_data/farming_data.csv')
    blob.upload_from_string(df.to_csv(index=False))
    
    return f"gs://{bucket_name}/sample_tabular_data/farming_data.csv"

# Create the sample data and get the URI
tabular_uri = create_sample_data()
print(f"Created tabular dataset at: {tabular_uri}")

Created tabular dataset at: gs://agrifingcpflow-465809-bucket/sample_tabular_data/farming_data.csv


In [None]:
@component(
    packages_to_install=[
        'google-cloud-storage',
        'google-cloud-aiplatform',
        'pandas',
        'scikit-learn',
        'numpy'
    ]
)
def preprocess_data(
    tabular_data: str,
    bucket_name: str,
    project_id: str,
    region: str,
    tabular_dataset: Output[Dataset]
):

    """Preprocess tabular data for crop yield prediction."""
    from google.cloud import storage
    import pandas as pd
    from sklearn.preprocessing import LabelEncoder
    import logging
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    logger.info("Reading data from: %s", tabular_data)
    df = pd.read_csv(tabular_data)
    
    # Encode categorical variables
    le = LabelEncoder()
    df['soil_quality'] = le.fit_transform(df['soil_quality'])
    
    # Save processed data
    output_uri = f"gs://{bucket_name}/processed_data/farming_data_processed.csv"
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob('processed_data/farming_data_processed.csv')
    blob.upload_from_string(df.to_csv(index=False))
    
    logger.info("Saved processed data to: %s", output_uri)
    
    # Save to the KFP output location
    with open(tabular_dataset.path, 'w') as f:
        f.write(output_uri)


In [None]:
@component(
    packages_to_install=[
        'google-cloud-aiplatform',
        'google-cloud-storage'
    ]
)
def train_tabular_model(
    project_id: str,
    region: str,
    dataset: Input[Artifact],
    min_accuracy: float,
    model_info: Output[Model]
):

    """Train AutoML Tabular model for crop yield prediction."""
    from google.cloud import aiplatform
    import logging
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # Initialize Vertex AI
    aiplatform.init(project=project_id, location=region)
    
    logger.info("Creating dataset from: %s", dataset)
    
    # Create dataset
    ai_dataset = aiplatform.TabularDataset.create(
        display_name="crop_tabular_dataset",
        gcs_source=dataset
    )
    
    logger.info("Training AutoML model")
    
    # Train model
    job = aiplatform.AutoMLTabularTrainingJob(
        display_name="crop_tabular_model",
        optimization_prediction_type="regression",
        optimization_objective="minimize-rmse"
    )
    
    model = job.run(
        dataset=ai_dataset,
        target_column="yield",
        budget_milli_node_hours=1000,  # ~1 hour
        model_display_name="crop_tabular_model",
        training_fraction_split=0.8,
        validation_fraction_split=0.1,
        test_fraction_split=0.1
    )
    
    # Evaluate model
    eval_metrics = model.get_model_evaluation()
    rmse = eval_metrics.metrics['rmse']
    logger.info("Model RMSE: %f", rmse)
    
    if rmse > min_accuracy:
        raise ValueError(f"Model RMSE {rmse} above threshold {min_accuracy}")
    
    # Save model info
    model_info_dict = {
        'resource_name': model.resource_name,
        'rmse': float(rmse)
    }
    
    with open(model_info.path, 'w') as f:
        f.write(model.resource_name)
    
    return model_info_dict

In [6]:
@component(
    packages_to_install=[
        'google-cloud-aiplatform==1.104.0'
    ]
)
def deploy_model(
    project_id: str,
    region: str,
    model: Input[Model],
    endpoint_info: Output[Artifact]
):


    """Deploy the trained model to an endpoint."""
    from google.cloud import aiplatform
    import logging
    import json
    
    # Configure logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)
    
    # Initialize Vertex AI
    aiplatform.init(project=project_id, location=region)
    
    # Get the model resource name from the artifact
    with open(model.path, 'r') as f:
        model_resource_name = f.read().strip()
    
    # Get the model
    model = aiplatform.Model(model_resource_name)
    
    # Deploy the model
    endpoint = model.deploy(
        machine_type="n1-standard-4",
        min_replica_count=1,
        max_replica_count=1
    )
    
    endpoint_info_dict = {
        'endpoint_name': endpoint.resource_name,
        'display_name': endpoint.display_name
    }
    
    # Save endpoint info
    with open(endpoint_info.path, 'w') as f:
        json.dump(endpoint_info_dict, f)
    
    return endpoint_info_dict


  return component_factory.create_component_from_func(


In [10]:
@pipeline(
    name='Crop Yield Prediction Pipeline',
    description='Pipeline for agricultural yield prediction using tabular data'
)
def crop_prediction_pipeline(
    project_id: str,
    region: str,
    bucket_name: str,
    tabular_dataset_uri: str,
    min_accuracy: float = 0.8
):
    # Preprocess data
    preprocess_task = preprocess_data(
        tabular_data=tabular_dataset_uri,
        bucket_name=bucket_name,
        project_id=project_id,
        region=region
    )

    # Train tabular model
    train_tabular_task = train_tabular_model(
        project_id=project_id,
        region=region,
        dataset=preprocess_task.outputs['tabular_dataset'],
        min_accuracy=min_accuracy
    )
    train_tabular_task.after(preprocess_task)

    # Deploy model
    deploy_task = deploy_model(
        project_id=project_id,
        region=region,
        model=train_tabular_task.outputs['model_info']
    )
    deploy_task.after(train_tabular_task)

In [17]:
# Check GCP setup
print(f"Current Project ID: {project_id}")
print(f"Current Region: {REGION}")
print("Authenticated as:", credentials.service_account_email if hasattr(credentials, 'service_account_email') else "User Account")

# Test GCP API access
storage_client = storage.Client()
try:
    buckets = list(storage_client.list_buckets(max_results=1))
    print("✅ Storage API access successful")
except Exception as e:
    print("❌ Storage API access failed:", str(e))

# Initialize Vertex AI
aiplatform.init(
    project=project_id,
    location=REGION,
    credentials=credentials
)

# Compile pipeline
compiler.Compiler().compile(
    pipeline_func=crop_prediction_pipeline,
    package_path='pipeline.yaml'
)

# Create and run pipeline job
job = pipeline_jobs.PipelineJob(
    display_name='crop-yield-prediction-pipeline',
    template_path='pipeline.yaml',
    pipeline_root=PIPELINE_ROOT,
    parameter_values={
        'project_id': project_id,
        'region': REGION,
        'bucket_name': bucket_name,
        'tabular_dataset_uri': tabular_uri,
        'min_accuracy': 0.8
    }
)

job.submit()

Current Project ID: agrifingcpflow-465809
Current Region: us-central1
Authenticated as: agrifin-service-account@agrifingcpflow-465809.iam.gserviceaccount.com
✅ Storage API access successful
Creating PipelineJob


InvalidArgument: 400 You do not have permission to act as service_account: 681123709451-compute@developer.gserviceaccount.com. (or it may not exist).