# Set-Up

In [None]:
# Install Requirements

!pip install -r requirements.txt



In [None]:
# Import packages

import pandas as pd
import rasterio
from rasterio.warp import transform_bounds
import requests
import os
from shapely.geometry import box
from shapely.ops import unary_union
from rasterio.shutil import copy as rio_copy
from copernicus_login import username, password

# Collocation

In [None]:
# Helper functions

def get_tif_bounds_in_wgs84(tif_path):
    with rasterio.open(tif_path) as src:
        bounds = src.bounds
        src_crs = src.crs
        # Transform bounds to WGS84 for querying Copernicus
        wgs84_bounds = transform_bounds(src_crs, "EPSG:4326", *bounds)
        return wgs84_bounds  # (min_lon, min_lat, max_lon, max_lat)
    
def bounds_to_wkt_polygon(bounds):
    min_lon, min_lat, max_lon, max_lat = bounds
    return (
        f"POLYGON(({min_lon} {min_lat}, {min_lon} {max_lat}, "
        f"{max_lon} {max_lat}, {max_lon} {min_lat}, {min_lon} {min_lat}))"
    )

def make_api_request(url, method="GET", data=None, headers=None):
    global access_token
    if not headers:
        headers = {"Authorization": f"Bearer {access_token}"}

    response = requests.request(method, url, json=data, headers=headers)
    if response.status_code in [401, 403]:
        global refresh_token
        access_token = refresh_access_token(refresh_token)
        headers["Authorization"] = f"Bearer {access_token}"
        response = requests.request(method, url, json=data, headers=headers)
    return response


def get_access_and_refresh_token(username, password):
    """Retrieve both access and refresh tokens."""
    url = "https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token"
    data = {
        "grant_type": "password",
        "username": username,
        "password": password,
        "client_id": "cdse-public",
    }
    response = requests.post(url, data=data)
    response.raise_for_status()
    tokens = response.json()
    return tokens["access_token"], tokens["refresh_token"]


def refresh_access_token(refresh_token):
    """Attempt to refresh the access token using the refresh token."""
    url = "https://identity.dataspace.copernicus.eu/auth/realms/CDSE/protocol/openid-connect/token"
    data = {
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
        "client_id": "cdse-public",
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    try:
        response = requests.post(url, headers=headers, data=data)
        response.raise_for_status()  # This will throw an error for non-2xx responses
        return response.json()["access_token"]
    except requests.exceptions.HTTPError as e:
        print(f"Failed to refresh token: {e.response.status_code} - {e.response.text}")
        if e.response.status_code == 400:
            print("Refresh token invalid, attempting re-authentication...")
            # Attempt to re-authenticate
            username = username
            password = password
            # This requires securely managing the credentials, which might not be feasible in all contexts
            access_token, new_refresh_token = get_access_and_refresh_token(
                username, password
            )  # This is a placeholder
            refresh_token = (
                new_refresh_token  # Update the global refresh token with the new one
            )
            return access_token
        else:
            raise

def download_single_product(
    product_id, file_name, access_token, download_dir="downloaded_products"
):
    """
    Download a single product from the Copernicus Data Space.

    :param product_id: The unique identifier for the product.
    :param file_name: The name of the file to be downloaded.
    :param access_token: The access token for authorization.
    :param download_dir: The directory where the product will be saved.
    """
    # Ensure the download directory exists
    os.makedirs(download_dir, exist_ok=True)

    # Construct the download URL
    url = (
        f"https://zipper.dataspace.copernicus.eu/odata/v1/Products({product_id})/$value"
    )

    # Set up the session and headers
    headers = {"Authorization": f"Bearer {access_token}"}
    session = requests.Session()
    session.headers.update(headers)

    # Perform the request
    response = session.get(url, headers=headers, stream=True)

    # Check if the request was successful
    if response.status_code == 200:
        # Define the path for the output file
        output_file_path = os.path.join(download_dir, file_name + ".zip")

        # Stream the content to a file
        with open(output_file_path, "wb") as file:
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    file.write(chunk)
        print(f"Downloaded: {output_file_path}")
    else:
        print(
            f"Failed to download product {product_id}. Status Code: {response.status_code}"
        )

def query_sentinel2_for_tif_area(
    start_date,
    end_date,
    token,
    wkt_polygon,
    min_cloud=0,
    max_cloud=20,
):
    all_data = []

    filter_string = (
        f"Collection/Name eq 'SENTINEL-2' and "
        f"Attributes/OData.CSC.StringAttribute/"
        f"any(att:att/Name eq 'productType' and att/Value eq 'S2MSI2A') and "
        f"Attributes/OData.CSC.DoubleAttribute/any(att:att/Name eq 'cloudCover' and att/Value ge {min_cloud}) and "
        f"Attributes/OData.CSC.DoubleAttribute/any(att:att/Name eq 'cloudCover' and att/Value le {max_cloud}) and "
        f"ContentDate/Start gt {start_date}T00:00:00.000Z and ContentDate/Start lt {end_date}T23:59:59.999Z"
    )

    next_url = (
        f"https://catalogue.dataspace.copernicus.eu/odata/v1/Products?"
        f"$filter={filter_string} and "
        f"OData.CSC.Intersects(area=geography'SRID=4326;{wkt_polygon}')&"
        f"$top=100"
    )

    headers = {"Authorization": f"Bearer {token}"}

    while next_url:
        response = make_api_request(next_url, headers=headers)
        if response.status_code == 200:
            data = response.json()["value"]
            all_data.extend(data)
            next_url = response.json().get("@odata.nextLink")
        else:
            print(f"Error fetching data: {response.status_code} - {response.text}")
            break

    return pd.DataFrame(all_data)

def format_sentinel2_results(df_raw):
    """Reformats raw Copernicus API response to a cleaner DataFrame."""
    formatted_data = []

    for _, row in df_raw.iterrows():
        # Extract cloud cover from nested attribute list
        attributes = row.get("Attributes", [])
        cloud_cover = None
        for att in attributes:
            if att.get("Name") == "cloudCover":
                cloud_cover = att.get("Value")
                break

        content_date = row.get("ContentDate", {})
        formatted_data.append({
            "ProductID": row["Id"],
            "ProductName": row["Name"],
            "AcquisitionDate": content_date.get("Start", "")[:10],
            "Tile": row["Name"].split("_")[5] if "_" in row["Name"] else "Unknown",
            "Size (MB)": round(int(row["ContentLength"]) / 1e6, 2),
            "DownloadPath": row["S3Path"],
            "FootprintWKT": row["Footprint"]
        })

    return pd.DataFrame(formatted_data)

def get_combined_bounds(tif_dir):
    geometries = []
    for fname in os.listdir(tif_dir):
        if fname.endswith(".tif"):
            with rasterio.open(os.path.join(tif_dir, fname)) as src:
                bounds = transform_bounds(src.crs, "EPSG:4326", *src.bounds)
                geometries.append(box(*bounds))
    return unary_union(geometries)

def bounds_to_wkt(bounds_geom):
    return bounds_geom.wkt


In [None]:
# Query Execution 

# Specify path to tif file
tif_path = "/Users/tessacannon/Documents/UCL/Dissertation/tifs/"

# 1. Extract bounds in WGS84
bounds = get_combined_bounds(tif_path)

# 2. Convert bounds to WKT polygon
wkt_poly = bounds_to_wkt(bounds)

# 3. Define date range ± 1 day around LiDAR acquisition date
start_date = "2024-04-20"
end_date = "2024-05-02"

# 4. Get access and refresh tokens
access_token, refresh_token = get_access_and_refresh_token(username, password)

# 5. Query Sentinel-2 data
df_results = query_sentinel2_for_tif_area(
    start_date=start_date,
    end_date=end_date,
    token=access_token,
    wkt_polygon=wkt_poly,
    min_cloud=0,
    max_cloud=20 # Max cloud coverage at 20%
)

# 6. Format results
df_results = format_sentinel2_results(df_results)

# Display the results
df_results.head()


Unnamed: 0,ProductID,ProductName,AcquisitionDate,Tile,Size (MB),DownloadPath,FootprintWKT
0,b0b5386b-83a9-4434-b3e6-5de5b680a269,S2A_MSIL2A_20240422T173911_N0510_R098_T17XNA_2...,2024-04-22,T17XNA,970.82,/eodata/Sentinel-2/MSI/L2A/2024/04/22/S2A_MSIL...,geography'SRID=4326;POLYGON ((-81.000612713072...


# Download Data

In [None]:
# Download the desired product

download_single_product(
    product_id=df_results.iloc[0]["ProductID"],
    file_name=df_results.iloc[0]["ProductName"],
    access_token=access_token,
    download_dir="./sentinel_downloads"
)

Downloaded: ./sentinel_downloads/S2A_MSIL2A_20240422T173911_N0510_R098_T17XNA_20240423T004122.SAFE.zip


In [None]:
# Convert the Sentinel-2 product to a tif file

jp2_path = "/Users/tessacannon/Documents/UCL/Dissertation/sentinel_downloads/S2A_MSIL2A_20240422T173911_N0510_R098_T17XNA_20240423T004122.SAFE/GRANULE/L2A_T17XNA_A046141_20240422T173906/IMG_DATA/R10m/T17XNA_20240422T173911_B04_10m.jp2"
tif_path = "/Users/tessacannon/Documents/UCL/Dissertation/s2_tifs/T17XNA_B04_10m.tif"

with rasterio.open(jp2_path) as src:
    rio_copy(src, tif_path, driver="GTiff")

In [14]:
# Get the total area in kilometers covered by the combined tif files

def calculate_total_area(tif_dir):
    total_area = 0.0
    for fname in os.listdir(tif_dir):
        if fname.endswith(".tif"):
            with rasterio.open(os.path.join(tif_dir, fname)) as src:
                bounds = src.bounds
                width = bounds[2] - bounds[0]
                height = bounds[3] - bounds[1]
                area_km2 = (width * height) / 1e6  # Convert to km²
                total_area += area_km2
    return total_area
total_area = calculate_total_area(tif_path)
print(f"Total area covered by the tif files: {total_area:.2f} km²")


Total area covered by the tif files: 4085.23 km²
