Skip to content

Commit

Permalink
Fix v4.3.0 daily sums patch bug, implement V4.0 database
Browse files Browse the repository at this point in the history
  • Loading branch information
gjr80 committed Jan 24, 2021
1 parent 915e0a8 commit 5f972cb
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 53 deletions.
2 changes: 1 addition & 1 deletion bin/wee_database
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ def update(config_dict, db_binding, options):
log.info(msg)
print(msg)

if dbm.version is not None and dbm.version >= '3.0':
if dbm.version is not None and dbm.version >= '4.0':
# interval weighting fix has been applied
msg = "Interval weighting fix is not required."
log.info(msg)
Expand Down
115 changes: 63 additions & 52 deletions bin/weewx/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,30 +43,30 @@ class Manager(object):
is updating the table, while another is doing aggregate queries, the latter manager will be
unaware of later records in the database, and may choose the wrong query strategy. If this
might be the case, call member function _sync() before starting the query.
USEFUL ATTRIBUTES
database_name: The name of the database the manager is bound to.
table_name: The name of the main, archive table.
sqlkeys: A list of the SQL keys that the database table supports.
obskeys: A list of the observation types that the database table supports.
std_unit_system: The unit system used by the database table.
first_timestamp: The timestamp of the earliest record in the table.
last_timestamp: The timestamp of the last record in the table."""

def __init__(self, connection, table_name='archive', schema=None):
"""Initialize an object of type Manager.
connection: A weedb connection to the database to be managed.
table_name: The name of the table to be used in the database. Default is 'archive'.
schema: The schema to be used. Optional. If not supplied, then an exception of type
weedb.ProgrammingError will be raised if the database does not exist, and of type
weedb.UnitializedDatabase if it exists, but has not been initialized.
Expand All @@ -78,7 +78,7 @@ def __init__(self, connection, table_name='archive', schema=None):
self.last_timestamp = None
self.std_unit_system = None

# Now get the SQL types.
# Now get the SQL types.
try:
self.sqlkeys = self.connection.columnsOf(self.table_name)
except weedb.ProgrammingError:
Expand All @@ -100,8 +100,8 @@ def __init__(self, connection, table_name='archive', schema=None):

@classmethod
def open(cls, database_dict, table_name='archive'):
"""Open and return a Manager or a subclass of Manager.
"""Open and return a Manager or a subclass of Manager.
database_dict: A database dictionary holding the information necessary to open the
database.
Expand Down Expand Up @@ -189,7 +189,7 @@ def __exit__(self, etyp, einst, etb): # @UnusedVariable

def _initialize_database(self, schema):
"""Initialize the tables needed for the archive.
schema: The schema to be used
"""
# If this is an old-style schema, this will raise an exception. Be prepared to catch it.
Expand Down Expand Up @@ -231,7 +231,7 @@ def _sync(self):

def lastGoodStamp(self):
"""Retrieves the epoch time of the last good archive record.
returns: Time of the last good archive record as an epoch time, or None if there are no
records.
"""
Expand All @@ -240,7 +240,7 @@ def lastGoodStamp(self):

def firstGoodStamp(self):
"""Retrieves the earliest timestamp in the archive.
returns: Time of the first good archive record as an epoch time, or None if there are no
records.
"""
Expand All @@ -249,7 +249,7 @@ def firstGoodStamp(self):

def addRecord(self, record_obj, accumulator=None, progress_fn=None):
"""Commit a single record or a collection of records to the archive.
record_obj: Either a data record, or an iterable that can return data records. Each data
record must look like a dictionary, where the keys are the SQL types and the values are the
values to be stored in the database.
Expand Down Expand Up @@ -388,12 +388,12 @@ def genBatchRecords(self, startstamp=None, stopstamp=None):

def getRecord(self, timestamp, max_delta=None):
"""Get a single archive record with a given epoch time stamp.
timestamp: The epoch time of the desired record.
max_delta: The largest difference in time that is acceptable.
max_delta: The largest difference in time that is acceptable.
[Optional. The default is no difference]
returns: a record dictionary or None if the record does not exist."""

with self.connection.cursor() as _cursor:
Expand All @@ -418,11 +418,11 @@ def updateValue(self, timestamp, obs_type, new_value):

def getSql(self, sql, sqlargs=(), cursor=None):
"""Executes an arbitrary SQL statement on the database.
sql: The SQL statement
sqlargs: A tuple containing the arguments for the SQL statement
returns: a tuple containing the results
"""
_cursor = cursor or self.connection.cursor()
Expand Down Expand Up @@ -571,9 +571,9 @@ def get_database_dict_from_config(config_dict, database):
Returns: a database dictionary, with everything needed to pass on to a Manager or weedb in
order to open a database.
Example. Given a configuration file snippet that looks like:
>>> import configobj
>>> from six.moves import StringIO
>>> config_snippet = '''
Expand Down Expand Up @@ -727,7 +727,7 @@ def show_progress(last_time, nrec=None):
# Class DaySummaryManager
#
# Adds daily summaries to the database.
#
#
# This class specializes method _addSingleRecord so that it adds the data to a daily summary,
# as well as the regular archive table.
#
Expand All @@ -747,8 +747,8 @@ def show_progress(last_time, nrec=None):
# ===============================================================================

class DaySummaryManager(Manager):
"""Manage a daily statistical summary.
"""Manage a daily statistical summary.
The daily summary consists of a separate table for each type. The columns of each table are
things like min, max, the timestamps for min and max, sum and sumtime. The values sum and
sumtime are kept to make it easy to calculate averages for different time periods.
Expand All @@ -760,13 +760,13 @@ class DaySummaryManager(Manager):
wsum is the "Weighted sum," that is, the sum weighted by the archive interval. sumtime is the
sum of the archive intervals.
In addition to all the tables for each type, there is one additional table called
'archive_day__metadata', which currently holds the version number and the time of the last
update.
"""

version = "3.0"
version = "4.0"

# Schemas used by the daily summaries:
day_schemas = {
Expand Down Expand Up @@ -808,9 +808,9 @@ class DaySummaryManager(Manager):

def __init__(self, connection, table_name='archive', schema=None):
"""Initialize an instance of DaySummaryManager
connection: A weedb connection to the database to be managed.
table_name: The name of the table to be used in the database. Default is 'archive'.
schema: The schema to be used. Optional. If not supplied, then an exception of type
Expand Down Expand Up @@ -901,7 +901,7 @@ def _addSingleRecord(self, record, cursor):
# First let my superclass handle adding the record to the main archive table:
super(DaySummaryManager, self)._addSingleRecord(record, cursor)

# Get the start of day for the record:
# Get the start of day for the record:
_sod_ts = weeutil.weeutil.startOfArchiveDay(record['dateTime'])

# Get the weight. If the value for 'interval' is bad, an exception will be raised.
Expand Down Expand Up @@ -951,7 +951,7 @@ def backfill_day_summary(self, start_d=None, stop_d=None,
progress_fn=show_progress, trans_days=5):

"""Fill the daily summaries from an archive database.
Normally, the daily summaries get filled by LOOP packets (to get maximum time resolution),
but if the database gets corrupted, or if a new user is starting up with imported wview
data, it's necessary to recreate it from straight archive data. The Hi/Lows will all be
Expand All @@ -965,13 +965,13 @@ def backfill_day_summary(self, start_d=None, stop_d=None,
stop_d: The last day to be included, specified as a datetime.date object [Optional. Default
is to include the date of the last archive record.]
progress_fn: This function will be called after processing every 1000 records.
trans_days: Number of days of archive data to be used for each daily summaries database
transaction. [Optional. Default is 5.]
returns: A 2-way tuple (nrecs, ndays) where
returns: A 2-way tuple (nrecs, ndays) where
nrecs is the number of records backfilled;
ndays is the number of days
"""
Expand Down Expand Up @@ -1236,7 +1236,8 @@ def _set_day_sums(self, day_accum, cursor):
# It will only include attributes that are in the accumulator for this type.
set_list = ['%s=%s' % (k, getattr(day_accum[obs_type], k))
for k in ['sum', 'count', 'wsum', 'sumtime',
'xsum', 'ysum', 'squaresum', 'wsquaresum']
'xsum', 'ysum', 'dirsumtime',
'squaresum', 'wsquaresum']
if hasattr(day_accum[obs_type], k)]
update_sql = "UPDATE {archive_table}_day_{obs_type} SET {set_stmt} " \
"WHERE dateTime = ?;".format(archive_table=self.table_name,
Expand All @@ -1246,28 +1247,38 @@ def _set_day_sums(self, day_accum, cursor):
cursor.execute(update_sql, (day_accum.timespan.start,))

def patch_sums(self):
"""Version 4.2 accidentally interpreted V2.0 daily sums as V1.0, so the weighted sums
were all given a weight of 1.0, instead of the interval length. This fixes that."""
if '1.0' < self.version < '3.0':
"""Version 4.2.0 accidentally interpreted V2.0 daily sums as V1.0, so the weighted sums
were all given a weight of 1.0, instead of the interval length. Version 4.3.0 attempted
to fix this bug but introduced its own bug by failing to weight 'dirsumtime'. This fixes
both bugs."""
if '1.0' < self.version < '4.0':
msg = "Daily summaries at V%s. Patching to V%s" \
% (self.version, DaySummaryManager.version)
print(msg)
log.info(msg)
# We need to upgrade from V2.0 to V3.0. The only difference is that the
# patch has been supplied to V3.0 daily summaries. The patch need only be
# done from a date well before the V4.2 release. We pick 1-Jun-2020.
# We need to upgrade from V2.0 or V3.0 to V4.0. The only difference is
# that the patch has been applied to V4.0 daily summaries. The patch
# need only be done from a date well before the V4.2 release.
# We pick 1-Jun-2020.
self.recalculate_weights(start_d=datetime.date(2020, 6, 1))
self._write_metadata('Version', DaySummaryManager.version)
self.version = DaySummaryManager.version
log.info("Patch finished.")

def update(self):
"""Update the database to V3.0"""
"""Update the database to V4.0.
- all V1.0 daily sums need to be upgraded
- V2.0 daily sums need to be upgraded but only those after a date well before the
V4.2.0 release (we pick 1 June 2020)
- V3.0 daily sums need to be upgraded due to a bug in the V4.2.0 and V4.3.0 releases
but only those after 1 June 2020
"""
if self.version == '1.0':
self.recalculate_weights(weight_fn=DaySummaryManager._get_weight)
self._write_metadata('Version', DaySummaryManager.version)
self.version = DaySummaryManager.version
elif self.version == '2.0':
elif self.version == '2.0' or self.version == '3.0':
self.patch_sums()

# --------------------------- UTILITY FUNCTIONS -----------------------------------
Expand Down Expand Up @@ -1326,10 +1337,10 @@ def _get_day_summary(self, sod_ts, cursor=None):

def _set_day_summary(self, day_accum, lastUpdate, cursor):
"""Write all statistics for a day to the database in a single transaction.
day_accum: an accumulator with the daily summary. See weewx.accum
lastUpdate: the time of the last update will be set to this unless it is None.
lastUpdate: the time of the last update will be set to this unless it is None.
Normally, this is the timestamp of the last archive record added to the instance
day_accum. """

Expand Down

0 comments on commit 5f972cb

Please sign in to comment.