The aim of this notebook is to process downward longwave (LW_down) data and cloud mask radar data from the BCO. After creating files of measurements, this code aims at separating the LW_down from each cloud, to plot the composite image

In [5]:
import intake
import os
import numpy as np
import xarray as xr
import dask.array as da
import pandas as pd
import glob
from dask.diagnostics import ProgressBar

## Get $LW\downarrow$ data

The aim of this part is to create a xarray containing the meaaured $LW\downarrow$ for the entire year of 2020. We use the data from the BCO to achieve this using the internal file storage on levante.

In [None]:
fileformat = f'/pool/data/OBS/BARBADOS_CLOUD_OBSERVATORY/Level_1/K_Radiation/%Y%m/Radiation__Deebles_Point__DownwellingRadiation__1s__%Y%m%d.nc'
dates = pd.date_range('2020-01-12','2020-11-30')
 
def create_filelist(dates, fileformat):
    """
    Create list of files
 
    Returns list of existing files for
    the given dates and fileformat
 
    Input
    -----
    dates : datetime-like
        List of dates that are of interest
    fileformat : str
        Fileformat of files incl. full path
 
    Returns
    -------
    files : list
        List of files that could be found
    """
    files = []
    for date in dates:
        fn = date.strftime(fileformat)
        if os.path.exists(fn):
            files.append(fn)
    return files
 
 
files_available = create_filelist(dates, fileformat)
def preprocess_remove_duplicates(ds):
    _, index = np.unique(ds['time'], return_index=True)
    return ds.isel(time=index)

# Use preprocess function to remove duplicates
ds_radiation = xr.open_mfdataset(files_available, preprocess=preprocess_remove_duplicates)

In [None]:
LW_down = ds_radiation.LWdown_diffuse

In [6]:
"""
We also get the windspeed data from 2020, and do an average at 1000m. This average is used as an approximation to get the mean windspeed.
This value is used to convert time to distances.
"""
windspeed = xr.open_dataset('./../data/windspeed_eurec4a.nc')['wind_speed']
windspeed_avg = windspeed.sel(range = 1000, method = 'nearest').mean(dim = 'time')

## Get cloud mask data

In this part of the code, we aim at processing the cloud mask data from the BCO. The idea is to create an array with all the measured clouds. We include the information on the clouds: start and end time, lenght, depth, bottom and top height.

In [None]:
def cloud_borders(cloud_mask, min_cloud_length, max_cloud_length):
    """
    Create an array with all observed clouds with their information
 
    Input
    -----
    cloud_mask : cloud mask product from the BCO radar measurement
    min_cloud_length, max_cloud_length: minimum and maximum cloud lengths allowed
 
    Returns
    -------
    arrays of observed clouds with their informations (characteristics)
    """
    cloud_times = []
    
    for cloud_index in cloud_mask.N_clouds:
        
        cloud_start_time = np.datetime64(cloud_mask.cloudStartTime[int(cloud_index-1)].values)
        
        if start_time <= cloud_start_time <= end_time:
            cloud_length = cloud_mask.cloudLength[int(cloud_index-1)].values
            
            if  max_cloud_length >= cloud_length >= min_cloud_length:
                
                cloud_end_time = cloud_mask.cloudEndTime[int(cloud_index-1)].values
                cloud_base_height = cloud_mask.cloudBase[int(cloud_index-1)].values
                cloud_top_height = cloud_mask.cloudTop[int(cloud_index-1)].values
                cloud_mid_height = (cloud_top_height - cloud_base_height)/2 + cloud_base_height
                cloud_times.append((cloud_start_time, cloud_end_time, cloud_length, cloud_base_height, 
                                    cloud_top_height, cloud_mid_height))

    cloud_times_array = np.array(cloud_times)
    
    return cloud_times_array

In [None]:
start_time = np.datetime64(dates[0].to_pydatetime())
end_time = np.datetime64(dates[-1].to_pydatetime())

In [None]:
#Cloud mask data from the BCO radar:
data_cloud_mask = xr.open_dataset('./../data/cloudObjectMask_MBR_155m-18km_200112-201130_v0.3.nc')

#we filter clouds also with length, to avoid to have clouds bigger than 50km. Clouds lenghts and depth will be further filtered later
min_cloud_length = 0
max_cloud_length = 50000
clouds = cloud_borders(data_cloud_mask, min_cloud_length, max_cloud_length)

In [11]:
np.savetxt('./../data/processed_data/cloud_borders_2020.txt', clouds, delimiter=',', fmt='%s')

In [7]:
clouds = np.loadtxt('./../data/processed_data/cloud_borders_2020.txt', delimiter=',', dtype='object') 
clouds[:, 0] = np.array(clouds[:, 0], dtype='datetime64[s]')
clouds[:, 1] = np.array(clouds[:, 1], dtype='datetime64[s]')
clouds[:, 2] = np.array(clouds[:, 2], dtype='float64')
clouds[:, 3] = np.array(clouds[:, 3], dtype='float64')
clouds[:, 4] = np.array(clouds[:, 4], dtype='float64')
clouds[:, 5] = np.array(clouds[:, 5], dtype='float64')

## Combine cloud data and $LW\downarrow$ measurements

Now that we have $LW\downarrow$ data and an array with clouds and their characteristics, we can isolate the emission signal from 60min before and after the clouds. This gives us an xarray of measurements over time, with one dimension per cloud. The coordinate is still time

This will then be used to plot a composite of the average $LW\downarrow$ signal for all clouds.

However to avoid cross contamination in the signal (in each cloud $LW\downarrow$ array, there might be other clouds closer than 60min), therefore, we set to NaN the values of each cloud. This is justified as we are interested in the surroundings of clouds and not in clouds themselves

To summarise, here are the processing steps:
1) Set to NaN all the in-cloud values to avoid cross-contamination, the resulting xarray is called LW_down_Nan;
2) Separate the $LW\downarrow$ signal into single clouds arrays using the cloud data and the overall $LW\downarrow$ measurements;
3) Convert the time coordinate into a distance coordinate (to better analyse haloes).
4) Save he data into the folder data/processed_data

### 1) Step 1:Set to NaN all the in-cloud values to avoid cross-contamination

In [8]:
def process_shallow(data_LW_down, clouds):
    """
    This function sets to NaN all the values of LW_down in clouds to avoid cross-contamination.
    """

    # Ensure data_LW_down uses Dask and chunk by a reasonable size
    data_LW_down = data_LW_down.chunk({'time': 'auto'})

    # Convert clouds to a Dask array to facilitate vectorized operations
    clouds_array = da.from_array(clouds, chunks=(1000, 2))  # Adjust chunk size as needed

    # Extract start and end times
    start_times = xr.DataArray(clouds_array[:, 0], dims='cloud')
    end_times = xr.DataArray(clouds_array[:, 1], dims='cloud')

    # Create the initial combined mask using Dask operations
    combined_mask = da.zeros(len(data_LW_down.time), dtype=bool)

    for start_time, end_time in zip(start_times.values, end_times.values):
        start_time = np.datetime64(start_time)
        end_time = np.datetime64(end_time)
        mask = (data_LW_down.time >= start_time) & (data_LW_down.time <= end_time)
        combined_mask = combined_mask | mask.data

    # Invert the combined mask to keep data outside cloudy periods
    cloudy_mask = ~combined_mask

    # Apply the combined cloudy mask
    data_LW_down_masked = data_LW_down.where(cloudy_mask, np.nan)

    # Compute the result with a progress bar
    with ProgressBar():
        result = data_LW_down_masked.compute()

    return result

In [None]:
%%time
#this code takes a long time to process. The result are already stoed in 'LW_down_2020_tot.nc'
LW_down_NaN = process_shallow(LW_down, clouds)

In [None]:
LW_down_NaN_xarray = xr.Dataset({'LWdown_diffuse': LW_down_NaN})
LW_down_NaN_xarray.to_netcdf('./../data/processed_data/LW_down_2020_tot.nc')

In [None]:
LW_down = LW_down_NaN_xarray['LWdown_diffuse']

In [1]:
#LW_down = xr.open_dataset('./../data/processed_data/LW_down_2020_tot.nc')['LWdown_diffuse']

### 2) Step 2: Separate the signal into single clouds arrays

In [19]:
def lw_down_clouds(LW_down, clouds):

    """
    This function separates the LW_down measurements into single arrays for each cloud. This results in a main xarray that contains LW_down 
    measurements 60min before and after each cloud. There is a dimension for time and cloud, but also coordinates with each cloud characetiristics
    for each cloud's array
    """
    
    half_t_cloud = 5 #half time/size of the cloud (clouds are rescaled to 10min)
    delta_time = np.timedelta64(60, 'm') #time we look before and after the cloud 
    
    # Initialize an empty list to store processed data for each cloud
    processed_data = {}
    cloud_length = []
    cloud_bottom = []
    cloud_top = []
    cloud_mid = []
    cloud_start_time = []
    cloud_end_time = []

    #iteration over each cloud
    for cloud_index, cloud_times in enumerate(clouds):
        
        # Extract start and end time for the current cloud
        start_time = np.datetime64(cloud_times[0])
        end_time = np.datetime64(cloud_times[1])
        
        
        # Extract data for the time window before the cloud
        timedelta_before = start_time - delta_time
        extracted_data_before = LW_down.sel(time=slice(timedelta_before, start_time))
        extracted_data_before = extracted_data_before.assign_coords(time=extracted_data_before.time.values[::-1])
        #we rescale the time such that it starts at 0 at the middle of the cloud
        extracted_data_before['time'] = np.round(-np.abs((extracted_data_before.time - timedelta_before) / np.timedelta64(1, 's')) - half_t_cloud * 60 - 1)
        
        
        # Extract data for the time window after the cloud
        timedelta_after = end_time + delta_time
        extracted_data_after = LW_down.sel(time=slice(end_time, timedelta_after))
        #similar as before
        extracted_data_after['time'] = np.round(np.abs((extracted_data_after.time - end_time) / np.timedelta64(1, 's')) + half_t_cloud * 60 + 1)

        
        # Extract data for the time window during the cloud
        extracted_data_during = LW_down.sel(time=slice(start_time, end_time))
        
        if len(extracted_data_during) == 0:
            continue

        rescaled_time_during = np.abs((extracted_data_during.time.values - start_time) / np.timedelta64(1, 's'))
        extracted_data_during['time'] = rescaled_time_during

        #rescaling of the cloud such that each cloud has the same duration and we can compare different cloud lengths between them
        min_time = np.nanmin(extracted_data_during['time'])
        max_time = np.nanmax(extracted_data_during['time'])
        rescaled_time = ((extracted_data_during['time'] - min_time) / (max_time - min_time)) * 600 - half_t_cloud * 60
        
        extracted_data_during['time'] = rescaled_time
        
        new_time = np.arange(-300, 301, 1)
        extracted_data_during_unique = extracted_data_during.drop_duplicates(dim='time')
        
        extracted_data_during_unique = extracted_data_during_unique.sortby('time')
        
        time_diff = np.diff(extracted_data_during_unique['time'])
        is_monotonic = (time_diff > 0).all() or (time_diff < 0).all()

        # If the time coordinate is not strictly monotonic, sort the DataArray along the time dimension
        if not is_monotonic:
            continue

        #for each cloud, append the data of LW_down, as well as the cloud's characteristics into a list called processed_data
        try:
            extracted_data_during_interpolated = extracted_data_during_unique.interp(time=new_time)

            if ((extracted_data_during_interpolated < 10).any() or (extracted_data_before < 10).any() or (extracted_data_after < 10).any()):
                continue
        
            cloud_length.append(clouds[cloud_index][2])
            cloud_bottom.append(clouds[cloud_index][3])
            cloud_top.append(clouds[cloud_index][4])
            cloud_mid.append(clouds[cloud_index][5])
            cloud_start_time.append(clouds[cloud_index][0])
            cloud_end_time.append(clouds[cloud_index][1])
            processed_data[cloud_index] = xr.concat([extracted_data_before, extracted_data_during_interpolated, extracted_data_after], dim='time')
        except:
            #print('ouch')
            # Skip the current iteration if interpolation fails
            continue
    
    # Concatenate the processed data along a new dimension 'cloud'
    processed_data_combined = xr.concat([data.drop_duplicates('time') for data in processed_data.values()], dim='cloud')
    processed_data_combined.coords['cloud_length'] = ('cloud', cloud_length)
    processed_data_combined.coords['cloud_bottom'] = ('cloud', cloud_bottom)
    processed_data_combined.coords['cloud_mid'] = ('cloud', cloud_mid)
    processed_data_combined.coords['cloud_top'] = ('cloud', cloud_top)
    processed_data_combined.coords['cloud_start_time'] = ('cloud', cloud_start_time)
    processed_data_combined.coords['cloud_end_time'] = ('cloud', cloud_end_time)
    #processed_data_combined = xr.concat([data for data in processed_data.values()], dim='cloud')
    return processed_data_combined


In [20]:
%%time
LW_down_clouds = lw_down_clouds(LW_down, clouds)

CPU times: user 9min 57s, sys: 12.4 s, total: 10min 9s
Wall time: 10min 15s


### 3) Step 3: Convert the time coordinate into a distance coordinate 

In [21]:
def lw_down_clouds_dist_approx(LW_down_clouds, windspeed_avg):
    """
    Converts the time dimension into distance using the average windspeed. This approximation is justified since the focus is not on a few meters of 
    precision. This average windspeed is enough.
    """
    # Calculate distance from cloud based on time and windspeed
    distance_from_cloud = LW_down_clouds.time * windspeed_avg.item()
    
    # Rename the 'time' dimension to 'distance_from_cloud'
    LW_down_clouds = LW_down_clouds.rename({'time': 'distance_from_cloud'})
    
    # Create a new coordinate 'distance' using the calculated distances
    LW_down_clouds.coords['distance'] = ('distance_from_cloud', distance_from_cloud.data)
    
    # Replace the existing 'time' coordinate with the new 'distance' coordinate
    LW_down_clouds = LW_down_clouds.swap_dims({'distance_from_cloud': 'distance'})
    
    # Add units attribute to the distance coordinate
    LW_down_clouds.coords['distance'].attrs['units'] = 'm'
    
    return LW_down_clouds


In [22]:
%%time
LW_down_clouds_dist =  lw_down_clouds_dist_approx(LW_down_clouds, windspeed_avg)

CPU times: user 1.26 ms, sys: 6 µs, total: 1.27 ms
Wall time: 1.07 ms


### 4) Step 4: save the xarray in processed_data

In [23]:
ds = xr.Dataset({'LWdown_diffuse': LW_down_clouds_dist})
ds.to_netcdf('./../LW_down_clouds_2020_tot.nc')