# Weather datasets verification portal - Historical gridded datasets

## Inputs

In [135]:
# Basic inputs
variable=['E_AIR_TEMPERATURE','E_RELATIVE_HUMIDITY','E_RAINFALL','E_DEWPOINT','E_WIND_SPEED'] # E_AIR_TEMPERATURE

aggregation=['daily_avg','daily_avg','daily_sum','daily_avg','daily_avg'] # daily_max, daily_min, daily_sum, daily_avg
time_aggregation='daily' #daily, weekly, monthly, default='daily'

country='None'
location='None' # --> Coordinates + radius or closest station
region=['LATAM','APAC']

start = '2021-01-01T00:00:00'
end = '2023-11-20T00:00:00'

analysis_name='LATAM-APAC'
account_name='Irene_Caramatti'

# Gridded datasets (from table)
dataset=['ERA5T',
'NEMSGLOBAL',
'NEMSAUTO',
'MFGLOBAL',
'MFEU',
'ICON',
'GFS05',
'NAM12',
'IMERG-REALTIME',
'CMORPH',
'CHIRPS2',
'PDIRNOW',
'LATAMCOSCH',
'ARC2',
'CPCGBAUS',
'BOM',
'MFFR']

# Weather stations
import numpy as np
min_completeness=0.6  #otherwise np.nan
min_obs=100

suppliers= ["Davis","Arable","Pessl","Sencrop"]

#Analysis (from table)
metrics=['MAE','R','KGE','RMSE']
threshold_pc= 5
threshold_pod_far=1

###  Import packages

In [136]:
from Functions.utils import *

### Quick Start

In [137]:
# It creates the bucket for this analysis 
bucket='deaw-abb-dataplatform-cehub'       
key="Validation_analyses/" 
key_analysis=key + account_name + '/' + analysis_name 

In [138]:
# It merge variable and aggregation. It simplify the fuctions later
var=[m+'_'+n for m,n in zip(variable,aggregation)]

### Identification of WS based on the selection location/country/region

In [5]:
def get_stations_region(variable, start, end, suppliers, region):
    
    import json
    import pandas as pd
    from datetime import datetime, timedelta
    import requests
    from dotenv import load_dotenv
    
    load_dotenv()
    CEHUB_API_KEY_WS=os.getenv('CEHUB_API_KEY_WS')
    
    url = "https://t2xer83e5a.execute-api.eu-central-1.amazonaws.com/cehub-prod/station/v2/location"
    
    headers = {
    'x-api-key': CEHUB_API_KEY_WS,
    'Content-Type': 'application/json'
    }
    
    payload = {
            "datefrom": str(start[0:4])+str(start[5:7])+(start[8:10]),
            "dateto": str(end[0:4])+str(end[5:7])+(end[8:10]), 
            "variables":variable,
            "suppliers": suppliers,
            "location":{
               "areas": [ 
                   {"region": region}
               ],
            "only-stations-list": "true"
            }
        }  

    response = requests.request("POST", url, headers=headers, data=json.dumps(payload))
    response = response.json()
    
    return response

In [6]:
if country!='None':
    stationsID = []
    for c in country:
        station=get_stations_country(variable, start, end,suppliers, c)
        for i in range(0,len(station[:][0]['data'])):
            stationsID.append(station[:][0]['data'][i]['stationID'][:])
    
elif (region!='None')&(country=='None'):
    stationsID = []
    for r in region:
        station=get_stations_region(variable, start, end,suppliers, r)
        for i in range(0,len(station[:][0]['data'])):
            stationsID.append(station[:][0]['data'][i]['stationID'][:])
# for point it is missing. Select the closest WS available in a radius  

print(len(stationsID))
save_in_s3(stationsID, 'stations', bucket, key_analysis)

223


###  Observations Data 

In [4]:
import pandas as pd
import pickle

stations=pd.read_csv('IBM_worldwide.csv')
#stations=pd.read_csv('New_Syngenta_stations.csv')
stations=stations.StationID.tolist()
save_in_s3(stations, 'stations', bucket, key_analysis)

In [7]:
def create_obs_frame(variable, start, end, key, bucket, aggregation):
    import json
    import pandas as pd
    from datetime import datetime, timedelta
    import requests
    
    stations=open_from_S3('stations', bucket, key)

    # temporary list to store results
    tmp = []
    df = pd.DataFrame()
    for s in stations:
        try:
            result = get_station_data(station_id = [s], 
                                          datefrom = start[0:4]+start[5:7]+start[8:10], 
                                          dateto = end[0:4]+end[5:7]+end[8:10], 
                                          variables = variable, 
                                          frequency = aggregation, 
                                          format = "JSON")
            tmp += result
        except: continue

    # create dataframe from results
    
    _tmp = create_df_station_data(tmp)
    # reorganize dataframe
    obs_frame = reorganize_data(_tmp)
    
    for v,a,i in zip(variable, aggregation,range(0,len(variable))):
        obs_frame.columns.values[4+i]= v+ '_' + a
        

    save_in_s3(obs_frame, 'observations', bucket, key)
    
    stations=obs_frame[['StationID','Latitude','Longitude']].drop_duplicates().reset_index(drop=True)
    save_in_s3(stations, 'stations', bucket, key)

In [8]:
create_obs_frame(variable, start, end, key_analysis, bucket, aggregation)

{'stations': ['Pessl-002035C5'], 'datefrom': '20210101', 'dateto': '20231120', 'variables': ['E_AIR_TEMPERATURE', 'E_RELATIVE_HUMIDITY', 'E_RAINFALL', 'E_DEWPOINT', 'E_WIND_SPEED'], 'frequencies': ['daily_avg', 'daily_avg', 'daily_sum', 'daily_avg', 'daily_avg'], 'format': 'JSON'}
{'stations': ['Pessl-00203FAC'], 'datefrom': '20210101', 'dateto': '20231120', 'variables': ['E_AIR_TEMPERATURE', 'E_RELATIVE_HUMIDITY', 'E_RAINFALL', 'E_DEWPOINT', 'E_WIND_SPEED'], 'frequencies': ['daily_avg', 'daily_avg', 'daily_sum', 'daily_avg', 'daily_avg'], 'format': 'JSON'}
{'stations': ['Pessl-002036CC'], 'datefrom': '20210101', 'dateto': '20231120', 'variables': ['E_AIR_TEMPERATURE', 'E_RELATIVE_HUMIDITY', 'E_RAINFALL', 'E_DEWPOINT', 'E_WIND_SPEED'], 'frequencies': ['daily_avg', 'daily_avg', 'daily_sum', 'daily_avg', 'daily_avg'], 'format': 'JSON'}
{'stations': ['Pessl-002049C4'], 'datefrom': '20210101', 'dateto': '20231120', 'variables': ['E_AIR_TEMPERATURE', 'E_RELATIVE_HUMIDITY', 'E_RAINFALL', 'E_

### Quality Control

In [83]:
def quality_control(var, key, bucket):
    
    if (~np.isnan(min_completeness))|(~np.isnan(min_completeness)):

        for i in range(0,len(var)):

            obs_frame=open_from_S3('observations', bucket, key) 

            obs=obs_frame[['Time','StationID','Longitude','Latitude',var[i]]]

            # Check of minimum number of records and completeness
            counted=obs.groupby('StationID').count().reset_index()
            counted=counted[['StationID',var[i]]]
            counted=counted.rename(columns={var[i]: "Count"})

            m=obs_frame.groupby(['StationID']).min().reset_index()
            m=m[['StationID','Time']]
            m=m.rename(columns={"Time": "min"})

            M=obs_frame.groupby(['StationID']).max().reset_index()
            M=M[['StationID','Time']]
            M=M.rename(columns={"Time": "max"})

            completeness=(counted.merge(m)).merge(M)
            completeness.drop(completeness.loc[completeness['min'].isna()].index, inplace=True)
            completeness.drop(completeness.loc[completeness['max'].isna()].index, inplace=True)

            completeness['max_count']=completeness['max']-completeness['min']
            completeness['max_count']=completeness['max_count'].apply(lambda x: x.days)+ 1

            completeness['ratio']=completeness['Count']/completeness['max_count']
            completeness=completeness[(completeness['ratio']>0)]

            completeness=completeness.rename(columns={"ratio": "completeness"})
            completeness.completeness=completeness.completeness.round(2)

            IDs=completeness[(completeness.completeness>=min_completeness)&(completeness.Count>=min_obs)].StationID

            mask_obs = obs.set_index('StationID').index.isin(IDs)
            obs=obs[mask_obs]

            l=len(obs.StationID.unique())
            print(var[i] + ':' + str(l))

            if i==0: new_obs_frame=obs
            else: new_obs_frame=new_obs_frame.merge(obs,how='outer')

        new_obs_frame.dropna(subset=var[:]).reset_index(drop=True)

        stations=new_obs_frame[['StationID','Latitude','Longitude']].drop_duplicates().reset_index(drop=True)
    
    else:
         
        new_obs_frame=open_from_S3('observations', bucket, key) 
        stations=open_from_S3('stations', bucket, key) 

    print('Number of stations:' + str(len(stations)))

    return stations, new_obs_frame

In [84]:
stations, obs_frame= quality_control(var, key_analysis, bucket)

E_AIR_TEMPERATURE_daily_avg:131
E_RELATIVE_HUMIDITY_daily_avg:129
E_RAINFALL_daily_sum:127
E_DEWPOINT_daily_avg:130
E_WIND_SPEED_daily_avg:7
Number of stations:136


In [86]:
save_in_s3(stations, 'stations', bucket, key_analysis)
save_in_s3(obs_frame, 'observations', bucket, key_analysis)

### Visualization of retained WS

In [127]:
# P0
from dash import Dash, dcc, html, Input, Output, callback
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.graph_objects as go

import pandas as pd

app = Dash(__name__)

app.layout = html.Div([
    html.Div([

        html.Div([
            dcc.Dropdown(
                var,
                var[0],
                id='variable-type'
            )   
        ], style={'width': '48%'}),
        
        dcc.Graph(id='indicator-graphic')
    ])
])


@callback(
    Output('indicator-graphic', 'figure'),
    Input('variable-type', 'value'))
def update_graph(variable_type_name):
    
    df=open_from_S3('observations', bucket, key_analysis) 
    
    fig = make_subplots(rows=1, cols=1)
    
    dff=df[['StationID','Longitude','Latitude',variable_type_name]]
    dff=dff.dropna(subset=variable_type_name)
    
    stations=dff[['StationID','Longitude','Latitude']].drop_duplicates().reset_index(drop=True)
    
    fig = go.Figure(data=go.Scattergeo(
        lon = stations['Longitude'],
        lat = stations['Latitude']
        ))
          
    fig.update_layout(margin={'l': 40, 'b': 40, 't': 10, 'r': 0}, hovermode='closest')
    

    return fig

if __name__ == '__main__':
    app.run(debug=True)

In [128]:
# Visual inspection of weather stations
from dash import Dash, dcc, html, Input, Output, callback
import plotly.express as px

import pandas as pd

app = Dash(__name__)

stations=open_from_S3("stations", bucket, key_analysis) 
df=open_from_S3('observations', bucket, key_analysis) 

app.layout = html.Div([
    html.Div([

        html.Div([
            dcc.Dropdown(
                stations.StationID,
                stations.StationID[0],
                id='station-name'
            ),
            dcc.Dropdown(
                var,
                var[0],
                id='variable-type'
            )   
        ], style={'width': '48%'}),
        
        dcc.Graph(id='indicator-graphic')
    ])
])


@callback(
    Output('indicator-graphic', 'figure'),
    Input('variable-type', 'value'),
    Input('station-name', 'value'))
def update_graph(variable_type_name, station_name):
    
    dff = df[df.StationID==station_name]

    fig = px.scatter(dff, x='Time', y=variable_type_name)
    
    fig.update_layout(margin={'l': 40, 'b': 40, 't': 10, 'r': 0}, hovermode='closest')
    
    fig.update_yaxes(title=variable_type_name)

    return fig

if __name__ == '__main__':
    app.run(debug=True)

In [129]:
# Stations by country
# P0
from dash import Dash, dcc, html, Input, Output, callback
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.graph_objects as go
import requests
from shapely.geometry import mapping, shape
from shapely.prepared import prep
from shapely.geometry import Point
import pandas as pd

df=open_from_S3('observations', bucket, key_analysis) 

data = requests.get("https://raw.githubusercontent.com/datasets/geo-countries/master/data/countries.geojson").json()

countries = {}
for feature in data["features"]:
    geom = feature["geometry"]
    country = feature["properties"]["ADMIN"]
    countries[country] = prep(shape(geom))

COUNTRY=[]
for i in range(0,len(stations)):
    lon=stations['Longitude'].loc[i]
    lat=stations['Latitude'].loc[i]
    point = Point(lon, lat)
    count='unknown'
    for country, geom in countries.items():
        if geom.contains(point):
            count=country
    COUNTRY.append(count)

stations['COUNTRY']=COUNTRY

app = Dash(__name__)

app.layout = html.Div([
    html.Div([

        html.Div([
            dcc.Dropdown(
                var,
                var[0],
                id='variable-type'
            )   
        ], style={'width': '48%'}),
        
        dcc.Graph(id='indicator-graphic')
    ])
])


@callback(
    Output('indicator-graphic', 'figure'),
    Input('variable-type', 'value'))
def update_graph(variable_type_name):
    
    dff=df[['StationID','Longitude','Latitude',variable_type_name]]
    dff=dff.dropna(subset=variable_type_name)
    
    stations=dff[['StationID','Longitude','Latitude']].drop_duplicates().reset_index(drop=True)
    
    countries = {}
    for feature in data["features"]:
        geom = feature["geometry"]
        country = feature["properties"]["ADMIN"]
        countries[country] = prep(shape(geom))

    COUNTRY=[]
    for i in range(0,len(stations)):
        lon=stations['Longitude'].loc[i]
        lat=stations['Latitude'].loc[i]
        point = Point(lon, lat)
        count='unknown'
        for country, geom in countries.items():
            if geom.contains(point):
                count=country
        COUNTRY.append(count)

    stations['COUNTRY']=COUNTRY
    
    dff_grouped=stations.groupby(by='COUNTRY').count().reset_index()
    dff_grouped=dff_grouped[['COUNTRY','StationID']]
    dff_grouped=dff_grouped.rename(columns={'StationID':'count'})
    
    fig = px.bar(dff_grouped, x="COUNTRY", y="count", barmode="group")
          
    fig.update_layout(margin={'l': 40, 'b': 40, 't': 10, 'r': 0}, hovermode='closest')
    

    return fig

if __name__ == '__main__':
    app.run(debug=True)

## Extractrion of gridded datasets from CE Hub

In [88]:
def extraction(dataset, key, bucket, variable,start,end, ini, aggregation):
       
    # Import packages
    import json
    import urllib.request
    import urllib.error
    import calendar
    import time
    import datetime
    import requests
    import pandas as pd
    from botocore.errorfactory import ClientError
    import io
    import numpy as np
    import boto3
    import meteoblue_dataset_sdk
    import logging
    from dotenv import load_dotenv
    
    
    load_dotenv()

    CEHUB_API_KEY=os.getenv('CEHUB_API_KEY')
    client = meteoblue_dataset_sdk.Client(apikey=CEHUB_API_KEY)
    # Display information about the current download state
    logging.basicConfig(level=logging.INFO)

    AWS_SECRET_KEY, AWS_ACCESS_KEY=get_AWS_credentials()

    s3 = boto3.client('s3', 
                            aws_access_key_id = AWS_ACCESS_KEY, 
                            aws_secret_access_key = AWS_SECRET_KEY
                           )
    
    # Create folder for the extraction
    new_key = key + '/' + str(dataset)
    
    stations=open_from_S3('stations', bucket, key) 
    
    url=get_CEHUB_API()
    
    query = {
        "units": {
            "temperature": "C",
            "velocity": "km/h",
            "length": "metric",
            "energy": "watts",
        },
        "geometry": {
            "type": "MultiPoint",
            "coordinates": [[7.57327, 47.558399, 279]],
            "locationNames": ["Basel"],
        },
        "format": "protobuf",
        "timeIntervals": ["2000-01-01T+00:00/2020-01-01T+00:00"],
        "timeIntervalsAlignment": "none",
        "queries": [
            {
                "domain": "ERA5T",
                "gapFillDomain": None,
                "timeResolution": "daily",
                "codes": [{"code": 11, "level": "2 m above gnd", "aggregation": "mean"}],
            }
        ],
    }
    
    # Definition of the dataset to query
    query['queries'][0]['domain'] = dataset

    print('Number of available stations: ' + str(len(stations)))

    for i in range(ini, len(stations)):
        try: 
            query["geometry"]['coordinates'][0][0] = stations['Longitude'].iloc[i]
            query["geometry"]['coordinates'][0][1] = stations['Latitude'].iloc[i]
            time1=start[0:11]+'+00:00'
            time2=end[0:11]+'+00:00'
            TimeInterval=str(time1) + '/' + str(time2)
            query["timeIntervals"][0] = TimeInterval

            try:        
                s3.head_object(Bucket=bucket, Key=new_key + '/' + str(stations['StationID'].iloc[i]) + ".pkl")
            except ClientError:

                for v,a,l_v in zip(variable, aggregation, range(0,len(variable))):
                    if (v!='E_RAINFALL')&((dataset=='IMERG-REALTIME')|(dataset=='CMORPH')|(dataset=='ARC2')|(dataset=='PDIRNOW')|(dataset=='CHIRPS2')|(dataset=='CPCGBAUS')|(dataset=='LATAMCOSCH')): continue
                    else:
                        try:
                            if v=='E_AIR_TEMPERATURE': 
                                query["queries"][0]["codes"][0]["code"]=11
                                query["queries"][0]["codes"][0]["level"]="2 m above gnd"
                            elif v=='E_RAINFALL': 
                                query["queries"][0]["codes"][0]["code"]=61
                                query["queries"][0]["codes"][0]["level"]="sfc"
                            elif v=='E_SOLAR': 
                                query["queries"][0]["codes"][0]["code"]=204
                                query["queries"][0]["codes"][0]["level"]="sfc"
                            elif v=='E_RELATIVE_HUMIDITY': 
                                query["queries"][0]["codes"][0]["code"]=52
                                query["queries"][0]["codes"][0]["level"]="2 m above gnd"
                            elif v=='E_WIND_SPEED': 
                                query["queries"][0]["codes"][0]["code"]=32
                                query["queries"][0]["codes"][0]["level"]="10 m above gnd"
                            elif v=='E_WIND_GUST': 
                                query["queries"][0]["codes"][0]["code"]=180
                                query["queries"][0]["codes"][0]["level"]="10 m above gnd"
                            elif v=='E_DEWPOINT': 
                                query["queries"][0]["codes"][0]["code"]=17
                                query["queries"][0]["codes"][0]["level"]="2 m above gnd"

                            if a.split('_')[1]=='avg': query["queries"][0]["codes"][0]["aggregation"]="mean"
                            elif a.split('_')[1]=='sum': query["queries"][0]["codes"][0]["aggregation"]="sum"
                            elif a.split('_')[1]=='max': query["queries"][0]["codes"][0]["aggregation"]="max"
                            elif a.split('_')[1]=='min': query["queries"][0]["codes"][0]["aggregation"]="min"

                            result = client.query_sync(query)

                            df = meteoblue_result_to_dataframe(result.geometries[0])
                            
                            if (l_v==0)|(dataset=='IMERG-REALTIME')|(dataset=='CMORPH')|(dataset=='ARC2')|(dataset=='PDIRNOW')|(dataset=='CHIRPS2')|(dataset=='CPCGBAUS')|(dataset=='LATAMCOSCH'): 
                                gridded_data=df
                            else: gridded_data=gridded_data.merge(df,how='outer')
                            
                        except:
                            continue

                save_in_s3(gridded_data, str(stations['StationID'].iloc[i]), bucket, new_key)
                print(str(stations['StationID'].iloc[i]) + '_' + dataset)  
    
        except: # The dataset doesn't cover this location
            continue

In [90]:
import psutil
import concurrent.futures

cores=psutil.cpu_count()

stations=open_from_S3('stations', bucket, key_analysis) 

for i in range(0,len(dataset)):
    print(dataset[i])
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results=[executor.submit(extraction, dataset[i], key_analysis, bucket, variable, start,end, ini, aggregation)
            for ini in range(0,len(stations),int(len(stations)/cores)+1)] 

ERA5T
Number of available stations: 136
Number of available stations: 136
Number of available stations: 136Number of available stations: 136

Number of available stations: 136Number of available stations: 136

Number of available stations: 136Number of available stations: 136

Arable-C006696_ERA5T
Arable-C002728_ERA5T
Arable-C006017_ERA5T
Arable-C006953_ERA5T
Pessl-0020B016_ERA5T
Arable-C009814_ERA5T
Arable-C008163_ERA5T
Pessl-00002DE1_ERA5T
Arable-C006956_ERA5T
Arable-C006969_ERA5T
Arable-C006737_ERA5T
Arable-C003787_ERA5T
Pessl-0020B06E_ERA5T
Arable-C008164_ERA5T
Arable-C006021_ERA5T
Arable-C006806_ERA5T
Arable-C006974_ERA5T
Pessl-002035C5_ERA5T
Arable-C010251_ERA5T
Arable-C006976_ERA5T
Arable-C006022_ERA5T
Arable-C006993_ERA5T
Arable-C008172_ERA5T
Pessl-0020CCCB_ERA5T
Arable-C004522_ERA5T
Arable-C006812_ERA5T
Pessl-002036CB_ERA5T
Arable-C010253_ERA5T
Arable-C008187_ERA5T
Arable-C006819_ERA5T
Arable-C006825_ERA5T
Arable-C006026_ERA5T
Pessl-0020D98E_ERA5T
Arable-C007694_ERA5T
Arable-C

In [None]:
# Possibility to add an external dataset
import pandas as pd

external_dataset=pd.read_csv('IBM_final.csv')
del external_dataset['Unnamed: 0']
external_dataset

In [None]:
external_dataset['Time']=pd.to_datetime(external_dataset['Time'])

obs_frame=open_from_S3('observations', bucket, key_analysis) 

df_new=obs_frame.merge(external_dataset)
df_new

In [None]:
dataset=dataset+['IBM']
dataset

## Merge data from gridded datasets with WS observations

In [91]:
def merge( dataset, bucket, key):
    
    # Import packages
    import pandas as pd

    obs_frame=open_from_S3('observations', bucket, key) 
    stations=open_from_S3('stations', bucket, key) 
    
    for d in dataset:
        print(d)
        try:
            new_key = key + '/' + str(d)

            extract = []
            for i in range(0,len(stations)):
                try: 
                    df=open_from_S3(str(stations['StationID'].iloc[i]), bucket, new_key) 
                    # Adjust unit of measurement for windspeed

                    df = df.assign(StationID=stations['StationID'].iloc[i])
                    extract.append(df)

                except: pass

            df_new = pd.concat(extract, ignore_index=True)
                        
            df_new['timestamp'] = pd.to_datetime(df_new['timestamp'])
            df_new['StationID']=pd.Categorical(df_new['StationID'])
            
            del df_new['lon']
            del df_new['lat']

            df_new=df_new.rename(columns={'timestamp': 'Time'})
            df_new['Time']=df_new['Time'].dt.date
            df_new['Time'] = pd.to_datetime(df_new['Time'])
            
            
            df_new = pd.merge(obs_frame, df_new, how='inner')
            
            save_in_s3(df_new, 'Merged_data_' + str(d), bucket, key)
        except: continue

In [92]:
# Merge extraction with observations
merge(dataset, bucket, key_analysis)  # AWS secret key

ERA5T
            Time       StationID  Longitude  Latitude  \
0     2021-01-01  Arable-C002728    -48.806   -21.563   
1     2021-01-02  Arable-C002728    -48.806   -21.563   
2     2021-01-03  Arable-C002728    -48.806   -21.563   
3     2021-01-04  Arable-C002728    -48.806   -21.563   
4     2021-01-05  Arable-C002728    -48.806   -21.563   
...          ...             ...        ...       ...   
59430 2023-05-07  Arable-C008362    -54.171   -15.701   
59431 2023-05-08  Arable-C008362    -54.171   -15.701   
59432 2023-05-09  Arable-C008362    -54.171   -15.701   
59433 2023-05-10  Arable-C008362    -54.171   -15.701   
59434 2023-05-11  Arable-C008362    -54.171   -15.701   

       E_AIR_TEMPERATURE_daily_avg  E_RELATIVE_HUMIDITY_daily_avg  \
0                            24.04                          20.94   
1                            24.17                          20.70   
2                            24.88                          19.91   
3                            23.7

### Time aggregation

In [101]:
def time_aggregation (var, time_aggregation, dataset, key, bucket, aggregation):
    
    import pandas as pd

    for i in range (0,len(dataset)):
        print(dataset[i])
        try:
            df=open_from_S3('Merged_data_' + str(dataset[i]), bucket, key) 

            df['month']=pd.DatetimeIndex(df['Time']).month
            df['YEAR']=pd.DatetimeIndex(df['Time']).year
            
            for v, a in zip(var, aggregation):
                
                try:
                    df_variable=df[['Time','YEAR','month','StationID',v, v + '_dataset','Latitude','Longitude']]

                    if (a=='daily_avg')|(a=='daily_min')|(a=='daily_max'):
                        if (time_aggregation=='month'):
                            df_agg=df_variable.groupby(['YEAR','month','StationID'], as_index=False).mean()
                        elif (time_aggregation=='week'):
                            df_variable['week'] = pd.DatetimeIndex(df_variable['Time']).strftime("%V")
                            df_variable['week'] = pd.to_numeric(df_variable['week'])
                            df_mean = df_variable.groupby(['YEAR','week', 'StationID'], as_index=False).mean()
                            df_first = df_variable.groupby(['YEAR','week', 'StationID'], as_index=False).first()
                            del df_mean['month']
                            del df_first[v]
                            del df_first[v + '_dataset']
                            df_agg=df_mean.merge(df_first)
                        else: df_agg=df_variable

                    else:
                        if (time_aggregation=='month'):
                            df_agg=df_variable.groupby(['YEAR','month','StationID'], as_index=False).sum(min_count=25)
                        elif (time_aggregation=='week'):
                            df_variable['week'] = pd.DatetimeIndex(df_variable['Time']).strftime("%V")
                            df_variable['week'] = pd.to_numeric(df_variable['week'])
                            df_mean = df_variable.groupby(['YEAR','week', 'StationID'], as_index=False).sum(min_count=7)
                            df_first = df_variable.groupby(['YEAR','week', 'StationID'], as_index=False).first()
                            del df_mean['month']
                            del df_first[v]
                            del df_first[v + '_dataset']
                            df_agg=df_mean.merge(df_first)
                        else: 
                            df_agg=df_variable

                    df_agg=df_agg.dropna(subset=[v, v + '_dataset']).reset_index(drop=True)

                    save_in_s3(df_agg, 'Aggregated' + str(dataset[i]) + '_' + v, bucket, key)
                except: continue
        except: continue

In [102]:
time_aggregation(var, time_aggregation, dataset, key_analysis,  bucket, aggregation)  

ERA5T
            Time  YEAR  month       StationID  E_AIR_TEMPERATURE_daily_avg  \
0     2021-01-01  2021      1  Arable-C002728                        24.04   
1     2021-01-02  2021      1  Arable-C002728                        24.17   
2     2021-01-03  2021      1  Arable-C002728                        24.88   
3     2021-01-04  2021      1  Arable-C002728                        23.74   
4     2021-01-05  2021      1  Arable-C002728                        25.20   
...          ...   ...    ...             ...                          ...   
59430 2023-05-07  2023      5  Arable-C008362                          NaN   
59431 2023-05-08  2023      5  Arable-C008362                          NaN   
59432 2023-05-09  2023      5  Arable-C008362                          NaN   
59433 2023-05-10  2023      5  Arable-C008362                          NaN   
59434 2023-05-11  2023      5  Arable-C008362                          NaN   

       E_AIR_TEMPERATURE_daily_avg_dataset  Latitude  Lon

### Creation of a unique dataframe

In [105]:
def unique_dataframe(dataset, var, key,  bucket):
    import pandas as pd
    
    for v in var:
        for d,i in zip(dataset,range(0,len(dataset))):
            try:
                df=open_from_S3('Aggregated' + str(dataset[i]) + '_' + v, bucket, key)
                
                if len(df)>0:
                    df.StationID=pd.Categorical(df.StationID)
                    df=df[['Time','StationID','Longitude','Latitude','month',v,v + '_dataset']]
                    df=df.rename(columns={v + '_dataset': v + '_' + d})

                    if i==0: df_new=df
                    else: df_new=df_new.merge(df, how='outer')
                    
            except: continue
        
        save_in_s3(df_new, 'Final_dataframe_' + v, bucket, key)

unique_dataframe(dataset, var, key_analysis,  bucket)

            Time       StationID  Longitude  Latitude  month  \
0     2021-01-01  Arable-C002728    -48.806   -21.563      1   
1     2021-01-02  Arable-C002728    -48.806   -21.563      1   
2     2021-01-03  Arable-C002728    -48.806   -21.563      1   
3     2021-01-04  Arable-C002728    -48.806   -21.563      1   
4     2021-01-05  Arable-C002728    -48.806   -21.563      1   
...          ...             ...        ...       ...    ...   
55160 2023-11-16  Pessl-0020D99A    -60.967   -33.114     11   
55161 2023-11-17  Pessl-0020D99A    -60.967   -33.114     11   
55162 2023-11-18  Pessl-0020D99A    -60.967   -33.114     11   
55163 2023-11-19  Pessl-0020D99A    -60.967   -33.114     11   
55164 2023-11-20  Pessl-0020D99A    -60.967   -33.114     11   

       E_AIR_TEMPERATURE_daily_avg  E_AIR_TEMPERATURE_daily_avg_ERA5T  
0                            24.04                          23.789167  
1                            24.17                          24.138330  
2              

### Statistics

In [111]:
def statistics(var, dataset, bucket, key):
    import sklearn.metrics 
    import scipy.stats
    import math
    import numpy as np
    import hydroeval as he
    import requests
    from shapely.geometry import mapping, shape
    from shapely.prepared import prep
    from shapely.geometry import Point

    for v in var:

        df=open_from_S3('Final_dataframe_' + v, bucket, key) 
        
        s=df.StationID.unique()

        best=[]
        for d in dataset:
            for st in s:
                try:
                    series=df[[v, v + '_' + d,'StationID']]
                    series=series[series.StationID==st]
                    series=series.dropna(subset=[v])
                    series=series.dropna(subset=[v + '_' + d])
                    obs=series[v]
                    data=series[v + '_' + d]
                    if (len(obs)>0):
                        mae=round(sklearn.metrics.mean_absolute_error(obs, data),2)
                        rmse=round(math.sqrt(sklearn.metrics.mean_squared_error(obs, data)), 2)
                        percentage=round(percentage_below_threshold(abs(data-obs),threshold_pc),0)
                        bias=round(np.mean(data-obs),2)
                        kge, r, alpha, beta = he.evaluator(he.kge, data, obs)
                        r2=round(r[0]**2,2)

                        if v=='E_RAINFALL':
                            obs=np.where(obs>threshold_pod_far, 1, 0)
                            data=np.where(data>threshold_pod_far, 1, 0)
                            tn, fp, fn, tp = sklearn.metrics.confusion_matrix(obs, data).ravel()
                            POD=round(tp/(tp+fn),2)
                            FAR=round(fp/(fp+fn),2)
                        else: 
                            POD=np.nan
                            FAR=np.nan

                        best.append([d, st, mae, rmse, percentage, r2, bias, abs(bias), POD, FAR, round(kge[0],2) , round(r[0],2) , round(alpha[0], 2), round(beta[0],2)])
                    else: continue
                except: continue
                
        df_new=pd.DataFrame(best,columns=['DATASET','StationID','MAE','RMSE', 'PC','R2','Bias','Abs Bias','POD','FAR', 'KGE', 'R', 'Alpha', 'Beta'])
            
        stations=open_from_S3("stations", bucket, key) 

        data = requests.get("https://raw.githubusercontent.com/datasets/geo-countries/master/data/countries.geojson").json()

        countries = {}
        for feature in data["features"]:
            geom = feature["geometry"]
            country = feature["properties"]["ADMIN"]
            countries[country] = prep(shape(geom))

        COUNTRY=[]
        for i in range(0,len(stations)):
            lon=stations['Longitude'].loc[i]
            lat=stations['Latitude'].loc[i]
            point = Point(lon, lat)
            count='unknown'
            for country, geom in countries.items():
                if geom.contains(point):
                    count=country
            COUNTRY.append(count)

        stations['COUNTRY']=COUNTRY

        df_new=df_new.merge(stations)
        
        save_in_s3(df_new, 'Metrics_' + v, bucket, key)
    
statistics(var, dataset, bucket, key_analysis)


invalid value encountered in true_divide


invalid value encountered in true_divide


invalid value encountered in true_divide


invalid value encountered in true_divide


invalid value encountered in true_divide



## Output

In [130]:
df=open_from_S3('Metrics_' + var[0], bucket, key_analysis) 
country_list=df.COUNTRY.unique()
country_list

array(['Brazil', 'Argentina', 'Chile', 'India', 'Guatemala', 'Colombia',
       'Indonesia', 'Vietnam', 'Taiwan', 'Australia', 'Peru', 'Ecuador',
       'Kazakhstan'], dtype=object)

In [131]:
# P1
from dash import Dash, dcc, html, Input, Output, callback
import plotly.express as px

import pandas as pd

app = Dash(__name__)

app.layout = html.Div([
    html.Div([

        html.Div([
            dcc.Dropdown(
                country_list,
                country_list[0],
                id='country-type'
            ),
            dcc.Dropdown(
                metrics,
                metrics[0],
                id='metric-type'
            ),
            dcc.Dropdown(
                var,
                var[0],
                id='variable-type'
            )   
        ], style={'width': '48%'}),
        
        dcc.Graph(id='indicator-graphic')
    ])
])


@callback(
    Output('indicator-graphic', 'figure'),
    Input('country-type', 'value'),
    Input('metric-type', 'value'),
    Input('variable-type', 'value'))
def update_graph(country_type_name, metric_type_name, variable_type_name):
    
    df=open_from_S3('Metrics_' + variable_type_name, bucket, key_analysis) 
    mean=df.groupby(['DATASET','COUNTRY'])[metric_type_name].mean().reset_index().round(2)
    
    dff = mean[mean.COUNTRY==country_type_name]

    fig = px.bar(dff, x='DATASET', y=metric_type_name)

    fig.update_layout(margin={'l': 40, 'b': 40, 't': 10, 'r': 0}, hovermode='closest')
    
    fig.update_yaxes(title=metric_type_name)

    return fig

if __name__ == '__main__':
    app.run(debug=True)

In [132]:
# P2
from dash import Dash, dcc, html, Input, Output, callback
import plotly.express as px
from plotly.subplots import make_subplots
import plotly.graph_objects as go

import pandas as pd

from matplotlib.colors import LinearSegmentedColormap
cdict = {
    'red': ((0.0, 0.12, 0.12),
            (1.0, 0.96, 0.96)),

    'green': ((0.0, 0.53, 0.53),
              (1.0, 0.15, 0.15)),

    'blue': ((0.0, 0.90, 0.90),
             (1.0, 0.34, 0.34)),

    'alpha': ((0.0, 1, 1),
              (0.5, 1, 1),
              (1.0, 1, 1))
}

red_blue = LinearSegmentedColormap('RedBlue', cdict)

def matplotlib_to_plotly(cmap, pl_entries):
    h = 1.0/(pl_entries-1)
    pl_colorscale = []
red_blue = matplotlib_to_plotly(red_blue, 255)


app = Dash(__name__)

app.layout = html.Div([
    html.Div([

        html.Div([
            dcc.Dropdown(
                metrics,
                metrics[0],
                id='metric-type'
            ),
            dcc.Dropdown(
                var,
                var[0],
                id='variable-type'
            ),
            dcc.Dropdown(
                dataset,
                dataset[0],
                id='dataset-name'
            )
            
        ], style={'width': '48%'}),
        
        dcc.Graph(id='First dataset')
    ])
])


@callback(
    Output('First dataset', 'figure'),
    Input('metric-type', 'value'),
    Input('variable-type', 'value'),
    Input('dataset-name', 'value'))
def update_graph(metric_type_name, variable_type_name, dataset_name):
    
    df=open_from_S3('Metrics_' + variable_type_name, bucket, key_analysis) 
    
    if metric_type_name=='MAE': 
        c_min=0
        c_max=5
    elif metric_type_name=='R':
        c_min=0
        c_max=1
    elif metric_type_name=='KGE':
        c_min=-1
        c_max=1
    
    dff=df[df.DATASET==dataset_name]
    fig = go.Figure(data=go.Scattergeo(
        lon = dff['Longitude'],
        lat = dff['Latitude'],
        marker_color = dff[metric_type_name],
        marker=dict(colorscale=red_blue, showscale=True, cmin=c_min, cmax=c_max, colorbar=dict(thickness=5, outlinewidth=0))))
   

    fig['layout']['showlegend'] = False
    

    fig.update_layout(margin={'l': 40, 'b': 40, 't': 10, 'r': 0}, hovermode='closest')
    return fig
    

if __name__ == '__main__':
    app.run(debug=True)

In [146]:
# Visual inspection of weather stations
from dash import Dash, dcc, html, Input, Output, callback
import plotly.express as px
import plotly.graph_objects as go

import pandas as pd

app = Dash(__name__)

df=open_from_S3('observations', bucket, key_analysis) 

app.layout = html.Div([
    html.Div([

        html.Div([
            dcc.Dropdown(
                stations.StationID,
                stations.StationID[0],
                id='station-name'
            ),
            dcc.Dropdown(
                var,
                var[0],
                id='variable-type'
            ),
            dcc.Checklist(
                dataset,
                [dataset[0]],
                inline=True,
                id='dataset-list'
            )
        ], style={'width': '48%'}),
        
        dcc.Graph(id='indicator-graphic')
    ])
])


@callback(
    Output('indicator-graphic', 'figure'),
    Input('variable-type', 'value'),
    Input('station-name', 'value'),
    Input('dataset-list', 'value')
)
def update_graph(variable_type_name, station_name, dataset_list):
    
    df=open_from_S3('Final_dataframe_' + variable_type_name, bucket, key_analysis) 
    
    dff = df[df.StationID==station_name]
    
    fig = go.Figure()
    
    fig.add_trace(go.Scatter(x=dff.Time, y=dff[variable_type_name],
                        mode='markers',
                        name='Observations'))
    
    for d in dataset_list:
        fig.add_trace(go.Scatter(x=dff.Time, y=dff[variable_type_name + '_' + d],
                            mode='lines+markers',
                            name=d))
    
    
    fig.update_layout(margin={'l': 40, 'b': 40, 't': 10, 'r': 0}, hovermode='closest')
    
    fig.update_yaxes(title=variable_type_name)

    return fig

if __name__ == '__main__':
    app.run(debug=True)

[1;31m---------------------------------------------------------------------------[0m
[1;31mKeyError[0m                                  Traceback (most recent call last)
[1;32m~\Anaconda3\lib\site-packages\pandas\core\indexes\base.py[0m in [0;36mget_loc[1;34m(
    self=Index(['Time', 'StationID', 'Longitude', 'Latitu...PERATURE_daily_avg_GFS05'],
      dtype='object'),
    key='E_AIR_TEMPERATURE_daily_avg_ARC2',
    method=None,
    tolerance=None
)[0m
[0;32m   3801[0m             [1;32mtry[0m[1;33m:[0m[1;33m[0m[1;33m[0m[0m
[1;32m-> 3802[1;33m                 [1;32mreturn[0m [0mself[0m[1;33m.[0m[0m_engine[0m[1;33m.[0m[0mget_loc[0m[1;33m([0m[0mcasted_key[0m[1;33m)[0m[1;33m[0m[1;33m[0m[0m
[0m        [0;36mself._engine.get_loc[0m [1;34m= <built-in method get_loc of pandas._libs.index.ObjectEngine object at 0x00000200C6FEC0A0>[0m[1;34m
        [0m[0;36mcasted_key[0m [1;34m= 'E_AIR_TEMPERATURE_daily_avg_ARC2'[0m
[0;32m   3803[0m     