In [1]:
import planetary_computer
import pystac_client
import rasterio
from rasterio.windows import from_bounds
from rasterio.warp import transform_bounds
from rasterio.coords import BoundingBox
from shapely.geometry import box, mapping, shape
from shapely.plotting import plot_polygon
import geopandas as gpd
import os

def bounds_to_geom_wgs84(minx,miny,maxx,maxy,input_crs=24879, format='shapely'):
    bounds=[minx,miny,maxx,maxy]
    geom_utm = box(*bounds)  # expects (xmin, ymin, xmax, ymax)
    crs_str = f"EPSG:{bounds_crs}" if isinstance(input_crs, int) else bounds_crs
    aoi_gdf = gpd.GeoDataFrame({"id": [1]}, geometry=[geom_utm], crs=crs_str)
    aoi_gdf_wgs = aoi_gdf.to_crs(4326)
    aoi_geom = aoi_gdf.geometry.iloc[0]
    if format == 'shapely':
        return aoi_gdf.to_crs(4326).geometry.iloc[0]
    else:
        return mapping(aoi_gdf.to_crs(4326).geometry.iloc[0])

def clipped_asset(item,xmin, ymin, xmax, ymax, 
                 bounds_crs = 'EPSG:32719', asset_name='visual',
                 prefix='clipped',
                 return_data_dic=True,
                 save_tiff=False,
                 out_path=None):
    # Sign the item to get access to the assets
    signed_item = planetary_computer.sign(item)
    
    # Get the signed HREF for the 'visual' asset
    visual_asset = signed_item.assets[asset_name]
    href = visual_asset.href
    #pix_sz = visual_asset.gsd
    # Open the COG using rasterio
    try:
        with rasterio.open(href) as src:
            # Get the source CRS (likely EPSG:4326 for Sentinel-2)
            src_crs = src.crs
            src_bounds = src.bounds
            # Transform UTM bounds to the source CRS
            transformed_bounds = transform_bounds(
                bounds_crs,
                src_crs,
                xmin,
                ymin,
                xmax,
                ymax
            )
            bounds = BoundingBox(
                left=transformed_bounds[0],
                bottom=transformed_bounds[1],
                right=transformed_bounds[2],
                top=transformed_bounds[3]
            )

            # Check if bounds intersect with the image
            if not (bounds.left < src_bounds.right and bounds.right > src_bounds.left and
                    bounds.bottom < src_bounds.top and bounds.top > src_bounds.bottom):
                raise ValueError(f"Provided bounds {bounds} do not intersect with image extent {src_bounds}.")
    
            # Calculate the window from the transformed bounds
            window = from_bounds(
                left=bounds.left,
                bottom=bounds.bottom,
                right=bounds.right,
                top=bounds.top,
                transform=src.transform
            )
            

            px_size_x = src.transform.a
            px_size_y = -src.transform.e  # make positive

            # Validate window size
            if window.width <= 0 or window.height <= 0:
                raise ValueError(f"Invalid window size: width={window.width}, height={window.height}. Check bounds or image resolution.")
    
            # Read the data within the window (server-side clipping via COG)
            data = src.read(window=window)

            #print(type(data))
    
            # Update the profile for the output GeoTIFF
            profile = src.profile.copy()
            profile.update({
                #'width': window.width,
                #'height': window.height,
                'width': data.shape[2],#*px_size_x,
                'height': data.shape[1],#*px_size_y,
                'transform': src.window_transform(window),
                'crs': src_crs,  # Use source CRS (likely EPSG:4326)
                'driver': 'GTiff',
                'tiled': True,
                'compress': 'deflate',
                'interleave': 'band'
            })
            if return_data_dic:
                return {'data':data, 'profile':profile, 'href':href}
            # Write the clipped data to a new GeoTIFF
            if not out_path:
                out_path = prefix
            if not os.path.exists(out_path):
                os.mkdir(out_path)
            out_file = os.path.join(out_path,f'{prefix}_{asset_name}_{item.id.split('_')[2]}.tif')
            if save_tiff:
                with rasterio.open(out_file, 'w', **profile) as dst:
                    dst.write(data)
                    dst.update_tags(
                        description='Clipped Sentinel-2 visual asset from Planetary Computer',
                        creation_date=item.properties['datetime'],
                        source='Sentinel-2',
                        href = item.assets[asset_name].href 
                    ) 
    except rasterio.errors.RasterioIOError as e:
        print(f"Rasterio error: {e}")
    except ValueError as e:
        print(f"Error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")

#clipped_asset(item,xmin, ymin, xmax, ymax)

In [3]:
# Provided bounds in EPSG:32719 (UTM zone 19S)
bounds_utm = {
    'xmin': 407500.0, 
    'ymin': 7494500.0, 
    'xmax': 415200.0, 
    'ymax': 7505700.0
}
bounds_crs = 'EPSG:32719'
bounds_crs = 'EPSG:24879'
#asset_name='visual'
asset_name='visual'
xmin, ymin, xmax, ymax = bounds_utm.values()
max_cloud_pct = 5

# AOI: Antucoya, ~2 km bbox
bbox = [-69.898902, -22.653300, -69.859971, -22.617368]
bbox=bounds_to_geom_wgs84(xmin, ymin, xmax, ymax, input_crs = 'EPSG:32719', format='json')
# 1) Search STAC
catalog = pystac_client.Client.open("https://planetarycomputer.microsoft.com/api/stac/v1")
search = catalog.search(
    collections=["sentinel-2-l2a"],
    intersects=bbox,
    datetime="2014-08-01/2025-08-30"
)
items= list(search.items())
items_filt = [item for item in items if shape(item.geometry).contains(shape(bbox))]
#item = next(items)
items_filt = sorted(items_filt, key=lambda x: x.datetime, reverse=True)

In [4]:
from tqdm import tqdm
count=1
#items=[item for item in items if ]
datal=[]
ok_cloud=0
for item in tqdm(items_filt[-10:]):

    clipped_scl = clipped_asset(item,xmin, ymin, xmax, ymax, prefix=str(count),asset_name='SCL')
    clipped_cloud_cover_pct = 100*(clipped_scl['data'][0]>=8).sum()/(clipped_scl['data'][0]>=0).sum()
    if clipped_cloud_cover_pct <=5:
        clipped_asset(item,xmin, ymin, xmax, ymax, asset_name='visual',prefix='ant_footprint_test', return_data_dic=False, save_tiff=True)
    #datal.append()
    #print(count)
    count+=1

100%|██████████████████████████████████████████████████████████████████████████████████| 10/10 [00:15<00:00,  1.59s/it]


In [7]:
# rename_dic=dict(zip([item.id.split('_')[-1] for item in items_filt],[item.id.split('_')[2] for item in items_filt]))

# for name_in in [os.path.join('clipped_assets',f) for f in os.listdir('clipped_assets') if f[-3:]=='tif']:
#     print(name_in)
#     new_date=rename_dic[name_in.split('_')[-1].split('.')[0]]
#     name_out=os.path.join('clipped_assets',f'ant_footprint_visual_{new_date}.tif')
#     os.rename(name_in,name_out)
os.getcwd()

'C:\\Users\\amsraguirre\\OneDrive - Grupo Minero Antofagasta Minerals\\Documentos\\02_PERSONAL\\02_PROYECTOS_2025\\00_CODE_REPOS\\sentinel_timelapse\\notebooks'