# evaluation

In [None]:
import os
import json
import numpy as np
import cv2
from PIL import Image
import io
import logging
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Import the required packages directly
import treedisksegmentation
import treediskpith
import treediskrings

In [None]:
# Logging: INFO-level root configuration plus a logger named after this module
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Pipeline settings (normally imported from config)
DEBUG = False
SAVE_RESULTS = False
INPUT_DIR = "../input/"
OUTPUT_DIR = "../output/"
YOLO_SEG_MODEL_PATH = "../models/yolo11s-seg-tree.pt"
YOLO_PITH_MODEL_PATH = "../models/yolo11s-det-pith.pt"

# Ground-truth dataset: the annotation file and the folder holding its images.
# NOTE: the "truh" spelling is part of the identifiers and is kept as-is
# because other cells of this notebook refer to these exact names.
ground_truh_path = "../ground-truth/ground_truth.json"
ground_truh_images_path = "../ground-truth/images"

In [None]:
# Load the ground truth. The context manager closes the file handle as soon as
# parsing is done (instead of leaving it to the garbage collector), and the
# explicit encoding keeps parsing independent of the platform default.
with open(ground_truh_path, encoding="utf-8") as ground_truth_file:
    ground_truth_json = json.load(ground_truth_file)

# Create required directories if they don't exist
os.makedirs(INPUT_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Abort early when a model file is missing (pith model is checked first)
for model_path in (YOLO_PITH_MODEL_PATH, YOLO_SEG_MODEL_PATH):
    if not os.path.exists(model_path):
        logger.error(f"Model file {model_path} does not exist")
        exit(1)

In [None]:
def process_image(image_data):
    """
    Run one ground-truth image through the pipeline using the imported packages.

    Pipeline stages:
    1. Segmentation (currently disabled, kept below as commented-out code)
    2. Pith detection
    3. Rings detection

    Because segmentation is disabled, stages 2 and 3 both read the original
    image file located at ground_truh_images_path / image_data["path"].

    Returns (average_ring_count, img_out) as unpacked from
    treediskrings.run_age_detect(), or (None, None) when pith detection or
    rings detection yields None.
    """
    image_path = os.path.join(ground_truh_images_path, image_data["path"])

    # --- Stage 1: segmentation (disabled) ---------------------------------
    # logger.info(f"Running segmentation on {image_path}")
    #
    # treedisksegmentation.configure(
    #     input_image=image_path,
    #     model_path=YOLO_SEG_MODEL_PATH,
    #     output_dir=OUTPUT_DIR,
    #     save_results=SAVE_RESULTS,
    #     debug=DEBUG,
    # )
    # segmented_image, masks = treedisksegmentation.run()
    #
    # if segmented_image is None:
    #     logger.error(f"Segmentation failed for {image_path}")
    #     return None, None
    #
    # # Convert segmented image to PIL for saving
    # segmented_pil = Image.fromarray(segmented_image)
    # segmented_path = f"{OUTPUT_DIR}/segmented_{os.path.basename(image_path)}"
    # segmented_pil.save(segmented_path)

    # --- Stage 2: pith detection ------------------------------------------
    logger.info("Running pith detection on segmented image")

    pith_options = {
        "input_image": image_path,  # segmented_path once stage 1 is re-enabled
        "model_path": YOLO_PITH_MODEL_PATH,
        "output_dir": OUTPUT_DIR,
        "save_results": SAVE_RESULTS,
        "debug": DEBUG,
    }
    treediskpith.configure(**pith_options)
    _, _, pith_center = treediskpith.run()

    if pith_center is None:
        logger.error(f"Pith detection failed for {image_path}")
        return None, None

    # --- Stage 3: rings detection -----------------------------------------
    logger.info(f"Running rings detection with pith at {pith_center}")

    rings_options = {
        "input_image": image_path,  # segmented_path once stage 1 is re-enabled
        "output_dir": OUTPUT_DIR,
        "cx": int(pith_center[0]),
        "cy": int(pith_center[1]),
        "sigma": 1.0,
        "th_low": 5.0,
        "th_high": 15.0,
        "save_results": SAVE_RESULTS,
        "debug": DEBUG,
    }
    treediskrings.configure(**rings_options)

    detection = treediskrings.run_age_detect()
    if detection is None:
        logger.error(f"Rings detection failed for {image_path}")
        return None, None

    ring_count, rings_image = detection
    return ring_count, rings_image

In [None]:
def create_evaluation_plots(results, true_ages, predicted_ages, accuracy_at_tolerance, output_dir="."):
    """
    Create and save visualizations for better evaluation understanding.

    Four subplots are drawn on one figure: true vs. predicted age scatter,
    per-image signed error bars, accuracy per tolerance level, and a histogram
    of absolute errors. The combined figure is saved as PNG and PDF, then each
    subplot is saved on its own (PNG and PDF) without its title.

    Args:
        results: list of result dicts; the "path" of every entry whose
            "predicted_age" is not None is used as a bar label, so those
            entries must line up with true_ages / predicted_ages.
        true_ages: array-like of true ages (non-empty).
        predicted_ages: array-like of predicted ages, same length as true_ages.
        accuracy_at_tolerance: dict whose keys end in "_<int tolerance>" and
            whose values are plotted as accuracy percentages.
        output_dir: directory the image files are written to. Defaults to the
            current working directory, which is where the files were always
            written; the final log message now reports this real location
            instead of OUTPUT_DIR.
    """
    true_ages = np.asarray(true_ages)
    predicted_ages = np.asarray(predicted_ages)
    os.makedirs(output_dir, exist_ok=True)

    # Create a figure with multiple subplots
    fig = plt.figure(figsize=(18, 10))

    # 1. Scatter plot of true age vs predicted age
    ax1 = fig.add_subplot(221)
    ax1.scatter(true_ages, predicted_ages, alpha=0.7, s=80)

    # Add perfect prediction line spanning the full observed age range
    min_age = min(true_ages.min(), predicted_ages.min())
    max_age = max(true_ages.max(), predicted_ages.max())
    ax1.plot([min_age, max_age], [min_age, max_age], 'r--', label='Perfect Prediction')

    # Add +/- 3 years tolerance lines
    ax1.plot([min_age, max_age], [min_age + 3, max_age + 3], 'g--', alpha=0.5, label='+3 years')
    ax1.plot([min_age, max_age], [min_age - 3, max_age - 3], 'g--', alpha=0.5, label='-3 years')

    ax1.set_xlabel('True Age (years)')
    ax1.set_ylabel('Predicted Age (years)')
    ax1.set_title('True Age vs. Predicted Age')
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # 2. Bar chart of errors for each image
    errors = true_ages - predicted_ages
    abs_errors = np.abs(errors)
    ax2 = fig.add_subplot(222)
    bar_positions = np.arange(len(errors))
    bars = ax2.bar(bar_positions, errors, alpha=0.7)

    # Color bars based on error magnitude: <=1 green, <=3 orange, else red
    for bar, magnitude in zip(bars, abs_errors):
        if magnitude <= 1:
            bar.set_color('green')
        elif magnitude <= 3:
            bar.set_color('orange')
        else:
            bar.set_color('red')

    ax2.set_xlabel('Image Index')
    ax2.set_ylabel('Error (True - Predicted)')
    ax2.set_title('Prediction Error by Image')
    ax2.grid(True, alpha=0.3, axis='y')

    # Add image names as x-tick labels (only images that have a prediction)
    image_names = [os.path.basename(r["path"]) for r in results if r.get("predicted_age") is not None]
    ax2.set_xticks(bar_positions)
    ax2.set_xticklabels(image_names, rotation=90, ha='right', fontsize=8)

    # 3. Accuracy at different tolerance levels
    ax3 = fig.add_subplot(223)

    # The tolerance is encoded as the trailing "_<int>" of each key
    tolerance_values = [int(key.split('_')[-1]) for key in accuracy_at_tolerance]
    accuracy_values = list(accuracy_at_tolerance.values())

    ax3.plot(tolerance_values, accuracy_values, 'o-', linewidth=2, markersize=10)
    ax3.set_xlabel('Tolerance (years)')
    ax3.set_ylabel('Accuracy (%)')
    ax3.set_title('Accuracy at Different Tolerance Levels')
    ax3.grid(True, alpha=0.3)
    ax3.set_xticks(tolerance_values)

    # 4. Histogram of age differences, one bin per whole year
    ax4 = fig.add_subplot(224)
    ax4.hist(abs_errors, bins=range(0, int(abs_errors.max()) + 2), alpha=0.7, edgecolor='black')
    ax4.set_xlabel('Absolute Error (years)')
    ax4.set_ylabel('Number of Images')
    ax4.set_title('Distribution of Absolute Errors')
    ax4.grid(True, alpha=0.3, axis='y')

    fig.tight_layout()

    # Save the combined figure with titles (transparent background).
    # fig.savefig is used so the right figure is written even if another
    # figure has become pyplot's "current" one.
    fig.savefig(os.path.join(output_dir, 'evaluation_plots.png'), dpi=300, transparent=True)
    fig.savefig(os.path.join(output_dir, 'evaluation_plots.pdf'), transparent=True)

    # Save each subplot individually WITHOUT title
    fig.canvas.draw()  # Update the renderer so tight bounding boxes are current
    for i, ax in enumerate([ax1, ax2, ax3, ax4], start=1):
        original_title = ax.get_title()
        ax.set_title('')  # Remove title
        # Bounding box of this axes (incl. labels), converted from pixels to inches
        extent = ax.get_tightbbox(fig.canvas.get_renderer()).transformed(fig.dpi_scale_trans.inverted())
        fig.savefig(os.path.join(output_dir, f'evaluation_plot_{i}.png'), dpi=300, bbox_inches=extent, transparent=True)
        fig.savefig(os.path.join(output_dir, f'evaluation_plot_{i}.pdf'), bbox_inches=extent, transparent=True)
        ax.set_title(original_title)  # Restore the title on the live figure

    logger.info(f"Evaluation plots saved to {os.path.abspath(output_dir)}")


In [None]:
def calculate_metrics(results):
    """
    Calculate evaluation metrics from the per-image results, plot them and log them.

    Args:
        results: list of dicts with "true_age" and "predicted_age" entries;
            entries whose "predicted_age" is missing or None count as failed
            predictions and are excluded from the metrics.

    Returns:
        A dict with "regression_metrics" (MAE, RMSE, R², mean percentage
        error), "accuracy_at_tolerance" (percentage of predictions within
        0..10 years), "processed_images", "total_images" and "success_rate";
        or None when there is no valid prediction at all. All metric values
        are plain Python floats so the dict can be written with json.dump.
    """
    # Filter out results with failed predictions
    valid_results = [r for r in results if r.get("predicted_age") is not None]

    if not valid_results:
        logger.error("No valid predictions to calculate metrics")
        return None

    # Extract true and predicted ages
    true_ages = np.array([r["true_age"] for r in valid_results])
    predicted_ages = np.array([r["predicted_age"] for r in valid_results])
    abs_errors = np.abs(true_ages - predicted_ages)

    # Calculate standard regression metrics
    mae = float(mean_absolute_error(true_ages, predicted_ages))
    rmse = float(np.sqrt(mean_squared_error(true_ages, predicted_ages)))
    r2 = float(r2_score(true_ages, predicted_ages))

    # Mean (absolute) percentage error. Samples with a true age of 0 are left
    # out because the relative error is undefined there and would otherwise
    # turn the whole mean into inf/nan.
    nonzero_truth = true_ages != 0
    if nonzero_truth.any():
        mpe = float(np.mean(abs_errors[nonzero_truth] / true_ages[nonzero_truth] * 100))
    else:
        mpe = float("nan")

    # Calculate accuracy metrics with different tolerance levels (0..10 years)
    accuracy_at_tolerance = {
        f"accuracy_within_{tolerance}": float(np.mean(abs_errors <= tolerance) * 100)
        for tolerance in range(11)
    }

    # Create visualization of the results
    create_evaluation_plots(results, true_ages, predicted_ages, accuracy_at_tolerance)

    # Log metrics
    logger.info("\n===== Improved Evaluation Metrics =====")
    logger.info(f"Mean Absolute Error (MAE): {mae:.2f} years")
    logger.info(f"Root Mean Squared Error (RMSE): {rmse:.2f} years")
    logger.info(f"R² Score: {r2:.4f}")
    logger.info(f"Mean Percentage Error: {mpe:.2f}%")

    for tolerance, accuracy in accuracy_at_tolerance.items():
        logger.info(f"{tolerance.replace('_', ' ').title()}: {accuracy:.2f}%")

    # Create metrics dictionary
    metrics = {
        "regression_metrics": {
            "mean_absolute_error": mae,
            "root_mean_squared_error": rmse,
            "r2_score": r2,
            "mean_percentage_error": mpe
        },
        "accuracy_at_tolerance": accuracy_at_tolerance,
        "processed_images": len(valid_results),
        "total_images": len(results),
        "success_rate": len(valid_results) / len(results) if results else 0
    }

    return metrics

In [None]:
def evaluate_pipeline():
    """
    Run process_image on every entry of ground_truth_json and collect the outcome.

    Returns a list with one dict per ground-truth entry, holding its "path",
    its "true_age" (the entry's "age") and its "predicted_age" (None when
    processing failed). Metrics are computed separately from this list.
    """
    collected = []

    for entry in ground_truth_json:
        relative_path = entry["path"]
        logger.info(f"Processing {relative_path}...")

        # The visualization image returned alongside the age is not used here
        age_estimate, _ = process_image(entry)

        record = {
            "path": relative_path,
            "true_age": entry["age"],
            "predicted_age": age_estimate,
        }

        if age_estimate is None:
            logger.error(f"  Failed to process {relative_path}")
        # Saving the visualization is currently disabled:
        # else:
        #     output_path = os.path.join(OUTPUT_DIR, f"eval_{os.path.basename(entry['path'])}")
        #     cv2.imwrite(output_path, visualization)
        #     logger.info(f"  Result: True age = {entry['age']}, Predicted age = {age_estimate}")

        collected.append(record)

    return collected

In [None]:
# Destination of the raw per-image results
basic_results_file = "basic_results.json"

# Kick off the evaluation over the whole ground-truth set
logger.info("Starting evaluation of the tree disk analysis pipeline...")
basic_results = evaluate_pipeline()

# Persist the raw results before any metrics are derived from them
with open(basic_results_file, "w") as results_out:
    json.dump(basic_results, results_out, indent=2)

logger.info(f"Basic results saved to {basic_results_file}")

In [None]:
# Reload the saved basic results. The context manager closes the file handle
# right after parsing instead of leaving it open.
basic_results_file = "basic_results.json"
with open(basic_results_file) as results_in:
    basic_results = json.load(results_in)

# calculate metrics on top of the basic results
metrics = calculate_metrics(basic_results)

# Save metrics to a separate file
metrics_file = "metrics.json"
with open(metrics_file, "w") as metrics_out:
    json.dump(metrics, metrics_out, indent=2)

logger.info(f"Metrics saved to {metrics_file}")

In [None]:
images_data = "/Volumes/Tony SSD/Projekte/Studienarbeit/Datasets/uruDendro/images"
annotations_data = "/Volumes/Tony SSD/Projekte/Studienarbeit/Datasets/uruDendro/annotations"

# Retrieve all .png files in the folder, skipping hidden files (this also
# covers the "._*" files macOS creates). Sorted so the result does not depend
# on the directory listing order.
files = sorted(
    name for name in os.listdir(images_data)
    if name.endswith('.png') and not name.startswith('.')
)

print(files)

# Annotation files carrying one of these markers in their name are ignored
excluded_markers = ('-M', '-V', '-S', '-C')
available_annotations = {
    name for name in os.listdir(annotations_data)
    if not name.startswith('.') and not any(marker in name for marker in excluded_markers)
}

# Go through the images and pick the annotation whose name is exactly the
# image name with a .json extension. Matching by exact name (rather than by
# substring) and recording the image together with its annotation keeps the
# two lists aligned even when some image has no annotation.
# NOTE(review): assumes annotations are stored as "<image stem>.json" - confirm.
matched_files = []
annotations = []
for image_name in files:
    annotation_name = os.path.splitext(image_name)[0] + '.json'
    if annotation_name in available_annotations:
        matched_files.append(image_name)
        annotations.append(annotation_name)
    else:
        print(f"No annotation found for {image_name}, skipping")

print(annotations)

print(len(files))
print(len(annotations))

# Read the json files and save the length (number of entries) of the shapes key
shapes = []
for annotation_name in annotations:
    with open(os.path.join(annotations_data, annotation_name), encoding="utf-8") as annotation_file:
        shapes.append(len(json.load(annotation_file)['shapes']))

print(shapes)

# Create a json array with the image name and the number of shapes
data = [
    {"path": image_name, "age": shape_count}
    for image_name, shape_count in zip(matched_files, shapes)
]

print(data)