# Plant Disease Detection Application

This notebook implements a plant disease detection system using Google's Gemini model for image analysis. The application provides both a FastAPI backend and a Gradio web interface for easy interaction.

## Features
- Analyzes plant images to detect diseases and pests
- Provides confidence scores and treatment recommendations
- Draws bounding boxes around detected issues
- Offers both API endpoints and a user-friendly web interface

## Requirements
- Google Gemini API key
- Python packages: fastapi, uvicorn, gradio, google-generativeai, PIL, OpenCV, NumPy

## Setup and Imports

First, let's import all the necessary libraries and configure our environment.

In [1]:
import os
import io
import uuid
import json
import base64
import numpy as np
import cv2
from PIL import Image
import uvicorn
import gradio as gr
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
import google.generativeai as genai
import httpx
import asyncio

ModuleNotFoundError: No module named 'uvicorn'

## Configure Google Gemini API

To use the Gemini model, we need to set up our API key. For security in a notebook environment, we'll use environment variables or allow for manual input.

In [None]:
# Option 1: Get API key from environment variable
API_KEY = os.getenv("GEMINI_API_KEY")

# Option 2: Set API key manually if not found in environment (for notebook use)
if not API_KEY:
    # Uncomment and set your API key for notebook use
    # API_KEY = "your_api_key_here"  # Replace with your actual API key
    pass

# Ensure we have an API key
if not API_KEY:
    raise ValueError("Missing GEMINI_API_KEY. Please set it as an environment variable or directly in the notebook.")

# Configure Gemini with our API key
genai.configure(api_key=API_KEY)

# Initialize the Gemini model
LLM_MODEL = "gemini-2.0-flash-lite"  # Using the flash-lite model for faster response
model = genai.GenerativeModel(LLM_MODEL)

print(f"Gemini model '{LLM_MODEL}' initialized successfully!")

## Define HTML Content for Homepage

In the original code, the HTML content is imported from another file. For our notebook, we'll define it directly.

In [None]:
# Simple HTML for the homepage
HOME_HTML = """
<!DOCTYPE html>
<html>
    <head>
        <title>Plant Disease Detection API</title>
        <style>
            body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }
            h1 { color: #2e7d32; }
            .endpoint { background: #f5f5f5; padding: 15px; border-radius: 5px; margin-bottom: 15px; }
            code { background: #e0e0e0; padding: 2px 5px; border-radius: 3px; }
        </style>
    </head>
    <body>
        <h1>Plant Disease Detection API</h1>
        <p>This API analyzes plant images to detect diseases and pests using the Google Gemini AI model.</p>
        
        <h2>Available Endpoints:</h2>
        
        <div class="endpoint">
            <h3>POST /detect</h3>
            <p>Upload an image for analysis.</p>
            <p>Example curl request:</p>
            <code>curl -X POST -F "file=@plant.jpg" https://your-api-url.com/detect</code>
        </div>
        
        <div class="endpoint">
            <h3>GET /gradio</h3>
            <p>Access the web interface for easier interaction.</p>
        </div>
        
        <div class="endpoint">
            <h3>GET /health</h3>
            <p>Check if the API is running properly.</p>
        </div>
        
        <p>For more information, visit the <a href="/gradio">Gradio interface</a>.</p>
    </body>
</html>
"""

## Core Functions for Image Analysis

Now let's define the key functions that power our plant disease detection system.

In [None]:
def analyze_image(image_data):
    """
    Use Google's Gemini model to analyze the image for plant diseases or pests.
    
    Args:
        image_data: Image as bytes or PIL Image
        
    Returns:
        dict: Analysis results in JSON format
    """
    try:
        # Convert to PIL Image if it's not already
        if not isinstance(image_data, Image.Image):
            img = Image.open(io.BytesIO(image_data))
        else:
            img = image_data
        
        # Prepare prompt for Gemini - this instructs the model on what to look for and how to format the response
        prompt = """
        Analyze this plant image and identify if there are any diseases or pests present.
        If you find any issues, provide the following information in JSON format:
        1. Disease/pest name
        2. Confidence level (0-100)
        3. Brief description
        4. Recommendation for treatment
        5. Bounding box coordinates [x_min, y_min, x_width, y_height] as percentages of image dimensions
        6. If you detect the main object is not plants or pest issues, set valid as false
        7. If the image is blurred, set blurred as true else false
        
        Return a valid JSON with these fields:
        {
            "detected": true/false,
            "valid": true/false,
            "blurred": true/false,
            "obj": "plant"/"pest"/"what it is actually",
            "issues": [
                {
                    "type": "disease" or "pest",
                    "name": "name of disease/pest",
                    "confidence": 0-100,
                    "description": "brief description",
                    "treatment": "treatment recommendation",
                    "bbox": [x_min, y_min, x_width, y_height]
                }
            ]
        }

        Note: Here "obj" attribute means what it is actually like soil, plastic, human or anything it is. 
        If find multiple objects set the value as an array. Don't return "others" directly return the data or info.
        """
        
        # Generate content with Gemini by sending both the prompt and the image
        response = model.generate_content([prompt, img])
        
        # Extract text from the response
        response_text = response.text
        
        # Find JSON in the response (sometimes Gemini includes additional text before/after the JSON)
        json_start = response_text.find('{')
        json_end = response_text.rfind('}') + 1
        
        if json_start != -1 and json_end != -1:
            # Extract just the JSON part
            json_str = response_text[json_start:json_end]
            result = json.loads(json_str)  # Parse the JSON string into a Python dictionary
        else:
            # Fallback if no proper JSON is found
            result = {"detected": False, "error": "Could not parse response"}
        
        return result
    
    except Exception as e:
        print(f"Error analyzing image: {str(e)}")
        return {"detected": False, "error": str(e)}

In [None]:
def draw_bounding_boxes(image_data, issues):
    """
    Draw bounding boxes on the image based on the detected issues.
    
    Args:
        image_data: Image as bytes, PIL Image, or numpy array
        issues: List of detected issues with bounding boxes
        
    Returns:
        PIL.Image: Image with bounding boxes drawn
    """
    # Convert to numpy array if it's not already - OpenCV works with numpy arrays
    if isinstance(image_data, Image.Image):
        image_data = np.array(image_data)
    elif isinstance(image_data, bytes):
        # Convert bytes to numpy array
        nparr = np.frombuffer(image_data, np.uint8)
        image_data = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        # Convert BGR to RGB (OpenCV loads as BGR, but PIL uses RGB)
        image_data = cv2.cvtColor(image_data, cv2.COLOR_BGR2RGB)
    
    # Get image dimensions
    h, w = image_data.shape[:2]
    
    # Process each detected issue
    for issue in issues:
        if "bbox" in issue:
            # Extract bounding box coordinates (given as percentages)
            bbox = issue["bbox"]
            # Convert percentages to pixel coordinates
            x_min = int(bbox[0] * w / 100)
            y_min = int(bbox[1] * h / 100)
            x_width = int(bbox[2] * w / 100)
            y_height = int(bbox[3] * h / 100)
            
            # Define color based on issue type (red for disease, blue for pest)
            # Using RGB format: (255, 0, 0) for red, (0, 0, 255) for blue
            color = (255, 0, 0) if issue["type"] == "disease" else (0, 0, 255)
            
            # Draw rectangle around the detected issue
            cv2.rectangle(image_data, (x_min, y_min), (x_min + x_width, y_min + y_height), color, 2)
            
            # Add label with name and confidence score
            label = f"{issue['name']} ({issue['confidence']}%)"
            cv2.putText(image_data, label, (x_min, y_min - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
    
    # Convert back to PIL Image for easier handling in Python
    return Image.fromarray(image_data)

In [None]:
def image_to_base64(image_data):
    """
    Convert image data to base64 string for web display.
    
    Args:
        image_data: Image as bytes or PIL Image
        
    Returns:
        str: Base64 encoded image string with data URL prefix
    """
    # Handle different input types
    if isinstance(image_data, Image.Image):
        # Convert PIL Image to bytes
        buffered = io.BytesIO()
        image_data.save(buffered, format="JPEG")
        img_bytes = buffered.getvalue()
    else:
        # Already in bytes format
        img_bytes = image_data
    
    # Encode bytes as base64 string
    base64_encoded = base64.b64encode(img_bytes).decode("utf-8")
    
    # Return with data URL prefix for embedding in HTML/CSS
    return f"data:image/jpeg;base64,{base64_encoded}"

In [None]:
def process_image_for_gradio(image):
    """
    Process an image for the Gradio interface.
    
    Args:
        image: PIL Image uploaded through Gradio
        
    Returns:
        tuple: (processed_image, result_text)
    """
    # Check if an image was provided
    if image is None:
        return None, "No image provided"
    
    # Analyze the image using our Gemini-powered function
    analysis_result = analyze_image(image)
    
    # Process image with bounding boxes if issues were detected
    if analysis_result.get("detected", False) and "issues" in analysis_result and analysis_result["issues"]:
        # Draw bounding boxes around detected issues
        processed_image = draw_bounding_boxes(image, analysis_result["issues"])
    else:
        # If no issues detected, return the original image
        processed_image = image
    
    # Format the results as pretty-printed JSON for display
    result_text = json.dumps(analysis_result, indent=2)
    
    return processed_image, result_text

## Setup FastAPI Application

Now we'll create our FastAPI application with all the necessary endpoints and middleware.

In [None]:
# Create FastAPI app with title
app = FastAPI(title="Plant Disease Detection")

# Configure CORS (Cross-Origin Resource Sharing) to allow requests from any origin
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)

## Gradio Interface Setup

Next, let's create a user-friendly Gradio interface for our application.

In [None]:
# Create Gradio interface for user-friendly interaction
iface = gr.Interface(
    # The function to call when an image is uploaded
    fn=process_image_for_gradio,
    # Input component - an image uploader
    inputs=gr.Image(type="pil", label="Upload Plant Image"),
    # Output components - the processed image and analysis results
    outputs=[
        gr.Image(label="Processed Image"),
        gr.JSON(label="Analysis Results")
    ],
    title="Plant Disease and Pest Detection",
    description="Upload an image of a plant to detect potential diseases or pests.",
    theme="default"
)

## Server Auto-Ping Function

This function keeps the server alive by periodically sending requests to itself. Useful for deployed environments that may sleep after periods of inactivity.

In [None]:
async def ping_self():
    """Periodically ping the server to prevent it from sleeping (useful for free hosting services)"""
    async with httpx.AsyncClient() as client:
        while True:
            try:
                # Try to get the base URL from environment variable
                base_url = os.getenv("BASE_URL")
                if base_url:
                    await client.get(base_url)
            except:
                # Silently ignore errors
                pass
            # Wait 5 minutes before pinging again
            await asyncio.sleep(300)

In [None]:
@app.on_event("startup")
async def startup_event():
    """Function that runs when the server starts up"""
    # Start the ping task in the background
    asyncio.create_task(ping_self())

## FastAPI Endpoints

Now let's define all our API endpoints.

In [None]:
# FastAPI endpoint for home page
@app.get("/", response_class=HTMLResponse)
def home():
    """
    Root endpoint for the API returning the HTML homepage.
    """
    return HOME_HTML

In [None]:
# FastAPI endpoint for Gradio web UI
@app.get("/gradio", response_class=HTMLResponse)
def gradio_redirect():
    """
    Redirect page to the Gradio interface.
    """
    return """
    <html>
        <head>
            <meta http-equiv="refresh" content="0;url=/gradio" />
        </head>
        <body>
            <p>Redirecting to Gradio interface...</p>
        </body>
    </html>
    """

In [None]:
# FastAPI endpoint for direct image upload and analysis
@app.post("/detect")
async def detect_plant_issues(file: UploadFile = File(...)):
    """
    Endpoint to detect plant diseases or pests in an uploaded image.
    
    Args:
        file: Uploaded image file
        
    Returns:
        dict: Analysis results with processed image as base64
    """
    # Validate that the uploaded file is an image
    if not file.content_type.startswith("image/"):
        raise HTTPException(status_code=400, detail="File must be an image")
    
    # Read the image file
    image_data = await file.read()
    
    # Analyze image with Gemini
    analysis_result = analyze_image(image_data)
    
    # Generate response with image if issues detected
    if analysis_result.get("detected", False) and "issues" in analysis_result:
        # Draw bounding boxes on the image
        img = Image.open(io.BytesIO(image_data))
        processed_image = draw_bounding_boxes(img, analysis_result["issues"])
        
        # Convert to base64 for sending in response
        base64_image = image_to_base64(processed_image)
    else:
        # If no issues detected, return the original image
        base64_image = image_to_base64(image_data)
    
    # Add the processed image to the response
    analysis_result["processed_image"] = base64_image
    
    # Generate a unique ID for this analysis
    analysis_result["id"] = str(uuid.uuid4())
    
    return analysis_result

In [None]:
@app.get("/health")
async def health_check():
    """
    Health check endpoint to verify the API is running.
    """
    return {
        "status": "Healthy", 
        "model": LLM_MODEL,
        "api_key": bool(API_KEY)  # Just returns True/False to verify API key exists without exposing it
    }

## Mount Gradio App to FastAPI

Now we'll mount the Gradio interface to our FastAPI application.

In [None]:
# Mount Gradio app to FastAPI
app.mount("/", gr.routes.App(iface))

## Run the Server

Finally, we'll define a function to run the server and execute it if this notebook is run directly.

In [None]:
def main():
    """Run the server"""
    # Get port from environment variable, default to 7860 (common for Gradio apps)
    port = int(os.environ.get('PORT', 7860))
    
    # Start the uvicorn server
    uvicorn.run(
        app,
        host="0.0.0.0",  # Make server accessible from any IP
        port=port
    )

In [None]:
# Run the server if this notebook is executed directly
if __name__ == "__main__":
    main()