In [42]:
import xarray as xr
import matplotlib.pyplot as plt
import cartopy.crs as ccrs
import cartopy.feature as cfeature
import numpy as np
import logging 
logging.basicConfig(level=logging.INFO)
import atlite
from __future__ import annotations

import datetime as dt
import logging
from collections import namedtuple
from operator import itemgetter
from pathlib import Path
from typing import TYPE_CHECKING

import geopandas as gpd
import numpy as np
import pandas as pd
import xarray as xr
from dask import compute, delayed
from dask.array import absolute, arccos, cos, maximum, mod, radians, sin, sqrt, arcsin, arctan2, radians, arctan
from dask.diagnostics import ProgressBar
from numpy import pi
from scipy.sparse import csr_matrix
from atlite.aggregate import aggregate_matrix
from atlite.gis import spdiag
from numpy import pi
import sys
from numpy import logical_and

from atlite import csp as cspm

In [2]:
logger = logging.getLogger(__name__)

In [29]:
def SolarPosition(ds, time_shift="0H"):

    # up to h and dec from [1]

    time_shift = pd.to_timedelta(time_shift)
    #for the models: 
    t = ds.indexes["time"].to_datetimeindex()
    t = t + time_shift
    #for the era5
    #t=ds.indexes["time"] + time_shift
    n = xr.DataArray(t.to_julian_date(), coords=ds["time"].coords) - 2451545.0
    hour = (ds["time"] + time_shift).dt.hour
    minute = (ds["time"] + time_shift).dt.minute

    # Operations make new DataArray eager; reconvert to lazy dask arrays
    chunks = ds.chunksizes.get("time", "auto")
    if isinstance(chunks, tuple):
        chunks = chunks[0]
    n = n.chunk(chunks)
    hour = hour.chunk(chunks)
    minute = minute.chunk(chunks)

    L = 280.460 + 0.9856474 * n  # mean longitude (deg)
    g = radians(357.528 + 0.9856003 * n)  # mean anomaly (rad)
    l = radians(L + 1.915 * sin(g) + 0.020 * sin(2 * g))  # ecliptic long. (rad)
    ep = radians(23.439 - 4e-7 * n)  # obliquity of the ecliptic (rad)

    ra = arctan2(cos(ep) * sin(l), cos(l))  # right ascencion (rad)
    lmst = (6.697375 + (hour + minute / 60.0) + 0.0657098242 * n) * 15.0 + ds[
        "lon"
    ]  # local mean sidereal time (deg)
    h = (radians(lmst) - ra + pi) % (2 * pi) - pi  # hour angle (rad)

    dec = arcsin(sin(ep) * sin(l))  # declination (rad)

    # alt and az from [2]
    lat = radians(ds["lat"])
    # Clip before arcsin to prevent values < -1. from rounding errors; can
    # cause NaNs later
    alt = arcsin(
        (sin(dec) * sin(lat) + cos(dec) * cos(lat) * cos(h)).clip(min=-1.0, max=1.0)
    ).rename("altitude")
    alt.attrs["time shift"] = f"{time_shift}"
    alt.attrs["units"] = "rad"

    az = arccos(
        ((sin(dec) * cos(lat) - cos(dec) * sin(lat) * cos(h)) / cos(alt)).clip(
            min=-1.0, max=1.0
        )
    )
    az = az.where(h <= 0, 2 * pi - az).rename("azimuth")
    az.attrs["time shift"] = f"{time_shift}"
    az.attrs["units"] = "rad"

    vars = {da.name: da for da in [alt, az]}
    solar_position = xr.Dataset(vars)

    return solar_position

In [32]:
filepath_model="/groups/FutureWind/SFCRAD/CanESM5/historical/r1i1p2f1/rsds_rsdsdiff_tas_2014.nc"
ds_model_2014 = xr.open_dataset(filepath_model, decode_times=True)

In [33]:
solar_position_model=SolarPosition(ds_model_2014, time_shift="0H") #model data does not need time shift. because 
# the data is the average in a 3h interval, given at the middle point of the interval

  time_shift = pd.to_timedelta(time_shift)
  t = ds.indexes["time"].to_datetimeindex()
  t = ds.indexes["time"].to_datetimeindex()


In [34]:
solar_position_model['altitude'].values

array([[[-1.46005509, -1.50139338, -1.53190762, ..., -0.54514293,
         -0.50090554, -0.456801  ],
        [-1.46655714, -1.5110631 , -1.552638  , ..., -0.55701212,
         -0.51334177, -0.46981976],
        [-1.45165199, -1.48678243, -1.5084164 , ..., -0.56748209,
         -0.52451961, -0.48171312],
        ...,
        [-0.77627374, -0.77833251, -0.77924822, ..., -0.53869501,
         -0.52116598, -0.50344192],
        [-0.72807197, -0.72979562, -0.73056201, ..., -0.52483657,
         -0.50959217, -0.4941543 ],
        [-0.67983328, -0.68124953, -0.68187906, ..., -0.50972833,
         -0.49677787, -0.48364334]],

       [[-0.94795914, -0.9029263 , -0.85794981, ...,  0.13838639,
          0.17848213,  0.21808753],
        [-0.95534587, -0.9107532 , -0.86623198, ...,  0.11509316,
          0.15425703,  0.19289788],
        [-0.95942854, -0.9155788 , -0.87178264, ...,  0.09158903,
          0.12975542,  0.16737052],
        ...,
        [-0.68001246, -0.66633982, -0.6520402 , ..., -

In [35]:
def get_orientation(name, **params):
    """
    Definitions:
    -`slope` is the angle between ground and panel.
    -`azimuth` is the clockwise angle from North.
        i.e. azimuth = 180 faces exactly South
    """
    if isinstance(name, dict):
        params = name
        name = params.pop("name", "constant")
    return getattr(sys.modules[__name__], f"make_{name}")(**params)


def make_latitude_optimal():
    """
    Returns an optimal tilt angle for the given ``lat``, assuming that the
    panel is facing towards the equator, using a simple method from [1].

    This method only works for latitudes between 0 and 50. For higher
    latitudes, a static 40 degree angle is returned.

    These results should be used with caution, but there is some
    evidence that tilt angle may not be that important [2].

    Function and documentation has been adapted from gsee [3].

    [1] http://www.solarpaneltilt.com/#fixed
    [2] http://dx.doi.org/10.1016/j.solener.2010.12.014
    [3] https://github.com/renewables-ninja/gsee/blob/master/gsee/pv.py

    Parameters
    ----------
    lat : float
        Latitude in degrees.

    """

    def latitude_optimal(lon, lat, solar_position):
        slope = np.empty_like(lat.values)

        below_25 = np.abs(lat.values) <= np.radians(25)
        below_50 = np.abs(lat.values) <= np.radians(50)

        slope[below_25] = 0.87 * np.abs(lat.values[below_25])
        slope[~below_25 & below_50] = 0.76 * np.abs(
            lat.values[~below_25 & below_50]
        ) + np.radians(0.31)
        slope[~below_50] = np.radians(40.0)

        # South orientation for panels on northern hemisphere and vice versa
        azimuth = np.where(lat.values < 0, 0, pi)
        return dict(
            slope=xr.DataArray(slope, coords=lat.coords),
            azimuth=xr.DataArray(azimuth, coords=lat.coords),
        )

    return latitude_optimal


def make_constant(slope, azimuth):
    slope = radians(slope)
    azimuth = radians(azimuth)

    def constant(lon, lat, solar_position):
        return dict(slope=slope, azimuth=azimuth)

    return constant


def make_latitude(azimuth=180):
    azimuth = radians(azimuth)

    def latitude(lon, lat, solar_position):
        return dict(slope=lat, azimuth=azimuth)

    return latitude


def SurfaceOrientation(ds, solar_position, orientation, tracking=None):
    """
    Compute cos(incidence) for slope and panel azimuth.

    References
    ----------
    [1] Sproul, A. B., Derivation of the solar geometric relationships using
    vector analysis, Renewable Energy, 32(7), 1187–1205 (2007).
    [2] Marion, William F., and Aron P. Dobos. Rotation angle for the optimum
    tracking of one-axis trackers. No. NREL/TP-6A20-58891. National Renewable
    Energy Lab.(NREL), Golden, CO (United States), 2013.

    """
    lon = radians(ds["lon"])
    lat = radians(ds["lat"])

    orientation = orientation(lon, lat, solar_position)
    surface_slope = orientation["slope"]
    surface_azimuth = orientation["azimuth"]

    sun_altitude = solar_position["altitude"]
    sun_azimuth = solar_position["azimuth"]

    if tracking is None:
        cosincidence = sin(surface_slope) * cos(sun_altitude) * cos(
            surface_azimuth - sun_azimuth
        ) + cos(surface_slope) * sin(sun_altitude)

    elif tracking == "horizontal":  # horizontal tracking with horizontal axis
        axis_azimuth = orientation[
            "azimuth"
        ]  # here orientation['azimuth'] refers to the azimuth of the tracker axis.
        rotation = arctan(
            (cos(sun_altitude) / sin(sun_altitude)) * sin(sun_azimuth - axis_azimuth)
        )
        surface_slope = abs(rotation)
        surface_azimuth = axis_azimuth + arcsin(
            sin(rotation / sin(surface_slope))
        )  # the 2nd part yields +/-1 and determines if the panel is facing east or west
        cosincidence = cos(surface_slope) * sin(sun_altitude) + sin(
            surface_slope
        ) * cos(sun_altitude) * cos(sun_azimuth - surface_azimuth)

    elif tracking == "tilted_horizontal":  # horizontal tracking with tilted axis'
        axis_tilt = orientation[
            "slope"
        ]  # here orientation['slope'] refers to the tilt of the tracker axis.

        rotation = arctan(
            (cos(sun_altitude) * sin(sun_azimuth - surface_azimuth))
            / (
                cos(sun_altitude) * cos(sun_azimuth - surface_azimuth) * sin(axis_tilt)
                + sin(sun_altitude) * cos(axis_tilt)
            )
        )

        surface_slope = arccos(cos(rotation) * cos(axis_tilt))

        azimuth_difference = sun_azimuth - surface_azimuth
        azimuth_difference = np.where(
            azimuth_difference > pi, 2 * pi - azimuth_difference, azimuth_difference
        )
        azimuth_difference = np.where(
            azimuth_difference < -pi, 2 * pi + azimuth_difference, azimuth_difference
        )
        rotation = np.where(
            logical_and(rotation < 0, azimuth_difference > 0),
            rotation + pi,
            rotation,
        )
        rotation = np.where(
            logical_and(rotation > 0, azimuth_difference < 0),
            rotation - pi,
            rotation,
        )

        cosincidence = cos(rotation) * (
            sin(axis_tilt) * cos(sun_altitude) * cos(sun_azimuth - surface_azimuth)
            + cos(axis_tilt) * sin(sun_altitude)
        ) + sin(rotation) * cos(sun_altitude) * sin(sun_azimuth - surface_azimuth)

    elif tracking == "vertical":  # vertical tracking, surface azimuth = sun_azimuth
        cosincidence = sin(surface_slope) * cos(sun_altitude) + cos(
            surface_slope
        ) * sin(sun_altitude)
    elif tracking == "dual":  # both vertical and horizontal tracking
        cosincidence = np.float64(1.0)
    else:
        assert False, (
            "Values describing tracking system must be None for no tracking,"
            + "'horizontal' for 1-axis horizontal tracking,"
            + "tilted_horizontal' for 1-axis horizontal tracking of tilted panle,"
            + "vertical' for 1-axis vertical tracking, or 'dual' for 2-axis tracking"
        )

    # fixup incidence angle: if the panel is badly oriented and the sun shines
    # on the back of the panel (incidence angle > 90degree), the irradiation
    # would be negative instead of 0; this is prevented here.
    cosincidence = cosincidence.clip(min=0)

    return xr.Dataset(
        {
            "cosincidence": cosincidence,
            "slope": surface_slope,
            "azimuth": surface_azimuth,
        }
    )

In [40]:
orientation='latitude_optimal'

In [43]:
orientation = get_orientation(orientation)

In [47]:
surface_orientation_model=SurfaceOrientation(ds_model_2014, solar_position_model, orientation, tracking=None)

In [48]:
surface_orientation_model

Unnamed: 0,Array,Chunk
Bytes,12.48 MiB,12.48 MiB
Shape,"(20, 2920, 28)","(20, 2920, 28)"
Dask graph,1 chunks in 95 graph layers,1 chunks in 95 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 12.48 MiB 12.48 MiB Shape (20, 2920, 28) (20, 2920, 28) Dask graph 1 chunks in 95 graph layers Data type float64 numpy.ndarray",28  2920  20,

Unnamed: 0,Array,Chunk
Bytes,12.48 MiB,12.48 MiB
Shape,"(20, 2920, 28)","(20, 2920, 28)"
Dask graph,1 chunks in 95 graph layers,1 chunks in 95 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [46]:
trigon_model='simple'
clearsky_model='simple'
tracking=None

In [52]:
# SPDX-FileCopyrightText: Contributors to atlite <https://github.com/pypsa/atlite>
#
# SPDX-License-Identifier: MIT

import logging

import numpy as np
from dask.array import cos, fmax, fmin, radians, sin, sqrt

logger = logging.getLogger(__name__)


def DiffuseHorizontalIrrad(ds, solar_position, clearsky_model, influx):
    # Clearsky model from Reindl 1990 to split downward radiation into direct
    # and diffuse contributions. Should switch to more up-to-date model, f.ex.
    # Ridley et al. (2010) http://dx.doi.org/10.1016/j.renene.2009.07.018 ,
    # Lauret et al. (2013):http://dx.doi.org/10.1016/j.renene.2012.01.049

    sinaltitude = sin(solar_position["altitude"])
    influx_toa = ds["influx_toa"]

    if clearsky_model is None:
        clearsky_model = (
            "enhanced" if "temperature" in ds and "humidity" in ds else "simple"
        )

    # Reindl 1990 clearsky model

    k = influx / influx_toa  # clearsky index
    # k.values[k.values > 1.0] = 1.0
    # k = k.rename('clearsky index')

    if clearsky_model == "simple":
        # Simple Reindl model without ambient air temperature and
        # relative humidity
        fraction = (
            ((k > 0.0) & (k <= 0.3))
            * fmin(1.0, 1.020 - 0.254 * k + 0.0123 * sinaltitude)
            + ((k > 0.3) & (k < 0.78))
            * fmin(0.97, fmax(0.1, 1.400 - 1.749 * k + 0.177 * sinaltitude))
            + (k >= 0.78) * fmax(0.1, 0.486 * k - 0.182 * sinaltitude)
        )
    elif clearsky_model == "enhanced":
        # Enhanced Reindl model with ambient air temperature and relative
        # humidity
        T = ds["temperature"]
        rh = ds["humidity"]

        fraction = (
            ((k > 0.0) & (k <= 0.3))
            * fmin(
                1.0,
                1.000 - 0.232 * k + 0.0239 * sinaltitude - 0.000682 * T + 0.0195 * rh,
            )
            + ((k > 0.3) & (k < 0.78))
            * fmin(
                0.97,
                fmax(
                    0.1,
                    1.329 - 1.716 * k + 0.267 * sinaltitude - 0.00357 * T + 0.106 * rh,
                ),
            )
            + (k >= 0.78)
            * fmax(0.1, 0.426 * k - 0.256 * sinaltitude + 0.00349 * T + 0.0734 * rh)
        )
    else:
        raise KeyError("`clearsky model` must be chosen from 'simple' and 'enhanced'")

    # Set diffuse fraction to one when the sun isn't up
    # fraction = fraction.where(sinaltitude >= sin(radians(threshold))).fillna(1.0)
    # fraction = fraction.rename('fraction index')

    return (influx * fraction).rename("diffuse horizontal")


def TiltedDiffuseIrrad(ds, solar_position, surface_orientation, direct, diffuse):
    # Hay-Davies Model

    sinaltitude = sin(solar_position["altitude"])
    influx_toa = ds["influx_toa"]

    cosincidence = surface_orientation["cosincidence"]
    surface_slope = surface_orientation["slope"]

    influx = direct + diffuse

    with np.errstate(divide="ignore", invalid="ignore"):
        # brightening factor
        f = sqrt(direct / influx).fillna(0.0)

        # anisotropy factor
        A = direct / influx_toa

    # geometric factor
    R_b = cosincidence / sinaltitude

    diffuse_t = (
        (1.0 - A)
        * ((1 + cos(surface_slope)) / 2.0)
        * (1.0 + f * sin(surface_slope / 2.0) ** 3)
        + A * R_b
    ) * diffuse

    # fixup: clip all negative values (unclear why it gets negative)
    # note: REatlas does not do the fixup
    if logger.isEnabledFor(logging.WARNING):
        if ((diffuse_t < 0.0) & (sinaltitude > sin(radians(1.0)))).any():
            logger.warning(
                "diffuse_t exhibits negative values above altitude threshold."
            )

    with np.errstate(invalid="ignore"):
        diffuse_t = diffuse_t.clip(min=0).fillna(0)

    return diffuse_t.rename("diffuse tilted")


def TiltedDirectIrrad(solar_position, surface_orientation, direct):
    sinaltitude = sin(solar_position["altitude"])
    cosincidence = surface_orientation["cosincidence"]

    # geometric factor
    R_b = cosincidence / sinaltitude

    return (R_b * direct).rename("direct tilted")


def _albedo(ds, influx):
    if "albedo" in ds:
        albedo = ds["albedo"]
    elif "outflux" in ds:
        albedo = (ds["outflux"] / influx.where(influx != 0)).fillna(0).clip(max=1)
    else:
        raise AssertionError(
            "Need either albedo or outflux as a variable in the dataset. "
            "Check your cutout and dataset module."
        )

    return albedo


def TiltedGroundIrrad(ds, solar_position, surface_orientation, influx):
    surface_slope = surface_orientation["slope"]
    ground_t = influx * _albedo(ds, influx) * (1.0 - cos(surface_slope)) / 2.0
    return ground_t.rename("ground tilted")


def TiltedIrradiation(
    ds,
    solar_position,
    surface_orientation,
    trigon_model,
    clearsky_model,
    tracking=0,
    altitude_threshold=1.0,
    irradiation="total",
):
    """
    Calculate the irradiation on a tilted surface.

    Parameters
    ----------
    ds : xarray.Dataset
        Cutout data used for calculating the irradiation on a tilted surface.
    solar_position : xarray.Dataset
        Solar position calculated using atlite.pv.SolarPosition,
        containing a solar 'altitude' (in rad, 0 to pi/2) for the 'ds' dataset.
    surface_orientation : xarray.Dataset
        Surface orientation calculated using atlite.orientation.SurfaceOrientation.
    trigon_model : str
        Type of trigonometry model. Defaults to 'simple'if used via `convert_irradiation`.
    clearsky_model : str or None
        Either the 'simple' or the 'enhanced' Reindl clearsky
        model. The default choice of None will choose dependending on
        data availability, since the 'enhanced' model also
        incorporates ambient air temperature and relative humidity.
        NOTE: this option is only used if the used climate dataset
        doesn't provide direct and diffuse irradiation separately!
    altitude_threshold : float
        Threshold for solar altitude in degrees. Values in range (0, altitude_threshold]
        will be set to zero. Default value equals 1.0 degrees.
    irradiation : str
        The irradiation quantity to be returned. Defaults to "total" for total
        combined irradiation. Other options include "direct" for direct irradiation,
        "diffuse" for diffuse irradation, and "ground" for irradiation reflected
        by the ground via albedo. NOTE: "ground" irradiation is not calculated
        by all `trigon_model` options in the `convert_irradiation` method,
        so use with caution!

    Returns
    -------
    result : xarray.DataArray
        The desired irradiation quantity on the tilted surface.

    """
    influx_toa = ds["rsds"] #definin influx_toa as rsds from model
    influx_direct=ds['rsds']-ds['rsdsdiff']
    influx_diffuse=ds['rsdsdiff']

    def clip(influx, influx_max):
        # use .data in clip due to dask-xarray incompatibilities
        return influx.clip(min=0, max=influx_max.transpose(*influx.dims).data)

    #if "influx" in ds:
    #    influx = clip(ds["influx"], influx_toa)
    #    diffuse = DiffuseHorizontalIrrad(ds, solar_position, clearsky_model, influx)
    #    direct = influx - diffuse
    if "rsds" in ds and "rsdsdiff" in ds:
        direct = clip(ds["rsds"]-ds['rsdsdiff'], influx_toa)
        diffuse = clip(ds["rsdsdiff"], influx_toa - direct)
    else:
        raise AssertionError(
            "Need either influx or influx_direct and influx_diffuse in the "
            "dataset. Check your cutout and dataset module."
        )
    if trigon_model == "simple":
        k = surface_orientation["cosincidence"] / sin(solar_position["altitude"])
        if tracking != "dual":
            cos_surface_slope = cos(surface_orientation["slope"])
        elif tracking == "dual":
            cos_surface_slope = sin(solar_position["altitude"])

        influx = direct + diffuse
        direct_t = k * direct
        diffuse_t = (1.0 + cos_surface_slope) / 2.0 * diffuse
        ground_t = _albedo(ds, influx) * influx * ((1.0 - cos_surface_slope) / 2.0)

        total_t = direct_t.fillna(0.0) + diffuse_t.fillna(0.0) + ground_t.fillna(0.0)
    else:
        diffuse_t = TiltedDiffuseIrrad(
            ds, solar_position, surface_orientation, direct, diffuse
        )
        direct_t = TiltedDirectIrrad(solar_position, surface_orientation, direct)
        ground_t = TiltedGroundIrrad(
            ds, solar_position, surface_orientation, direct + diffuse
        )

        total_t = direct_t + diffuse_t + ground_t

    if irradiation == "total":
        result = total_t.rename("total tilted")
    elif irradiation == "direct":
        result = direct_t.rename("direct tilted")
    elif irradiation == "diffuse":
        result = diffuse_t.rename("diffuse tilted")
    elif irradiation == "ground":
        result = ground_t.rename("ground tilted")

    # The solar_position algorithms have a high error for small solar altitude
    # values, leading to big overall errors from the 1/sinaltitude factor.
    # => Suppress irradiation below solar altitudes of 1 deg.

    cap_alt = solar_position["altitude"] < radians(altitude_threshold)
    result = result.where(~(cap_alt | (direct + diffuse <= 0.01)), 0)
    result.attrs["units"] = "W m**-2"

    return result

In [53]:
irradiation_model=TiltedIrradiation(
    ds_model_2014,
    solar_position_model,
    surface_orientation_model,
    trigon_model,
    clearsky_model,
    tracking=0,
    altitude_threshold=1.0,
    irradiation="total",
)

AssertionError: Need either albedo or outflux as a variable in the dataset. Check your cutout and dataset module.