In [96]:
from argopy.data_fetchers.proto import ArgoDataFetcherProto
import json
from abc import ABC, abstractmethod
from argopy.utilities import list_standard_variables
from argopy.options import OPTIONS
import numpy as np
import xarray as xr
import pdb
import requests
import pandas as pd

def urlopenjson(url):
    resp = requests.get(url)
    # Consider any status other than 2xx an error
    if not resp.status_code // 100 == 2:
        return "Error: Unexpected response {}".format(resp)
    return resp.json()

class ArgovisDataFetcher(ArgoDataFetcherProto):
    ###
    # Methods to be customised for a specific Argovis request
    ###
    @abstractmethod
    def init(self):
        """ Initialisation for a specific fetcher """
        pass
    
    @abstractmethod
    def cname(self):
        """ Return a unique string defining the request

            Provide this string to populate meta data and titles
        """
        pass
    
    @property
    def url(self):
        """ Return the URL used to download data """
        pass
    ###
    # Methods that must not change
    ###
    def __init__(self,
             ds: str = "",
             cache: bool = False,
             cachedir: str = "",
             **kwargs):
        """ Instantiate an Argovis Argo data loader

            Parameters
            ----------
            ds: 'phy' or 'bgc' # @tylertucker202: Do you provide BGC data as well ?? what is the way to select which data (core or bgc) to retrieve ?
            cache : False
            cachedir : None
        """
        #self.fs = httpstore(cache=cache, cachedir=cachedir, timeout=120)
        self.definition = 'Argovis Argo data fetcher'
        self.dataset_id = OPTIONS['dataset'] if ds == '' else ds
        self.init(**kwargs)
        self.key_map = {\
               'date':'TIME', \
               'date_qc':'TIME_QC', \
               'lat':'LATITUDE', \
               'lon':'LONGITUDE', \
               'cycle_number':'CYCLE_NUMBER', \
               'DATA_MODE':'DATA_MODE', \
               'DIRECTION':'DIRECTION', \
               'platform_number':'PLATFORM_NUMBER', \
               'position_qc':'POSITION_QC', \
               'pres': 'PRES', \
               'temp': 'TEMP', \
               'psal': 'PSAL', \
               'index': 'N_POINTS'
              }
    
    def __repr__(self):
        summary = ["<datafetcher '%s'>" % self.definition]
        summary.append("Domain: %s" % self.cname())
        return '\n'.join(summary)

    # This one will be delegated to the httpstore later
    def open_json(self, url):
        #return self.fs.open(url)
        return urlopenjson(url)

    def json2dataframe(self, profiles):
        """ convert json data to Pandas DataFrame """
        # Make sure we deal with a list
        if isinstance(profiles, list):
            data = profiles
        else: 
            data = [profiles] 
        # Transform
        rows = []
        for profile in data:
            keys = [ x for x in profile.keys() if not x in ['measurements', 'bgcMeas']]
            meta_row = dict( (key, profile[key]) for key in keys)
            for row in profile['measurements']:
                row.update(meta_row)
                rows.append(row)
        df = pd.DataFrame(rows)
        return df
    
    def filter_data_mode(self, ds, **kwargs):
        ds = ds.argo.filter_data_mode(errors='ignore', **kwargs)
        if ds.argo._type == 'point':
            ds['N_POINTS'] = np.arange(0, len(ds['N_POINTS']))
        return ds

    def filter_qc(self, ds, **kwargs):
        ds = ds.argo.filter_qc(**kwargs)
        if ds.argo._type == 'point':
            ds['N_POINTS'] = np.arange(0, len(ds['N_POINTS']))
        return ds

    def filter_variables(self, ds, mode='standard'):
        if mode == 'standard':
            to_remove = sorted(list(set(list(ds.data_vars)) - set(list_standard_variables())))
            return ds.drop_vars(to_remove)
        else:
            return ds

class Fetch_wmo(ArgovisDataFetcher):
    def init(self, WMO=[], CYC=None):
        """ Create Argo data loader for WMOs and CYCs

            Parameters
            ----------
            WMO : list(int)
                The list of WMOs to load all Argo data for.
            CYC : int, np.array(int), list(int)
                The cycle numbers to load.
        """
        if isinstance(WMO, int):
            WMO = [WMO]  # Make sure we deal with a list
        if isinstance(CYC, int):
            CYC = np.array((CYC,), dtype='int')  # Make sure we deal with an array of integers
        if isinstance(CYC, list):
            CYC = np.array(CYC, dtype='int')  # Make sure we deal with an array of integers
        self.WMO = WMO
        self.CYC = CYC

        self.definition = "?"
        if self.dataset_id == 'phy':
            self.definition = 'Argovis Argo data fetcher for profiles'
        return self
    
    def cname(self):
        """ Return a unique string defining the constraints """
        if len(self.WMO) > 1:
            listname = ["WMO%i" % i for i in self.WMO]
            if isinstance(self.CYC, (np.ndarray)):
                [listname.append("CYC%0.4d" % i) for i in self.CYC]
            listname = ";".join(listname)
        else:
            listname = "WMO%i" % self.WMO[0]
            if isinstance(self.CYC, (np.ndarray)):
                listname = [listname]
                [listname.append("CYC%0.4d" % i) for i in self.CYC]
                listname = "_".join(listname)
        listname = self.dataset_id + "_" + listname
        return listname

    @property
    def url(self):
        """ Return the URL used to download data """
        urls = []
        if isinstance(self.CYC, (np.ndarray)) and self.CYC.nbytes > 0:
            profIds = [str(wmo) + '_' + str(cyc) for wmo in self.WMO for cyc in self.CYC.tolist()]
            urls.append('https://argovis.colorado.edu/catalog/mprofiles/?ids={}'.format(profIds).replace(' ', ''))
        elif self.dataset_id == 'bgc' and isinstance(self.CYC, (np.ndarray)) and self.CYC.nbytes > 0:
            profIds = [str(wmo) + '_' + str(cyc) for wmo in self.WMO for cyc in self.CYC.tolist()]
            urls.append('https://argovis.colorado.edu/catalog/profiles/{}'.format(profile_number))
        else:
            for wmo in self.WMO:
                urls.append('https://argovis.colorado.edu/catalog/platforms/{}'.format(str(wmo)))
        return urls
    
    def to_dataframe(self, js):
        df = self.json2dataframe(js)
        df = df.reset_index()
        df = df.rename(columns=self.key_map)
        df = df[[ value for value in self.key_map.values() ]]
        df = df.set_index(['N_POINTS', 'TIME', 'LATITUDE', 'LONGITUDE'])
        return df
    
    def to_xarray(self):
        """ Download and return data as xarray Datasets """
        results = []
        for url in self.url:
            #with self.open_json(url) as of:
            #    js = json.load(of)
            js = self.open_json(url)
            if isinstance(js, str):
                continue
            ds = self.to_dataframe(js).to_xarray()
            # @tylertucker202: This is where we would rename variables, cast data types, etc to ensure upper level compatibility with argopy fetcher facade.
            results.append(ds)
        results = [r for r in results if r is not None]  # Only keep non-empty results
        if len(results) > 0:
            ds = xr.concat(results, dim='N_POINTS', data_vars='all', coords='all', compat='override')
            ds['N_POINTS'] = np.arange(0, len(ds['N_POINTS']))  # Re-index to avoid duplicate values
            ds = ds.set_coords('N_POINTS')
            #ds = ds.sortby('DATE') # should already be sorted by date in decending order
            return ds
        else:
            raise ValueError("CAN'T FETCH ANY DATA !")

In [97]:
# different ways to use (internally):
fetcher = Fetch_wmo(cache=0, cachedir="./tmp", WMO=6901929) # memory error
#fetcher = Fetch_wmo(cache=0, cachedir="./tmp", WMO=6901929, CYC=12)
#fetcher = Fetch_wmo(cache=0, cachedir="./tmp", WMO=6901929, CYC=[1, 12])
#fetcher = Fetch_wmo(cache=0, cachedir="./tmp", WMO=[6901929, 2901623]) # memory error
#fetcher = Fetch_wmo(cache=0, cachedir="./tmp", WMO=[6901929, 2901623], CYC=12)
#fetcher = Fetch_wmo(cache=0, cachedir="./tmp", WMO=[6901929, 2901623], CYC=[1, 12])

#fetcher = Fetch_wmo(WMO=6901929) # memory error
#fetcher = Fetch_wmo(WMO=6901929, CYC=12)
#fetcher = Fetch_wmo(WMO=6901929, CYC=[1, 12])
#fetcher = Fetch_wmo(WMO=[6901929, 2901623]) # memory error
#fetcher = Fetch_wmo(WMO=[6901929, 2901623], CYC=12)
#fetcher = Fetch_wmo(WMO=[6901929, 2901623], CYC=[1, 12])

print(fetcher)
print(fetcher.url)
ds = fetcher.to_xarray()
#np.unique(ds['_id'])
# fetcher.fs.clear_cache()

<datafetcher 'Argovis Argo data fetcher for profiles'>
Domain: phy_WMO6901929
['https://argovis.colorado.edu/catalog/platforms/6901929']


MemoryError: 

In [82]:
ds.info()

xarray.Dataset {
dimensions:
	LATITUDE = 4 ;
	LONGITUDE = 4 ;
	N_POINTS = 1357 ;
	TIME = 4 ;

variables:
	object TIME(TIME) ;
	float64 LATITUDE(LATITUDE) ;
	float64 LONGITUDE(LONGITUDE) ;
	int64 N_POINTS(N_POINTS) ;
	float64 TIME_QC(N_POINTS, TIME, LATITUDE, LONGITUDE) ;
	float64 CYCLE_NUMBER(N_POINTS, TIME, LATITUDE, LONGITUDE) ;
	object DATA_MODE(N_POINTS, TIME, LATITUDE, LONGITUDE) ;
	object DIRECTION(N_POINTS, TIME, LATITUDE, LONGITUDE) ;
	float64 PLATFORM_NUMBER(N_POINTS, TIME, LATITUDE, LONGITUDE) ;
	float64 POSITION_QC(N_POINTS, TIME, LATITUDE, LONGITUDE) ;
	float64 PRES(N_POINTS, TIME, LATITUDE, LONGITUDE) ;
	float64 TEMP(N_POINTS, TIME, LATITUDE, LONGITUDE) ;
	float64 PSAL(N_POINTS, TIME, LATITUDE, LONGITUDE) ;

// global attributes:
}