# Интеграция данных
***

*Корректировки под baseline*:
- Снимки *Sentinel* не используются. Только табличные данные

## Имеющиеся данные

- [спутниковые снимки Sentinel](../../notebooks/1.1-data-review-sentinel.ipynb);
- [климатические данные](../../notebooks/1.3-data-review-wrf-hrrr.ipynb);
- [данные о влажности почвы](../../notebooks/1.4-data-review-moisture.ipynb);
- [исторические данные об урожайности](../../notebooks/1.2-data-review-usda.ipynb).

Необходимо их объединить в единое целое.
***

## Ограничение накладываемые данными

- **Sentinel:** Снимки одной локации предоставлены за 1 и 15 числа каждого месяца.

- **USDA:** Данные об урожайности предоставлены для округов (counties).

Единый датасет должен быть структурирован так, чтобы соответствовать ограничениям.
***

## Структура будущего датасета

```plaintext
interim
    |
    ├── X.csv                                               <- Табличные данные. Признаки
    |
    └── y.csv                                               <- Таргетные значения
```
***

## Интеграция данных

- Возьмем данные за *1 и 15 дни каждого месяца для каждого округа* и объединим их в одну таблицу.
- Удалим метаданные из таблиц.
- Усредним данные для каждого округа (fips).
- `PRODUCTION, MEASURED IN BU` можно удалить.

In [1]:
import os
import gc
import h5py
import numpy as np
import pandas as pd
from pathlib import Path

PATH_DATA = Path("../../data/raw")
PATH_INTERIM = Path("../../data/interim")
PATH_SENTINEL = PATH_DATA / "Sentinel"
PATH_HRRR = PATH_DATA / "WRF-HRRR"
PATH_ERA5 = PATH_DATA / "ERA5-Land-Moisture"
PATH_USDA = PATH_DATA / "USDA"

In [2]:
def get_all_files(path: Path) -> list[Path]:
    """Возвращает все пути к файлам данных из подкаталогов

    Args:
        path (Path): путь к каталогу данных

    Returns:
        list[Path]: список путей к файлам
    """
    return [
        file
        for dir_ in path.iterdir()
        for subdir in dir_.iterdir()
        for file in subdir.iterdir()
    ]

In [3]:
files_sentinel = get_all_files(PATH_SENTINEL)
files_hrrr = get_all_files(PATH_HRRR)
files_era5 = get_all_files(PATH_ERA5)
files_usda = get_all_files(PATH_USDA)

In [4]:
def read_csv_files(files: list[Path]) -> pd.DataFrame:
    """Читает файлы .csv одной структуры из списка файлов files
    и возвращает их как pd.DataFrame

    Args:
        files (list[Path]): список файлов

    Returns:
        pd.DataFrame: DataFrame
    """
    return pd.concat(
        [pd.read_csv(file) for file in files], axis=0, ignore_index=True
    )

In [5]:
df_hrrr = read_csv_files(files_hrrr)
df_era5 = read_csv_files(files_era5)
df_usda = read_csv_files(files_usda)

In [6]:
print(f"df_hrrr columns:\n{df_hrrr.columns.to_list()}\n")
print(f"df_era5 columns:\n{df_era5.columns.to_list()}\n")
print(f"df_usda columns:\n{df_usda.columns.to_list()}\n")

df_hrrr columns:
['Year', 'Month', 'Day', 'Daily/Monthly', 'State', 'County', 'FIPS Code', 'Grid Index', 'Lat (llcrnr)', 'Lon (llcrnr)', 'Lat (urcrnr)', 'Lon (urcrnr)', 'Avg Temperature (K)', 'Max Temperature (K)', 'Min Temperature (K)', 'Precipitation (kg m**-2)', 'Relative Humidity (%)', 'Wind Gust (m s**-1)', 'Wind Speed (m s**-1)', 'U Component of Wind (m s**-1)', 'V Component of Wind (m s**-1)', 'Downward Shortwave Radiation Flux (W m**-2)', 'Vapor Pressure Deficit (kPa)']

df_era5 columns:
['year', 'month', 'day', 'hour', 'fips', 'state', 'latitude', 'longitude', 'src', 'swvl1', 'swvl2', 'swvl3']

df_usda columns:
['commodity_desc', 'reference_period_desc', 'year', 'state_ansi', 'state_name', 'county_ansi', 'county_name', 'asd_code', 'asd_desc', 'domain_desc', 'source_desc', 'agg_level_desc', 'PRODUCTION, MEASURED IN BU', 'YIELD, MEASURED IN BU / ACRE']



In [7]:
def get_fips(df_usda: pd.DataFrame) -> pd.Series:
    """Получает fips используя поля state_ansi и county_ansi

    Args:
        df_usda (pd.DataFrame): USDA dataset

    Returns:
        pd.Series: fips codes
    """
    states_fips = df_usda["state_ansi"].astype(str).str.zfill(2)
    counties_fips = df_usda["county_ansi"].astype(str).str.zfill(3)
    fips = states_fips + counties_fips
    return fips

In [8]:
df_usda["fips"] = get_fips(df_usda).astype(np.int64)

In [9]:
df_hrrr.rename(
    {"Year": "year", "Month": "month", "Day": "day", "FIPS Code": "fips"},
    axis=1,
    inplace=True,
)


In [10]:
df_hrrr.query("(day == 1) | (day == 15)", inplace=True)
df_era5.query("(day == 1) | (day == 15)", inplace=True)
df_usda.query(
    "(state_name == 'ILLINOIS') | (state_name == 'IOWA')", inplace=True
)

In [11]:
# see ../../notebooks/1.3-data-review-wrf-hrrr.ipynb
df_hrrr.dropna(axis=0, inplace=True)
df_hrrr.drop(
    [
        "State",
        "County",
        "Grid Index",
    ],
    axis=1,
    inplace=True,
)

In [12]:
agg_dict = {
    column: "mean"
    for column in [
        "Lat (llcrnr)",
        "Lon (llcrnr)",
        "Lat (urcrnr)",
        "Lon (urcrnr)",
        "Avg Temperature (K)",
        "Max Temperature (K)",
        "Min Temperature (K)",
        "Precipitation (kg m**-2)",
        "Relative Humidity (%)",
        "Wind Gust (m s**-1)",
        "Wind Speed (m s**-1)",
        "U Component of Wind (m s**-1)",
        "V Component of Wind (m s**-1)",
        "Downward Shortwave Radiation Flux (W m**-2)",
        "Vapor Pressure Deficit (kPa)",
    ]
}
agg_dict["Lat (llcrnr)"] = "min"
agg_dict["Lon (llcrnr)"] = "min"
agg_dict["Lat (urcrnr)"] = "max"
agg_dict["Lon (urcrnr)"] = "max"
df_hrrr = (
    df_hrrr.groupby(["fips", "year", "month", "day"])
    .agg(agg_dict)
    .reset_index()
)

In [13]:
df_era5.drop(["hour", "state", "latitude", "longitude"], axis=1, inplace=True)
df_era5 = (
    df_era5.groupby(["fips", "year", "month", "day"]).mean().reset_index()
)

In [14]:
df_usda = df_usda[df_usda["commodity_desc"] == "CORN"]
df_usda.drop(
    [
        "commodity_desc",
        "reference_period_desc",
        "state_ansi",
        "state_name",
        "county_ansi",
        "county_name",
        "asd_code",
        "asd_desc",
        "domain_desc",
        "source_desc",
        "agg_level_desc",
        "PRODUCTION, MEASURED IN BU",
    ],
    axis=1,
    inplace=True,
)

In [15]:
# X = pd.merge(df_usda, df_hrrr, how="left", on=["year", "fips"])
# del df_usda, df_hrrr
# gc.collect()

X = pd.merge(
    df_hrrr, df_era5, how="inner", on=["year", "month", "day", "fips"]
)
del df_hrrr, df_era5
gc.collect();

In [16]:
display(X.shape)
display(X.columns)

(28800, 23)

Index(['fips', 'year', 'month', 'day', 'Lat (llcrnr)', 'Lon (llcrnr)',
       'Lat (urcrnr)', 'Lon (urcrnr)', 'Avg Temperature (K)',
       'Max Temperature (K)', 'Min Temperature (K)',
       'Precipitation (kg m**-2)', 'Relative Humidity (%)',
       'Wind Gust (m s**-1)', 'Wind Speed (m s**-1)',
       'U Component of Wind (m s**-1)', 'V Component of Wind (m s**-1)',
       'Downward Shortwave Radiation Flux (W m**-2)',
       'Vapor Pressure Deficit (kPa)', 'src', 'swvl1', 'swvl2', 'swvl3'],
      dtype='object')

In [17]:
# Новые имена столбцов
X_new_columns = {
    "year": "year",
    "month": "month",
    "day": "day",
    "fips": "fips",
    "Lat (llcrnr)": "lat_lower_left",
    "Lon (llcrnr)": "lon_lower_left",
    "Lat (urcrnr)": "lat_upper_right",
    "Lon (urcrnr)": "lon_upper_right",
    "Avg Temperature (K)": "temperature_avg",
    "Max Temperature (K)": "temperature_max",
    "Min Temperature (K)": "temperature_min",
    "Precipitation (kg m**-2)": "precipitation",
    "Relative Humidity (%)": "humidity_relative",
    "Wind Gust (m s**-1)": "wind_gust",
    "Wind Speed (m s**-1)": "wind_speed",
    "U Component of Wind (m s**-1)": "wind_u_component",
    "V Component of Wind (m s**-1)": "wind_v_component",
    "Downward Shortwave Radiation Flux (W m**-2)": "solar_radiation_downward",
    "Vapor Pressure Deficit (kPa)": "vapor_pressure_deficit",
    "src": "skin_reservoir_content",
    "swvl1": "soil_water_vol_layer1",
    "swvl2": "soil_water_vol_layer2",
    "swvl3": "soil_water_vol_layer3",
}

y_new_columns = {
    "year": "year",
    "fips": "fips",
    "YIELD, MEASURED IN BU / ACRE": "yield_bu_per_acre",
}

X = X[X_new_columns.keys()]
X.rename(X_new_columns, inplace=True, axis=1)
df_usda = df_usda[y_new_columns.keys()]
df_usda.rename(y_new_columns, inplace=True, axis=1)

In [18]:
for to_int in ["year", "month", "day", "fips"]:
    X[to_int] = X[to_int].astype(np.int32)
df_usda.loc[:, ["year", "fips"]] = df_usda[["year", "fips"]].astype(np.int32)

In [19]:
X["images"] = (
    X["fips"].astype(str)
    + "-"
    + X["year"].astype(str)
    + "-"
    + X["month"].astype(str).str.zfill(2)
    + "-"
    + X["day"].astype(str).str.zfill(2)
)

In [20]:
X["images"]

0        17001-2017-01-01
1        17001-2017-01-15
2        17001-2017-02-01
3        17001-2017-02-15
4        17001-2017-03-01
               ...       
28795    19197-2022-10-15
28796    19197-2022-11-01
28797    19197-2022-11-15
28798    19197-2022-12-01
28799    19197-2022-12-15
Name: images, Length: 28800, dtype: object

***
Далее обработка интеграция *Sentinel*, но для baseline опустим этот этап

Копируем изображения. Если для набора изображений нет данных в таблице `X`, то не копируем

In [21]:
# if not PATH_INTERIM.exists():
#     os.mkdir(PATH_INTERIM)
# for file in files_sentinel:
#     with h5py.File(file, "r") as h5:
#         for fips, attrs0 in h5.items():
#             for date, attrs1 in attrs0.items():
#                 dir_name = f"{fips}-{date}"
#                 if dir_name not in X["images"].values:
#                     continue

#                 new_dir = PATH_INTERIM / dir_name
#                 if new_dir.exists():
#                     continue

#                 os.mkdir(new_dir)
#                 for i, image in enumerate(attrs1["X"][:]):
#                     np.save(new_dir / f"{i}.npy", image)

Удаляем те объекты из `X`, для которых нет снимков

In [22]:
# images = pd.Series(
#     [str(path.name) for path in PATH_INTERIM.iterdir() if not path.is_file()]
# )

In [23]:
# X = X[X["images"].isin(images)]

Конец интеграции снимков

***

На всякий отсортируем и потом разделим признаки и таргет

In [24]:
X.sort_values(by=["year", "fips", "month", "day"], inplace=True)

Сохраняем

In [25]:
if not PATH_INTERIM.exists():
    os.mkdir(PATH_INTERIM)
X.to_csv(PATH_INTERIM / "X.csv", index=False)
df_usda.to_csv(PATH_INTERIM / "y.csv", index=False)

In [26]:
display(X.shape)
display(df_usda.shape)

(28800, 24)

(1084, 3)