In [None]:
import os              #Operating System
import math            #funzioni matematiche
import numpy as np
import pandas as pd
import datetime

#from datetime import datetime #
%matplotlib inline 
import matplotlib.pyplot as plt #Plot

# Strumenti di Statistica (ECDF, GUMBEL, MinimiQuadrati)
from statsmodels.distributions.empirical_distribution import ECDF
from scipy.stats import genextreme,gumbel_r, norm
from scipy.optimize import least_squares
from numpy import linspace


In [None]:
if os.getcwd() != '/home/fanda/Dropbox/doc/trento/Tesi/data':
    os.chdir("./data")
os.listdir()

In [None]:
file = ! zenity --file-selection  --separator=, --file-filter=*.xls
print(str(file[0]))
dati = pd.read_excel(str(file[0]) ) # si può usare anche read_csv

dati.columns = ['year','15m', 'date15h',
                       '30m', 'date30m',
                       '45m', 'date45m',
                       '1h',  'date1h',
                       '3h',  'date3h',
                       '6h',  'date6h',
                       '12h', 'date12h',
                       '24h', 'date24h',
                       '1g',  'date1g',
                       '2g',  'date2g',
                       '3g',  'date3g',
                       '4g',  'date4g',
                       '5g',  'date5g'     ] # Rinomina gli indici delle colonne(va bene per il sito meteotrentino.it)
dati = dati[ dati.year.notnull() ]            # elimino le righe senza informazioni
dati = dati[ dati.year != "anno" ]            # elimino le righe senza informazioni
#dati.year = [ int(i)  for i in dati.year ]    # trasformo da float a integer gli anni
dati = dati.set_index('year')                 # imposto l'anno come indice delle righe



### tempi di interesse
seleziono quali tempi di pioggia sono interesse dell'analisi

In [None]:
TR = [10,20,100] # Tempi di ritorno

times_rain = ['15m', '30m', '45m', '1h', '3h', '6h', '12h', '24h', '1g', '2g', '3g', '4g', '5g', ]

dati = dati.apply(pd.to_numeric, errors='coerce') # trasforma tutti i numeri in float

times_toremove = [] # rimuovo i tempi nulli
for t in times_rain:
    #print(t)
    if dati[t].isnull().all():
        #print('Tutti i valori di ' + t + ' sono nulli')
        times_toremove.append(t)
for t in times_toremove:
    times_rain.remove(t)
    
dati = dati[times_rain]
dati

## Metodi di Fitting

### Media e deviazione standard delle misurazioni

In [None]:
fit = pd.DataFrame(dati.mean(), index = times_rain, columns=["mean"])
fit['std'] = dati.std()
fit

### Metodo dei Momenti

Il metodo dei momenti si riduce alla risoluzione di:
\begin{equation}
\left\{
\begin{array}{l}
\mu_H = b \gamma + a \\
\sigma^2_H = b^2  \frac{\pi^2}{6}
\end{array}
\right.
\end{equation}
dove $a$ e $b$ sono i parametri da stimare $\mu_H$ è la media del campioni di dati e $\sigma_H$ è la deviazione standard dei medesimi dati. 
Dalla seconda equazione si ricava:
\begin{equation}
b = \frac{\sqrt{6}}{\pi} \sigma_H
\end{equation}
che, sostituito nella prima equazione, dà:
\begin{equation}
a = \mu_H -\frac{\sqrt{6}\ \gamma}{\pi} \sigma_H 
\end{equation}
dove $\gamma$ è la costante di Eulero-Mascheroni pari a 0.577

In [None]:
def MetodoMomenti(data):
    mean = data.mean()
    sd   = data.std()
    Mascheroni = 0.577215664901532860606512090 # numero di Mascheroni
    return [mean - math.sqrt(6) / math.pi * Mascheroni * sd,  
            math.sqrt(6) / math.pi * sd]

### Metodo della Massima Verosimiglianza
Ricordiamo l'espressione della famiglia parametrica di curve di Gumbel:
 $$P[H<h;a,b] = exp\left(-exp\left(-\frac{h-a}{b}\right)\right) $$

Il metodo della massima verosimiglianza calcola i valori dei parametri 
per cui  la probabilità congiunta di ottenere una serie di dati  $\{h_1, \cdot \cdot, h_n \}$ è massima:
\begin{equation}
{\rm argmax}_{a,b} P[\{h_1, \cdot \cdot, h_n \};a,b] = {\rm argmax}_{a,b} \prod_i^n P[h_i;a,b]
\end{equation}

In [None]:
#In python è un metodo già implementato attraverso 
#la funzione gumbel_r.fit determina i parametri a,b
def Metodo_Massima_Verosimiglianza(data):  
    param = gumbel_r.fit(data) # distribution fitting
    return param


### Metodo dei Minimi Quadrati
Il metodo dei minimi quadrati è implementato nel software come un problema di ottimizzazione in (a,b) di un sistema non linare:
\begin{equation} 
\delta^2(a,b) = \sum_i^N (ECDF_i-P[h_i;a,b])^2 \to \rm min 
\end{equation}
dove $ECDF_i$ sono i valori sperimentali e $P[h_i;a,b]$ sono i valori attesi

In [None]:
def gumbel(x,a,b):
    return np.exp(-np.exp(-(x-a)/b)) # Valori attesi

def fun(x,t,y): # least_squares vuole x come incognite
    a =x[0]
    b = x[1]
    return gumbel(t,a,b) - y

def Metodo_Minimi_Quadrati(dat):
    t = sorted(dat) 
    y = ECDF(t)(t) # Valori Sperimentali
    res_lsq = least_squares(fun, #funzione
                            x0=[dat.mean(),10.], #punti iniziali
                            args=(t, y),
                            )
    return res_lsq.x

In [None]:
# Applico i 3 modelli di fitting ai vari dati

mm, mv, mq = [], [], []

for i in times_rain:
    dt = dati[i].dropna()
    mm.append(MetodoMomenti(dt))
    mv.append(Metodo_Massima_Verosimiglianza(dt))
    mq.append(Metodo_Minimi_Quadrati(dt))
    
mm = pd.DataFrame(mm, columns=["mm_a","mm_b"], index=times_rain)
mv = pd.DataFrame(mv, columns=["mv_a","mv_b"], index=times_rain)
mq = pd.DataFrame(mq, columns=["mq_a","mq_b"], index=times_rain)

fit = pd.concat([fit,mm,mv,mq], axis=1, sort=False)
fit

### Test di Pearson

Il test di perarson è un test non parametrico che consiste nel 
- suddividere il campo di probabilità in $k$ parti
- derivarne una suddivisione del dominio
- contare il numero di dati in ciascun intervallo 
- nel caso di $k$ parti uguali vautare la funzione:

$$ \chi^2 = \frac{k}{n} \sum_{j=1}^k (N_j - \frac{n}{k})^2 $$



* Valuazione sulla bontà del risultato

In [None]:
def x_test(dat, a, b, tr, k): 
    dt = dat.dropna()
    #print(dt)
    n = len(dt)
    #print("lh: ",lh)
    # suddivido il campo in k parti uguali
    q  = [ 1/k * (i+1) for i in range(k) ]
    #print("q: ",q)
    rv = gumbel_r( loc= a, scale= b ) 
    r  = ECDF(dt)(rv.ppf(q))          # [lista] numero di valori sperimentali in ogni quantile
    #print("r: ",r)
    oss = n * r
    #print("o0: ",o0)
    N  = oss - np.append([0],np.delete(oss,-1)) # numero di osservazioni in ogni campo
    #print("o: ",o)
    e  = [1/k * n for i in range(k)]  # caso ottimale di valori egualmente distribuiti
    #print("e: ",e)
    return ((N-e)**2/e).sum()  

#Chi TEST
chi = []
for time in times_rain:
    elem=[]
    for metodo in ['mm', 'mv', 'mq']:
        a = fit.loc[time, metodo+'_a']
        b = fit.loc[time, metodo+'_b']
        # print(a,b)
        test = x_test(dati[time], a, b, time, 5) 
        elem.append( round(test, 3) )
    chi.append(elem)
    
chi = pd.DataFrame(chi, columns=["chi_mm", "chi_mv", "chi_mq"], 
                   index=times_rain)
fit = pd.concat([fit,chi], axis=1, sort=False)
fit

In [None]:
#Determina la migliore approsimazione (quella con il chi^2 minore) e 
m_best = chi.T.idxmin()
for i in times_rain:
    m_best[i] = m_best[i].replace('chi_','')

a = []
b = []
for i in times_rain:
    best = m_best.loc[i]
    best = best.replace('chi_','')
        
    if "mm" in best :
        a.append(fit.loc[i,'mm_a'])
        b.append(fit.loc[i,'mm_b'])
    elif "mv" in best:
        a.append(fit.loc[i,'mv_a'])
        b.append(fit.loc[i,'mv_b'])
    elif "mq" in best:
        a.append(fit.loc[i,'mq_a'])
        b.append(fit.loc[i,'mq_b'])
    else: 
        a.append('ERRORE!')
        b.append('ERRORE!')
        

#print(a,b)
#print(m_best)

fit['best'] = m_best
fit['a'] = a
fit['b'] = b

fit

## Grafici delle distribuzioni
Andiamo a disegnare le curve di gumbel ottenute dai vari metodi di fitting, andando a ingrossare quella con il punteggio del  test di Pearson migliore. Disegneremo in seguito le curve di gumbel scelte e la densità di probabilità.

In [None]:
def gumbel(x,a,b):
    return np.exp(-np.exp(-(x-a)/b))
#plt.rc('xtick', labelsize=12) #Questi comandi globali
#plt.rc('ytick', labelsize=12) #Questi sono comandi globali  
#plt.rc("savefig",dpi=300)

for i in times_rain:
    
    df = pd.DataFrame()

    space=np.linspace(dati[i].min(),dati[i].max(),100)
    df['h']  = space 
    df['mm'] = gumbel(space, fit.loc[i,'mm_a'], fit.loc[i,'mm_b'])
    df['mv'] = gumbel(space, fit.loc[i,'mv_a'], fit.loc[i,'mv_b'])
    df['mq'] = gumbel(space, fit.loc[i,'mq_a'], fit.loc[i,'mq_b'])
    
    list_width=[1,1,1]
    best = fit.loc[i, 'best']
    if   'mm' in best:
        list_width[0]=2.5
    elif 'mv' in best:
        list_width[1]=2.5
    elif 'mq' in best:
        list_width[2]=2.5
      
    graf = df.plot( 
                 'h' ,    'mm' , linewidth=list_width[0], label='mm')
    graf.plot(df['h'], df['mv'], linewidth=list_width[1], label='mv')
    graf.plot(df['h'], df['mq'], linewidth=list_width[2], label='mq')
    
    # Sovrappongo i Punti Sperimentali
    x = dati[i].dropna()
    
    graf.plot(x,ECDF(x)(x),'.')
    
    #Tabella con i parametri di gumbel

    table=pd.DataFrame()
    table['a'] = fit.loc[ i, ['mm_a','mv_a','mq_a']].values.tolist()
    table['b'] = fit.loc[ i, ['mm_b','mv_b','mq_b']].values.tolist()
    table['c'] = fit.loc[ i, ['chi_mm','chi_mv','chi_mq']].values.tolist()
    table=table.round(2)
    
    graf.table(cellText=table.values.tolist(), 
               fontsize=10,                 
               #colWidths = [.15,.15,.15],
               rowLabels=['mm','mv','mq'],
               colLabels=['a','b','chi'],
               #colLoc='center', loc='bottom',
               bbox=[.6, 0.05, .35, .3],
              )

    
    graf.legend(loc='best')
    title='Curve di gubel per pioggia di '+i
    graf.set_title(title, size=18)
    graf.set_xlabel('Altezza pioggia [mm]')
    graf.set_ylabel('P[H<h]')
    
    fig = graf.get_figure()
    fig.savefig('gumbel_fit' + i + '.png', dpi = 300)


In [None]:
#Grafico delle curve ottimali per i vari tempi di pioggia

space=np.linspace(dati.min()[0],dati.max()[-1],100)

gb = pd.DataFrame()

for i in times_rain:
    gb[i] = gumbel(space, fit.loc[i,'a'], fit.loc[i,'b'])

gb.index=space
gb

plt=gb.plot()

for i in times_rain: 
    x = dati[i].dropna()
    plt.plot(x, ECDF(x)(x), '.')

## Assi
plt.set_title('Distribuzione di  probabilità di Gumbel, Cumulate', size=12)
plt.set_xlabel('Altezza pioggia [mm]')
plt.set_ylabel('P[H<h]')
fig = plt.get_figure()
fig.savefig(file[0] + '.Gumbel.png', dpi = 300)

In [None]:
def gumbel_dens(x,a,b):
    z = (x-a) / b
    return 1/b*np.exp( -z - np.exp( -z ))

gb_dens = pd.DataFrame()
space=np.linspace(dati.min()[0],dati.max()[-1],100)
for i in times_rain:
    gb_dens[i] = gumbel_dens(space, fit.loc[i,'a'], fit.loc[i,'b'])
    
gb_dens.index=space
gb_dens

plt = gb_dens.plot()

plt.set_title('Distribuzione di Gumbel, densità di probabilità', size=12)
plt.set_xlabel('Altezza pioggia [mm]')
plt.set_ylabel('P[h]') ## Chiedere al professore la corretta etichetta
#plt.get_figure().savefig('GumbelDensità.png')

## Linee Segnalatrici di Possibilità Pluviometrica
Si va ora a interpolare i punti dei quantili con un prefissato tempo di ritorno,
per le varie durate di pioggia.

In [None]:
def qtls(a,b,tr):
    qu=1-1/tr
    rv1h=gumbel_r(loc=a,scale=b)
    return rv1h.ppf(qu) #Percent point function

pts = []
for tr in TR:
    a = []
    for i in times_rain:
        a.append(qtls(fit.loc[i,'a'], fit.loc[i,'b'],tr))
    pts.append(a)

pts=pd.DataFrame(pts, index=TR, columns=times_rain).T
pts

In [None]:
#Grafico delle curve ottimali per i vari tempi di pioggia
#
#space=np.linspace(dati.min()[0],dati.max()[-1],1000)
#
#gb = pd.DataFrame()
#
#for i in times_rain:
#    gb[i] = gumbel(space, fit.loc[i,'a'], fit.loc[i,'b'])
#
#gb.index=space
#gb
#
#plt=gb.plot()
#
#for tr in TR: 
#    x = pts[tr]
#    y = [1-1/tr] *len(x)
#    plt.plot(x, y, '.', label = 'TR=' + str(tr) + ' anni')
#plt.legend()
#
## I punti devono stare sulle curve
#
### Assi
#plt.set_title('Curve probabilità di Gumbel', size=16)
#plt.set_xlabel('Altezza pioggia [mm]')
#plt.set_ylabel('P[H<h]')

Le curve di possibilità pluviometrica, sono rette nel piano bilogaritmico. Dovremo determinare i parametri a(Tr) e n
$$ h(t_p, Tr) = a(T_r) t_p^n$$
$$\log h(t_p, Tr) = \log a(T_r) + n \log t_p $$

In [None]:
def parm_prox(x,y):
    # Least squares polynomial fit
    a=np.polyfit(np.log(x), np.log(y), 1) 
    return (a[0], np.exp(a[1]))

times_rain_numeric = []

for i in times_rain: # Trasformo i minuti in ore
    if 'm' in i:
        i = i.replace('m','')
        i = float(i)
        i = i / 60
    elif 'g' in i:
        i = i.replace('g','')
        i = float(i)
        i = i * 24
    elif 'h' in i:
        i = i.replace('h','')
        i = float(i)
    times_rain_numeric.append(i)

fnl =  []
for tr in TR:
    fnl.append(parm_prox(x=times_rain_numeric,y=pts[tr]))
    
fnl = pd.DataFrame(fnl, columns=["n","a"], index=TR)
#
fnl.to_latex(file[0] +'LSPP.tex') #coefficenti a, n della curva di possibilità

Le Linee Segnalatrici di Possibilità Pluviometrica per tempi di ritorno di anni prendono le seguenti forme:

$$ h(t, tr) = a t^{n}$$


In [None]:
space = times_rain_numeric

def h(t,a,n):
    return a*t**n

ddf = pd.DataFrame()

for tr in TR:
    height = h(space, fnl.loc[tr,'a'], fnl.loc[tr,'n'])
    ddf[tr] = height
    
ddf.index = space 
ddf

plt = ddf.plot()
#plt.show()
## Assi
plt.set_title('Curve probabilità Pluviometrica', size=16)
plt.set_xlabel('Altezza pioggia [mm]')
plt.set_ylabel('Tempo di Pioggia [h]')


In [None]:
plt_log = ddf.plot()

plt_log.grid(True,)
#plt_log.set_xticks([1,5,10,50,100,500])

plt_log.plot(ddf, '.')
plt_log.set_yscale('log')
plt_log.set_xscale('log')
## Assi
plt_log.set_title('Linee di probabilità Pluviometrica', size=16)
plt_log.set_xlabel('Altezza pioggia [mm]')
plt_log.set_ylabel('Tempo di Pioggia [h]')

#Tabella con i parametri di gumbel
table=pd.DataFrame()
table['a'] = fnl.loc[ :,'a' ].values.tolist()
table['n'] = fnl.loc[ :,'n' ].values.tolist()
table=table.round(3)
   
plt_log.table(cellText=table.values.tolist(), 
           fontsize=10  ,               
          #colWidths = [.15,.15,.15],
           rowLabels=TR,
           colLabels=['a','n'],
          #colLoc='center', loc='bottom',
           bbox=[.62,.1,.28,.28],
           )
fig = plt_log.get_figure()
fig.savefig(file[0] + '.LSPP.png', dpi = 300)

In [None]:
Htot = []
Junit = []

for time in times_rain_numeric:
    Htime = []
    Jtime = []
    for tr in TR:
        a = fnl.loc[ tr,'a' ]
        n = fnl.loc[ tr,'n' ]
        Htime.append(a * time ** n)
        Jtime.append(a * time ** (n-1)) # intensità in mm/hr
    Htot.append(Htime)
    Junit.append(Jtime)

Htot = pd.DataFrame(Htot, index = times_rain, columns = TR)
Junit = pd.DataFrame(Junit, index = times_rain, columns = TR)

In [None]:
Htot.to_latex(file[0] +'Hpioggia.tex')


In [None]:
Junit.to_latex(file[0] +'Jpioggia.tex')


In [None]:
# Produco i file per le timeseries in EPA_SWMMM

os.chdir('../timeseries/')

for tr in TR:
    for t in times_rain_numeric:
        timeRain = times_rain[times_rain_numeric.index(t)]
        t = t * 60  # trasformo in minuti
        j = Junit.loc[timeRain,tr] # altezza di pioggia unitaria
        clock = 1.
        day0 = 1
        f = open('TR' + str(tr)+'_t'+timeRain+'.dat', 'w')
        
        while clock <= t:
            day  = int(clock/1440)
            hour = int((clock - day * 1440)/60)
            minute = clock - hour*60 - day * 1440
            day = day + day0
            
            print("01/%02d/2020    %02d:%02d    %5f" % (day, hour, minute, j), file = f)
            clock +=1
        f.close()
        

In [None]:
# Modifico il file .inp in modo che presenti tutti le timeseries
import swmmio, shutil
from swmmio.utils.modify_model import replace_inp_section
from swmmio import create_dataframeINP

baseline = swmmio.Model(r'../inp_file/rovereto.inp')
#TIMESERIES = create_dataframeINP(baseline.inp.path,'[TIMESERIES]')

list = []

for tr in TR:
    for t in times_rain_numeric:
        timeRain = times_rain[times_rain_numeric.index(t)]
        t = t * 60  # trasformo in minuti
        #h = Junit.loc[timeRain,tr] # altezza di pioggia unitaria
        #clock = 1.
        #day0 = 1
        name = 'TR' + str(tr) + '_t' + timeRain
        list.append(name+ '        FILE "../timeseries/' + name +'.dat"')
df = pd.DataFrame(list)
df.columns = ['[TIMESERIES]']
df

# Creo nuovo inp file con un con le timeseries modificate
newfilepath = os.path.join('../inp_file/' + baseline.inp.name + "_" + 'mod' + '.inp')
shutil.copyfile(baseline.inp.path, newfilepath)
replace_inp_section(newfilepath, '[TIMESERIES]', df)
