# Data Acquisition

### Import Libraries


In [None]:
# Install the third-party packages used by this notebook.
# All pip output (stdout and stderr) is redirected to /dev/null, so install
# failures are not shown here.
!pip install geemap google-cloud-storage rasterio matplotlib > /dev/null 2>&1

In [None]:
import ee
import geemap
import os
import pandas as pd
from google.colab import drive
import rasterio
import matplotlib.pyplot as plt


# Mount Google Drive under /content/drive so files can be read and written there.
drive.mount('/content/drive')
# Directory for raw data on the mounted Drive; created if it does not exist yet.
work_dir = '/content/drive/MyDrive/UHI-Detection-Analysis/data/raw/'
os.makedirs(work_dir, exist_ok=True)


In [None]:
# Authenticate with Earth Engine, then initialize the client for the given
# Cloud project. Every later `ee.*` call in this notebook relies on this.
ee.Authenticate()
ee.Initialize(project='manifest-pride-258211')

### Determining Hottest Summer Days

*Using MODIS LST to find the hottest cloud-free summer day of each year (2014–2024)*

In [None]:
def get_modis_hottest_days(start_year=2014, end_year=2024):
    """
    Find the hottest summer day (May 15 – Sep 15) of each year for Hamburg
    using MODIS MOD11A1 daytime LST.

    For every year, each daily image is reduced to the mean ``LST_Day_1km``
    value inside a 5 km buffer around the city centre. Days whose mean is
    null are discarded (presumably days without any valid pixel in the
    buffer, e.g. because of clouds) and the day with the highest mean is kept.

    Args:
        start_year: First year to process (inclusive).
        end_year: Last year to process (inclusive).

    Returns:
        pandas.DataFrame indexed by year with the columns ``date``
        (``YYYY-MM-dd`` string), ``lst_kelvin`` and ``lst_celsius`` (both
        rounded to 2 decimals). Years with no valid data, or for which the
        Earth Engine request fails, are omitted.
    """
    # The band stores scaled integers; 0.02 converts the raw value to Kelvin.
    lst_scale = 0.02

    # Create a 5 km buffer around Hamburg coordinates
    hamburg = ee.Geometry.Point(9.9937, 53.5511).buffer(5000)

    def compute_lst(img):
        """Reduce one image to a feature holding its mean raw LST and date."""
        mean_lst = img.reduceRegion(
            reducer=ee.Reducer.mean(),
            geometry=hamburg,
            scale=1000
        ).get('LST_Day_1km')
        return ee.Feature(None, {
            'lst': mean_lst,
            'date': img.date().format('YYYY-MM-dd')
        })

    results = {}

    for year in range(start_year, end_year + 1):
        print(f"\nProcessing year {year}...")

        try:
            # Load MODIS LST Day 1km collection. filterDate treats the end
            # date as exclusive, so Sep 16 is used to include Sep 15.
            # NOTE: no QC_Day quality screening is applied here.
            modis = (
                ee.ImageCollection('MODIS/061/MOD11A1')
                .filterBounds(hamburg)
                .filterDate(f'{year}-05-15', f'{year}-09-16')
            )

            # Map function over collection, and filter out null results
            lst_features = modis.map(compute_lst).filter(
                ee.Filter.notNull(['lst'])
            )

            # Check if any valid images remain
            if lst_features.size().getInfo() == 0:
                print("No valid LST data available, skipping.")
                continue

            # Sort by LST descending and pick the hottest
            hottest = ee.Feature(lst_features.sort('lst', False).first())

            date = hottest.get('date').getInfo()
            raw_lst = hottest.get('lst').getInfo()

            # Some years might return null
            if raw_lst is None:
                print("No temperature value found, skipping.")
                continue

            # Apply the scale factor first so that `lst_kelvin` really is in
            # Kelvin (previously the unscaled raw value was stored).
            lst_kelvin = raw_lst * lst_scale
            lst_celsius = lst_kelvin - 273.15

            results[year] = {
                'date': date,
                'lst_kelvin': round(lst_kelvin, 2),
                'lst_celsius': round(lst_celsius, 2)
            }

            print(f"Hottest day: {date} | LST: {lst_celsius:.2f} °C")

        except Exception as e:
            # Best effort per year: report the failure and move on so one
            # failing request does not abort the whole run.
            print(f"Error: {str(e)}")
            continue

    return pd.DataFrame.from_dict(results, orient='index')

# Run the extraction with the default year range; the resulting DataFrame
# (indexed by year) is read by the Landsat and Sentinel-2 cells below.
print("Extracting MODIS data...")
df_hottest = get_modis_hottest_days()

In [None]:
# Show the per-year hottest-day table for a visual check.
print("Results:")
print(df_hottest)

### Extracting Landsat-8 Images Based on Hottest Days

*Selecting Landsat 8 images based on the hottest days and calculating LST*

In [None]:
# Hamburg coordinates
hamburg = ee.Geometry.Point(9.99, 53.55)

def get_landsat_data(year, max_cloud=20):
    """
    Find the most suitable Landsat 8 (Collection 2, Level 2) image for a year.

    The search is centred on the hottest day stored in the module-level
    ``df_hottest`` for ``year`` and widens in three steps:

    1. +/- 1 day, scene cloud cover below ``max_cloud``.
    2. +/- 10 days, cloud cover below ``max_cloud``, least cloudy first.
    3. +/- 15 days, no cloud filter, least cloudy first. An image picked
       here gets its cloud and cloud-shadow pixels masked via ``QA_PIXEL``.

    Args:
        year: Index label in ``df_hottest`` whose ``date`` is the target day.
        max_cloud: Upper bound (exclusive) for the ``CLOUD_COVER`` property.

    Returns:
        ee.Image of the selected scene (masked if its cloud cover is at or
        above ``max_cloud``), or None if no image exists in any window.
    """
    target_date = ee.Date(df_hottest.loc[year, 'date'])
    image_collection = ee.ImageCollection("LANDSAT/LC08/C02/T1_L2").filterBounds(hamburg)
    low_cloud = ee.Filter.lt('CLOUD_COVER', max_cloud)

    def within_days(days):
        # filterDate treats its end as exclusive, so the end is pushed one
        # extra day to make the window really cover target_date + `days`.
        return image_collection.filterDate(
            target_date.advance(-days, 'day'),
            target_date.advance(days + 1, 'day')
        )

    # 1. Search on the exact date (+/- 1 day for flexibility)
    landsat = within_days(1).filter(low_cloud)

    # Each wider search only runs when the previous one was empty, so a
    # collection already known to be non-empty is not queried again.
    if landsat.size().getInfo() == 0:
        # 2. Search within +/- 10 days and select the least cloudy image
        print(f"No image found for {year} within +/- 1 day and <{max_cloud}% cloud cover. Expanding search window...")
        landsat = within_days(10).filter(low_cloud).sort('CLOUD_COVER')

        if landsat.size().getInfo() == 0:
            # 3. Remove the cloud filter and take the least cloudy image
            #    available (could be 50%, 80%, ...); it is masked below.
            print(f"CRITICAL WARNING for {year}: No low-cloud image found. Searching best candidate with QA mask...")
            landsat = within_days(15).sort('CLOUD_COVER')

            if landsat.size().getInfo() == 0:
                print(f"WARNING: No image found for {year} within any search window.")
                return None

    # The collection is non-empty here, so first() yields a real image
    image = ee.Image(landsat.first())

    # Fetch the cloud cover once and reuse it for the check and the message
    cloud_cover = image.get('CLOUD_COVER').getInfo()

    # If we fell to step 3 (cloud filter not applied), apply QA mask
    if cloud_cover >= max_cloud:
        print(f"The best available image for {year} has high cloud cover ({cloud_cover:.2f}%). Applying QA mask.")
        # Bit 3 of QA_PIXEL flags clouds and bit 4 flags cloud shadow;
        # keep only pixels where both bits are unset.
        qa = image.select('QA_PIXEL')
        cloud_bit_mask = 1 << 3
        cloud_shadow_bit_mask = 1 << 4
        clear = qa.bitwiseAnd(cloud_bit_mask).eq(0).And(
            qa.bitwiseAnd(cloud_shadow_bit_mask).eq(0)
        )
        image = image.updateMask(clear)

    return image


# Land surface temperature (LST) band computation
def calculate_lst(image):
    """
    Append a band named ``LST`` derived from the ``ST_B10`` band.

    The band is computed as ``ST_B10 * 0.00341802 + 149.0`` (Kelvin) minus
    273.15 to give degrees Celsius.

    Args:
        image: The ee.Image to process, or None.

    Returns:
        The input image with the extra ``LST`` band, or None when ``image``
        is None.
    """
    # Nothing to compute when no image was found upstream
    if image is None:
        return None

    thermal_band = image.select('ST_B10')
    # Scale and offset give Kelvin; subtracting 273.15 converts to Celsius
    formula = '(TIRS1 * 0.00341802 + 149.0) - 273.15'
    lst_band = image.expression(formula, {'TIRS1': thermal_band}).rename('LST')
    return image.addBands(lst_band)

In [None]:
# --- Fetch one Landsat image per year and derive its LST band ---
lst_images = {}
years = df_hottest.index.tolist()

for year in years:
    print(f"\n--- Processing year {year} ---")
    landsat_image = get_landsat_data(year)

    # Skip years for which no image could be selected
    if not landsat_image:
        print(f"LST could not be calculated for {year} because no suitable image was found.")
        continue

    lst_image = calculate_lst(landsat_image)
    lst_images[year] = lst_image
    print(f"LST successfully calculated for {year}.")

#### Visualize 10 Years of LST Data as a Sanity Check

In [None]:
# Create the interactive map centred on Hamburg
Map = geemap.Map(center=[53.55, 9.99], zoom=12)

# Visualization parameters (for a single band)
vis_params = {
    'min': 20,  # Min LST (°C)
    'max': 40,  # Max LST (°C)
    'palette': ['blue', 'green', 'yellow', 'red']  # Or 'inferno'
}

# Add one layer per year held in the lst_images dictionary
for year, lst_image in lst_images.items():
    try:
        # Select only the LST band of the image
        lst_single_band = lst_image.select('LST')
        # Add the layer to the map
        Map.addLayer(lst_single_band, vis_params, f'LST {year}')
    except Exception as e:
        # Best effort: report the failing year and continue with the others
        print(f"{year} için hata: {str(e)}")

# Add the layer control panel
Map.addLayerControl()

map_dir = '/content/drive/MyDrive/UHI-Detection-Analysis/outputs/'
# Make sure the output directory exists; otherwise writing the HTML file
# fails on a Drive where it has not been created yet.
os.makedirs(map_dir, exist_ok=True)
output_path = map_dir + 'LST_map.html'
Map.to_html(output_path)

*Adding a time slider to the 10 years of LST data*


In [32]:
# Hamburg coordinates
hamburg = ee.Geometry.Point(9.99, 53.55)

def get_sentinel2_data(year, max_cloud=50):
    """
    Find the most suitable Sentinel-2 surface-reflectance image for a year.

    The search is centred on the hottest day stored in the module-level
    ``df_hottest`` for ``year`` and widens in three steps:

    1. +/- 1 day, ``CLOUDY_PIXEL_PERCENTAGE`` below ``max_cloud``.
    2. +/- 10 days, same cloud limit, least cloudy first.
    3. +/- 20 days, no cloud limit, least cloudy first. An image picked
       here is masked with the scene classification (``SCL``) band.

    Args:
        year: Index label in ``df_hottest`` whose ``date`` is the target day.
        max_cloud: Upper bound (exclusive) for ``CLOUDY_PIXEL_PERCENTAGE``.

    Returns:
        ee.Image of the selected scene (SCL-masked if its cloud percentage is
        at or above ``max_cloud``), or None if no image exists in any window.
    """
    target_date = ee.Date(df_hottest.loc[year, 'date'])
    # NOTE(review): the Earth Engine catalog lists COPERNICUS/S2_SR as
    # superseded by COPERNICUS/S2_SR_HARMONIZED — confirm before switching.
    image_collection = ee.ImageCollection("COPERNICUS/S2_SR").filterBounds(hamburg)
    low_cloud = ee.Filter.lt('CLOUDY_PIXEL_PERCENTAGE', max_cloud)

    def within_days(days):
        # filterDate treats its end as exclusive, so the end is pushed one
        # extra day to make the window really cover target_date + `days`.
        return image_collection.filterDate(
            target_date.advance(-days, 'day'),
            target_date.advance(days + 1, 'day')
        )

    # 1. Search on the exact date (+/- 1 day)
    s2 = within_days(1).filter(low_cloud)

    # Each wider search only runs when the previous one was empty, so a
    # collection already known to be non-empty is not queried again.
    if s2.size().getInfo() == 0:
        # 2. Expand to +/- 10 days, least cloudy first
        print(f"No Sentinel-2 image found for {year} within ±1 day and <{max_cloud}% clouds. Expanding window...")
        s2 = within_days(10).filter(low_cloud).sort('CLOUDY_PIXEL_PERCENTAGE')

        if s2.size().getInfo() == 0:
            # 3. Pick the best available image (even with high cloud cover)
            print(f"CRITICAL WARNING for {year}: No low-cloud Sentinel-2 image found. Picking best candidate.")
            s2 = within_days(20).sort('CLOUDY_PIXEL_PERCENTAGE')

            if s2.size().getInfo() == 0:
                print(f"WARNING: No Sentinel-2 image found for {year} within any search window.")
                return None

    # The collection is non-empty here, so first() yields a real image
    image = ee.Image(s2.first())

    # Fetch the cloud percentage once; reuse it for the check and the message
    cloud_pct = image.get('CLOUDY_PIXEL_PERCENTAGE').getInfo()

    # Apply the SCL mask when the scene is at or above the cloud limit
    if cloud_pct >= max_cloud:
        print(f"The best available Sentinel-2 image for {year} has high cloud cover ({cloud_pct:.2f}%). Applying SCL cloud mask.")

        scl = image.select('SCL')
        # Keep SCL classes 4=Vegetation, 5=Bare soils, 6=Water,
        # 7=Unclassified, 11=Snow/Ice. Class 8 is "cloud medium probability"
        # and is therefore no longer kept by this cloud mask.
        clear = scl.eq(4).Or(scl.eq(5)).Or(scl.eq(6)).Or(scl.eq(7)).Or(scl.eq(11))
        image = image.updateMask(clear)

    return image

### Extracting Sentinel-2 Images Based on Hottest Days

In [33]:
# --- Collect one Sentinel-2 image per year found in df_hottest ---
s2_images = {}

for year in df_hottest.index:
    print(f"\n--- Processing Sentinel-2 image for year {year} ---")
    sentinel_image = get_sentinel2_data(year)

    # Skip years for which no image could be selected
    if not sentinel_image:
        print(f"No Sentinel-2 image available for {year}.")
        continue

    s2_images[year] = sentinel_image
    print(f"Sentinel-2 image successfully retrieved for {year}.")



--- Processing Sentinel-2 image for year 2014 ---
No Sentinel-2 image found for 2014 within ±1 day and <50% clouds. Expanding window...
No Sentinel-2 image available for 2014.

--- Processing Sentinel-2 image for year 2015 ---
No Sentinel-2 image found for 2015 within ±1 day and <50% clouds. Expanding window...
No Sentinel-2 image available for 2015.

--- Processing Sentinel-2 image for year 2016 ---
No Sentinel-2 image found for 2016 within ±1 day and <50% clouds. Expanding window...
No Sentinel-2 image available for 2016.

--- Processing Sentinel-2 image for year 2017 ---
No Sentinel-2 image found for 2017 within ±1 day and <50% clouds. Expanding window...
Sentinel-2 image successfully retrieved for 2017.

--- Processing Sentinel-2 image for year 2018 ---
Sentinel-2 image successfully retrieved for 2018.

--- Processing Sentinel-2 image for year 2019 ---
Sentinel-2 image successfully retrieved for 2019.

--- Processing Sentinel-2 image for year 2020 ---
Sentinel-2 image successfully

### Export Images as GeoTIFF

In [None]:
# Export an image to Google Drive as a GeoTIFF (current version)
def export_to_drive(image, name, folder, scale=30, region=None):
    """
    Start an Earth Engine batch task that exports ``image`` to Google Drive.

    Args:
        image: The ee.Image to export.
        name: Used both as the task description and the file name prefix.
        folder: Target Drive folder. A leading '/content/drive/MyDrive/'
            (the Colab mount prefix) is stripped before it is passed on.
        scale: Export resolution passed to Earth Engine; defaults to 30.
        region: Export region. Defaults to the bounding box of a 5 km buffer
            around the module-level ``hamburg`` geometry.

    Returns:
        The started export task; poll its ``status()`` to follow progress.
    """
    if region is None:
        # Resolved at call time so the current `hamburg` geometry is used
        region = hamburg.buffer(5000).bounds()

    task = ee.batch.Export.image.toDrive(
        image=image,
        description=name,
        folder=folder.replace('/content/drive/MyDrive/', ''),  # Drop the Colab mount prefix for GEE
        fileNamePrefix=name,
        scale=scale,
        region=region,
        fileFormat='GeoTIFF'
    )
    task.start()
    return task

import time

work_dir = 'raw'

# Save to Drive (using work_dir) and keep the started tasks for polling.
# NOTE(review): lst_2015 and lst_2024 are not defined in the visible part of
# this notebook — presumably they correspond to lst_images[2015] and
# lst_images[2024]; confirm before running.
export_tasks = [
    export_to_drive(lst_2015, 'LST_2015_Hamburg', work_dir),
    export_to_drive(lst_2024, 'LST_2024_Hamburg', work_dir),
]

# Wait for the tasks started above to finish. Only these tasks are polled:
# checking every task in the account would never terminate if any older
# task ended in a state other than COMPLETED or FAILED (e.g. CANCELLED).
terminal_states = {'COMPLETED', 'FAILED', 'CANCELLED'}
while not all(task.status()['state'] in terminal_states for task in export_tasks):
    time.sleep(10)
print("Export işlemleri tamamlandı!")

#### Clean the metadata.widgets Data in the Notebook

In [None]:
# Install nbstripout; all pip output (stdout and stderr) is discarded.
!pip install nbstripout > /dev/null 2>&1

In [None]:
# Run nbstripout on this notebook's .ipynb file stored on Drive.
!nbstripout /content/drive/MyDrive/GitHub_Repos/urban-heat-island/notebooks/01_data_acquisition.ipynb
