In [None]:
from oneatlas import OneAtlasClient  # custom class in this repo
import json
import geopandas as gpd
from shapely.geometry import box
from pathlib import Path
import shutil
import zipfile
import time

# Process overview
- Convert input aoi layer into geojson polygons
- For each aoi polygon feature:
  - Search API for available images
  - Plot the quicklook for each image found for that feature
  - Place an order for the chosen image clipped to the feature
  - Download the order once completed

In [None]:
# Read local config.json to get api key and directory for outputs
with open("config.json", "r") as file:
    config = json.load(file)

api_key = config["api_key"]

client = OneAtlasClient(api_key=api_key)

output_folder = Path(config["output_dir"])
input_file_gdb = Path(config["input_gdb"])

In [None]:
# function to convert polygon gdf to bounding box list
sites_gdf = gpd.read_file(input_file_gdb, layer="Registered_Sites_Merged_v2")

LAYER_NAME = "Registered_Sites"  # prefix name for the clipped image downloads

In [None]:
# Drop rows with 'POINT EMPTY' geometry
sites_gdf = sites_gdf[~sites_gdf.geometry.is_empty]

In [None]:
# Define function to convert points -> buffer -> bounding box
def points_to_buffer_box(gdf, buffer_distance=500):
    """Buffer points by distance and convert to bounding boxes"""

    # Check if all geometries are Points

    if not all(gdf.geometry.geom_type == "Point"):

        print("All geometries in the GeoDataFrame must be Points. Exiting.")

        return None

    # Reproject to EPSG 27700

    gdf = gdf.to_crs(epsg=27700)

    # Buffer the geometries

    gdf["geometry"] = gdf.geometry.buffer(buffer_distance)

    # Convert buffers to bounding boxes

    gdf["geometry"] = gdf.geometry.apply(lambda geom: box(*geom.bounds))

    return gdf

In [None]:
# convert points geometry to bounding box around 500m buffer
sites_box_gdf = points_to_buffer_box(sites_gdf, buffer_distance=500)

In [None]:
def aoi_gdf_to_search_geojson(gdf, uid_column=None):
    """Convert geodataframe to geojson ensuring unique ids per feature"""

    if uid_column is None:

        uid_column = "id"

        gdf[uid_column] = gdf.index

    else:

        if not gdf[uid_column].is_unique:

            raise ValueError(
                f"Values in '{uid_column}' are not unique. Consider using default uid_column=None to add uids."
            )

    print(
        f"uid_column '{uid_column}' values {', '.join(str(i) for i in gdf[uid_column].tolist())}"
    )

    json_str = gdf[[uid_column, "geometry"]].to_crs(epsg=4326).to_json(drop_id=True)

    return json.loads(json_str), gdf[uid_column].tolist()

In [None]:
# convert gdf to geojson and get the id value list to know ids to search for
search_geojson, id_vals = aoi_gdf_to_search_geojson(
    sites_box_gdf, uid_column="image_id"
)

In [None]:
def get_feature_by_id(geojson, id_vals, search_id, uid_column="id"):
    """Filter geojson features to just one specified id"""

    if search_id not in id_vals:

        raise ValueError(f"id {search_id} not in list of id values")

    for feature in geojson["features"]:

        if feature["properties"][uid_column] == search_id:
            return feature

    return None

In [None]:
# Set the search id. Must be in the id_vals list created above.
SEARCH_ID = 4
# extract the geojson feature for that id
search_feature = get_feature_by_id(
    search_geojson, id_vals, SEARCH_ID, uid_column="image_id"
)

Search options are described [here](https://www.geoapi-airbusds.com/guides/oneatlas-data/g-search/)

In [None]:
# create search json - geometry is the search feature geometry
img_search_json = {
    "cloudCover": "[0,30]",
    "incidenceAngle": "[0,40]",
    "processingLevel": "SENSOR",
    "relation": "contains",
    "geometry": search_feature["geometry"],
    "constellation": "PHR",
}

# make the request
results = client.search(img_search_json)

# extract relevant values from the results and store in client instance


client.extract_results(results)

In [None]:
# Run this cell repeatedly to show each search result image quicklook in turn
client.show_result()

See the order options [here](https://www.geoapi-airbusds.com/guides/oneatlas-data/g-order-product/)

In [None]:
# create the order specification and initially just quote the price
order_body = {
    "kind": "order.data.product",
    "products": [
        {
            "productType": "pansharpened",  # pansharpened # multiSpectral
            "radiometricProcessing": "DISPLAY",  # REFLECTANCE # DISPLAY #
            "imageFormat": "image/geotiff",
            "crsCode": "urn:ogc:def:crs:EPSG::4326",
            "id": client.current_image,  # current client.show_result() image
            "aoi": search_feature["geometry"],
        }
    ],
}


client.get_price(order_body)["price"]

In [None]:
# Add a ref to identify the order
order_ref = f"sg_{LAYER_NAME}_{SEARCH_ID}"

order_body["customerRef"] = order_ref

In [None]:
# Uncomment line below to place the order above (WARNING: account will be charged)
client.create_order(order_body)

In [None]:
# See order status - need to keep checking by running this until shows "delivered"
status = ""
while status != "delivered":

    orders = client.list_orders(customerRef=order_ref)


    order = orders["items"][0]


    status = order["status"]
    time.sleep(10)

In [None]:
# The config.json in the repo specifies the output_folder (I'm using _PS if pan-sharpened)
output_file = output_folder / f"{LAYER_NAME}_PS_{SEARCH_ID}.zip"

# Download the order to specified zip file
client.download_order_to_file(order, output_file)

In [None]:
def process_zip_file(zip_file_path, target_directory, delete_zip=True):
    zip_file_path = Path(zip_file_path)
    target_directory = Path(target_directory)

    # Extract the ZIP file
    with zipfile.ZipFile(zip_file_path, "r") as zip_ref:
        temp_extract_dir = zip_file_path.parent / "temp_extracted"
        zip_ref.extractall(temp_extract_dir)

    # Find the largest .tif file
    largest_tif_file = None
    largest_size = 0
    for file in temp_extract_dir.rglob("*.TIF"):
        if file.stat().st_size > largest_size:
            largest_tif_file = file
            largest_size = file.stat().st_size

    if largest_tif_file is not None:
        # Copy it to the target directory with _<number> appended to the file name
        number_suffix = zip_file_path.stem.split("_")[-1]
        new_file_name = (
            largest_tif_file.stem + f"_{number_suffix}" + largest_tif_file.suffix
        )
        target_file_path = target_directory / new_file_name
        shutil.copy(largest_tif_file, target_file_path)

        # Delete the extracted ZIP file and temporary directory
        shutil.rmtree(temp_extract_dir)
        if delete_zip:
            zip_file_path.unlink()
    else:
        print("No .tif files found in the ZIP archive.")

In [None]:
process_zip_file(output_file, output_folder / "extracted_images")

In [None]:
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import A4
import matplotlib.pyplot as plt
import rasterio as rio
from rasterio.plot import show
from matplotlib_scalebar.scalebar import ScaleBar
import geopandas as gpd
from pathlib import Path
from datetime import datetime
import re
import numpy as np


def add_images_to_pdf(images_folder, pdf_path, gdf):
    c = canvas.Canvas(str(pdf_path), pagesize=A4)
    page_width, page_height = A4

    images = list(Path(images_folder).glob("*.tif"))
    images.sort(key=lambda img: int(img.stem.split("_")[-1]))

    for image_path in images:
        match = re.search(r"\d{14}", image_path.stem)
        if match:
            # Extract the first 8 digits (YYYYMMDD) from the match for the date
            datetime_str = match.group(0)[:8]
            datetime_obj = datetime.strptime(datetime_str, "%Y%m%d")
            formatted_date = datetime_obj.strftime(
                "%d %b %Y"
            )  # Format the date string as needed
        else:
            formatted_date = "Unknown Date"
        # Extract image_id from the file name (assuming format "_<id>.tif")
        image_id = image_path.stem.split("_")[-1]

        # Filter the GeoDataFrame for the current image_id
        gdf_filtered = gdf[gdf["image_id"] == int(image_id)]
        if not gdf_filtered.empty:
            business_name = gdf_filtered["USER_Name_of_business"].iloc[0]
            title = f"{image_id} - {business_name} ({formatted_date})"
        else:
            title = f"{image_id} ({formatted_date})"
        # Existing code for extracting datetime_str, formatted_date, image_id, and title...

        with rio.open(image_path) as src:
            # Dynamically adjust figure size based on the image dimensions

            fig, ax = plt.subplots(figsize=(8, 8))

            ax.set_title(title, pad=20)
            show(
                src, ax=ax, with_bounds=False
            )  # with_bounds=False to not alter aspect ratio

            # Calculate an approximate scale in meters per degree at the raster's central latitude
            central_lat = np.mean([src.bounds.top, src.bounds.bottom])
            meters_per_degree = (
                111132.92
                - 559.82 * np.cos(2 * central_lat * np.pi / 180)
                + 1.175 * np.cos(4 * central_lat * np.pi / 180)
                - 0.0023 * np.cos(6 * central_lat * np.pi / 180)
            )
            dx = (
                src.res[0] * meters_per_degree
            )  # Convert pixel size from degrees to meters

            # Add a scale bar
            scalebar = ScaleBar(dx, units="m", location="lower right")
            ax.add_artist(scalebar)

            ax.set_axis_off()

            temp_png_path = image_path.with_suffix(".png")
            plt.savefig(temp_png_path, dpi=300)  # Specify DPI for image quality
            plt.close(fig)

            image_width_in_points = 8 * 72
            image_height_in_points = 8 * 72
            x_position = (page_width - image_width_in_points) / 2
            y_position = (page_height - image_height_in_points) / 2

            # Draw the image centered on the page
            c.drawImage(
                str(temp_png_path),
                x_position,
                y_position,
                width=image_width_in_points,
                height=image_height_in_points,
                preserveAspectRatio=True,
            )
            c.showPage()

            temp_png_path.unlink()

    c.save()


# List of your image paths
image_folder = output_folder / "extracted_images"
images = list(image_folder.glob("*.tif"))

# Ensure images list is not longer than 50 items
images = images[:50]

# Output PDF path
pdf_path = output_folder / "output_images.pdf"

# Create PDF
add_images_to_pdf(image_folder, pdf_path, sites_gdf)