In [80]:
import numpy as np
import xarray as xr
import matplotlib.pyplot as plt
from scipy import stats, optimize
from mpl_toolkits.axes_grid1 import make_axes_locatable

In [78]:
plt.rcParams['axes.titlesize'] = 20
plt.rcParams['axes.labelsize'] = 16
plt.rcParams['path.simplify']    = True
plt.rcParams['font.family']  = 'monospace'
plt.rcParams['mathtext.fontset'] = 'custom'
plt.rcParams['xtick.major.size'] = 8
plt.rcParams['ytick.major.size'] = 8
plt.rcParams['xtick.minor.size'] = 3    
plt.rcParams['ytick.minor.size'] = 3
plt.rcParams['xtick.labelsize'] = 14
plt.rcParams['ytick.labelsize'] = 14
plt.rcParams['xtick.major.width'] = 1
plt.rcParams['ytick.major.width'] = 1
plt.rcParams['xtick.minor.width'] = 1
plt.rcParams['ytick.minor.width'] = 1
plt.rcParams['lines.markeredgewidth'] = 1
plt.rcParams['legend.numpoints'] = 1
plt.rcParams['legend.frameon'] = False
plt.rcParams['legend.handletextpad'] = 0.3
plt.rcParams['xtick.minor.visible'] = True
plt.rcParams['ytick.minor.visible'] = True
plt.rcParams['axes.grid'] = True
plt.rcParams['grid.color'] = 'k'
plt.rcParams['grid.linestyle'] = '--'
plt.rcParams['grid.linewidth'] = 0.5
plt.rcParams['xtick.direction'] = 'in'
plt.rcParams['ytick.direction'] = 'in'

In [57]:
BASE_PATH = "/root/GSST-daytime-sky-quality"
pressure_levels = [
    1, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000
]

In [55]:
def create_variable_dataset(variable, pressure_levels, year_interval):
    datasets = []  # Lista para armazenar os datasets temporários

    for pressure_level in pressure_levels:
        # load dataset
        pl_variable = xr.open_dataset(
            f"{BASE_PATH}/data/Reanalysis_ERA5/Vertical_Profile/{variable}_{pressure_level}hPa_{year_interval}.nc"
        )
        # get mean from lat and lon
        pl_variable = pl_variable.mean(dim=['latitude', 'longitude'])
        
        # Add the pressure level dimension with the current pressure_level value
        pl_variable_expanded = pl_variable.expand_dims("pressure_level")
        pl_variable_expanded['pressure_level'] = [pressure_level]
        
        # Store the expanded dataset in the list
        datasets.append(pl_variable_expanded)

    # Concatenate all datasets along the new 'pressure_level
    variable_dataset = xr.concat(datasets, dim="pressure_level")

    return variable_dataset

def create_pl_dataset_by_year_interval(year_interval):
    temperature = create_variable_dataset("temperature", pressure_levels, year_interval)
    # rename the variable
    temperature = temperature.rename_vars({"t": "temperature"})

    relative_humidity = create_variable_dataset("relative_humidity", pressure_levels, year_interval)
    relative_humidity = relative_humidity.rename_vars({"r": "relative_humidity"})

    specific_humidity = create_variable_dataset("specific_humidity", pressure_levels, year_interval)
    specific_humidity = specific_humidity.rename_vars({"q": "specific_humidity"})

    u_component_of_wind = create_variable_dataset("u_component_of_wind", pressure_levels, year_interval)
    v_component_of_wind = create_variable_dataset("v_component_of_wind", pressure_levels, year_interval)

    # geopotential = create_variable_dataset("geopotential", pressure_levels, year_interval)
    # geopotential = geopotential.rename_vars({"z": "geopotential"})
    # concatenate all variables
    vertical_profile_ds = xr.merge([
        temperature, 
        relative_humidity, 
        specific_humidity, 
        u_component_of_wind, 
        v_component_of_wind, 
        # geopotential
    ])

    return vertical_profile_ds

vertical_profile_ds = xr.merge([
    create_pl_dataset_by_year_interval("1999_2003"),
    create_pl_dataset_by_year_interval("2004_2008"),
    create_pl_dataset_by_year_interval("2009_2013"),
    create_pl_dataset_by_year_interval("2014_2018"),
    create_pl_dataset_by_year_interval("2019_2023"),
])

vertical_profile_ds['wind_speed'] = np.sqrt(vertical_profile_ds['u']**2 + vertical_profile_ds['v']**2)

vertical_profile_ds

In [68]:
# load single level variables

def create_sl_dataset_by_year_interval(year_interval):
    temperature_2m = xr.open_dataset(f"{BASE_PATH}/data/Reanalysis_ERA5/Single_Level/2m_temperature_{year_interval}.nc")
    temperature_2m = temperature_2m.mean(dim=['latitude', 'longitude'])

    uwind = xr.open_dataset(f"{BASE_PATH}/data/Reanalysis_ERA5/Single_Level/10m_u_component_of_wind_{year_interval}.nc")
    uwind = uwind.mean(dim=['latitude', 'longitude'])

    vwind = xr.open_dataset(f"{BASE_PATH}/data/Reanalysis_ERA5/Single_Level/10m_v_component_of_wind_{year_interval}.nc")
    vwind = vwind.mean(dim=['latitude', 'longitude'])

    mean_sea_level_pressure = xr.open_dataset(f"{BASE_PATH}/data/Reanalysis_ERA5/Single_Level/mean_sea_level_pressure_{year_interval}.nc")
    mean_sea_level_pressure = mean_sea_level_pressure.mean(dim=['latitude', 'longitude'])

    single_level_ds = xr.merge([
        temperature_2m,
        uwind,
        vwind,
        mean_sea_level_pressure
    ])

    return single_level_ds

single_level_ds = xr.merge([
    create_sl_dataset_by_year_interval("1999_2003"),
    create_sl_dataset_by_year_interval("2004_2008"),
    create_sl_dataset_by_year_interval("2009_2013"),
    create_sl_dataset_by_year_interval("2014_2018"),
    create_sl_dataset_by_year_interval("2019_2023")

])

single_level_ds

In [56]:
def calculate_PSD_and_fit_law_power(data, sampling_rate):
    """
    Calculate the Power Spectral Density (PSD) of a time-series data and fit it with a power law,
    and also calculate and return the power law slope (index).

    Parameters:
    data (numpy.array): Array containing the time-series data.
    sampling_rate (float): The sampling rate of the data in Hz.

    Returns:
    tuple: A tuple containing:
        - positive_freqs (numpy.array): Array of positive frequencies.
        - psd (numpy.array): Array of power spectral densities corresponding to the positive frequencies.
        - amplitude (float): Amplitude of the fitted power law.
        - index (float): Power law index (slope) of the fitted model.

    Description:
    This function first computes the Fast Fourier Transform (FFT) of the input time-series data to convert it from
    the time domain to the frequency domain. It then calculates the Power Spectral Density (PSD) by taking the square
    of the absolute values of the FFT results, considering only positive frequencies.

    To model the PSD with a power law, the function fits a line to the log-log plot of the PSD versus frequency, which
    corresponds to fitting a power law of the form y = Ax^B in the original scale. The fitting process returns the
    parameters A (amplitude) and B (index), which describe the fitted power law.

    Usage:
    >>> import numpy as np
    >>> data = np.random.normal(0, 1, 1000)  # Example data: white noise
    >>> sampling_rate = 1.0  # Sampling rate in Hz
    >>> freqs, psd, amplitude, index = calculate_PSD_and_fit_law_power(data, sampling_rate)
    >>> print(f"Amplitude: {amplitude}, Index: {index}")
    """
    # Calculate FFT of the data
    fft_result = np.fft.fft(data)
    freqs = np.fft.fftfreq(len(data), 1 / sampling_rate)
    
    # Only take the positive frequencies and corresponding PSD values
    positive_freqs = freqs[:len(freqs)//2]
    psd = np.abs(fft_result[:len(freqs)//2])**2

    # Power law fitting on a log-log scale
    def power_law(x, amplitude, index):
        return amplitude * (x**index)

    valid_mask = positive_freqs > 0  # Exclude zero frequency
    log_freqs = np.log10(positive_freqs[valid_mask])
    log_psd = np.log10(psd[valid_mask])

    # Fit the log-log transformed data to a linear model
    popt, pcov = optimize.curve_fit(lambda x, a, b: a + b * x, log_freqs, log_psd)
    amplitude = 10**popt[0]  # Convert log amplitude back to linear scale
    index = popt[1]  # Slope of the line is the power law index

    power_law_function = power_law(positive_freqs, amplitude, index)
    # slope -3/5
    spectra_komogorov = amplitude * (positive_freqs)**(-5/3)
    spectra_high_layers = amplitude * (positive_freqs)**(-3)
    

    return positive_freqs, psd, amplitude, index, spectra_komogorov, spectra_high_layers, power_law_function


In [91]:
fig, axs = plt.subplots(3, 4, figsize=(20, 15))

variable = "temperature"
for i, pressure_level in enumerate(pressure_levels):
    row = i // 4
    col = i % 4
    ax = axs[row, col]
    # Get the data for the current pressure level
    data = vertical_profile_ds[variable].sel(pressure_level=pressure_level)
    # drop nan values
    data = data.dropna('time')

    # Calculate the PSD and fit a power law to the data
    freqs, psd, amplitude, index, spectra_komogorov, spectra_high_layers, power_law_function = calculate_PSD_and_fit_law_power(
        data.values, 1
    )

    ax.plot(freqs, psd, color="purple", alpha=0.5)
    ax.plot(freqs, power_law_function, label=f"$\\beta$ = {index:.2f}", color="red")
    ax.plot(freqs, spectra_komogorov, color='green')
    ax.plot(freqs, spectra_high_layers, color='blue')
    ax.set_title(f"{pressure_level} hPa")
    ax.set_xscale("log")
    ax.set_yscale("log")
    if row == 2:
        ax.set_xlabel("f, [1/hour]")
    if col == 0:
        ax.set_ylabel(f"E(f), [$K^{2}/hour$]")
    ax.legend(loc="upper right", fontsize=14)


# surface plot
variable = "t2m"
ax = axs[-1, -1]
data = single_level_ds[variable]
# drop nan values
data = data.dropna('time')

# Calculate the PSD and fit a power law to the data
freqs, psd, amplitude, index, spectra_komogorov, spectra_high_layers, power_law_function = calculate_PSD_and_fit_law_power(
    data.values, 1
)

ax.plot(freqs, psd, color="purple", alpha=0.5)
ax.plot(freqs, power_law_function, label=f"$\\beta$ = {index:.2f}", color="red")
ax.plot(freqs, spectra_komogorov, color='green')
ax.plot(freqs, spectra_high_layers, color='blue')
ax.set_title(f"Surface")
ax.set_xscale("log")
ax.set_yscale("log")
if row == 2:
    ax.set_xlabel("f, [1/hour]")
if col == 0:
    ax.set_ylabel(f"E(f), [$K^{2}/hour$]")
ax.legend(loc="upper right", fontsize=14)

# add legend below subplots center
ax2 = fig.add_axes([0.3, 0.0, 0.4, 0.05])
ax2.axis('off')
ax2.plot([], color="purple", alpha=0.5, label="PSD of Temperature")
ax2.plot([], label="Power Law Fit", color="red")
ax2.plot([], label=f"Kolmogorov Spectra ($\\beta = -5/3$)", color='green')
ax2.plot([], label="Spectra in the High Layers ($\\beta = -3$)", color='blue')
ax2.legend(loc='center', ncol=2, fontsize=18)


plt.subplots_adjust(hspace=0.3, wspace=0.3)

plt.savefig('../images/PSD_Temperature_History.png', dpi=72, bbox_inches='tight')
plt.savefig('../images/PSD_Temperature_History.pdf', dpi=150, bbox_inches='tight')

  return amplitude * (x**index)
  spectra_komogorov = amplitude * (positive_freqs)**(-5/3)
  spectra_high_layers = amplitude * (positive_freqs)**(-3)
  return amplitude * (x**index)
  spectra_komogorov = amplitude * (positive_freqs)**(-5/3)
  spectra_high_layers = amplitude * (positive_freqs)**(-3)
  return amplitude * (x**index)
  spectra_komogorov = amplitude * (positive_freqs)**(-5/3)
  spectra_high_layers = amplitude * (positive_freqs)**(-3)
  return amplitude * (x**index)
  spectra_komogorov = amplitude * (positive_freqs)**(-5/3)
  spectra_high_layers = amplitude * (positive_freqs)**(-3)
  return amplitude * (x**index)
  spectra_komogorov = amplitude * (positive_freqs)**(-5/3)
  spectra_high_layers = amplitude * (positive_freqs)**(-3)
  return amplitude * (x**index)
  spectra_komogorov = amplitude * (positive_freqs)**(-5/3)
  spectra_high_layers = amplitude * (positive_freqs)**(-3)
  return amplitude * (x**index)
  spectra_komogorov = amplitude * (positive_freqs)**(-5/3)
  spectra

In [82]:
?fig.add_axes

[0;31mSignature:[0m [0mfig[0m[0;34m.[0m[0madd_axes[0m[0;34m([0m[0;34m*[0m[0margs[0m[0;34m,[0m [0;34m**[0m[0mkwargs[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0;31mDocstring:[0m
Add an `~.axes.Axes` to the figure.

Call signatures::

    add_axes(rect, projection=None, polar=False, **kwargs)
    add_axes(ax)

Parameters
----------
rect : tuple (left, bottom, width, height)
    The dimensions (left, bottom, width, height) of the new
    `~.axes.Axes`. All quantities are in fractions of figure width and
    height.

projection : {None, 'aitoff', 'hammer', 'lambert', 'mollweide', 'polar', 'rectilinear', str}, optional
    The projection type of the `~.axes.Axes`. *str* is the name of
    a custom projection, see `~matplotlib.projections`. The default
    None results in a 'rectilinear' projection.

polar : bool, default: False
    If True, equivalent to projection='polar'.

axes_class : subclass type of `~.axes.Axes`, optional
    The `.axes.Axes` subclass that is inst

In [None]:
data = ds['t2m'].values
# definig sampling considering that frequency is in hourly
sampling_rate = 1 / 3600
positive_freqs, psd, amplitude, index, spectra_komogorov, spectra_high_layers, power_law_function = calculate_PSD_and_fit_law_power(data, sampling_rate)


plt.figure(figsize=(10, 6))
plt.loglog(positive_freqs, psd, label='Measured PSD', color='blue')
plt.loglog(positive_freqs, power_law_function, label=f'Fitted Power Law: $f^{{{index:.2f}}}$', color='red', linestyle='--')
plt.loglog(positive_freqs, spectra_komogorov, label='Kolmogorov: $f^{{{-3/5}}}$', color='green', linestyle='--')
plt.loglog(positive_freqs, spectra_high_layers, label='High Layers: -3', color='purple', linestyle='--')
plt.title('Power Spectral Density and Fitted Power Law')
plt.xlabel('Frequency (Hz)')
plt.ylabel('PSD')
plt.legend()
plt.grid(True)
plt.show()

In [None]:
def calculate_PSD_and_fit_law_power(data, sampling_rate):
    # Calculate the power spectral density of the data
    fft_result = np.fft.fft(data)
    freqs = np.fft.fftfreq(len(data), 1 / sampling_rate)
    positive_freqs = freqs[:len(freqs)//2]
    psd = np.abs(fft_result[:len(freqs)//2])**2

    # Fit a power law to the data
    def power_law(x, amplitude, index):
        return amplitude * (x**index)

    # Only consider positive frequencies and non-zero values to avoid log(0)
    valid_mask = (positive_freqs > 0)
    log_freqs = np.log10(positive_freqs[valid_mask])
    log_psd = np.log10(psd[valid_mask])

    # Curve fit on log-log scale
    popt, pcov = optimize.curve_fit(lambda x, a, b: a + b * x, log_freqs, log_psd)
    amplitude = 10**popt[0]
    index = popt[1]

    return positive_freqs, psd, amplitude, index

# Plotting
fig, ax = plt.subplots(figsize=(10, 6))
ax.loglog(positive_freqs, psd, label='Measured PSD', color='blue')
ax.loglog(positive_freqs, power_law(positive_freqs, amplitude, index), label=f'Fitted Power Law (index={index:.2f})', color='red', linestyle='--')
ax.set_title('Power Spectral Density and Power Law Fit')
ax.set_xlabel('Frequency (Hz)')
ax.set_ylabel('Power Spectral Density')

ref_slope = amplitude * (positive_freqs ** (-5/3))
f_3_slope = amplitude * (positive_freqs ** (-3))
ax.loglog(positive_freqs, ref_slope, 'k--', label='Theoretical -5/3 slope')
ax.loglog(positive_freqs, f_3_slope, 'g--', label='Theoretical -3 slope')

ax.legend()
ax.grid(True)
plt.show()