Download data from DMI:
=======================
https://dmigw.govcloud.dk/v2/metObs/bulk/?api-key=d40321ee-7de5-4417-a0bf-108bd34061ab

https://dmigw.govcloud.dk/v2/climateData/bulk/?api-key=fa9056ec-2f41-4042-828b-91750e966966

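This notebook reads bulk climateData files from DMI (Danish Meteorological Institute), keeps the hourly records, combines them into one pandas DataFrame per year (one row per cell and time interval, one column per parameter), and saves each year as a CSV file for further analysis.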

In [1]:
import os
import re
import pandas as pd
import orjson 

In [2]:
# api keys for different DMI services
# Each key is read from an environment variable when one is set, so real
# credentials do not have to live in the notebook. The literal is only a
# fallback that keeps the notebook runnable when the variable is unset.
# NOTE(review): the fallback literals are credentials committed to the file;
# rotate them and drop the fallbacks once the environment variables are set.
forecast_api_key = os.environ.get('DMI_FORECAST_API_KEY', 'e55dced6-70fb-44f1-a658-cdef62c74b6a')
climate_api_key = os.environ.get('DMI_CLIMATE_API_KEY', 'fa9056ec-2f41-4042-828b-91750e966966')
metobs_api_key = os.environ.get('DMI_METOBS_API_KEY', 'd40321ee-7de5-4417-a0bf-108bd34061ab')

In [2]:
# bulk loader function for every year
def bulk_load_year(year, file_paths):
    """Load one year of bulk files into a single wide DataFrame.

    Every file is read as newline-delimited JSON (one object per line). Only
    objects whose ``properties.timeResolution`` equals ``'hour'`` are kept.
    Objects that share the same ``(cellId, from, to)`` are merged into one
    row, with one column per ``parameterId`` holding that object's ``value``.

    Parameters
    ----------
    year
        Label used in the progress messages only.
    file_paths : list of str
        Paths of the files to read (must support ``len()``).

    Returns
    -------
    pandas.DataFrame
        One row per ``(cellId, from, to)`` with the columns ``cellId``
        (category), ``from`` and ``to`` (converted with ``pd.to_datetime``),
        ``geometry_type``, ``coordinates`` and one column per parameter.
        An empty DataFrame is returned when no record passes the filters.

    Notes
    -----
    Errors are reported with ``print`` rather than raised: a malformed JSON
    line is skipped and counted, and any other failure while reading a file
    abandons the rest of that file. Rows merged before the failure are kept.
    """
    # Rows are merged while streaming, keyed by (cellId, from, to), so memory
    # grows with the number of output rows instead of the number of input lines.
    rows_by_key = {}

    total_files = len(file_paths)
    processed_files = 0

    print(f"\nStarted processing {total_files} files for year {year}.")

    # Process files sequentially
    for file_path in file_paths:
        malformed_lines = 0
        try:
            with open(file_path, 'rb') as file:  # Open in binary mode for orjson
                for line in file:
                    if not line.strip():
                        continue

                    try:
                        json_obj = orjson.loads(line)
                    except ValueError:
                        # orjson.JSONDecodeError subclasses ValueError; one bad
                        # line must not discard the remainder of the file.
                        malformed_lines += 1
                        continue

                    parsed = _hourly_record(json_obj)
                    if parsed is None:
                        continue

                    key, record = parsed
                    existing = rows_by_key.get(key)
                    if existing is None:
                        rows_by_key[key] = record
                    else:
                        # Same cell and interval: add the new parameter column
                        # (later records also overwrite the geometry fields).
                        existing.update(record)

        except Exception as e:
            print(f"Error processing file {file_path}: {e}")

        if malformed_lines:
            print(f"Skipped {malformed_lines} malformed line(s) in {file_path}.")

        # Update the processed files counter
        processed_files += 1

        # Print progress every 100 files, and once more after the last file
        if processed_files % 100 == 0 or processed_files == total_files:
            print(f"Processed {processed_files}/{total_files} files for year {year}.")

    if not rows_by_key:
        print(f"No records were loaded for year {year}. Please check your data and filters.")
        return pd.DataFrame()  # Return an empty DataFrame

    # Every stored record carries 'cellId', 'from' and 'to', so these columns
    # always exist once at least one row was collected.
    df = pd.DataFrame(list(rows_by_key.values()))
    df['cellId'] = df['cellId'].astype('category')
    df['from'] = pd.to_datetime(df['from'])
    df['to'] = pd.to_datetime(df['to'])

    return df


def _hourly_record(json_obj):
    """Return ``(key, record)`` for an hourly object, or ``None`` to skip it."""
    # `or {}` also covers an explicit JSON null, which `.get(name, {})` would
    # pass through as None and break the attribute access below.
    properties = json_obj.get('properties') or {}
    geometry = json_obj.get('geometry') or {}

    # Filter out records where timeResolution is not 'hour'
    if properties.get('timeResolution') != 'hour':
        return None

    cellId = properties.get('cellId')
    from_time = properties.get('from')
    to_time = properties.get('to')
    parameterId = properties.get('parameterId')
    value = properties.get('value')

    # Skip records with missing critical data
    if None in (cellId, from_time, to_time, parameterId, value):
        return None

    coordinates = geometry.get('coordinates', [[]])
    record = {
        'cellId': cellId,
        'from': from_time,
        'to': to_time,
        parameterId: value,
        'geometry_type': geometry.get('type'),
        # Keep only the first element of the coordinates array
        'coordinates': coordinates[0] if coordinates else [],
    }
    return (cellId, from_time, to_time), record


# function to process all in a folder
def process_all_years(folder_path, output_folder_path):
    """Convert every daily ``YYYY-MM-DD.txt`` file in a folder to per-year CSVs.

    Files are grouped by the four-digit year in their name, each group is
    loaded with ``bulk_load_year``, and every non-empty result is written to
    ``<output_folder_path>/<year>.csv`` without the index.

    Parameters
    ----------
    folder_path : str
        Folder containing the daily ``.txt`` files.
    output_folder_path : str
        Folder for the CSV files; created if it does not exist.

    Returns
    -------
    None
        Results are written to disk and progress is printed.
    """
    # Ensure the output folder exists
    os.makedirs(output_folder_path, exist_ok=True)

    # fullmatch: the whole name must be YYYY-MM-DD.txt, so names such as
    # '2011-01-01.txt.bak.txt' are rejected instead of being read as data.
    daily_file = re.compile(r'(\d{4})-\d{2}-\d{2}\.txt')

    # Create a mapping from year to list of file paths.
    # os.listdir returns names in arbitrary order; sorting makes the row order
    # and the "last record wins" merging reproducible between runs.
    year_to_files = {}
    for filename in sorted(os.listdir(folder_path)):
        if not filename.endswith('.txt'):
            continue
        match = daily_file.fullmatch(filename)
        if match is None:
            print(f"Filename {filename} does not match expected pattern. Skipping.")
            continue
        year_to_files.setdefault(match.group(1), []).append(
            os.path.join(folder_path, filename)
        )

    # Process files for each year
    for year in sorted(year_to_files):
        df_year = bulk_load_year(year, year_to_files[year])

        if df_year.empty:
            print(f"No data to save for year {year}.")
            continue

        # Save the DataFrame to a CSV file in the specified output folder
        output_file_path = os.path.join(output_folder_path, f"{year}.csv")
        df_year.to_csv(output_file_path, index=False)
        print(f"Saved data for year {year} to {output_file_path}.")

In [3]:
# paths
# '~' is expanded to the current user's home directory, so the cell is not
# tied to one account's absolute path.
folder_path = os.path.expanduser('~/Downloads/20k-load')  # Update with your folder path
output_folder_path = os.path.expanduser('~/Downloads/20k-output')  # Specify your output folder path

# run the function
process_all_years(folder_path, output_folder_path)


Started processing 1 files for year 2010.
Processed 1/1 files for year 2010.
Saved data for year 2010 to /Users/johan/Downloads/20k-output/2010.csv.

Started processing 365 files for year 2011.
Processed 100/365 files for year 2011.
Processed 200/365 files for year 2011.
Processed 300/365 files for year 2011.
Processed 365/365 files for year 2011.
Saved data for year 2011 to /Users/johan/Downloads/20k-output/2011.csv.

Started processing 366 files for year 2012.
Processed 100/366 files for year 2012.
Processed 200/366 files for year 2012.
Processed 300/366 files for year 2012.
Processed 366/366 files for year 2012.
Saved data for year 2012 to /Users/johan/Downloads/20k-output/2012.csv.

Started processing 365 files for year 2013.
Processed 100/365 files for year 2013.
Processed 200/365 files for year 2013.
Processed 300/365 files for year 2013.
Processed 365/365 files for year 2013.
Saved data for year 2013 to /Users/johan/Downloads/20k-output/2013.csv.

Started processing 365 files f