<h1>Segmentation script</h1>

<h2>Importing packages</h2>

In [1]:
# Import packages
# The `!pip` lines are notebook shell escapes that install third-party dependencies.
!pip install roboflow
!pip install folium
!pip install PyQt5
# NOTE: `Image` here is IPython.display.Image (a rich-display wrapper), not PIL.Image.
from IPython.display import display, Image
# NOTE: this rebinds `display` to the IPython.display *module*, shadowing the `display`
# function imported on the previous line; `display.clear_output()` below relies on the module.
from IPython import display
import csv
import os
import json
from shapely.geometry import Polygon
import pandas as pd
import folium
display.clear_output()  # Clear the cell output produced so far (e.g. the pip install logs)
print("Successfully imported all packages")

Successfully imported all packages


<h2>Importing a trained model from Roboflow</h2>

<h3>Create a Roboflow Project:</h3>
Sign in to your Roboflow account and create a new project, selecting "Instance Segmentation" as the task type.

<h3>Annotate your Data:</h3>
Upload your images and use the Roboflow annotation tool to label each object instance.

<h3>Splitting and Downloading the Dataset:</h3>
Once annotations are complete, split your dataset into training, validation, and test sets using Roboflow's splitting tool. Then, navigate to the "Versions" tab, select the desired version, click "Export Dataset," choose "COCO Segmentation" format, and click "Show Download Code" to generate a code snippet containing the download link.

<h3>Paste the Download Code:</h3>
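Copy the entire code snippet provided by Roboflow and paste it into the designated code block below in your JupyterLab notebook. Note that the snippet contains your private API key: treat it as a secret and avoid sharing or committing the notebook with the key in it.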

In [2]:
# Paste your snippet here
import getpass

from roboflow import Roboflow

# The private API key must not be stored in the notebook (it leaks as soon as the
# notebook is shared or committed). Read it from the ROBOFLOW_API_KEY environment
# variable and fall back to an interactive, non-echoing prompt.
roboflow_api_key = os.environ.get("ROBOFLOW_API_KEY") or getpass.getpass("Roboflow API key: ")

rf = Roboflow(api_key=roboflow_api_key)
project = rf.workspace("footage").project("seafloor-segmentation")
model = project.version("1").model  # Hosted model used for inference in the prediction section
display.clear_output()  # Hide the Roboflow loading messages
print("Successfully extracted data from roboflow")

Successfully extracted data from roboflow


<h2>Prediction</h2>

In [3]:
def create_output_dir(base_dir):
    """
    Create a fresh output directory, never reusing an existing one.

    The first free path among ``base_dir``, ``base_dir2``, ``base_dir3``, ...
    is created and returned.

    Args:
        base_dir (str): The preferred directory path.

    Returns:
        str: The path of the directory that was actually created.
    """
    candidate = base_dir
    suffix = 1
    # Probe base_dir first, then base_dir2, base_dir3, ... until a free path is found
    while os.path.exists(candidate):
        suffix += 1
        candidate = f"{base_dir}{suffix}"

    os.makedirs(candidate)
    return candidate


base_output_dir = "/cache/album/cache/kso-benjamin/bucket/kso/notebooks/test/predictions/seg-pred"  # Define the base output directory
output_dir = create_output_dir(base_output_dir)  # Create a fresh output directory for this run

input_directory = "/cache/album/cache/kso-benjamin/bucket/kso/notebooks/test/Seafloor_footage-1/Raw/"  # Define the input directory

CONFIDENCE_THRESHOLD = 30  # Confidence value passed to model.predict; set the threshold here
IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')  # File types sent to the model
PREDICTION_FIELDS = ["x", "y", "width", "height", "confidence", "class", "class_id",
                     "detection_id", "image_path", "prediction_type"]  # Copied verbatim from each prediction
CSV_HEADER = PREDICTION_FIELDS + ["points", "area"]  # Column order of every *_prediction.csv


def _mask_points_and_area(pred, filename):
    """
    Extract the mask outline of one prediction and compute its polygon area.

    Args:
        pred (dict): One entry of the ``predictions`` list in the model's JSON result.
        filename (str): Name of the image being processed (used in the error message only).

    Returns:
        tuple: ``(points, area)``. ``points`` is the list of ``(x, y)`` float tuples when the
        conversion succeeded, otherwise the value found under ``pred['points']``; ``area`` is
        the polygon area, or 0 when there are no points or the polygon could not be built.
    """
    mask_points = pred.get('points', [])
    if not mask_points:
        return mask_points, 0

    try:
        mask_points = [(float(point['x']), float(point['y'])) for point in mask_points]
        return mask_points, Polygon(mask_points).area
    except ValueError as e:
        # Raised for non-numeric coordinates as well as for outlines shapely cannot turn into a polygon
        print(f"Error converting points to float for {filename}: {e}")
        return mask_points, 0


# Sorted so that images are processed in a reproducible order
for filename in sorted(os.listdir(input_directory)):
    if not filename.lower().endswith(IMAGE_EXTENSIONS):
        continue  # Skip anything that is not a PNG/JPG/JPEG image

    filepath = os.path.join(input_directory, filename)
    image_stem = os.path.splitext(filename)[0]

    # One inference call per image: the same result feeds both the CSV and the labeled image
    prediction = model.predict(filepath, confidence=CONFIDENCE_THRESHOLD)
    prediction_data = prediction.json()

    csv_filepath = os.path.join(output_dir, image_stem + "_prediction.csv")
    with open(csv_filepath, mode='w', newline='') as csv_file:
        csv_writer = csv.DictWriter(csv_file, fieldnames=CSV_HEADER)
        csv_writer.writeheader()

        for pred in prediction_data['predictions']:
            mask_points, area = _mask_points_and_area(pred, filename)

            row = {field: pred.get(field, '') for field in PREDICTION_FIELDS}
            row["points"] = json.dumps(mask_points)  # Store the outline as a JSON string
            row["area"] = area
            csv_writer.writerow(row)

    # Save the labeled prediction image next to the CSV
    output_image_path = os.path.join(output_dir, image_stem + "_prediction.jpg")
    prediction.save(output_image_path)

    display.clear_output()  # Clear the output

_IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')  # Same image types the prediction step accepts
# JPEG start-of-frame markers (0xC0-0xCF without DHT 0xC4, JPG 0xC8 and DAC 0xCC)
_JPEG_SOF_MARKERS = frozenset(range(0xC0, 0xD0)) - {0xC4, 0xC8, 0xCC}


def _read_image_size(image_path):
    """Return ``(width, height)`` in pixels read from the header of a PNG or JPEG file."""
    import struct  # Local import: only this helper needs it

    with open(image_path, 'rb') as image_file:
        head = image_file.read(24)

        # PNG: 8-byte signature, then the IHDR chunk whose data starts with width and height
        if len(head) == 24 and head.startswith(b'\x89PNG\r\n\x1a\n') and head[12:16] == b'IHDR':
            width, height = struct.unpack('>II', head[16:24])
            return width, height

        # JPEG: walk the segments until a start-of-frame segment, which stores height then width
        if head.startswith(b'\xff\xd8'):
            image_file.seek(2)
            while True:
                byte = image_file.read(1)
                if not byte:
                    break
                if byte != b'\xff':
                    continue
                marker = image_file.read(1)
                while marker == b'\xff':  # Markers may be preceded by fill bytes
                    marker = image_file.read(1)
                if not marker:
                    break
                code = marker[0]
                if code in _JPEG_SOF_MARKERS:
                    frame_header = image_file.read(7)  # length (2) + precision (1) + height (2) + width (2)
                    if len(frame_header) < 7:
                        break
                    height, width = struct.unpack('>HH', frame_header[3:7])
                    return width, height
                if code == 0x01 or 0xD0 <= code <= 0xD9:
                    continue  # Stand-alone markers carry no length field
                raw_length = image_file.read(2)
                if len(raw_length) < 2:
                    break
                # The stored length includes its own two bytes
                image_file.seek(max(struct.unpack('>H', raw_length)[0] - 2, 0), 1)

    raise ValueError(f"Cannot determine the pixel size of {image_path}")


def _index_images(image_dir):
    """Map each image name (without extension) in ``image_dir`` to its full path."""
    index = {}
    for entry in sorted(os.listdir(image_dir)):
        stem, extension = os.path.splitext(entry)
        if extension.lower() in _IMAGE_EXTENSIONS:
            index.setdefault(stem, os.path.join(image_dir, entry))
    return index


def calculate_area_percentages(output_dir, image_dir=None):
    """
    Calculate the area percentages for each class in the output directory.

    Every ``*_prediction.csv`` file in ``output_dir`` is read. Mask areas are summed per class
    and expressed as a percentage of the quadrant's bounding-box area (``width * height`` of
    the ``quadrant`` row). When an image has no quadrant, the pixel area of the source image is
    used instead. The ``quadrant`` class itself is always reported as 100.0. Images without any
    non-quadrant detection of positive area get the single entry
    ``{'No detections besides the quadrant': 'Na'}``.

    The image size is read from the PNG/JPEG file header with the standard library, because
    the ``Image`` name imported at the top of the notebook is ``IPython.display.Image``, which
    cannot open a file and report its dimensions.

    Args:
        output_dir (str): The output directory path.
        image_dir (str, optional): Directory containing the source images. Only consulted for
            images without a quadrant detection. Defaults to the notebook-level
            ``input_directory``.

    Returns:
        tuple: A tuple containing the results dictionary and the set of all classes.

    Raises:
        FileNotFoundError: If the source image of a quadrant-less prediction file is missing.
        ValueError: If the size of that image cannot be read or is zero.
    """
    csv_suffix = '_prediction.csv'
    results = {}
    all_classes = set()
    image_index = None  # Built lazily: only needed when an image has no quadrant

    for filename in sorted(os.listdir(output_dir)):  # Sorted for a reproducible row order
        if not filename.endswith(csv_suffix):
            continue

        class_areas = {}
        total_quadrant_area = 0
        non_quadrant_detections = False

        with open(os.path.join(output_dir, filename), mode='r', newline='') as csv_file:
            for row in csv.DictReader(csv_file):
                class_name = row['class']

                if class_name == "quadrant":
                    # The quadrant is measured by its bounding box, not by its mask area
                    total_quadrant_area = float(row['width']) * float(row['height'])
                    class_areas[class_name] = total_quadrant_area
                    all_classes.add(class_name)
                    continue

                area = float(row.get('area') or 0)  # A missing or empty cell counts as zero area
                if area > 0:
                    non_quadrant_detections = True

                class_areas[class_name] = class_areas.get(class_name, 0) + area
                all_classes.add(class_name)

        image_name = filename[:-len(csv_suffix)]

        if not non_quadrant_detections:
            results[image_name] = {'No detections besides the quadrant': 'Na'}
            continue

        if total_quadrant_area > 0:
            reference_area = total_quadrant_area
        else:
            # No quadrant detected: normalise by the size of the whole image instead
            if image_index is None:
                image_index = _index_images(input_directory if image_dir is None else image_dir)
            image_path = image_index.get(image_name)
            if image_path is None:
                raise FileNotFoundError(f"No source image found for {image_name}")
            image_width, image_height = _read_image_size(image_path)
            reference_area = image_width * image_height
            if reference_area == 0:
                raise ValueError(f"Image {image_path} has zero area")

        results[image_name] = {
            class_name: 100.0 if class_name == "quadrant" else (total_area / reference_area) * 100
            for class_name, total_area in class_areas.items()
        }

    return results, all_classes

def write_results_to_csv(results, all_classes, output_filepath):
    """
    Save the per-image class percentages as a CSV table.

    The table has one row per image (index column ``filename``) and one column per class,
    in alphabetical order. Classes an image has no value for are written as ``Na``.

    Args:
        results (dict): Mapping of image name to a ``{class name: value}`` dictionary.
        all_classes (set): Every class name that should become a column.
        output_filepath (str): Where the CSV file is written.
    """
    column_order = sorted(all_classes)

    # Give every image an entry for every class so that all rows share the same columns
    table = {
        image_name: {label: per_class.get(label, 'Na') for label in all_classes}
        for image_name, per_class in results.items()
    }

    frame = pd.DataFrame.from_dict(table, orient='index', columns=column_order)
    frame.index.name = 'filename'
    frame.to_csv(output_filepath, na_rep='Na')

# Summarise the per-image prediction CSVs of this run and store the table as results.csv
results_csv_filepath = os.path.join(output_dir, 'results.csv')
results, all_classes = calculate_area_percentages(output_dir)
write_results_to_csv(results, all_classes, results_csv_filepath)

print(f'Results have been written to {results_csv_filepath}')  # Report where the table was saved

def getting_geolocation(output_dir, photos_koster_path='/cache/album/cache/kso-benjamin/bucket/csv_Koster_Seafloor_Obs/photos_koster.csv'):
    """
    Get the geolocation information and generate a map.

    Reads ``results.csv`` from ``output_dir``, attaches the ``PhotoPosition`` of each photo
    (looked up by filename in the photos CSV), writes the enriched table back to
    ``results.csv`` and saves a folium map (``seagrass_map.html``) with one rectangle per
    located photo. A rectangle is filled green when the photo's ``Seagrass`` value is a number
    greater than zero.

    Args:
        output_dir (str): The output directory path.
        photos_koster_path (str, optional): CSV file with at least the columns ``filename``
            and ``PhotoPosition`` (``"lat,lon"``). Defaults to the Koster photos file.

    Raises:
        KeyError: If a required column is missing from one of the CSV files.
    """
    photos_koster_df = pd.read_csv(photos_koster_path)  # Read the photos_koster.csv file
    results_path = os.path.join(output_dir, 'results.csv')  # Generate the results CSV file path
    results_df = pd.read_csv(results_path)  # Read the results.csv file

    # Validate the schema before any column is accessed
    if 'filename' not in photos_koster_df.columns or 'filename' not in results_df.columns:
        raise KeyError("'filename' column not found in one of the CSV files")
    if 'PhotoPosition' not in photos_koster_df.columns:
        raise KeyError("'PhotoPosition' column not found in the photos CSV file")

    print("Columns in photos_koster_df:", photos_koster_df.columns.tolist())
    print("Sample filenames in photos_koster_df:", photos_koster_df['filename'].head())
    print("Columns in results_df:", results_df.columns.tolist())
    print("Sample filenames in results_df:", results_df['filename'].head())

    # Normalise the join key; astype(str) protects against filenames parsed as numbers
    photos_koster_df['filename'] = photos_koster_df['filename'].astype(str).str.strip().str.lower()
    result_names = results_df['filename'].astype(str).str.strip().str.lower()
    # results.csv stores names without extension; add '.jpg' if missing
    results_df['filename'] = result_names.where(result_names.str.endswith('.jpg'), result_names + '.jpg')

    # Drop a PhotoPosition column left by a previous run so the merge does not create suffixed duplicates
    results_df = results_df.drop(columns=['PhotoPosition'], errors='ignore')
    merged_df = pd.merge(results_df, photos_koster_df[['filename', 'PhotoPosition']], on='filename', how='left')

    missing_matches = merged_df[merged_df['PhotoPosition'].isna()]  # Rows without a known position
    if not missing_matches.empty:
        print("Filenames in results.csv with no matching PhotoPosition_koster:")
        print(missing_matches['filename'])

    merged_df.to_csv(results_path, index=False)  # Save the merged dataframe to the results.csv file

    # Seagrass cover as numbers: 'Na' and other non-numeric cells count as 0 (no seagrass).
    # Truth-testing the raw CSV value would treat strings such as 'Na' or '0.0' as present.
    if 'Seagrass' in merged_df.columns:
        seagrass_cover = pd.to_numeric(merged_df['Seagrass'], errors='coerce').fillna(0)
    else:
        seagrass_cover = pd.Series(0.0, index=merged_df.index)

    map_center = [58.0, 11.0]  # Define the map center
    half_size = 0.00007  # Half the side length of each photo rectangle, in degrees
    m = folium.Map(location=map_center, zoom_start=8)  # Create a folium map

    for position, cover in zip(merged_df['PhotoPosition'], seagrass_cover):
        if pd.isna(position):
            continue  # Photo without a known position

        try:
            lat, lon = (float(part) for part in str(position).split(','))
        except ValueError:
            # Malformed position: skip this photo instead of aborting the whole map
            print(f"Skipping unparseable PhotoPosition: {position!r}")
            continue

        has_seagrass = bool(cover > 0)
        folium.Rectangle(
            bounds=[[lat - half_size, lon - half_size], [lat + half_size, lon + half_size]],
            color='black',  # Set the outline color
            fill=has_seagrass,  # The fill is only drawn when fill is enabled
            fill_color='green',
            fill_opacity=0.5,
            tooltip='Seagrass present' if has_seagrass else 'No seagrass'
        ).add_to(m)  # Add the rectangle to the map

    map_path = os.path.join(output_dir, 'seagrass_map.html')  # Generate the map file path
    m.save(map_path)  # Save the map to an HTML file
    print(f'Map saved to {map_path}')  # Print a success message

# Add the photo positions to this run's results.csv and save the seagrass map next to it
getting_geolocation(output_dir)  # Call the getting_geolocation function

KeyboardInterrupt: 