# Accessing Sentinel-2 Data from the Copernicus Data Space Ecosystem

This notebook demonstrates how to access and process Sentinel-2 satellite imagery from the Copernicus Data Space Ecosystem using both S3 and STAC interfaces.

### What you'll learn:
- How to connect to the Copernicus Data Space Ecosystem using S3 credentials
- How to search for satellite imagery using the STAC API
- How to download a CDSE .SAFE product
- How to convert a .SAFE product into a Zarr product
- How to download a .SAFE product and convert it to Zarr on the fly
- How to open and visualize Sentinel-2 imagery

### Prerequisites:
- Copernicus Data Space Ecosystem account (https://dataspace.copernicus.eu/)
- S3 access and secret keys available as the environment variables `ACCESS_KEY_ID` and `SECRET_ACCESS_KEY` (or stored in a `.env` file)

## Import dependencies

In [None]:
import os
from urllib.parse import urlparse
import random

import boto3
import pystac_client
from dotenv import load_dotenv

## Setup and Environment Configuration

With the libraries imported, we load the credentials into the environment. Make sure your Copernicus credentials are stored in your environment variables or in a `.env` file.


In [None]:
# Get credentials from environment variables
load_dotenv()
ACCESS_KEY_ID = os.environ.get("ACCESS_KEY_ID")
SECRET_ACCESS_KEY = os.environ.get("SECRET_ACCESS_KEY")
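
`os.environ.get` returns `None` when a variable is not set, so a misconfigured environment may only show up later, when the first S3 request fails. A minimal sketch of an early check follows; the helper name `require_credentials` is hypothetical and not part of the original notebook.

```python
import os


def require_credentials(names=("ACCESS_KEY_ID", "SECRET_ACCESS_KEY"), env=None):
    """Return the requested variables as a dict, or raise if any is missing or empty.

    Hypothetical helper for illustration; `env` defaults to os.environ.
    """
    env = os.environ if env is None else env
    # Treat unset and empty variables the same way
    missing = [name for name in names if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: env[name] for name in names}
```

Calling `require_credentials()` right after `load_dotenv()` would stop the notebook at this cell if either key is absent.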

## The S3Connector Class

The `S3Connector` class provides an interface to connect to the S3-compatible storage service of the Copernicus Data Space Ecosystem. This class handles authentication and connection management.


In [None]:
class S3Connector:
    """A clean connector for S3-compatible storage services"""

    def __init__(self, endpoint_url, access_key_id,
                 secret_access_key, region_name='default'):
        """Initialize the S3Connector with connection parameters"""
        self.endpoint_url = endpoint_url
        self.access_key_id = access_key_id
        self.secret_access_key = secret_access_key
        self.region_name = region_name

        # Create session
        self.session = boto3.session.Session()

        # Initialize S3 resource
        self.s3 = self.session.resource(
            's3',
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key,
            region_name=self.region_name
        )

        # Initialize S3 client
        self.s3_client = self.session.client(
            's3',
            endpoint_url=self.endpoint_url,
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key,
            region_name=self.region_name
        )

    def get_s3_client(self):
        """Get the boto3 S3 client"""
        return self.s3_client

    def get_s3_resource(self):
        """Get the boto3 S3 resource"""
        return self.s3

    def get_bucket(self, bucket_name):
        """Get a specific bucket by name"""
        return self.s3.Bucket(bucket_name)

    def list_buckets(self):
        """List all available buckets"""
        response = self.s3_client.list_buckets()
        if 'Buckets' in response:
            return [bucket['Name'] for bucket in response['Buckets']]
        return []

## Utility Functions

The following function converts S3 URIs from the STAC catalog (for example `s3://eodata/path/to/file.jp2`) into S3 object keys (`path/to/file.jp2`) that can be used for direct access to the bucket.


In [None]:
def extract_s3_path_from_url(url):
    """
    Extracts the S3 object path from an S3 URL or URI.

    This function parses S3 URLs/URIs and returns just the object path portion,
    removing the protocol (s3://), bucket name, and any leading slashes.

    Args:
        url (str): The full S3 URI (e.g., 's3://eodata/path/to/file.jp2')

    Returns:
        str: The S3 object path (without protocol, bucket name and leading slashes)
    """
    # If it's not an S3 URI, return it unchanged
    if not url.startswith('s3://'):
        return url

    # Parse the S3 URI
    parsed_url = urlparse(url)

    # Ensure this is an S3 URL
    if parsed_url.scheme != 's3':
        raise ValueError(f"URL {url} is not an S3 URL")

    # Extract the path without leading slashes
    object_path = parsed_url.path.lstrip('/')

    return object_path
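
The function above discards the bucket name. When the bucket is not known in advance, both parts can be taken from the same `urlparse` result. A small sketch, assuming a hypothetical helper named `split_s3_uri`:

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/key' into a (bucket, key) tuple.

    Hypothetical helper for illustration; raises ValueError for non-S3 input.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri!r}")
    # netloc holds the bucket name, path holds the object key
    return parsed.netloc, parsed.path.lstrip("/")


bucket_name, key = split_s3_uri("s3://eodata/path/to/file.jp2")
```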

In [None]:
def get_product(s3_resource, bucket_name, object_url, output_path):
    """
    Download a product from S3 bucket and create output directory if it doesn't exist.

    Args:
        s3_resource: boto3 S3 resource object
        bucket_name (str): Name of the S3 bucket
        object_url (str): Path to the object within the bucket
        output_path (str): Local directory to save the file

    Returns:
        str: Path to the downloaded file
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_path, exist_ok=True)

    # Extract filename from the object URL
    _, filename = os.path.split(object_url)

    # Full path where the file will be saved
    local_file_path = os.path.join(output_path, filename)

    print(f"Downloading {object_url} to {local_file_path}...")

    try:
        # Download the file from S3
        s3_resource.Bucket(bucket_name).download_file(object_url, local_file_path)
        print(f"Successfully downloaded to {local_file_path}")
    except Exception as e:
        print(f"Error downloading file: {str(e)}")
        raise

    return local_file_path

## Connecting to Copernicus Data Space Ecosystem

Now let's establish connections to both the S3 storage and STAC catalog services using our credentials.


In [None]:
ACCESS_KEY_ID = os.environ.get("ACCESS_KEY_ID")
SECRET_ACCESS_KEY = os.environ.get("SECRET_ACCESS_KEY")
ENDPOINT_URL = 'https://eodata.dataspace.copernicus.eu'
ENDPOINT_STAC = "https://stac.dataspace.copernicus.eu/v1/"
BUCKET_NAME = "eodata"
catalog = pystac_client.Client.open(ENDPOINT_STAC)
connector = S3Connector(
    endpoint_url=ENDPOINT_URL,
    access_key_id=ACCESS_KEY_ID,
    secret_access_key=SECRET_ACCESS_KEY,
    region_name='default'
)
# Get S3 client and resource from the connector instance
s3 = connector.get_s3_resource()
s3_client = connector.get_s3_client()
buckets = connector.list_buckets()
print("Available buckets:", buckets)

## Searching for Sentinel-2 Product

We'll use the STAC API to search for Sentinel-2 Level 2A products based on:
- Geographic location (longitude/latitude point)
- Date range
- Cloud cover threshold

The search results provide metadata and access links to the actual imagery.


### Specific Area

In [None]:
LON, LAT = 150.97, -20.92
# Search for Sentinel-2 products
items_txt = catalog.search(
    collections=['sentinel-2-l2a'],
    intersects=dict(type="Point", coordinates=[LON, LAT]),
    datetime="2024-05-01/2024-06-01",
    query=["eo:cloud_cover<50"]
).item_collection()
selected_item = random.choice(items_txt)
selected_item

### Bounding Box

In [None]:
# Define bounding box coordinates [min_lon, min_lat, max_lon, max_lat]
min_lon, min_lat, max_lon, max_lat = 150.95, -20.95, 150.97, -20.92  # Example: 0.02° x 0.03° box near the point searched above

# Create a bounding box polygon (must be closed, so repeat the first point at the end)
bbox_polygon = {
    "type": "Polygon",
    "coordinates": [[
        [min_lon, min_lat],  # Southwest corner
        [max_lon, min_lat],  # Southeast corner
        [max_lon, max_lat],  # Northeast corner
        [min_lon, max_lat],  # Northwest corner
        [min_lon, min_lat]   # Close the polygon by repeating the first point
    ]]
}

# Search for Sentinel-2 products within the bounding box
items_txt = catalog.search(
    collections=['sentinel-2-l2a'],
    intersects=bbox_polygon,
    datetime="2024-05-01/2024-06-01",
    query=["eo:cloud_cover<10"]
).item_collection()
selected_item = random.choice(items_txt)
selected_item
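
With negative latitudes it is easy to swap the minimum and maximum values when writing a bounding box by hand. The sketch below builds the same closed GeoJSON ring and checks the ordering first. The helper name `bbox_to_polygon` is hypothetical, and the sketch assumes the box does not cross the antimeridian.

```python
def bbox_to_polygon(min_lon, min_lat, max_lon, max_lat):
    """Build a closed GeoJSON Polygon from bounding box corners.

    Hypothetical helper for illustration; rejects boxes whose minimum
    is not strictly smaller than the maximum.
    """
    if min_lon >= max_lon or min_lat >= max_lat:
        raise ValueError("Expected min_lon < max_lon and min_lat < max_lat")
    return {
        "type": "Polygon",
        "coordinates": [[
            [min_lon, min_lat],  # Southwest corner
            [max_lon, min_lat],  # Southeast corner
            [max_lon, max_lat],  # Northeast corner
            [min_lon, max_lat],  # Northwest corner
            [min_lon, min_lat],  # Repeat the first point to close the ring
        ]],
    }


example_polygon = bbox_to_polygon(150.95, -20.95, 150.97, -20.92)
```

The result can be passed as the `intersects` argument in the same way as `bbox_polygon`.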

## Downloading Sentinel-2 Product

Once we've identified the product we want, we can download it using our S3 connection.


In [None]:
def download(bucket, product: str, target: str = "") -> str:
    """
    Downloads every file in the S3 bucket with the provided product prefix.
    Creates a local folder named after the .SAFE directory (the .SAFE extension is kept).

    Args:
        bucket: boto3 Resource bucket object
        product: Path to the product (e.g., 'Sentinel-2/MSI/L2A/.../S2B_MSIL2A_..._T56KKB_20240516T015827.SAFE/')
        target: Local directory to save the files. Defaults to current directory.

    Returns:
        str: Path to the downloaded .SAFE directory

    Raises:
        FileNotFoundError: If the product was not found in the bucket
        ValueError: If the product path does not point to a .SAFE directory
    """
    # Ensure the product path ends with '/'
    if not product.endswith('/'):
        product += '/'

    # List files in the S3 prefix
    files = list(bucket.objects.filter(Prefix=product))
    if not files:
        raise FileNotFoundError(f"Could not find any files for {product}")

    # Extract the .SAFE directory name (e.g., "S2B_MSIL2A_20240516T001109_N0510_R073_T56KKB_20240516T015827.SAFE")
    safe_dir = os.path.basename(product.rstrip('/'))
    if not safe_dir.endswith('.SAFE'):
        raise ValueError(f"Expected a .SAFE directory, got: {safe_dir}")

    # Build the local target path, keeping the .SAFE extension so the
    # returned path can later be mapped to a .zarr name
    local_path = os.path.join(target, safe_dir)

    # Create the local directory structure
    os.makedirs(local_path, exist_ok=True)

    # Download each file while preserving the relative structure
    for file in files:
        # Skip directory markers (S3 pseudo-folders)
        if file.key.endswith('/'):
            continue

        # Compute the relative path inside the .SAFE directory
        relative_path = os.path.relpath(file.key, product)
        local_file_path = os.path.join(local_path, relative_path)

        # Create parent directories if they don't exist
        os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

        # Download the file
        bucket.download_file(file.key, local_file_path)

    return local_path
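
S3 keys are plain strings that use `/` as a separator, not filesystem paths. As an alternative to `os.path.relpath`, the local path can be derived with string operations only, so the result does not depend on path normalization or the current working directory. A sketch, assuming a hypothetical helper named `local_path_for_key`:

```python
import os


def local_path_for_key(key, prefix, local_root):
    """Map an S3 object key under `prefix` to a path below `local_root`.

    Hypothetical helper for illustration; `prefix` may or may not end with '/'.
    """
    prefix = prefix.rstrip("/") + "/"
    if not key.startswith(prefix):
        raise ValueError(f"Key {key!r} is not under prefix {prefix!r}")
    # Strip the prefix, then rebuild the path with the local separator
    relative = key[len(prefix):]
    return os.path.join(local_root, *relative.split("/"))
```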

In [None]:
selected_item = random.choice(items_txt)

In [None]:
bucket = s3.Bucket(BUCKET_NAME)
product_url, _ = os.path.split(selected_item.assets['safe_manifest'].href)
product_url = extract_s3_path_from_url(product_url)
safe_filename = download(bucket, product_url, target=".")

In [None]:
# Get the current working directory
current_dir = os.getcwd()
# Replace .SAFE with .zarr
zarr_filename = os.path.split(safe_filename)[1].replace('.SAFE', '.zarr')
# Join the current directory path with the new filename
zarr_path = os.path.join(current_dir, zarr_filename)

## EOPF converter

Convert the locally downloaded legacy product (SAFE format) into the new Zarr format.

In [None]:
from eopf.store.convert import convert

convert(safe_filename, zarr_path, mask_and_scale=True)

In [None]:
safe_filename = download(bucket, product_url, target=".")
convert(safe_filename, zarr_path, mask_and_scale=True)

In [None]:
import xarray as xr
dt = xr.open_datatree(zarr_path, engine="zarr", mask_and_scale=False, chunks={})

In [None]:

ds = dt.measurements.reflectance.r10m
ds

In [None]:
import matplotlib.pyplot as plt
import numpy as np

rgb = np.dstack([
    ds['b04'].values,
    ds['b03'].values,
    ds['b02'].values
])
# Plot
plt.figure(figsize=(12, 8))
plt.imshow(rgb)
plt.axis('off')
plt.title('Sentinel-2 RGB Composite')
plt.show()
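
`plt.imshow` clips RGB input to the range [0, 1] for floats and [0, 255] for integers, so raw band values often render too dark or saturated. One common remedy is a linear percentile stretch. The sketch below assumes a hypothetical helper named `stretch_for_display`; the 2nd and 98th percentile defaults are an arbitrary starting point, not values taken from the notebook.

```python
import numpy as np


def stretch_for_display(rgb, low=2, high=98):
    """Linearly rescale an (H, W, 3) array to [0, 1] between two percentiles.

    Hypothetical helper for illustration. NaN pixels are ignored when the
    percentiles are computed and are left as NaN in the output.
    """
    rgb = np.asarray(rgb, dtype="float64")
    lo, hi = np.nanpercentile(rgb, (low, high))
    if hi <= lo:
        # Constant image: nothing to stretch
        return np.zeros_like(rgb)
    return np.clip((rgb - lo) / (hi - lo), 0.0, 1.0)


# Synthetic data standing in for the stacked bands
demo = np.arange(300, dtype="float64").reshape(10, 10, 3)
demo_stretched = stretch_for_display(demo)
```

Passing `stretch_for_display(rgb)` to `plt.imshow` instead of `rgb` would apply the same stretch to the composite above.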

## Download and transform on the fly from S3


Convert a legacy product (SAFE format) into the new Zarr format by reading it directly from S3 and writing the result locally.

In [None]:
# Keep the full s3:// URL here, since the product is read from S3 rather than from disk
product_url, _ = os.path.split(selected_item.assets['safe_manifest'].href)
# Replace .SAFE with .zarr
zarr_filename = os.path.split(product_url)[1].replace('.SAFE', '.zarr')
# Join the current directory path with the new filename
zarr_path = os.path.join(current_dir, zarr_filename)

In [None]:
from eopf.common.constants import OpeningMode
from eopf.common.file_utils import AnyPath

S3_CONFIG = {
    "key": ACCESS_KEY_ID, # EDIT WITH YOUR S3 KEY
    "secret": SECRET_ACCESS_KEY, # EDIT WITH YOUR S3 SECRET KEY
    "client_kwargs": { "endpoint_url": ENDPOINT_URL, "region_name": "default"} # EDIT WITH YOUR CLIENT_KWARGS
}
# Add this parameter if you want to overwrite the output of the conversion if it already exists
target_store_config = dict(mode=OpeningMode.CREATE_OVERWRITE)

In [None]:
# For the zarr store we don't need to add the storage_options argument because we want to write the target product locally
convert(AnyPath(product_url, **S3_CONFIG), zarr_path, target_store_kwargs=target_store_config)