In [60]:
# Evalúa el error de diferentes metodologías de cálculo de las curvas IDF.

import warnings
import numpy as np
import xarray as xr
from scipy import optimize

warnings.filterwarnings( "ignore", category = RuntimeWarning )

# W: número de longitud.
# N: número de latitud.
W = 1
N = 0

In [61]:
# Método de Wenzel sin parámetro c con regresión lineal múltiple.

# Cargamos el archivo.
i = 3
name = [ "CHIRPS_comp", "CHIRPS_megalopolis", "prec_hist_comp_CHIRPS",
    "prec_hist_comp_estaciones", "prec_hist_hist" ]
path_ret = "../results/" + name[i] + "/" + name[i] + "_tretorno.nc" 
ds_ret = xr.open_dataset(path_ret)
df = ds_ret.to_dataframe()

# Quitamos la intensidad, año, probabilidad, tiempo de retorno, y agregamos
# columnas para los parámetros de la distribución.
df = df.reorder_levels( ["LATITUD",
    "LONGITUD", "DURACION", "TIEMPO_RETORNO"] )
df_2 = df.copy().drop( ["PROBABILIDAD", "AÑO"], axis = 1 
    ).reset_index( "TIEMPO_RETORNO" )
df_2 = df_2.reset_index( "DURACION" )
df_3 = df_2.copy().drop(
    ["INTENSIDAD", "TIEMPO_RETORNO", "DURACION"], axis = 1
    ).groupby( ["LATITUD", "LONGITUD"] ).mean()
cols = ["k", "m", "n", "error"]
df_3[ cols ] = None

# Escogemos latitud y longitud.
i = df_3.index.get_level_values("LATITUD").unique()[N]
j = df_3.index.get_level_values("LONGITUD").unique()[W]

# Creamos la tabla a predecir.
t = ( ( [5, 10, 25, 50, 100, 200, 500, 1000] + [None] *
    ( df_2["TIEMPO_RETORNO"].unique().shape[0] - 8 ) )
    * df_2.index.get_level_values("LONGITUD").unique().shape[0]
    * df_2.index.get_level_values("LATITUD").unique().shape[0]
    * df_2["DURACION"].unique().shape[0] )
df_2["TIEMPO_RETORNO"] = t
df_2 = df_2.dropna().set_index( "TIEMPO_RETORNO", append = True )

# Función que nos genera una curva idT.
def idT(X, k, m, n):
    return ( k * X[0] ** m ) / X[1] ** n

# Regresión lineal múltiple.
df_4 = np.log( df.loc[ (i, j), "INTENSIDAD" ].reset_index() )
df_4["1"] = 1
# Predictando.
Y = df_4["INTENSIDAD"]
# Predictores.
X = df_4[ ["1", "TIEMPO_RETORNO", "DURACION"] ]
# Coeficientes de regresión.
B = np.linalg.inv( X.T @ X ) @ ( X.T @ Y )

# Coeficientes de las curvas idT.
df_3.loc[ (i, j), cols[0] ] = np.exp(B[0])
df_3.loc[ (i, j), cols[1] ] = B[1]
df_3.loc[ (i, j), cols[2] ] = -B[2]
# Error
X = np.swapaxes( df.loc[ (i, j) ].reset_index(
    )[ ["TIEMPO_RETORNO", "DURACION"] ].values, 0, 1 )
df_3.loc[ (i, j), cols[3] ] = ( ( df.loc[ (i, j)
    ].reset_index()["INTENSIDAD"].values - idT( 
    X, *df_3.loc[ (i, j) ].iloc[0:3] ) ) ** 2 ).sum()

# Calculamos las intensidades.
X = np.swapaxes( df_2.loc[ (i, j) ].reset_index()[
    ["TIEMPO_RETORNO", "DURACION"] ].values, 0, 1 )
df_2.loc[ (i, j), "INTENSIDAD"] = idT(X, *df_3.loc[ (i, j) ].iloc[0:3] )

# Coeficientes
df_3.loc[ (i, j) ]

k           1.18185
m          0.864496
n           0.79486
error    229.676201
Name: (18.146495819091797, -100.39393615722656), dtype: object

In [62]:
# Método de Wenzel sin parámetro c con mínimos cuadrados no lineales.

# Cargamos el archivo.
i = 3
name = [ "CHIRPS_comp", "CHIRPS_megalopolis", "prec_hist_comp_CHIRPS",
    "prec_hist_comp_estaciones", "prec_hist_hist" ]
path_ret = "../results/" + name[i] + "/" + name[i] + "_tretorno.nc"
ds_ret = xr.open_dataset(path_ret)
df = ds_ret.to_dataframe()

# Quitamos la intensidad, año, probabilidad, tiempo de retorno, y agregamos
# columnas para los parámetros de la distribución.
df = df.reorder_levels( ["LATITUD",
    "LONGITUD", "DURACION", "TIEMPO_RETORNO"] )
df_2 = df.copy().drop( ["PROBABILIDAD", "AÑO"], axis = 1 
    ).reset_index( "TIEMPO_RETORNO" )
df_2 = df_2.reset_index( "DURACION" )
df_3 = df_2.copy().drop(
    ["INTENSIDAD", "TIEMPO_RETORNO", "DURACION"], axis = 1
    ).groupby( ["LATITUD", "LONGITUD"] ).mean()
cols = ["k", "m", "n", "error"]
df_3[ cols ] = None

# Creamos la tabla a predecir.
t = ( ( [5, 10, 25, 50, 100, 200, 500, 1000] + [None] *
    ( df_2["TIEMPO_RETORNO"].unique().shape[0] - 8 ) )
    * df_2.index.get_level_values("LONGITUD").unique().shape[0]
    * df_2.index.get_level_values("LATITUD").unique().shape[0]
    * df_2["DURACION"].unique().shape[0] )
df_2["TIEMPO_RETORNO"] = t
df_2 = df_2.dropna().set_index( "TIEMPO_RETORNO", append = True )

# Escogemos latitud y longitud.
i = df_3.index.get_level_values("LATITUD").unique()[N]
j = df_3.index.get_level_values("LONGITUD").unique()[W]

# Función que nos genera una curva idT.
def idT(X, k, m, n):
    return ( k * X[0] ** m ) / X[1] ** n

# Mínimos cuadrados no lineales.
df_4 = df.loc[ (i, j), "INTENSIDAD" ].reset_index()
# Predictando.
Y = df_4["INTENSIDAD"].values
# Predictores.
X = np.swapaxes( df_4[ ["TIEMPO_RETORNO", "DURACION"] ].values, 0, 1 )
# Cálculo de parámetros.
fit = optimize.curve_fit( f = idT, xdata = X, ydata = Y,
    p0 = (1, 1, 1), full_output = True )

# Coeficientes de las curvas idT.
df_3.loc[ (i, j), cols[:-1] ] = fit[0]
# Error.
df_3.loc[ (i, j), cols[-1] ] = ( fit[2]["fvec"] ** 2 ).sum()

# Calculamos las intensidades.
X = np.swapaxes( df_2.loc[ (i, j) ].reset_index()[
    ["TIEMPO_RETORNO", "DURACION"] ].values, 0, 1 )
df_2.loc[ (i, j), "INTENSIDAD"] = idT(X, *df_3.loc[ (i, j) ].iloc[0:3] )

# Coeficientes
df_3.loc[ (i, j) ]

k          0.953427
m          1.011362
n          0.913854
error    137.758543
Name: (18.146495819091797, -100.39393615722656), dtype: object

In [63]:
# Método de Wenzel con mínimos cuadrados no lineales.

# Cargamos el archivo.
i = 3
name = [ "CHIRPS_comp", "CHIRPS_megalopolis", "prec_hist_comp_CHIRPS",
    "prec_hist_comp_estaciones", "prec_hist_hist" ]
path_ret = "../results/" + name[i] + "/" + name[i] + "_tretorno.nc"
ds_ret = xr.open_dataset(path_ret)
df = ds_ret.to_dataframe()

# Quitamos la intensidad, año, probabilidad, tiempo de retorno, y agregamos
# columnas para los parámetros de la distribución.
df = df.reorder_levels( ["LATITUD",
    "LONGITUD", "DURACION", "TIEMPO_RETORNO"] )
df_2 = df.copy().drop( ["PROBABILIDAD", "AÑO"], axis = 1 
    ).reset_index( "TIEMPO_RETORNO" )
df_2 = df_2.reset_index( "DURACION" )
df_3 = df_2.copy().drop(
    ["INTENSIDAD", "TIEMPO_RETORNO", "DURACION"], axis = 1
    ).groupby( ["LATITUD", "LONGITUD"] ).mean()
cols = ["k", "m", "n", "c", "error"]
df_3[ cols ] = None

# Creamos la tabla a predecir.
t = ( ( [5, 10, 25, 50, 100, 200, 500, 1000] + [None] *
    ( df_2["TIEMPO_RETORNO"].unique().shape[0] - 8 ) )
    * df_2.index.get_level_values("LONGITUD").unique().shape[0]
    * df_2.index.get_level_values("LATITUD").unique().shape[0]
    * df_2["DURACION"].unique().shape[0] )
df_2["TIEMPO_RETORNO"] = t
df_2 = df_2.dropna().set_index( "TIEMPO_RETORNO", append = True )

# Escogemos latitud y longitud.
i = df_3.index.get_level_values("LATITUD").unique()[N]
j = df_3.index.get_level_values("LONGITUD").unique()[W]

# Función que nos genera una curva idT.
def idT(X, k, m, n, d):
    return ( k * X[0] ** m ) / ( X[1] + d ) ** n

# Mínimos cuadrados no lineales.
df_4 = df.loc[ (i, j), "INTENSIDAD" ].reset_index()
# Predictando.
Y = df_4["INTENSIDAD"].values
# Predictores.
X = np.swapaxes( df_4[ ["TIEMPO_RETORNO", "DURACION"] ].values, 0, 1 )
# Cálculo de parámetros.
fit = optimize.curve_fit( f = idT, xdata = X, ydata = Y,
    p0 = (1, 1, 1, 1), full_output = True )

# Coeficientes de las curvas idT.
df_3.loc[ (i, j), cols[:-1] ] = fit[0]
# Error.
df_3.loc[ (i, j), cols[-1] ] = ( fit[2]["fvec"] ** 2 ).sum()

# Calculamos las intensidades.
X = np.swapaxes( df_2.loc[ (i, j) ].reset_index()[
    ["TIEMPO_RETORNO", "DURACION"] ].values, 0, 1 )
df_2.loc[ (i, j), "INTENSIDAD"] = idT(X, *df_3.loc[ (i, j) ].iloc[0:4] )

# Coeficientes
df_3.loc[ (i, j) ]

k          0.989165
m          1.011334
n          0.931184
c          0.042076
error    137.715232
Name: (18.146495819091797, -100.39393615722656), dtype: object

In [64]:
# Distribución de Valores Extremos Generalizada.

# Cargamos el archivo.
i = 3
name = [ "CHIRPS_comp", "CHIRPS_megalopolis", "prec_hist_comp_CHIRPS",
    "prec_hist_comp_estaciones", "prec_hist_hist" ]
path_ret = "../results/" + name[i] + "/" + name[i] + "_tretorno.nc"
ds_ret = xr.open_dataset(path_ret)
df = ds_ret.to_dataframe()

# Quitamos la intensidad, año, probabilidad, tiempo de retorno, y agregamos
# columnas para los parámetros de la distribución.
df = df.reorder_levels( ["LATITUD",
    "LONGITUD", "DURACION", "TIEMPO_RETORNO"] )
df_2 = df.copy().drop( ["PROBABILIDAD", "AÑO"], axis = 1 
    ).reset_index( "TIEMPO_RETORNO" )
#df_2 = df_2.reset_index( "DURACION" )
df_3 = df_2.copy().drop( ["INTENSIDAD", "TIEMPO_RETORNO"], axis = 1
    ).groupby( ["LATITUD", "LONGITUD", "DURACION"] ).mean()
cols = ["GEV_C", "GEV_LOC", "GEV_SCALE", "KTEST_P"]
df_3[ cols ] = None

# Creamos la tabla a predecir.
t = ( ( [5, 10, 25, 50, 100, 200, 500, 1000] + [None] *
    ( df_2["TIEMPO_RETORNO"].unique().shape[0] - 8 ) )
    * df_2.index.get_level_values("LONGITUD").unique().shape[0]
    * df_2.index.get_level_values("LATITUD").unique().shape[0]
    * df_2.index.get_level_values("DURACION").unique().shape[0] )
df_2["TIEMPO_RETORNO"] = t
df_2 = df_2.dropna().set_index( "TIEMPO_RETORNO", append = True )

# Escogemos latitud y longitud.
i = df_3.index.get_level_values("LATITUD").unique()[N]
j = df_3.index.get_level_values("LONGITUD").unique()[W]

df_4 = df.loc[ (i, j), ["INTENSIDAD"] ]

# Ajuste de la distribución GEV por duración.
for k in df_3.index.get_level_values("DURACION").unique():
    # ajustamos la distribución de valores extremos.
    params = stats.genextreme.fit( df.loc[ (i, j, k), "INTENSIDAD" ] )
    # Hacemos la prueba Kolmogorov Smirnoff.
    pvalue = stats.kstest( df.loc[ (i, j, k), "INTENSIDAD" ],
    stats.genextreme(*params).cdf ).pvalue
    df_3.loc[ (i, j, k), cols ] = [*params] + [pvalue]
    # Calculamos las intensidades de acuerdo con la distribución.
    df_2.loc[ (i, j, k), "INTENSIDAD"] = stats.genextreme(
        *df_3.loc[ (i, j, k), cols[:-1] ] ).isf(
        1 / df_2.loc[ (i, j, k) ].index.get_level_values("TIEMPO_RETORNO") )
    # Calculamos las intensidades para calcular el error.
    df_4.loc[ (k), "INTENSIDAD" ] = stats.genextreme(
        *df_3.loc[ (i, j, k), cols[:-1] ] ).isf(
        1 / df_4.loc[ (k) ].index.get_level_values("TIEMPO_RETORNO") )
    
# Error.
error = ( ( df_4["INTENSIDAD"] - df.loc[ (i, j), "INTENSIDAD" ] ) ** 2 ).sum()
print(f"Error: {error:.6f}")

Error: 638.001038
