In [45]:
# Setup
!pip install --quiet --upgrade pip

# Install the dependencies.
!pip install --quiet -r requirements.txt

# Restart the runtime by ending the process.
exit()

In [2]:
from __future__ import annotations
import os

import ee
import google.auth

import folium
from serving import data
import json
from branca.element import Figure
from folium import plugins
import sys
from importlib import reload
from serving.data import get_input_image_ee, get_varied_labels
from serving.constants import  BUCKET, PROJECT, CROP, SCALE
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
from google.cloud import storage
import logging

In [3]:
current_dir = os.getcwd()
sys.path.append(current_dir+'/serving')
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

In [4]:
# Google cloud parameters
project = PROJECT
bucket = BUCKET
location = "us-west2-a"  # @param {type:"string"}

# Quick input validations.
assert project, "⚠️ Please provide a Google Cloud project ID"
assert bucket, "⚠️ Please provide a Cloud Storage bucket name"
assert not bucket.startswith(
    "gs://"
), f"⚠️ Please remove the gs:// prefix from the bucket name: {bucket}"
assert location, "⚠️ Please provide a Google Cloud location"

# Set GOOGLE_CLOUD_PROJECT for google.auth.default().
os.environ["GOOGLE_CLOUD_PROJECT"] = project

# Set the gcloud project for other gcloud commands.
!gcloud config set project {project}

# Initialise goofle earth engine
credentials, _ = google.auth.default()
ee.Initialize(
    credentials.with_quota_project(None),
    project=project,
    opt_url="https://earthengine-highvolume.googleapis.com",
)

Updated property [core/project].


In [5]:
#CONSTANTS
YEAR_START=2017
YEAR_END=2023

SEASON_START=5
SEASON_END=10

CROP=1 #Corn

# True vision params
vis_params = {
    "min": 0,
    "max": 3000,
    "bands": ["B4", "B3", "B2"],}

vis_agro = {
     "min": 225,
    "max": 4000,
    "bands": ['B11', 'B8', 'B3']}

In [28]:
reload(data)
from serving.data import get_input_image_ee, get_varied_labels

In [56]:
# Show sample filter

start_year = 2016
start_month = 5
end_month = 7
crop_type = 1 # Corn
county = "Jeff Davis".capitalize()
state_fips = "13"

county_geom = (
    ee.FeatureCollection("TIGER/2018/Counties")
    .filter(ee.Filter.eq("COUNTYFP", "161"))
    .filter(ee.Filter.eq("STATEFP", state_fips))
).geometry()

coords = county_geom.centroid().coordinates().getInfo()

s2_img_start = get_input_image_ee(county, state_fips, crop_type, start_year, start_month)["image"]
s2_img_end = get_input_image_ee(county, state_fips, crop_type, start_year, end_month)["image"]
image_name = get_input_image_ee(county, state_fips, crop_type, start_year, start_month)["image_name"]

No data found for the specified parameters.


EEException: Element.geometry: Parameter 'feature' is required.

In [46]:
# Create the map
m1 = folium.Map(coords[::-1])

# Add the CDL layer
layer_left = folium.TileLayer(
    tiles=s2_img_start.getMapId(vis_agro)["tile_fetcher"].url_format,
    attr='Map Data &copy; <a href="https://earthengine.google.com/">Google Earth Engine</a>',
    name='Cropland Data Layer',
    overlay=True,
    control=True
)

layer_right = folium.TileLayer(
    tiles=s2_img_end.getMapId(vis_agro)["tile_fetcher"].url_format,
    attr='Map Data &copy; <a href="https://earthengine.google.com/">Google Earth Engine</a>',
    name='Cropland Data Layer',
    overlay=True,
    control=True,
)

sbs = plugins.SideBySideLayers(layer_left=layer_left, layer_right=layer_right)

layer_left.add_to(m1)
layer_right.add_to(m1)
sbs.add_to(m1)    

# Display the map
m1

In [7]:
"""
Changes in crops can be observable in the satellite images. Once can hope that the colour saturation and temporal change would hold predictive power
"""

'\nChanges in crops can be observable in the satellite images. Once can hope that the colour saturation and temporal change would hold predictive power\n'

In [None]:
# DATA RETRIVAL

In [35]:
def check_blob_prefix_exists(bucket_name, prefix):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    
    blobs = bucket.list_blobs(prefix=prefix, max_results=1)
    return any(blobs)

def batch_check_blobs(bucket_name, prefixes):
    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_prefix = {executor.submit(check_blob_prefix_exists, bucket_name, prefix): prefix for prefix in prefixes}
        results = {}
        for future in as_completed(future_to_prefix):
            prefix = future_to_prefix[future]
            results[prefix] = future.result()
    return results

# Generate all prefixes
bucket_name = bucket
labels_df = get_varied_labels(150)
labels = list(zip(labels_df["county_name"],
             labels_df["state_ansi"],
             labels_df["year"]))
months = [5,7,9]

prefixes = [f'images/{SCALE}/{county.capitalize()}_{fips}/{year}/{month}-{month+1}' 
            for county, fips, year in labels
            for month in months]

# Batch check all prefixes
results = batch_check_blobs(bucket_name, prefixes)

# Generate get_input_img_params based on results
get_input_img_params = [
    {"county": county.capitalize(), "state_fips": fips, "crop": CROP, "year": year, "month": month}
            for county, fips, year in labels
            for month in months
            if not results[f'images/{SCALE}/{county.capitalize()}_{fips}/{year}/{month}-{month+1}']
]

print(f"Number of items to process: {len(get_input_img_params)}")

Number of items to process: 2618


In [36]:
get_input_img_params_small = get_input_img_params[0:100]

In [37]:
def export_img(image:ee.Image, image_name: str, county_geom: ee.geometry):
    
    image_name = f"{SCALE}/" + image_name
    img_task = ee.batch.Export.image.toCloudStorage(
    image=image,
    description=image_name.replace("/","-"),
    bucket=bucket,
    fileNamePrefix=f"images/{image_name}",
    scale=SCALE,
    region=county_geom,
    fileDimensions = 4*2048,
    skipEmptyTiles = True,
    fileFormat = "GeoTIFF",
    maxPixels = 1e9
    )

    img_task.start()
    logging.info(f"Export task started for: {image_name}")

def apply_get_input_image(params):
    
    return get_input_image_ee(**params)

def unpack_for_export(list_input):
    county, state_fips = list_input["image_name"].split("/")[0].split("_")
    county_geom = (
    ee.FeatureCollection("TIGER/2018/Counties")
        .filter(ee.Filter.eq("NAME", county.capitalize()))
        .filter(ee.Filter.eq("STATEFP", state_fips))
        ).geometry()
    return export_img(list_input["image"], list_input["image_name"], county_geom)

In [22]:
from apache_beam.options.pipeline_options import PipelineOptions

# Define your pipeline options
options = PipelineOptions(
    runner='DataflowRunner',  # Specify the runner as 'DataflowRunner'
    project=project,  # Replace with your Google Cloud project ID
    region='us-west2-a',  # Specify the GCP region, e.g., 'us-central1'
    temp_location=f'gs://{BUCKET}/temp',  # GCS bucket for temporary files
    staging_location=f'gs://{BUCKET}/staging',  # GCS bucket for staging files
    job_name='s2-image-top150',  # Give your job a unique name    
    save_main_session=True  # Ensures that all dependencies are available in the workers
)

In [38]:
with beam.Pipeline() as pipeline:
    (
        pipeline
        |"Create parameter sets" >>  beam.Create(get_input_img_params_small)
        | "Sample counties" >> beam.Map(apply_get_input_image)
        | "Export image to GCS bucket" >> beam.Map(unpack_for_export)
    )

2024-10-19 20:06:13,288 - INFO - Missing pipeline option (runner). Executing pipeline using the default runner: DirectRunner.
2024-10-19 20:06:13,436 - INFO - Creating state cache with size 104857600
2024-10-19 20:06:15,319 - INFO - Export task started for: 60/Floyd_13/2020/7-8
2024-10-19 20:06:16,973 - INFO - Export task started for: 60/Floyd_13/2020/9-10
2024-10-19 20:06:18,567 - INFO - Export task started for: 60/Meigs_47/2016/5-6
2024-10-19 20:06:20,101 - INFO - Export task started for: 60/Meigs_47/2016/7-8
2024-10-19 20:06:21,467 - INFO - Export task started for: 60/Meigs_47/2016/9-10
2024-10-19 20:06:23,361 - INFO - Export task started for: 60/Meigs_47/2017/5-6
2024-10-19 20:06:24,796 - INFO - Export task started for: 60/Meigs_47/2017/7-8
2024-10-19 20:06:26,522 - INFO - Export task started for: 60/Meigs_47/2017/9-10
2024-10-19 20:06:27,898 - INFO - Export task started for: 60/Meigs_47/2019/5-6
2024-10-19 20:06:29,289 - INFO - Export task started for: 60/Meigs_47/2019/7-8
2024-10