# Imports Libraries

In [1]:
import numpy as np
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns

from PIL import Image
import requests
from io import BytesIO

import math

from opencage.geocoder import OpenCageGeocode
import folium
from folium import plugins

import json
import urllib.request
import urllib.error

from datetime import datetime
from datetime import date, timedelta
from typing import List, Any, Dict

import configparser
import os
import re

# Generic Functions

In [2]:
# Gets user input variable values from a ini file
def get_property(ini_file_name:str, section:str, key:str):
    config = configparser.ConfigParser()
    config.read(ini_file_name)
        
    if config.has_section(section):
      print(f'Config file has section [{section}]')
    else:
      print(f'Config file does not have section {section}, please check the name of section or api key file existence')
    
    key_value: str = config[section][key]
    print(f'The property value for <{key}> is:\n{key_value}\n')
    return key_value

# Gets all the section names
def get_property_sections_with_regex(ini_file_name:str, search_words:str) -> list:
    config = configparser.ConfigParser()
    config.read(ini_file_name)
    find_sections: list = []
    
    for section_name in config.sections():
        if bool(re.search(search_words, section_name)):
            find_sections.append(section_name)
    
    return find_sections

# Gets all the best domains based on country code
def get_all_keys_properties(ini_file_name:str, section:str) -> dict:
    config = configparser.ConfigParser()
    config.optionxform = str # to preserve the case
    config.read(ini_file_name)
    keys = list(config[section].keys())
    
    contry_domain: dict = {}
    for key in keys:
        print(f'key is <{key}> value is <{config[section][key]}>')
        values: list = config[section][key].split(',')
        for value in values:
            contry_domain.update({value: key})
    print(f'dict is <{contry_domain}>')
    return contry_domain

def convert_to_datetime(data:pd, col_names: list):
    for col_name in col_names:
        data[col_name] = pd.to_datetime(data[col_name])
        
def convert_to_date(data:pd, col_names: list):
    for col_name in col_names:
        data[col_name] = pd.to_datetime(data[col_name]).dt.date

# Constants

In [3]:
# Constants that are used in the subsequent functions
INI_FILE = 'cehub.ini'

# Section names
FILE_PATHS_SECTION = 'File_Paths'
CEHUB_SECTION = 'CE_Hub'
WEATHER_MERGE_ADD_COL = 'Weather_Merge_'
# TODO: make this section scanable from the ini file rather than hardcode it
CEHUB_PRECIPITATION_DOMAINS = 'CE_Hub_Precipitation_Domains'
CEHUB_TEMPRETURE_DOMAINS = 'CE_Hub_Tempreture_Domains'
CEHUB_WIND_DOMAINS = 'CE_Hub_Wind_Domains'

#Weather merge column suffix
DEFAULT = 'default'

# Property names
INPUT_FILE_DIR = 'input_file_dir'
OUTPUT_FILE_DIR = 'output_file_dir'
SOURCE_DATA_FILENAME = 'source_data_filename'
SHEET_NAME = 'excel_sheet_name'
API_KEY = 'api_key'

START_DATE_PROP ='start_date'
END_DATE_PROP ='end_date'
START_DATE_OFFSET = 'start_date_offset'
END_DATE_OFFSET = 'end_date_offset'

FROM_DATE = 'from_date'
FROM_DATE_OFFSET = 'from_date_offset'
TO_DATE = 'to_date'
TO_DATE_OFFSET = 'to_date_offset'

#CE Hub URL
CEHUB_URL = 'http://my.meteoblue.com/dataset/query?apikey='

#CE Hub data domains
DOMAIN_NEMSGLOBAL = 'NEMSGLOBAL'
DOMAIN_ERA5 = 'ERA5'
DOMAIN_ERA5T = 'ERA5T'
DOMAIN_CPCGBAUS = 'CPCGBAUS'
DOMAIN_CHIRPS2 = 'CHIRPS2'
DOMAIN_SOILGRIDS2 = 'SOILGRIDS2'

#Weather codes
TEMP = 11 #Is this short for tempreture?
PRECIPITATION = 61
HUMIDITY = 52
WIND_SPEED = 32
WIND_DIRECTION = 735
CLOUDS_TOTAL = 71
CLOUDS_HIGH = 75
CLOUDS_MEDIUM = 74
CLOUDS_LOW = 73
SUNSHINE_DURATION = 191
SHORTWAVE_RADIATION_TOTAL = 204
SHORTWAVE_RADIATION_DIRECT = 258
SHORTWAVE_RADIATION_DIFFUSE = 256
EVAPOTRANSPIRATION = 261
SOIL_TEMP = 85
SOIL_MOISTURE = 144
VAPPRESS_DEFICIT = 56
UV_MEAN = 721
#Weather level
LVL_2M_ELV_CORRECTED = '2 m elevation corrected'
LVL_2M_ABV_GND = '2 m above gnd'
LVL_SFC = 'sfc'
LVL_HIGH_CLD_LAY = 'high cld lay'
LVL_MID_CLD_LAY = 'mid cld lay'
LVL_LOW_CLD_LAY = 'low cld lay'
LVL_10CM_DOWN = '0-10 cm down'
LVL_10M_ABV_GND = '10 m above gnd'

#Soil codes
BULK_DENSITY = 808
CATION_EXCHANGE_CAPACITY = 809
CLAY_CONTENT_MASS_FRACTION = 803
COARSE_FRAGMENTS_VOLUMETRIC_FRACTION = 807
ORGANIC_CARBON_CONTENT = 811
ORGANIC_CARBON_DENSITY = 838
ORGANIC_CARBON_STOCKS = 837
SAND_CONTENT_MASS_FRACTION = 805
SILT_CONTENT_MASS_FRACTION = 804
TOTAL_NITROGEN_CONTENT = 817
PH_IN_H20 = 812
#Soil level
LVL_AGGREGATE = 'aggregated'
LVL_30 = '0-30 cm'
START_DEPTH_0 = 0
END_DEPTH_30 = 30
END_DEPTH_60 = 60

#time resolution
TIME_RESOLUTION_DAILY = 'daily'
TIME_RESOLUTION_HOURLY = 'hourly'

# Weather REST Request JSON Keys
MAX = 'max'
MIN = 'min'
MEAN = 'mean'
SUM = 'sum'

# CE Hub REST Response JSON Keys
DOMAIN = 'domain'
TIME_INTERVALS = 'timeIntervals'
GEOMETRY = 'geometry'
COORDINATES = 'coordinates'
LOCATION_NAMES = 'locationNames'
CODES = 'codes'
VARIABLE = 'variable'
DATA_PER_TIME_INTERVAL = 'dataPerTimeInterval'
DATA = 'data'
AGGREGATION = 'aggregation'
START_DEPTH = 'startDepth'
END_DEPTH = 'endDepth'
UNIT = 'unit'
LEVEL = 'level'

#Corn column names and shared with Weather data
TRIAL = 'Trial'
LATITUDE = 'Latitude'
LONGITUDE = 'Longitude'
ALTITUDE = 'Altitude'
DATES = 'Dates'
COUNTRY_CODE = 'Country_Code'

#For BITS data preprocessing and Merging with CEHub data
START_DATE_COLUMN = 'Start_Date'
END_DATE_COLUMN = 'End_Date'

#Dates Columns
PLANT_DATE = 'Plant_Date'
EMERGENCE_DATE = 'Emergence_Date'
ASSMT_DATE = 'Assmt_Date'
FIRST_APPL_DATE_TRT = 'First_Appl_Date_Trt'
FIRST_APPL_DATE_TRIAL = 'First_Appl_Date_Trial'
MOST_RECENT_APPL_DATE_TRT = 'Most_Recent_Appl_Date_Trt'
MOST_RECENT_APPL_DATE_TRIAL = 'Most_Recent_Appl_Date_Trial'

# User Input Properties Loading From INI File

In [20]:
# Loads the directory where the input data file is
input_file_dir = get_property(INI_FILE, FILE_PATHS_SECTION, INPUT_FILE_DIR)

# Loads the directory that stores output weather/soil/merged data files
output_file_dir = get_property(INI_FILE, FILE_PATHS_SECTION, OUTPUT_FILE_DIR)

# Loads trial data file name
source_data_filename = get_property(INI_FILE, FILE_PATHS_SECTION, SOURCE_DATA_FILENAME)

# Load sheet name in a excel file
excel_sheet_name = get_property(INI_FILE, FILE_PATHS_SECTION, SHEET_NAME)

# Loads CE Hub API key and constructs the endpoint url with the key
api_key = get_property(INI_FILE, CEHUB_SECTION, API_KEY)
cehub_endpoint = CEHUB_URL + api_key

# Loading start and end date column names and their offset values set by the user for CE Hub data retrieval
# start_date_column = 'Start_Date'
start_date_offset = int(get_property(INI_FILE, CEHUB_SECTION, START_DATE_OFFSET))
end_date_offset = int(get_property(INI_FILE, CEHUB_SECTION, END_DATE_OFFSET))

# Check if start_date_offset > 0 set to = 0, if end_date_offset < 0 set to 0, less than its original min/max date does not make sense!
if start_date_offset > 0:
    print(f'start_date_offset should be set to less than 0, use 0 now instead of {start_date_offset}')
    start_date_offset = 0
if end_date_offset < 0:
    print(f'end_date_offset should be set to more than 0, use 0 now instead of {end_date_offset}')
    end_date_offset = 0

# Loading best domain for coutries
precipitation_domains: dict = get_all_keys_properties(INI_FILE, CEHUB_PRECIPITATION_DOMAINS)
tempreture_domains: dict = get_all_keys_properties(INI_FILE, CEHUB_TEMPRETURE_DOMAINS)
wind_domains: dict = get_all_keys_properties(INI_FILE, CEHUB_WIND_DOMAINS)

Config file has section [File_Paths]
The property value for <input_file_dir> is:
C:\example\fields\cehub\input

Config file has section [File_Paths]
The property value for <output_file_dir> is:
C:\example\fields\cehub\output

Config file has section [File_Paths]
The property value for <source_data_filename> is:
failed_trials.xlsx

Config file has section [File_Paths]
The property value for <excel_sheet_name> is:
Sheet 1

Config file has section [CE_Hub]
The property value for <api_key> is:


Config file has section [CE_Hub]
The property value for <start_date_offset> is:
-5

Config file has section [CE_Hub]
The property value for <end_date_offset> is:
5

key is <ERA5T> value is <US,BR,DEFAULT>
dict is <{'US': 'ERA5T', 'BR': 'ERA5T', 'DEFAULT': 'ERA5T'}>
key is <ERA5T> value is <US,BR,DEFAULT>
dict is <{'US': 'ERA5T', 'BR': 'ERA5T', 'DEFAULT': 'ERA5T'}>
key is <ERA5T> value is <BR,US,DEFAULT>
dict is <{'BR': 'ERA5T', 'US': 'ERA5T', 'DEFAULT': 'ERA5T'}>


# Trial Data Loading and Preprocessing

In [5]:
# Loads trial data into a dataframe, the crop type can be corn, grape etc.
# trial_df = pd.read_csv(f'{input_file_dir}{os.path.sep}{source_data_filename}')

# Preprocessing reports produced by BioAnalytics
trial_df = pd.read_excel(f'{input_file_dir}{os.path.sep}{source_data_filename}', sheet_name= excel_sheet_name)

interested_dates_cols: list = [ASSMT_DATE, FIRST_APPL_DATE_TRT, FIRST_APPL_DATE_TRIAL, MOST_RECENT_APPL_DATE_TRT, MOST_RECENT_APPL_DATE_TRIAL, PLANT_DATE, EMERGENCE_DATE]
joined_on_cols: list = [TRIAL, LATITUDE, LONGITUDE, COUNTRY_CODE]
start_end_cols: list = [START_DATE_COLUMN, END_DATE_COLUMN]

# Converts the date columns to datetime for calculation
convert_to_datetime(trial_df, interested_dates_cols)

# Calculate min and max dates from the dates_of_interest columns, and add them back to the dataframe
# This date will be used to extract the CE Hub data.
trial_df[START_DATE_COLUMN] = trial_df.apply(lambda x: min(x[interested_dates_cols]) + timedelta(days = start_date_offset), axis = 1)
trial_df[END_DATE_COLUMN] = trial_df.apply(lambda x: max(x[interested_dates_cols]) + timedelta(days = end_date_offset), axis = 1)

trial_df[START_DATE_COLUMN] = pd.to_datetime(trial_df[START_DATE_COLUMN]).dt.date
trial_df[END_DATE_COLUMN] = pd.to_datetime(trial_df[END_DATE_COLUMN]).dt.date

# Convert the rest of the date column to date as well, after the start and end dates are calculated
convert_to_date(trial_df, interested_dates_cols)

# Remove 'Unnamed' columns from the dataframe
trial_df.drop(trial_df.columns[trial_df.columns.str.contains('Unnamed')], axis=1, inplace=True)

pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', None)

trial_df.info()
trial_df.loc[:, joined_on_cols + interested_dates_cols + start_end_cols]

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 94652 entries, 0 to 94651
Data columns (total 97 columns):
 #   Column                           Non-Null Count  Dtype  
---  ------                           --------------  -----  
 0   Master_Prt                       94652 non-null  object 
 1   Derived_Prt                      94652 non-null  object 
 2   Trial                            94652 non-null  object 
 3   Trial_Year                       94652 non-null  int64  
 4   Country_Code                     94652 non-null  object 
 5   State_province_code              94548 non-null  object 
 6   State_province_name              94548 non-null  object 
 7   City                             38759 non-null  object 
 8   Site_Type                        71929 non-null  object 
 9   Syngenta_FieldScientist          92875 non-null  object 
 10  Trial_Placement                  93494 non-null  object 
 11  Latitude                         41979 non-null  float64
 12  Longitude         

Unnamed: 0,Trial,Latitude,Longitude,Country_Code,Assmt_Date,First_Appl_Date_Trt,First_Appl_Date_Trial,Most_Recent_Appl_Date_Trt,Most_Recent_Appl_Date_Trial,Plant_Date,Emergence_Date,Start_Date,End_Date
0,US01AH12345,,,US,2000-05-09,2000-04-06,2000-04-06,2000-04-06,2000-04-06,2000-04-27,NaT,2000-04-01,2000-05-14
1,US01AH12345,,,US,2000-05-09,2000-04-06,2000-04-06,2000-04-06,2000-04-06,2000-04-27,NaT,2000-04-01,2000-05-14
2,US01AH12345,,,US,2000-05-09,2000-04-06,2000-04-06,2000-04-06,2000-04-06,2000-04-27,NaT,2000-04-01,2000-05-14
3,US01AH12345,,,US,2000-05-09,2000-04-06,2000-04-06,2000-04-06,2000-04-06,2000-04-27,NaT,2000-04-01,2000-05-14
4,US01AH12345,,,US,2000-05-09,2000-04-06,2000-04-06,2000-04-06,2000-04-06,2000-04-27,NaT,2000-04-01,2000-05-14
...,...,...,...,...,...,...,...,...,...,...,...,...,...
94647,USNFSZ12345,12.345,12.345,US,2017-07-11,2017-06-06,2017-05-08,2017-06-06,2017-06-06,2017-05-04,2017-05-12,2017-04-29,2017-07-16
94648,USNFSZ12345,12.345,12.345,US,2017-07-11,2017-06-06,2017-05-08,2017-06-06,2017-06-06,2017-05-04,2017-05-12,2017-04-29,2017-07-16
94649,USNFSZ12345,12.345,12.345,US,2017-07-11,2017-06-06,2017-05-08,2017-06-06,2017-06-06,2017-05-04,2017-05-12,2017-04-29,2017-07-16
94650,USNFSZ12345,12.345,12.345,US,2017-07-11,2017-06-06,2017-05-08,2017-06-06,2017-06-06,2017-05-04,2017-05-12,2017-04-29,2017-07-16


In [11]:
# If we have many rows, for each Trial we have one lat, lon
trial_time = trial_df[joined_on_cols + start_end_cols].groupby(joined_on_cols).agg({START_DATE_COLUMN: MIN, END_DATE_COLUMN: MAX}).reset_index()
trial_time.info()
display(trial_time)

NameError: name 'trial_df' is not defined

In [12]:
#Hack for trials with 400 502 responses.
data = {TRIAL: ['037SRBR12345-00', '037SRBR12345-01', 'TK1234567-12'], LATITUDE: [12.345, 12.345, 12.345], LONGITUDE: [12.345, 12.345, 12.345], COUNTRY_CODE: ['BR', 'BR', 'US'], START_DATE_COLUMN: ['2020-01-01', '2020-01-01', '2018-01-01'], END_DATE_COLUMN: ['2020-12-31', '2020-12-31', '2018-12-31']}
trial_time = pd.DataFrame.from_dict(data)
display(trial_time)

Unnamed: 0,Trial,Latitude,Longitude,Country_Code,Start_Date,End_Date
0,037SRBR12345-00,12.345,12.345,BR,2020-01-01,2020-12-31
1,037SRBR12345-01,12.345,12.345,BR,2020-01-01,2020-12-31
2,TK1234567-12,12.345,12.345,US,2018-01-01,2018-12-31


# Functions For CE Hub REST APIs

## Data Queries

In [13]:
# Builds weather data query for CE Hub, the return value is a list, since the data we are interested in is in two domains
def build_weather_data_query(country_code: str) -> list:
    domain_precipitation = precipitation_domains.get(country_code, precipitation_domains.get(DEFAULT.upper()))
    domain_temp = tempreture_domains.get(country_code, tempreture_domains.get(DEFAULT.upper()))
    domain_wind = wind_domains.get(country_code, wind_domains.get(DEFAULT.upper()))
    print(f'country <{country_code}> use precipitation domain <{domain_precipitation}>, tempreture domain <domain_temp>, wind <{domain_wind}>')
    
    weather_query = [{
        "domain":DOMAIN_NEMSGLOBAL,
        "timeResolution":TIME_RESOLUTION_DAILY,
        "codes":[
            {"code":HUMIDITY,"level":LVL_2M_ABV_GND,"aggregation":MAX},               #Humidity_Max
            {"code":HUMIDITY,"level":LVL_2M_ABV_GND,"aggregation":MIN},               #Humidity_Min
            {"code":HUMIDITY,"level":LVL_2M_ABV_GND,"aggregation":MEAN},              #Humidity_Mean
            {"code":CLOUDS_TOTAL,"level":LVL_SFC,"aggregation":MEAN},                 #Clouds_Total
            {"code":CLOUDS_HIGH,"level":LVL_HIGH_CLD_LAY,"aggregation":MEAN},         #Clouds_High
            {"code":CLOUDS_MEDIUM,"level":LVL_MID_CLD_LAY,"aggregation":MEAN},        #Clouds_Medium
            {"code":CLOUDS_LOW,"level":LVL_LOW_CLD_LAY,"aggregation":MEAN},           #Clouds_Low
            {"code":SUNSHINE_DURATION,"level":LVL_SFC,"aggregation":SUM},             #Sunshine_Duration
            {"code":SHORTWAVE_RADIATION_TOTAL,"level":LVL_SFC,"aggregation":MEAN},    #Shortwave_Radiation_Total
            {"code":SHORTWAVE_RADIATION_DIRECT,"level":LVL_SFC,"aggregation":MEAN},   #Shortwave_Radiation_Direct
            {"code":SHORTWAVE_RADIATION_DIFFUSE,"level":LVL_SFC,"aggregation":MEAN},  #Shortwave_Radiation_Diffuse
            {"code":EVAPOTRANSPIRATION,"level":LVL_SFC,"aggregation":SUM},            #Evapotranspiration
            {"code":SOIL_TEMP,"level":LVL_10CM_DOWN,"aggregation":MAX},               #Soil_Temp_Max
            {"code":SOIL_TEMP,"level":LVL_10CM_DOWN,"aggregation":MIN},               #Soil_Temp_Min
            {"code":SOIL_TEMP,"level":LVL_10CM_DOWN,"aggregation":MEAN},              #Soil_Temp_Mean
            {"code":SOIL_MOISTURE,"level":LVL_10CM_DOWN,"aggregation":MAX},           #Soil_Moisture_Max
            {"code":SOIL_MOISTURE,"level":LVL_10CM_DOWN,"aggregation":MIN},           #Soil_Moisture_Min
            {"code":SOIL_MOISTURE,"level":LVL_10CM_DOWN,"aggregation":MEAN},          #Soil_Moisture_Mean
            {"code":VAPPRESS_DEFICIT,"level":LVL_2M_ABV_GND,"aggregation":MAX},       #VapPress_Deficit_Max
            {"code":VAPPRESS_DEFICIT,"level":LVL_2M_ABV_GND,"aggregation":MIN},       #VapPress_Deficit_Min
            {"code":VAPPRESS_DEFICIT,"level":LVL_2M_ABV_GND,"aggregation":MEAN}       #VapPress_Deficit_Mean
        ]
    },
    {
        "domain":domain_temp,
        "timeResolution":TIME_RESOLUTION_DAILY,
        "codes":[
            {"code":TEMP,"level":LVL_2M_ELV_CORRECTED,"aggregation":MAX},            #Temp_Max
            {"code":TEMP,"level":LVL_2M_ELV_CORRECTED,"aggregation":MIN},            #Temp_Min
            {"code":TEMP,"level":LVL_2M_ELV_CORRECTED,"aggregation":MEAN}            #Temp_Mean
        ]
    },
    {
        "domain":domain_precipitation,
        "timeResolution":TIME_RESOLUTION_DAILY,
        "codes":[
            {"code":PRECIPITATION,"level":LVL_SFC,"aggregation":SUM}                 #Precipitation        
        ]
    },
    {
        "domain":domain_wind,
        "timeResolution":TIME_RESOLUTION_DAILY,
        "codes":[
            {"code":WIND_SPEED,"level":LVL_10M_ABV_GND,"aggregation":MAX},            #Wind_Max
            {"code":WIND_SPEED,"level":LVL_10M_ABV_GND,"aggregation":MIN},            #Wind_Min
            {"code":WIND_SPEED,"level":LVL_10M_ABV_GND,"aggregation":MEAN},           #Wind_Mean
            {"code":WIND_DIRECTION,"level":LVL_10M_ABV_GND}                           #Wind_Direction        
        ]
    },
    {
        "domain": DOMAIN_ERA5,
        "gapFillDomain": None,
        "timeResolution": TIME_RESOLUTION_HOURLY,
        "codes": [
            {
                "code": UV_MEAN,                                                      #UV_Mean
                "level": LVL_SFC
            }
        ],
        "transformations": [
            {
                "type": "aggregateDaily",
                "aggregation": MEAN
            }
        ]
    }]
    
    return weather_query

In [14]:
# Builds soil data query for CE Hub, here the information is only in one domain
def build_soil_query (start_depth: int, end_depth: int) -> dict :
    soil_query = {
                    "domain": DOMAIN_SOILGRIDS2,
                    "codes": [
                    {
                        "code": BULK_DENSITY,
                        "level": LVL_AGGREGATE,
                        "startDepth": start_depth,
                        "endDepth": end_depth
                
                    },
                    {
                        "code": CATION_EXCHANGE_CAPACITY,
                        "level": LVL_AGGREGATE,
                        "startDepth": start_depth,
                        "endDepth": end_depth
                    },
                    {
                        "code": CLAY_CONTENT_MASS_FRACTION,
                        "level": LVL_AGGREGATE,
                        "startDepth": start_depth,
                        "endDepth": end_depth
                    },
                    {
                        "code": COARSE_FRAGMENTS_VOLUMETRIC_FRACTION,
                        "level": LVL_AGGREGATE,
                        "startDepth": start_depth,
                        "endDepth": end_depth
                    },
                    {
                        "code": ORGANIC_CARBON_CONTENT,
                        "level": LVL_AGGREGATE,
                        "startDepth": start_depth,
                        "endDepth": end_depth
                    },
                    {
                        "code": ORGANIC_CARBON_DENSITY,
                        "level": LVL_AGGREGATE,
                        "startDepth": start_depth,
                        "endDepth": end_depth
                    },
                    {
                        "code": ORGANIC_CARBON_STOCKS,
                         "level": LVL_30 # only organic carbon stock only have one depth
                    },
                    {
                        "code": SAND_CONTENT_MASS_FRACTION,
                        "level": LVL_AGGREGATE,
                        "startDepth": start_depth,
                        "endDepth": end_depth
                    },
                    {
                        "code": SILT_CONTENT_MASS_FRACTION,
                        "level": LVL_AGGREGATE,
                        "startDepth": start_depth,
                        "endDepth": end_depth
                    },
                    {
                        "code": TOTAL_NITROGEN_CONTENT,
                        "level": LVL_AGGREGATE,
                        "startDepth": start_depth,
                        "endDepth": end_depth
                    },
                    {
                        "code": PH_IN_H20,
                        "level": LVL_AGGREGATE,
                        "startDepth": start_depth,
                        "endDepth": end_depth
                    }
                ]
            }
    return soil_query

## JSON Request Payload

In [15]:
# Builds CE Hub data JSON payload, the queries are built by using functions from the last step
def build_json_payload(lat, lon, trial, start_date, end_date, queries):    
    PARAMS = {
        "units":{
            "temperature":"CELSIUS",
            "velocity":"KILOMETER_PER_HOUR",
            "length":"metric",
            "energy":"watts"
        },
        "geometry":{
            "type":"MultiPoint",
            "coordinates":[
                [
                    lon, lat,
                ]
            ],
            "locationNames":[
                f"{trial}"
            ]
        },
        "format":"json",
        "timeIntervals":[
            f"{start_date}T+10:00\/{end_date}T+10:00"
        ],
        "timeIntervalsAlignment":"none",
        "queries": queries
    }
    
    return PARAMS

## REST Call

In [16]:
def get_cehub_data(endpoint, lat, lon, trial, start_date, end_date, queries):
    json_response = []
    URL = endpoint
    PARAMS = build_json_payload(lat, lon, trial, start_date, end_date, queries)
    print(f'Getting trial <{trial}> for date range from <{start_date}> to <{end_date}>')
    
    try:
        response = urllib.request.Request(URL, json.dumps(PARAMS).encode("utf-8"), headers={'Content-type': 'application/json', 'Accept': 'application/json'})
        json_response = json.loads(urllib.request.urlopen(response).read())
      
        return json_response
    
    except ConnectionError as ce:
        print(f'Got connection error with exception {ce}')
        time.sleep(10) 
    except Exception as exe:
        print(f'No coordinates was found for trail: {trial}, exception is {exe}')

## JSON Response to Dictionary

In [17]:
# Converts weater data REST call response JSON to dictionary
def convert_weather_json_to_dict (json_response:List[Any]) -> Dict :
    
    response_dict = {}
    for i in range(len(json_response)):
        json = json_response[i]

        #geometry
        geometry = json[GEOMETRY]
        coordinates = geometry[COORDINATES][0]
        response_dict[TRIAL] = geometry[LOCATION_NAMES][0]
        response_dict[LATITUDE] = coordinates[0]
        response_dict[LONGITUDE] = coordinates[1]
        response_dict[ALTITUDE] = coordinates[2]
        #dates
        response_dict[DATES] = json[TIME_INTERVALS][0]#str(json[TIME_INTERVALS][0]).replace('T0000','')
        #codes
        codes = json[CODES]
        
        for j in range(len(codes)):
            agg:str = str(codes[j][AGGREGATION])
            unit:str = codes[j][UNIT]
            response_dict[str(codes[j][VARIABLE]).replace(' ', '_')+ '_('+ ''.join([agg[0].upper(), agg[1:]]) + ')_(' + unit + ')'] = codes[j][DATA_PER_TIME_INTERVAL][0][DATA][0]
   
   
    return response_dict

# Converts soil data REST call response JSON to dictionary
def convert_soil_json_to_dict (json_response:List[Any]) -> Dict :
    
    response_dict = {}
    for i in range(len(json_response)):
        json = json_response[i]

        #geometry
        geometry = json[GEOMETRY]
        coordinates = geometry[COORDINATES][0]
        response_dict[TRIAL] = geometry[LOCATION_NAMES][0]
        response_dict[LATITUDE] = coordinates[0]
        response_dict[LONGITUDE] = coordinates[1]
        
        #codes
        codes = json[CODES]
        for j in range(len(codes)):
            column_name: str
            unit: str = codes[j][UNIT]
            if codes[j][LEVEL] == LVL_AGGREGATE:
                start_depth: int = codes[j][START_DEPTH]
                end_depth: int = codes[j][END_DEPTH]
                column_name = str(codes[j][VARIABLE]).replace(' ', '_') + '_('+ str(start_depth) + '-'+ str(end_depth) +')_(' + unit + ')'
            else:
                column_name = str(codes[j][VARIABLE]).replace(' ', '_')+ '_('+ codes[j][LEVEL]+')_(' + unit + ')'
                
            response_dict[column_name] = codes[j][DATA_PER_TIME_INTERVAL][0][DATA][0]
    
    return response_dict

# GETs Data From CE Hub

## GET Weather Data

In [18]:
weather_df = pd.DataFrame()

total_trial:int = len(trial_time)
failed_trials: int = 0

for i in range(total_trial):
    weather_queries = build_weather_data_query(trial_time[COUNTRY_CODE][i])
    # weather_queries = build_weather_data_query()
    json_response = get_cehub_data(cehub_endpoint, trial_time[LATITUDE][i], trial_time[LONGITUDE][i], trial_time[TRIAL][i], trial_time[START_DATE_COLUMN][i], trial_time[END_DATE_COLUMN][i], weather_queries)
    try:
        response_dict = convert_weather_json_to_dict(json_response)
        each_trial_df = pd.DataFrame(response_dict)
        weather_df = weather_df.append(each_trial_df, ignore_index=True)
    except Exception as exe:
        print(f"Not working for {trial_time[TRIAL][i]} and it failed with: {exe}")
        failed_trials += 1

print(f'<{failed_trials}> trials has failed to retrive from CE Hub out of <{total_trial}>')
weather_df.info()

print(f'\nModifing datetime from yyyymmhhT0000 to yyyy-mm-dd...')
weather_df[DATES] = weather_df[DATES].str.replace(r'T0000', '')
weather_df[DATES] = pd.to_datetime(weather_df[DATES].str[:8], format='%Y-%m-%d')

country <BR> use precipitation domain <ERA5T>, tempreture domain <domain_temp>, wind <ERA5T>
Getting trial <037SRBR12345-00> for date range from <2020-01-01> to <2020-12-31>
country <BR> use precipitation domain <ERA5T>, tempreture domain <domain_temp>, wind <ERA5T>
Getting trial <037SRBR12345-01> for date range from <2020-01-01> to <2020-12-31>
country <US> use precipitation domain <ERA5T>, tempreture domain <domain_temp>, wind <ERA5T>
Getting trial <TK1234567-12> for date range from <2018-01-01> to <2018-12-31>
<0> trials has failed to retrive from CE Hub out of <3>
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1097 entries, 0 to 1096
Data columns (total 35 columns):
 #   Column                                     Non-Null Count  Dtype  
---  ------                                     --------------  -----  
 0   Trial                                      1097 non-null   object 
 1   Latitude                                   1097 non-null   float64
 2   Longitude                

## GET Soil Data

In [19]:
soil_df = pd.DataFrame()

total_trial:int = len(trial_time)
failed_trials: int = 0

for i in range(len(trial_time)):
    soil_queries = [build_soil_query(START_DEPTH_0, END_DEPTH_30), build_soil_query(START_DEPTH_0, END_DEPTH_60)]
    json_response = get_cehub_data(cehub_endpoint, trial_time[LATITUDE][i], trial_time[LONGITUDE][i], trial_time[TRIAL][i], trial_time[START_DATE_COLUMN][i], trial_time[END_DATE_COLUMN][i], soil_queries)
    try:
        response_dict = convert_soil_json_to_dict(json_response)
        each_trial_df = pd.DataFrame(response_dict)
        soil_df = soil_df.append(each_trial_df, ignore_index=True)
    except Exception as exe:
        print(f"Not working for {i} and it failed with: {exe}")
        failed_trials += 1

print(f'<{failed_trials}> trials has failed to retrive from CE Hub out of <{total_trial}>')
soil_df.info()

Getting trial <037SRBR12345-00> for date range from <2020-01-01> to <2020-12-31>
Getting trial <037SRBR12345-01> for date range from <2020-01-01> to <2020-12-31>
Getting trial <TK1234567-12> for date range from <2018-01-01> to <2018-12-31>
<0> trials has failed to retrive from CE Hub out of <3>
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 24 columns):
 #   Column                                                       Non-Null Count  Dtype  
---  ------                                                       --------------  -----  
 0   Trial                                                        3 non-null      object 
 1   Latitude                                                     3 non-null      float64
 2   Longitude                                                    3 non-null      float64
 3   Bulk_Density_(0-30)_(kg/m³)                                  3 non-null      float64
 4   Cation_Exchange_Capacity_(0-30)_(cmolc/kg)                

# Functions to Merge Trial Data with CE Hub Data

## Basic Functions

In [21]:
def get_column_names (candidates_column_names: List[str], columns_tobe_removed: List[str]) -> List[str]:
    print(f'Candidate columns: \n{candidates_column_names}')
    #remove some of the columns

    for i in range(len(columns_tobe_removed)):
        candidates_column_names.remove(columns_tobe_removed[i])
    print(f'\nSelected columns: \n{candidates_column_names}')
    return candidates_column_names

# Validates and calculates the experiement days
def get_experiment_days(selected_df: pd, i: int, from_date_col, to_date_col, from_date_offset:int, to_date_offset:int) -> int:

    start_date = selected_df[START_DATE_COLUMN][i]
    end_date = selected_df[END_DATE_COLUMN][i]
    
    # We can be sure that the from and to date col data are within start_date and end_date range
    from_date = selected_df[from_date_col][i]
    to_date = selected_df[to_date_col][i]
    
    # A few validations
    # 1. If any of these values is empty
    if pd.isna(from_date):
        from_date = start_date
        if from_date_offset < 0: 
            from_date_offset = 0
        print(f'from_date is empty, using start_date value <{from_date}>')
    if pd.isna(to_date):
        to_date = end_date
        if to_date_offset > 0: 
            to_date_offset = 0
        print(f'to_date is empty, using end_date value <{to_date}>')
    
    if from_date > to_date:
        print(f'from_date <{from_date}> is later than to_date <{to_date}>, swapping the values')
        from_date, to_date = to_date, from_date
        print(f'from_date now is <{from_date}>, to_date is now <{to_date}> after swapping')
    
    # 2. If user sets the offset value too aggasively 
    from_date_delta = from_date + timedelta(days = from_date_offset)   
    if from_date_delta < start_date or from_date_delta > end_date:
        print(f'from_date_delta <{from_date_delta}> is outside the start_date <{start_date}> and end_date <{end_date}> range, use from_date <{from_date}> without offsets.')
        from_date_delta = from_date 
    
    to_date_delta = to_date + timedelta(days = to_date_offset)
    if to_date_delta > end_date or to_date_delta < start_date:
        print(f'to_date_delta <{to_date_delta}> is outside the start_date <{start_date}> and end_date <{end_date}> range, use to_date <{to_date}> without offsets.')
        to_date_delta = to_date
    
    experiment_days: datetime.date 
    try:
        experiment_days = (to_date_delta-from_date_delta) + timedelta(days = 1)
    except Exception as exe:
        print(f'exception at to_date: <{to_date}>, from_date: <{from_date}>')
        print(f'exception at to_date_delta: <{to_date_delta}>, from_date_delta: <{from_date_delta}>')
    
    # 3. If the delta dates are overlapping
    if experiment_days.days <= 0:
        print(f'The experiment days is <= 0, use from_date <{from_date}> and to_date <{to_date}> without the offsets')
        experiment_days = (to_date-from_date) + timedelta(days = 1)
    
    return experiment_days.days

def get_current_time() :
    current_time = datetime.now().strftime("%H:%M:%S")
    print("Current Time =", current_time)

## Merge Weather Data

In [22]:
# Merge trial data with CE Hub weather data with flexibility
def merge_weather_data_basic (column_names: List[str], df_tobe_merged: pd.DataFrame, cehub_data_df: pd.DataFrame,
                              from_date_col:str, to_date_col:str, from_date_offset:int, to_date_offset: int,
                              column_suffix:str, counter: int) -> pd.DataFrame :
    additional_data = pd.DataFrame(columns = column_names)
    selected_df = df_tobe_merged.reset_index(drop = True)    
    
    print(f'The {counter} time of merging...')
    selected_df.info()
    
    for column in column_names:
        averages = []
        for i, trial in enumerate(selected_df.Trial):
            # print(f'Merging for trial {trial} column {column}')
            experiment_days = get_experiment_days(selected_df, i, from_date_col, to_date_col, from_date_offset, to_date_offset)
            temp_sum = 0
            all_values = cehub_data_df[column][cehub_data_df[TRIAL]==trial]
            j = 0
            added_values = 0
            if (len(all_values)==0):
                averages.append(float('nan'))
                continue
            for j, temp in enumerate(all_values):
                if j == experiment_days:
                    # we are done - got all the values we needed
                    try:
                        average = temp_sum / experiment_days
                        averages.append(average)
                    except Exception as exe:
                        print(f'exception at column: {column}, trial id: {trial}, exe is {exe}')
                    break
                else:
                    # keep adding temperatures until we have enough
                    if temp is None: temp = 0
                    temp_sum += temp
                    added_values+=1
            if j < experiment_days:
                averages.append(temp_sum/(j+1))

        selected_df[column] = averages
        
        if (len(column_suffix)>0):
            new_column_name = column + '_' + column_suffix.lower()
            selected_df.rename({column: new_column_name}, axis=1, inplace=True)
            print(f'Renamed {column} to {new_column_name}')
        
    return selected_df

def merge_weather_data (column_names: List[str], interested_dates_cols: list, df_tobe_merged: pd.DataFrame, cehub_data_df: pd.DataFrame) -> pd.DataFrame :
       
    # We use START_DATE and ASSMT_DATE for the default join
    from_date_col:str = START_DATE_COLUMN
    to_date_col:str = ASSMT_DATE
    print('\nStarted default merge')
    get_current_time()
    j = 0
    merge_df = merge_weather_data_basic(column_names, df_tobe_merged, cehub_data_df, from_date_col, to_date_col, 0, 0, DEFAULT, j)
    print('\nFinished the default merge\n')
    
    # Check the cehub.ini file to see if there are any [Weather_Merge_Additional_] sections specified by the user, 
    # reuse merge_weather_data_basic function above according to number of Weather_Merge_Additional_ provided
    additional_merges: list = get_property_sections_with_regex(INI_FILE, WEATHER_MERGE_ADD_COL)
    
    for additional_merge in additional_merges:
        j += 1
        # get the from_date and to_date
        from_date_col = get_property(INI_FILE, additional_merge, FROM_DATE)
        to_date_col = get_property(INI_FILE, additional_merge, TO_DATE)
        # get and validate the from and to date with offset values by comparing them to START and END dates
        from_date_offset = int(get_property(INI_FILE, additional_merge, FROM_DATE_OFFSET))
        to_date_offset = int(get_property(INI_FILE, additional_merge, TO_DATE_OFFSET))
        
        # adding column_suffix
        column_suffix = additional_merge[len(WEATHER_MERGE_ADD_COL):]
        
        print(f'\nStarted the {additional_merge} merge\n')
        get_current_time()
        additional_merge_df = merge_weather_data_basic (column_names, merge_df, cehub_data_df, 
                                  from_date_col, to_date_col, from_date_offset, to_date_offset, column_suffix, j)
        print('trying to copy additional_merge_df...')
        merge_df = additional_merge_df.copy()
        print(f'\nFinished the {additional_merge} merge\n')
        
    return merge_df # additional_merge_df

## Merge Soil Data

In [23]:
# Merges trial data or already merged trial-weather data with CE Hub soil data by using pyspark
def merge_soil_data (main_df: pd.DataFrame, cehub_df: pd.DataFrame, columns_tobe_removed: List[str]) -> pd.DataFrame :
    
    #drop duplicated columns from soil data
    cehub_dropped_df = cehub_df.drop(columns_tobe_removed, axis=1)
    print(f'Soil columns after dropping the duplicated columns are: \n{cehub_dropped_df.columns}') 

    # left join on two dataframes
    merged_df = main_df.merge(cehub_dropped_df, on='Trial', how='left')

    return merged_df

# Performs Merge and Outputs to Files

## Output Directory and Output File Paths
1. Trial + Weather data with daily resolution: input_filename_weather_data.csv
2. Trail + Soil data: input_filename_soil_data.csv
3. Trial + Weather + Soil data: inputfilename_trial_weather_soil_data.csv

In [24]:
# Checks if output directory exists, creates it if not
isExist = os.path.exists(output_file_dir)

if not isExist:
    # Create a new directory because it does not exist 
    os.makedirs(output_file_dir)
    print(f'Output directory <{output_file_dir}> does not exist, it is now created!')
else:
    print(f'Output directory <{output_file_dir}> already exists')

Output directory <C:\example\fields\cehub\output> already exists


In [25]:
# Constructs output file paths
# trial data file name and output path
trial_data_file_name = os.path.splitext(source_data_filename)[0]
trial_data_file_name_path = f'{output_file_dir}{os.path.sep}{trial_data_file_name}'

# weather data only
weather_only_filepath = f'{trial_data_file_name_path}_weather_data_only.csv'
# trial data with weather data filepath
trial_weather_filepath = f'{trial_data_file_name_path}_weather_data.csv'
# trial data with soil data filepath
trial_soil_filepath = f'{trial_data_file_name_path}_soil_data.csv'
# trial data with weather and soil data filepath
trial_weather_soil_filepath = f'{trial_data_file_name_path}_weather_soil_data.csv'

## Weather Data Only and Trial Data with Weather Data

In [26]:
# TODO: a flag to export with full name, but short name when merge with trial data
# merges trial data with CE Hub weather data
# defines the columns list for trial and weather data merge
weather_column_names = list(weather_df)
weather_columns_tobe_removed = [TRIAL, LATITUDE, LONGITUDE, DATES]

# weather data only to csv file
weather_df.to_csv(weather_only_filepath, index = False)

# creates the column list to be iterated over
trial_weather_column_names = get_column_names (weather_column_names, weather_columns_tobe_removed)
# merges trial data with weather data
trial_weather_df = merge_weather_data(trial_weather_column_names, interested_dates_cols, trial_df, weather_df)
# writes the trail and weather merged data to file
trial_weather_df.to_csv(trial_weather_filepath, index = False)
# checks joined data columns
print(f'\nJoined trial and weather data columns are:\n{trial_weather_df.columns}')

Candidate columns: 
['Trial', 'Latitude', 'Longitude', 'Altitude', 'Dates', 'Relative_Humidity_(Max)_(%)', 'Relative_Humidity_(Min)_(%)', 'Relative_Humidity_(Mean)_(%)', 'Cloud_Cover_Total_(Mean)_(%)', 'Cloud_Cover_High_(Mean)_(%)', 'Cloud_Cover_Medium_(Mean)_(%)', 'Cloud_Cover_Low_(Mean)_(%)', 'Sunshine_Duration_(Sum)_(min)', 'Shortwave_Radiation_(Mean)_(W/m²)', 'Direct_Shortwave_Radiation_(Mean)_(W/m²)', 'Diffuse_Shortwave_Radiation_(Mean)_(W/m²)', 'Evapotranspiration_(Sum)_(mm)', 'Soil_Temperature_(Max)_(°C)', 'Soil_Temperature_(Min)_(°C)', 'Soil_Temperature_(Mean)_(°C)', 'Soil_Moisture_(Max)_(m³/m³)', 'Soil_Moisture_(Min)_(m³/m³)', 'Soil_Moisture_(Mean)_(m³/m³)', 'Vapor_Pressure_Deficit_(Max)_(hPa)', 'Vapor_Pressure_Deficit_(Min)_(hPa)', 'Vapor_Pressure_Deficit_(Mean)_(hPa)', 'Temperature_(Max)_(°C)', 'Temperature_(Min)_(°C)', 'Temperature_(Mean)_(°C)', 'Precipitation_Total_(Sum)_(mm)', 'Wind_Speed_(Max)_(km/h)', 'Wind_Speed_(Min)_(km/h)', 'Wind_Speed_(Mean)_(km/h)', 'Wind_Directio

NameError: name 'interested_dates_cols' is not defined

## Trial Data with Soil Data

In [27]:
# identifies the duplicated columns in the soil data that need to be removed
soil_columns_tobe_removed = [LATITUDE, LONGITUDE]

# merges trial data with CE Hub soil data
trial_soil_df = merge_soil_data(trial_df, soil_df, soil_columns_tobe_removed)
trial_soil_df.to_csv(trial_soil_filepath, index = False)
print(f'\nJoined trial and soil data columns are:\n{trial_soil_df.columns}')

NameError: name 'trial_df' is not defined

## Trial-Weather Data with Soil Data¶

In [209]:
trial_weather_soil_df = merge_soil_data(trial_weather_df, soil_df, soil_columns_tobe_removed)
trial_weather_soil_df.to_csv(trial_weather_soil_filepath, index = False)
print(f'\nJoined trial, weather and soil data columns are:\n{trial_weather_soil_df.columns}')

Soil columns after dropping the duplicated columns are: 
Index(['Trial', 'Bulk_Density_(0-30)_(kg/m³)',
       'Cation_Exchange_Capacity_(0-30)_(cmolc/kg)',
       'Clay_Content_(0-2_micro_meter)_mass_fraction_(0-30)_(%)',
       'Coarse_Fragments_volumetric_fraction_(0-30)_(vol. %)',
       'Organic_Carbon_Content_(fine_earth_fraction)_(0-30)_(%)',
       'Organic_Carbon_Density_(0-30)_(kg/m³)',
       'Organic_Carbon_Stocks_(0-30 cm)_(kg/m²)',
       'Sand_content_(50-2000_micro_meter)_mass_fraction_(0-30)_(%)',
       'Silt_Content_(2-50_micro_meter)_mass_fraction_(0-30)_(%)',
       'Total_Nitrogen_Content_(0-30)_(g/kg)', 'pH_in_H2O_(0-30)_(pH)',
       'Bulk_Density_(0-60)_(kg/m³)',
       'Cation_Exchange_Capacity_(0-60)_(cmolc/kg)',
       'Clay_Content_(0-2_micro_meter)_mass_fraction_(0-60)_(%)',
       'Coarse_Fragments_volumetric_fraction_(0-60)_(vol. %)',
       'Organic_Carbon_Content_(fine_earth_fraction)_(0-60)_(%)',
       'Organic_Carbon_Density_(0-60)_(kg/m³)',
       

In [31]:
#Merge Weather and Soil data
#Hack for trials with 400 502 responses.
soil_columns_tobe_removed = [LATITUDE, LONGITUDE]
soil_dropped_df = soil_df.drop(soil_columns_tobe_removed, axis=1) 

# left join on two dataframes
merged_df = weather_df.merge(soil_dropped_df, on='Trial', how='left')
merged_df.to_csv(trial_weather_soil_filepath, index = False)