In [18]:
import pystac_client
import json
from pathlib import Path
from stac_geoparquet.arrow import parse_stac_ndjson_to_arrow, to_parquet
from pyarrow.parquet import read_table
from loguru import logger
import boto3
import requests
import os
from urllib.parse import urljoin, urlparse
from time import time

In [19]:
# IAM endpoint and password that `get_headers()` reads when requesting a token.
# `setdefault` keeps any value already exported in the kernel's environment, so
# a real password supplied outside the notebook is not overwritten by the empty
# placeholder below (and no secret has to be stored in the notebook itself).
os.environ.setdefault("IAM_URL", "https://iam-dev")
os.environ.setdefault("IAM_PASSWORD", "")

In [23]:
from pystac.stac_io import DefaultStacIO, StacIO

def get_token(url, **kwargs):
    """Request an access token from a token endpoint.

    Args:
        url: URL the form data is POSTed to.
        **kwargs: Form fields sent as the request body (for example
            ``client_id``, ``username``, ``password``, ``grant_type``).

    Returns:
        The ``access_token`` field of the JSON response when the endpoint
        answers with HTTP 200 (``None`` if that field is absent). For any
        other status code the failure is logged and ``None`` is returned.

    Raises:
        Exceptions raised by ``requests.post`` (connection errors, timeouts)
        are not caught here and propagate to the caller.
    """
    # `requests` applies no timeout by default; bound the wait so an
    # unreachable endpoint fails instead of hanging the notebook forever.
    response = requests.post(url, data=kwargs, timeout=30)

    if response.status_code != 200:
        logger.error(
            f"Request for a token failed with status code {response.status_code}"
        )
        return None

    return response.json().get("access_token")
def get_headers():
    """Build the HTTP headers passed to the STAC client.

    Reads the token endpoint from the ``IAM_URL`` environment variable and
    the password from ``IAM_PASSWORD``, then requests a token through
    ``get_token`` using the password grant.

    Returns:
        ``{"Authorization": "Bearer <token>"}`` when a token was obtained,
        otherwise an empty dict (a warning is logged in that case).
    """
    payload = {
        "client_id": "ai-extensions",
        "username": "ai-extensions-user",
        "password": os.environ.get("IAM_PASSWORD"),
        "grant_type": "password",
    }
    token = get_token(url=os.environ.get("IAM_URL"), **payload)

    if not token:
        # Without this guard the header would literally read "Bearer None".
        logger.warning("No access token obtained; sending no Authorization header")
        return {}

    return {"Authorization": f"Bearer {token}"}
    
class CustomStacIO(DefaultStacIO):
    """StacIO that handles ``s3://`` hrefs with boto3 and defers everything
    else to ``DefaultStacIO``.
    """

    def __init__(self):
        # Initialise the base class before adding our own state. Recent PySTAC
        # releases set attributes in DefaultStacIO.__init__ that the inherited
        # read path uses, so skipping this call can break the non-S3 fallback.
        super().__init__()

        session = boto3.session.Session()

        # NOTE(review): `settings` is not defined in the visible part of this
        # notebook; it must provide region_name, endpoint_url and the two AWS
        # keys before this class is instantiated -- confirm where it comes from.
        self.s3 = session.client(
            service_name="s3",
            region_name=settings.region_name,
            use_ssl=True,
            endpoint_url=f"https://{settings.endpoint_url}",
            aws_access_key_id=settings.aws_access_key_id,
            aws_secret_access_key=settings.aws_secret_access_key,
        )

    def read_text(self, source, *args, **kwargs):
        """Return the text content found at ``source``.

        ``s3://bucket/key`` hrefs are fetched with the boto3 client and
        decoded as UTF-8; any other href is handled by the parent class.
        """
        parsed = urlparse(source)
        if parsed.scheme == "s3":
            bucket = parsed.netloc
            # Drop the leading "/" so the URL path becomes the object key.
            key = parsed.path[1:]

            return (
                self.s3.get_object(Bucket=bucket, Key=key)["Body"]
                .read()
                .decode("utf-8")
            )
        else:
            return super().read_text(source, *args, **kwargs)

    def write_text(self, dest, txt, *args, **kwargs):
        """Write ``txt`` to ``dest``.

        ``s3://bucket/key`` hrefs are uploaded with the boto3 client as UTF-8
        bytes with content type ``application/geo+json``; any other href is
        handled by the parent class.
        """
        parsed = urlparse(dest)
        if parsed.scheme == "s3":
            bucket = parsed.netloc
            # Drop the leading "/" so the URL path becomes the object key.
            key = parsed.path[1:]
            self.s3.put_object(
                Body=txt.encode("UTF-8"),
                Bucket=bucket,
                Key=key,
                ContentType="application/geo+json",
            )
        else:
            super().write_text(dest, txt, *args, **kwargs)

In [24]:
# Register CustomStacIO as the default StacIO class.
StacIO.set_default(CustomStacIO)
# NOTE(review): `read_text_method` is not referenced anywhere else in the
# visible notebook; it looks like a hook from an older PySTAC API -- confirm
# whether this assignment is still needed alongside `set_default`.
StacIO.read_text_method = CustomStacIO.read_text

In [32]:
# Catalogue endpoint queried below.
stac_url = "https://ai-extensions-stac.terradue.com/"

# Temporal filter, expressed as a single "start/end" interval string.
start_date = "2015-06-27"
end_date = "2025-07-31"
date_range = "/".join((start_date, end_date))

# Spatial filter: corner points of a rectangular area of interest, listed as
# a closed ring (the first point is repeated at the end).
geo = [
    (5.511286, 44.433707),  # bottom-left corner
    (8.740761, 44.433707),  # bottom-right corner
    (8.740761, 46.875144),  # top-right corner
    (5.511286, 46.875144),  # top-left corner
    (5.511286, 44.433707),  # back to the start to close the ring
]
# GeoJSON-style polygon with `geo` as its only ring.
geometry = dict(type="Polygon", coordinates=[geo])

# Open the catalogue, sending the headers built by get_headers().
catalog = pystac_client.Client.open(
    stac_url,
    headers=get_headers(),
    ignore_conformance=True,
)

# Item search restricted to one collection, the AOI and the date interval,
# capped at 1000 results.
search = catalog.search(
    max_items=1000,
    collections=["EUROSAT-Training-Dataset"],
    datetime=date_range,
    intersects=geometry,
)

In [33]:
# Preview: display only the first item yielded by the search, then stop
# iterating (the `break` leaves the rest of the iterator unconsumed).
items_iter = search.items()
for item in items_iter:
    display(item)
    break

In [34]:
# Dump the search results to a newline-delimited JSON file, one item per line.
# The file doubles as a cache: if it already exists the download is skipped.
items_iter = search.items()

max_items = 1000
s2_json_path = Path("euro.jsonl")
if not s2_json_path.exists():
    # Write to a temporary sibling file and rename it only after the loop has
    # finished. If the download fails halfway, `s2_json_path` is never created,
    # so the `exists()` guard above cannot mistake a truncated file for a
    # complete cache on the next run.
    tmp_json_path = s2_json_path.with_name(s2_json_path.name + ".tmp")
    with open(tmp_json_path, "w", encoding="utf-8") as f:
        count = 0  # stays 0 when the search yields nothing

        for count, item in enumerate(items_iter, start=1):
            # Compact separators keep each item on a single short line.
            json.dump(item.to_dict(), f, separators=(",", ":"))
            f.write("\n")

            # Stop right after the max_items-th item has been written.
            if count >= max_items:
                break

    tmp_json_path.replace(s2_json_path)

In [35]:
# Parse the NDJSON file at `s2_json_path` with stac_geoparquet's Arrow parser.
record_batch_reader = parse_stac_ndjson_to_arrow(s2_json_path)

In [36]:
# Read everything from the reader into a single `table`.
# NOTE(review): presumably `record_batch_reader` is a one-shot reader, in which
# case re-running this cell without re-running the parse cell would produce a
# table with no rows -- confirm.
table = record_batch_reader.read_all()

# Display the table's schema as the cell output.
table.schema

assets: struct<image: struct<eo:bands: list<item: struct<center_wavelength: double, common_name: string, name: string>>, href: string, raster:bands: list<item: struct<histogram: struct<buckets: list<item: int64>, count: int64, max: double, min: double>, spatial_resolution: double, statistics: struct<maximum: double, mean: double, minimum: double, stddev: double, valid_percent: double>>>, roles: list<item: string>, title: string, type: string>, labels: struct<file:size: int64, href: string, ml-aoi:role: string, roles: list<item: string>, title: string, type: string>, thumbnail: struct<eo:bands: list<item: struct<center_wavelength: double, common_name: string, name: string>>, file:size: int64, href: string, roles: list<item: string>, title: string, type: string>>
  child 0, image: struct<eo:bands: list<item: struct<center_wavelength: double, common_name: string, name: string>>, href: string, raster:bands: list<item: struct<histogram: struct<buckets: list<item: int64>, count: int64, max: 

In [37]:
# Write `table` to a Parquet file using stac_geoparquet's `to_parquet`.
s2_parquet_path = "euro.parquet"
to_parquet(table, s2_parquet_path)

In [38]:
# Round-trip check: read the Parquet file back and compare it with the
# in-memory table; the comparison result is shown as the cell output.
read_table(s2_parquet_path) == table

True