## Planet API search, preview, download image

Using some of Planet's Python SDK user guidance [here](https://planet-sdk-for-python-v2.readthedocs.io/en/stable/python/sdk-guide/).  
Custom functionality to convert shapefile or geopackage features into json for API request and then download clipped images from features.
This helps to preserve Planet Account's download allowance for longer if whole image tiles are not required.  

In [None]:
import asyncio
import planet
from planet import Auth, Session, data_filter, reporting
import getpass
import geopandas as gpd
import pandas as pd
from shapely.geometry import box
import json
import datetime
from PIL import Image, ImageOps
import matplotlib.pyplot as plt
from io import BytesIO

Do not need to input API key using this method, just Planet username and password as would use to sign in to [Planet Explorer](https://planet.com/explorer)   
Credentials are then stored while run the rest of this notebook using planet.Session()

In [None]:
user = input("Username: ")
pw = getpass.getpass("Password: ")
auth = Auth.from_login(user,pw)
auth.store()

In [None]:
# class for converting input shapefile or geopackage into json for request
class SearchFeatures:
    """Prepare features for Planet api search and order"""
    def __init__(self, features_fp, bounding_box=False):
        """Initialize the SearchFeatures object."""
        self.features_fp = features_fp
        self.bounding_box = bounding_box
        self.gdf = gpd.read_file(features_fp)
        self.json_data = None
    
    def convert_to_wgs(self):
        """Convert the GeoDataFrame CRS to WGS84 (EPSG:4326)."""
        self.gdf = self.gdf.to_crs(epsg=4326)

    def geom_to_bbox(self):
        """Convert geometries to bounding boxes if specified."""
        if self.bounding_box:
            self.gdf["geometry"] = self.gdf.geometry.apply(lambda geom: box(*geom.bounds))

    def features_to_json(self):
        """Convert features to GeoJSON format."""
        json_text = self.gdf.to_json()
        self.json_data = json.loads(json_text)

    def filter_json(self, id_col, id_val):
        """Filter the GeoJSON features based on a given column and value."""
        filtered = [d for d in self.json_data["features"] if d["properties"].get(id_col) == id_val]
        return filtered[0] if filtered else None

    def process(self):
        """Run all necessary processing and return GeoJSON."""
        self.convert_to_wgs()
        self.geom_to_bbox()
        self.features_to_json()
        return self.json_data
        

In [None]:
# Update this with the input shapefile or geopackage want to use to search for images - should be POLYGON geometries currently
INPUT_FEATURES = "inputs/test_shape.shp"

In [None]:
# Using bounding box option to simplify 
search_features = SearchFeatures(INPUT_FEATURES, bounding_box=True)
# Create the search json - for all features
json_data = search_features.process()

In [None]:
# This notebook will just use one feature to search for images as example. Using an attribute column "ID" and value of 2:
filtered_feature = search_features.filter_json("id", 2)

In [None]:
def create_img_filter(start_date, end_date, geometry_filter, max_cloud_percent=10):
    """Build a planet search filter using images have access permission, data range, geometry, overall image tile cloud percent""" 
    start = datetime.datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.datetime.strptime(end_date, "%Y-%m-%d")
    return data_filter.and_filter([
        data_filter.permission_filter(),
        data_filter.date_range_filter("acquired", gte=start, lte=end),
        data_filter.geometry_filter(geometry_filter),
        data_filter.range_filter("cloud_percent", lte=max_cloud_percent, gte=0)
    ])

In [None]:
def available_img_to_df(response_lst):
    """Put the search response into a Pandas data frame for easy review, export etc"""
    data = []
    for item in response_lst:
        data.append({
            "id": item["id"],
            "cloud_percent": item["properties"]["cloud_percent"],
            "satellite_id": item["properties"]["satellite_id"],
            "thumbnail_link": item["_links"]["thumbnail"]
        })
    
    return pd.DataFrame(data)

In [None]:
async def get_available(input_filter):
    """Run the asynchronous search to find images"""
    async with Session() as sess:
        cl = sess.client('data')
        return [i async for i in cl.search(['PSScene'], input_filter)]


In [None]:
# Create the filter specifying dates, geometry json, optionally cloud cover percent
search_filter = create_img_filter("2024-04-01", "2024-05-30", filtered_feature)
# Run the search
available_images = await get_available(search_filter)
# Extract relevant info from search result into dataframe (just view available_images object if want to see full response)
search_df = available_img_to_df(available_images)
# View data frame
search_df

In [None]:
# Can plot the thumbnails, but note these are not georeferenced
class PreviewPlot:
    """Plot thumbnail image overviews from image search result dataframe."""
    def __init__(self, search_df):
        """Initialise PreviewPlot"""
        self.df = search_df
        self.current_row = 0
        self.max_row = search_df.shape[0] - 1
        self.current_link = None
        self.current_title = None
    def thumbnail_plot(self, image_data):
        """Request response to plot"""
        img = Image.open(BytesIO(image_data))
        img = img.convert("RGB")
        img = ImageOps.autocontrast(img)
        plt.figure(figsize=(20, 20))
        plt.imshow(img)
        plt.axis("off")
        plt.title(f"Image ID: {self.current_title}", fontsize=24, color='red')
        plt.show()
    def update_link(self):
        """Get current row thumbnail link"""
        self.current_link = self.df.iloc[self.current_row]["thumbnail_link"]
    def update_title(self):
        """Get current row image id for plot title"""
        self.current_title = self.df.iloc[self.current_row]["id"]
    async def fetch_thumbnail(self, thumbnail_url):
        """Thumbnail URL request"""
        async with Session() as sess:
            response = await sess.request("GET", url=f"{thumbnail_url}?width=2048")
            return response._http_response.content
    async def view_thumbnail(self):
        """Overall method to view a thumbnail and increment current row"""
        print(f"plotting thumbnail {self.current_row + 1} of {self.max_row + 1}..")
        self.update_link()
        self.update_title()
        image_data = await self.fetch_thumbnail(self.current_link)
        self.thumbnail_plot(image_data)
        if self.current_row < self.max_row:
            self.current_row += 1
        else:
            self.current_row = 0

In [None]:
# Create instance of the class
preview = PreviewPlot(search_df)

In [None]:
# Run this cell repeatedly to preview each search result image in turn
await preview.view_thumbnail()

In [None]:
def create_request(item_ids, clip_geom, order_ref):
    """Build the order request"""
   new_order = planet.order_request.build_request(
       name=order_ref,
       products=[
           planet.order_request.product(item_ids=item_ids,
                                        product_bundle="analytic_udm2",
                                        item_type="PSScene")
       ],
       tools=[planet.order_request.clip_tool(aoi=clip_geom)])

   return new_order

In [None]:
async def create_wait_and_download(input_request, download_directory):
    """Make the order, then wait and download"""
    async with Session() as sess:
        cl = sess.client('orders')
        with reporting.StateBar(state='creating') as bar:
            # create order
            order = await cl.create_order(input_request)
            bar.update(state='created', order_id=order['id'])
            # poll
            await cl.wait(order['id'], callback=bar.update_state)
        # download
        await cl.download_order(order['id'], download_directory, progress_bar = True)

In [None]:
# Make the request
request = create_request(["20240518_103839_43_24bc"], test_geom, "sg_test_order")
# Download to outputs dir in repository when ready (change this to full path if want different output dir
create_wait_and_download(request, "outputs")