# Daten in Parquet Dateien transformieren

Die Daten werden nun aus den Dateien in ein Dataframe geschrieben und dann als Parquet Files zwischengespeichert. Dabei werden die Daten zudem nach Bundesland gefiltert.

In [1]:

import os
from glob import glob

import pandas as pd
from dotenv import load_dotenv

from src.file_wrapper.meta_data import GeographieMeta, StationNameMeta
from src.file_wrapper.weather_data import Wind10MinutesData
from src.file_wrapper.geo_data import AdministrativeBoundaries

load_dotenv()


META_DATA_FOLDER = os.path.join(os.getenv('PROJECT_DIR'), 'data','interim', 'station_meta' )
DATA_FOLDER = os.path.join(os.getenv('PROJECT_DIR'), 'data','interim', 'wind10_min' )
PARQUET_FOLDER = os.path.join(os.getenv('PROJECT_DIR'), 'data','interim', 'parquet' )

GEO_BOUNDARIES_GEOJSON_PATH = os.path.join(os.getenv('PROJECT_DIR'), 'data', 'raw', 'federal_states.geojson')

GET_WIND = True
MIN_TIMESTAMP = pd.Timestamp('2020-01-01')

In [2]:
geo_meta_data = []
name_meta_data = []
meta_data_folders = glob(os.path.join(META_DATA_FOLDER, '*'))

if not os.path.exists(PARQUET_FOLDER):
    os.makedirs(PARQUET_FOLDER)

for folder in meta_data_folders:
    geo_meta_data.append(GeographieMeta(folder))
    name_meta_data.append(StationNameMeta(folder))

geo_df = pd.concat([gm.data for gm in geo_meta_data])
name_df = pd.concat([nm.data for nm in name_meta_data])

In [3]:
geo_df.head()

Unnamed: 0,station_id,altitude,latitude,longitude,from_date,to_date,station_name
0,3811,130.0,51.2959,13.093,1889-01-01,1941-09-30,Oschatz
1,3811,130.0,51.2959,13.093,1941-10-01,1947-09-30,Oschatz
2,3811,150.0,51.2959,13.093,1947-10-01,1977-12-31,Oschatz
3,3811,150.0,51.296,13.0928,1978-01-01,2006-05-04,Oschatz
4,3811,150.0,51.296,13.0928,2006-05-05,2008-07-31,Oschatz


In [4]:
name_df.head()

Unnamed: 0,station_id,name,from_date,to_date
0,3811,Oschatz,1889-01-01,NaT
0,4466,Schleswig,1946-10-01,NaT
0,6103,Mittenwald/Obb.,2000-06-15,NaT
0,3015,Lindenberg,1906-04-01,NaT
0,3023,Lingen,1855-01-01,1950-12-14


An die Geodaten werden nun die Bundesländer mithilfe der vorherigen Geoinformationen angefügt.

In [5]:
boundaries = AdministrativeBoundaries(GEO_BOUNDARIES_GEOJSON_PATH)
geo_df['state'] = boundaries.get_state_by_point(geo_df['longitude'], geo_df['latitude'])

In [6]:
geo_df.head()

Unnamed: 0,station_id,altitude,latitude,longitude,from_date,to_date,station_name,state
0,3811,130.0,51.2959,13.093,1889-01-01,1941-09-30,Oschatz,Sachsen
1,3811,130.0,51.2959,13.093,1941-10-01,1947-09-30,Oschatz,Sachsen
2,3811,150.0,51.2959,13.093,1947-10-01,1977-12-31,Oschatz,Sachsen
3,3811,150.0,51.296,13.0928,1978-01-01,2006-05-04,Oschatz,Sachsen
4,3811,150.0,51.296,13.0928,2006-05-05,2008-07-31,Oschatz,Sachsen


In [7]:
# Daten werden gespeichert.
geo_df.to_parquet(os.path.join(PARQUET_FOLDER, 'geo.parquet'))
name_df.to_parquet(os.path.join(PARQUET_FOLDER, 'name.parquet'))


In [8]:
if GET_WIND:
    # Geo Daten filter
    valid_station_ids = set(geo_df[geo_df['state'].isin(['Berlin', 'Brandenburg'])]['station_id'].unique().tolist())
    wind_data = []
    
    wind_data_folders = glob(os.path.join(DATA_FOLDER, '*'))
    
    # filter all folders where the third part of the split by _ is not a valid station id
    valid_folders = [folder for folder in wind_data_folders if int(os.path.splitext(os.path.basename(folder))[0].split('_')[-4]) in valid_station_ids]
    print(f"Length after Station Filter: {len(valid_folders)}")
    
    # filter where the fifth part of the split by _ is bigger then MIN_TIMESTAMP when parsed to timestamp.
    valid_folders = [folder for folder in valid_folders if pd.Timestamp(os.path.splitext(os.path.basename(folder))[0].split('_')[-2]) >= MIN_TIMESTAMP]
    print(f"Length after Timestamp Filter: {len(valid_folders)}")
    
    
    print(f"Found {len(wind_data_folders)} folders")
    print(f"Found {len(valid_folders)} valid folders")
    for i, folder in enumerate(valid_folders):
        try:
            station_id = int(os.path.splitext(os.path.basename(folder))[0].split('_')[-4])
        except ValueError:
            print(f"Could not parse station id from {folder}")
            continue
        if station_id not in valid_station_ids:
            print(f"Station id {station_id} not in valid station ids")
            continue
        wind_data.append(Wind10MinutesData(folder))
        
        
    wind_df = pd.concat([wd.data for wd in wind_data])
    wind_df = wind_df[~((wind_df['avg_direction'].isna()) & (wind_df['avg_speed'].isna()))]
    wind_df = wind_df[wind_df['timestamp']>=MIN_TIMESTAMP]
    
    wind_df.to_parquet(os.path.join(PARQUET_FOLDER, 'wind.parquet'), engine='pyarrow')
    print(f"Length of wind data: {len(wind_df)}")
    wind_df.head()

Length after Station Filter: 76
Length after Timestamp Filter: 19
Found 997 folders
Found 19 valid folders
Length of wind data: 2895037


In [9]:
wind_df.head()

Unnamed: 0,station_id,timestamp,quality,avg_speed,avg_direction
0,430,2020-01-01 00:00:00,3,3.0,270.0
1,430,2020-01-01 00:10:00,3,3.6,270.0
2,430,2020-01-01 00:20:00,3,3.6,280.0
3,430,2020-01-01 00:30:00,3,3.1,280.0
4,430,2020-01-01 00:40:00,3,3.3,280.0


In [10]:
wind_df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 2895037 entries, 0 to 157823
Data columns (total 5 columns):
 #   Column         Dtype         
---  ------         -----         
 0   station_id     int64         
 1   timestamp      datetime64[ns]
 2   quality        int64         
 3   avg_speed      float64       
 4   avg_direction  float64       
dtypes: datetime64[ns](1), float64(2), int64(2)
memory usage: 132.5 MB
