### Get Environment Variables

In [1]:
import os
import ast

In [2]:
# Run configuration comes from the environment. Indexing os.environ raises a
# KeyError that names the missing variable, instead of letting None flow into
# ast.literal_eval or the date handling in later cells.
str_location = os.environ['AREA']
location = ast.literal_eval(str_location)
# Later cells index location[0]..location[3] as [min_lon, min_lat, max_lon, max_lat].
if not isinstance(location, (list, tuple)) or len(location) != 4:
    raise ValueError(
        "AREA must be a list of 4 numbers [min_lon, min_lat, max_lon, max_lat], "
        f"got {str_location!r}"
    )
# Both dates are used as 'YYYY-MM' strings by the compositing cells.
start_date = os.environ['START_DATE']
end_date = os.environ['END_DATE']

### Connect to Google Earth Engine

In [3]:
import ee

In [4]:
# if you need to re-authenticate:
# !rm -rf ~/.config/earthengine

In [5]:
def initialize_earth_engine(project='alert-sol-419717'):
    """Initialize the Earth Engine client, authenticating first if needed.

    Args:
        project: Cloud project id passed to ``ee.Initialize``. The default is
            the project this notebook was written against, so existing
            zero-argument calls behave as before.

    ``ee.Initialize`` is tried first. If it raises ``ee.EEException``,
    ``ee.Authenticate()`` is run and initialization is retried once; an
    error from the retry propagates to the caller.
    """
    try:
        ee.Initialize(project=project)
    except ee.EEException:
        # Any EEException is treated as "credentials missing or stale".
        ee.Authenticate()
        ee.Initialize(project=project)

In [6]:
# Initialize Earth Engine in the notebook process before the ee.* calls in later cells.
initialize_earth_engine()

### Start Spark Session

In [7]:
import pyspark

In [8]:
# Spark needs to know which JVM and which Python interpreter to launch.
# Indexing os.environ raises a KeyError naming the variable when one is unset.
java_home = os.environ['JAVA_HOME']
python_path = os.environ['PYTHON_PATH']

# Put the JVM's bin directory first on PATH, but only if it is not already
# first, so re-running this cell does not keep growing PATH.
java_bin = os.path.join(java_home, 'bin')
current_path = os.environ.get('PATH', '')
if current_path.split(os.pathsep)[0] != java_bin:
    os.environ['PATH'] = os.pathsep.join(filter(None, [java_bin, current_path]))

os.environ['PYSPARK_PYTHON'] = python_path
os.environ['PYSPARK_DRIVER_PYTHON'] = python_path

conf = pyspark.SparkConf()
conf.set('spark.driver.memory','4g')

# getOrCreate reuses a live context, so re-running the cell does not fail
# with "Cannot run multiple SparkContexts at once".
sc = pyspark.SparkContext.getOrCreate(conf=conf)
# NOTE(review): SQLContext is deprecated in favour of SparkSession; kept so
# `spark` keeps the type any later code may rely on.
spark = pyspark.SQLContext.getOrCreate(sc)

24/09/08 20:39:55 WARN Utils: Your hostname, nishas-macbook.local resolves to a loopback address: 127.0.0.1; using 192.168.0.7 instead (on interface en0)
24/09/08 20:39:55 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/08 20:39:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Connect to MongoDB

In [9]:
from pymongo import MongoClient
from gridfs import GridFS

In [10]:
def connect_to_mongo():
    """Return a GridFS handle on the ``deforestation_db`` database.

    The server address is read from the ``MONGO_URI`` environment variable
    each time the function is called.
    """
    mongo_uri = os.getenv('MONGO_URI')
    database = MongoClient(mongo_uri)['deforestation_db']
    return GridFS(database)

### Gather Satellite Images

In [11]:
def add_3_months(date):
    """Return the last day of the month three months after ``date``.

    Args:
        date: Month string formatted ``'YYYY-MM'``.

    Returns:
        A ``'YYYY-MM-DD'`` string for the final calendar day of the target
        month, e.g. ``'2020-01'`` -> ``'2020-04-30'``.

    Raises:
        ValueError: if ``date`` is not exactly two '-'-separated integers.
    """
    import calendar  # local import keeps this cell runnable on its own

    year, month = map(int, date.split('-'))
    month += 3
    if month > 12:
        # Roll over into the following year.
        month -= 12
        year += 1
    # monthrange supplies the real month length, including 29 February in
    # leap years (the hand-written table always used 28).
    last_day = calendar.monthrange(year, month)[1]
    return f"{year:04d}-{month:02d}-{last_day:02d}"

In [12]:
def apply_scale_factors(image):
    """Return ``image`` with its optical and thermal bands rescaled.

    Bands selected by ``'SR_B.'`` are multiplied by 0.0000275 and shifted by
    -0.2; bands selected by ``'ST_B.*'`` are multiplied by 0.00341802 and
    shifted by 149.0. Each rescaled set is added back with ``None`` and
    ``True`` as the second and third ``addBands`` arguments.
    """
    scaled_optical = image.select('SR_B.').multiply(0.0000275).add(-0.2)
    scaled_thermal = image.select('ST_B.*').multiply(0.00341802).add(149.0)
    with_optical = image.addBands(scaled_optical, None, True)
    return with_optical.addBands(scaled_thermal, None, True)

In [13]:
def ndvi(image):
    """Return ``image`` with an added ``NDVI`` band.

    The band is the normalized difference of ``SR_B5`` and ``SR_B4``.
    """
    index_band = image.normalizedDifference(['SR_B5', 'SR_B4'])
    return image.addBands(index_band.rename('NDVI'))

In [14]:
def create_composite_image(date, region=None):
    """Build a low-cloud median Landsat 8 composite for the window starting at ``date``.

    Args:
        date: First month of the window as ``'YYYY-MM'``.
        region: Geometry used to filter and clip the scenes. When omitted,
            the notebook-level ``area_of_interest`` is looked up at call
            time, which is what existing one-argument calls rely on.

    Returns:
        The per-pixel median of all ``LANDSAT/LC08/C02/T1_L2`` scenes in the
        window with ``CLOUD_COVER`` below 10, after scale factors are applied
        and an NDVI band is added, clipped to ``region``.
    """
    if region is None:
        region = area_of_interest

    # Landsat 8 revisits a given spot roughly every 16 days, so one month
    # holds very few low-cloud scenes; a multi-month window gives the median
    # enough observations to make a good composite.
    window_start = date + '-01'
    # filterDate treats its end argument as exclusive.
    window_end = add_3_months(date)

    return ee.ImageCollection('LANDSAT/LC08/C02/T1_L2') \
        .filterDate(window_start, window_end) \
        .filterBounds(region) \
        .filter(ee.Filter.lt('CLOUD_COVER', 10))  \
        .map(apply_scale_factors) \
        .map(ndvi) \
        .median() \
        .clip(region)

In [15]:
# Rectangle built from the coordinates parsed out of the AREA environment variable.
area_of_interest = ee.Geometry.Rectangle(location)
# NOTE(review): `bands` is not referenced anywhere else in the visible notebook — confirm before relying on or removing it.
bands = ['B4', 'B3', 'B2']

# One median composite per period; later cells compare the two.
start_compositeimg = create_composite_image(start_date)
end_compositeimg = create_composite_image(end_date)

### Visualize Satellite Images

In [16]:
import geemap

In [17]:
def calculate_center(min_lon, min_lat, max_lon, max_lat):
    """Return the ``(lat, lon)`` midpoint of a lon/lat bounding box.

    The result is latitude-first, the reverse of the argument order.
    """
    axis_bounds = ((min_lat, max_lat), (min_lon, max_lon))
    midpoints = [(low + high) / 2 for low, high in axis_bounds]
    return tuple(midpoints)

In [18]:
# location is read positionally as [min_lon, min_lat, max_lon, max_lat], matching calculate_center's parameters.
center_lat, center_lon = calculate_center(location[0], location[1], location[2], location[3])

In [19]:
# Display settings shared by the start and end composite map layers:
# three surface-reflectance bands (presumably red/green/blue for a true-colour view),
# stretched between 0.03 and 0.4 with gamma 2.5.
composite_viz_params = {
    'bands': ['SR_B4', 'SR_B3', 'SR_B2'],
    'min': 0.03,
    'max': 0.4,
    'gamma': 2.5
}

In [20]:
# Interactive map centred on the area of interest, showing the start-period composite.
Map = geemap.Map(center=(center_lat, center_lon), zoom=8)
Map.addLayer(start_compositeimg, composite_viz_params, 'Start Composite')
Map.addLayerControl()
Map  # bare last expression so the notebook renders the widget

Map(center=[-3.75, -53.25], controls=(WidgetControl(options=['position', 'transparent_bg'], widget=SearchDataG…

In [21]:
# Same view as the previous map, showing the end-period composite (rebinds `Map`).
Map = geemap.Map(center=(center_lat, center_lon), zoom=8)
Map.addLayer(end_compositeimg, composite_viz_params, 'End Composite')
Map.addLayerControl()
Map  # bare last expression so the notebook renders the widget

Map(center=[-3.75, -53.25], controls=(WidgetControl(options=['position', 'transparent_bg'], widget=SearchDataG…

### Calculate Deforestation

In [22]:
# Draw training samples from both composites using the same sampling settings.

training_start = start_compositeimg.sample(
    region=area_of_interest, scale=30, numPixels=500, tileScale=10
)

training_end = end_compositeimg.sample(
    region=area_of_interest, scale=30, numPixels=500, tileScale=10
)

In [23]:
# apply k-means clustering
numberOfClusters = 3

# K-means cluster ids are arbitrary: two models trained separately can give
# the same land-cover class different ids, which makes a pixel-wise
# comparison of their labels meaningless. Train ONE model on the pooled
# start + end samples and apply it to both composites so an id means the
# same thing in both classified images.
clusterer_start = ee.Clusterer.wekaKMeans(numberOfClusters).train(
    training_start.merge(training_end)
)
# Same model under the second name, so code referring to either name keeps working.
clusterer_end = clusterer_start

classified_start = start_compositeimg.cluster(clusterer_start)
classified_end = end_compositeimg.cluster(clusterer_end)

In [24]:
# calculate difference between clustered images to detect deforestation
# Per pixel this is |end cluster id - start cluster id|, so 0 means the id did not change.
# NOTE(review): cluster ids are categorical, so the size of a non-zero difference has no
# obvious meaning — confirm whether a changed/unchanged mask is what is wanted here.
deforestation = classified_end.subtract(classified_start).abs()

### Visualize Results

In [25]:
# Display range runs from 0 to numberOfClusters - 1; the palette lists three colours,
# which lines up one-to-one with that range only while numberOfClusters is 3.
cluster_viz_params = {
    'min': 0,
    'max': numberOfClusters - 1,
    'palette': ['red', 'green', 'blue']
}

In [26]:
# Interactive map of the label-difference image, using the cluster colour settings (rebinds `Map`).
Map = geemap.Map(center=(center_lat, center_lon), zoom=8)
Map.addLayer(deforestation, cluster_viz_params, 'Deforestation')
Map.addLayerControl()
Map  # bare last expression so the notebook renders the widget

Map(center=[-3.75, -53.25], controls=(WidgetControl(options=['position', 'transparent_bg'], widget=SearchDataG…

### Save Maps to MongoDB

In [27]:
import requests
from PIL import Image
import io

In [28]:
# function to get byte value of maps
def export_image_to_bytes(image, region, scale=30, crs='EPSG:4326', timeout=300):
    """Download ``image`` as a GeoTIFF and return it re-encoded as PNG bytes.

    Args:
        image: Object exposing ``getDownloadURL`` (an Earth Engine image in
            this notebook).
        region: Passed through as the ``'region'`` download parameter.
        scale: Passed through as the ``'scale'`` download parameter.
        crs: Passed through as the ``'crs'`` download parameter.
        timeout: Seconds ``requests`` waits on the download server before
            raising. Without a timeout a stalled connection blocks forever.

    Returns:
        The PNG-encoded image as ``bytes``, or ``None`` when the download
        responds with any status other than 200.

    Raises:
        requests.exceptions.RequestException: on connection errors or when
            ``timeout`` elapses.
    """
    download_url = image.getDownloadURL({
        'scale': scale,
        'region': region,
        'crs': crs,
        'format': 'GEO_TIFF'
    })
    response = requests.get(download_url, timeout=timeout)
    if response.status_code != 200:
        return None

    # convert TIFF image to PNG; the context managers release the decoded
    # image and the output buffer as soon as the bytes have been copied out
    with Image.open(io.BytesIO(response.content)) as tiff_image, io.BytesIO() as img_io:
        tiff_image.save(img_io, format='PNG')
        return img_io.getvalue()

In [29]:
# function to process and store images in MongoDB
def process_and_store_image(task):
    """Export one image to PNG and save it in GridFS.

    Args:
        task: ``(image, region, scale)`` tuple.

    Returns:
        The id returned by ``GridFS.put``, or ``None`` when the export
        produced no bytes.

    Earth Engine is initialized and a MongoDB connection is opened on every
    call. The stored filename and description are built from the image's
    string representation.
    """
    ee_image, bounds, resolution = task

    # set up Earth Engine first, then MongoDB, before any export work
    initialize_earth_engine()
    grid_fs = connect_to_mongo()

    png_bytes = export_image_to_bytes(ee_image, bounds, resolution)
    if not png_bytes:
        return None

    stored_id = grid_fs.put(
        png_bytes,
        filename=f'{ee_image}',
        metadata={'description': f'{ee_image} Visualization'},
    )
    if not stored_id:
        print("Failed to store image in MongoDB")
    return stored_id

In [30]:
# Render each layer to a display-ready image using fixed visualization settings.

start_composite_viz = start_compositeimg.visualize(
    bands=['SR_B4', 'SR_B3', 'SR_B2'], min=0.03, max=0.4, gamma=2.5
)

end_composite_viz = end_compositeimg.visualize(
    bands=['SR_B4', 'SR_B3', 'SR_B2'], min=0.03, max=0.4, gamma=2.5
)

clustered_composite_viz = deforestation.visualize(
    min=0, max=numberOfClusters - 1, palette=['red', 'green', 'blue']
)

In [31]:
# Each task is an (image, region, scale) tuple, the shape process_and_store_image unpacks.
# The two composites are exported at scale 100 and the difference image at scale 500.
tasks = [
    (start_composite_viz, area_of_interest, 100),
    (end_composite_viz, area_of_interest, 100),
    (clustered_composite_viz, area_of_interest, 500)
]

# parallelize tasks using Spark
rdd = sc.parallelize(tasks)
# The next cell indexes image_ids by position, so it relies on collect() returning
# results in the same order as `tasks`.
image_ids = rdd.map(process_and_store_image).collect()

                                                                                

In [32]:
# Name each stored id by its position in `tasks` (start, end, clustered).
# NOTE(review): process_and_store_image returns None when an export fails, and str()
# turns that into the text 'None' — confirm downstream readers expect that.
image_ids_dict = {
    "start_image_id": str(image_ids[0]),
    "end_image_id": str(image_ids[1]),
    "clustered_image_id": str(image_ids[2])
}

### Store Image IDs in JSON File

In [33]:
import json

In [34]:
# Write the id mapping to image_ids.json in the current working directory, overwriting any existing file.
with open('image_ids.json', 'w') as f:
    json.dump(image_ids_dict, f)