Skip to content

Commit

Permalink
extract related changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jniznan committed Oct 30, 2013
1 parent ff9ef26 commit 47c6543
Showing 1 changed file with 40 additions and 28 deletions.
68 changes: 40 additions & 28 deletions src/metriquec/metriquec/basesql.py
Expand Up @@ -140,6 +140,8 @@ def extract(self, exclude_fields=None, force=False,
... docs coming soon ...
:param list/str exclude_fields:
A list or csv string of the field names to exclude.
:param force:
If False (default), then it will try to extract only the objects
that have changed since the last extract.
Expand All @@ -149,9 +151,6 @@ def extract(self, exclude_fields=None, force=False,
Accept, but ignore unknown kwargs.
'''
if parse_timestamp is None:
parse_timestamp = self.get_property('parse_timestamp', None, True)

exclude_fields = strip_split(exclude_fields)

oids = []
Expand All @@ -169,10 +168,11 @@ def extract(self, exclude_fields=None, force=False,
# include objects updated since last mtime too
# apply delta sql clause's if we're not forcing a full run
if self.get_property('delta_mtime', None, False):
oids.extend(self._get_mtime_id_delta(last_update,
parse_timestamp))
mtime = self._fetch_mtime(last_update, parse_timestamp)
if mtime:
oids.extend(self.get_changed_oids(mtime))
if self.get_property('delta_new_ids', None, True):
oids.extend(self._get_new_ids())
oids.extend(self.get_new_oids())

if isinstance(force, list):
oids = force
Expand Down Expand Up @@ -217,7 +217,7 @@ def _fetchall(self, sql, field_order):
k, float(k) / (t1 - t0)))
return objects

def _fetch_mtime(self, last_update):
def _fetch_mtime(self, last_update, parse_timestamp):
mtime = None
if last_update:
if isinstance(last_update, basestring):
Expand All @@ -229,6 +229,22 @@ def _fetch_mtime(self, last_update):
# convert timestamp to datetime object
mtime = ts2dt(mtime)
self.logger.info("Last update mtime: %s" % mtime)

if mtime:
if parse_timestamp is None:
parse_timestamp = self.get_property('parse_timestamp',
None, True)
if not (hasattr(mtime, 'tzinfo') and mtime.tzinfo):
# We need the timezone, to readjust relative to the server's tz
mtime = mtime.replace(tzinfo=pytz.utc)
mtime = mtime.strftime('%Y-%m-%d %H:%M:%S %z')
dt_format = "yyyy-MM-dd HH:mm:ss z"

if parse_timestamp:
mtime = "parseTimestamp('%s', '%s')" % (mtime, dt_format)
else:
mtime = "'%s'" % mtime

return mtime

@property
Expand Down Expand Up @@ -284,8 +300,9 @@ def _get_id_delta_sql(self, table, id_delta):
else:
return []

def _get_new_ids(self):
def get_new_oids(self):
'''
Returns a list of new oids that have not been extracted yet.
'''
table = self.get_property('table')
_id = self.get_property('column')
Expand All @@ -309,29 +326,28 @@ def _get_new_ids(self):
ids = []
return ids

def _get_mtime_id_delta(self, last_update, parse_timestamp):
def get_changed_oids(self, mtime):
'''
Returns a list of object ids of those objects that have changed since
`mtime`. This method expects that the changed objects can be
determined based on the `delta_mtime` property of the cube which
specifies the field name that carries the time of the last change.
This method is expected to be overriden in the cube if it is not
possible to use a single field to determine the time of the change and
if another approach of determining the oids is available. In such
cubes the `delta_mtime` property is expected to be set to `True`.
If `delta_mtime` evaluates to False then this method is not expected
to be used.
'''
mtime = self._fetch_mtime(last_update)
mtime_columns = self.get_property('delta_mtime', None, [])
if not (mtime_columns and mtime):
return []
if isinstance(mtime_columns, basestring):
mtime_columns = [mtime_columns]

if not (hasattr(mtime, 'tzinfo') and mtime.tzinfo):
# We need the timezone, to readjust relative to the server's tz
mtime = mtime.replace(tzinfo=pytz.utc)
mtime = mtime.strftime(
'%Y-%m-%d %H:%M:%S %z')
dt_format = "yyyy-MM-dd HH:mm:ss z"

filters = []
if parse_timestamp:
mtime = "parseTimestamp('%s', '%s')" % (mtime, dt_format)
else:
mtime = "'%s'" % mtime

for _column in mtime_columns:
_sql = "%s > %s" % (_column, mtime)
filters.append(_sql)
Expand All @@ -343,12 +359,8 @@ def _get_mtime_id_delta(self, last_update, parse_timestamp):
sql = """SELECT DISTINCT %s.%s FROM %s.%s
WHERE %s""" % (table, _id, db, table,
' OR '.join(filters))
rows = self.proxy.fetchall(sql)
if rows:
ids = [x[0] for x in rows]
return ids
else:
return []
rows = self.proxy.fetchall(sql) or []
return [x[0] for x in rows]

def _get_sql_clause(self, clause, default=None):
'''
Expand Down

0 comments on commit 47c6543

Please sign in to comment.