Skip to content

Commit

Permalink
Merge pull request #196 from caseybryant/HDFtoDB
Browse files Browse the repository at this point in the history
M117 / Issue #180 - Implemented mutable parameters.
  • Loading branch information
lukecampbell committed Jul 28, 2014
2 parents df22154 + 0f4c015 commit 8075c9e
Show file tree
Hide file tree
Showing 16 changed files with 865 additions and 152 deletions.
7 changes: 4 additions & 3 deletions coverage_model/config.py
Expand Up @@ -2,7 +2,7 @@

from ooi.logging import log
from pyon.util.config import Config
import datetime
from coverage_model.util.time_utils import get_current_ntp_time


class Singleton(type):
Expand All @@ -21,6 +21,7 @@ class CoverageConfig(object):
_default_ordered_lat_key_preferences = ['m_lat', 'm_gps_lat', 'c_wpt_lat', 'lat']
_default_ordered_lon_key_preferences = ['m_lon', 'm_gps_lon', 'c_wpt_lon', 'lon']
_default_ordered_vertical_key_preferences = ['m_depth', 'depth']
_default_ingest_time_key = 'ingest_time'
_default_time_db_key = 'time_range'
_default_geo_db_key = 'spatial_geometry'
_default_vertical_db_key = 'vertical_range'
Expand All @@ -29,7 +30,6 @@ class CoverageConfig(object):
_default_storage_location = None

def __init__(self):
print 'setting defaults'
self.ordered_time_key_preferences = self._default_ordered_time_key_preferences
self.ordered_lat_key_preferences = self._default_ordered_lat_key_preferences
self.ordered_lon_key_preferences = self._default_ordered_lon_key_preferences
Expand All @@ -43,6 +43,7 @@ def __init__(self):
self.using_default_config = True
self.config_time = 0
self.read_and_set_config()
self.ingest_time_key = self._default_ingest_time_key

def read_and_set_config(self):
one_from_config = False
Expand All @@ -52,7 +53,7 @@ def read_and_set_config(self):
self.__setattr__(k, v)
one_from_config = True
self.using_default_config = False
self.config_time = datetime.datetime.utcnow().time()
self.config_time = get_current_ntp_time()
except Exception as ex:
if one_from_config:
log.info("Load from config failed with '%s'. Using hybrid default/config file configuration" % ex.message)
Expand Down
58 changes: 33 additions & 25 deletions coverage_model/coverage.py
Expand Up @@ -34,25 +34,26 @@
# self.__spatial_domain = value

from copy import deepcopy
import os
import collections
import pickle
from collections import Iterable

import numpy as np
import os
import pickle


from ooi.logging import log
from pyon.util.async import spawn
from coverage_model import utils
from coverage_model.basic_types import AbstractIdentifiable, AxisTypeEnum, MutabilityEnum, VariabilityEnum, Dictable, \
Span
from coverage_model.config import CoverageConfig
from coverage_model.parameter import Parameter, ParameterDictionary, ParameterContext
from coverage_model.parameter_values import get_value_class, AbstractParameterValue
from coverage_model.persistence import InMemoryPersistenceLayer, is_persisted
from coverage_model.storage.parameter_persisted_storage import PostgresPersistenceLayer
from coverage_model.parameter_types import SparseConstantType, ParameterFunctionType, QuantityType
from coverage_model.parameter_values import get_value_class, AbstractParameterValue, NumericValue
from coverage_model.persistence import InMemoryPersistenceLayer, is_persisted, PersistedStorage, SparsePersistedStorage
from coverage_model.storage.parameter_persisted_storage import PostgresPersistenceLayer, PostgresPersistedStorage
from coverage_model.metadata_factory import MetadataManagerFactory
from coverage_model.parameter_functions import ParameterFunctionException
from coverage_model import utils
from coverage_model.utils import Interval, create_guid


Expand All @@ -74,6 +75,7 @@ class AbstractCoverage(AbstractIdentifiable):

VALUE_CACHE_LIMIT = 30
version = '0.0.3'
ingest_time_parameter_name = CoverageConfig().ingest_time_key

def __init__(self, mode=None):
AbstractIdentifiable.__init__(self)
Expand Down Expand Up @@ -308,6 +310,13 @@ def append_parameter(self, parameter_context):
storage = self._persistence_layer.value_list[pname]
self._range_value[pname] = get_value_class(param_type=pcontext.param_type, domain_set=pcontext.dom, storage=storage)

from coverage_model.data_span import Span
if self.temporal_parameter_name is not None and Span.ingest_time_str not in self._persistence_layer.value_list:
ctxt = ParameterContext(self.ingest_time_parameter_name, QuantityType(value_encoding='float64'), fill_value=-99.0)
ctxt.description = ''
ctxt.uom = 'seconds since 01-01-1900'
self.append_parameter(ctxt)

def get_parameter(self, param_name):
"""
Get a Parameter object by name
Expand All @@ -334,7 +343,6 @@ def _handle_closed(self):
def set_parameter_function(self, param_name, func):
self._handle_closed()
if param_name in self._range_dictionary:
from coverage_model.parameter_types import ParameterFunctionType
param_type = self._range_dictionary[param_name].param_type
if isinstance(param_type, ParameterFunctionType):
param_type.callback = func
Expand Down Expand Up @@ -364,7 +372,7 @@ def list_parameters(self, coords_only=False, data_only=False):

def get_parameter_values(self, param_names=None, time_segment=None, time=None,
sort_parameter=None, stride_length=None, return_value=None, fill_empty_params=False,
function_params=None, as_record_array=False):
function_params=None, as_record_array=False, remove_duplicate_records=False):
"""
Retrieve the value for a parameter
Expand Down Expand Up @@ -397,7 +405,8 @@ def get_parameter_values(self, param_names=None, time_segment=None, time=None,

vals = self._persistence_layer.read_parameters(param_names, time_segment, time, sort_parameter,
stride_length=stride_length, fill_empty_params=fill_empty_params,
function_params=function_params, as_record_array=as_record_array)
function_params=function_params, as_record_array=as_record_array,
remove_duplicate_records=remove_duplicate_records)
if self.value_caching:
self.cache_it(vals, param_names, time_segment, time, sort_parameter, stride_length, fill_empty_params)
return vals
Expand Down Expand Up @@ -568,7 +577,7 @@ def set_time_values(self, values):
values = np.array(values)
return self._persistence_layer.write_parameters(self.get_write_id(), {self.temporal_parameter_name: values})

def get_time_values(self, time_segement=None, stride_length=None, return_value=None):
def get_time_values(self, time_segement=None, stride_length=None, return_value=None, remove_duplicates=False):
"""
Convenience method for retrieving time values
Expand All @@ -577,7 +586,8 @@ def get_time_values(self, time_segement=None, stride_length=None, return_value=N
@param return_value If supplied, filled with response value
"""
return self.get_parameter_values(self.temporal_parameter_name, time_segment=time_segement,
stride_length=stride_length, return_value=return_value).get_data()[self.temporal_parameter_name]
stride_length=stride_length, return_value=return_value,
remove_duplicate_records=remove_duplicates).get_data()[self.temporal_parameter_name]

def _clear_value_cache_for_parameter(self, param_name):
for k in self._value_cache.keys():
Expand Down Expand Up @@ -1145,9 +1155,13 @@ def set_time_values(self, value, tdoa=None):
def set_parameter_values(self, param_name, value, tdoa=None, sdoa=None):
raise TypeError('Cannot set parameter values against a ViewCoverage')

def get_parameter_values(self, param_names, time_segment=None, time=None, sort_parameter=None, stride_length=None,
fill_empty_params=False, return_value=None):
return self.reference_coverage.get_parameter_values(param_names, time_segment, time, sort_parameter, stride_length, fill_empty_params, return_value)
def get_parameter_values(self, param_names=None, time_segment=None, time=None,
sort_parameter=None, stride_length=None, return_value=None, fill_empty_params=False,
function_params=None, as_record_array=False, remove_duplicate_records=False):
return self.reference_coverage.get_parameter_values(param_names, time_segment, time, sort_parameter, stride_length,
fill_empty_params=fill_empty_params, return_value=return_value,
function_params=function_params, as_record_array=as_record_array,
remove_duplicate_records=remove_duplicate_records)

def get_value_dictionary(self, *args, **kwargs):

Expand Down Expand Up @@ -1255,7 +1269,6 @@ def _new_coverage(self, root_dir, persistence_guid, name, reference_coverage_loc
if parameter_dictionary is None:
parameter_dictionary = ParameterDictionary()
elif complex_type != ComplexCoverageType.TIMESERIES:
from coverage_model import ParameterFunctionType
for pn, pc in parameter_dictionary.iteritems():
if not isinstance(pc[1].param_type, ParameterFunctionType):
log.warn('Parameters stored in a ComplexCoverage must be ParameterFunctionType parameters: discarding \'%s\'', pn)
Expand Down Expand Up @@ -1321,7 +1334,6 @@ def close(self, force=False, timeout=None):
def append_parameter(self, parameter_context):
if not isinstance(parameter_context, ParameterContext):
raise TypeError('\'parameter_context\' must be an instance of ParameterContext, not {0}'.format(type(parameter_context)))
from coverage_model import ParameterFunctionType
if not isinstance(parameter_context.param_type, ParameterFunctionType):
raise ValueError('Parameters stored in a ComplexCoverage must be ParameterFunctionType parameters: cannot append parameter \'{0}\''.format(parameter_context.name))

Expand Down Expand Up @@ -1397,7 +1409,8 @@ def _interval_qsort(cls, arr, left=None, right=None):
cls._interval_qsort(arr, left, pivot-1)
cls._interval_qsort(arr, pivot+1, right)

def get_parameter_values(self, param_names, time_segment=None, time=None, sort_parameter=None, stride_length=None, return_value=None):
def get_parameter_values(self, param_names, time_segment=None, time=None, sort_parameter=None,
stride_length=None, return_value=None, remove_duplicate_records=False):
'''
Obtain the value set for a given parameter over a specified domain
'''
Expand All @@ -1406,7 +1419,8 @@ def get_parameter_values(self, param_names, time_segment=None, time=None, sort_p
raise TypeError("A Timeseries ComplexCoverage doesn't support "
"get_parameter_values, please use "
"get_value_dictionary")
return AbstractCoverage.get_parameter_values(self, param_names, time_segment, time, sort_parameter, stride_length=stride_length, return_value=return_value)
return AbstractCoverage.get_parameter_values(self, param_names, time_segment, time, sort_parameter, stride_length=stride_length,
return_value=return_value, remove_duplicate_records=remove_duplicate_records)

# def get_value_dictionary(self, param_list=None, temporal_slice=None, domain_slice=None):
# if temporal_slice and domain_slice:
Expand Down Expand Up @@ -1881,7 +1895,6 @@ def _build_temporal_broadcast(self, rcovs, parameter_dictionary):
self._assign_domain(pc)
self._range_dictionary.add_context(pc)
# Add the sparse value class
from coverage_model.parameter_types import SparseConstantType
ppt = self._range_dictionary.get_context(p).param_type
self._range_value[p] = get_value_class(
SparseConstantType(value_encoding=ppt.value_encoding,
Expand Down Expand Up @@ -1965,7 +1978,6 @@ def _build_temporal_interleaved(self, rcovs, parameter_dictionary):
self._assign_domain(pc)
self._range_dictionary.add_context(pc)
# Add the sparse value class
from coverage_model.parameter_types import SparseConstantType
ppt=self._range_dictionary.get_context(p).param_type
self._range_value[p] = get_value_class(
SparseConstantType(value_encoding=ppt.value_encoding,
Expand Down Expand Up @@ -2050,7 +2062,6 @@ def _build_temporal_aggregation(self, rcovs, parameter_dictionary):
self._assign_domain(pc)
self._range_dictionary.add_context(pc)
# Add the sparse value class
from coverage_model.parameter_types import SparseConstantType
ppt=self._range_dictionary.get_context(p).param_type
self._range_value[p] = get_value_class(
SparseConstantType(value_encoding=ppt.value_encoding,
Expand Down Expand Up @@ -2160,8 +2171,6 @@ def _doload(self):
inline_data_writes = self._persistence_layer.inline_data_writes
self.value_caching = self._persistence_layer.value_caching

from coverage_model.persistence import PersistedStorage, SparsePersistedStorage
from coverage_model.storage.parameter_persisted_storage import PostgresPersistedStorage
for parameter_name in self._persistence_layer.parameter_metadata:
md = self._persistence_layer.parameter_metadata[parameter_name]
mm = self._persistence_layer.master_manager
Expand Down Expand Up @@ -2266,7 +2275,6 @@ def get_fill_value(self, param_name):

def calculate_statistics(self, params=None, time_range=None):
"""time_range is currently not implemented"""
from coverage_model.parameter_values import NumericValue
if isinstance(params, basestring):
tmp = set()
tmp.add(params)
Expand Down
12 changes: 7 additions & 5 deletions coverage_model/coverages/aggregate_coverage.py
Expand Up @@ -7,7 +7,7 @@
from coverage_model.parameter_values import get_value_class
from coverage_model.persistence import is_persisted
from coverage_model.storage.parameter_persisted_storage import PostgresPersistenceLayer, PostgresPersistedStorage
from coverage_model.util.numpy_utils import sort_flat_arrays
from coverage_model.util.numpy_utils import NumpyUtils
from coverage_model.utils import Interval


Expand Down Expand Up @@ -285,14 +285,14 @@ def get_time_values(self, time_segement=None, stride_length=None, return_value=N

combined_data = self._merge_value_dicts(cov_value_list, override_temporal_key=dummy_key, stride_length=stride_length)
if dummy_key in combined_data:
combined_data = sort_flat_arrays(combined_data, dummy_key)
combined_data = NumpyUtils.sort_flat_arrays(combined_data, dummy_key)
return combined_data[dummy_key] #TODO: Handle case where 'time' may not be temporal parameter name of all sub-coverages
else:
return np.array([])

def get_parameter_values(self, param_names=None, time_segment=None, time=None,
sort_parameter=None, stride_length=None, return_value=None, fill_empty_params=False,
function_params=None, as_record_array=False):
function_params=None, as_record_array=False, remove_duplicate_records=False):
'''
Obtain the value set for a given parameter over a specified domain
'''
Expand All @@ -306,7 +306,8 @@ def get_parameter_values(self, param_names=None, time_segment=None, time=None,
this_param_names = this_param_names.intersection(set(coverage.list_parameters()))
this_param_names = list(this_param_names)
params = coverage.get_parameter_values(this_param_names, time_segment, time, sort_parameter, stride_length,
return_value, fill_empty_params, function_params, as_record_array=False)
return_value, fill_empty_params, function_params, as_record_array=False,
remove_duplicate_records=remove_duplicate_records)
if len(params.get_data()) == 1 and coverage.temporal_parameter_name in params.get_data() and not get_times_too:
continue
cov_dict = params.get_data()
Expand Down Expand Up @@ -453,7 +454,8 @@ def get_data_bounds(self, parameter_name=None):
if param in coverage.list_parameters():
bounds = coverage.get_data_bounds(param)
if param in rt:
rt[param] = (min(bounds[0], rt[param][0]), max(bounds[1], rt[param][1]))
if len(bounds) > 0:
rt[param] = (min(bounds[0], rt[param][0]), max(bounds[1], rt[param][1]))
else:
rt[param] = bounds
return rt
Expand Down
13 changes: 3 additions & 10 deletions coverage_model/coverages/complex_coverage.py
@@ -1,15 +1,7 @@
__author__ = 'casey'

from copy import deepcopy
import os
import collections
import pickle
from collections import Iterable

import numpy as np

from ooi.logging import log
from pyon.util.async import spawn
from coverage_model.coverage import AbstractCoverage, ComplexCoverageType, SimplexCoverage
from coverage_model.coverages.aggregate_coverage import AggregateCoverage
from coverage_model.coverages.coverage_extents import ReferenceCoverageExtents, ExtentsDict
Expand Down Expand Up @@ -38,7 +30,7 @@ def num_timesteps(self):

def get_parameter_values(self, param_names=None, time_segment=None, time=None,
sort_parameter=None, stride_length=None, return_value=None, fill_empty_params=False,
function_params=None, as_record_array=False):
function_params=None, as_record_array=False, remove_duplicate_records=False):
'''
Obtain the value set for a given parameter over a specified domain
'''
Expand Down Expand Up @@ -79,7 +71,8 @@ def get_parameter_values(self, param_names=None, time_segment=None, time=None,
continue

params = coverage.get_parameter_values(this_param_names, current_time_segment, time, sort_parameter,
return_value, fill_empty_params, function_params, as_record_array=False)
return_value, fill_empty_params, function_params, as_record_array=False,
remove_duplicate_records=remove_duplicate_records)
# if len(params.get_data()) == 1 and coverage.temporal_parameter_name in params.get_data():
# continue
cov_dict = params.get_data()
Expand Down

0 comments on commit 8075c9e

Please sign in to comment.