# Compute and store histograms per geographical box

- 2° bins histograms 

In [1]:
import os
from glob import glob

import numpy as np
import pandas as pd
import xarray as xr
import dask.dataframe as dd
import dask.array as da

import cartopy.crs as ccrs
import cartopy.feature as cfeature
import geopandas as gpd
from shapely.geometry import Polygon

%matplotlib inline
from matplotlib import pyplot as plt


import drifters.utils as ut
import pynsitu as pin

from dask.delayed import delayed

import GDP_lib as gdp

from GDP_lib import root_dir

In [2]:
from dask.distributed import Client

# Switch between the PBS job-queue cluster (True) and a local cluster (False).
USE_PBS_CLUSTER = True

if USE_PBS_CLUSTER:
    # Imported here so that the local branch does not require dask_jobqueue.
    from dask_jobqueue import PBSCluster

    # 8 jobs, each configured with 3 cores split over 3 processes.
    # Earlier note from the author: a smaller scale (2 jobs) was not enough
    # for the lon, lat, year binning.
    cluster = PBSCluster(cores=3, processes=3, walltime="04:00:00")
    w = cluster.scale(jobs=8)
else:
    from dask.distributed import LocalCluster

    cluster = LocalCluster()

# Connect a client to whichever cluster was created and display its summary.
client = Client(cluster)
client



0,1
Connection method: Cluster object,Cluster type: dask_jobqueue.PBSCluster
Dashboard: http://10.148.0.144:8787/status,

0,1
Dashboard: http://10.148.0.144:8787/status,Workers: 0
Total threads: 0,Total memory: 0 B

0,1
Comm: tcp://10.148.0.144:58332,Workers: 0
Dashboard: http://10.148.0.144:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B


In [4]:
# Paths of the two input parquet stores, both located under root_dir
# (one for the "argos" dataset, one for the "gps" dataset).
parquet_argos = os.path.join(root_dir, "argos_av_time_corrected.parquet")
parquet_gps = os.path.join(root_dir, "gps_av_time_corrected.parquet")

In [5]:
# Open both parquet stores as dask dataframes and persist them so that the
# loaded partitions are kept by the cluster and reused by the following cells.
df_gps = dd.read_parquet(parquet_gps).persist()
df_argos = dd.read_parquet(parquet_argos).persist()

In [10]:
# Total number of rows in the GPS dataframe (reference for the bin-count checks below)
len(df_gps)

41635507

In [11]:
# Total number of rows in the Argos dataframe
len(df_argos)

126753036

_________________
# 2° bins histograms


In [6]:
# bin geographically
def bins_geo(df, dl=2):
    """Add geographical box-centre columns ``lon_cut`` and ``lat_cut`` to ``df``.

    The dataframe is modified in place and nothing is returned:

    * ``lon`` is wrapped into [-180, 180),
    * ``lon_cut`` / ``lat_cut`` receive the centre of the ``dl``-degree box
      that contains each sample.

    Parameters
    ----------
    df : dask.dataframe.DataFrame
        Must provide ``lon`` and ``lat`` columns and support
        ``Series.map_partitions``.
    dl : float
        Box size in degrees.

    Notes
    -----
    Boxes are right-closed, ``(edge, edge + dl]``, except the first one which
    also includes its left edge (``include_lowest=True``). Without this, a
    sample sitting exactly on -180 (which is what a longitude of 180 wraps to)
    or on -90 would not be assigned to any box and would silently disappear
    from every subsequent groupby.
    """
    # CAUTION: dl is added to the upper bound so that np.arange (exclusive
    # stop) still produces the last edge and the last box is not lost.
    lon_bins = np.arange(-180.0, 180.0 + dl, dl)
    lat_bins = np.arange(-90, 90 + dl, dl)
    half_box = dl / 2

    def _box_centre(interval):
        # With include_lowest=True pandas shifts the left edge of the first
        # interval slightly below the requested edge, so ``interval.mid``
        # would be biased for that box; the right edge is never altered.
        return interval.right - half_box

    df["lon"] = (df["lon"] + 180) % 360 - 180
    df["lon_cut"] = (
        df["lon"]
        .map_partitions(pd.cut, bins=lon_bins, include_lowest=True)
        .map(_box_centre)
    )
    df["lat_cut"] = (
        df["lat"]
        .map_partitions(pd.cut, bins=lat_bins, include_lowest=True)
        .map(_box_centre)
    )


# Add the lon_cut / lat_cut columns to both dataframes; bins_geo assigns the
# columns on the dataframe it receives and is called with its default dl=2.
bins_geo(df_gps)
bins_geo(df_argos)

In [14]:
# Row count after binning, to compare with the sum of per-box counts in the next cell
len(df_gps)

41635507

In [18]:
# Sanity check: total number of rows that ended up in a (lon_cut, lat_cut) box.
# NOTE(review): this can be smaller than len(df_gps) — rows without a valid
# lon_cut/lat_cut presumably belong to no group and are therefore not counted.
df_gps.groupby(["lon_cut", "lat_cut"]).size().sum().compute()

41635506

In [7]:
# Number of GPS samples per geographical box, as a 2D (lon_bins, lat_bins) dataset.
ds = (
    df_gps.groupby(["lon_cut", "lat_cut"])
    .size()  # rows per (lon_cut, lat_cut) box
    .to_frame("nb_geobins")
    .reset_index()
    .compute()  # from here on this is a pandas object
    .to_xarray()
    .rename({"lon_cut": "lon_bins", "lat_cut": "lat_bins"})
    .set_index(index=["lon_bins", "lat_bins"])  # build a (lon, lat) multi-index ...
    .unstack()  # ... and unstack it into two separate dimensions
)

In [22]:
# Total count over all boxes (to compare with the dataframe row count)
ds.nb_geobins.sum().compute()

In [8]:
# Display the per-box count dataset
ds

In [9]:
# Test for the problem with the last binning interval: pick one Argos sample
# (id 76821, 2009-04-27 13:00:00) and inspect its lon_cut / lat_cut values.
i = df_argos[df_argos["id"] == 76821].loc["2009-04-27 13:00:00"].compute()
i

Unnamed: 0_level_0,id,lon,lat,ve,vn,typebuoy,gap,deploy_date,deploy_lat,deploy_lon,...,aen,aex,any,axy,dt,vex_diff,vny_diff,vxy_diff,lon_cut,lat_cut
time,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
2009-04-27 13:00:00,76821,-119.410141,88.001511,0.0639,0.062,SVPB,4838.0,2008-09-27,82.49,168.63,...,2e-06,3e-06,-3e-06,4e-06,3600.0,-0.04561,0.081043,0.092996,-119.0,89.0


In [10]:
def compute_histogram(df, key, bins=np.linspace(-3, 3, 150)):
    """Normalised histogram of ``df[key]`` for a single geographical box.

    Parameters
    ----------
    df : pandas.DataFrame or lazy (dask-like) dataframe
        Samples of ONE (lon_cut, lat_cut) box; must contain the ``lon_cut``,
        ``lat_cut`` and ``key`` columns. Any object exposing a callable
        ``compute()`` is materialised first.
    key : str
        Column to histogram.
    bins : np.ndarray
        Bin edges.

    Returns
    -------
    pandas.DataFrame
        One row indexed by the (lon_cut, lat_cut) MultiIndex; one column per
        bin, labelled by the bin centre, holding the density
        (``np.histogram(..., density=True)``).

    Raises
    ------
    AssertionError
        If ``df`` is empty, or if it contains more than one
        (lon_cut, lat_cut) pair, i.e. the upstream groupby went wrong.
    """
    # Materialise lazy input once. The previous try/except chain attempted
    # dask calls first and relied on bare ``except:`` to fall back to pandas,
    # which also hid unrelated errors.
    if callable(getattr(df, "compute", None)):
        df = df.compute()

    if len(df) == 0:
        # explicit raise (rather than ``assert False``) so that the check
        # survives ``python -O``
        raise AssertionError(
            (
                df["lon_cut"].reset_index().values,
                df["lat_cut"].reset_index().values,
            )
        )

    # the box is identified by the bin centres of its first row
    lon = df["lon_cut"].iloc[0]
    lat = df["lat_cut"].iloc[0]

    h, b = np.histogram(df[key], bins=bins, density=True)

    index = pd.MultiIndex.from_arrays(
        [[lon], [lat]],
        names=("lon_cut", "lat_cut"),
    )
    out = pd.DataFrame(
        dict(zip(b[:-1] + np.diff(bins) / 2, h)), index=index
    )  # columns are the bin centres

    # every row must belong to the same box
    if (df["lon_cut"] != lon).any() or (df["lat_cut"] != lat).any():
        raise AssertionError("pb with lon, lat groupby")
    return out

In [11]:
# Test compute_histogram on a single box: take the (-95, 5.0) group of the
# first GPS partition, materialise it and histogram its "ve" column.
df = df_gps.get_partition(0).groupby(["lon_cut", "lat_cut"]).get_group((-95, 5.0))
hist_ve = compute_histogram(df.compute(), "ve")
hist_ve

Unnamed: 0_level_0,Unnamed: 1_level_0,-2.979866,-2.939597,-2.899329,-2.859060,-2.818792,-2.778523,-2.738255,-2.697987,-2.657718,-2.617450,...,2.617450,2.657718,2.697987,2.738255,2.778523,2.818792,2.859060,2.899329,2.939597,2.979866
lon_cut,lat_cut,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1,Unnamed: 22_level_1
-95.0,5.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,...,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0


In [12]:
def _geo_hist_dataset(df, key, bins, meta, bins_dim):
    """Histogram ``key`` in every geographical box and return it as an xr.Dataset.

    The result holds one variable ``hist_<key>`` with dimensions
    (lon_bins, lat_bins, ``bins_dim``).
    """
    hist = (
        df.groupby(["lon_cut", "lat_cut"])
        .apply(compute_histogram, key, bins, meta=meta)
        .compute()
    )
    # wide (one column per bin centre) -> long (one row per box and bin centre)
    long_form = pd.melt(
        hist.reset_index(),
        id_vars=["lon_cut", "lat_cut"],
        var_name=bins_dim,
    ).rename(
        columns={
            "lon_cut": "lon_bins",
            "lat_cut": "lat_bins",
            "value": "hist_" + key,
        }
    )
    hist_ds = (
        long_form.to_xarray()
        .set_index(index=["lon_bins", "lat_bins", bins_dim])
        .unstack()
    )
    hist_ds[bins_dim] = hist_ds[bins_dim].astype(float)  # convert object type to float
    return hist_ds


def build_geo_hist(
    df,
    dl=2,
    v_bins=np.linspace(-2, 2, 150),
    a_bins=np.linspace(-1e-4, 1e-4, 150),
    v_keys=["ve", "vn", "vex", "vny"],
    a_keys=["ae", "an", "aex", "any"],
):
    """Build xr.Dataset with histograms of velocities and accelerations in geographical box + nb of data per bins

    Parameters
    ----------
    df : dask.Dataframe
        contains velocities and accelerations
    dl : float
        geographical box size in degree; only used when ``df`` has no
        ``lon_cut`` / ``lat_cut`` columns yet (they are then added by
        ``bins_geo``)
    v_bins : np.array
        velocities bins
    a_bins : np.array
        accelerations bins
    v_keys : list of str,
        keys of velocities for which we want to compute histograms
    a_keys : list of str,
        keys of accelerations for which we want to compute histograms

    Returns
    -------
    xr.Dataset
        ``nb_geobins`` (lon_bins, lat_bins) plus one ``hist_<key>`` variable
        per key found in ``df``, along an extra ``vbins`` (velocities) or
        ``abins`` (accelerations) dimension.

    Notes
    -----
    Keys missing from ``df`` are reported with a print and skipped. The
    list/array defaults are never modified by this function.
    """
    if "lon_cut" not in df.columns or "lat_cut" not in df.columns:
        bins_geo(df, dl)

    # nb counts
    ds = (
        df.groupby(["lon_cut", "lat_cut"])
        .size()
        .to_frame("nb_geobins")
        .reset_index()
        .compute()
        .to_xarray()
        .rename({"lon_cut": "lon_bins", "lat_cut": "lat_bins"})
        .set_index(index=["lon_bins", "lat_bins"])
        .unstack()
    )

    # One sample box, taken from the first partition, used to build the
    # ``meta`` template that describes the output of compute_histogram.
    group = tuple(
        df.get_partition(0)
        .reset_index()[["lon_cut", "lat_cut"]]
        .loc[0]
        .values.compute()[0]
    )
    sample = (
        df.get_partition(0)
        .groupby(["lon_cut", "lat_cut"])
        .get_group(group)
        .compute()  # materialised once and shared by both templates
    )

    # velocities first, then accelerations
    for keys, bins, bins_dim in (
        (v_keys, v_bins, "vbins"),
        (a_keys, a_bins, "abins"),
    ):
        # Build the template from the first key actually present: using
        # keys[0] unconditionally raised when that column was absent (or the
        # list empty), which defeated the "not in dataframe" skip below.
        present = [k for k in keys if k in df.columns]
        meta = compute_histogram(sample, present[0], bins) if present else None

        for key in keys:
            if key not in df.columns:
                print(f"{key} not in dataframe")
                continue
            ds = xr.merge([ds, _geo_hist_dataset(df, key, bins, meta, bins_dim)])
            print(key)

    return ds

In [13]:
# Box size in degrees. The same value is passed to build_geo_hist and is also
# embedded in the output store names, so the two can no longer diverge.
dl = 2
Hv_gps = build_geo_hist(df_gps, dl=dl)
Hv_argos = build_geo_hist(df_argos, dl=dl)

ve
vn
vex
vny
ae
an
aex
any
ve
vn
vex
vny
ae
an
aex
any


## Store

In [14]:
# Output zarr stores under root_dir; the box size dl is part of the store name.
zarr_gps_geohist = os.path.join(root_dir, f"gps_geohist_{dl}_corrected.zarr")
zarr_argos_geohist = os.path.join(root_dir, f"argos_geohist_{dl}_corrected.zarr")

# mode="w" overwrites a store that already exists at the same path.
Hv_gps.to_zarr(zarr_gps_geohist, mode="w")
Hv_argos.to_zarr(zarr_argos_geohist, mode="w")

<xarray.backends.zarr.ZarrStore at 0x2aab1e118cf0>

In [15]:
# Disconnect the client before shutting the cluster down: closing only the
# cluster leaves the client trying to reconnect to a scheduler that is gone
# ("Failed to reconnect to scheduler ..." error).
client.close()
cluster.close()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
