In [None]:
import pandas as pd
import numpy as np
import sqlalchemy as sq
import scipy as sp

In [None]:
class hydropost:
    """
    Класс hydropost создан для начальной выгрузки, агрегации, предобработки данных с -гидро, -метео постов.
    Класс является родительским для класса postmodel, и служит для полуавтоматической предобработки данных.
    Важнейший атрибут класса- .hydroframe датафрейм, хранящий все данные, используемые для обучения моделей.
    """
    def __init__(self,
                 loc_id='3010',              #id гидропоста, 4 цифры без первого нуля
                 high_of_measurement=281):   #Высота над нулем поста. Необязательный параметр,
                                             #используется для перехода от высоты уровня воды над нулем поста к абсолютной высоте над уровнем моря. Брал с allrivers

        self.loc_id = loc_id

        connectionstring ='dialect+driver://username:password@host:port/database' # Здесь необходимо вставить connectionstring пользователя по образцу
        engine = sq.create_engine(connectionstring)
        self.connection = engine.connect()
        station_info_sql = 'SELECT station_id, station_name, ST_X(geom) AS lat, ST_Y(geom) AS lon, geom FROM atlas.hydro_stations_view WHERE station_id='+str(loc_id)
        station_data_sql='''SELECT ts AS date,
            min_level,
            max_level,
            avg_level,
            water_temperature,
            discharge, snow_depth,
            ice_thickness,
            precipitation_amount
            FROM rosgidromet.hydro_arch
            WHERE identifier='''+str(loc_id)

        self.station_info = pd.read_sql(station_info_sql,self.connection)
        station_data=pd.read_sql(station_data_sql,self.connection)
        self.coordinats = {'lat': float(self.station_info['lat']),  #Храним координаты гидропоста для выбора ближайших метеопостов
                           'lon': float(self.station_info['lon']),
                           'geom':self.station_info['geom'].values[0] }
        #Формируем рабочий датафрейм
        self.hydroframe=station_data
        self.hydroframe=self.hydroframe[~self.hydroframe['date'].duplicated(keep='last')]  #Избавляемся от дубликатов по дате.
        self.hydroframe = self.hydroframe.set_index('date')
        self.hydroframe.index=pd.to_datetime(self.hydroframe.index.date)
        self.hydroframe.sort_index(inplace=True)
        self.hydroframe=self.hydroframe.asfreq(freq='d', fill_value=None)
        self.hydroframe['max_level'] += high_of_measurement         #Для удобства добавляем высоту над нулем поста к данным,
        self.hydroframe['min_level'] += high_of_measurement         #тогда, текущая единица измерения- сантиметры над уровнем моря.
        self.hydroframe['avg_level'] += high_of_measurement
        self.hydroframe['minmaxdelta']=self.hydroframe['max_level']-self.hydroframe['min_level']        #Создаем переменную абсолютного изменения за сутки.
        self.hydroframe.drop(columns=['min_level','avg_level'], inplace=True)                           #и удаляем неинформативные признаки
        self.hydroframe["day_cos"] = [np.cos(x * (2 * np.pi / 365)) for x in self.hydroframe.index.dayofyear]  #Добавляем время, разложенное в тригонометрическую функцию .
        self.hydroframe["day_sin"] = [np.sin(x * (2 * np.pi / 365)) for x in self.hydroframe.index.dayofyear]
        self.hydroframe.dropna(how='all', inplace=True, axis=1)                                                #Удаляем пустые столбцы.


    def add_snowdata(self,
                     n_nearest=5,   # Количество ближайших к гидропосту метеопостов, данные с которых будем выбирать
                     addlags=None,  # None или максимальный временной лаг выбираемых признаков.
                     add_count=5):  # Количество признаков, добавляемых к датафрейму в результате выполнения.
        """
        Метод добавляет к hydroframe количество признаков add_count из данных маршрутных снегосъемок n_nearest ближайших постов.
        С одного метеопоста берется не более одного признака. В ходе выполнения данные выгружаются из озера, лаггируются на интервале
        addlags, ранжируются в соответствии с модулем корреляции по убыванию, добавляются самые скоррелированные с max_level
        """

        nearest_stations_sql='SELECT DISTINCT atlas.aisori_snow.station_id,'\
            +'rosgidromet.stations.location,' +'rosgidromet.stations.location <-> '+"'"+ self.coordinats['geom']+"' as dist " \
            +'''FROM atlas.aisori_snow, rosgidromet.stations
            WHERE atlas.aisori_snow.station_id::int = rosgidromet.stations.identifier::int
            ORDER BY dist''' +' '+ 'limit ' +str(n_nearest)

        self.nearest_snowstations = pd.read_sql(nearest_stations_sql,self.connection) #Выгружаем список ближайших к гидропосту метеопосты

        most_corr_per_post={}
        for post in self.nearest_snowstations['station_id']:

            #С метеопоста выгружаем типы признаков, показавших наибольшую корреляцию c max_level в ходе предварительных экспериментов
            snowstation_data=pd.read_sql('''SELECT
                                dt as date,
                                route_type,
                                station_snow_coverage,
                                route_snow_coverage,
                                snow_cover_average_height,
                                snow_cover_max_height,
                                snow_density,
                                nature_of_snow_cover_occurrence,
                                snow_cover_nature
                                FROM atlas.aisori_snow
                                WHERE station_id =''' \
                                +str(post) +"and dt>=TO_DATE('" +str(self.hydroframe.index[0])+"','YYYY-MM-DD')" +" and dt<=TO_DATE('"+ str(self.hydroframe.index[-1])+"','YYYY-MM-DD')"+ 'ORDER BY date'   ,connection)

            workframe_all_routes=pd.DataFrame(self.hydroframe['max_level'])

            for route in range(1,4):     
                                                                                           #На метеопосте осуществляется проходка по одному или нескольким из трех возможных маршрутов
                workframe=snowstation_data.loc[snowstation_data['route_type']==route]\
                .set_index('date').drop(columns=['route_type'])\
                    .add_prefix(str(post)+'_r_'+str(route)+'_')
                workframe.index=pd.to_datetime(workframe.index)
                workframe.index=workframe.index.date
                workframe.sort_index(inplace=True)
                workframe_all_routes=pd.concat([workframe_all_routes,workframe],axis=1)

            
            workframe_all_routes.dropna(how='all', axis='columns', inplace=True)                                                      #Получили workframe_all_routes датафрейм, содержащий признаки на каждом из доступных маршрутов
            workframe_all_routes=workframe_all_routes.interpolate(method='linear', limit_area='inside', limit=30)                     #Интерполируем для заполнения пробелов между измерениями
            workframe_all_routes=workframe_all_routes.loc[:,(workframe_all_routes.isna().sum()/workframe_all_routes.shape[0] < 0.5)]  #Удаляем признаки с пропускми больше 50%. Для более теплых районов следует подумать над коэффициентом
            workframe_all_routes=workframe_all_routes.fillna(0)

            if addlags!=None:   #Лаггируем признаки для добавления в фрейм
                lagged_corr=pd.DataFrame()
                for lag in range(1, addlags+1):
                    lagged_corr=pd.concat([lagged_corr,
                                           workframe_all_routes.drop(columns=['max_level']).shift(lag).iloc[(lag+1):].add_suffix(('_lag_'+str(lag))) ], axis=1 )

                workframe_all_routes=pd.concat([workframe_all_routes, lagged_corr], axis=1)
                                                                                                                                      #Ранжируем признаки в соответстваии с коэффицентом корреляции, берем add_count признаков
                corr_rank=pd.DataFrame(workframe_all_routes.drop(columns=['max_level'])\
                                       .corrwith(workframe_all_routes['max_level'])\
                                       .abs().sort_values(ascending=False).head(add_count))

            try:    #добавляем в словарь ключ- модуль коэффициета корреляции, значение- самый скоррелированный признак
                most_corr_per_post[float(corr_rank.iloc[0])]= workframe_all_routes[corr_rank.index[0]]
            except IndexError:
                continue
        #Из отсортированного по ключу (модуль коэффициентакорреляции) most_corr_per_post словаря добавляем add_count признаков к hydroframe
        for key in sorted(most_corr_per_post.keys(), reverse=True)[:add_count]:
            most_corr_per_post[key].index=pd.to_datetime(most_corr_per_post[key].index)
            self.hydroframe=self.hydroframe.merge(most_corr_per_post[key].fillna(0), right_index=True, left_index=True)




    def add_weather_data(self, n_nearest=5): # n_nearest- из скольки ближайших постов формировать признаки
        """
        Метод добавляет к hydroframe климатические признаки. Значения максимальной, минимальной температуры воздуха, скорости ветра, влажности
        усредняются, значения дневных осадков усредняются с весовыми коэффициентами до достижения максимальной корреляции с max_level
        """

        nearest_stations_sql='''select distinct on (dist)
                                stationid, wkb_geometry, namelong,
                                wkb_geometry <->''' +"'"+ self.coordinats['geom']+"' as dist "+\
                                '''from (select * from (select
                                distinct identifier from rosgidromet.meteo_daily) as tab
                                inner join rosgidromet.asunp_station on rosgidromet.asunp_station.stationid= tab.identifier
                                order by wkb_geometry <->'''+\
                                "'"+ self.coordinats['geom']+"'"+\
                                'limit 1000) as tab2 where meteo::float != 0 order by dist limit '+ str(n_nearest)

        self.nearest_meteostations = pd.read_sql(nearest_stations_sql, self.connection)
        meteostation_data={}
        for station in self.nearest_meteostations['stationid']:

            meteostation_data[station]=pd.read_sql('''select dt as date,
                                        maxairtemperature,
                                        minairtemperature,
                                        soiltemperature,
                                        totalaccumulatedprecipitation,
                                        windspeed,
                                        maxrelativehumidity
                            from rosgidromet.meteo_daily
                            where identifier::float =''' +str(station)\
                            +" and dt>=TO_DATE('" + str(self.hydroframe.index[0])\
                            + " ','YYYY-MM-DD') and dt<=TO_DATE('" +str(self.hydroframe.index[-1]) + " ' ,'YYYY-MM-DD') ORDER BY date" , connection)\
                            .set_index('date')
            meteostation_data[station]=meteostation_data[station][~meteostation_data[station].index.duplicated(keep='first')]

        for feature in ['maxairtemperature',
                        'minairtemperature',    #   'soiltemperature', - пропущенны данные по всем постам в 20,19, 21 годах
                        'windspeed',
                        'maxrelativehumidity']:
            for station in meteostation_data:
                frame=pd.concat([meteostation_data[station][feature] for station in meteostation_data], axis=1)
                frame=frame.loc[:,(frame.isna().sum()/frame.shape[0] < 0.5)]
                frame=frame.mean(axis=1) 
                frame.name=feature+'_mean'
            self.hydroframe=self.hydroframe.merge(frame, right_index=True, left_index=True,how='right')

        self.weights={}
        for feature in ['totalaccumulatedprecipitation']:
            for station in meteostation_data:
                precipitation_frame=pd.concat([meteostation_data[station][feature].rename(station+'_'+feature, inplace=True) for station in meteostation_data], axis=1)
                precipitation_frame.dropna(how='all', axis='columns', inplace=True)
                precipitation_frame.fillna(0, inplace=True)

            bnds=[(0.0, 1.0)]*precipitation_frame.shape[1]
            constraints=[{'type':'eq', 'fun':lambda x: 1-sum(x)}]

            def function(weights=[],ret_frame=False):
                frame=precipitation_frame.copy()
                for num, col in enumerate(frame):
                    frame[col]=frame[col]*weights[num]
                frame=frame.sum(axis=1)
                if ret_frame==False:
                    return -abs(frame.corr(self.hydroframe['max_level']))
                elif ret_frame==True:
                    return frame

            sol=sp.optimize.basinhopping(function,
                                            [1/precipitation_frame.shape[1]]*precipitation_frame.shape[1],
                                            niter=5,
                                            stepsize=0.5,
                                            T=0.01,
                                            interval=5,
                                            minimizer_kwargs={'bounds':bnds, 'constraints':constraints})

            self.weights[feature]=dict(zip(precipitation_frame.columns, sol.x))
            ret=function(sol.x, ret_frame=True).rename('weight_mean_'+feature)
            ret.index=pd.to_datetime(ret.index)
            self.hydroframe=self.hydroframe.merge(ret, right_index=True, left_index=True, how='right')



    def diff_timeseries(self,
                        features=['max_level',   # Список признаков, подлежащих дифференцированию
                                  'water_temperature',
                                  'air_temperature'],
                        add=False,               # Добавить дифференцированный признак в датафрейм, или заменить исходный
                        ret=False ):             # Вернуть признак вместо добавления или замены в hydroframe
        """Метод позволяет дифференцировать временные ряды hydroframe, хранить первое значение, восстанавливать 
        ранее дифференцированные временные ряды.
        """
        if hasattr(self, 'firstval')==False:
            self.firstval={}
        if ret==False:
            for feature in features:
                self.firstval[feature]=self.hydroframe[feature].iloc[0]
                if add==False:
                    self.hydroframe[feature]=self.hydroframe[feature].diff()
                elif add==True:
                    self.hydroframe=self.hydroframe.merge(self.hydroframe[feature].diff().rename('diff_'+feature), left_index=True, right_index=True)
        elif ret==True:
            for feature in features:
                self.hydroframe[feature]= np.r_[self.firstval[feature], self.hydroframe[feature].iloc[1:]].cumsum().astype('float64')



    def interp_gaps(self, 
                    fillwith=None,    #Чем запронять значения, не попавшие в интервал интерполяции
                    features=[],      #Признаки, подлежащие интерполяции 
                    between=10,       #Максимальное число пропусков между двумя значениями, подлежащие интерполяции
                    method='linear'): #Метод интерполяции
        """
        Интерполирование пропусков временных рядов features из hydroframe с условием заполнения не более чем between пропусков между двумя значениями,
        и заполнением не интерполированных пропусков константами 
        """
        for feature in features:
            self.hydroframe[feature]=self.hydroframe[feature].interpolate(method=method, limit_area='inside', limit=between)
            if fillwith !=None:
                self.hydroframe[feature]=self.hydroframe[feature].fillna(fillwith)



    def log_timeseries(self, 
                       features=['max_level',
                                 'min_level',
                                 'avg_level',
                                 'discharge'], 
                       inv=False):
        """
        Логарифмирование временных рядов features из hydroframe с возможностью восстановления (экспоненциирования).
        """
        for feature in features:
            if inv==False:
                self.hydroframe[feature]=np.log(self.hydroframe[feature])
            else:
                self.hydroframe[feature]=np.exp(self.hydroframe[feature])



    def categorical_dummy(self,features=['water_code_status']):
        """ 
        Даммификация кодов состояний водных объектов
        """
        for feature in features:
            self.hydroframe=self.hydroframe.merge(self.hydroframe[features].squeeze().str.get_dummies(sep=',').add_prefix('ksvo_'), left_index=True, right_index=True)
            del(self.hydroframe[feature])


            
    def mean_timeseries(self,
                        inverse=None,                  # Series, для которого нужно вернуть обратно преобразованный Series
                        target='max_level',            # Имя переменной для обратного преобразования
                        features=['avg_level',         # Признаки из hydroframe для прямого преобразования
                                  'min_level',
                                  'max_level',
                                  'water_temperature',
                                  'ice_thickness',
                                  'snow_depth',
                                  'air_temperature', 
                                  'precipitation_amount',
                                  'discharge'],
                        scaling=True,                   # Масштабирование признака на единичном интерваое
                        ret=False):                     # Возвращать прямо преобразрванные признаки вместо изменения в фрейме
        """
        Z-преобразование временных рядов из features с возможностью обратного преобразования.
        Если scaling = True, данные просто нормализуются на интервале [0:1].
        """
        if inverse is None:
            self.frame_mean = self.hydroframe[features].mean()
            self.frame_std= self.hydroframe[features].std()
            norm_df = (self.hydroframe[features] - self.frame_mean) / self.frame_std
        if (scaling==True) and (inverse is None):
            self.max_val= norm_df.max()
            self.min_val=norm_df.min()
            norm_df=(norm_df-self.min_val)/(self.max_val-self.min_val)
        if (ret==False) and (inverse is None):
            self.hydroframe[features]=norm_df
        elif ret==True:
            return norm_df

        if inverse is not None:
            return (inverse*(self.max_val[target]-self.min_val[target])+self.min_val[target])*self.frame_std[target]+ self.frame_mean[target]



    def cut_level_nas(self, will_nas=8):
        """
        Вырезать участки фрейма, для которых max_level is NaN,  пометить семь следующих дней и will_nas предыдущих категориальными признаками
        """
        self.hydroframe['was_na']=(self.hydroframe['max_level'].isna().replace({False: 0, True:1}).shift(7)).fillna(0)
        self.hydroframe['will_na']=(self.hydroframe['max_level'].isna().replace({False: 0, True:1}).shift(-will_nas)).fillna(0)
        self.hydroframe=self.hydroframe.loc[~self.hydroframe['max_level'].isna()]


    def something_like_mrw(self, 
                           features=[],   # Признаки для которых применить преобразование.
                           window=30,     # Размер окна.
                           inv=None,      # Series, обратно преобразованный который нужно вернуть.
                           target=None):  # Имя обратно преобразовываемого Series.
        """
        Метод делает что-то вроде Moving Average Smoothing для случая, когда сезонные пики не предопределены по дням в году.
        Подробнее описано в отчете о НИР. 
        """
        if inv is None:
            if hasattr(self, 'rolling_mean_by_day')==False:
                self.rolling_mean_by_day={}
            for feature in features:
                self.rolling_mean_by_day[feature]=self.hydroframe[feature].rolling(window, center=True).mean().groupby(self.hydroframe.index.dayofyear).median()
                self.hydroframe[feature]-=pd.Series([i for i in self.hydroframe.index.dayofyear], index=self.hydroframe.index ).apply(lambda x: self.rolling_mean_by_day[feature][x])

        elif inv is not None:
            return inv+pd.Series([i for i in inv.index.dayofyear], index=inv.index ).apply(lambda x: self.rolling_mean_by_day[target][x])
