### Обучение пайплайна

1. Загрузим данные https://www.kaggle.com/jsphyg/weather-dataset-rattle-package
2. Соберем пайплайн с простейшим препроцессингом (tfidf) на текстовых данных
3. Обучим логистическую регрессию и сохраним на диск предобученный пайплайн

In [1]:
import pandas as pd
import numpy as np
import dill
from datetime import datetime
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import precision_recall_curve, roc_auc_score, confusion_matrix
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
import xgboost as xgb
from tqdm.notebook import tqdm


In [2]:
# Загрузим данные
df = pd.read_csv('weatherAUS.csv')
df.tail(3)

Unnamed: 0,Date,Location,MinTemp,MaxTemp,Rainfall,Evaporation,Sunshine,WindGustDir,WindGustSpeed,WindDir9am,...,Humidity9am,Humidity3pm,Pressure9am,Pressure3pm,Cloud9am,Cloud3pm,Temp9am,Temp3pm,RainToday,RainTomorrow
145457,2017-06-23,Uluru,5.4,26.9,0.0,,,N,37.0,SE,...,53.0,24.0,1021.0,1016.8,,,12.5,26.1,No,No
145458,2017-06-24,Uluru,7.8,27.0,0.0,,,SE,28.0,SSE,...,51.0,24.0,1019.4,1016.5,3.0,2.0,15.1,26.0,No,No
145459,2017-06-25,Uluru,14.9,,0.0,,,,,ESE,...,62.0,36.0,1020.2,1017.9,8.0,8.0,15.0,20.9,No,


In [3]:
# Разделим данные на train/test с предсказанной целевой переменной
# и без нее и сохраним тестовую выборку на диск
df_train = df[~df['RainTomorrow'].isna()]
df_test = df[df['RainTomorrow'].isna()]

#save df_test
df_test.to_csv("df_test.csv", index=None)

# Разделим train_данные на train/test и сохраним тестовую выборку на диск
X_train, X_test, y_train, y_test = train_test_split(
    df_train.drop('RainTomorrow', axis=1), 
    df_train['RainTomorrow'], 
    test_size=0.25,
    stratify = df_train['RainTomorrow'],
    random_state=0
)

#save test
X_test.to_csv("X_test.csv", index=None)
y_test.to_csv("y_test.csv", index=None)

#save train
X_train.to_csv("X_train.csv", index=None)
y_train.to_csv("y_train.csv", index=None)

In [4]:
# С помощью google.maps выпишим геоданные метеостанций
stations = {
    'Albury': [-36.065983, 146.933757],
    'BadgerysCreek': [-33.874789, 150.742198],
    'Cobar': [-31.498089, 145.840004],
    'CoffsHarbour': [-30.302664, 153.121074],
    'Moree': [-29.466090, 149.841702],
    'Newcastle': [-32.928553, 151.780644],
    'NorahHead': [-33.282690, 151.565798],
    'NorfolkIsland': [-29.045021, 167.946025],
    'Penrith': [-23.108893, 143.611796],
    'Richmond': [-20.732187, 143.143603],
    'Sydney': [-33.865255, 151.216484],
    'SydneyAirport': [-33.939820, 151.178658],
    'WaggaWagga': [-35.130577, 147.369054],
    'Williamtown': [-32.795088, 151.853003],
    'Wollongong': [-34.390606, 150.879616],
    'Canberra': [-35.306904, 149.125529],
    'Tuggeranong': [-35.407187, 149.102398],
    'MountGinini': [-35.533333, 148.783333],
    'Ballarat': [-37.558485, 143.855087],
    'Bendigo': [-36.753422, 144.279909],
    'Sale': [-38.103693, 147.068100],
    'MelbourneAirport': [-37.673333, 144.843333],
    'Melbourne': [-37.813747, 144.963005],
    'Nhil': [-36.332988, 141.650199],    
    'Mildura': [-37.558485, 143.855087],
    'Nhil': [-36.332988, 141.650199],
    'Portland': [-38.358293, 141.609900],
    'Watsonia': [-37.711887, 145.083003],
    'Dartmoor': [-37.927392, 141.276500],
    'Brisbane': [-27.468545, 153.024029],
    'Cairns': [-16.922018, 145.776439],
    'GoldCoast': [-28.001431, 153.363484],
    'Townsville': [-19.252226, 146.813347],
    'Adelaide': [-34.951652, 138.593834],
    'MountGambier': [-37.752367, 140.786152],
    'Nuriootpa': [-34.487174, 138.975699],
    'Woomera': [-37.398593, 140.335999],
    'Albany': [-35.009677, 117.884181],
    'Witchcliffe': [-34.025494, 115.100904],
    'PearceRAAF': [-31.667778, 116.015],
    'PerthAirport': [-31.940326, 115.966769],    
    'Perth': [-32.088410, 115.895122],
    'SalmonGums': [-32.98, 121.645],
    'Walpole': [-34.975391, 116.734796],
    'Hobart': [-42.859536, 147.309693],
    'Launceston': [-41.446690, 147.139202],    
    'AliceSprings': [-23.723467, 133.883826],
    'Darwin': [-12.465673, 130.842738],
    'Katherine': [-14.465293, 132.263199],
    'Uluru': [-25.345400, 131.033856]
}

In [5]:
def data_preprocessing(df, stations=stations):
    
    def search_nearest_station(first_station, max_distance):
        """
        Осуществляет поиск ближайших стаций в заданном радиусе.
        На выходе отсорированный по удаленности список станций.
        """
        new_stations = stations.copy() # создадим корию словоря со станциями
        distance_dict = {} # Создадим пустой словарь для расстояний между станциями
        x1, y1 = stations[first_station] # координаты первой станции
        new_stations.pop(first_station) # удалим ее из копии словоря со станциями
        for station in new_stations: # для каждой стации 
            x2, y2 = new_stations[station] # определим ее координаты
            # и измерием расстояние до первой станции 
            distance = np.math.sqrt((x1 - x2) ** 2 + (y1 - y2) ** 2)
            # добавим станцию и расстояние в словарь 
            distance_dict[station] = round(distance, 3)
        # сформируем отсорированный по расстояниям список станций находящихся в 
        # пределах заданного радиуса
        new_stations = np.array([key for key, value in sorted(distance_dict.items(), key=lambda item: item[1])
                                 if value < max_distance])
        return new_stations
    
    def my_fillna(i, j):
        """
        Ищет значение на ближейшей станции
        """
        value = df.iloc[i, j] # координаты значения
        date, location = df.iloc[i, :2] # даты и место измерения
        stations_list = stations_distance_list[int(location), 1] # список ближайших станций
        while stations_list.shape[0]: # если станции есть
            # получаем значение искомого параметра с соседней станции в этот день
            new_value = df.loc[(df['Date'] == date) & (df['Location'] == stations_list[0]), df.columns[j]]
            if len(new_value) and not np.isnan(new_value.iloc[0]): # если найдено значение
                value = new_value.iloc[0] # меняем его на новое
                break # выходим из цикла
            # если значение не найдено, проверяем следующую станцию
            else: stations_list = stations_list[1:]  
        return value
    
    # Заменим значения RainToday на 0 и 1:
    binary_to_numbers = {'No': 0, 'Yes': 1}
    df['RainToday'] = df['RainToday'].replace(binary_to_numbers)
    
    # Преобразуем значение даты в количество дней с 1970-01-01
    df['Date'] = (pd.to_datetime(df['Date'])- pd.Timestamp('1970-01-01')) // pd.Timedelta('1days')
    
    # Создадим датафрем с наименованиями станций и их координатами
    sdf = pd.DataFrame({'Location': stations.keys(), 'coordinate': stations.values()})
    sdf['longitude'] = sdf['coordinate'].apply(lambda x: x[0])
    sdf['latitude'] = sdf['coordinate'].apply(lambda x: x[1])
    sdf.drop('coordinate', axis=1, inplace=True)
    
    # Добавим координаты станций в исследуемый data frame
    df = df.merge(sdf, on=['Location'], how='left')

    # Присвоим станциям номера 
    locations = {name: i for i, name in enumerate(stations)}

    # Заменим их наименования на номера
    df['Location'] = df['Location'].replace(locations)

    # Сформируем корректный data frame
    columns = list(df.columns[:2]) + list(df.columns[-2:]) + list(df.columns[2:-2])
    df = df[columns]
    df.head()
    
    # Оставшиеся категориальные переменные представляют собой направление верта
    # Заменим их на пoрядковые номера значений их азимутов
    categorical_columns = df.select_dtypes(include='object').columns.tolist()
    cardinal_points = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW']
    cardinal_points_dict = {i: cardinal_points.index(i) for i in cardinal_points}
    for col in categorical_columns: df[col] = df[col].replace(cardinal_points_dict)
        
    # Сформируем списоки соседних стаций для каждой стации в радиусе 2 градуса
    max_distance = 2
    stations_distance_list = np.array([[location, search_nearest_station(location, max_distance)] for location in locations])
    
    # Заполним пропуски значениями параметров на ближайших стациях
    for i in tqdm(np.arange(df.shape[0])): # по строкам
        for j in np.arange(df.shape[1]): # по столбцам
            if np.isnan(df.iloc[i, j]): # если не np.NaN 
                df.iloc[i, j] = my_fillna(i, j) # заменяем на значение с ближайшей станции
                                                # если конечно стация есть
                                                # и если есть значение
                        
    # Оставшиеся пропуски заменим на медианные значения в месте измерения
    for col in df.columns:
        wdf = pd.DataFrame(df.groupby(['Location'])[col].median().rename(f'{col}ByLocation')).reset_index()
        df = df.merge(wdf, on=['Location'], how='left')
        df[col].fillna(df[f'{col}ByLocation'], inplace=True)
        df.drop(f'{col}ByLocation', axis=1, inplace=True)
    
    # Если пропуски остались
    df.fillna(df.median(), inplace=True)

    # Для построения универсального пайплайна обработки вернем обратно типы данных
    
    # Date
    df['Date'] = df['Date'].apply(lambda x: datetime.fromordinal(x+719163).strftime('%Y-%m-%d'))
    # Location
    df['Location'] = df['Location'].replace({i: name for i, name in enumerate(stations)})
    # WindGustDir, WindDir9am, WindDir3pm
    for column in ['WindGustDir', 'WindDir9am', 'WindDir3pm']:
        df[column] = df[column].replace({i: name for i, name in enumerate(cardinal_points)})
    # RainToday
    df['RainToday'] = df['RainToday'].replace({0: 'No', 1: 'Yes'})
    
    # Удалим вспомогательные столбцы longitude, latitude
    df.drop(['longitude', 'latitude'], axis=1, inplace=True)
    
    return df

In [6]:
def target_preprocessing(y_train):
    # заменим значения целевой переменной на 0 и 1:
    return y_train.replace({'No': 0, 'Yes': 1})   

In [7]:
# Соберем pipeline
class DateSelector(BaseEstimator, TransformerMixin):
    def __init__(self, key):
        self.key = key

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X[self.key]  = (pd.to_datetime(X[self.key])- pd.Timestamp('1970-01-01')) // pd.Timedelta('1days')
        return X[[self.key]]

class LocationSelector(BaseEstimator, TransformerMixin):
    
    def __init__(self, key):
        self.key = key
        self.location = stations.copy()
        
    def fit(self, X, y=None):
        return self

    def transform(self, X, y=None):
        sdf = pd.DataFrame({'Location': self.location.keys(), 'Coordinate': self.location.values()})
        sdf['Longitude'] = sdf['Coordinate'].apply(lambda x: x[0])
        sdf['Latitude'] = sdf['Coordinate'].apply(lambda x: x[1])
        sdf.drop('Coordinate', axis=1, inplace=True)
        X = X.merge(sdf, on=[self.key], how='left')
        
        return X[['Longitude', 'Latitude']]
    
class NumberSelector(BaseEstimator, TransformerMixin):
    def __init__(self, key):
        self.key = key

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        return X[[self.key]]

class WindDirSelector(BaseEstimator, TransformerMixin):
    def __init__(self, key):
        self.key = key
        self.cardinal_points = ['N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 
                                'S', 'SSW', 'SW', 'WSW', 'W', 'WNW', 'NW', 'NNW']
        
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X[self.key] = X[self.key].replace({name: i for i, name in enumerate(self.cardinal_points)})
        return X[[self.key]]
    
class BinarySelector(BaseEstimator, TransformerMixin):
    def __init__(self, key):
        self.key = key

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X[self.key] = X[self.key].replace({'No': 0, 'Yes': 1}) 
        return X[[self.key]]

In [8]:
if __name__ == '__main__':
    
    # Предобработка данных
    X_train = data_preprocessing(X_train)
    y_train = target_preprocessing(y_train)

    # Зададим списки признаков
    date_columns = ['Date']
    location_columns = ['Location']
    continuous_columns = ['MinTemp', 'MaxTemp', 'Rainfall', 'Evaporation', 'Sunshine','WindGustSpeed', 
                          'WindSpeed9am', 'WindSpeed3pm', 'Humidity9am', 'Humidity3pm', 'Pressure9am', 
                          'Pressure3pm', 'Cloud9am', 'Cloud3pm', 'Temp9am', 'Temp3pm']
    wind_dir_columns = ['WindGustDir', 'WindDir9am', 'WindDir3pm']
    binary_columns = ['RainToday']

    # Под каждый признак создадим трансформер и объединить их в список
    final_transformers = list()
    
    for date_col in date_columns:
        date_transformer = Pipeline([
                ('selector', DateSelector(key=date_col))
                ])    
        final_transformers.append((date_col, date_transformer))
    
    for location_col in location_columns:
        location_transformer = Pipeline([
                ('selector', LocationSelector(key=location_col))
                ])
        final_transformers.append((location_col, location_transformer))
    
    for cont_col in continuous_columns:
        cont_transformer = Pipeline([
                ('selector', NumberSelector(key=cont_col))
                ])
        final_transformers.append((cont_col, cont_transformer))
    
    for wind_dir_col in wind_dir_columns:
        wind_dir_transformer = Pipeline([
                ('selector', WindDirSelector(key=wind_dir_col))
                ])
        final_transformers.append((wind_dir_col, wind_dir_transformer))
    
    for binary_col in binary_columns:
        binary_transformer = Pipeline([
                ('selector', BinarySelector(key=binary_col))
                ])
        final_transformers.append((binary_col, binary_transformer))

    # Объединим все это в единый пайплайн
    feats = FeatureUnion(final_transformers)
    feature_processing = Pipeline([('feats', feats)])

    # Добавим модель
    pipeline = Pipeline([
        ('features', feats),
        ('classifier', GradientBoostingClassifier(
            n_estimators=700,
            min_samples_leaf=7,
            max_depth=10
        ))
    ])


    # Обучим пайплайны
    pipeline.fit(X_train, y_train)

    # Сохраним модель
    with open("gbc_pipeline.dill", "wb") as f:
        dill.dump(pipeline, f)

  stations_distance_list = np.array([[location, search_nearest_station(location, max_distance)] for location in locations])


  0%|          | 0/106644 [00:00<?, ?it/s]