Analysis of image factors on annotation consensus within observer groups

Analytic code supporting "Observer variability in manual-visual interpretation of aerial imagery of wildlife, with implications for deep learning" - Converse et al. submitted Feb 2024

In [None]:
#Imports
import pandas as pd
from PIL import Image
import os
import ast
import numpy as np
import cv2
import pandas as pd
from skimage.feature import graycomatrix, graycoprops
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.api as sm

In [None]:
#Analysis annotations
path = "path/to/labels.csv"
with open(path) as f:
  df = pd.read_csv(f)

#Derive dependent variables for image factors analysis: fraction of guesses that agree with consensus

import pandas as pd

# Create a dictionary to store counts of class_orig for each group
class_counts_dict = {}

# Group the dataframe by image, then by cluster
grouped = df.groupby(['filename', 'cluster_id'])

# Create empty lists to store the results
cluster_id_list = []
filename_list = []
consensus_class_id_list = []
consensus_bbox_list = []
consensus_guess_list = []
num_annotations_list = []
correct_guess_list = []

# Loop through each group and calculate Pielou's evenness index
for name, group in grouped:
    # Get the cluster ID, filename, and consensus class ID for this group
    cluster_id = name[1]
    filename = name[0]
    consensus_class_id = group['cat_refined'].iloc[0]  # Assumes all consensus IDs in the group are the same
    consensus_bbox = group['bbox_refined'].iloc[0]

    # Count the number of annotations in the group
    num_annotations = len(group)

    # Create a list of class_orig values for this group
    class_orig_values = group['cat_orig'].tolist()

    # Count the number of times each class_orig appears in this group
    class_counts = pd.Series(class_orig_values).value_counts()

    # Count the number of times the consensus class appears
    consensus_count = class_counts.get(consensus_class_id, 0)

    # Calculate the proportion of correct guesses
    proportion_correct = consensus_count / num_annotations

    # Append the results to the lists
    cluster_id_list.append(cluster_id)
    filename_list.append(filename)
    consensus_class_id_list.append(consensus_class_id)
    consensus_bbox_list.append(consensus_bbox)
    num_annotations_list.append(num_annotations)
    consensus_guess_list.append(consensus_count)
    correct_guess_list.append(proportion_correct)

# Create a new dataframe with the results
df = pd.DataFrame({
    'cluster_id': cluster_id_list,
    'filename': filename_list,
    'consensus_class_ID': consensus_class_id_list,
    'consensus_bbox': consensus_bbox_list,
    'num_annotations': num_annotations_list,
    'consensus_guesses': consensus_guess_list,
    'correct_fraction': correct_guess_list
})

#Calculate dependent variable
df['n-k'] = df['num_annotations'] - df['consensus_guesses']

DERIVE IMAGE/ANNOTATION COVARIATES

In [None]:
#BBOX AREA
#Size of the bounding box in pixels

def calc_area(row):
    bbox = row['consensus_bbox']
    xmin, ymin, w, h = bbox
    return w * h

df['area'] = df.apply(calc_area, axis=1)

In [None]:
# % AREA BBOX
# Percent area of the bounding box of the total image area

# Define a function to calculate percentage area
def calculate_percentage_area(image_filename, bbox_area):
    image_path = os.path.join("E:\\imagefactors\\data\\zooniverse", image_filename)
    
    try:
        image = Image.open(image_path)
    except FileNotFoundError:
        # Handle the case where the image is not found
        print(f"Image not found: {image_path}")
        return None  # You can return a special value, such as None, to indicate the image wasn't found
    
    image_width, image_height = image.size
    image_area = image_width * image_height

    percentage_area = (bbox_area / image_area) * 100
    return percentage_area

# Calculate and add as a column
df['bbox_percent_area'] = df.apply(lambda row: calculate_percentage_area(row['filename'], row['area']), axis=1)

In [None]:
# SAME CLASS %
# % of targets of the same class as the analysis target (in the same image)

# Define a function to calculate the percentage of same-class neighbors for a given row
def calculate_same_class_percentage(row, df):
    # Get the filename and class ID of the target bounding box
    filename = row['filename']
    class_id = row['consensus_class_ID']
    
    # Filter the DataFrame to include only rows with matching filenames
    matching_rows = df[df['filename'] == filename]
    
    # Calculate the total number of neighbors in the same image
    total_neighbors = len(matching_rows) - 1  # Subtract 1 to exclude the target bounding box
    
    if total_neighbors == 0:
        return 0  # Avoid division by zero
    
    # Calculate the number of same-class neighbors
    same_class_neighbors = len(matching_rows[matching_rows['consensus_class_ID'] == class_id]) - 1  # Subtract 1 to exclude the target bounding box
    
    # Calculate the percentage of same-class neighbors
    same_class_percentage = (same_class_neighbors / total_neighbors) * 100
    
    return same_class_percentage

# Calculate the same-class percentage for each row and add the results as a new column
df['same_class_percent'] = df.apply(lambda row: calculate_same_class_percentage(row, df), axis=1)

In [None]:
#NUMBER OF NEIGHBORS 
# Number of annotations within 2x maximum of bbox width or height (to account for positional differences)

# Define a function to calculate the number of neighbors for a given row
def count_neighbors(row, df):
    # Extract 'bbox' values from the 'consensus_bbox' column as a list [xmin, ymin, width, height]
    bbox = row['consensus_bbox']  # Use ast.literal_eval() to safely evaluate the string
    
    # Define the search radius as 2 times the maximum of width and height
    search_radius = 2 * max(bbox[2], bbox[3])
    
    # Calculate the center coordinates of the bounding box
    x_center = bbox[0] + bbox[2] / 2
    y_center = bbox[1] + bbox[3] / 2
    
    # Initialize a count for neighbors
    num_neighbors = 0
    
    # Iterate through rows with matching filenames
    matching_rows = df[df['filename'] == row['filename']]
    
    for _, neighbor_row in matching_rows.iterrows():
        if neighbor_row.name != row.name:
            # Extract 'bbox' values for the neighbor as a list [xmin, ymin, width, height]
            neighbor_bbox = neighbor_row['consensus_bbox']
            
            # Calculate the center coordinates of the potential neighbor
            neighbor_x_center = neighbor_bbox[0] + neighbor_bbox[2] / 2
            neighbor_y_center = neighbor_bbox[1] + neighbor_bbox[3] / 2
            
            # Calculate the Euclidean distance between centers
            distance = np.sqrt((x_center - neighbor_x_center)**2 + (y_center - neighbor_y_center)**2)
            
            # Check if the neighbor is within the search radius
            if distance <= search_radius:
                num_neighbors += 1
    
    return num_neighbors

# Calculate the number of neighbors for each row and add the results as a new column
df['num_neighbors'] = df.apply(lambda row: count_neighbors(row, df), axis=1)

In [None]:
#TOTAL NUMBER OF BIRDS PER IMAGE
df['density'] = df.groupby('filename')['consensus_bbox'].transform('count')

In [None]:
#DISTANCE OF TARGET FROM IMAGE CENTER-- in meters

path1 = "E:\\imagefactors\\data\\crowdsourced_gsd.csv"
with open(path1) as f1:
  gsd_df = pd.read_csv(f1)

gsd_df["basefile"] = gsd_df["filename"].apply(lambda x: os.path.splitext(x)[0])

merged_df = pd.merge(df, gsd_df, on="basefile", how="left")
merged_df = merged_df.rename(columns={"filename_x": "filename"})
merged_df = merged_df.drop(columns=["filename_y", "filename_base"])

# Function to calculate distance from center
def calculate_distance_from_center(row):
    image_path = os.path.join("E:\\imagefactors\\data\\zooniverse", row["filename"])
    
    try:
        image = Image.open(image_path)
    except FileNotFoundError:
        # Handle the case where the image is not found
        print(f"Image not found: {image_path}")
        return None  # You can return a special value, such as None, to indicate the image wasn't found
    
    image_width, image_height = image.size
    center_x_px = image_width/2 
    center_y_px = image_height/2
    gsd_m = row['gsd'] / 100

    row['center_x_m'] = center_x_px * gsd_m
    row['center_y_m'] = center_y_px * gsd_m
    
    # Get the coordinates of the bounding box (x, y, width, height)
    x, y, width, height = row['consensus_bbox']

    # Calculate the center point of the bounding box in pixels
    bbox_center_x_px = x + (width / 2)
    bbox_center_y_px = y + (height / 2)

    # Calculate the center point of the bounding box in meters
    bbox_center_x_m = bbox_center_x_px * gsd_m
    bbox_center_y_m = bbox_center_y_px * gsd_m

    # Calculate the distance from the center of the image in meters
    distance_m = ((row['center_x_m'] - bbox_center_x_m)**2 + (row['center_y_m'] - bbox_center_y_m)**2)**0.5

    return distance_m

# Apply the function to the merged dataframe
merged_df['distance_from_center'] = merged_df.apply(calculate_distance_from_center, axis=1)
df = merged_df

In [None]:
#TEXTURE METRICS- GLCM
#Bounding box and "donut" (buffer region directly around bbox)

def calculate_glcm_derivatives(image, bbox):
    # Convert bounding box coordinates to integers
    x, y, width, height = map(int, bbox)
    
    # Extract the region of interest (ROI) from the image using the bounding box
    roi = image[y:y+height, x:x+width]
    
    # Check if the ROI is empty or None
    if roi is None or roi.size == 0:
        print("Warning: ROI is empty or None")
        return None, None, None, None
    
    # Convert the ROI to grayscale
    roi_gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    
    # Calculate GLCM features for the grayscale ROI
    distances = [1, 2]  # Define the distances for GCLM
    angles = [0, np.pi/4, np.pi/2, 3*np.pi/4]  # Define the angles for GCLM
    gclm = graycomatrix(roi_gray, distances=distances, angles=angles, levels=256,
                        symmetric=True, normed=True)
    
    # Calculate GLCM derivatives (contrast, dissimilarity, homogeneity, energy)
    contrast = graycoprops(gclm, 'contrast').mean()
    dissimilarity = graycoprops(gclm, 'dissimilarity').mean()
    homogeneity = graycoprops(gclm, 'homogeneity').mean()
    energy = graycoprops(gclm, 'energy').mean()
    
    return contrast, dissimilarity, homogeneity, energy

def adjust_bbox_to_image(image, bbox):
    # Get image dimensions
    image_height, image_width, _ = image.shape
    
    # Adjust bounding box coordinates if they exceed image boundaries
    x, y, width, height = bbox
    
    # Ensure the bounding box does not go beyond the image boundaries
    x = max(x, 0)
    y = max(y, 0)
    width = min(width, image_width - x)
    height = min(height, image_height - y)
    
    return x, y, width, height

def calculate_texture_metrics_for_directory(image_dir, csv_file):
    # Initialize an empty dataframe to store the texture metrics
    texture_metrics_df = pd.DataFrame()
    
    # List all files in the specified directory
    image_files = [f for f in os.listdir(image_dir) if os.path.isfile(os.path.join(image_dir, f))]
    
    for image_file in image_files:
        try:
            # Construct the full path to the image file
            image_path = os.path.join(image_dir, image_file)
            
            # Load the image
            image = cv2.imread(image_path)
            
            if image is None:
                print(f"Warning: Image '{image_path}' not found or cannot be loaded.")
                continue
            
            # Read the CSV file
            csv_data = pd.read_csv(csv_file)
            
            # Find the corresponding image filename
            image_filename = os.path.basename(image_path)
            
            # Filter annotations based on the image filename
            annotations = csv_data[csv_data['filename'] == image_filename]
            
            # Initialize lists to store the texture metrics
            bbox_contrast_list = []
            bbox_dissimilarity_list = []
            bbox_homogeneity_list = []
            bbox_energy_list = []
            donut_contrast_list = []
            donut_dissimilarity_list = []
            donut_homogeneity_list = []
            donut_energy_list = []
            
            # Iterate through annotations and calculate texture metrics
            for _, row in annotations.iterrows():
                bbox = ast.literal_eval(row['consensus_bbox'])  # Parse bbox values from string to list
                
                # Adjust bounding box to stay within image boundaries
                bbox = adjust_bbox_to_image(image, bbox)
                
                # Calculate GCLM derivatives for bounding box
                bbox_contrast, bbox_dissimilarity, bbox_homogeneity, bbox_energy = calculate_glcm_derivatives(image, bbox)
                
                donut_left = max(0, bbox[0] - 20)  # Adjust the buffer size as needed
                donut_top = max(0, bbox[1] - 20)
                donut_right = min(image.shape[1], bbox[0] + bbox[2] + 20)
                donut_bottom = min(image.shape[0], bbox[1] + bbox[3] + 20)
                donut_bbox = [donut_left, donut_top, donut_right - donut_left, donut_bottom - donut_top]
                donut_contrast, donut_dissimilarity, donut_homogeneity, donut_energy = calculate_glcm_derivatives(image, donut_bbox)

                # Append the calculated texture metrics to the lists
                bbox_contrast_list.append(bbox_contrast)
                bbox_dissimilarity_list.append(bbox_dissimilarity)
                bbox_homogeneity_list.append(bbox_homogeneity)
                bbox_energy_list.append(bbox_energy)
                donut_contrast_list.append(donut_contrast)
                donut_dissimilarity_list.append(donut_dissimilarity)
                donut_homogeneity_list.append(donut_homogeneity)
                donut_energy_list.append(donut_energy)
            
            # Add texture metrics as columns to a temporary dataframe
            temp_df = pd.DataFrame({
                'ID': annotations["id"],
                'filename': [image_filename] * len(annotations),
                'bbox_contrast': bbox_contrast_list,
                'bbox_dissimilarity': bbox_dissimilarity_list,
                'bbox_homogeneity': bbox_homogeneity_list,
                'bbox_energy': bbox_energy_list,
                'donut_contrast': donut_contrast_list,
                'donut_dissimilarity': donut_dissimilarity_list,
                'donut_homogeneity': donut_homogeneity_list,
                'donut_energy': donut_energy_list
            })
            
            # Append the temporary dataframe to the main dataframe
            texture = pd.concat([texture, temp_df], ignore_index=True)
        except Exception as e:
            print(f"Error processing image '{image_path}': {e}")

# Implementation
image_dir = 'path/to/image/directory'
csv_file = 'path/to/labels.csv'

calculate_texture_metrics_for_directory(image_dir, csv_file)

df = pd.merge(df, texture, on=["id", "filename"], how="left")

In [None]:
# Calculate simple differences for each GCLM statistic between label and donut
df['contrast_difference'] = df['donut_contrast'] - df['bbox_contrast']
df['energy_difference'] = df['donut_energy'] - df['bbox_energy']
df['homogeneity_difference'] = df['donut_homogeneity'] - df['bbox_homogeneity']
df['dissimilarity_difference'] = df['donut_dissimilarity'] - df['bbox_dissimilarity']

MODELING IMPACT OF IMAGE FACTORS ON ANNOTATION AGREEMENT

In [None]:
#Dummy variables for class ID
data = pd.get_dummies(df, columns=["consensus_class_ID"], prefix="class")
for column in data.filter(like='class_'):
    data[column] = data[column].astype(int)
data.head()

In [None]:
import pandas as pd
from statsmodels.stats.outliers_influence import variance_inflation_factor

# Assuming 'data' is your DataFrame with predictor variables
predictors = data[[
 'bbox_percent_area',
 'same_class_percent',
 'num_neighbors',
 'agl',
 'gsd',
 'distance_from_center',
 'density',
 'bbox_contrast',
 'bbox_dissimilarity',
 'bbox_homogeneity',
 'bbox_energy',
 'donut_contrast',
 'donut_dissimilarity',
 'donut_homogeneity',
 'donut_energy',
 'contrast_difference',
 'energy_difference',
 'homogeneity_difference',
 'dissimilarity_difference',
 'class_Crane',
 'class_Duck',
 'class_Goose',
 'class_Other Bird']]

# Calculate VIF
vif_data = pd.DataFrame()
vif_data["Variable"] = predictors.columns
vif_data["VIF"] = [variance_inflation_factor(predictors.values, i) for i in range(predictors.shape[1])]

# Display the VIF DataFrame
print(vif_data)

In [None]:
#Multiple linear regression 
import pandas as pd
import numpy as np
import statsmodels.api as sm

y = data['n-k']
X = data[['gsd', 'bbox_percent_area', 'same_class_percent', 'num_neighbors', 'distance_from_center', 'density', 'contrast_difference', 'energy_difference', 'homogeneity_difference', 'dissimilarity_difference',
        'class_Crane', 'class_Duck', 'class_Goose']]

# Add a constant term to the independent variables (intercept)
X = sm.add_constant(X)

# Fit the linear regression model
model = sm.OLS(y, X).fit()

# Print the regression summary
print(model.summary())