### Name
T(h)eroPoDa - Time Series Extraction for Polygonal Data

### Description
Toolkit created to extract time series information from Sentinel 2 data stored in Earth Engine

### Author
  Vinícius Vieira Mesquita - vieiramesquita@gmail.com

### Version
  1.0.6

### Import main libraries

Run the following cell to import the main API's into your session.

In [None]:
!pip install earthengine-api
!pip install pandas
!pip instal joblib

In [2]:
import os
import sys
import time
import ee
import pandas as pd
from joblib import Parallel, delayed

### Authenticate and initialize

Run the `ee.Authenticate` function to authenticate your access to Earth Engine servers and `ee.Initialize` to initialize it. Upon running the following cell you'll be asked to grant Earth Engine access to your Google account. Follow the instructions printed to the cell.

In [None]:
# Trigger the authentication flow.
ee.Authenticate()

# Initialize the library.
ee.Initialize()


### Get the NDVI Time Series from Earth Engine

Function responsible to get the time series of Sentinel 2 data throught Earth Engine.

This function needs a `geometry` object in the `ee.Feature()` formart and the choosed vetor propertie ID as the `id_field`.



In [43]:
#Returns a NDVI time series (and other informations) by a target polygon
def getTimeSeries(geometry,id_field,bestEffort=False):

  #Mask possible edges which can occur on Sentinel 2 data
  def maskEdges(img):
    return img.updateMask(img.select('B8A').mask().updateMask(img.select('B9').mask()));

  #Creates a Cloud and Shadow mask for the input Sentinel 2 image
  def mask_and_ndvi(img):
    #Get spacecraft plataform name
    satName = ee.String(img.get('SPACECRAFT_NAME'))

    #Mask expression for Sentinel 2 Surface Reflectance data
    #maskExp = "(b('MSK_CLDPRB') < 5 && b('SCL') != 3 && b('SCL') != 10)"

    #Mask expression for Sentinel 2 Top of Atmosphere Data
    #maskExp = "b('Q60') == 0"

    #Remove cloud and shadow from images
    mask = img.select('cs').gte(0.5)

    #Calculate NDVI (Normalized Difference Vegetation Index) based on Bands 4 (Red) and 8 (Near Infrared)
    ndvi = img.updateMask(mask).normalizedDifference(['B8','B4']).select([0],['NDVI'])

    return (img.addBands([ndvi,ee.Image.constant(1).rename(['full'])], None, True)
      .set({'system:time_start':img.get('system:time_start'),'satelite':satName}))

  #Extracts and standardizes the output NDVI values and etc. by each image
  def reduceData(img):

    img = ee.Image(img)

    #Get the date which the image was taken
    imgDate = ee.Date(ee.Number(img.get('system:time_start')))

    #Organize the time for the outuput NDVI information
    orgDate = (ee.String(ee.Number(imgDate.get('year')).toInt().format())
      .cat('-')
      .cat(ee.String(ee.Number(imgDate.get('month')).toInt().format()))
      .cat('-')
      .cat(ee.String(ee.Number(imgDate.get('day')).toInt().format()))
      )

    #Defines the zonal reducers to use
    reducers = (ee.Reducer.mean()
        .combine(**{'reducer2': ee.Reducer.stdDev(),'sharedInputs':True,})
        .combine(**{'reducer2': ee.Reducer.median(),'sharedInputs':True,})
        .combine(**{'reducer2': ee.Reducer.min(),'sharedInputs':True,})
        .combine(**{'reducer2': ee.Reducer.max(),'sharedInputs':True,})
        .combine(**{'reducer2': ee.Reducer.count(),'sharedInputs':True}))

    pixel_size = 10

    #If polygon area is to big and causes memory limit error, bestEffort is used
    #bestEffort - If the polygon would contain too many pixels at the given scale, compute and use a larger scale which would allow the operation to succeed.

    if bestEffort == False:
      series = img.reduceRegion(reducers,ee.Feature(geometry).geometry(), pixel_size,None,None,False,1e13,16)

    else:
      pixel_size = 30
      series = img.reduceRegion(reducers,ee.Feature(geometry).geometry(), pixel_size,None,None,False,1e13,16)

    #Return defined information for the choosed polygon
    return (ee.Feature(geometry)
      .set('id',ee.String(img.id())) #Image ID
      .set('date',orgDate) #Date
      .set('satelite',img.get('satelite')) #Sapacraft plataform name (i.e. Sentinel 2A or 2B)
      .set('MGRS_TILE',img.get('MGRS_TILE')) #Reference tile grid
      .set('AREA_HA',ee.Feature(geometry).area(1).divide(10000)) #Choosed polygon ID Field
      .set('NDVI_mean',ee.Number(ee.Dictionary(series).get('NDVI_mean'))) #NDVI pixel average for the polygon
      .set('NDVI_median',ee.Number(ee.Dictionary(series).get('NDVI_median')))
      .set('NDVI_min',ee.Number(ee.Dictionary(series).get('NDVI_min')))
      .set('NDVI_max',ee.Number(ee.Dictionary(series).get('NDVI_max')))
      .set('NDVI_stdDev',ee.Number(ee.Dictionary(series).get('NDVI_stdDev'))) #NDVI pixel Standard Deviation for the polygon
      .set('Pixel_Count',ee.Number(ee.Dictionary(series).get('NDVI_count'))) #Number of pixels cloudless and shadowless used for estimatives
      .set('Total_Pixels',ee.Number(ee.Dictionary(series).get('full_count'))) #Total number of pixels inside the polygon
      .set('Pixel_Size',pixel_size) #Size of the pixel used
    )

  #Turns Feature into Dictionary to get properties
  def toDict(feat):
    return ee.Feature(feat).toDictionary()

  #imgCol =  (ee.ImageCollection("COPERNICUS/S2_HARMONIZED")
  #  .filterBounds(geometry.geometry())
  #  .map(mask_img))

  #Calls the Sentinel 2 data collection, filter the images based in the polygon location, masks cloud/shadow and calculates NDVI
  s2 = (ee.ImageCollection('COPERNICUS/S2_SR_HARMONIZED')
    .filterBounds(geometry.geometry()))

  csPlus = (ee.ImageCollection('GOOGLE/CLOUD_SCORE_PLUS/V1/S2_HARMONIZED')
				.filterBounds(geometry.geometry()))

  csPlusBands = csPlus.first().bandNames();

  imgCol = (s2.linkCollection(csPlus, csPlusBands)
        .map(maskEdges)
        .map(mask_and_ndvi))

  #Extracts NDVI time series by polygon, remove the nulls and build a dictionary struture to the data
  Coll_fill = (imgCol.toList(imgCol.size()).map(reduceData)
    .filter(ee.Filter.notNull(['NDVI_mean']))
    .map(toDict)
    )

  return Coll_fill

### Build and Structure the Time Series library

Function responsible to build and structure the time series library.

In [7]:
#Builds and writes a NDVI time series with Sentinel 2 data by a target vector asset
def build_time_series(index,obj,id_field,outfile,asset,bestEffort=False):

  #Main polygon asset
  samples = ee.FeatureCollection(asset).select(id_field)

  #Creates an empty data.frame for the time series
  df = pd.DataFrame()

  #Processing time start variable
  start_time_obj = time.time()

  #Selects the target polygon
  selected_sample = samples.filter(ee.Filter.eq(id_field,obj)).first()

  #Extracts the formated NDVI time series from the target polygon
  point_series = getTimeSeries(ee.Feature(selected_sample),id_field,bestEffort).getInfo()

  #Writes the time series by data frame row
  for item in point_series:
    df  = pd.concat([df,pd.DataFrame(item,index=[0])])

  #If the output file exists, ignores the header
  hdr = False if os.path.isfile(outfile) else True

  #Rounds the NDVI values by four decimals (avoid huge and slow tables)
  df['AREA_HA'] = df['AREA_HA'].round(decimals=4)
  df['NDVI_mean'] = df['NDVI_mean'].round(decimals=4)
  df['NDVI_stdDev'] = df['NDVI_stdDev'].round(decimals=4)

  #Writes the time series in a table in appending mode ('a')
  df.to_csv(outfile,mode='a',header = hdr,index = False,sep =';')

  #Estimates the total time spent in the generation of the time series for the target polygon
  time_spent = round(time.time() - start_time_obj, 3)

  print(f'Index {index} - Object [{obj}] procesed in {round(time.time() - start_time_obj, 3)} seconds')

  #Returns checkers
  if df.shape[0] > 0:
    return True,time_spent #if everthings works fine, returns the True and the time spend
  elif float(df['AREA_HA']) < 0.01:
    return None,None #If the polygon area is too small, ignores the polygon!
  else:
    return False,None #if something goes wrong, returns False

### Check the Time Series library

Function responsible to check the consistency of the time series library.

In [8]:
#Checks if time series processing works
def build_time_series_check(index,obj,id_field,outfile,asset,checker=False):

  obj = int(obj)

  #Checks if the polygon was been processed before
  if checker is True:

    df_check = pd.read_csv(outfile,sep =';')

    if obj in list(df_check[id_field].values):

      print(f' Object [{obj}] was found in the file. Skipping..')
      return {'errors':None, 'time': 0}

  errors = None
  time = None


  try:
    check = build_time_series(index,obj,id_field,outfile,asset)
    time = check[1]

    if check[0] == False:
      print('raised')
      raise
    if check[0] == None:
      return {'errors':'ignore' ,'time': 'ignore'}

  except:

    try:

      print(f'Index {index} - Request [{obj}] fails. Trying the best effort!')

      check = build_time_series(index,obj,id_field,outfile,asset,True)

      if check[0] == False:
        print('raised')
        raise

      if check[0] == None:
        return {'errors':'ignore' ,'time': 'ignore'}

    except:

      print(f'Index {index} - Request [{obj}] expired. Sending it to the error list!')

      errors = obj

  return {'errors':errors ,'time': time}

### Build the Polygon List file

Function responsible to write a text file contaning each Polygon ID used to extract the time series.

In [9]:
#Builds and writes the Polygon ID list
def build_id_list(asset,id_field,colab_folder):

  #Loads EE Polygon asset
  samples = ee.FeatureCollection(asset).select(id_field)

  #Estimates the number of polygons in the Asset
  sample_size = int(samples.size().getInfo())

  #Conditionals to avoid Earth Engine memory erros
  #Earth Engine is limited to request 50k vectors, make manual lists if you need more!
  if sample_size < 50000:
    samples_list = samples.toList(50000)
  else:
    samples_list = samples.toList(samples.size())

  fileName = os.path.join(colab_folder,'polygonList.txt')

  with open(fileName, "w") as polygon_file:

    def get_ids(feat):
      return ee.Feature(feat).get(id_field)

    samples_list_slice = samples_list.map(get_ids).sort().getInfo()

    for polygon in samples_list_slice:
      polygon_file.write(str(polygon)+ '\n')


### Run

Function responsible to catch argument information and start run the process.

In [13]:
def run(asset,id_field,output_name,colab_folder):

  output_name = os.path.join(colab_folder,output_name)

  start_time = time.time()

  fileName_polyList = os.path.join(colab_folder,'polygonList.txt')

  #Reading the file which contains the polygons IDs
  listPolygons_text = open(fileName_polyList,"r")
  listPolygons_text = listPolygons_text.readlines()

  #Format the data
  listPolygons_text = [int(name) for name in listPolygons_text]

  start_obj = 1

  #Estimates the total of polygons
  total = len(listPolygons_text)

  print(f'Number of objects to process: {total}')

  #Yes, it will take a long time to finish!
  if total > 1000:
    print('Go take a coffee and watch a series... it will take a while!')

  list_num = listPolygons_text[start_obj:total]

  #Checkers
  first_dict = [{'errors':'ignore' ,'time': 'ignore'}]
  check_file = True

  #Checks if the output file exists
  if os.path.exists(output_name) is False:

    #If false, creates one using the first polygon
    check_file = False
    print('Creating the main file..')
    first_dict = build_time_series_check(0,int(listPolygons_text[0]),id_field,output_name,asset)

  #Structures the arguments for jobLib::Parallel
  worker_args = [
    (listPolygons_text.index(obj),obj,id_field,output_name,asset,check_file) \
    for obj in list_num
  ]

  #Number of to use (more than 20 generate many sleeping queries)
  n_cores = 16 #Recommended

  #Starts the parallel processing
  infos = Parallel(n_jobs=n_cores, backend='multiprocessing')(delayed(build_time_series_check)(*args) for args in worker_args)

  if check_file is True:
    first_dict = {'time': 0}

  #List with all times computed during the processing
  time_list = [first_dict['time']] + [item['time'] for item in infos if item['time'] != None]

  #List of polygons probably with errors
  errors_list = [item['errors'] for item in infos if item['errors'] != None]

  fileName_errors = os.path.join(colab_folder,'errors_polygon.txt')

  #Write a file with the erros list
  with open(fileName_errors, "w") as errors_file:
    for polygon in errors_list:
      errors_file.write(str(polygon)+ '\n')

  print(f'The average processing time was {round(pd.DataFrame(time_list).mean()[0],2)} seconds')
  print(f'Processing finished. All the work took {round(time.time() - start_time,3)} seconds to complete')

In [None]:
if __name__ == '__main__':

  asset = 'users/vieiramesquita/LAPIG_FieldSamples/lapig_goias_fieldwork_2022_50m' #Earth Engine Vector Asset
  id_field = 'ID_POINTS' #Vector collumn used as ID (use unique identifiers!)
  colab_folder = '/content'
  output_name = 'LAPIG_Pasture_S2_NDVI_Monitoring_FieldWork.csv' #Output filename

  #Check if polygon list file exists
  if os.path.exists(os.path.join(colab_folder,'polygonList.txt')) is False:
    build_id_list(asset,id_field,colab_folder)

  #Let the party begin!
  run(asset,id_field,output_name,colab_folder)