In [None]:
import os
import sunpy.map
import numpy as np
import pandas as pd
import plotly.io as pio
from sunpy.net import Fido, attrs as a
from datetime import datetime, timedelta

import astropy.units as u
from astropy.coordinates import SkyCoord
from matplotlib import colormaps, pyplot as plt

import prepare_data
import detect
import plot_detection
from settings import *


pio.renderers.default = 'vscode'
%matplotlib inline

# Data Inspection

## Prepare Data

Extract Data from File System

In [None]:
# Collect the observation datetime strings available for each data product
# within the configured date range.

# He I observation datetimes from FITS files
HE_DATE_LIST = prepare_data.get_fits_date_list(
    DATE_RANGE, ALL_HE_DIR, SELECT_HE_DIR
)

# Magnetogram datetimes from 6302l FITS files
MAG_DATE_LIST = prepare_data.get_fits_date_list(
    DATE_RANGE, ALL_MAG_DIR, SELECT_MAG_DIR
)

# EUV datetimes from FITS files
EUV_DATE_LIST = prepare_data.get_fits_date_list(
    DATE_RANGE, ALL_EUV_DIR, SELECT_EUV_DIR
)

# The summary below is built only from the He I list, so the He I list is
# what must be non-empty. Testing `HE_DATE_LIST or EUV_DATE_LIST` raised an
# IndexError whenever EUV data existed without any He I data.
if HE_DATE_LIST:
    # First and last He I datetimes bound the evaluated period
    date_strs = [HE_DATE_LIST[0], HE_DATE_LIST[-1]]
    file_date_str = f'{date_strs[0]}_{date_strs[-1]}'

    num_maps = len(HE_DATE_LIST)
    datetimes = [datetime.strptime(date_str, DICT_DATE_STR_FORMAT)
                 for date_str in date_strs]
    title_date_strs = [d.strftime('%m/%d/%Y') for d in datetimes]
    DATE_RANGE_SUPTITLE = (f'{num_maps} Maps Evaluated from '
                           f'{title_date_strs[0]} to {title_date_strs[-1]}')
elif EUV_DATE_LIST:
    print('No He I data is available for the configured date range.')
else:
    print('No data is available for the configured date range.')

## EUV Download

Obtain dates to download

In [None]:
# Identify datetimes to download

# Identify He I observation dates
available_he_dates = [
    datetime.strptime(date_str, DICT_DATE_STR_FORMAT).date()
    for date_str in HE_DATE_LIST
]

# Build every calendar date spanned by the He I observations. An empty
# He I list yields an empty period instead of raising IndexError.
if available_he_dates:
    first_he_date = min(available_he_dates)
    days_in_period = (max(available_he_dates) - first_he_date).days
    all_period_dates = set(first_he_date + timedelta(num_days)
                           for num_days in range(days_in_period + 1))
else:
    days_in_period = 0
    all_period_dates = set()

# Dates without a He I observation get a datetime string with a placeholder
# time of 16:00.
# NOTE(review): the [:12] slice assumes DICT_DATE_STR_FORMAT renders the date
# part as 12 characters (e.g. '2003_07_28__') - confirm against settings.
missing_he_dates = all_period_dates - set(available_he_dates)
missing_date_str_list = [
    missing_date.strftime(DICT_DATE_STR_FORMAT)[:12] + '16_00'
    for missing_date in missing_he_dates
]

all_period_date_str_list = HE_DATE_LIST + missing_date_str_list
all_period_date_str_list.sort()

# Identify datetimes to download EUV that have not yet been downloaded
available_euv_dates = [
    datetime.strptime(date_str, DICT_DATE_STR_FORMAT).date()
    for date_str in EUV_DATE_LIST
]

# Take each calendar date from its own datetime string. The previous
# zip(all_period_date_str_list, all_period_dates) paired a sorted list with
# an unordered set, so strings were tested against unrelated dates, and the
# zip was cut short whenever a day had more than one He I observation.
euv_dates_on_disk = set(available_euv_dates)
download_euv_date_list = [
    date_str for date_str in all_period_date_str_list
    if datetime.strptime(date_str, DICT_DATE_STR_FORMAT).date()
    not in euv_dates_on_disk
]

In [None]:
# List the datetimes that still need an EUV download
euv_download_header = 'Datetimes to Download for EUV Observations:'
print(euv_download_header)
prepare_data.display_dates(download_euv_date_list)

Automatic

In [None]:
# Download options: SOHO observations into the EUV directory, using a
# 2 hour window setting
euv_download_options = dict(sat='SOHO', output_dir=ALL_EUV_DIR, hr_window=2)

prepare_data.download_euv(
    download_euv_date_list, EUV_DATE_LIST, **euv_download_options
)

Manual

In [None]:
sat = 'SOHO'
euv_date_str = '2003_07_28__17_58'
hr_window = 2

# Search window of +/- hr_window hours around the requested datetime,
# sampled every 30 minutes
center_date = datetime.strptime(euv_date_str, DICT_DATE_STR_FORMAT)
half_window = timedelta(hours=hr_window)
min_date = center_date - half_window
max_date = center_date + half_window
time_range = a.Time(min_date, max_date)
cadence = a.Sample(30*u.minute)

# SOHO uses EIT at 195 A; any other value of `sat` is treated as SDO,
# which uses AIA at 193 A
if sat == 'SOHO':
    instrument = a.Instrument.eit
    wavelength = a.Wavelength(195*u.angstrom)
else:
    instrument = a.Instrument.aia
    wavelength = a.Wavelength(193*u.angstrom)

result = Fido.search(time_range, instrument, wavelength, cadence)
result

In [None]:
# Row of the search result to download (-1 selects the last row)
row_num = -1

# '{file}' is left literal here for Fido to fill in when downloading
euv_download_path = ALL_EUV_DIR + '{file}'
Fido.fetch(result[:, row_num], path=euv_download_path)

## Rename Data

In [None]:
# Flag passed to rename_dir to delete all gzipped files after renaming
remove_gzip = True

# Rename the He, magnetogram, and EUV FITS files, in that order, so each
# title includes the observation date
for data_dir in (ALL_HE_DIR, ALL_MAG_DIR, ALL_EUV_DIR):
    prepare_data.rename_dir(data_dir, remove_gzip)

## Available Data

In [None]:
# Count and list the He I observation datetimes
he_dates_header = (
    f'{len(HE_DATE_LIST)} Available Datetimes for He I Observations:'
)
print(he_dates_header)
prepare_data.display_dates(HE_DATE_LIST)

In [None]:
# Count and list the magnetogram datetimes
mag_dates_header = (
    f'{len(MAG_DATE_LIST)} Available Datetimes for Magnetograms:'
)
print(mag_dates_header)
prepare_data.display_dates(MAG_DATE_LIST)

In [None]:
# Count and list the EUV observation datetimes
euv_dates_header = (
    f'{len(EUV_DATE_LIST)} Available Datetimes for EUV Observations:'
)
print(euv_dates_header)
prepare_data.display_dates(EUV_DATE_LIST)

In [None]:
# Report the number of He I observations as a percentage of the number of
# EUV observations. With no EUV observations the ratio is undefined, so
# report that instead of raising ZeroDivisionError.
if EUV_DATE_LIST:
    print('He I Observation Coverage: '
          f'{len(HE_DATE_LIST)/len(EUV_DATE_LIST)*100:.1f}%')
else:
    print('He I Observation Coverage: N/A (no EUV observations available)')

Observation Comparison: He, Mag, EUV

In [None]:
smooth_size_percent = 10

# Three-panel comparison (He I, magnetogram, EUV) for the first He I
# observation only.
# To inspect a specific observation instead, override he_date_str inside the
# loop, e.g. with '2003_07_14__18_07'.
for he_date_str in HE_DATE_LIST[:1]:

    # Magnetogram and EUV datetimes matched to the He I datetime
    mag_date_str = prepare_data.get_nearest_date_str(
        MAG_DATE_LIST, selected_date_str=he_date_str
    )
    euv_date_str = prepare_data.get_nearest_date_str(
        date_str_list=EUV_DATE_LIST, selected_date_str=he_date_str
    )

    # Resolve the FITS path of each observation
    he_fits_file = prepare_data.get_fits_path(
        he_date_str, DATE_RANGE, ALL_HE_DIR, SELECT_HE_DIR)
    mag_fits_file = prepare_data.get_fits_path(
        mag_date_str, DATE_RANGE, ALL_MAG_DIR, SELECT_MAG_DIR)
    euv_fits_file = prepare_data.get_fits_path(
        euv_date_str, DATE_RANGE, ALL_EUV_DIR, SELECT_EUV_DIR)

    # Load the maps; the EUV file is read directly by sunpy
    he_map = prepare_data.get_nso_sunpy_map(he_fits_file)
    mag_map = prepare_data.get_nso_sunpy_map(mag_fits_file)
    euv_map = sunpy.map.Map(euv_fits_file)

    # Smoothed magnetogram used for the contour overlay
    smoothed_mag_map = prepare_data.get_smoothed_map(
        mag_map, smooth_size_percent
    )

    fig = plt.figure(figsize=(18, 5))

    # Panel 1: He I observation
    plot_detection.plot_he_map(fig, (1, 3, 1), he_map, he_date_str)

    # Panel 2: magnetogram with contours of the smoothed map
    ax = fig.add_subplot(1, 3, 2, projection=mag_map)
    mag_map.plot(axes=ax, vmin=-50, vmax=50, title=mag_date_str)
    plot_detection.plot_map_contours(ax, smoothed_mag_map)

    # Panel 3: EUV observation
    plot_detection.plot_euv_map(fig, (1, 3, 3), euv_map, euv_date_str)

# Data Products

## Pre-Processed Files

### Pre-Processed Maps

v0.1-v0.4

In [None]:
overwrite = False

# Make sure the output directory exists
os.makedirs(PREPROCESS_SAVE_DIR, exist_ok=True)

for he_date_str in HE_DATE_LIST:

    pre_process_file = (
        f'{PREPROCESS_SAVE_DIR}{he_date_str}_pre_processed_map.npy'
    )

    # Keep existing files unless overwriting was requested
    if not overwrite and os.path.isfile(pre_process_file):
        print(f'He {he_date_str} pre-processed map already exists.')
        continue

    # Load the He I image array from its FITS file
    he_fits_file = prepare_data.get_fits_path(
        he_date_str, DATE_RANGE, ALL_HE_DIR, SELECT_HE_DIR)
    he_map_data = prepare_data.get_image_from_fits(he_fits_file)

    # The v0.1 alternative is detect.pre_process_v0_1(he_map_data)[0]
    pre_processed_map_data = detect.pre_process_v0_4(he_map_data)

    # Saved as a pickled object array: [date string, pre-processed array]
    save_list = [he_date_str, pre_processed_map_data]
    save_array = np.array(save_list, dtype=object)
    np.save(pre_process_file, save_array, allow_pickle=True)
    print(f'{he_date_str} Pre-Processed Map Saved')

v0.5.1

In [None]:
overwrite = True

# Make sure the output directory exists
os.makedirs(PREPROCESS_MAP_SAVE_DIR, exist_ok=True)

for he_date_str in HE_DATE_LIST:

    pre_process_file = (
        f'{PREPROCESS_MAP_SAVE_DIR}{he_date_str}_pre_processed_map.fits'
    )

    # Keep existing files unless overwriting was requested
    if not overwrite and os.path.isfile(pre_process_file):
        print(f'He {he_date_str} pre-processed map already exists.')
        continue

    # Load the He I observation as a map
    he_fits_file = prepare_data.get_fits_path(
        he_date_str, DATE_RANGE, ALL_HE_DIR, SELECT_HE_DIR)
    he_map = prepare_data.get_nso_sunpy_map(he_fits_file)

    # Pre-process with the v0.5.1 routine and write the result to FITS
    pre_processed_map = detect.pre_process_v0_5_1(he_map)
    pre_processed_map.save(pre_process_file, overwrite=overwrite)
    print(f'{he_date_str} Pre-Processed Map Saved')

vY

In [None]:
overwrite = False

# Make sure the output directory exists
os.makedirs(PREPROCESS_MAP_SAVE_DIR, exist_ok=True)

for he_date_str in HE_DATE_LIST:

    pre_process_file = (
        f'{PREPROCESS_MAP_SAVE_DIR}{he_date_str}_pre_processed_map.fits'
    )

    # Keep existing files unless overwriting was requested
    if not overwrite and os.path.isfile(pre_process_file):
        print(f'He {he_date_str} pre-processed map already exists.')
        continue

    # Load the He I observation as a map
    he_fits_file = prepare_data.get_fits_path(
        he_date_str, DATE_RANGE, ALL_HE_DIR, SELECT_HE_DIR)
    he_map = prepare_data.get_nso_sunpy_map(he_fits_file)

    # Pre-process with the vY routine and write the result to FITS
    pre_processed_map = detect.pre_process_vY(he_map)
    pre_processed_map.save(pre_process_file, overwrite=overwrite)
    print(f'{he_date_str} Pre-Processed Map Saved')

EUV Ratio

In [None]:
overwrite = False

# Make sure the output directory exists
os.makedirs(PREPROCESS_SAVE_DIR, exist_ok=True)

for he_date_str in HE_DATE_LIST:

    # EUV datetime matched to this He I datetime; it names the ratio file
    euv_date_str = prepare_data.get_nearest_date_str(
        EUV_DATE_LIST, selected_date_str=he_date_str
    )

    pre_process_file = (
        f'{PREPROCESS_SAVE_DIR}{he_date_str}_pre_processed_map.npy'
    )

    # Keep existing files unless overwriting was requested
    if not overwrite and os.path.isfile(pre_process_file):
        print(f'He {he_date_str} pre-processed map already exists.')
        continue

    # Load the saved He I/EUV ratio image
    ratio_fits_file = (
        f'{RATIO_SAVE_DIR}He{he_date_str}_EUV{euv_date_str}.fits'
    )
    ratio_map_data = prepare_data.get_image_from_fits(ratio_fits_file)

    # The v0.1 alternative is detect.pre_process_v0_1(ratio_map_data)[0]
    pre_processed_map_data = detect.pre_process_v0_4(ratio_map_data)

    # Saved as a pickled object array: [date string, pre-processed array]
    save_list = [he_date_str, pre_processed_map_data]
    save_array = np.array(save_list, dtype=object)
    np.save(pre_process_file, save_array, allow_pickle=True)
    print(f'{he_date_str} Pre-Processed Map Saved')

### Reprojected Magnetograms

Differential rotation

In [None]:
overwrite = False
smooth_size_percent = 10

# Make sure the output directory exists
os.makedirs(ROTATED_MAG_SAVE_DIR, exist_ok=True)

# For each He I observation, differentially rotate the matched magnetogram
# (raw and smoothed) to the He I map and save both as FITS files.
for he_date_str in HE_DATE_LIST:

    mag_date_str = prepare_data.get_nearest_date_str(
        MAG_DATE_LIST, selected_date_str=he_date_str
    )

    fits_file_name = f'{ROTATED_MAG_SAVE_DIR}Mag{mag_date_str}_He{he_date_str}'
    reprojected_fits_file = f'{fits_file_name}.fits'
    reprojected_smooth_fits_file = f'{fits_file_name}_smooth.fits'

    # Skip when either output already exists, unless overwriting
    if (os.path.isfile(reprojected_fits_file)
            or os.path.isfile(reprojected_smooth_fits_file)) and not overwrite:
        print(f'{mag_date_str} magnetogram reprojected '
              f'to {he_date_str} already exists.')
        continue

    # Extract He I observation
    he_map = prepare_data.get_nso_sunpy_map(ALL_HE_DIR + he_date_str + '.fts')
    if not he_map:
        print(f'{he_date_str} He I observation extraction failed.')
        continue

    # Extract magnetogram observation. It comes from the same loader as the
    # He I map, so a falsy result is treated as a failed extraction here too
    # rather than being passed on to diff_rotate.
    mag_map = prepare_data.get_nso_sunpy_map(ALL_MAG_DIR + mag_date_str + '.fts')
    if not mag_map:
        print(f'{mag_date_str} magnetogram extraction failed.')
        continue

    # Reproject the raw magnetogram to the He I map
    reprojected_mag_map = prepare_data.diff_rotate(
        input_map=mag_map, target_map=he_map
    )

    # Smooth the magnetogram first, then reproject the smoothed map
    smoothed_map = prepare_data.get_smoothed_map(mag_map, smooth_size_percent)
    reprojected_smooth_map = prepare_data.diff_rotate(
        input_map=smoothed_map, target_map=he_map
    )

    # Save to FITS files, writing to the same paths the existence check
    # above looked at
    reprojected_mag_map.save(reprojected_fits_file, overwrite=overwrite)
    reprojected_smooth_map.save(reprojected_smooth_fits_file,
                                overwrite=overwrite)
    print(f'{mag_date_str} magnetogram reprojected to {he_date_str} maps saved.')

Heliographic reprojection

In [None]:
overwrite = True
smooth_size_percent = 10

# Make sure the output directory exists
os.makedirs(HELIOGRAPH_MAG_SAVE_DIR, exist_ok=True)

for he_date_str in HE_DATE_LIST:

    mag_date_str = prepare_data.get_nearest_date_str(
        MAG_DATE_LIST, selected_date_str=he_date_str
    )

    fits_file_name = (
        f'{HELIOGRAPH_MAG_SAVE_DIR}Mag{mag_date_str}_He{he_date_str}'
    )
    reprojected_fits_file = fits_file_name + '.fits'
    reprojected_smooth_fits_file = fits_file_name + '_smooth.fits'

    # Skip when either output already exists, unless overwriting
    output_exists = (os.path.isfile(reprojected_fits_file)
                     or os.path.isfile(reprojected_smooth_fits_file))
    if output_exists and not overwrite:
        print(f'{mag_date_str} magnetogram reprojected '
              f'to {he_date_str} already exists.')
        continue

    # The saved pre-processed map supplies the target WCS
    pre_process_file = (
        f'{PREPROCESS_MAP_SAVE_DIR}{he_date_str}_pre_processed_map.fits'
    )
    pre_processed_map = sunpy.map.Map(pre_process_file)

    # Load the matched magnetogram
    mag_fits_file = prepare_data.get_fits_path(
        mag_date_str, DATE_RANGE, ALL_MAG_DIR, SELECT_MAG_DIR)
    mag_map = prepare_data.get_nso_sunpy_map(mag_fits_file)

    # Raw magnetogram: reproject_to_cea first, then onto the target WCS
    hg_mag_map = detect.reproject_to_cea(mag_map)
    reprojected_mag_map = hg_mag_map.reproject_to(
        pre_processed_map.wcs, algorithm='adaptive')

    # Smoothed magnetogram: same two reprojection steps
    smoothed_map = prepare_data.get_smoothed_map(mag_map, smooth_size_percent)
    hg_smoothed_map = detect.reproject_to_cea(smoothed_map)
    reprojected_smooth_map = hg_smoothed_map.reproject_to(
        pre_processed_map.wcs, algorithm='adaptive')

    # Write both results to FITS
    reprojected_mag_map.save(reprojected_fits_file, overwrite=overwrite)
    reprojected_smooth_map.save(reprojected_smooth_fits_file,
                                overwrite=overwrite)
    print(f'{mag_date_str} magnetogram reprojected to {he_date_str} map saved.')

### He I/EUV Ratio Maps

In [None]:
overwrite = False

# Make sure the output directory exists
os.makedirs(RATIO_SAVE_DIR, exist_ok=True)

# For each He I observation, save a FITS map of the He I data divided by the
# square root of the matched EUV map reprojected to the He I map.
for he_date_str in HE_DATE_LIST:

    euv_date_str = prepare_data.get_nearest_date_str(
        EUV_DATE_LIST, selected_date_str=he_date_str
    )

    # Optionally overwrite existing files; an existing file is removed first
    # because the save call below does not request overwriting
    ratio_fits_file = f'{RATIO_SAVE_DIR}He{he_date_str}_EUV{euv_date_str}.fits'
    if os.path.isfile(ratio_fits_file):
        if overwrite:
            os.remove(ratio_fits_file)
        else:
            print(f'{he_date_str} to {euv_date_str} ratio already exists.')
            continue

    # Extract He I observation
    he_fits_file = prepare_data.get_fits_path(
        he_date_str, DATE_RANGE, ALL_HE_DIR, SELECT_HE_DIR
    )
    he_map = prepare_data.get_nso_sunpy_map(he_fits_file)
    if not he_map:
        print(f'{he_date_str} He I observation extraction failed.')
        continue

    # Remove error causing keywords which have invalid ascii content.
    # A default is passed so a header without them does not raise KeyError.
    he_map.meta.pop('history', None)
    he_map.meta.pop('comment', None)

    # Extract and reproject EUV observation
    euv_fits_file = prepare_data.get_fits_path(
        euv_date_str, DATE_RANGE, ALL_EUV_DIR, SELECT_EUV_DIR
    )
    euv_map = sunpy.map.Map(euv_fits_file)
    reprojected_euv_map = prepare_data.diff_rotate(
        input_map=euv_map, target_map=he_map
    )

    # Pre-process He I data via background removal and upper cutoff
    # Satisfactory only for Sarnoff camera observations
    # Pixels equal to the corner pixel are taken as background and set to NaN
    he_map_data = np.where(he_map.data == he_map.data[0,0],
                           np.nan, he_map.data)
    # The array now contains NaN, so the percentile must ignore NaN.
    # np.percentile returned NaN here, which made the comparison False
    # everywhere and silently disabled the 99.9% cutoff.
    upper_cutoff = np.nanpercentile(he_map_data, 99.9)
    he_map_data = np.where(he_map_data >= upper_cutoff,
                           np.nan, he_map_data)

    ratio_data = np.divide(he_map_data, (reprojected_euv_map.data)**0.5)
    ratio_map = sunpy.map.Map(ratio_data, he_map.meta)

    # Save to FITS files
    ratio_map.save(ratio_fits_file)
    print(f'He{he_date_str} to EUV{euv_date_str} ratio saved.')

## Detection Files

### Single Masks

v0.2-v0.4

In [None]:
overwrite = False

# Detection parameters passed to detect.get_ch_mask
percent_of_peak = 80
morph_radius = 18

# Make sure the output directory exists
os.makedirs(DETECTION_SAVE_DIR, exist_ok=True)

for he_date_str in HE_DATE_LIST:

    # The single mask is stored under the ensemble map file name
    mask_file = DETECTION_SAVE_DIR + he_date_str + '_ensemble_map.npy'

    # Keep existing files unless overwriting was requested
    if not overwrite and os.path.isfile(mask_file):
        print(f'He {he_date_str} single mask already exists.')
        continue

    # The pre-processed array is the last element of the saved object array
    pre_process_file = (
        f'{PREPROCESS_SAVE_DIR}{he_date_str}_pre_processed_map.npy'
    )
    pre_processed_map = np.load(pre_process_file, allow_pickle=True)[-1]

    ch_mask = detect.get_ch_mask(pre_processed_map, percent_of_peak,
                                 morph_radius)

    # Saved as a pickled object array: [date string, mask]
    save_list = [he_date_str, ch_mask]
    save_array = np.array(save_list, dtype=object)
    np.save(mask_file, save_array, allow_pickle=True)
    print(f'{he_date_str} Single Mask Saved')

### Ensemble Maps

v0.2-v0.5

In [None]:
overwrite = True

# Ensemble parameters in use
percent_of_peak_list = [100, 100, 110, 120, 120]
morph_radius_list = [18, 20, 16, 16, 20]
# Alternative parameter set kept for reference:
#   percent_of_peak_list = [80, 80, 90, 100, 100]
#   morph_radius_list = [18, 20, 16, 16, 20]

# Make sure the output directory exists
os.makedirs(DETECTION_SAVE_DIR, exist_ok=True)

for he_date_str in HE_DATE_LIST:

    ensemble_file = DETECTION_SAVE_DIR + he_date_str + '_ensemble_map.npy'

    # Keep existing files unless overwriting was requested
    if not overwrite and os.path.isfile(ensemble_file):
        print(f'He {he_date_str} ensemble map already exists.')
        continue

    # The pre-processed array is the last element of the saved object array
    pre_process_file = (
        f'{PREPROCESS_SAVE_DIR}{he_date_str}_pre_processed_map.npy'
    )
    pre_processed_map = np.load(pre_process_file, allow_pickle=True)[-1]

    # detect.get_ensemble_v0_2 takes the same arguments and is the v0.2
    # alternative; only the first element of the result is kept
    ensemble_result = detect.get_ensemble_v0_3(
        pre_processed_map, percent_of_peak_list, morph_radius_list)
    ensemble_map_data = ensemble_result[0]

    # Saved as a pickled object array: [date string, ensemble array]
    save_list = [he_date_str, ensemble_map_data]
    save_array = np.array(save_list, dtype=object)
    np.save(ensemble_file, save_array, allow_pickle=True)
    print(f'{he_date_str} Ensemble Map Saved')

v0.5

In [None]:
overwrite = False

# Ensemble parameters passed to detect.get_ensemble_v0_5
percent_of_peak_list = [80, 80, 90, 100, 100]
morph_radius_list = [18, 20, 16, 16, 20]
unipolarity_threshold = 0.5

# Make sure the output directory exists
os.makedirs(DETECTION_SAVE_DIR, exist_ok=True)

for he_date_str in HE_DATE_LIST:

    ensemble_file = DETECTION_SAVE_DIR + he_date_str + '_ensemble_map.npy'

    # Keep existing files unless overwriting was requested
    if not overwrite and os.path.isfile(ensemble_file):
        print(f'He {he_date_str} ensemble map already exists.')
        continue

    # The He I observation supplies the metadata for the pre-processed map
    he_fits_file = prepare_data.get_fits_path(
        he_date_str, DATE_RANGE, ALL_HE_DIR, SELECT_HE_DIR)
    he_map = prepare_data.get_nso_sunpy_map(he_fits_file)

    # Rebuild a map from the saved array, flipped vertically
    pre_process_file = (
        f'{PREPROCESS_SAVE_DIR}{he_date_str}_pre_processed_map.npy'
    )
    pre_processed_map_data = np.load(pre_process_file, allow_pickle=True)[-1]
    flipped_pre_processed_data = np.flipud(pre_processed_map_data)
    pre_processed_map = sunpy.map.Map(flipped_pre_processed_data, he_map.meta)

    # Load the saved differentially rotated magnetogram
    mag_date_str = prepare_data.get_nearest_date_str(
        MAG_DATE_LIST, selected_date_str=he_date_str
    )
    reprojected_fits_file = (
        f'{ROTATED_MAG_SAVE_DIR}Mag{mag_date_str}_He{he_date_str}.fits'
    )
    reprojected_mag_map = sunpy.map.Map(reprojected_fits_file)

    # Only the first element of the result is kept
    ensemble_result = detect.get_ensemble_v0_5(
        pre_processed_map, reprojected_mag_map,
        percent_of_peak_list, morph_radius_list,
        unipolarity_threshold)
    ensemble_map_data = ensemble_result[0]

    # Saved as a pickled object array: [date string, ensemble array]
    save_list = [he_date_str, ensemble_map_data]
    save_array = np.array(save_list, dtype=object)
    np.save(ensemble_file, save_array, allow_pickle=True)
    print(f'{he_date_str} Ensemble Map Saved')

v0.5.1

In [None]:
overwrite = False

# v0.5.1 Design (in use)
percent_of_peak_list = [70, 70, 80, 90]
morph_radius_list = [15, 17, 13, 13]  # Mm
unipolarity_threshold = 0  # 0.5 was the alternative for this design

# Alternative designs kept for reference:
#   v0.5.1 Conservative Design
#     percent_of_peak_list = [80, 80, 90, 100]
#     morph_radius_list = [15, 17, 13, 13]  # Mm
#     unipolarity_threshold = 0.5
#   v0.5.1 KPVT Design
#     percent_of_peak_list = [85, 105, 85, 95]
#     morph_radius_list = [17, 13, 15, 13]  # Mm
#     unipolarity_threshold = 0

# Make sure the output directory exists
os.makedirs(DETECTION_MAP_SAVE_DIR, exist_ok=True)

for he_date_str in HE_DATE_LIST:

    ensemble_file = (
        DETECTION_MAP_SAVE_DIR + he_date_str + '_ensemble_map.fits'
    )

    # Keep existing files unless overwriting was requested
    if not overwrite and os.path.isfile(ensemble_file):
        print(f'He {he_date_str} ensemble map already exists.')
        continue

    # Load the saved pre-processed map
    pre_process_file = (
        f'{PREPROCESS_MAP_SAVE_DIR}{he_date_str}_pre_processed_map.fits'
    )
    pre_processed_map = sunpy.map.Map(pre_process_file)

    # Load the saved differentially rotated magnetogram
    mag_date_str = prepare_data.get_nearest_date_str(
        MAG_DATE_LIST, selected_date_str=he_date_str
    )
    reprojected_fits_file = (
        f'{ROTATED_MAG_SAVE_DIR}Mag{mag_date_str}_He{he_date_str}.fits'
    )
    reprojected_mag_map = sunpy.map.Map(reprojected_fits_file)

    # Only the first element of the result is kept
    ensemble_result = detect.get_ensemble_v0_5_1(
        pre_processed_map, reprojected_mag_map,
        percent_of_peak_list, morph_radius_list,
        unipolarity_threshold)
    ensemble_map_data = ensemble_result[0]

    # Build a map from the vertically flipped array, reusing the
    # pre-processed map metadata
    flipped_ensemble_data = np.flipud(ensemble_map_data)
    ensemble_map = sunpy.map.Map(flipped_ensemble_data, pre_processed_map.meta)

    ensemble_map.save(ensemble_file, overwrite=overwrite)
    print(f'{he_date_str} Ensemble Map Saved')

vY

In [None]:
overwrite = False

# Ensemble parameters in use
percent_of_peak_list = [85, 73, 95, 85]
morph_radius_list = [10, 14, 10, 14]
unipolarity_threshold = 0.5
# Alternative parameter set kept for reference:
#   percent_of_peak_list = [62, 68, 73, 80]
#   morph_radius_list = [11, 13, 8, 10]
#   unipolarity_threshold = 0.01

# Make sure the output directory exists
os.makedirs(DETECTION_MAP_SAVE_DIR, exist_ok=True)

for he_date_str in HE_DATE_LIST:

    ensemble_file = (
        DETECTION_MAP_SAVE_DIR + he_date_str + '_ensemble_map.fits'
    )

    # Keep existing files unless overwriting was requested
    if not overwrite and os.path.isfile(ensemble_file):
        print(f'He {he_date_str} ensemble map already exists.')
        continue

    # Load the saved pre-processed map
    pre_process_file = (
        f'{PREPROCESS_MAP_SAVE_DIR}{he_date_str}_pre_processed_map.fits'
    )
    pre_processed_map = sunpy.map.Map(pre_process_file)

    # Load the saved heliographic-reprojected magnetogram
    mag_date_str = prepare_data.get_nearest_date_str(
        MAG_DATE_LIST, selected_date_str=he_date_str
    )
    reprojected_fits_file = (
        f'{HELIOGRAPH_MAG_SAVE_DIR}Mag{mag_date_str}_He{he_date_str}.fits'
    )
    reprojected_mag_map = sunpy.map.Map(reprojected_fits_file)

    # Only the first element of the result is kept
    ensemble_result = detect.get_ensemble_vY(
        pre_processed_map, reprojected_mag_map,
        percent_of_peak_list, morph_radius_list,
        unipolarity_threshold)
    ensemble_map_data = ensemble_result[0]

    # Build a map from the vertically flipped array, reusing the
    # pre-processed map metadata
    flipped_ensemble_data = np.flipud(ensemble_map_data)
    ensemble_map = sunpy.map.Map(flipped_ensemble_data, pre_processed_map.meta)

    ensemble_map.save(ensemble_file, overwrite=overwrite)
    print(f'{he_date_str} Ensemble Map Saved')

## Pre-Process Map Images

### Pre-Process Comparison

In [None]:
# Side-by-side plot of the He I observation and its saved pre-processed map,
# for the first He I observation only
for he_date_str in HE_DATE_LIST[:1]:

    # Load the He I observation
    he_fits_file = prepare_data.get_fits_path(
        he_date_str, DATE_RANGE, ALL_HE_DIR, SELECT_HE_DIR)
    he_map = prepare_data.get_nso_sunpy_map(he_fits_file)
    if not he_map:
        print(f'{he_date_str} He I observation extraction failed.')
        continue

    # Load the saved pre-processed map
    pre_process_file = (
        f'{PREPROCESS_MAP_SAVE_DIR}{he_date_str}_pre_processed_map.fits'
    )
    pre_processed_map = sunpy.map.Map(pre_process_file)

    fig = plt.figure(figsize=(10, 4))

    # Left panel: He I observation
    plot_detection.plot_he_map(fig, (1, 2, 1), he_map, he_date_str)

    # Right panel: pre-processed map
    ax = fig.add_subplot(1, 2, 2, projection=pre_processed_map)
    pre_processed_map.plot(axes=ax, title=he_date_str)
### Ratio Comparison: He I, EUV, & He I/EUV Ratio

In [None]:
overwrite = False
out_dir = f'{OUTPUT_DIR}Ratio_Comparison/{DATE_DIR}'

# Make sure the output directory exists
os.makedirs(out_dir, exist_ok=True)

for he_date_str in HE_DATE_LIST:

    comparison_img_file = out_dir + 'He' + he_date_str + '.jpg'

    # Keep existing images unless overwriting was requested
    if not overwrite and os.path.isfile(comparison_img_file):
        print(f'He {he_date_str} ratio comparison already exists.')
        continue

    euv_date_str = prepare_data.get_nearest_date_str(
        EUV_DATE_LIST, selected_date_str=he_date_str
    )

    # Load the He I observation
    he_fits_file = prepare_data.get_fits_path(
        he_date_str, DATE_RANGE, ALL_HE_DIR, SELECT_HE_DIR)
    he_map = prepare_data.get_nso_sunpy_map(he_fits_file)
    if not he_map:
        print(f'{he_date_str} He I observation extraction failed.')
        continue

    # Load the matched EUV observation
    euv_fits_file = prepare_data.get_fits_path(
        euv_date_str, DATE_RANGE, ALL_EUV_DIR, SELECT_EUV_DIR)
    euv_map = sunpy.map.Map(euv_fits_file)

    # Load the saved He I/EUV ratio map
    ratio_fits_file = (
        f'{RATIO_SAVE_DIR}He{he_date_str}_EUV{euv_date_str}.fits'
    )
    ratio_map = sunpy.map.Map(ratio_fits_file)

    # Three panels: He I, EUV, and the ratio drawn on the He I projection
    fig = plt.figure(figsize=(18, 5))
    plot_detection.plot_he_map(fig, (1, 3, 1), he_map, he_date_str)
    plot_detection.plot_euv_map(fig, (1, 3, 2), euv_map, euv_date_str)
    ax = fig.add_subplot(1, 3, 3, projection=he_map)
    ratio_map.plot(axes=ax, cmap='jet', vmin=-1, vmax=6)

    # Save the figure and close it
    plt.savefig(comparison_img_file)
    plt.close(fig)
    print(f'He {he_date_str} map comparison saved.')

## Detection Map Images

### Single Mask Comparison: Pre-Processed He I & Single Mask

v0.1

In [None]:
out_dir = DETECTION_IMAGE_DIR + 'Preprocess_Single_Comparison/'

# Make sure the output directory exists
os.makedirs(out_dir, exist_ok=True)

# For each He I observation, save a two-panel image: the pre-processed map,
# and the He I image with the saved mask drawn as contours.
for he_date_str in HE_DATE_LIST:

    # The pre-processed array is the last element of the saved object array
    pre_process_file = (PREPROCESS_SAVE_DIR + he_date_str
                        + '_pre_processed_map.npy')
    pre_processed_map = np.load(pre_process_file, allow_pickle=True)[-1]

    he_fits_file = prepare_data.get_fits_path(
        he_date_str, DATE_RANGE, ALL_HE_DIR, SELECT_HE_DIR
    )
    raw_he = prepare_data.get_image_from_fits(he_fits_file)
    # Pixels equal to the corner pixel are blanked out with NaN.
    # np.nan replaces the np.NaN alias, which NumPy 2.0 removed.
    he = np.where(raw_he == raw_he[0,0], np.nan, raw_he)

    # The single mask is stored under the ensemble map file name
    mask_file = f'{DETECTION_SAVE_DIR}{he_date_str}_ensemble_map.npy'
    single_mask = np.load(mask_file, allow_pickle=True)[-1]

    fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(20, 10))
    ax = axes.ravel()

    # Left panel: pre-processed map
    ax[0].set_title(he_date_str, fontsize=24)
    ax[0].imshow(pre_processed_map, cmap=plt.cm.gray)

    # Right panel: He I image with mask contours on top
    ax[1].imshow(he, cmap=plt.cm.afmhot, vmin=-100, vmax=100)
    ax[1].contour(single_mask, linewidths=2, cmap=plt.cm.gray)

    # Close each figure after saving so figures do not pile up in the loop
    plt.savefig(f'{out_dir}{he_date_str}.jpg')
    plt.close(fig)
    print(f'{he_date_str} map saved.')

In [None]:
# Three-panel plot (He I, He I with detection contours, EUV) for the first
# EUV observation only
for euv_date_str in EUV_DATE_LIST[:1]:

    he_date_str = prepare_data.get_latest_date_str(
        HE_DATE_LIST, selected_date_str=euv_date_str
    )

    # Load the He I observation
    he_map = prepare_data.get_nso_sunpy_map(f'{ALL_HE_DIR}{he_date_str}.fts')
    if not he_map:
        print(f'{he_date_str} He I observation extraction failed.')
        continue

    # Pixels equal to the corner pixel are blanked out with NaN
    he_base_data = np.where(
        he_map.data == he_map.data[0,0], np.nan, he_map.data
    )
    he_base_map = sunpy.map.Map(he_base_data, he_map.meta)

    # The saved mask array (last element of the object array) is flipped
    # vertically and wrapped in a map with the He I metadata
    mask_file = DETECTION_SAVE_DIR + he_date_str + '_ensemble_map.npy'
    mask_data = np.load(mask_file, allow_pickle=True)[-1]
    mask_map = sunpy.map.Map(np.flipud(mask_data), he_map.meta)
    mask_map.plot_settings['cmap'] = colormaps['gray']

    euv_map = sunpy.map.Map(f'{ALL_EUV_DIR}{euv_date_str}.fts')

    fig = plt.figure(figsize=(18, 5))

    # Panel 1: He I observation
    plot_detection.plot_he_map(fig, (1, 3, 1), he_map, he_date_str)

    # Panel 2: background-blanked He I with the mask contours at level 0
    ax = fig.add_subplot(1, 3, 2, projection=he_map)
    he_base_map.plot(axes=ax, vmin=-100, vmax=100, title=he_date_str,
                     cmap='afmhot')
    for contour in mask_map.contour(0):
        ax.plot_coord(contour, color='black', linewidth=1)

    # Panel 3: EUV observation
    plot_detection.plot_euv_map(fig, (1, 3, 3), euv_map, euv_date_str)

### Ensemble Comparison

#### Pre-Processed & Ensemble Maps

v0.2-v0.5

In [None]:
out_dir = f'{DETECTION_IMAGE_DIR}Preprocess_Comparison/'

# Make sure the output directory exists
os.makedirs(out_dir, exist_ok=True)

for he_date_str in HE_DATE_LIST:

    # Both saved object arrays hold the image as their last element
    pre_process_file = (
        f'{PREPROCESS_SAVE_DIR}{he_date_str}_pre_processed_map.npy'
    )
    pre_processed_map = np.load(pre_process_file, allow_pickle=True)[-1]

    ensemble_file = DETECTION_SAVE_DIR + he_date_str + '_ensemble_map.npy'
    ensemble_map = np.load(ensemble_file, allow_pickle=True)[-1]

    fig, axes = plt.subplots(nrows=1, ncols=2, figsize=(20, 10))
    ax = axes.ravel()
    pre_processed_ax, ensemble_ax = ax[0], ax[1]

    # Left panel: pre-processed map, titled with the He I datetime
    pre_processed_ax.set_title(he_date_str, fontsize=24)
    pre_processed_ax.imshow(pre_processed_map, cmap=plt.cm.gray)

    # Right panel: ensemble map
    ensemble_ax.imshow(ensemble_map, cmap=plt.cm.magma)

    # Save the figure and close it
    comparison_img_file = out_dir + he_date_str + '.jpg'
    plt.savefig(comparison_img_file)
    plt.close(fig)
    print(f'{he_date_str} map saved.')

v0.5.1

In [None]:
out_dir = f'{DETECTION_IMAGE_DIR}Preprocess_Comparison/'

# Make sure the output directory exists
os.makedirs(out_dir, exist_ok=True)

# Inline preview for the first He I observation only; the figure is left
# open and is not written to out_dir
for he_date_str in HE_DATE_LIST[:1]:

    # Load the saved pre-processed and ensemble maps
    pre_process_file = (
        f'{PREPROCESS_MAP_SAVE_DIR}{he_date_str}_pre_processed_map.fits'
    )
    pre_processed_map = sunpy.map.Map(pre_process_file)

    ensemble_file = (
        DETECTION_MAP_SAVE_DIR + he_date_str + '_ensemble_map.fits'
    )
    ensemble_map = sunpy.map.Map(ensemble_file)

    fig = plt.figure(figsize=(12, 5))

    # Left panel: pre-processed map
    ax = fig.add_subplot(121, projection=pre_processed_map)
    pre_processed_map.plot(axes=ax, title=he_date_str)

    # Right panel: ensemble map
    ax = fig.add_subplot(122, projection=ensemble_map)
    ensemble_map.plot(axes=ax, title='', cmap='magma')

    # Saving is disabled in this cell; to enable it, restore:
    #   plt.savefig(f'{out_dir}{he_date_str}.jpg')
    #   plt.close(fig)
    #   print(f'{he_date_str} map saved.')

vY

In [None]:
out_dir = f'{DETECTION_IMAGE_DIR}Preprocess_Comparison/'

# Make sure the output directory exists
os.makedirs(out_dir, exist_ok=True)

for he_date_str in HE_DATE_LIST:

    # Load the saved pre-processed and ensemble maps
    pre_process_file = (
        f'{PREPROCESS_MAP_SAVE_DIR}{he_date_str}_pre_processed_map.fits'
    )
    pre_processed_map = sunpy.map.Map(pre_process_file)

    ensemble_file = (
        DETECTION_MAP_SAVE_DIR + he_date_str + '_ensemble_map.fits'
    )
    ensemble_map = sunpy.map.Map(ensemble_file)

    # On a 1x5 grid the pre-processed map spans columns 1-2 and the
    # ensemble map spans columns 3-5
    fig = plt.figure(figsize=(18, 5))

    ax = fig.add_subplot(1, 5, (1, 2), projection=pre_processed_map)
    pre_processed_map.plot(axes=ax, title=he_date_str)

    ax = fig.add_subplot(1, 5, (3, 5), projection=ensemble_map)
    ensemble_map.plot(axes=ax, title='', cmap='magma')

    # Save the figure and close it
    comparison_img_file = out_dir + he_date_str + '.jpg'
    plt.savefig(comparison_img_file)
    plt.close(fig)
    print(f'{he_date_str} map saved.')

#### Full Comparison: He I, Ensemble, Magnetogram, & EUV

v0.2-v0.5

In [None]:
overwrite = True
out_dir = f'{DETECTION_IMAGE_DIR}Full_Comparison/'
smooth_size_percent = 10

# Make sure the output directory exists
os.makedirs(out_dir, exist_ok=True)

for euv_date_str in EUV_DATE_LIST:

    comparison_img_file = out_dir + 'EUV' + euv_date_str + '.jpg'

    # Keep existing images unless overwriting was requested
    if not overwrite and os.path.isfile(comparison_img_file):
        print(f'EUV {euv_date_str} full comparison already exists.')
        continue

    # Match a He I datetime to the EUV datetime, then a magnetogram
    # datetime to that He I datetime
    he_date_str = prepare_data.get_latest_date_str(
        HE_DATE_LIST, selected_date_str=euv_date_str
    )
    mag_date_str = prepare_data.get_nearest_date_str(
        MAG_DATE_LIST, selected_date_str=he_date_str
    )

    # Load the He I observation
    he_map = prepare_data.get_nso_sunpy_map(f'{ALL_HE_DIR}{he_date_str}.fts')
    if not he_map:
        print(f'{he_date_str} He I observation extraction failed.')
        continue

    mag_map = prepare_data.get_nso_sunpy_map(
        f'{ALL_MAG_DIR}{mag_date_str}.fts'
    )

    # Crop the EUV map to +/-1050 arcsec in Tx and Ty, a zoom level similar
    # to the other observations
    euv_map = sunpy.map.Map(f'{ALL_EUV_DIR}{euv_date_str}.fts')
    crop_bottom_left = SkyCoord(
        Tx=-1050*u.arcsec, Ty=-1050*u.arcsec,
        frame=euv_map.coordinate_frame
    )
    crop_top_right = SkyCoord(
        Tx=1050*u.arcsec, Ty=1050*u.arcsec,
        frame=euv_map.coordinate_frame
    )
    euv_map = euv_map.submap(
        bottom_left=crop_bottom_left, top_right=crop_top_right
    )

    # Smoothed magnetogram used for the contour overlay
    smoothed_mag_map = prepare_data.get_smoothed_map(
        mag_map, smooth_size_percent
    )

    # The saved ensemble array (last element of the object array) is
    # flipped vertically and wrapped in a map with the He I metadata
    ensemble_file = DETECTION_SAVE_DIR + he_date_str + '_ensemble_map.npy'
    ensemble_map_data = np.load(ensemble_file, allow_pickle=True)[-1]
    ensemble_map = sunpy.map.Map(np.flipud(ensemble_map_data), he_map.meta)
    ensemble_map.plot_settings['cmap'] = colormaps['magma']

    fig = plt.figure(figsize=(24, 5))

    # Panel 1: He I observation
    plot_detection.plot_he_map(fig, (1, 4, 1), he_map, he_date_str)

    # Panel 2: ensemble map on the He I projection
    ax = fig.add_subplot(1, 4, 2, projection=he_map)
    ensemble_map.plot(axes=ax, title='')

    # Panel 3: magnetogram with contours of the smoothed map
    ax = fig.add_subplot(1, 4, 3, projection=mag_map)
    mag_map.plot(axes=ax, vmin=-50, vmax=50, title=mag_date_str)
    plot_detection.plot_map_contours(ax, smoothed_mag_map)

    # Panel 4: cropped EUV observation
    plot_detection.plot_euv_map(fig, (1, 4, 4), euv_map, euv_date_str)

    # Save the figure and close it
    plt.savefig(comparison_img_file)
    plt.close(fig)

    print(f'{euv_date_str} map comparison saved.')

#### Magnetic Comparison: He I, Ensemble, & Magnetogram with Inversion Lines

v0.2-v0.5

In [None]:
# Three-panel magnetic comparison: He I observation, ensemble map, and
# reprojected magnetogram, the latter two overlaid with contours of the
# smoothed reprojected magnetogram.
out_dir = f'{DETECTION_IMAGE_DIR}Mag_Comparison/'
os.makedirs(out_dir, exist_ok=True)

# Only the last ten entries of HE_DATE_LIST are rendered
for he_date_str in HE_DATE_LIST[-10:]:

    mag_date_str = prepare_data.get_nearest_date_str(
        MAG_DATE_LIST, selected_date_str=he_date_str
    )

    # He I observation; skip this date when extraction fails
    he_map = prepare_data.get_nso_sunpy_map(f'{ALL_HE_DIR}{he_date_str}.fts')
    if not he_map:
        print(f'{he_date_str} He I observation extraction failed.')
        continue

    # Previously saved reprojected magnetograms (plain and smoothed)
    mag_fits_name = f'{ROTATED_MAG_SAVE_DIR}Mag{mag_date_str}_He{he_date_str}'
    reprojected_fits_file = mag_fits_name + '.fits'
    reprojected_smooth_fits_file = mag_fits_name + '_smooth.fits'
    reprojected_mag_map = sunpy.map.Map(reprojected_fits_file)
    reprojected_smooth_map = sunpy.map.Map(reprojected_smooth_fits_file)

    # Saved ensemble array (last entry of the file) wrapped as a Sunpy map
    # that reuses the He I metadata
    ensemble_file = DETECTION_SAVE_DIR + he_date_str + '_ensemble_map.npy'
    ensemble_map_data = np.load(ensemble_file, allow_pickle=True)[-1]
    ensemble_map = sunpy.map.Map(np.flipud(ensemble_map_data), he_map.meta)
    ensemble_map.plot_settings['cmap'] = colormaps['magma']

    fig = plt.figure(figsize=(18, 5))

    # Left: He I observation
    plot_detection.plot_he_map(fig, (1, 3, 1), he_map, he_date_str)

    # Middle: ensemble map with smoothed-magnetogram contours
    ax = fig.add_subplot(1, 3, 2, projection=he_map)
    ensemble_map.plot(axes=ax, title='')
    plot_detection.plot_map_contours(ax, reprojected_smooth_map)

    # Right: reprojected magnetogram with the same contours
    ax = fig.add_subplot(1, 3, 3, projection=he_map)
    reprojected_mag_map.plot(axes=ax, vmin=-50, vmax=50, title=mag_date_str)
    plot_detection.plot_map_contours(ax, reprojected_smooth_map)

    # Write the image and free the figure
    plt.savefig(f'{out_dir}He{he_date_str}.jpg')
    plt.close(fig)

    print(f'{he_date_str} map comparison saved')

#### EUV Comparison: He I, Ensemble with Inversion Lines, & EUV

v0.2-v0.5

In [None]:
# EUV comparison images (v0.2-v0.5 outputs), one per EUV observation date.
overwrite = False
out_dir = f'{DETECTION_IMAGE_DIR}EUV_Comparison/'
os.makedirs(out_dir, exist_ok=True)

for euv_date_str in EUV_DATE_LIST:

    # Leave existing images untouched unless overwriting is requested
    comparison_img_file = f'{out_dir}EUV{euv_date_str}.jpg'
    if not overwrite and os.path.isfile(comparison_img_file):
        print(f'EUV {euv_date_str} comparison already exists.')
        continue

    # Select the He I and magnetogram dates to pair with this EUV date
    he_date_str = prepare_data.get_latest_date_str(
        HE_DATE_LIST, selected_date_str=euv_date_str
    )
    mag_date_str = prepare_data.get_nearest_date_str(
        MAG_DATE_LIST, selected_date_str=he_date_str
    )

    # The plotting helper populates the figure
    fig = plt.figure(figsize=(18, 5))
    plot_detection.plot_he_neutral_lines_euv_comparison(
        fig, he_date_str, mag_date_str, euv_date_str, ROTATED_MAG_SAVE_DIR
    )

    # Write the image and free the figure
    plt.savefig(comparison_img_file)
    plt.close(fig)
    print(f'{euv_date_str} map comparison saved.')

v0.5.1+

In [None]:
# Render the v0.5.1+ EUV comparison for an explicit selection of EUV dates.
overwrite = True
hg_reproject = False
out_dir = DETECTION_IMAGE_DIR + 'EUV_Comparison/'

# EUV observation dates to render. Kept as an explicit list instead of
# overwriting the loop variable inside the loop body; assign EUV_DATE_LIST
# here to regenerate every comparison.
euv_date_strs_to_plot = ['2003_07_14__16_12']

# exist_ok makes directory creation idempotent without a separate isdir check
os.makedirs(out_dir, exist_ok=True)

for euv_date_str in euv_date_strs_to_plot:

    # Optionally overwrite existing files
    comparison_img_file = f'{out_dir}EUV{euv_date_str}.jpg'
    if os.path.isfile(comparison_img_file) and not overwrite:
        print(f'EUV {euv_date_str} comparison already exists.')
        continue

    # Select the He I and magnetogram dates to pair with this EUV date
    he_date_str = prepare_data.get_latest_date_str(
        HE_DATE_LIST, selected_date_str=euv_date_str
    )
    mag_date_str = prepare_data.get_nearest_date_str(
        MAG_DATE_LIST, selected_date_str=he_date_str
    )

    # A wider canvas is used when hg_reproject is enabled
    if hg_reproject:
        fig = plt.figure(figsize=(22, 5))
    else:
        fig = plt.figure(figsize=(18, 5))

    # Forward the flag instead of hard-coding False so the helper stays in
    # sync with the figure size chosen above
    plot_detection.plot_he_neutral_lines_euv_v0_5_1(
        fig, he_date_str, mag_date_str, euv_date_str,
        hg_reproject=hg_reproject
    )

    # Save plot
    plt.savefig(comparison_img_file)
    plt.close(fig)
    print(f'{euv_date_str} map comparison saved.')

Version Comparison

In [None]:
# Compare the ensemble maps saved under the v0_5 directory against the maps
# in DETECTION_SAVE_DIR (titled v0.6) in a 2x2 figure per EUV date:
# He I, EUV, comparison-version ensemble, current ensemble.
overwrite = True
# out_dir = (DETECTION_IMAGE_DIR + '_Version_Comparison'
#            + DATE_DIR + 'v0_2_v0_4_Unipolar/')
out_dir = (DETECTION_IMAGE_DIR + '_Version_Comparison'
           + DATE_DIR + 'v0_5_v0_6/')

# exist_ok makes directory creation idempotent without a separate isdir check
os.makedirs(out_dir, exist_ok=True)

# comparison_version_dir = DETECT_DIR + 'v0_1/' + 'Saved_npy_Files/'
comparison_version_dir = DETECT_DIR + 'v0_5/' + 'Saved_npy_Files/'

for euv_date_str in EUV_DATE_LIST:

    # Optionally overwrite existing files
    comparison_img_file = f'{out_dir}EUV{euv_date_str}.jpg'
    if os.path.isfile(comparison_img_file) and not overwrite:
        print(f'EUV {euv_date_str} comparison already exists.')
        continue

    # Select the He I and magnetogram dates to pair with this EUV date
    he_date_str = prepare_data.get_latest_date_str(
        HE_DATE_LIST, selected_date_str=euv_date_str
    )
    mag_date_str = prepare_data.get_nearest_date_str(
        MAG_DATE_LIST, selected_date_str=he_date_str
    )

    # Extract He I observation
    he_map = prepare_data.get_nso_sunpy_map(ALL_HE_DIR + he_date_str + '.fts')
    if not he_map:
        print(f'{he_date_str} He I observation extraction failed.')
        # Every panel below needs he_map.meta, so skip this date
        continue

    # Extract saved ensemble map array and convert to Sunpy map. This is
    # loaded before the comparison map so the NaN mask below is taken from
    # this date's array rather than from a previous iteration or another cell.
    ensemble_file = f'{DETECTION_SAVE_DIR}{he_date_str}_ensemble_map.npy'
    ensemble_map_data = np.load(ensemble_file, allow_pickle=True)[-1]
    ensemble_map = sunpy.map.Map(np.flipud(ensemble_map_data), he_map.meta)
    ensemble_map.plot_settings['cmap'] = colormaps['magma']

    # Extract comparison saved ensemble map array, blank it wherever the
    # current ensemble map is NaN, and convert to Sunpy map
    comparison_file = f'{comparison_version_dir}{he_date_str}_ensemble_map.npy'
    comparison_map_data = np.load(comparison_file, allow_pickle=True)[-1]
    comparison_map_data = np.where(
        np.isnan(ensemble_map_data), np.nan, comparison_map_data
    )
    comparison_map = sunpy.map.Map(np.flipud(comparison_map_data), he_map.meta)

    # Extract saved processed magnetogram
    reprojected_smooth_file = (f'{ROTATED_MAG_SAVE_DIR}Mag{mag_date_str}'
                               + f'_He{he_date_str}_smooth.fits')
    reprojected_smooth_map = sunpy.map.Map(reprojected_smooth_file)

    euv_map = sunpy.map.Map(ALL_EUV_DIR + euv_date_str + '.fts')

    fig = plt.figure(figsize=(12, 11))

    # Plot He observation
    plot_detection.plot_he_map(fig, (2, 2, 1), he_map, he_date_str)

    # Plot EUV observation
    plot_detection.plot_euv_map(fig, (2, 2, 2), euv_map, euv_date_str)

    # Plot comparison-version ensemble map with overlaid magnetogram contours
    ax = fig.add_subplot(2, 2, 3, projection=he_map)
    comparison_map.plot(axes=ax, title='v0.5')
    plot_detection.plot_map_contours(ax, reprojected_smooth_map)

    # Plot current ensemble map with overlaid magnetogram contours
    ax = fig.add_subplot(2, 2, 4, projection=he_map)
    ensemble_map.plot(axes=ax, title='v0.6')
    plot_detection.plot_map_contours(ax, reprojected_smooth_map)

    # Save plot
    plt.savefig(comparison_img_file)
    plt.close(fig)
    print(f'{euv_date_str} map comparison saved.')

## Outcome Images

### Time Series Outcomes

v0.1

In [None]:
# Build the v0.1 outcome time series for the He I observation dates from the
# detections stored under DETECTION_SAVE_DIR. The result is indexed by
# outcome key in the v0.1 plotting cell below.
outcome_time_series_dict = detect.get_outcome_time_series_dict_v0_1(
    HE_DATE_LIST, DETECTION_SAVE_DIR
)

In [None]:
overwrite = True

# Plot presets, each ordered as
# (output directory, outcome key, colormap name, y-axis label)
region_num_settings = (
    f'{DETECTION_IMAGE_DIR}Region_Number/',
    'num_ch',
    'viridis',
    'Detected CH Number',
)
px_percent_settings = (
    f'{DETECTION_IMAGE_DIR}EUV_Px_Percentage/',
    'px_percent',
    'plasma',
    'Detected Pixel Percentage (%)',
)
area_percent_settings = (
    f'{DETECTION_IMAGE_DIR}EUV_Area_Percentage/',
    'area_percent',
    'plasma',
    'Detected Area Percentage (%)',
)
area_settings = (
    f'{DETECTION_IMAGE_DIR}EUV_Area/',
    'area',
    'plasma',
    'Detected Area (Mm^2)',
)

# Activate one preset for the plotting cell that follows
out_dir, outcome_key, cmap, ylabel = area_percent_settings

In [None]:
# v0.1 outcome figure: He I, He I with detection contours, and EUV on the top
# row, with the selected outcome time series spanning the bottom row.
# Relies on overwrite, out_dir, outcome_key, cmap, and ylabel from the
# settings cell above.

# exist_ok makes directory creation idempotent without a separate isdir check
os.makedirs(out_dir, exist_ok=True)

# Only the first EUV date is rendered
for euv_date_str in EUV_DATE_LIST[:1]:

    # Optionally overwrite existing files
    comparison_img_file = f'{out_dir}EUV{euv_date_str}.jpg'
    if os.path.isfile(comparison_img_file) and not overwrite:
        print(f'EUV {euv_date_str} comparison already exists.')
        continue

    he_date_str = prepare_data.get_latest_date_str(
        HE_DATE_LIST, selected_date_str=euv_date_str
    )

    # Extract He I observation
    he_map = prepare_data.get_nso_sunpy_map(ALL_HE_DIR + he_date_str + '.fts')
    if not he_map:
        print(f'{he_date_str} He I observation extraction failed.')
        continue

    # Extract He I observation for mask base and convert to Sunpy map
    he_fits_file = prepare_data.get_fits_path(
        he_date_str, DATE_RANGE, ALL_HE_DIR, SELECT_HE_DIR
    )
    raw_he = prepare_data.get_image_from_fits(he_fits_file)
    # Blank every pixel equal to the corner pixel value. np.nan is used
    # because the np.NaN alias was removed in NumPy 2.0.
    he_base_data = np.where(raw_he == raw_he[0, 0], np.nan, raw_he)
    he_base_map = sunpy.map.Map(np.flipud(he_base_data), he_map.meta)

    # Extract saved single mask array and convert to Sunpy map
    mask_file = f'{DETECTION_SAVE_DIR}{he_date_str}_ensemble_map.npy'
    mask_data = np.load(mask_file, allow_pickle=True)[-1]
    mask_map = sunpy.map.Map(np.flipud(mask_data), he_map.meta)
    mask_map.plot_settings['cmap'] = colormaps['gray']

    euv_map = sunpy.map.Map(ALL_EUV_DIR + euv_date_str + '.fts')

    fig = plt.figure(figsize=(18, 10))

    plot_detection.plot_he_map(fig, (2, 3, 1), he_map, he_date_str)

    # Plot He I observation with overlaid detection contours
    ax = fig.add_subplot(232, projection=he_map)
    he_base_map.plot(axes=ax, vmin=-100, vmax=100, title=he_date_str,
                     cmap='afmhot')
    for contour in mask_map.contour(0):
        ax.plot_coord(contour, color='black', linewidth=1)

    plot_detection.plot_euv_map(fig, (2, 3, 3), euv_map, euv_date_str)

    # Bottom row: outcome time series for the selected outcome key
    ax = fig.add_subplot(2, 3, (4, 6))
    plot_detection.plot_outcome_series_vs_time(
        ax, outcome_time_series_dict[outcome_key], he_date_str, cmap,
        ylabel, ylim=[0, 3.75]
    )

    # Save plot
    plt.savefig(comparison_img_file)
    plt.close(fig)
    print(f'{euv_date_str} map comparison saved.')

v0.2-v0.5

In [None]:
# Confidence levels passed to the outcome computation; the commented line is
# an alternative selection.
confidence_level_list = [1, 50, 75, 95]
# confidence_level_list = [0, 35, 65, 95]
# Build the v0.2-v0.5 outcome time series from the detections stored under
# DETECTION_SAVE_DIR
outcome_time_series_dict = detect.get_outcome_time_series_dict(
    HE_DATE_LIST, confidence_level_list, DETECTION_SAVE_DIR
)
# Displayed as the cell output: median of the 'area_percent' outcome at key 50
outcome_time_series_dict['area_percent'][50].median()

In [None]:
overwrite = True

# Available plot presets; every tuple holds
# (output directory, outcome key, colormap name, y-axis label)
region_num_settings = (
    f'{DETECTION_IMAGE_DIR}Region_Number/',
    'num_ch',
    'viridis',
    'Detected CH Number',
)
px_percent_settings = (
    f'{DETECTION_IMAGE_DIR}EUV_Px_Percentage/',
    'px_percent',
    'plasma',
    'Detected Pixel Percentage (%)',
)
area_percent_settings = (
    f'{DETECTION_IMAGE_DIR}EUV_Area_Percentage/',
    'area_percent',
    'plasma',
    'Detected Area Percentage (%)',
)
area_settings = (
    f'{DETECTION_IMAGE_DIR}EUV_Area/',
    'area',
    'plasma',
    'Detected Area (Mm^2)',
)

# Unpack the preset used by the plotting cell that follows
out_dir, outcome_key, cmap, ylabel = area_percent_settings

In [None]:
# v0.2-v0.5 outcome figure: the comparison helper fills the top row and the
# selected outcome time series spans the bottom row.
# Relies on overwrite, out_dir, outcome_key, cmap, and ylabel from the
# settings cell above.
os.makedirs(out_dir, exist_ok=True)

for euv_date_str in EUV_DATE_LIST:

    # Leave existing images untouched unless overwriting is requested
    comparison_img_file = f'{out_dir}EUV{euv_date_str}.jpg'
    if not overwrite and os.path.isfile(comparison_img_file):
        print(f'EUV {euv_date_str} comparison already exists.')
        continue

    # Select the He I and magnetogram dates to pair with this EUV date
    he_date_str = prepare_data.get_latest_date_str(
        HE_DATE_LIST, selected_date_str=euv_date_str
    )
    mag_date_str = prepare_data.get_nearest_date_str(
        MAG_DATE_LIST, selected_date_str=he_date_str
    )

    # He I observation; skip this date when extraction fails
    he_map = prepare_data.get_nso_sunpy_map(f'{ALL_HE_DIR}{he_date_str}.fts')
    if not he_map:
        print(f'{he_date_str} He I observation extraction failed.')
        continue

    # Saved ensemble array (last entry of the file) wrapped as a Sunpy map
    # that reuses the He I metadata
    ensemble_file = DETECTION_SAVE_DIR + he_date_str + '_ensemble_map.npy'
    ensemble_map_data = np.load(ensemble_file, allow_pickle=True)[-1]
    ensemble_map = sunpy.map.Map(np.flipud(ensemble_map_data), he_map.meta)
    ensemble_map.plot_settings['cmap'] = colormaps['magma']

    # Top row: drawn by the comparison helper in two-row mode
    fig = plt.figure(figsize=(18, 10))
    plot_detection.plot_he_neutral_lines_euv_comparison(
        fig, he_date_str, mag_date_str, euv_date_str, ROTATED_MAG_SAVE_DIR,
        nrows=2
    )

    # Bottom row: outcome time series for the selected outcome key
    ax = fig.add_subplot(2, 3, (4, 6))
    plot_detection.plot_outcome_df_vs_time(
        ax, outcome_time_series_dict[outcome_key], he_date_str, cmap, ylabel
    )

    # Write the image and free the figure
    plt.savefig(comparison_img_file)
    plt.close(fig)
    print(f'{euv_date_str} map comparison saved.')

v0.5.1+

In [None]:
# Confidence levels passed to the outcome computation; the commented lines
# are alternative selections.
confidence_level_list = [50, 75, 95]
# confidence_level_list = [1, 50, 75, 95]
# confidence_level_list = [0, 35, 65, 95]
# Build the v0.5.1+ outcome time series; this variant reads from
# DETECTION_MAP_SAVE_DIR rather than DETECTION_SAVE_DIR
outcome_time_series_dict = detect.get_outcome_time_series_dict_v0_5_1(
    HE_DATE_LIST, confidence_level_list, DETECTION_MAP_SAVE_DIR
)
# Displayed as the cell output: median of the 'area_percent' outcome at key 50
outcome_time_series_dict['area_percent'][50].median()

In [None]:
overwrite = True

# Preset tuples laid out as
# (output directory, outcome key, colormap name, y-axis label)
region_num_settings = (
    f'{DETECTION_IMAGE_DIR}Region_Number/',
    'num_ch',
    'viridis',
    'Detected CH Number',
)
px_percent_settings = (
    f'{DETECTION_IMAGE_DIR}EUV_Px_Percentage/',
    'px_percent',
    'plasma',
    'Detected Pixel Percentage (%)',
)
area_percent_settings = (
    f'{DETECTION_IMAGE_DIR}EUV_Area_Percentage/',
    'area_percent',
    'plasma',
    'Detected Area Percentage (%)',
)
area_settings = (
    f'{DETECTION_IMAGE_DIR}EUV_Area/',
    'area',
    'plasma',
    'Detected Area (Mm^2)',
)

# Choose the preset consumed by the plotting cell that follows
out_dir, outcome_key, cmap, ylabel = area_percent_settings

In [None]:
# v0.5.1+ outcome figure: the comparison helper fills the top row and the
# selected outcome time series spans the bottom row.
# Relies on overwrite, out_dir, outcome_key, cmap, and ylabel from the
# settings cell above.
hg_reproject = False


# exist_ok makes directory creation idempotent without a separate isdir check
os.makedirs(out_dir, exist_ok=True)

for euv_date_str in EUV_DATE_LIST:

    # Optionally overwrite existing files
    comparison_img_file = f'{out_dir}EUV{euv_date_str}.jpg'
    if os.path.isfile(comparison_img_file) and not overwrite:
        print(f'EUV {euv_date_str} comparison already exists.')
        continue

    # Select the He I and magnetogram dates to pair with this EUV date
    he_date_str = prepare_data.get_latest_date_str(
        HE_DATE_LIST, selected_date_str=euv_date_str
    )
    mag_date_str = prepare_data.get_nearest_date_str(
        MAG_DATE_LIST, selected_date_str=he_date_str
    )

    # A wider canvas is used when hg_reproject is enabled
    if hg_reproject:
        fig = plt.figure(figsize=(22, 10))
    else:
        fig = plt.figure(figsize=(18, 10))

    # Forward the flag instead of hard-coding False so the helper stays in
    # sync with the figure size above and the subplot grid below
    plot_detection.plot_he_neutral_lines_euv_v0_5_1(
        fig, he_date_str, mag_date_str, euv_date_str,
        nrows=2, hg_reproject=hg_reproject
    )

    # Bottom row spans the full width of whichever column grid is in use
    if hg_reproject:
        ax = fig.add_subplot(2, 7, (8, 14))
    else:
        ax = fig.add_subplot(2, 3, (4, 6))

    plot_detection.plot_outcome_df_vs_time(
        ax, outcome_time_series_dict[outcome_key], he_date_str, cmap,
        ylabel, #ylim=[0,3.75]
    )

    # Save plot
    plt.savefig(comparison_img_file)
    plt.close(fig)
    print(f'{euv_date_str} map comparison saved.')

### Outcome Comparison

#### Preparation

Compare outcomes between confidence levels and/or methods

In [None]:
# Output directory for the outcome comparison artifacts
out_dir = f'{DETECT_DIR}_Outcome_Comparison/{DATE_DIR}'
os.makedirs(out_dir, exist_ok=True)

# Confidence levels to compare (alternatives kept for reference)
# confidence_level_list = [0, 35, 65, 95]
confidence_level_list = [0, 50, 80]
# confidence_level_list = list(range(0,96,5))

# Detection version directories to compare (alternatives kept for reference)
# version_dirs = ['v0_3/', 'Band_Pass/', 'Rescale/', 'Rescale_Center/']
# version_dirs = ['v0_3/', 'Rescale/']
# version_dirs = ['v0_3/', 'Rescale/', 'v0_4/']
# version_dirs = ['v0_3/', 'v0_4/']
# version_dirs = ['v0_4_Single/', 'v0_4/']
# version_dirs = ['v0_4_Unipolar']
# version_dirs = ['v0_1', 'v0_2', 'v0_3', 'v0_4', 'v0_5']
version_dirs = ['v0_1', 'v0_2', 'v0_4', 'v0_5']

# Tokens joined into the saved file names: versions, then 'cl<level>' tags
descript_list = [*version_dirs, *(f'cl{cl}' for cl in confidence_level_list)]

Plot Formatting

In [None]:
# Formatting presets for the comparison bar charts. Exactly one
# cl_dx_list / method_list pair should be active; the commented pairs are
# alternatives for other comparisons.
#   cl_dx_list  - x offset added to the bar positions for each confidence
#                 level
#   method_list - x tick labels, one per compared method

# cl_dx_list = np.arange(-0.3,0.31,0.2)
# method_list = ['Bright & Coherent Mask', 'Ensemble', 'Smoothness',
#                'Consistency', 'Unipolarity']

cl_dx_list = np.arange(-0.2,0.21,0.2)
method_list = ['Single Preliminary Mask', 'Area Ensemble',
               'Smoothness Ensemble', 'Unipolarity Ensemble']

# cl_dx_list = np.arange(-0.9,0.91,0.2)
# method_list = ['Unipolarity']

# cl_dx_list = np.arange(0,1,0.05)
# method_list = ['Unipolarity']

# cl_dx_list = np.arange(-0.3,0.31,0.2)
# # method_list = ['v0.3', 'v0.3 Design + Band Pass', 'v0.3 Design + Rescale',
# #               'v0.3 Design + Rescale & Center']
# method_list = ['v0.1', 'v0.2', 'v0.3', 'v0.4']

# cl_dx_list = [-0.1, 0.1]
# # method_list = ['v0.3', 'v0.3 Design + Rescale']
# # method_list = ['v0.3', 'v0.4']
# method_list = ['v0.4 Single', 'v0.4 Ensemble']

# cl_dx_list = [-0.2, 0, 0.2]
# method_list = ['v0.3', 'v0.3 Design + Rescale', 'v0.4']

# One bar color per confidence level, sampled from the active colormap.
# NOTE(review): this rebinds `cmap`, which the time series settings cells
# assign as a colormap name string - confirm no cell is re-run expecting the
# string afterwards.
# cmap = colormaps['viridis']
cmap = colormaps['bone_r']
color_list = cmap(np.linspace(0.25, 0.9, len(confidence_level_list)))
# cmap = colormaps['plasma_r']
# color_list = cmap(np.linspace(0.25, 1, len(confidence_level_list)))

v0.2-v0.5 Compute Outcomes

In [None]:
# Per-version accumulators, appended in the order of version_dirs
area_percent_df_by_method_list = []
autocorr_by_conf_by_method_list = []
mad_by_conf_by_method_list = []
norm_mad_by_conf_by_method_list = []


for version_dir in version_dirs:
    # Outcomes for this version are computed from its saved .npy directory
    detection_save_dir = os.path.join(
        DETECT_DIR, version_dir, 'Saved_npy_Files/'
    )
    outcome_time_series_dict = detect.get_outcome_time_series_dict(
        HE_DATE_LIST, confidence_level_list, detection_save_dir
    )
    area_percent_df_by_method_list.append(
        outcome_time_series_dict['area_percent']
    )

    # Autocorrelation of the 'area' outcome at each confidence level
    area_df = outcome_time_series_dict['area']
    autocorr_by_confidences = [
        area_df[cl].autocorr() for cl in confidence_level_list
    ]
    autocorr_by_conf_by_method_list.append(autocorr_by_confidences)

    # Plain and normalized MAD values returned by the detect helper
    mad_by_confidences, norm_mad_by_confidences = (
        detect.get_mad_by_confidences(area_df, confidence_level_list)
    )
    mad_by_conf_by_method_list.append(mad_by_confidences)
    norm_mad_by_conf_by_method_list.append(norm_mad_by_confidences)
    print(f'Outcomes computed for {version_dir}')

# Persist the autocorrelations under a name that encodes versions and levels
descript_list = version_dirs + [f'cl{cl}' for cl in confidence_level_list]
autocorr_file = out_dir + 'Autocorr_comp_' + '_'.join(descript_list) + '.npy'
np.save(
    autocorr_file,
    np.array(autocorr_by_conf_by_method_list),
    allow_pickle=True,
)

v0.5.1 Compute Outcomes

In [None]:
# Per-version accumulators, appended in the order of version_dirs
area_percent_df_by_method_list = []
autocorr_by_conf_by_method_list = []


for version_dir in version_dirs:
    # v0_5_1 versions use the FITS save directory and the matching outcome
    # helper; every other version uses the .npy directory and the older helper
    if 'v0_5_1' in version_dir:
        save_subdir = 'Saved_fits_Files/'
        get_outcomes = detect.get_outcome_time_series_dict_v0_5_1
    else:
        save_subdir = 'Saved_npy_Files/'
        get_outcomes = detect.get_outcome_time_series_dict

    detection_save_dir = os.path.join(DETECT_DIR, version_dir, save_subdir)
    outcome_time_series_dict = get_outcomes(
        HE_DATE_LIST, confidence_level_list, detection_save_dir
    )

    area_percent_df_by_method_list.append(
        outcome_time_series_dict['area_percent']
    )

    # Autocorrelation of the 'area' outcome at each confidence level
    area_df = outcome_time_series_dict['area']
    autocorr_by_confidences = [
        area_df[cl].autocorr() for cl in confidence_level_list
    ]
    autocorr_by_conf_by_method_list.append(autocorr_by_confidences)
    print(f'Outcomes computed for {version_dir}')

# Persist the autocorrelations under a name that encodes versions and levels
descript_list = version_dirs + [f'cl{cl}' for cl in confidence_level_list]
autocorr_file = out_dir + 'Autocorr_comp_' + '_'.join(descript_list) + '.npy'
np.save(
    autocorr_file,
    np.array(autocorr_by_conf_by_method_list),
    allow_pickle=True,
)

#### Design Variable Sweep

Area Sweep

In [None]:
# Bar chart of the median detected area percentage at each confidence level.
# NOTE(review): `confidence_level` is used as an index into the per-method
# list, i.e. it selects the first entry of version_dirs - confirm the intent.
confidence_level = 0
area_percent_df = area_percent_df_by_method_list[confidence_level]
median_area_percent_by_cl = [
    np.median(area_percent_df[cl]) for cl in confidence_level_list
]

x_ticks = np.arange(len(method_list))
plt.figure(1, figsize=(9,6))

# Loop over confidence levels to plot bars for all methods at once
for median_area_percent, cl_dx, color in zip(
    median_area_percent_by_cl, cl_dx_list, color_list):
    plt.bar(x_ticks + cl_dx, median_area_percent, width=0.05, color=color)

plt.suptitle(DATE_RANGE_SUPTITLE)
plt.title('Design Variable Sweep')
plt.ylabel('Median Detected Area Percentage (%)')
plt.xlabel('Unipolarity Threshold')
# NOTE(review): the x label and the [0, 1] limits appear to presume the
# single-method sweep preset (method_list = ['Unipolarity'] with cl_dx_list
# spanning 0 to 1) from the Plot Formatting cell; with a multi-method preset
# the bars at x >= 1 fall outside the axis - confirm before use.
plt.xlim([-0.025,1.025])

Autocorrelation Sweep

In [None]:
# Bar chart of the saved autocorrelations at each confidence level.
# Reload the array written by the Compute Outcomes cell; the file name is
# rebuilt from out_dir and descript_list, so both must be unchanged since
# the save.
autocorr_file_name = f'{out_dir}Autocorr_comp_{"_".join(descript_list)}'
autocorrs_by_cl_by_method = np.load(autocorr_file_name + '.npy', allow_pickle=True)
# Transpose so that iterating yields one row per confidence level
autocorrs_by_method_by_cl = autocorrs_by_cl_by_method.T

x_ticks = np.arange(len(method_list))

plt.figure(1, figsize=(9,6))

# Loop over confidence levels to plot bars for all methods at once
for autocorrs_by_method, cl_dx, color in zip(
    autocorrs_by_method_by_cl, cl_dx_list, color_list):
    plt.bar(x_ticks + cl_dx, autocorrs_by_method, width=0.05,
            color=color)

plt.suptitle(DATE_RANGE_SUPTITLE)
plt.title('Design Variable Sweep')
plt.ylabel(f'Autocorrelation')
plt.xlabel('Unipolarity Threshold')
# NOTE(review): the x label and the [0, 1] limits appear to presume the
# single-method sweep preset (method_list = ['Unipolarity'] with cl_dx_list
# spanning 0 to 1) from the Plot Formatting cell; with a multi-method preset
# the bars at x >= 1 fall outside the axis - confirm before use.
plt.xlim([-0.025,1.025])

#### Method Comparison

Autocorrelation by Method

In [None]:
# Grouped bar chart of the saved time series autocorrelations: one group per
# method, one bar per confidence level. The chart is written next to the
# .npy file it is built from.
autocorr_file_name = out_dir + 'Autocorr_comp_' + '_'.join(descript_list)
autocorrs_by_cl_by_method = np.load(
    f'{autocorr_file_name}.npy', allow_pickle=True
)
# Swap axes so that iterating yields one row per confidence level
autocorrs_by_method_by_cl = np.transpose(autocorrs_by_cl_by_method)

x_ticks = np.arange(len(method_list))
confidence_label_list = [
    f'{cl}% Confidence' for cl in confidence_level_list
]

plt.figure(1, figsize=(10,6))

# One bar series per confidence level, shifted sideways by its offset
for autocorrs_by_method, cl_dx, confidence, color in zip(
        autocorrs_by_method_by_cl,
        cl_dx_list,
        confidence_label_list,
        color_list):
    plt.bar(
        x_ticks + cl_dx,
        autocorrs_by_method,
        width=0.2,
        label=confidence,
        color=color,
    )

plt.suptitle(DATE_RANGE_SUPTITLE)
plt.title('Method Comparison')
plt.xticks(x_ticks, method_list)
plt.ylabel('Time Series Autocorrelation')

# plt.ylim([0, 0.8])
# plt.ylim([-0.1, 0.8])
# plt.axhline(0, color='k', linestyle='--')
plt.legend(loc='upper left')

# Write the chart and close the current figure
plt.savefig(f'{autocorr_file_name}.png')
plt.close()
print(f'{autocorr_file_name.rsplit("/", 1)[-1]} method comparison saved.')

## Write Images to Video

In [None]:
# Write a video at 2 frames per second, presumably from the images in out_dir.
# NOTE(review): out_dir is whatever the most recently executed cell assigned
# (under Run All, the outcome comparison directory) - confirm it points at
# the intended image directory before running.
detect.write_video(out_dir, fps=2)