In [1]:
!pip install geemap



DEPRECATION: Loading egg at c:\users\prath\anaconda3\lib\site-packages\agoro_field_boundary_detector-0.1.1-py3.11.egg is deprecated. pip 24.3 will enforce this behaviour change. A possible replacement is to use pip for package installation.. Discussion can be found at https://github.com/pypa/pip/issues/12330

[notice] A new release of pip is available: 24.0 -> 24.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import ee
ee.Authenticate(force=True)
ee.Initialize(project='proj76')

Enter verification code: 4/1AQSTgQEcOiZUlMxWgFOoDG2Brbp7vaaS1D4p3GyfemVamz2PGlVTsJrn-Iw

Successfully saved authorization token.


# Harmonized Code for Multiple Indices

In [3]:
import ee
import pandas as pd

# Initialize Earth Engine
ee.Initialize()

# Load the points dataset (replace with your dataset)
points = ee.FeatureCollection('projects/proj76/assets/OG_Chunk_2')  # Replace with your points dataset

# Set the time range
start_date = ee.Date('2023-06-01')
end_date = ee.Date('2023-12-31')

# Define 15-day intervals
def make_intervals(start_date, end_date):
    interval_list = []
    current_date = start_date
    
    while current_date.difference(end_date, 'day').lt(0).getInfo():
        next_date = current_date.advance(16, 'day')
        interval_list.append(ee.DateRange(current_date, next_date))
        current_date = next_date
    
    return interval_list

# Create the 15-day intervals
intervals = make_intervals(start_date, end_date)

# Cloud masking functions
def mask_l8_cloud(image):
    qa = image.select('QA_PIXEL')
    cloud_mask = qa.bitwiseAnd(1 << 4).eq(0).And(qa.bitwiseAnd(1 << 5).eq(0))
    return image.updateMask(cloud_mask).select(
        ['SR_B2', 'SR_B3', 'SR_B4', 'SR_B5', 'SR_B6', 'SR_B7']
    ).rename(['BLUE', 'GREEN', 'RED', 'NIR', 'SWIR1', 'SWIR2'])

def mask_l7_cloud(image):
    qa = image.select('QA_PIXEL')
    cloud_mask = qa.bitwiseAnd(1 << 4).eq(0).And(qa.bitwiseAnd(1 << 5).eq(0))
    return image.updateMask(cloud_mask).select(
        ['SR_B1', 'SR_B2', 'SR_B3', 'SR_B4', 'SR_B5', 'SR_B7']
    ).rename(['BLUE', 'GREEN', 'RED', 'NIR', 'SWIR1', 'SWIR2'])

def mask_s2_cloud(image):
    cloud_prob = image.select('MSK_CLDPRB')
    mask = cloud_prob.lt(20)  # Cloud probability < 20%
    return image.select(
        ['B2', 'B3', 'B4', 'B8', 'B11', 'B12']
    ).rename(['BLUE', 'GREEN', 'RED', 'NIR', 'SWIR1', 'SWIR2']).updateMask(mask)

# Harmonization function
def harmonization_chastain(img, from_sensor, to_sensor):
    chastain_band_names = ['BLUE', 'GREEN', 'RED', 'NIR', 'SWIR1', 'SWIR2']
    chastain_coeff_dict = {
    'MSI_OLI': [[1.0946,1.0043,1.0524,0.8954,1.0049,1.0002], [-0.0107,0.0026,-0.0015,0.0033,0.0065,0.0046], 1],
    'MSI_ETM': [[1.10601,0.99091,1.05681,1.0045,1.03611,1.04011], [-0.0139,0.00411,-0.0024,-0.0076,0.00411,0.00861], 1],
    'OLI_ETM': [[1.03501,1.00921,1.01991,1.14061,1.04351,1.05271], [-0.0055,-0.0008,-0.0021,-0.0163,-0.0045,0.00261], 1],
    'OLI_MSI': [[1.0946,1.0043,1.0524,0.8954,1.0049,1.0002], [-0.0107,0.0026,-0.0015,0.0033,0.0065,0.0046], 0],
    'ETM_MSI': [[1.10601,0.99091,1.05681,1.0045,1.03611,1.04011], [-0.0139,0.00411,-0.0024,-0.0076,0.00411,0.00861], 0],
    'ETM_OLI': [[1.03501,1.00921,1.01991,1.14061,1.04351,1.05271], [-0.0055,-0.0008,-0.0021,-0.0163,-0.0045,0.00261], 0]
    }

    combo_key = f'{from_sensor}_{to_sensor}'.upper()
    coeff_list = chastain_coeff_dict[combo_key]
    slopes, intercepts, direction = coeff_list[0], coeff_list[1], coeff_list[2]
    
    if direction == 0:
        return img.select(chastain_band_names).multiply(slopes).add(intercepts)
    else:
        return img.select(chastain_band_names).subtract(intercepts).divide(slopes)

# Add Vegetation Indices (NDVI, EVI, SAVI, NDWI)
def add_indices(image):
    ndvi = image.normalizedDifference(['NIR', 'RED']).rename('NDVI')
    evi = image.expression(
        '2.5 * ((NIR - RED) / (NIR + 6 * RED - 7.5 * BLUE + 1))', {
            'NIR': image.select('NIR'),
            'RED': image.select('RED'),
            'BLUE': image.select('BLUE')
        }).rename('EVI')
    
    gcvi = image.expression(
        '((NIR) / (GREEN)) - 1', {
            'NIR': image.select('NIR'),
            'GREEN': image.select('GREEN')
        }).rename('GCVI')
    

    savi = image.expression(
        '((NIR - RED) / (NIR + RED + 0.5)) * (1.5)', {
            'NIR': image.select('NIR'),
            'RED': image.select('RED')
        }).rename('SAVI')

    ndwi = image.normalizedDifference(['GREEN', 'NIR']).rename('NDWI')

    arvi = image.expression(
        '(NIR - (2 * RED - BLUE)) / (NIR + (2 * RED - BLUE))', {
            'NIR': image.select('NIR'),
            'RED': image.select('RED'),
            'BLUE': image.select('BLUE')
        }).rename('ARVI')

    msavi = image.expression(
        '(2 * NIR + 1 - sqrt((2 * NIR + 1) ** 2 - 8 * (NIR - RED))) / 2', {
            'NIR': image.select('NIR'),
            'RED': image.select('RED')
        }).rename('MSAVI')

    gndvi = image.normalizedDifference(['NIR', 'GREEN']).rename('GNDVI')

    mtvi2 = image.expression(
        '1.5 * (1.2 * (NIR - GREEN) - 2.5 * (RED - GREEN)) / sqrt((2 * NIR + 1) ** 2 - (6 * NIR - 5 * sqrt(RED)) - 0.5)', {
            'NIR': image.select('NIR'),
            'RED': image.select('RED'),
            'GREEN': image.select('GREEN')
        }).rename('MTVI2')

    cvi = image.expression(
        '(NIR / RED) * (GREEN / RED)', {
            'NIR': image.select('NIR'),
            'RED': image.select('RED'),
            'GREEN': image.select('GREEN')
        }).rename('CVI')

    cari = image.expression(
        '((RED - GREEN) - 0.2 * (RED - BLUE)) * (RED - GREEN) / sqrt((RED - GREEN) ** 2 + (RED - BLUE) ** 2)', {
            'RED': image.select('RED'),
            'GREEN': image.select('GREEN'),
            'BLUE': image.select('BLUE')
        }).rename('CARI')
    
    return image.addBands([ndvi, evi, savi, ndwi, arvi, msavi, gndvi, mtvi2, cvi, cari, gcvi]).toFloat()

def generate_timeseries(start_date, end_date, points):
    # Load and preprocess satellite image collections
    L8 = ee.ImageCollection('LANDSAT/LC08/C02/T1_L2').filterDate(start_date, end_date).map(mask_l8_cloud).map(add_indices)
    L7 = ee.ImageCollection('LANDSAT/LE07/C02/T1_L2').filterDate(start_date, end_date).map(mask_l7_cloud).map(add_indices)
    S2 = ee.ImageCollection('COPERNICUS/S2_SR').filterDate(start_date, end_date).map(mask_s2_cloud).map(add_indices)

    # Harmonize the collections
    L7 = L7.map(lambda img: harmonization_chastain(img, 'ETM', 'OLI'))
    S2 = S2.map(lambda img: harmonization_chastain(img, 'MSI', 'OLI'))

    # Merge collections
    collections = L8.merge(L7).merge(S2)

    # Create 15-day intervals server-side
    def create_intervals(start_date, end_date, step_days=15):
        start = ee.Date(start_date)
        end = ee.Date(end_date)

        # Generate a list of date intervals
        def create_interval(n):
            interval_start = start.advance(n, 'day')
            interval_end = interval_start.advance(step_days, 'day')
            return ee.List([interval_start, interval_end])

        # Calculate how many intervals we need
        n_intervals = end.difference(start, 'day').divide(step_days).floor()

        # Map the intervals to the sequence
        intervals = ee.List.sequence(0, n_intervals.multiply(step_days), step_days).map(create_interval)
        return intervals

    # Generate 15-day intervals
    intervals = create_intervals(start_date, end_date)

    # Process each point
    def process_point(point):
        lat = point.geometry().coordinates().get(1)
        lon = point.geometry().coordinates().get(0)
        def process_interval(interval):
            # Ensure interval is an ee.List
            interval = ee.List(interval)
            interval_start = ee.Date(interval.get(0))
            interval_end = ee.Date(interval.get(1))

            # Filter images for this time range
            images_in_interval = collections.filterDate(interval_start, interval_end)

            # Reduce to mean for the point's geometry
            mean_dict = images_in_interval.mean().reduceRegion(
                reducer=ee.Reducer.mean(),
                geometry=point.geometry(),
                scale=30,
                maxPixels=1e9
            )
            mean_dict = mean_dict.set('interval_start', interval_start)
            mean_dict = mean_dict.set('interval_end', interval_end)
            mean_dict = mean_dict.set('latitude', lat)
            mean_dict = mean_dict.set('longitude', lon)

            return ee.Feature(point.geometry(), mean_dict)

        point_timeseries = intervals.map(process_interval)
        return ee.FeatureCollection(point_timeseries)

    return points.map(process_point).flatten()

# Generate the time series
result = generate_timeseries(start_date, end_date, points)

# Convert the result to a list of dictionaries (for export or further processing)
results_list = result.getInfo()['features']

# Convert to pandas DataFrame
data = []
for feature in results_list:
    data.append(feature['properties'])

df = pd.DataFrame(data)
print(df)
# Export to CSV
df.to_csv('vegetation_indices_timeseries.csv', index=False)
print("Data exported to vegetation_indices_timeseries.csv")

EEException: Collection query aborted after accumulating over 5000 elements.

# Convert Intervals To Date

In [4]:
import pandas as pd
import ast
# Load the CSV file into a DataFrame
df = pd.read_csv('vegetation_indices_timeseries.csv')

# Function to extract the 'value' field from the JSON-like structure
def extract_timestamp(json_str):
    try:
        # Convert the string representation of JSON to a dictionary
        data = ast.literal_eval(json_str)
        # Extract the 'value' field
        return data.get('value')
    except (ValueError, SyntaxError):
        # Handle cases where conversion fails or data is not as expected
        return None

# Apply the function to the interval_start and interval_end columns
df['interval_start_value'] = df['interval_start'].apply(extract_timestamp)
df['interval_end_value'] = df['interval_end'].apply(extract_timestamp)

# Convert the extracted timestamps to human-readable dates
df['interval_start_date'] = pd.to_datetime(df['interval_start_value'], unit='ms')
df['interval_end_date'] = pd.to_datetime(df['interval_end_value'], unit='ms')

# Display the result
print(df[['interval_start', 'interval_start_date', 'interval_end', 'interval_end_date']])

# Optionally, save the updated dataframe to a new CSV file
df.to_csv('updated_file.csv', index=False)

                                interval_start interval_start_date  \
0     {'type': 'Date', 'value': 1685577600000}          2023-06-01   
1     {'type': 'Date', 'value': 1686873600000}          2023-06-16   
2     {'type': 'Date', 'value': 1688169600000}          2023-07-01   
3     {'type': 'Date', 'value': 1689465600000}          2023-07-16   
4     {'type': 'Date', 'value': 1690761600000}          2023-07-31   
...                                        ...                 ...   
2980  {'type': 'Date', 'value': 1698537600000}          2023-10-29   
2981  {'type': 'Date', 'value': 1699833600000}          2023-11-13   
2982  {'type': 'Date', 'value': 1701129600000}          2023-11-28   
2983  {'type': 'Date', 'value': 1702425600000}          2023-12-13   
2984  {'type': 'Date', 'value': 1703721600000}          2023-12-28   

                                  interval_end interval_end_date  
0     {'type': 'Date', 'value': 1686873600000}        2023-06-16  
1     {'type': 'Date', 'v

In [17]:
import ee
import pandas as pd
import os
import time
import math
import json
from datetime import datetime
from os.path import join

# Initialize Earth Engine
ee.Initialize()

# ======================
# CONFIGURATION
# ======================
START_DATE = '2021-04-01'
END_DATE = '2022-03-31'
MAX_POINTS_PER_BATCH = 300  # Smaller batches for reliability
TIMEOUT_MINUTES = 20
WAIT_BETWEEN_FILES = 10  # seconds

# ======================
# DATA CLEANING FUNCTIONS
# ======================

def clean_properties(row):
    """Clean and convert properties to EE-compatible types"""
    cleaned = {}
    for k, v in row.items():
        if k in ['latitude', 'longitude']:
            continue
        if pd.isna(v) or v == 'NaN' or v == 'nan':
            cleaned[k] = None
        elif isinstance(v, (int, float)):
            cleaned[k] = float(v)
        else:
            cleaned[k] = str(v)
    return cleaned

def csv_to_fc_batches(csv_path, batch_size=MAX_POINTS_PER_BATCH):
    """Convert CSV to multiple smaller FeatureCollections"""
    try:
        df = pd.read_csv(csv_path)
        total_points = len(df)
        batches = []
        
        for i in range(0, total_points, batch_size):
            batch_df = df.iloc[i:i+batch_size]
            features = []
            
            for _, row in batch_df.iterrows():
                try:
                    geom = ee.Geometry.Point(row['longitude'], row['latitude'])
                    properties = clean_properties(row)
                    features.append(ee.Feature(geom, properties))
                except Exception as e:
                    print(f"Skipping row {_}: {str(e)}")
                    continue
            
            if features:
                batches.append(ee.FeatureCollection(features))
        
        return batches
    
    except Exception as e:
        print(f"Error loading {csv_path}: {str(e)}")
        return []

# ======================
# IMAGE PROCESSING
# ======================

def mask_l8_cloud(image):
    qa = image.select('QA_PIXEL')
    cloud_mask = qa.bitwiseAnd(1 << 4).eq(0).And(qa.bitwiseAnd(1 << 5).eq(0))
    return image.updateMask(cloud_mask).select(
        ['SR_B2', 'SR_B3', 'SR_B4', 'SR_B5', 'SR_B6', 'SR_B7']
    ).rename(['BLUE', 'GREEN', 'RED', 'NIR', 'SWIR1', 'SWIR2'])

def mask_l7_cloud(image):
    qa = image.select('QA_PIXEL')
    cloud_mask = qa.bitwiseAnd(1 << 4).eq(0).And(qa.bitwiseAnd(1 << 5).eq(0))
    return image.updateMask(cloud_mask).select(
        ['SR_B1', 'SR_B2', 'SR_B3', 'SR_B4', 'SR_B5', 'SR_B7']
    ).rename(['BLUE', 'GREEN', 'RED', 'NIR', 'SWIR1', 'SWIR2'])

def mask_s2_cloud(image):
    cloud_prob = image.select('MSK_CLDPRB')
    mask = cloud_prob.lt(20)
    return image.select(
        ['B2', 'B3', 'B4', 'B8', 'B11', 'B12']
    ).rename(['BLUE', 'GREEN', 'RED', 'NIR', 'SWIR1', 'SWIR2']).updateMask(mask)

def add_indices(image):
    ndvi = image.normalizedDifference(['NIR', 'RED']).rename('NDVI')
    gcvi = image.expression(
        '((NIR) / (GREEN)-1)', {
            'NIR': image.select('NIR'),
            'GREEN': image.select('GREEN')
        }).rename('GCVI')
    evi = image.expression(
        '2.5 * ((NIR - RED) / (NIR + 6 * RED - 7.5 * BLUE + 1))', {
            'NIR': image.select('NIR'),
            'RED': image.select('RED'),
            'BLUE': image.select('BLUE')
        }).rename('EVI')
    return image.addBands([ndvi, evi,gcvi]).toFloat()

# ======================
# TIME SERIES GENERATION
# ======================

def generate_timeseries_batch(points_batch, start_date, end_date):
    """Process a single batch of points"""
    try:
        # Load and process image collections
        L8 = ee.ImageCollection('LANDSAT/LC08/C02/T1_L2') \
              .filterDate(start_date, end_date) \
              .map(mask_l8_cloud) \
              .map(add_indices)
        
        L7 = ee.ImageCollection('LANDSAT/LE07/C02/T1_L2') \
              .filterDate(start_date, end_date) \
              .map(mask_l7_cloud) \
              .map(add_indices)
        
        S2 = ee.ImageCollection('COPERNICUS/S2_SR_HARMONIZED') \
              .filterDate(start_date, end_date) \
              .map(mask_s2_cloud) \
              .map(add_indices)

        # Create date intervals
        n_intervals = ee.Date(end_date).difference(start_date, 'day').divide(15).ceil()
        intervals = ee.List.sequence(0, n_intervals.subtract(1)) \
            .map(lambda n: start_date.advance(ee.Number(n).multiply(15), 'day'))

        # Process points
        def process_point(point):
            def process_interval(interval_start):
                interval_end = ee.Date(interval_start).advance(15, 'day')
                composite = L8.merge(L7).merge(S2) \
                    .filterDate(interval_start, interval_end) \
                    .mean()
                
                values = composite.reduceRegion(
                    ee.Reducer.mean(),
                    point.geometry(),
                    30,
                    maxPixels=1e9
                )
                
                return ee.Feature(point.geometry(), values \
                    .set('interval_start', interval_start) \
                    .set('interval_end', interval_end) \
                    .combine(point.toDictionary()))

            return intervals.map(process_interval)

        return points_batch.map(process_point).flatten()
    
    except Exception as e:
        print(f"Error in timeseries generation: {str(e)}")
        return None

# ======================
# EXPORT MANAGEMENT
# ======================

def run_export(task, filename, timeout_min=TIMEOUT_MINUTES):
    """Run and monitor an export task"""
    start_time = time.time()
    timeout = timeout_min * 60
    wait_interval = 30
    
    task.start()
    print(f"Started export for {filename}")
    
    while task.active() and (time.time() - start_time) < timeout:
        try:
            status = task.status()
            elapsed = time.time() - start_time
            print(f"Status: {status['state']} ({elapsed//60:.0f}m {elapsed%60:.0f}s)")
            
            if status['state'] == 'FAILED':
                print(f"Export failed: {status.get('error_message', 'No details')}")
                return False
            
            time.sleep(wait_interval)
        except KeyboardInterrupt:
            print("\nExport interrupted by user")
            task.cancel()
            return False
        except Exception as e:
            print(f"Error monitoring export: {str(e)}")
            task.cancel()
            return False
    
    if task.active():
        task.cancel()
        print(f"Timeout reached after {timeout_min} minutes")
        return False
    
    final_status = task.status()
    if final_status['state'] == 'COMPLETED':
        print(f"Export completed successfully")
        return True
    else:
        print(f"Export ended with status: {final_status['state']}")
        return False

# ======================
# MAIN PROCESSING
# ======================

def process_file(csv_path):
    """Process a single CSV file"""
    filename = os.path.basename(csv_path)
    print(f"\nProcessing {filename}")
    
    # 1. Load data in batches
    fc_batches = csv_to_fc_batches(csv_path)
    if not fc_batches:
        print("No valid batches created")
        return False
    
    # 2. Process each batch
    results = []
    for i, batch in enumerate(fc_batches, 1):
        print(f"Processing batch {i}/{len(fc_batches)}")
        result = generate_timeseries_batch(batch, ee.Date(START_DATE), ee.Date(END_DATE))
        if result:
            results.append(result)
        time.sleep(5)  # Brief pause between batches
    
    if not results:
        print("No valid results generated")
        return False
    
    # 3. Combine and export results
    combined = ee.FeatureCollection(results).flatten()
    export_name = f"ts_{os.path.splitext(filename)[0][:40]}"
    
    task = ee.batch.Export.table.toDrive(
        collection=combined,
        description=export_name[:95],
        folder='GEE_Exports',
        fileNamePrefix=export_name,
        fileFormat='CSV'
    )
    
    return run_export(task, filename)

def main():
    input_dir = "Karnataka_Datasets/Across/Chunk_Small"
    output_dir = "GEE_Exports"
    
    if not os.path.exists(input_dir):
        print(f"Input directory not found: {input_dir}")
        return
    
    csv_files = sorted([f for f in os.listdir(input_dir) if f.endswith('.csv')])
    if not csv_files:
        print("No CSV files found")
        return
    
    print(f"\nStarting processing at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Found {len(csv_files)} files to process")
    
    success_count = 0
    for i, csv_file in enumerate(csv_files, 1):
        csv_path = join(input_dir, csv_file)
        print(f"\nProcessing file {i}/{len(csv_files)}: {csv_file}")
        
        try:
            if process_file(csv_path):
                success_count += 1
            time.sleep(WAIT_BETWEEN_FILES)
        except KeyboardInterrupt:
            print("\nProcessing interrupted by user")
            break
        except Exception as e:
            print(f"Error processing {csv_file}: {str(e)}")
    
    print(f"\nProcessing completed at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"Successfully processed {success_count}/{len(csv_files)} files")

if __name__ == "__main__":
    main()


Starting processing at 2025-05-08 00:16:26
Found 31 files to process

Processing file 1/31: Karnataka_1.csv

Processing Karnataka_1.csv
Processing batch 1/10
Processing batch 2/10
Processing batch 3/10
Processing batch 4/10
Processing batch 5/10
Processing batch 6/10
Processing batch 7/10
Processing batch 8/10
Processing batch 9/10
Processing batch 10/10
Started export for Karnataka_1.csv
Status: READY (0m 18s)
Export ended with status: FAILED

Processing file 2/31: Karnataka_10.csv

Processing Karnataka_10.csv
Processing batch 1/10
Processing batch 2/10
Processing batch 3/10
Processing batch 4/10
Processing batch 5/10
Processing batch 6/10
Processing batch 7/10
Processing batch 8/10
Processing batch 9/10
Processing batch 10/10

Processing interrupted by user

Processing completed at 2025-05-08 00:19:26
Successfully processed 0/31 files
