In [1]:
from model import DESModel, HydroDynamicModel
import openclsim.plot as plot
import datetime as dt
import pandas as pd
import numpy as np
import xarray as xr
from multiprocessing import Pool, cpu_count
from weather_resource import vector_magnitude
from tqdm.notebook import tqdm

In [2]:
# m = DESModel()
# m.resources()
# m.processes(limit_expr=lambda Hm0: Hm0 > 0.5)
# m.start_simulation()

In [3]:
# plot.get_log_dataframe(m.process)


In [4]:
# m.project_length()

In [5]:
# -------------------------------------------------------------------------------------!
def reset_direction(dataframe, dir_col="dir"):
    """Reset the wave angle to a [0, 360] range."""
    dataframe[dir_col]= dataframe[dir_col] % 360
    return dataframe

def pandas_to_xarray(dataframe):
    """Converts a pandas multi-index dataframe into a 3D xarray."""
    # Simplify variable name.
    df = dataframe

    # Define columns of interest.
    DOFs = [
        "RAOSurgeAmp",
        "RAOSurgePhase",
        "RAOSwayAmp",
        "RAOSwayPhase",
        "RAOHeaveAmp",
        "RAOHeavePhase",
        "RAORollAmp",
        "RAORollPhase",
        "RAOPitchAmp",
        "RAOPitchPhase",
        "RAOYawAmp",
        "RAOYawPhase",
    ]

    # Separate dataframe by degree of freedom.
    dfs = [df[DOF] for DOF in DOFs]

    # Create xarray.DataArray
    da = xr.DataArray(
        data=dfs,
        dims=["DOF", "freq", "dir"],
        coords={
            "DOF": DOFs,
            "freq": dfs[0].index.values,
            "dir": dfs[0].columns.values
        }
    )

    return da

# Define a parallelise function.
def apply_parallelise(groups, func, column):
    """Apply a function parallel to pandas.GroupBy groups.""" 
    series = (group[column] for name, group in groups)
    results = []
    
    with Pool(processes=cpu_count()) as p:
        with tqdm(total=len(groups)) as pbar:
            for i, res in enumerate(p.imap(func, series)):
                pbar.update()
                results.append(res)

    results = pd.DataFrame(
        index=pd.MultiIndex.from_tuples(groups.groups.keys()),
        data=dict(values=results)
    )

    return results

# Append the 0 [deg] angle and rename to 360 [deg].
def append_angle(da: xr.DataArray = None):
    return xr.concat([
            da,
            da.sel(dict(dir=0)).assign_coords(dict(dir=360.0))
        ], dim="dir"
    )

# -------------------------------------------------------------------------------------!
class HydroDynamicModel(object):
    """Model the hydrodynamic behaviour of (floating) structures."""

    def __init__(self, rao_file: str = None, wave_file: str = None, *args, **kwargs) -> None:
        
        # If RAO file.
        if isinstance(rao_file, str):
            self.RAO = self.parse_rao(file=rao_file)

        # If wave file.
        if isinstance(wave_file, str):
            self.wave = self.parse_wave_data(file=wave_file)

        # If both are provided -> create a xarray.Dataset.
        if hasattr(self, "wave") and hasattr(self, "RAO"):
            self.data = self.build_dataset()

        
        # Limits for MPI Adventure lifting operations.
        self.response_limits(
            type=["accelerations", "displacements"],
            surge=[0.1378, np.inf],
            sway=[0.2063, np.inf],
            heave=[0.5115, 1],
            roll=[0.005, np.radians(0.50)],
            pitch=[0.01, np.radians(0.20)],
            yaw=[0.0039, np.inf],
        )
    
    def build_dataset(self, *args, **kwargs):
        """Combine wave and RAO data in a xarray.Dataset."""
        # Make sure not to recreate the dataset.
        if hasattr(self, "data"):
            return self.data

        # Create a dataset.
        ds = xr.Dataset(
            data_vars=dict(
                launch_rao=(["DOF", "freq", "dir"], self.RAO),
                wave_data=(["time", "wave_parameter"], self.wave)
            ),
            coords=dict(
                DOF=self.RAO["DOF"],
                freq=self.RAO["freq"],
                dir=self.RAO["dir"],
                time=self.wave.index.values,
                wave_parameter=self.wave.columns
            )
        )

        # Return the result.
        return ds
    
    def limit_expression(self, *args, **kwargs):
        pass
    
    def parse_rao(self, file, *args, **kwargs):
        """Read the contents of a response amplitude operator file."""
        # Make sure not to reload the data.
        if hasattr(self, "RAO"):
            return self.RAO

        # Read using pandas and store in a xarray.DataArray
        df = (pd.read_csv(file)  # Read .csv.
            .pipe(reset_direction)  # reset wave angle range.
            .set_index(["RAOPeriodOrFreq", "dir"])  # Create multi-index.
            .unstack()  # Turn multi-index in a 3D kind of dataframe.
            .pipe(pandas_to_xarray)  # Convert to a xarray.DataArray
            .pipe(append_angle)  # Copies the 0[deg] RAO to 360[deg].
        )

        # Return the result
        return df

    def parse_wave_data(self, file, *args, **kwargs):
        """Read the contents of a DHI wave data file."""
        # Make sure not to reload the data.
        if hasattr(self, "wave"):
            return self.wave

        # Import wave data.
        df = pd.read_csv(
            file,
            skiprows=[0],
            parse_dates={"datetime": ["YYYY", "M", "D", "HH", "MM", "SS"]},
            date_parser=lambda x: dt.datetime.strptime(x, "%Y %m %d %H %M %S")
        ).set_index("datetime")

        # Return the result.
        return df

    def response_limits(
        self,
        type: str = "accelerations",  # Otherwise "displacements".
        surge: float = None,  # [m/s²] or [m]
        sway: float = None,  # [m/s²] or [m]
        heave: float = None,  # [m/s²] or [m]
        yaw: float = None,  # [rad/s²] or [rad]
        roll: float = None,  # [rad/s²] or [rad]
        pitch: float = None,  # [rad/s²] or [rad]
        *args,
        **kwargs
    ):
        """Impose response limits."""
        self.motion_limits = pd.DataFrame(
            dict(
                type=type,
                surge=surge,
                sway=sway,
                heave=heave,
                roll=roll,
                pitch=pitch,
                yaw=yaw,
            )
        )

    def response_motions(self, *args, **kwargs):
        """Find the response motions."""
        # Define the possible wave headings.
        vessel_heading = xr.DataArray(np.arange(0, 360, 45), dims="dir")

        # Determine the resulting wave angles based on the time-series.
        windsea_dir = self.data["wave_data"].sel(dict(wave_parameter="MWDWS"))
        windsea_wave_angle = (180 - windsea_dir + vessel_heading) % 360
        windsea_wave_angle = windsea_wave_angle.assign_coords(dict(dir=vessel_heading))

        swell_dir = self.data["wave_data"].sel(dict(wave_parameter="MWDS"))
        swell_wave_angle = (180 - swell_dir + vessel_heading) % 360
        swell_wave_angle = swell_wave_angle.assign_coords(dict(dir=vessel_heading))

        # Compute the windsea and swell response components.
        windsea_response = self.data["launch_rao"].interp(
            dict(
                freq=self.data["wave_data"].sel(dict(wave_parameter="TpWS")),
                dir=windsea_wave_angle
            ),
            kwargs=dict(fill_value=None)
        ) * self.data["wave_data"].sel(dict(wave_parameter="Hm0WS"))

        swell_response = self.data["launch_rao"].interp(
            dict(
                freq=self.data["wave_data"].sel(dict(wave_parameter="TpS")),
                dir=swell_wave_angle
            ),
            kwargs=dict(fill_value=None)
        ) * self.data["wave_data"].sel(dict(wave_parameter="Hm0S"))
    

        # Coupled response.
        res = windsea_response + swell_response

        # Find the index with the smallest response amplitudes.
        index = (res.loc[["RAOSurgeAmp", "RAOSwayAmp", "RAOHeaveAmp"]]
            .to_dataframe(name="launch_rao")
            .reset_index()
            .groupby(["time", "dir"])
            .pipe(lambda g: apply_parallelise(groups=g, func=vector_magnitude, column="launch_rao"))
            .unstack(level=0)
            .idxmin()
            .values
        )

        # Find the resulting response motions and return.
        self.response_motions = res.sel(dict(dir=xr.DataArray(index, dims="time")))

# -------------------------------------------------------------------------------------!

In [6]:
HD = HydroDynamicModel(
    rao_file="./data/preprocessed/MPI_Adventure_RAO.csv",
    wave_file="./data/raw/HKZ_3.970372E_52.014651N.csv"
)

In [7]:
HD.response_motions()

  0%|          | 0/2103744 [00:00<?, ?it/s]

In [18]:
HD.response_motions.to_pandas().T.to_pickle("MPI_response_motions.pc")