# Demonstrate parallel IO with multiprocessing.Pool in Python

## Introduction

You can use processes for IO-bound tasks, although threads may be a better fit.

An IO-bound task is a type of task that involves reading from or writing to a device, file, or socket connection.

The operations involve input and output (IO) and the speed of these operations is bound by the device, hard drive, or network connection. This is why these tasks are referred to as IO-bound.

CPUs are very fast. A modern 4 GHz CPU core runs about 4 billion clock cycles per second, and your system likely has more than one core.

Doing IO is very slow compared to the speed of CPUs.

Interacting with devices, reading and writing files, and using socket connections all involve system calls into your operating system (the kernel), and the caller waits for the operation to complete. If this happens in the main thread of your Python program, the CPU may wait many milliseconds or even many seconds doing nothing.

That is potentially billions of operations that it is prevented from executing.

We can free up the CPU from IO-bound operations by performing them in another process. This lets the program start the task, hand the waiting over to the operating system (kernel), and leave the CPU free to execute another process in the meantime.

There's more to it under the covers, but this is the gist.

Therefore, the tasks we execute with a multiprocessing.Pool can be tasks that involve IO operations.

Examples include:

* Reading or writing a file from the hard drive.
* Reading or writing to standard output, input, or error (stdin, stdout, stderr).
* Printing a document.
* Downloading or uploading a file.
* Querying a server.
* Querying a database.
* Taking a photo or recording a video.

And so much more.

Source: https://superfastpython.com/multiprocessing-pool-python/.
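As a minimal sketch of the idea described above, the following uses `time.sleep` as a stand-in for a blocking IO call and runs several such tasks in a `multiprocessing.Pool`. The names `fake_io_task` and `run_in_pool`, as well as the delay and pool size, are hypothetical and chosen only for illustration.

```python
import time
from multiprocessing import Pool


def fake_io_task(task_id, delay=0.2):
    """Hypothetical stand-in for a blocking IO call: wait, then return the id."""
    time.sleep(delay)  # the worker is idle here, as it would be while waiting on a disk or socket
    return task_id


def run_in_pool(n_tasks=8, n_processes=4):
    """Run n_tasks simulated IO tasks in a process pool; return results and elapsed seconds."""
    start = time.perf_counter()
    with Pool(processes=n_processes) as pool:
        results = pool.map(fake_io_task, range(n_tasks))
    return results, time.perf_counter() - start


if __name__ == '__main__':
    # the guard keeps worker processes from re-running this block on import
    results, elapsed = run_in_pool()
    print(f'{len(results)} tasks finished in {elapsed:.2f} s')
```

With eight tasks of 0.2 s each and four workers, the elapsed time should come out well below the 1.6 s a serial loop would need, although starting the processes adds some overhead.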

## Objective

Write a multi-process program to download and process ERA5 data from the CDS API of [ECMWF](https://confluence.ecmwf.int/display/CKB/Please+read%3A+CDS+and+ADS+migrating+to+new+infrastructure%3A+Common+Data+Store+%28CDS%29+Engine). Note that this API has recently been upgraded; please check the documentation page for updates.

The final goal of this programming activity is to build a database of extreme weather events diagnosed from the ERA5 dataset. For this, we need to download many ERA5 files and inspect their content. We will run a few diagnostic tests to identify extreme events and save these events in a database. In this program, we only experiment with using a multiprocessing.Pool for parallel file downloads from the API. You can change the number of parallel processes and compare results.
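One way to compare pool sizes before touching the real API is to time a simulated download. The sketch below assumes that each download can be mimicked by a fixed `time.sleep`; `fake_download` and `time_pool` are hypothetical helper names, and the sketch uses the `ThreadPool` class that this notebook imports further down.

```python
import time
from multiprocessing.pool import ThreadPool


def fake_download(index, delay=0.1):
    """Hypothetical stand-in for one file download."""
    time.sleep(delay)  # simulated network wait
    return index


def time_pool(n_workers, n_tasks=8):
    """Return the seconds needed to run n_tasks simulated downloads with n_workers."""
    start = time.perf_counter()
    with ThreadPool(n_workers) as pool:
        pool.map(fake_download, range(n_tasks))
    return time.perf_counter() - start


# try a few pool sizes and report the elapsed time for each
timings = {n: time_pool(n) for n in (1, 2, 4, 8)}
for n_workers, seconds in timings.items():
    print(f'{n_workers} workers: {seconds:.2f} s')
```

Real downloads will not scale this cleanly, since the server and the network connection set their own limits.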

## Preparation

Create a new CDS-Beta account and accept the CDS-Beta Terms & Conditions at [CDS-Beta](https://cds-beta.climate.copernicus.eu/).

Download the data from https://raw.githubusercontent.com.
In a terminal window, type

```
wget https://raw.githubusercontent.com/SuperFastPython/DataSets/main/bin/1m_words.txt.zip
```

Then unzip the file with `unzip 1m_words.txt.zip`.
You can remove the `__MACOSX/` folder afterwards.

In [None]:
from multiprocessing.pool import ThreadPool
# ThreadPool is usually a better fit for IO than the process-based Pool; see https://superfastpython.com/multiprocessing-pool-python/
# Thread pools can manage thousands of threads, and threads share memory, i.e. no data needs to be pickled for exchange between tasks
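
To illustrate the shared-memory point, here is a small sketch in which worker threads write into an ordinary dictionary owned by the main program. The function `fake_download` and the short names passed to it are made up for illustration, and `time.sleep` stands in for the network wait.

```python
import time
from multiprocessing.pool import ThreadPool

downloaded = {}  # plain dict, visible to every worker thread


def fake_download(name):
    """Hypothetical stand-in for a download: wait briefly, then record a file name."""
    time.sleep(0.05)                 # simulated network wait
    downloaded[name] = f'{name}.nc'  # each thread writes its own key
    return name


with ThreadPool(4) as pool:
    finished = pool.map(fake_download, ['t', 'q', 'sp', 'ws'])

print(finished)            # results come back in input order
print(sorted(downloaded))  # all four keys were written by the workers
```

With the process-based `Pool`, the same assignment would change only the worker's own copy of the dictionary, so results would have to be returned (and pickled) instead.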



In [None]:
# how to obtain process and task name and number
from multiprocessing import current_process
from threading import current_thread

def get_process_and_thread():
    print(f'Current process {current_process()}, current thread {current_thread()}')
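
A short sketch of how such a helper might be used from inside a thread pool follows. It returns the names instead of printing them, so the output can be collected in order; `describe_worker` is a hypothetical name, and the pool size of 2 is arbitrary.

```python
from multiprocessing import current_process
from multiprocessing.pool import ThreadPool
from threading import current_thread


def describe_worker(task_id):
    """Return the task id with the names of the process and thread running it."""
    return task_id, current_process().name, current_thread().name


with ThreadPool(2) as pool:
    rows = pool.map(describe_worker, range(4))

for task_id, process_name, thread_name in rows:
    print(f'task {task_id}: process {process_name}, thread {thread_name}')
```

All tasks report the same process name, because a `ThreadPool` runs its workers as threads inside the current process.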
    

In [None]:
import os

# print number of available CPUs
print(os.cpu_count())
# set number of processes for multiprocessing pool
processes = 8

In [None]:
# set up CDS API 
import cdsapi

c = cdsapi.Client()

In [None]:
# define variables of interest and their grib codes
# ***ToDo***: replace with correct grib codes and extend
vars = {'temperature': 130,
        'specific humidity': 111,
        'surface pressure': 111,
        '10-m wind speed': 111}

In [None]:
# define MARS request for ERA5 retrieval
# *** ToDo *** use templates for date, variable, ...
# ***ToDo***: distinguish between 3d and surface variables
thisvar = 'temperature'
firstdate = '2023-01-01'
lastdate = '2023-01-31'
c.retrieve("reanalysis-era5-complete", {
    "class": "ea",
    "date": f"{firstdate}/to/{lastdate}",
    "expver": "1",
    "levelist": "137",
    "levtype": "ml",
    "param": f"{vars[thisvar]}",
    "step": "0",
    "stream": "oper",
    "time": "09:00:00/21:00:00",
    "type": "4v",
    "format": "netcdf"
}, f"era5_{thisvar}_{firstdate}_{lastdate}.nc")

# see CDS and ERA5 documentation...
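
The templating ToDo above could be approached along these lines. This is only a sketch: `build_request` and `target_filename` are hypothetical helper names, the request keys are copied unchanged from the cell above, and the distinction between 3D and surface variables is not addressed. The actual retrieval is left commented out because it needs CDS credentials.

```python
def build_request(param, firstdate, lastdate):
    """Return the MARS request dictionary for one parameter code and date range."""
    return {
        "class": "ea",
        "date": f"{firstdate}/to/{lastdate}",
        "expver": "1",
        "levelist": "137",
        "levtype": "ml",
        "param": str(param),
        "step": "0",
        "stream": "oper",
        "time": "09:00:00/21:00:00",
        "type": "4v",
        "format": "netcdf",
    }


def target_filename(varname, firstdate, lastdate):
    """Build the output file name in the same pattern as the cell above."""
    return f"era5_{varname}_{firstdate}_{lastdate}.nc"


request = build_request(130, '2023-01-01', '2023-01-31')
target = target_filename('temperature', '2023-01-01', '2023-01-31')
print(target)
# c.retrieve("reanalysis-era5-complete", request, target)  # requires a configured CDS client
```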

In [None]:
# define processing list (dates and/or variables)
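
One possible shape for the processing list is a list of `(variable, firstdate, lastdate)` tuples, one per variable and month. The sketch below assumes monthly chunks are a sensible unit of work; `month_ranges` is a hypothetical helper, and the variable names and months are examples only.

```python
import calendar
from itertools import product


def month_ranges(year, months):
    """Return (first_day, last_day) date strings for each requested month."""
    ranges = []
    for month in months:
        last_day = calendar.monthrange(year, month)[1]  # number of days in the month
        ranges.append((f'{year}-{month:02d}-01', f'{year}-{month:02d}-{last_day:02d}'))
    return ranges


variables = ['temperature', 'specific humidity']
tasks = [(var, first, last)
         for var, (first, last) in product(variables, month_ranges(2023, [1, 2]))]

for task in tasks:
    print(task)
```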

In [None]:
# set up the thread pool and execute the download tasks
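
A minimal sketch of this last step is shown below, with a placeholder in place of the real download so that it runs without CDS credentials. `download_one` is a hypothetical function; a real version would issue the CDS request at the point marked in the comment. The task tuples and the pool size are examples only.

```python
import time
from multiprocessing.pool import ThreadPool


def download_one(varname, firstdate, lastdate):
    """Placeholder for one download (hypothetical): returns the target file name."""
    target = f'era5_{varname}_{firstdate}_{lastdate}.nc'
    time.sleep(0.05)  # a real version would call the CDS API here instead of sleeping
    return target


tasks = [('temperature', '2023-01-01', '2023-01-31'),
         ('temperature', '2023-02-01', '2023-02-28')]
processes = 2  # number of worker threads in the pool

with ThreadPool(processes) as pool:
    # starmap unpacks each tuple into the arguments of download_one
    files = pool.starmap(download_one, tasks)

print(files)
```

Whether a single `cdsapi.Client` can be shared safely between threads is not settled by this sketch; creating one client per task would be the cautious assumption.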