Stats Rebuild Aggregates shouldn't run more than once at the same time
flavour committed Oct 10, 2012
1 parent 7badee0 commit eec3cd2
Showing 4 changed files with 125 additions and 90 deletions.
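In brief: models/tasks.py simplifies each scheduler task wrapper to return its result directly and routes the subscription-notification task through the s3db namespace; modules/eden/msg.py renames search_subscription_notifications to msg_search_subscription_notifications and exports it; modules/eden/stats.py adds the guard itself, killing any stats_group_clean task still RUNNING before stats_rebuild_aggregates queues a new one, and trims several queries to fetch only the columns they use.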
VERSION (2 changes: 1 addition & 1 deletion)
@@ -1 +1 @@
f21ad7b (2012-10-10 14:56:53)
7badee0 (2012-10-11 00:10:37)
models/tasks.py (61 changes: 28 additions & 33 deletions)
@@ -19,9 +19,8 @@ def gis_download_kml(record_id, filename, user_id=None):
if user_id:
# Authenticate
auth.s3_impersonate(user_id)
# Run the Task
result = gis.download_kml(record_id, filename)
return result
# Run the Task & return the result
return gis.download_kml(record_id, filename)

tasks["gis_download_kml"] = gis_download_kml

@@ -37,10 +36,9 @@ def gis_update_location_tree(feature, user_id=None):
if user_id:
# Authenticate
auth.s3_impersonate(user_id)
# Run the Task
# Run the Task & return the result
feature = json.loads(feature)
result = gis.update_location_tree(feature)
return result
return gis.update_location_tree(feature)

tasks["gis_update_location_tree"] = gis_update_location_tree

@@ -121,9 +119,8 @@ def msg_process_outbox(contact_method, user_id=None):
if user_id:
# Authenticate
auth.s3_impersonate(user_id)
# Run the Task
result = msg.process_outbox(contact_method)
return result
# Run the Task & return the result
return msg.process_outbox(contact_method)

tasks["msg_process_outbox"] = msg_process_outbox

@@ -135,9 +132,8 @@ def msg_process_inbound_email(username, user_id):
@param username: email address of the email source to read from.
This uniquely identifies one inbound email task.
"""
# Run the Task
result = msg.fetch_inbound_email(username)
return result
# Run the Task & return the result
return msg.fetch_inbound_email(username)

tasks["msg_process_inbound_email"] = msg_process_inbound_email

@@ -149,9 +145,8 @@ def msg_twilio_inbound_sms(account, user_id):
@param account: account name for the SMS source to read from.
This uniquely identifies one inbound SMS task.
"""
# Run the Task
result = msg.twilio_inbound_sms(account)
return result
# Run the Task & return the result
return msg.twilio_inbound_sms(account)

tasks["msg_twilio_inbound_sms"] = msg_twilio_inbound_sms

@@ -160,15 +155,18 @@ def msg_parse_workflow(workflow, source, user_id):
"""
Processes the msg_log for unparsed messages.
"""
# Run the Task
result = msg.parse_import(workflow, source)
return result
# Run the Task & return the result
return msg.parse_import(workflow, source)

tasks["msg_parse_workflow"] = msg_parse_workflow

# --------------------------------------------------------------------------
def msg_search_subscription_notifications(frequency):
return eden.msg.search_subscription_notifications(frequency=frequency)
"""
Search Subscriptions & send Notifications.
"""
# Run the Task & return the result
return s3db.msg_search_subscription_notifications(frequency=frequency)

tasks["msg_search_subscription_notifications"] = msg_search_subscription_notifications

@@ -183,9 +181,8 @@ def stats_group_clean(user_id=None):
if user_id:
# Authenticate
auth.s3_impersonate(user_id)
# Run the Task
result = s3db.stats_group_clean()
return result
# Run the Task & return the result
return s3db.stats_group_clean()

tasks["stats_group_clean"] = stats_group_clean

@@ -199,9 +196,8 @@ def stats_update_time_aggregate(data_id=None, user_id=None):
if user_id:
# Authenticate
auth.s3_impersonate(user_id)
# Run the Task
result = s3db.stats_update_time_aggregate(data_id)
return result
# Run the Task & return the result
return s3db.stats_update_time_aggregate(data_id)

tasks["stats_update_time_aggregate"] = stats_update_time_aggregate

@@ -224,14 +220,13 @@ def stats_update_aggregate_location(location_level,
if user_id:
# Authenticate
auth.s3_impersonate(user_id)
# Run the Task
result = s3db.stats_update_aggregate_location(location_level,
root_location_id,
parameter_id,
start_date,
end_date,
)
return result
# Run the Task & return the result
return s3db.stats_update_aggregate_location(location_level,
root_location_id,
parameter_id,
start_date,
end_date,
)

tasks["stats_update_aggregate_location"] = stats_update_aggregate_location

modules/eden/msg.py (3 changes: 2 additions & 1 deletion)
@@ -35,6 +35,7 @@
"S3TwitterModel",
"S3XFormsModel",
"S3ParsingModel",
"msg_search_subscription_notifications",
]

from gluon import *
@@ -814,7 +815,7 @@ def source_represent(id, show_link=True):
return repr

# =============================================================================
def search_subscription_notifications(frequency):
def msg_search_subscription_notifications(frequency):
"""
Send Notifications for all Subscriptions
"""
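The rename above is what lets models/tasks.py call s3db.msg_search_subscription_notifications: in Eden, a name listed in an eden module's __all__ becomes resolvable through current.s3db, which lazy-loads the defining model on first access. A minimal sketch of that convention (loader behaviour assumed, not code from this commit; "daily" is an illustrative frequency value):

    from gluon import current
    s3db = current.s3db
    # Attribute access loads modules/eden/msg.py if necessary, then
    # returns the exported function
    notify = s3db.msg_search_subscription_notifications
    notify(frequency="daily")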
modules/eden/stats.py (149 changes: 94 additions & 55 deletions)
@@ -241,11 +241,34 @@ def stats_rebuild_aggregates():
- should be reworked to delete old data after new data has been added?
"""

# Check to see whether an existing task is running and if it is then kill it
db = current.db
ttable = db.scheduler_task
rtable = db.scheduler_run
wtable = db.scheduler_worker
query = (ttable.task_name == "stats_group_clean") & \
(rtable.scheduler_task == ttable.id) & \
(rtable.status == "RUNNING")
rows = db(query).select(rtable.id,
rtable.scheduler_task,
rtable.worker_name)
now = current.request.utcnow
for row in rows:
db(wtable.worker_name == row.worker_name).update(status="KILL")
db(rtable.id == row.id).update(stop_time=now,
status="STOPPED")
db(ttable.id == row.scheduler_task).update(stop_time=now,
status="STOPPED")

# Mark all stats_group records as needing to be updated
s3db = current.s3db
db(s3db.stats_group.deleted != True).update(dirty=True)

# Delete the existing data
resource = s3db.resource("stats_aggregate")
resource.delete()
current.db(s3db.stats_group.id > 0).update(dirty=True)

# Fire off a rebuild task
current.s3task.async("stats_group_clean")

# ---------------------------------------------------------------------
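In the web2py scheduler, setting a scheduler_worker row's status to "KILL" tells that worker to terminate the task it is currently executing; marking the matching scheduler_run and scheduler_task rows as STOPPED then clears the way for the freshly queued "stats_group_clean" task. A hypothetical check (not part of this commit) that the guard holds:

    db = current.db
    ttable = db.scheduler_task
    rtable = db.scheduler_run
    query = (ttable.task_name == "stats_group_clean") & \
            (rtable.scheduler_task == ttable.id) & \
            (rtable.status == "RUNNING")
    # After stats_rebuild_aggregates() has been called, at most one
    # rebuild should be executing at any given time
    assert db(query).count() <= 1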
@@ -285,11 +308,21 @@ def stats_update_time_aggregate(cls, data_id=None):
if not data_id:
query = (dtable.deleted != True) & \
(dtable.approved_by != None)
records = db(query).select()
records = db(query).select(dtable.location_id,
dtable.parameter_id,
dtable.data_id,
dtable.date,
dtable.value,
)
elif isinstance(data_id, Rows):
records = data_id
elif not isinstance(data_id, Row):
records = db(dtable.data_id == data_id).select(limitby=(0, 1))
records = db(dtable.data_id == data_id).select(dtable.location_id,
dtable.parameter_id,
dtable.data_id,
dtable.date,
dtable.value,
limitby=(0, 1))
else:
records = [data_id]
data_id = data_id.data_id
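Passing no fields to select() issues SELECT * and materialises every column of the table; the rewrite above names only the five columns the aggregation loop actually reads. The contrast, roughly:

    records = db(query).select()                    # SELECT * ...
    records = db(query).select(dtable.location_id,  # only the columns used
                               dtable.parameter_id,
                               dtable.data_id,
                               dtable.date,
                               dtable.value)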
@@ -490,7 +523,9 @@ def stats_update_time_aggregate(cls, data_id=None):
if changed_periods == []:
continue
# The following structures are used in the OPTIMISATION steps later
loc_level_list[location_id] = gis_table[location_id].level
loc_level_list[location_id] = db(gis_table.id == location_id).select(gis_table.level,
limitby=(0, 1)
).first().level
if parameter_id not in param_location_dict:
param_location_dict[parameter_id] = {location_id : changed_periods}
elif location_id not in param_location_dict[parameter_id]:
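Same motive here: gis_table[location_id] is DAL shorthand that fetches every field of the row by id, so the explicit select pulls just the level column from gis_location, a wide table in Eden. Equivalently:

    level = gis_table[location_id].level            # SELECT * by id
    level = db(gis_table.id == location_id).select(gis_table.level,
                                                   limitby=(0, 1)
                                                   ).first().level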
@@ -583,7 +618,7 @@ def stats_update_time_aggregate(cls, data_id=None):
# Now calculate the resilence indicators
vulnerability_resilience = s3db.vulnerability_resilience
resilience_pid = s3db.vulnerability_resilience_id()
for (location_id, (period, loc_level,use_location)) in resilence_parents.items():
for (location_id, (period, loc_level, use_location)) in resilence_parents.items():
for (start_date, end_date) in changed_periods:
s, e = str(start_date), str(end_date)
vulnerability_resilience(loc_level,
@@ -725,8 +760,9 @@ def stats_aggregated_period(data_date = None):

if data_date is None:
data_date = date.today()
soap = date(data_date.year, 1, 1)
eoap = date(data_date.year, 12, 31)
year = data_date.year
soap = date(year, 1, 1)
eoap = date(year, 12, 31)
return (soap, eoap)

# =============================================================================
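With the annual aggregation period computed above, any input date maps to the calendar year that contains it, e.g. (illustrative direct call):

    >>> from datetime import date
    >>> stats_aggregated_period(date(2012, 10, 10))
    (datetime.date(2012, 1, 1), datetime.date(2012, 12, 31))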
@@ -877,6 +913,9 @@ class S3StatsGroupModel(S3Model):
def model(self):

T = current.T
db = current.db
configure = self.configure
define_table = self.define_table

# ---------------------------------------------------------------------
# Document-source entities
@@ -898,7 +937,7 @@ def model(self):
# Reusable Field
source_id = S3ReusableField("source_id", table,
requires = IS_NULL_OR(
IS_ONE_OF(current.db,
IS_ONE_OF(db,
"stats_source.source_id",
stats_source_represent)),
represent = stats_source_represent,
@@ -942,64 +981,64 @@ def model(self):

# Components
self.add_component("stats_group", stats_source=self.super_key(table))
self.configure("stats_source",
deduplicate = self.stats_source_duplicate,
)
configure("stats_source",
deduplicate = self.stats_source_duplicate,
)

# ---------------------------------------------------------------------
# The type of document held as a stats_group.
#
tablename = "stats_group_type"
table = self.define_table(tablename,
Field("stats_group_instance",
label=T("Instance Type")),
Field("name",
label=T("Name")),
Field("display",
label=T("Display")),
*s3_meta_fields()
)
table = define_table(tablename,
Field("stats_group_instance",
label=T("Instance Type")),
Field("name",
label=T("Name")),
Field("display",
label=T("Display")),
*s3_meta_fields()
)
# Reusable Field
group_type_id = S3ReusableField("group_type_id", table,
requires = IS_NULL_OR(
IS_ONE_OF(current.db,
"stats_group_type.id",
stats_group_type_represent)),
represent = stats_group_type_represent,
label = T("Source Type"),
ondelete = "CASCADE")
requires = IS_NULL_OR(
IS_ONE_OF(db,
"stats_group_type.id",
stats_group_type_represent)),
represent = stats_group_type_represent,
label = T("Source Type"),
ondelete = "CASCADE")
# Resource Configuration
self.configure("stats_group_type",
deduplicate=self.stats_group_type_duplicate,
)
configure("stats_group_type",
deduplicate=self.stats_group_type_duplicate,
)

# ---------------------------------------------------------------------
# Container for documents and stats records
#
tablename = "stats_group"
table = self.define_table(tablename,
# This is a component, so needs to be a super_link
# - can't override field name, ondelete or requires
self.super_link("source_id", "stats_source"),
s3_date(label = T("Date Published")),
self.gis_location_id(),
group_type_id(),
# Used to indicate if the record has not yet
# been used in aggregate calculations
Field("dirty", "boolean",
#label = T("Dirty"),
default=True,
readable=False,
writable=False),
#Field("reliability",
# label=T("Reliability")),
#Field("review",
# label=T("Review")),
*s3_meta_fields()
)
table = define_table(tablename,
# This is a component, so needs to be a super_link
# - can't override field name, ondelete or requires
self.super_link("source_id", "stats_source"),
s3_date(label = T("Date Published")),
self.gis_location_id(),
group_type_id(),
# Used to indicate if the record has not yet
# been used in aggregate calculations
Field("dirty", "boolean",
#label = T("Dirty"),
default=True,
readable=False,
writable=False),
#Field("reliability",
# label=T("Reliability")),
#Field("review",
# label=T("Review")),
*s3_meta_fields()
)
# Reusable Field
group_id = S3ReusableField("group_id", table,
requires = IS_ONE_OF(current.db,
requires = IS_ONE_OF(db,
"stats_group.id",
stats_group_represent),
represent = stats_group_represent,
@@ -1008,10 +1047,10 @@ def model(self):

table.virtualfields.append(StatsGroupVirtualFields())
# Resource Configuration
self.configure("stats_group",
deduplicate=self.stats_group_duplicate,
requires_approval = True,
)
configure("stats_group",
deduplicate=self.stats_group_duplicate,
requires_approval = True,
)

# ---------------------------------------------------------------------
# Pass model-global names to response.s3
