diff --git a/segment_source_resource/__init__.py b/segment_source_resource/__init__.py index a1d2e92..c2f6a9d 100644 --- a/segment_source_resource/__init__.py +++ b/segment_source_resource/__init__.py @@ -1,2 +1,2 @@ -from segment_source_resource.resource import Resource +from segment_source_resource.resource import Resource, RawObj, Obj from segment_source_resource.sync import execute diff --git a/segment_source_resource/resource.py b/segment_source_resource/resource.py index 8e70e9d..b2535cd 100644 --- a/segment_source_resource/resource.py +++ b/segment_source_resource/resource.py @@ -2,9 +2,8 @@ import numbers from dateutil.parser import parse as parse_date -from segment_source import client as source from dateutil.tz import tzlocal -from pydash import get, omit +from pydash import get def serialize_datetime(timestamp): @@ -36,6 +35,20 @@ def serialize_boolean(val): "or number, not a {}".format(type(val))) +class RawObj(object): + def __init__(self, data, collection, schema): + self.data = data + self.collection = collection + self.schema = schema + + +class Obj(object): + def __init__(self, id, properties, collection): + self.id = id + self.properties = properties + self.collection = collection + + class Resource(object): parent = None @@ -52,6 +65,12 @@ def schema(self): raise NotImplementedError def fetch(self, seed): + """ + can yield values of: + dict (in this case they're cast to RawObj instances using resource's default collection and schema) + object of RawObj (such objects will be transformed to Obj using resource's transform method) + object of Obj (will be set as is) + """ raise NotImplementedError _parser_map = { @@ -62,17 +81,19 @@ def fetch(self, seed): 'datetime': serialize_datetime } - def transform(self, obj, seed=None, schema=None): - if schema is None: - schema = self.schema + def transform(self, raw_obj, seed=None): + obj = Obj( + id=None, + properties={}, + collection=raw_obj.collection, + ) - ret = {} - for column, definition in 
schema.items(): + for column, definition in raw_obj.schema.items(): source_name = definition.get('path', column) if isinstance(source_name, str): - source_value = obj.get(source_name) + source_value = raw_obj.data.get(source_name) elif isinstance(source_name, list): - source_value = get(obj, source_name) + source_value = get(raw_obj.data, source_name) else: raise ValueError("Invalid path: {}".format(source_name)) @@ -86,7 +107,11 @@ def transform(self, obj, seed=None, schema=None): raise ValueError("Invalid type: {}".format(definition['type'])) try: - ret[column] = parser_func(source_value) + if column == 'id': + obj.id = parser_func(source_value) + else: + obj.properties[column] = parser_func(source_value) + except (ValueError, TypeError) as err: message = "Failed to cast {} with value {} to {}".format( column, @@ -95,7 +120,4 @@ def transform(self, obj, seed=None, schema=None): ) raise ValueError(message) from err - return ret - - def set(self, obj): - source.set(self.collection, obj['id'], omit(obj, 'id')) + return obj diff --git a/segment_source_resource/sync.py b/segment_source_resource/sync.py index 2069082..6634fb2 100644 --- a/segment_source_resource/sync.py +++ b/segment_source_resource/sync.py @@ -3,6 +3,7 @@ from gevent.pool import Group from segment_source_resource.exceptions import PublicError +from segment_source_resource.resource import RawObj, Obj from segment_source import client as source @@ -26,11 +27,17 @@ def handler(thread): def _process_resource(resources, seed, resource): - for obj in resource.fetch(seed): - morphed = resource.transform(obj, seed) - resource.set(morphed) + for raw_obj in resource.fetch(seed): + if not isinstance(raw_obj, (Obj, RawObj)): + raw_obj = RawObj(data=raw_obj, collection=resource.collection, schema=resource.schema) - _enqueue_children(resources, obj, resource) + if isinstance(raw_obj, Obj): + obj = raw_obj + else: + obj = resource.transform(raw_obj, seed) + + source.set(obj.collection, obj.id, obj.properties) + 
_enqueue_children(resources, raw_obj, resource) def _enqueue_children(resources, seed, parent):