Skip to content

Commit

Permalink
Merge pull request #611 from modilabs/596-faster-add
Browse files Browse the repository at this point in the history
596 faster add
  • Loading branch information
Peter Lubell-Doughtie committed Sep 10, 2013
2 parents 929eaaf + bc7f656 commit b3ee280
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 13 deletions.
5 changes: 4 additions & 1 deletion bamboo/controllers/datasets.py
Expand Up @@ -282,16 +282,19 @@ def reset(self, dataset_id, **kwargs):
"""
return self.__create_or_update(dataset_id=dataset_id, **kwargs)

def update(self, dataset_id, update):
def update(self, dataset_id, update, clear_pending=False):
"""Update the `dataset_id` with the new rows as JSON.
:param dataset_id: The ID of the dataset to update.
:param update: The JSON to update the dataset with.
:param clear_pending: Remove any pending updates. Default False.
:returns: A JSON dict with the ID of the dataset updated, or with an
error message.
"""
def action(dataset, update=update):
if clear_pending:
dataset.clear_pending_updates()
update = safe_json_loads(update)
dataset.add_observations(update)

Expand Down
25 changes: 14 additions & 11 deletions bamboo/core/calculator.py
Expand Up @@ -4,8 +4,7 @@
from pandas import concat, DataFrame

from bamboo.core.aggregator import Aggregator
from bamboo.core.frame import add_parent_column, join_dataset,\
NonUniqueJoinError
from bamboo.core.frame import add_parent_column, join_dataset
from bamboo.core.parser import Parser
from bamboo.lib.datetools import recognize_dates
from bamboo.lib.jsontools import df_to_jsondict
Expand Down Expand Up @@ -71,6 +70,10 @@ def calculate_updates(dataset, new_data, new_dframe_raw=None,
:param parent_dataset_id: If passed add ID as parent ID to column,
default is None.
"""
if not __update_is_valid(dataset, new_dframe_raw):
dataset.remove_pending_update(update_id)
return

__ensure_ready(dataset, update_id)

if new_dframe_raw is None:
Expand Down Expand Up @@ -131,7 +134,6 @@ def dframe_from_update(dataset, new_data):

index = range(num_rows, num_rows + len(filtered_data))
new_dframe = DataFrame(filtered_data, index=index)
__check_update_is_valid(dataset, new_dframe)

return new_dframe

Expand Down Expand Up @@ -190,27 +192,28 @@ def __calculation_data(dataset):
return flatten(calcs_to_data.values())


def __check_update_is_valid(dataset, new_dframe_raw):
def __update_is_valid(dataset, new_dframe):
"""Check if the update is valid.
Check whether this is a right-hand side of any joins
and deny the update if the update would produce an invalid
join as a result.
:raises: `NonUniqueJoinError` if update is illegal given joins of
dataset.
:param dataset: The dataset to check if update valid for.
:param new_dframe: The update dframe to check.
:returns: True if the update is valid, False otherwise.
"""
select = {on: 1 for on in dataset.on_columns_for_rhs_of_joins if on in
new_dframe_raw.columns and on in dataset.columns}
new_dframe.columns and on in dataset.columns}
dframe = dataset.dframe(query_args=QueryArgs(select=select))

for on in select.keys():
merged_join_column = concat([new_dframe_raw[on], dframe[on]])
merged_join_column = concat([new_dframe[on], dframe[on]])

if len(merged_join_column) != merged_join_column.nunique():
msg = 'Cannot update. This is the right hand join and the column '\
'"%s" will become non-unique.' % on
raise NonUniqueJoinError(msg)
return False

return True


def __create_aggregator(dataset, formula, name, groups, dframe=None):
Expand Down
11 changes: 11 additions & 0 deletions bamboo/models/dataset.py
Expand Up @@ -283,6 +283,11 @@ def clear_cache(self):

return self

def clear_pending_updates(self):
    """Reset this dataset's list of pending updates to empty.

    Issues a single MongoDB `$set` against this dataset's record,
    replacing the `PENDING_UPDATES` field with an empty list.
    """
    spec = {'_id': self.record['_id']}
    modification = {'$set': {self.PENDING_UPDATES: []}}
    self.collection.update(spec, modification)

def clear_summary_stats(self, group=None, column=None):
"""Remove summary stats for `group` and optional `column`.
Expand Down Expand Up @@ -450,6 +455,7 @@ def info(self, update=None):
self.NUM_ROWS: self.num_rows,
self.STATE: self.state,
self.PARENT_IDS: self.parent_ids,
self.PENDING_UPDATES: self.pending_updates,
}

def is_dimension(self, col):
Expand Down Expand Up @@ -521,6 +527,11 @@ def remove_parent_observations(self, parent_id):
# clear the cached dframe
self.__dframe = None

def remove_pending_update(self, update_id):
    """Remove a single pending update from this dataset.

    Issues a MongoDB `$pull` against this dataset's record, deleting
    `update_id` from the `PENDING_UPDATES` list if present.

    :param update_id: The ID of the pending update to remove.
    """
    spec = {'_id': self.record['_id']}
    modification = {'$pull': {self.PENDING_UPDATES: update_id}}
    self.collection.update(spec, modification)

def replace_observations(self, dframe, overwrite=False,
set_num_columns=True):
"""Remove all rows for this dataset and save the rows in `dframe`.
Expand Down
7 changes: 6 additions & 1 deletion bamboo/tests/controllers/test_datasets_edit.py
Expand Up @@ -217,13 +217,18 @@ def test_edit_row_with_join_invalid(self):

left_dataset_id = self._post_file()
right_dataset_id = self._post_file('good_eats_aux.csv')
num_rows_before = Dataset.find_one(right_dataset_id).num_rows
on = 'food_type'
json.loads(self.controller.join(
left_dataset_id, right_dataset_id, on=on))

results = json.loads(self.controller.row_update(
right_dataset_id, index, json.dumps(update)))
self.assertTrue(Datasets.ERROR in results.keys())
self.assertTrue(Datasets.SUCCESS in results.keys())

dataset = Dataset.find_one(right_dataset_id)
self.assertEqual(num_rows_before, dataset.num_rows)
self.assertEqual(dataset.pending_updates, [])

def test_edit_row_with_merge(self):
index = 0
Expand Down

0 comments on commit b3ee280

Please sign in to comment.