In [1]:
import xray
import json
import cPickle as pickle
from collections import OrderedDict
import numpy as np
import pandas as pd

class StreamKey(object):
    """Five-part identifier of a data stream.

    The parts are ``subsite``, ``node``, ``sensor``, ``method`` and the
    stream name.  The stream name is stored on the instance as
    ``stream_name`` but is exposed under the key ``'stream'`` by
    ``as_dict`` and expected under that key by ``from_dict``.
    """

    def __init__(self, subsite, node, sensor, method, stream):
        self.subsite = subsite
        self.node = node
        self.sensor = sensor
        self.method = method
        self.stream_name = stream

    @staticmethod
    def from_dict(d):
        """Build a StreamKey from a mapping with the keys produced by ``as_dict``."""
        return StreamKey(d['subsite'], d['node'], d['sensor'], d['method'], d['stream'])

    @staticmethod
    def from_refdes(refdes):
        """Build a StreamKey from a ``'|'``-delimited string of the five parts."""
        return StreamKey(*refdes.split('|'))

    @staticmethod
    def from_stream_key(stream_key, sensor, stream):
        """Copy subsite, node and method from ``stream_key``, replacing sensor and stream."""
        return StreamKey(stream_key.subsite, stream_key.node, sensor, stream_key.method, stream)

    def __eq__(self, other):
        """Two keys are equal when all five parts match."""
        if not isinstance(other, StreamKey):
            # Let Python try the reflected comparison / fall back to identity.
            return NotImplemented
        # Compare through as_dict so the stream name is read from
        # ``stream_name`` (there is no ``stream`` attribute on instances).
        return self.as_dict() == other.as_dict()

    def __ne__(self, other):
        """Inverse of ``__eq__``; Python 2 does not derive this automatically."""
        outcome = self.__eq__(other)
        if outcome is NotImplemented:
            return outcome
        return not outcome

    def as_dict(self):
        """Return the five parts as a new plain dict."""
        return {
            'subsite': self.subsite,
            'node': self.node,
            'sensor': self.sensor,
            'method': self.method,
            'stream': self.stream_name
        }

    def as_refdes(self):
        """Return the five parts joined with ``'|'``."""
        return '%(subsite)s|%(node)s|%(sensor)s|%(method)s|%(stream)s' % self.as_dict()

    def as_dashed_refdes(self):
        """Same as ``as_refdes`` but with every ``'|'`` replaced by ``'-'``."""
        return self.as_refdes().replace('|', '-')

    def as_three_part_refdes(self):
        """Return ``subsite-node-sensor``."""
        return "{:s}-{:s}-{:s}".format(self.subsite, self.node, self.sensor)

    def __repr__(self):
        return repr(self.as_dict())

    def __str__(self):
        return str(self.as_dict())

In [2]:
class NumpyJSONEncoder(json.JSONEncoder):
    """
    JSON encoder that understands numpy scalars and arrays.

    Indexing a numpy array yields numpy scalar types (for instance
    numpy.float64), which the stock encoder may refuse to serialize.
    Both numpy scalars and ndarrays provide tolist(), which returns
    plain python values, so that is what gets handed to the encoder.
    """
    def default(self, o):
        numpy_types = (np.generic, np.ndarray)
        if not isinstance(o, numpy_types):
            # Not a numpy object: defer to the base class, which raises TypeError.
            return super(NumpyJSONEncoder, self).default(o)
        return o.tolist()

def return_as_pickled_xray_full(ds):
    """Pickle ``ds`` with the highest protocol the pickle module offers."""
    payload = pickle.dumps(ds, protocol=pickle.HIGHEST_PROTOCOL)
    return payload

def return_as_pickled_xray_min(ds):
    """Pickle ``ds`` using the pickle module's default protocol."""
    payload = pickle.dumps(ds)
    return payload

def particles(ds, stream_key, parameters):
    """Convert a dataset into a list of particles, one per entry of ``ds.time``.

    Each particle is an OrderedDict holding, in order:

    * ``'pk'``: ``stream_key.as_dict()`` extended with the ``'deployment'``
      and ``'time'`` values at that index,
    * ``'provenance'``: the provenance value at that index converted with
      ``str()``,
    * one entry per remaining name in ``ds.variables`` with the value at
      that index.

    ``parameters`` is accepted for signature compatibility but is not used;
    every variable in ``ds`` is emitted.

    Values are fetched with ``ds[name].values[index]`` inside the loop; the
    ``pete`` function in this notebook pre-fetches the arrays instead.
    """
    result = []
    for index in xrange(len(ds.time)):
        particle = OrderedDict()

        particle['pk'] = stream_key.as_dict()
        # Add non-param data to particle
        particle['pk']['deployment'] = ds['deployment'].values[index]
        particle['pk']['time'] = ds['time'].values[index]
        particle['provenance'] = str(ds['provenance'].values[index])

        for param in ds.variables:
            # 'provenance' is itself a dataset variable; copying it here
            # would replace the string stored above with the raw value.
            if param == 'provenance':
                continue
            particle[param] = ds[param].values[index]

        result.append(particle)
    return result

def small_particles(ds, stream_key, parameters):
    """Convert a dataset into a list of particles without the ``'pk'`` entry.

    Each particle is an OrderedDict that starts with ``'provenance'`` (the
    value at that index converted with ``str()``), followed by one entry per
    remaining name in ``ds.variables``.  For every variable, the keys
    ``<name>_qc_results`` and ``<name>_qc_executed`` are also copied when
    they are present in ``ds``.

    ``stream_key`` and ``parameters`` are accepted for signature
    compatibility but are not used.
    """
    qc_postfixes = ('qc_results', 'qc_executed')
    result = []
    for index in xrange(len(ds.time)):
        particle = OrderedDict()
        particle['provenance'] = str(ds['provenance'].values[index])

        for param in ds.variables:
            # 'provenance' is itself a dataset variable; copying it here
            # would replace the string stored above with the raw value.
            if param != 'provenance':
                particle[param] = ds[param].values[index]

            for qc_postfix in qc_postfixes:
                qc_key = '%s_%s' % (param, qc_postfix)
                if qc_key in ds:
                    particle[qc_key] = ds[qc_key].values[index]
        result.append(particle)
    return result

def more_particles(ds, stream_key):
    """Build one plain-dict particle per entry of ``ds.time``.

    Every particle carries the value of each dataset variable at that
    index, a ``'pk'`` dict (the stream key fields plus ``'deployment'`` and
    ``'time'``), and ``'provenance'`` converted with ``str()``.
    """
    rows = []
    for i in xrange(len(ds.time)):
        # Primary-key part: stream key fields plus the non-param data.
        pk = stream_key.as_dict()
        pk['deployment'] = ds['deployment'].values[i]
        pk['time'] = ds['time'].values[i]

        row = dict((name, ds[name].values[i]) for name in ds.variables)
        row['pk'] = pk
        row['provenance'] = str(ds['provenance'].values[i])
        rows.append(row)
    return rows

def particles_json(ds, sk, pa):
    """Return the output of ``particles(ds, sk, pa)`` encoded as a JSON string."""
    records = particles(ds, sk, pa)
    return json.dumps(records, cls=NumpyJSONEncoder)

def small_particles_json(ds, sk, pa):
    """Return the output of ``small_particles(ds, sk, pa)`` encoded as a JSON string."""
    records = small_particles(ds, sk, pa)
    return json.dumps(records, cls=NumpyJSONEncoder)

def pete(ds, stream_key):
    """Build one plain dict per value of ``ds.index``.

    The ``.values`` of every dataset variable is fetched once up front and
    the per-index dicts are then filled from those cached arrays.
    ``stream_key`` is not used.
    """
    # Fetch each variable's values a single time.
    columns = {}
    for name in ds.variables:
        columns[name] = ds[name].values

    # convert data into a list of particles
    return [
        dict((name, columns[name][i]) for name in ds.variables)
        for i in ds.index.values
    ]

def m1(ds, count):
    """Read ``ds.time.values[i]`` for each ``i`` below ``count``.

    ``ds.time.values`` is looked up again on every iteration; the element
    read is discarded and nothing is returned.
    """
    for position in xrange(count):
        _ = ds.time.values[position]
        
def m2(ds, count):
    """Fetch ``ds.time.values`` once, then index it for each ``i`` below ``count``.

    The elements read are discarded and nothing is returned.
    """
    time_values = ds.time.values
    for position in xrange(count):
        time_values[position]
        
def ds_to_dict(ds):
    """Map every name in ``ds.variables`` to a list built from that variable's ``.values``."""
    columns = {}
    for name in ds.variables:
        columns[name] = list(ds[name].values)
    return columns

def to_csv(ds):
    """Convert ``ds`` to a DataFrame and return its CSV text.

    Calls ``ds.to_dataframe().to_csv()`` with no arguments and hands the
    result back to the caller instead of discarding it, matching the other
    serialization helpers in this notebook, which all return their output.
    """
    return ds.to_dataframe().to_csv()
    

In [3]:
# Names of the variables copied from the source file into the reduced dataset `ds`.
parameters = ['time', 'conductivity', 'temperature', 'deployment', 'provenance']
# NOTE(review): decode_times=False presumably leaves the time variable as raw numbers — confirm against the xray docs.
with xray.open_dataset('ctdbp_no_sample_0000.nc', decode_times=False) as orig_ds:
    # NOTE(review): presumably pulls the data into memory so it stays usable once the file is closed — confirm.
    orig_ds.load()
    # Copy of the whole dataset; used by the to_csv timing cell further down.
    full_ds = orig_ds.copy()
    # Reduced dataset: each selected variable sliced with [:1000].
    ds = xray.Dataset()
    for var in parameters:
        ds[var] = orig_ds[var][:1000]

    
# Stream key with placeholder field values, passed to the particle builders in the timing cells.
sk = StreamKey('a', 'b', 'c', 'd', 'e')

In [4]:
%timeit return_as_pickled_xray_full(ds)

1000 loops, best of 3: 411 µs per loop


In [5]:
%timeit particles_json(ds, sk, [])

1 loops, best of 3: 400 ms per loop


In [6]:
%timeit small_particles_json(ds, sk, [])

1 loops, best of 3: 339 ms per loop


In [7]:
%timeit particles(ds, sk, [])

1 loops, best of 3: 389 ms per loop


In [8]:
%timeit small_particles(ds, sk, [])

1 loops, best of 3: 326 ms per loop


In [9]:
%timeit pete(ds, sk)

100 loops, best of 3: 5.43 ms per loop


In [10]:
%timeit m1(ds, 1000)

10 loops, best of 3: 39.2 ms per loop


In [11]:
%timeit m2(ds, 1000)

10000 loops, best of 3: 142 µs per loop


In [12]:
%timeit more_particles(ds, sk)

1 loops, best of 3: 335 ms per loop


In [13]:
parts = particles(ds, sk, [])

In [14]:
%timeit json.dumps(parts)

10 loops, best of 3: 26.9 ms per loop


In [15]:
s = json.dumps(parts)

In [16]:
%timeit json.loads(s)

100 loops, best of 3: 10.5 ms per loop


In [17]:
%timeit pickle.dumps(parts, protocol=-1)

10 loops, best of 3: 76.3 ms per loop


In [18]:
%timeit ds_to_dict(ds)

1000 loops, best of 3: 479 µs per loop


In [19]:
d = ds_to_dict(ds)

In [20]:
%timeit pickle.dumps(d, protocol=-1)

10 loops, best of 3: 30.1 ms per loop


In [21]:
len(json.dumps(parts)), len(json.dumps(d))

(257394, 52730)

In [22]:
%timeit to_csv(full_ds)

1 loops, best of 3: 1.59 s per loop
