# Transformation af DMI metobs historiske data 1954-2024

Først skal man nok lave et virtuelt environment til at installere python pakker i lokalt.

```
python -m venv .venv
```

# Indlæs json filer i DuckDB 

In [41]:
pip install duckdb

Note: you may need to restart the kernel to use updated packages.


In [42]:
import duckdb

# Smoke test for the DuckDB install: build a one-row relation with column `i`.
r1 = duckdb.sql("SELECT 42 AS i")
# The SQL text refers to `r1`, the Python variable assigned on the previous
# line, so that variable name must not be changed without editing the query.
duckdb.sql("SELECT i * 2 AS k FROM r1")

┌───────┐
│   k   │
│ int32 │
├───────┤
│    84 │
└───────┘

In [43]:
import duckdb

# Inspect the schema that read_json reports for the raw files whose names
# start with 1965 (column names and column types), without loading them further.
duckdb.sql("DESCRIBE FROM read_json('./raw_text/1965*.txt');")

┌─────────────┬─────────────────────────────────────────────────────────────────────────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │                                             column_type                                             │  null   │   key   │ default │  extra  │
│   varchar   │                                               varchar                                               │ varchar │ varchar │ varchar │ varchar │
├─────────────┼─────────────────────────────────────────────────────────────────────────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤
│ geometry    │ STRUCT(coordinates DOUBLE[], "type" VARCHAR)                                                        │ YES     │ NULL    │ NULL    │ NULL    │
│ properties  │ STRUCT(created VARCHAR, observed TIMESTAMP, parameterId VARCHAR, stationId VARCHAR, "value" DOUBLE) │ YES     │ NULL    │ NULL    │ NULL    │
│ type        │ VARCHAR                             

In [44]:
import duckdb

# Same schema inspection as for 1965, now for the raw files whose names start
# with 2001, to compare the reported column types between the two years.
duckdb.sql("DESCRIBE FROM read_json('./raw_text/2001*.txt');")

┌─────────────┬─────────────────────────────────────────────────────────────────────────────────────────────────────┬─────────┬─────────┬─────────┬─────────┐
│ column_name │                                             column_type                                             │  null   │   key   │ default │  extra  │
│   varchar   │                                               varchar                                               │ varchar │ varchar │ varchar │ varchar │
├─────────────┼─────────────────────────────────────────────────────────────────────────────────────────────────────┼─────────┼─────────┼─────────┼─────────┤
│ geometry    │ STRUCT(coordinates DOUBLE[], "type" VARCHAR)                                                        │ YES     │ NULL    │ NULL    │ NULL    │
│ properties  │ STRUCT(created VARCHAR, observed TIMESTAMP, parameterId VARCHAR, stationId VARCHAR, "value" DOUBLE) │ YES     │ NULL    │ NULL    │ NULL    │
│ type        │ VARCHAR                             

In [45]:
# Long-form 1965 observations: read the raw JSON files and flatten the
# `geometry` and `properties` structs into top-level columns with unnest().
# NOTE: both the feature and the geometry carry a field called `type`, so the
# result contains two columns with that name.
obs_1965_long = duckdb.sql(
"""
    FROM read_json('./raw_text/1965*.txt') select type, id, unnest(geometry),  unnest(properties)
"""
)
# Print a preview of the flattened relation.
obs_1965_long.show()

┌─────────┬──────────────────────────────────────┬─────────────────────┬─────────┬─────────────────────────────┬─────────────────────┬─────────────────┬───────────┬─────────┐
│  type   │                  id                  │     coordinates     │  type   │           created           │      observed       │   parameterId   │ stationId │  value  │
│ varchar │                 uuid                 │      double[]       │ varchar │           varchar           │      timestamp      │     varchar     │  varchar  │ double  │
├─────────┼──────────────────────────────────────┼─────────────────────┼─────────┼─────────────────────────────┼─────────────────────┼─────────────────┼───────────┼─────────┤
│ Feature │ 023ae7f0-0fcb-5815-5cb9-a20f7c1e446b │ [11.964, 54.5639]   │ Point   │ 2023-07-08T10:05:01.617937Z │ 1965-01-01 21:00:00 │ cloud_cover     │ 06149     │   100.0 │
│ Feature │ 028a88f6-7639-e722-8ffc-c2425e2df127 │ NULL                │ NULL    │ 2023-12-19T22:57:56.133346Z │ 1965-01-01 2

In [46]:
# Persist the long-form 1965 relation to a Snappy-compressed Parquet file in
# the current working directory. The SQL refers to the Python variable
# `obs_1965_long` by name.
duckdb.sql(
"""
copy ( SELECT * FROM obs_1965_long )
to 'metobs_1965_long.parquet' (FORMAT PARQUET, CODEC 'SNAPPY', ROW_GROUP_SIZE 100000);
"""
)

In [47]:
# Keep only the four columns needed for the pivot: timestamp, station,
# parameter name and measured value. The trailing `-- limit 1000` inside the
# query is an SQL comment left in place for quick test runs.
obs_1965_long_ = duckdb.sql(
"""
SELECT observed, stationId, parameterId,  value from obs_1965_long -- limit 1000
"""
)
# Print a preview of the reduced relation.
obs_1965_long_.show()

┌─────────────────────┬───────────┬─────────────────┬─────────┐
│      observed       │ stationId │   parameterId   │  value  │
│      timestamp      │  varchar  │     varchar     │ double  │
├─────────────────────┼───────────┼─────────────────┼─────────┤
│ 1965-01-01 21:00:00 │ 06149     │ cloud_cover     │   100.0 │
│ 1965-01-01 21:00:00 │ 06011     │ visibility      │ 10000.0 │
│ 1965-01-01 21:00:00 │ 06147     │ wind_dir        │   250.0 │
│ 1965-01-01 21:00:00 │ 06120     │ humidity        │    97.0 │
│ 1965-01-01 21:00:00 │ 04250     │ humidity        │    89.0 │
│ 1965-01-01 21:00:00 │ 06009     │ wind_speed      │     9.3 │
│ 1965-01-01 21:00:00 │ 06149     │ wind_dir        │   250.0 │
│ 1965-01-01 21:00:00 │ 06009     │ wind_dir        │   360.0 │
│ 1965-01-01 21:00:00 │ 06191     │ visibility      │ 20000.0 │
│ 1965-01-01 21:00:00 │ 06147     │ cloud_cover     │   100.0 │
│          ·          │   ·       │    ·            │     ·   │
│          ·          │   ·       │    ·

In [48]:
# Reshape long -> wide: every distinct parameterId becomes its own column and
# first(value) supplies the cell value. The remaining columns (observed,
# stationId) identify each output row.
# NOTE(review): if a (observed, stationId, parameterId) combination occurs
# more than once, first() keeps only one of the values — confirm that is acceptable.
obs_1965_wide = duckdb.sql(
"""
PIVOT obs_1965_long_ 
ON (parameterId)
USING first(value)
"""
)
# Print a preview of the wide-form relation.
obs_1965_wide.show()

┌─────────────────────┬───────────┬─────────────┬──────────────┬──────────┬─────────────────┬──────────┬──────────┬──────────────────┬──────────────────┬────────────┬─────────┬──────────┬────────────┐
│      observed       │ stationId │ cloud_cover │ cloud_height │ humidity │ pressure_at_sea │ temp_dew │ temp_dry │ temp_max_past12h │ temp_min_past12h │ visibility │ weather │ wind_dir │ wind_speed │
│      timestamp      │  varchar  │   double    │    double    │  double  │     double      │  double  │  double  │      double      │      double      │   double   │ double  │  double  │   double   │
├─────────────────────┼───────────┼─────────────┼──────────────┼──────────┼─────────────────┼──────────┼──────────┼──────────────────┼──────────────────┼────────────┼─────────┼──────────┼────────────┤
│ 1965-01-01 21:00:00 │ 04270     │        10.0 │       2600.0 │     70.0 │          1026.1 │    -20.2 │    -16.0 │             NULL │             NULL │    75000.0 │     2.0 │     90.0 │        2

Alle operationer fra 1965, gentaget for året 2001, i en celle

In [49]:
%%time
# Whole 1965 pipeline repeated for 2001 in a single, timed cell:
# read + flatten -> write long-form Parquet -> select pivot inputs -> pivot.
# Each SQL string refers to the Python variable assigned in the step before it.

# Step 1: read the 2001 raw JSON files and flatten geometry/properties.
obs_2001_long = duckdb.sql("""
    FROM read_json('./raw_text/2001*.txt') select type, id, unnest(geometry),  unnest(properties)
""")
# Step 2: persist the long form as Snappy-compressed Parquet.
duckdb.sql("""
    copy ( SELECT * FROM obs_2001_long )
    to 'metobs_2001_long.parquet' (FORMAT PARQUET, CODEC 'SNAPPY', ROW_GROUP_SIZE 100000);
""")
# Step 3: keep only the columns the pivot needs.
obs_2001_long_ = duckdb.sql("""
    SELECT observed, stationId, parameterId,  value from obs_2001_long -- limit 1000
""")
# Step 4: one column per parameterId, filled with first(value).
obs_2001_wide = duckdb.sql("""
    PIVOT obs_2001_long_ 
    ON (parameterId)
    USING first(value)
""")

# Print a preview of the wide-form relation.
obs_2001_wide.show()

┌─────────────────────┬───────────┬─────────────┬──────────────┬──────────┬─────────────────┬────────────────────────┬─────────────────────┬──────────────────────┬───────────────────┬──────────────────┬───────────────┬──────────┬─────────────────┬────────────┬───────────────────┬────────────────┬────────────────┬────────────────────┬─────────────────┬──────────┬──────────┬────────────┬───────────────────────┬────────────────────────┬───────────────────────┬──────────────────┬─────────────────┬──────────────────┬──────────────────┬─────────────────┬───────────┬──────────────────────┬───────────────────────┬──────────────────────┬──────────────────────┬────────────┬─────────┬──────────┬─────────────────┬──────────────────────────┬──────────┬─────────────────┬────────────┬───────────────────┐
│      observed       │ stationId │ cloud_cover │ cloud_height │ humidity │ humidity_past1h │ leav_hum_dur_past10min │ leav_hum_dur_past1h │ precip_dur_past10min │ precip_dur_past1h │ precip_past10mi

Alle operationer fra alle år, i en celle.
ok, i mappen `raw_text/` er der kun filer for årene 1965 og 2001. 
Men princippet er det samme, og så tager det ikke for lang tid at teste...

In [51]:
%%time
# Same pipeline as the per-year cells, but the glob matches every .txt file
# in ./raw_text/ instead of a single year prefix.
# Each SQL string refers to the Python variable assigned in the step before it.

# Step 1: read all raw JSON files and flatten geometry/properties.
obs_long = duckdb.sql("""
    FROM read_json('./raw_text/*.txt') select type, id, unnest(geometry),  unnest(properties)
""")
# Step 2: persist the long form as Snappy-compressed Parquet.
duckdb.sql("""
    copy ( SELECT * FROM obs_long )
    to 'metobs_long.parquet' (FORMAT PARQUET, CODEC 'SNAPPY', ROW_GROUP_SIZE 100000);
""")
# Step 3: keep only the columns the pivot needs.
obs_long_ = duckdb.sql("""
    SELECT observed, stationId, parameterId,  value from obs_long -- limit 1000
""")
# Step 4: one column per parameterId, filled with first(value).
obs_wide = duckdb.sql("""
    PIVOT obs_long_ 
    ON (parameterId)
    USING first(value)
""")

# Print a preview of the wide-form relation.
obs_wide.show()

┌─────────────────────┬───────────┬─────────────┬──────────────┬──────────┬─────────────────┬────────────────────────┬─────────────────────┬──────────────────────┬───────────────────┬──────────────────┬───────────────┬──────────┬─────────────────┬────────────┬───────────────────┬────────────────┬────────────────┬────────────────────┬─────────────────┬──────────┬──────────┬────────────┬───────────────────────┬────────────────────────┬───────────────────────┬──────────────────┬─────────────────┬──────────────────┬──────────────────┬─────────────────┬───────────┬──────────────────────┬───────────────────────┬──────────────────────┬──────────────────────┬────────────┬─────────┬──────────┬─────────────────┬──────────────────────────┬──────────┬─────────────────┬────────────┬───────────────────┐
│      observed       │ stationId │ cloud_cover │ cloud_height │ humidity │ humidity_past1h │ leav_hum_dur_past10min │ leav_hum_dur_past1h │ precip_dur_past10min │ precip_dur_past1h │ precip_past10mi

Ok.

Nu prøver jeg at lave en udgave, hvor jeg ændrer stien til at passe til mappen med alle data, 1954 til nu (18. oktober 2024).   
Måske går det godt?!

In [54]:
%%time
# Attempt to run the whole pipeline over the complete download folder in one
# go, printing a progress message around every step.
# NOTE: in the recorded run this cell was interrupted after printing
# "pivot ... ", i.e. before the pivot step reported " done".

# Step 1: read every raw JSON file in the download folder and flatten
# geometry/properties into top-level columns.
print("indlæser filer ... ", end="")
obs_long = duckdb.sql("""
    FROM read_json('d:/downloads/dmi_all_1953-2024.sept/*.txt') select type, id, unnest(geometry),  unnest(properties)
""")
print(" done")

# Long-form Parquet export, disabled for this attempt:
# print("kopierer til parquet filer ... ", end="")
# duckdb.sql("""
#     copy ( SELECT * FROM obs_long )
#     to 'metobs_long.parquet' (FORMAT PARQUET, CODEC 'SNAPPY', ROW_GROUP_SIZE 100000);
# """)
# print(" done")

# Step 2: keep only the columns the pivot needs.
print("udvælger nødvendige felter før pivot ... ", end="")
obs_long_ = duckdb.sql("""
    SELECT observed, stationId, parameterId,  value from obs_long -- limit 1000
""")
print(" done")

# Step 3: one column per parameterId, filled with first(value).
print("pivot ... ", end="")
obs_wide = duckdb.sql("""
    PIVOT obs_long_ 
    ON (parameterId)
    USING first(value)
""")
print(" done")

# Step 4: persist the wide form as Snappy-compressed Parquet.
print("kopierer wide-form til parquet filer ... ", end="")
duckdb.sql("""
    copy ( SELECT * FROM obs_wide )
    to 'metobs_wide.parquet' (FORMAT PARQUET, CODEC 'SNAPPY', ROW_GROUP_SIZE 100000);
""")
print(" done")

# obs_wide.show()

indlæser filer ...  done
udvælger nødvendige felter før pivot ...  done
pivot ... 

RuntimeError: Query interrupted

Det gik så __ikke__ så godt, fordi det tog for evigt...

Ny strategi: Et år ad gangen!

In [2]:
%%time
import duckdb

raw_file_path = 'd:/downloads/dmi_all_1953-2024.sept/'
out_file_path = './metobs_data/'

# years = [1965, 2001, 2023]
# years = [ 2001, 1965]
# years = range(1954, 2025)
years = range(2023, 2025)

for year_to_process in years:
    print()
    print(f"process {year_to_process}")
    print()

    print(f"indlæser {year_to_process}-filer ... ", end="")
    # %time obs_long = duckdb.sql(f"""FROM read_json('d:/downloads/dmi_all_1953-2024.sept/{year_to_process}*.txt') select type, id, unnest(geometry),  unnest(properties)""")

    import_path = f"d:/downloads/dmi_all_1953-2024.sept/{year_to_process}*.txt"
    sql = "select type, id, unnest(geometry),  unnest(properties) FROM obs"
    %time (obs := duckdb.read_json(import_path, convert_strings_to_integers = True)) ; (obs_long := duckdb.sql(sql))
    print(" done")

    print(f"kopierer {year_to_process} til parquet filer ... ", end="")
    # sql = f"""
    #     copy ( SELECT * FROM obs_long )
    #     to '{out_file_path}metobs_{year_to_process}_long.parquet' (FORMAT PARQUET, CODEC 'SNAPPY', ROW_GROUP_SIZE 100000);
    # """
    # %time duckdb.sql(sql)
    %time obs_long.write_parquet(f"{out_file_path}metobs_{year_to_process}_long.parquet", compression = 'SNAPPY', row_group_size = 100000)
    print(" done")

    print(f"Genindlæser {year_to_process}, fra parquet fil (på SSD)")
    %time obs_long = duckdb.read_parquet(f"{out_file_path}metobs_{year_to_process}_long.parquet")

    print("udvælger nødvendige felter før pivot ... ", end="")
    sql = """
        SELECT observed, cast(stationId AS INTEGER) as stationId, parameterId,  value from obs_long -- limit 1000
    """
    obs_long_ = duckdb.sql(sql)
    print(" done")

    print("pivot ... ", end="")
    sql = """
        PIVOT obs_long_ 
        ON (parameterId)
        USING first(value)
    """
    obs_wide = duckdb.sql(sql)
    print(" done")

    print(f"kopierer {year_to_process} wide-form til parquet filer ... ", end="")
    # sql = f"""
    #     copy ( SELECT * FROM obs_wide )
    #     to 'metobs_{year_to_process}_wide.parquet' (FORMAT PARQUET, CODEC 'SNAPPY', ROW_GROUP_SIZE 100000);
    # """
    # %time duckdb.sql(sql)
    %time obs_wide.write_parquet(f"{out_file_path}metobs_{year_to_process}_wide.parquet", compression = 'SNAPPY', row_group_size = 100000)
    print(f" done {year_to_process}")

print(f"done all {years}")
# obs_wide.show()


process 2023

indlæser 2023-filer ... CPU times: total: 156 ms
Wall time: 577 ms
 done
kopierer 2023 til parquet filer ... CPU times: total: 1min 56s
Wall time: 3min 8s
 done
Genindlæser 2023, fra parquet fil (på SSD)
CPU times: total: 0 ns
Wall time: 29.5 ms
udvælger nødvendige felter før pivot ...  done
pivot ...  done
kopierer 2023 wide-form til parquet filer ... CPU times: total: 1min 4s
Wall time: 51.5 s
 done 2023

process 2024

indlæser 2024-filer ... CPU times: total: 188 ms
Wall time: 6.56 s
 done
kopierer 2024 til parquet filer ... CPU times: total: 1min 13s
Wall time: 1min 57s
 done
Genindlæser 2024, fra parquet fil (på SSD)
CPU times: total: 15.6 ms
Wall time: 16.4 ms
udvælger nødvendige felter før pivot ...  done
pivot ...  done
kopierer 2024 wide-form til parquet filer ... CPU times: total: 34.2 s
Wall time: 25.9 s
 done 2024
done all range(2023, 2025)
CPU times: total: 4min 54s
Wall time: 6min 32s


In [3]:
import duckdb

# Build a glob that matches the wide-form Parquet file of every processed
# year; set `year` to a single year (e.g. '1965') to read just that file.
data_path = './metobs_data/'
# year = '1965'
year = '*'
filepattern = f'metobs_{year}_wide.parquet'
file_path = data_path + filepattern

# union_by_name is passed so the per-year files are combined by column name;
# the wide files do not all have the same set of parameter columns.
metobs = duckdb.read_parquet(file_path,  union_by_name = True)

# Select a handful of parameters for rows that have a wind_speed value.
# The SQL refers to the Python variable `metobs` by name.
# NOTE(review): the stationId window 5000 < id < 34000 is presumably meant to
# pick the Danish stations (cf. the `_dk` suffix) — confirm against the
# station list.
query = """
-- SELECT observed, stationId, humidity, pressure, pressure_at_sea, temp_dew, temp_dry, wind_speed
SELECT observed, stationId, humidity, pressure_at_sea, temp_dew, temp_dry, wind_speed
FROM metobs
WHERE stationId > 5000 and stationId < 34000
    and wind_speed is not null
order by observed, stationId desc
"""
windspeed_dk = duckdb.sql(query)

# Print a preview of the result.
windspeed_dk.show()


┌─────────────────────┬───────────┬──────────┬─────────────────┬──────────┬──────────┬────────────┐
│      observed       │ stationId │ humidity │ pressure_at_sea │ temp_dew │ temp_dry │ wind_speed │
│      timestamp      │   int32   │  double  │     double      │  double  │  double  │   double   │
├─────────────────────┼───────────┼──────────┼─────────────────┼──────────┼──────────┼────────────┤
│ 1954-01-01 00:00:00 │      6180 │    100.0 │          1029.0 │     -2.0 │     -2.0 │        1.0 │
│ 1954-01-01 00:00:00 │      6110 │    100.0 │          1027.3 │      0.0 │      0.0 │        2.6 │
│ 1954-01-01 00:00:00 │      6081 │     93.0 │          1027.5 │      0.0 │      1.0 │        1.0 │
│ 1954-01-01 00:00:00 │      6071 │     93.0 │          1027.7 │      0.0 │      1.0 │        1.0 │
│ 1954-01-01 00:00:00 │      6060 │    100.0 │          1028.0 │     -2.0 │     -2.0 │        0.0 │
│ 1954-01-01 00:00:00 │      6041 │     86.0 │          1027.2 │     -2.0 │      0.0 │        1.0 │


In [9]:
import duckdb

# Build a glob that matches the wide-form Parquet file of every processed
# year; set `year` to a single year (e.g. '1965') to read just that file.
data_path = './metobs_data/'
# year = '1965'
year = '*'
filepattern = f'metobs_{year}_wide.parquet'
file_path = data_path + filepattern

# union_by_name is passed so the per-year files are combined by column name.
metobs = duckdb.read_parquet(file_path,  union_by_name = True)

# Average wind_speed over all selected stations for each exact observation
# timestamp. The SQL refers to the Python variable `metobs` by name.
# NOTE(review): the stationId window 5000 < id < 34000 is presumably meant to
# pick the Danish stations (cf. the `_dk` suffix) — confirm.
query = """
-- SELECT observed, stationId, humidity, pressure, pressure_at_sea, temp_dew, temp_dry, wind_speed
-- SELECT observed, stationId, humidity, pressure_at_sea, temp_dew, temp_dry, wind_speed
SELECT observed, avg(wind_speed)
FROM metobs
WHERE stationId > 5000 and stationId < 34000
    and wind_speed is not null
GROUP BY observed
-- order by observed, stationId desc
ORDER BY observed
"""
windspeed_dk_avg = duckdb.sql(query)

# Print a preview of the result.
windspeed_dk_avg.show()


┌─────────────────────┬────────────────────┐
│      observed       │  avg(wind_speed)   │
│      timestamp      │       double       │
├─────────────────────┼────────────────────┤
│ 1954-01-01 00:00:00 │              2.625 │
│ 1954-01-01 03:00:00 │  2.414285714285714 │
│ 1954-01-01 06:00:00 │ 2.3428571428571425 │
│ 1954-01-01 09:00:00 │             2.4375 │
│ 1954-01-01 12:00:00 │ 2.9857142857142853 │
│ 1954-01-01 15:00:00 │ 2.9000000000000004 │
│ 1954-01-01 18:00:00 │               4.75 │
│ 1954-01-01 21:00:00 │                6.3 │
│ 1954-01-02 00:00:00 │             7.2125 │
│ 1954-01-02 03:00:00 │             6.6125 │
│          ·          │                 ·  │
│          ·          │                 ·  │
│          ·          │                 ·  │
│ 1957-06-02 18:00:00 │                6.1 │
│ 1957-06-02 21:00:00 │             3.2125 │
│ 1957-06-03 00:00:00 │               2.95 │
│ 1957-06-03 03:00:00 │ 3.8625000000000003 │
│ 1957-06-03 06:00:00 │ 3.8625000000000003 │
│ 1957-06-

In [2]:
import duckdb

# Build a glob that matches the wide-form Parquet file of every processed
# year; set `year` to a single year (e.g. '1965') to read just that file.
data_path = './metobs_data/'
# year = '1965'
year = '*'
filepattern = f'metobs_{year}_wide.parquet'
file_path = data_path + filepattern

# union_by_name is passed so the per-year files are combined by column name.
metobs = duckdb.read_parquet(file_path,  union_by_name = True)

# Average wind_speed over all selected stations per clock hour: observation
# timestamps are truncated to the hour before grouping, so sub-hourly
# observations fall into the same bucket.
# The SQL refers to the Python variable `metobs` by name.
# NOTE(review): the stationId window 5000 < id < 34000 is presumably meant to
# pick the Danish stations (cf. the `_dk` suffix) — confirm.
query = """
SELECT DATE_TRUNC('hour', observed) AS hour, avg(wind_speed)
FROM metobs
WHERE stationId > 5000 and stationId < 34000
    and wind_speed is not null
--    and observed > '2020-01-01'
GROUP BY hour
ORDER BY hour 
"""
windspeed_dk_avg = duckdb.sql(query)

# Print a preview of the result.
windspeed_dk_avg.show()


┌─────────────────────┬────────────────────┐
│        hour         │  avg(wind_speed)   │
│      timestamp      │       double       │
├─────────────────────┼────────────────────┤
│ 1954-01-01 00:00:00 │              2.625 │
│ 1954-01-01 03:00:00 │  2.414285714285714 │
│ 1954-01-01 06:00:00 │ 2.3428571428571425 │
│ 1954-01-01 09:00:00 │             2.4375 │
│ 1954-01-01 12:00:00 │ 2.9857142857142853 │
│ 1954-01-01 15:00:00 │                2.9 │
│ 1954-01-01 18:00:00 │               4.75 │
│ 1954-01-01 21:00:00 │                6.3 │
│ 1954-01-02 00:00:00 │             7.2125 │
│ 1954-01-02 03:00:00 │  6.612500000000001 │
│          ·          │                 ·  │
│          ·          │                 ·  │
│          ·          │                 ·  │
│ 1957-06-02 18:00:00 │                6.1 │
│ 1957-06-02 21:00:00 │             3.2125 │
│ 1957-06-03 00:00:00 │               2.95 │
│ 1957-06-03 03:00:00 │             3.8625 │
│ 1957-06-03 06:00:00 │ 3.8625000000000003 │
│ 1957-06-

: 