In [1]:
import os 
import requests
from pystac_client import Client
import re 
from dotenv import load_dotenv
import random
from PIL import Image
from io import BytesIO

import rasterio
# Load environment variables from a .env file so that the DESTINE
# credentials can be read from the environment in the next cell.
load_dotenv()

True

In [2]:
# DESTINE credentials, taken from the process environment (None when unset).
USERNAME = os.getenv("DESTINE_USERNAME")
PASSWORD = os.getenv("DESTINE_PASSWORD")

In [3]:
def get_direct_access_token(username, password):
    """
    Get DESTINE access and refresh tokens using the provided username and password.

    Follows the OpenID Connect authorization-code flow against the DESTINE IAM
    server: fetch the login page, post the credentials to the login form, read
    the authorization code from the redirect, then exchange it for tokens.

    Args:
        username: DESTINE account user name.
        password: DESTINE account password.

    Returns:
        A dict with the keys "access_token" and "refresh_token" on success, or
        None when any step fails (a short message is printed in that case).

    Raises:
        requests.HTTPError: if the request for the login page returns an
            error status.
        requests.RequestException: on network failures or timeouts.
    """
    import requests
    from urllib.parse import parse_qs, urlparse
    from lxml import html

    SERVICE_URL = "http://localhost:5000"
    IAM_URL = "https://auth.destine.eu"
    IAM_REALM = "desp"
    IAM_CLIENT = "dcms_client"
    # Seconds to wait for the server before giving up, so that a stalled
    # connection cannot hang the notebook kernel indefinitely.
    REQUEST_TIMEOUT = 30

    with requests.Session() as s:
        # Get the auth url
        response = s.get(
            url=f"{IAM_URL}/realms/{IAM_REALM}/protocol/openid-connect/auth",
            params={
                "client_id": IAM_CLIENT,
                "redirect_uri": SERVICE_URL,
                "scope": "openid",
                "response_type": "code",
            },
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()

        # The login form's action attribute is the URL the credentials are posted to.
        forms = html.fromstring(response.content.decode()).forms
        if not forms:
            print("Error: login form not found on the authentication page")
            return None
        auth_url = forms[0].action

        # Login and get auth code. Redirects are not followed because the
        # authorization code is carried in the redirect's Location header.
        login = s.post(
            auth_url,
            data={
                "username": username,
                "password": password,
            },
            allow_redirects=False,
            timeout=REQUEST_TIMEOUT,
        )

        # A 200 here means the login page was rendered again, i.e. the
        # credentials were rejected; surface the page's error text if present.
        if login.status_code == 200:
            tree = html.fromstring(login.content)
            error_message_element = tree.xpath('//span[@id="input-error"]/text()')
            error_message = (
                error_message_element[0].strip()
                if error_message_element
                else "Authentication failed"
            )
            print(f"Error: {error_message}")
            return None

        if login.status_code != 302:
            print(f"Login failed with status code: {login.status_code}")
            return None

        # The redirect may lack a Location header or a "code" parameter
        # (e.g. an error redirect); treat that as a failed login, not a crash.
        redirect_location = login.headers.get("Location", "")
        auth_codes = parse_qs(urlparse(redirect_location).query).get("code")
        if not auth_codes:
            print("Login failed: redirect did not contain an authorization code")
            return None
        auth_code = auth_codes[0]

        # Use the auth code to get the token
        response = requests.post(
            f"{IAM_URL}/realms/{IAM_REALM}/protocol/openid-connect/token",
            data={
                "client_id": IAM_CLIENT,
                "redirect_uri": SERVICE_URL,
                "code": auth_code,
                "grant_type": "authorization_code",
                "scope": "",
            },
            timeout=REQUEST_TIMEOUT,
        )

        if response.status_code != 200:
            print(f"Failed to get token. Status code: {response.status_code}")
            return None

        token_data = response.json()
        access_token = token_data.get("access_token")
        if not access_token:
            # Returning a dict with a None token would pass callers'
            # truthiness checks and only fail later as "Bearer None".
            print("Failed to get token. Response did not contain an access token")
            return None

        return {
            "access_token": access_token,
            "refresh_token": token_data.get("refresh_token")
        }

In [4]:
# Despite its name, this holds the whole result of get_direct_access_token():
# a dict with "access_token" and "refresh_token" keys, or None on failure.
access_token = get_direct_access_token(
    username=USERNAME,
    password=PASSWORD,
)

In [5]:
def extract_url_after_filename(url):
    """
    Return the part of ``url`` that follows the first ``?filename=``.

    Everything up to the end of the line is returned, including any further
    query parameters. Returns None when ``url`` contains no ``?filename=``.
    """
    found = re.search(r'\?filename=(.*)', url)
    if found is None:
        return None
    return found.group(1)

In [6]:
def download_sentinel_image():
    """
    Download the TCI 60m asset of a randomly chosen Sentinel-2 item.

    Authenticates with the DESTINE credentials in USERNAME / PASSWORD, searches
    the STAC API for SENTINEL-2 items in a fixed date range, picks one item at
    random and requests its asset whose href ends with ``_TCI_60m.jp2``.

    Returns:
        The ``requests.Response`` of the download request when it answers with
        HTTP 200. On any failure a tuple ``(error_message, None)`` is returned
        instead, so callers must check the type before using ``.content``.
    """
    # Seconds to wait for the server before giving up on the download request.
    request_timeout = 60

    # Get access token
    token_result = get_direct_access_token(username=USERNAME,
                                           password=PASSWORD)

    if not token_result:
        return "Failed to authenticate", None

    access_token = token_result.get("access_token")
    if not access_token:
        # Avoid sending a meaningless "Bearer None" header.
        return "Failed to authenticate", None

    # Set up STAC API client
    stac_base_url = "https://cachea.destine.eu"
    stac_url = f"{stac_base_url}/stac/api"
    catalog = Client.open(stac_url)

    # Search for Sentinel-2 images in a fixed date range, anywhere on the globe
    search = catalog.search(
        method="GET",
        collections=["SENTINEL-2"],
        bbox=[-180, -90, 180, 90],
        datetime="2024-09-24/2024-09-25",
        limit=10
    )

    # Get a list of items
    items = list(search.items())
    if not items:
        return "No Sentinel-2 images found", None

    # Select a random item and keep only the hrefs that end with *_TCI_60m.jp2
    random_item = random.choice(items)
    tci_assets = [
        asset.href
        for asset in random_item.assets.values()
        if asset.href.endswith('_TCI_60m.jp2')
    ]

    if not tci_assets:
        return "No TCI assets found in the selected image", None

    # The href is expected to carry the file path after "?filename=";
    # bail out cleanly when it does not instead of building a broken URL.
    filepath = extract_url_after_filename(tci_assets[0])
    if not filepath:
        return "Could not extract the file path from the asset URL", None

    # Download the file
    url = f"{stac_base_url}/stac/download?filename={filepath}"

    headers = {
        'Authorization': f'Bearer {access_token}'
    }

    response = requests.post(url, headers=headers, data={}, timeout=request_timeout)

    if response.status_code == 200:
        return response

    return f"Failed to download the file. Status code: {response.status_code}", None

In [7]:
# download_sentinel_image() returns a requests.Response on success but an
# (error_message, None) tuple on failure, so check before reading .content.
response = download_sentinel_image()
if isinstance(response, tuple):
    raise RuntimeError(response[0])

# Decode the downloaded bytes in memory without writing them to disk.
img = Image.open(BytesIO(response.content))