In [1]:
# default_exp mario

# End-to-End Pipeline

<br>

### Imports

In [2]:
#exports
import pandas as pd
import xarray as xr

from satip import eumetsat, reproj, io, gcp_helpers
from dagster import execute_pipeline, pipeline, solid, Field

import os
import glob
import dotenv
import warnings

Downloading: 100%|█████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00,  4.46rows/s]


<br>

### Log Cleaning

We'll suppress some noisy warnings to make the logs easier to parse.

In [3]:
#exports
warnings.filterwarnings('ignore', message='divide by zero encountered in true_divide')
warnings.filterwarnings('ignore', message='invalid value encountered in sin')
warnings.filterwarnings('ignore', message='invalid value encountered in cos')
warnings.filterwarnings('ignore', message='invalid value encountered in subtract')
warnings.filterwarnings('ignore', message='You will likely lose important projection information when converting to a PROJ string from another format. See: https://proj.org/faq.html#what-is-the-best-format-for-describing-coordinate-reference-systems')

<br>

### Dagster Pipeline

We're now going to combine these steps into a pipeline using `dagster`. First, we'll create the individual components.

In [4]:
#exports
@solid()
def download_eumetsat_files(context, env_vars_fp: str, data_dir: str, metadata_db_fp: str, debug_fp: str, table_id: str, project_id: str, start_date: str='', end_date: str='', max_mins: int=60):
    """
    Downloads the EUMETSAT files for a date range and
    returns the metadata of the newly retrieved entries.

    If `start_date` is an empty string it is set to the later of
    the latest `result_time` stored in `table_id` and `max_mins`
    minutes before now. If `table_id` holds no rows the `max_mins`
    window is used on its own. If `end_date` is an empty string it
    is set to the current time.

    The user key/secret and Slack settings are read from the
    environment after loading the dotenv file at `env_vars_fp`.

    Returns the metadata dataframe from the download manager with
    its first row dropped, or an empty dataframe with `result_time`
    and `file_name` columns when the manager returns `None`.
    """
    _ = dotenv.load_dotenv(env_vars_fp)

    if start_date == '':
        # `table_id` is interpolated straight into the SQL text, it must come from trusted config
        sql_query = f'select * from {table_id} where result_time = (select max(result_time) from {table_id})'
        df_latest_saved = gcp_helpers.query(sql_query, project_id)

        # NOTE(review): `pd.Timestamp.now()` is naive local time whereas the saved
        # date has its timezone stripped - confirm both are on the same clock
        earliest_start_date = pd.Timestamp.now() - pd.Timedelta(max_mins, unit='minutes')

        if df_latest_saved is not None and len(df_latest_saved) > 0:
            latest_saved_date = df_latest_saved['result_time'].iloc[0].tz_localize(None)
            start_date = max(earliest_start_date, latest_saved_date).strftime('%Y-%m-%d %H:%M')
        else:
            # No previous entries to resume from, so only the `max_mins` window applies
            start_date = earliest_start_date.strftime('%Y-%m-%d %H:%M')

    if end_date == '':
        end_date = pd.Timestamp.now().strftime('%Y-%m-%d %H:%M')

    context.log.info(f'Querying data between {start_date} - {end_date}')

    dm = eumetsat.DownloadManager(os.environ.get('USER_KEY'), os.environ.get('USER_SECRET'), data_dir, metadata_db_fp, debug_fp, slack_webhook_url=os.environ.get('SLACK_WEBHOOK_URL'), slack_id=os.environ.get('SLACK_ID'))
    df_new_metadata = dm.download_date_range(start_date, end_date)

    if df_new_metadata is None:
        return pd.DataFrame(columns=['result_time', 'file_name'])

    # The first entry is the last one we downloaded
    # NOTE(review): the row is also dropped when `start_date` was passed explicitly - confirm this is intended
    return df_new_metadata.iloc[1:]

@solid()
def df_metadata_to_dt_to_fp_map(_, df_new_metadata, data_dir: str) -> dict:
    """
    Maps each `result_time` in the metadata dataframe
    to the path of its `.nat` file inside `data_dir`.
    Rows with a repeated `file_name` are only used once.
    """
    file_names = df_new_metadata.set_index('result_time')['file_name']
    unique_file_names = file_names.drop_duplicates()

    datetime_to_filepath = {}

    for result_time, file_name in unique_file_names.to_dict().items():
        # NOTE(review): this compares the file name against an empty dict,
        # presumably a guard for missing names - verify what was intended
        if file_name != {}:
            datetime_to_filepath[result_time] = f"{data_dir}/{file_name}.nat"

    return datetime_to_filepath

@solid()
def reproject_datasets(_, datetime_to_filepath: dict, new_coords_fp: str, new_grid_fp: str):
    """
    Reprojects every file in the mapping using `pyresample`,
    passes each result through `io.add_constant_coord_to_da`
    with `'time'` and its datetime, then concatenates them
    along `time`. An empty `xr.Dataset` is returned when the
    mapping contains no files.
    """
    reprojector = reproj.Reprojector(new_coords_fp, new_grid_fp)
    reprojected_dss = []

    for timestamp, filepath in datetime_to_filepath.items():
        ds_reproj = reprojector.reproject(filepath, reproj_library='pyresample')
        ds_reproj = io.add_constant_coord_to_da(ds_reproj, 'time', pd.to_datetime(timestamp))
        reprojected_dss.append(ds_reproj)

    # Nothing was reprojected, downstream solids check for the empty dataset
    if not reprojected_dss:
        return xr.Dataset()

    return xr.concat(reprojected_dss, 'time', coords='all', data_vars='all')

@solid()
def compress_and_save_datasets(_, ds_combined_reproj, zarr_bucket: str, var_name: str='stacked_eumetsat_data'):
    """
    Compresses the `var_name` variable of the reprojected
    dataset and saves it to `zarr_bucket`, returning the
    result of `io.save_da_to_zarr`. `None` is returned
    when the dataset has no dimensions (no new data).
    """
    has_new_data = len(ds_combined_reproj.dims) != 0

    if not has_new_data:
        return None

    # Compress the selected variable, then write it out to Zarr
    da_compressed = io.Compressor().compress(ds_combined_reproj[var_name])

    return io.save_da_to_zarr(da_compressed, zarr_bucket)

@solid()
def save_metadata(context, ds_combined_compressed, df_new_metadata, table_id: str, project_id: str):
    """
    Appends the new metadata rows to `table_id` when a
    compressed dataset was produced and there are rows
    to add. Always returns `True`.
    """
    # No compressed dataset means there is nothing to record
    if ds_combined_compressed is None:
        return True

    has_new_entries = df_new_metadata.shape[0] > 0

    if not has_new_entries:
        context.log.info('No metadata was available to be added')
        return True

    gcp_helpers.write_metadata_to_gcp(df_new_metadata, table_id, project_id, append=True)
    context.log.info(f'{df_new_metadata.shape[0]} new metadata entries were added')

    return True

@solid()
def compress_export_then_delete_raw(context, ds_combined_compressed, data_dir: str, compressed_dir: str, BUCKET_NAME: str='solar-pv-nowcasting-data', PREFIX: str='satellite/EUMETSAT/SEVIRI_RSS/native/', ready_to_delete: bool=True):
    """
    Compresses the downloaded files in `data_dir` into
    `compressed_dir`, uploads the compressed files to
    `BUCKET_NAME` under `PREFIX`, then removes the contents
    of both directories. Nothing is done when
    `ready_to_delete` is false.

    `ds_combined_compressed` is not read, it only makes this
    solid depend on an upstream output. Paths that cannot be
    removed are logged and skipped.
    """
    if not ready_to_delete:
        return

    eumetsat.compress_downloaded_files(data_dir=data_dir, compressed_dir=compressed_dir, log=context.log)
    # NOTE(review): the upload is given `log=None` unlike the compression step - confirm this is deliberate
    eumetsat.upload_compressed_files(compressed_dir, BUCKET_NAME=BUCKET_NAME, PREFIX=PREFIX, log=None)

    for dir_ in [data_dir, compressed_dir]:
        for f in glob.glob(f'{dir_}/*'):
            try:
                os.remove(f)
            except OSError as err:
                # Only filesystem errors (e.g. sub-directories, locked files) are tolerated,
                # the reason is logged so failed clean-ups can be diagnosed
                context.log.info(f'File path {f} was not removed. ({err})')

<br>

Then we'll combine them in a pipeline.

In [5]:
#exports
@pipeline
def download_latest_data_pipeline(): 
    """
    Downloads the latest EUMETSAT files, reprojects and
    compresses them, saves the metadata, then compresses,
    exports and deletes the raw files.
    """
    new_metadata = download_eumetsat_files()
    reprojected = reproject_datasets(df_metadata_to_dt_to_fp_map(new_metadata))
    compressed = compress_and_save_datasets(reprojected)

    # The output of `save_metadata` is fed to the clean-up solid so that it runs afterwards
    compress_export_then_delete_raw(save_metadata(compressed, new_metadata))

<br>

We'll now run a test of the pipeline.

In [6]:
# Values that are shared between several solids
raw_data_dir = "../data/raw"
metadata_table_id = "eumetsat.metadata"
gcp_project_id = "solar-pv-nowcasting"

# Inputs for each solid, keyed by the solid name
solid_inputs = {
    'download_eumetsat_files': {
        'env_vars_fp': "../.env",
        'data_dir': raw_data_dir,
        'metadata_db_fp': "../data/EUMETSAT_metadata.db",
        'debug_fp': "../logs/EUMETSAT_download.txt",
        'table_id': metadata_table_id,
        'project_id': gcp_project_id,
        'start_date': "",
        'end_date': ""
    },
    'df_metadata_to_dt_to_fp_map': {
        'data_dir': raw_data_dir
    },
    'reproject_datasets': {
        'new_coords_fp': "../data/intermediate/reproj_coords_TM_4km.csv",
        'new_grid_fp': "../data/intermediate/new_grid_4km_TM.json"
    },
    'compress_and_save_datasets': {
        'zarr_bucket': "solar-pv-nowcasting-data/satellite/EUMETSAT/SEVIRI_RSS/full_extent_TM_int16",
        'var_name': "stacked_eumetsat_data"
    },
    'save_metadata': {
        'table_id': metadata_table_id,
        'project_id': gcp_project_id
    },
    'compress_export_then_delete_raw': {
        'data_dir': raw_data_dir,
        'compressed_dir': "../data/compressed",
        'BUCKET_NAME': "solar-pv-nowcasting-data",
        'PREFIX': "satellite/EUMETSAT/SEVIRI_RSS/native/",
        'ready_to_delete': True
    }
}

# Each solid's inputs are nested under an 'inputs' key
run_config = {
    'solids': {
        solid_name: {'inputs': inputs}
        for solid_name, inputs
        in solid_inputs.items()
    }
}

execute_pipeline(download_latest_data_pipeline, run_config=run_config)

2021-01-21 22:33:00 - dagster - DEBUG - download_latest_data_pipeline - d3e2e4f6-c6a3-4ee2-af3a-2570ac7c607a - 1108 - ENGINE_EVENT - Starting initialization of resources [asset_store].
2021-01-21 22:33:00 - dagster - DEBUG - download_latest_data_pipeline - d3e2e4f6-c6a3-4ee2-af3a-2570ac7c607a - 1108 - ENGINE_EVENT - Finished initialization of resources [asset_store].
2021-01-21 22:33:00 - dagster - DEBUG - download_latest_data_pipeline - d3e2e4f6-c6a3-4ee2-af3a-2570ac7c607a - 1108 - PIPELINE_START - Started execution of pipeline "download_latest_data_pipeline".
2021-01-21 22:33:00 - dagster - DEBUG - download_latest_data_pipeline - d3e2e4f6-c6a3-4ee2-af3a-2570ac7c607a - 1108 - ENGINE_EVENT - Executing steps in process (pid: 1108)
2021-01-21 22:33:00 - dagster - DEBUG - download_latest_data_pipeline - d3e2e4f6-c6a3-4ee2-af3a-2570ac7c607a - 1108 - download_eumetsat_files.compute - STEP_START - Started execution of step "download_eumetsat_files.compute".
2021-01-21 22:33:00 - dagster - DE

2021-01-21 22:34:10 - dagster - DEBUG - download_latest_data_pipeline - d3e2e4f6-c6a3-4ee2-af3a-2570ac7c607a - 1108 - download_eumetsat_files.compute - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-01-21 22:34:10 - dagster - DEBUG - download_latest_data_pipeline - d3e2e4f6-c6a3-4ee2-af3a-2570ac7c607a - 1108 - download_eumetsat_files.compute - OBJECT_STORE_OPERATION - Stored intermediate object for output result in memory object store using pickle.
2021-01-21 22:34:10 - dagster - DEBUG - download_latest_data_pipeline - d3e2e4f6-c6a3-4ee2-af3a-2570ac7c607a - 1108 - download_eumetsat_files.compute - STEP_SUCCESS - Finished execution of step "download_eumetsat_files.compute" in 1m10s.
2021-01-21 22:34:10 - dagster - DEBUG - download_latest_data_pipeline - d3e2e4f6-c6a3-4ee2-af3a-2570ac7c607a - 1108 - df_metadata_to_dt_to_fp_map.compute - STEP_START - Started execution of step "df_metadata_to_dt_to_fp_map.compute".
2021-01-21 22:34:10 - dagster - DEBUG - dow

10 rows written to BQ eumetsat.metadata, append=True
Found 20 native files.


2021-01-21 22:46:49 - dagster - ERROR - download_latest_data_pipeline - d3e2e4f6-c6a3-4ee2-af3a-2570ac7c607a - 1108 - compress_export_then_delete_raw.compute - STEP_FAILURE - Execution of step "compress_export_then_delete_raw.compute" failed.

FileNotFoundError: [WinError 2] The system cannot find the file specified

  File "C:\Users\Ayrto\anaconda3\envs\satip_dev\lib\site-packages\dagster\core\errors.py", line 180, in user_code_error_boundary
    yield
  File "C:\Users\Ayrto\anaconda3\envs\satip_dev\lib\site-packages\dagster\core\execution\plan\execute_step.py", line 475, in _user_event_sequence_for_step_compute_fn
    for event in iterate_with_context(raise_interrupts_immediately, gen):
  File "C:\Users\Ayrto\anaconda3\envs\satip_dev\lib\site-packages\dagster\utils\__init__.py", line 443, in iterate_with_context
    next_output = next(iterator)
  File "C:\Users\Ayrto\anaconda3\envs\satip_dev\lib\site-packages\dagster\core\execution\plan\compute.py", line 105, in _execute_core_compute

FileNotFoundError: [WinError 2] The system cannot find the file specified

In [None]:
#exports
@solid()
def download_missing_eumetsat_files(context, env_vars_fp: str, data_dir: str, metadata_db_fp: str, debug_fp: str, table_id: str, project_id: str, start_date: str='', end_date: str=''):
    """
    Identifies the datasets that are missing between
    `start_date` and `end_date`, downloads them, and
    returns the metadata of the retrieved entries.

    The user key/secret and Slack settings are read from the
    environment after loading the dotenv file at `env_vars_fp`.
    `context`, `table_id` and `project_id` are not used here.

    Returns the metadata dataframe from the download manager with
    its first row dropped, or an empty dataframe with `result_time`
    and `file_name` columns when the manager returns `None`.
    """
    _ = dotenv.load_dotenv(env_vars_fp)

    download_manager = eumetsat.DownloadManager(
        os.environ.get('USER_KEY'),
        os.environ.get('USER_SECRET'),
        data_dir,
        metadata_db_fp,
        debug_fp,
        slack_webhook_url=os.environ.get('SLACK_WEBHOOK_URL'),
        slack_id=os.environ.get('SLACK_ID')
    )

    datasets_to_download = io.identifying_missing_datasets(start_date, end_date)
    df_new_metadata = download_manager.download_datasets(datasets_to_download)

    if df_new_metadata is None:
        return pd.DataFrame(columns=['result_time', 'file_name'])

    # The first entry is the last one we downloaded
    # NOTE(review): this mirrors the date-range download - confirm the first
    # row really is a repeat when downloading missing datasets
    return df_new_metadata.iloc[1:]

In [9]:
#hide
@pipeline
def download_missing_data_pipeline():  
    """
    Downloads the missing EUMETSAT files, reprojects and
    compresses them, saves the metadata, then compresses,
    exports and deletes the raw files.
    """
    # Retrieving data, reprojecting, compressing, and saving to GCP
    df_new_metadata = download_missing_eumetsat_files()
    datetime_to_filepath = df_metadata_to_dt_to_fp_map(df_new_metadata)
    ds_combined_reproj = reproject_datasets(datetime_to_filepath)
    ds_combined_compressed = compress_and_save_datasets(ds_combined_reproj)

    # The clean-up solid takes the `save_metadata` output so that the raw files
    # are only removed once the metadata step has finished, matching
    # `download_latest_data_pipeline`
    ready_to_delete = save_metadata(ds_combined_compressed, df_new_metadata)
    compress_export_then_delete_raw(ready_to_delete)

In [10]:
#hide
# Move the download solid's config under the name of the missing-data download solid
solids_config = run_config['solids']

if 'download_eumetsat_files' in solids_config:
    solids_config['download_missing_eumetsat_files'] = solids_config.pop('download_eumetsat_files')

# execute_pipeline(download_missing_data_pipeline, run_config=run_config)

In [1]:
#hide
# Import only the function that is used rather than the whole namespace
from nbdev.export import notebook2script

# Export the notebooks' `#exports` cells to the library's Python modules
notebook2script()

Converted 00_utils.ipynb.
Converted 01_eumetsat.ipynb.
Converted 02_reprojection.ipynb.
Converted 03_zarr.ipynb.
Converted 04_gcp.ipynb.
Converted 05_pipeline.ipynb.
Converted 06-ci-cd.ipynb.
Converted 101_downloading.ipynb.
Converted 102_reprojecting.ipynb.
Converted 103_loading.ipynb.
