# A workflow for calculating SHARP keywords directly from magnetic field maps using Dask

The SHARP data series include patches of vector magnetic field data, or all three components of the surface magnetic field, taken by the Helioseismic and Magnetic Imager (HMI) instrument on NASA's Solar Dynamics Observatory (SDO) satellite. These patches encapsulate automatically-detected active regions that are tracked throughout the entirety of their disk passage. A bitmap array, called `bitmap`, describes whether any given pixel in any array is part of an active region. Another bitmap, called `conf_disambig`, describes the confidence in the azimuthal component of the magnetic field vector. The SHARP data seres also include keywords, as metadata, that describe various physical parameters of solar active regions; for example, the total unsigned flux within an active region. 

In this notebook, we will calculate these SHARP keywords directly from the magnetic field maps using Dask.

First, we import some modules (note that this assumes that [dask-jobqueue](https://jobqueue.dask.org/en/latest/install.html) is already installed and the [Jupyter notebook is properly configured for interactive use](https://jobqueue.dask.org/en/latest/interactive.html)).

In [1]:
import dask
import drms
import numpy as np
import math
from astropy.io import fits
from datetime import datetime as dt_obj
from dask.distributed import Client
import dask.array as da
import calculate_swx_fits as swx

Define some constants useful for calculating space weather keywords:

In [2]:
radsindeg = np.pi/180.
munaught  = 0.0000012566370614

Now, start a Dask client. In this case, we're running Dask locally on a personal machine, so [scheduling](https://docs.dask.org/en/latest/scheduling.html#) is relatively straightforward. The Dask client starts up a dashboard for monitoring the machine and computational processes.

In [3]:
dask_client = Client()
dask_client

0,1
Client  Scheduler: tcp://127.0.0.1:58113  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 17.18 GB


Fetch the magnetic field image data using SunPy affiliated package [`drms`](https://joss.theoj.org/papers/10.21105/joss.01614).

In [4]:
drms_client = drms.Client()

In [5]:
keys, segments = drms_client.query('hmi.sharp_cea_720s[377][2011.02.14_15:00:00/12h]', key='T_REC, CDELT1, RSUN_REF, RSUN_OBS, DSUN_OBS, USFLUX, ERRVF, CMASK', seg='Br, Bp, Bt, Br_err, Bp_err, Bt_err, bitmap, conf_disambig')

In [6]:
def parse_tai_string(tstr,datetime=True):
    year   = int(tstr[:4])
    month  = int(tstr[5:7])
    day    = int(tstr[8:10])
    hour   = int(tstr[11:13])
    minute = int(tstr[14:16])
    if datetime: return dt_obj(year,month,day,hour,minute)
    else: return year,month,day,hour,minute

In [7]:
t_rec = np.array([parse_tai_string(keys.T_REC[i],datetime=True) for i in range(keys.T_REC.size)])

As an example, read one magnetic field map in as a Dask array!

In [8]:
url = 'http://jsoc.stanford.edu' + segments.Br[0]
bz = da.from_array(fits.getdata(url))
bz

Unnamed: 0,Array,Chunk
Bytes,2.24 MB,2.24 MB
Shape,"(377, 744)","(377, 744)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 2.24 MB 2.24 MB Shape (377, 744) (377, 744) Count 1 Tasks 1 Chunks Type float64 numpy.ndarray",744  377,

Unnamed: 0,Array,Chunk
Bytes,2.24 MB,2.24 MB
Shape,"(377, 744)","(377, 744)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray


Since Dask is lazy by default, the `compute()` function retrieves the actual value of an an element:

In [9]:
bz[100,100].compute()

9.77

This is how it would work with NumPy:

In [10]:
url = 'http://jsoc.stanford.edu' + segments.Br[0]
bznp = fits.getdata(url)
bznp

array([[-23.84,   7.42,  23.81, ...,  13.79,  35.23,  43.58],
       [-17.9 ,  21.67,   4.38, ...,  31.41,   2.04,  23.05],
       [-13.69,   1.94,   4.94, ...,  10.08,   6.28,  12.05],
       ...,
       [ -7.47, -10.49,  -4.65, ..., -11.1 ,  -9.18,  -7.49],
       [ -5.45,  -5.55,   2.87, ...,  -1.37, -15.99,   3.44],
       [ -0.82,   4.75,   4.35, ...,  -0.68,  14.3 ,  -4.3 ]])

In [11]:
bznp[100,100]

9.77

The function `get_data_for_a_single_trec_dask()` retreives the data as Dask arrays; the following function, `get_data_for_a_single_trec_numpy()` retreives the data as a numpy array.

In [12]:
def get_data_for_a_single_trec_dask(i):
    url = 'http://jsoc.stanford.edu' + segments.Br[i]
    bz = da.from_array(fits.getdata(url))
    url = 'http://jsoc.stanford.edu' + segments.Br_err[i]
    bz_err = da.from_array(fits.getdata(url))
    url = 'http://jsoc.stanford.edu' + segments.conf_disambig[i]
    conf_disambig = da.from_array(fits.getdata(url))
    url = 'http://jsoc.stanford.edu' + segments.bitmap[i]
    bitmap = da.from_array(fits.getdata(url))
    nx = bz.shape[1]
    ny = bz.shape[0]
    rsun_ref = keys.RSUN_REF[i]
    rsun_obs = keys.RSUN_OBS[i]
    cdelt1 = keys.CDELT1[i]
    dsun_obs = keys.DSUN_OBS[i]
    cdelt1_arcsec = (math.atan((rsun_ref*cdelt1*radsindeg)/(dsun_obs)))*(1/radsindeg)*(3600.)
    return bz, bz_err, conf_disambig, bitmap, nx, ny, rsun_ref, rsun_obs, cdelt1_arcsec

In [13]:
def get_data_for_a_single_trec_numpy(i):
    url = 'http://jsoc.stanford.edu' + segments.Br[i]
    bz = (fits.getdata(url))
    url = 'http://jsoc.stanford.edu' + segments.Br_err[i]
    bz_err = (fits.getdata(url))
    url = 'http://jsoc.stanford.edu' + segments.conf_disambig[i]
    conf_disambig = (fits.getdata(url))
    url = 'http://jsoc.stanford.edu' + segments.bitmap[i]
    bitmap = (fits.getdata(url))
    nx = bz.shape[1]
    ny = bz.shape[0]
    rsun_ref = keys.RSUN_REF[i]
    rsun_obs = keys.RSUN_OBS[i]
    cdelt1 = keys.CDELT1[i]
    dsun_obs = keys.DSUN_OBS[i]
    cdelt1_arcsec = (math.atan((rsun_ref*cdelt1*radsindeg)/(dsun_obs)))*(1/radsindeg)*(3600.)
    return bz, bz_err, conf_disambig, bitmap, nx, ny, rsun_ref, rsun_obs, cdelt1_arcsec

Get the data for a single `T_REC` using both functions: 

In [14]:
bz, bz_err, conf_disambig, bitmap, nx, ny, rsun_ref, rsun_obs, cdelt1_arcsec = get_data_for_a_single_trec_dask(0)
bznp, bz_errnp, conf_disambignp, bitmapnp, nxnp, nynp, rsun_refnp, rsun_obsnp, cdelt1_arcsecnp = get_data_for_a_single_trec_numpy(0)

Now we compute the total unsigned flux using Dask arrays and NumPy arrays. The function we imported as `swx.compute_abs_flux` uses NumPy arrays. The function `compute_abs_flux_dask()` computes the flux using Dask arrays. We will start with NumPy:

In [15]:
# Show that we are working with numpy arrays
bznp

array([[-23.84,   7.42,  23.81, ...,  13.79,  35.23,  43.58],
       [-17.9 ,  21.67,   4.38, ...,  31.41,   2.04,  23.05],
       [-13.69,   1.94,   4.94, ...,  10.08,   6.28,  12.05],
       ...,
       [ -7.47, -10.49,  -4.65, ..., -11.1 ,  -9.18,  -7.49],
       [ -5.45,  -5.55,   2.87, ...,  -1.37, -15.99,   3.44],
       [ -0.82,   4.75,   4.35, ...,  -0.68,  14.3 ,  -4.3 ]])

In [16]:
%%time
mean_vfnp, mean_vf_errnp, count_masknp  = swx.compute_abs_flux(bznp, bz_errnp, conf_disambignp, bitmapnp, nxnp, nynp, rsun_refnp, rsun_obsnp, cdelt1_arcsecnp)

CPU times: user 893 ms, sys: 11.2 ms, total: 904 ms
Wall time: 899 ms


In [17]:
print(mean_vfnp, mean_vf_errnp, count_masknp)

2.42917544543274e+22 5.809018927454359e+18 40397


This gives the same result as the C code running in the JSOC pipeline:

In [18]:
print(keys.USFLUX[0],keys.ERRVF[0],keys.CMASK[0])

2.429176e+22 5.80902e+18 40397.0


Now use Dask. *Note: Something is wrong with this function because it takes forever. Or maybe nothing is wrong with it, but it is just a bad idea to use Dask with for loops.*

In [21]:
def compute_abs_flux_dask(bz, bz_err, conf_disambig, bitmap, nx, ny, rsun_ref, rsun_obs, cdelt1_arcsec):

    """function: compute_abs_flux
    This function computes the total unsigned flux in units of G/cm^2.
    It also returns the number of pixels used in this calculation in the keyword CMASK.
    
    To compute the unsigned flux, we simply calculate
       flux = surface integral [(vector Bz) dot (normal vector)],
            = surface integral [(magnitude Bz)*(magnitude normal)*(cos theta)].
    However, since the field is radial, we will assume cos theta = 1.
    Therefore, the pixels only need to be corrected for the projection.
    To convert G to G*cm^2, simply multiply by the number of square centimeters per pixel: 
       (Gauss/pix^2)(CDELT1)^2(RSUN_REF/RSUN_OBS)^2(100.cm/m)^2
       =Gauss*cm^2
    """

    count_mask = 0
    sum        = 0.0
    err        = 0.0
    
    for j in range(ny):
        for i in range(nx):
            if ( conf_disambig[j,i].compute() < 70 or bitmap[j,i].compute() < 30 ):
                continue
            if np.isnan(bz[j,i].compute()):
                continue
            sum += abs(bz[j,i].compute())
            err += bz_err[j,i].compute()*bz_err[j,i].compute();
            count_mask += 1

    mean_vf     = sum*cdelt1_arcsec*cdelt1_arcsec*(rsun_ref/rsun_obs)*(rsun_ref/rsun_obs)*100.0*100.0
    mean_vf_err = (np.sqrt(err))*abs(cdelt1_arcsec*cdelt1_arcsec*(rsun_ref/rsun_obs)*(rsun_ref/rsun_obs)*100.0*100.0)

    return [mean_vf, mean_vf_err, count_mask]

In [22]:
# Show that we are working with dask arrays
bz

Unnamed: 0,Array,Chunk
Bytes,2.24 MB,2.24 MB
Shape,"(377, 744)","(377, 744)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 2.24 MB 2.24 MB Shape (377, 744) (377, 744) Count 1 Tasks 1 Chunks Type float64 numpy.ndarray",744  377,

Unnamed: 0,Array,Chunk
Bytes,2.24 MB,2.24 MB
Shape,"(377, 744)","(377, 744)"
Count,1 Tasks,1 Chunks
Type,float64,numpy.ndarray


In [23]:
mean_vf, mean_vf_err, count_mask  = compute_abs_flux_dask(bz, bz_err, conf_disambig, bitmap, nx, ny, rsun_ref, rsun_obs, cdelt1_arcsec)



KeyboardInterrupt: 