Skip to content

Commit

Permalink
Use attr keys when testing bulk update params for primary key
Browse files Browse the repository at this point in the history
Fixed bug in :meth:`.Session.bulk_update_mappings` where alternate mapped
attribute names would result in the primary key column of the UPDATE
statement being included in the SET clause, as well as the WHERE clause;
while usually harmless, for SQL Server this can raise an error due to the
IDENTITY column.  This is a continuation of the same bug that was fixed in
🎫`.3849`, where testing was insufficient to catch this additional
flaw.

Fixes: #4357
Change-Id: Iead058c0465dfa31c5b8a8780769278b7000acc8
  • Loading branch information
zzzeek committed Nov 2, 2018
1 parent 7d372da commit e991684
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 13 deletions.
11 changes: 11 additions & 0 deletions doc/build/changelog/unreleased_12/4357.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
.. change::
:tags: bug, orm
:tickets: 4357

Fixed bug in :meth:`.Session.bulk_update_mappings` where alternate mapped
attribute names would result in the primary key column of the UPDATE
statement being included in the SET clause, as well as the WHERE clause;
while usually harmless, for SQL Server this can raise an error due to the
IDENTITY column. This is a continuation of the same bug that was fixed in
:ticket:`.3849`, where testing was insufficient to catch this additional
flaw.
8 changes: 7 additions & 1 deletion lib/sqlalchemy/orm/persistence.py
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ def _collect_insert_commands(
params[colkey] = None

if not bulk or return_defaults:
# params are in terms of Column key objects, so
# compare to pk_keys_by_table
has_all_pks = mapper._pk_keys_by_table[table].issubset(params)

if mapper.base_mapper.eager_defaults:
Expand Down Expand Up @@ -468,11 +470,13 @@ def _collect_update_commands(
propkey_to_col = mapper._propkey_to_col[table]

if bulk:
# keys here are mapped attrbute keys, so
# look at mapper attribute keys for pk
params = dict(
(propkey_to_col[propkey].key, state_dict[propkey])
for propkey in
set(propkey_to_col).intersection(state_dict).difference(
mapper._pk_keys_by_table[table])
mapper._pk_attr_keys_by_table[table])
)
has_all_defaults = True
else:
Expand Down Expand Up @@ -537,6 +541,8 @@ def _collect_update_commands(

has_all_pks = True
if bulk:
# keys here are mapped attrbute keys, so
# look at mapper attribute keys for pk
pk_params = dict(
(propkey_to_col[propkey]._label, state_dict.get(propkey))
for propkey in
Expand Down
72 changes: 60 additions & 12 deletions test/orm/test_bulk.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,56 +391,104 @@ def setup_mappers(cls):
})

def test_insert_keys(self):
self._test_insert(self.classes.PersonKeys)
asserter = self._test_insert(self.classes.PersonKeys)
asserter.assert_(
CompiledSQL(
"INSERT INTO people_keys (person_id, name) "
"VALUES (:id, :personname)",
[{'id': 5, 'personname': 'thename'}]
),
)

def test_insert_attrs(self):
self._test_insert(self.classes.PersonAttrs)
asserter = self._test_insert(self.classes.PersonAttrs)
asserter.assert_(
CompiledSQL(
"INSERT INTO people_attrs (person_id, name) "
"VALUES (:person_id, :name)",
[{'person_id': 5, 'name': 'thename'}]
),
)

def test_insert_both(self):
self._test_insert(self.classes.PersonBoth)
asserter = self._test_insert(self.classes.PersonBoth)
asserter.assert_(
CompiledSQL(
"INSERT INTO people_both (person_id, name) "
"VALUES (:id_key, :name_key)",
[{'id_key': 5, 'name_key': 'thename'}]
),
)

def test_update_keys(self):
self._test_update(self.classes.PersonKeys)
asserter = self._test_update(self.classes.PersonKeys)
asserter.assert_(
CompiledSQL(
"UPDATE people_keys SET name=:personname "
"WHERE people_keys.person_id = :people_keys_person_id",
[{'personname': 'newname', 'people_keys_person_id': 5}]
),
)

@testing.requires.updateable_autoincrement_pks
def test_update_attrs(self):
self._test_update(self.classes.PersonAttrs)
asserter = self._test_update(self.classes.PersonAttrs)
asserter.assert_(
CompiledSQL(
"UPDATE people_attrs SET name=:name "
"WHERE people_attrs.person_id = :people_attrs_person_id",
[{'name': 'newname', 'people_attrs_person_id': 5}]
),
)

@testing.requires.updateable_autoincrement_pks
def test_update_both(self):
# want to make sure that before [ticket:3849], this did not have
# a successful behavior or workaround
self._test_update(self.classes.PersonBoth)
asserter = self._test_update(self.classes.PersonBoth)
asserter.assert_(
CompiledSQL(
"UPDATE people_both SET name=:name_key "
"WHERE people_both.person_id = :people_both_person_id",
[{'name_key': 'newname', 'people_both_person_id': 5}]
),
)

def _test_insert(self, person_cls):
Person = person_cls

s = Session()
s.bulk_insert_mappings(
Person, [{"id": 5, "personname": "thename"}]
)
with self.sql_execution_asserter(testing.db) as asserter:
s.bulk_insert_mappings(
Person, [{"id": 5, "personname": "thename"}]
)

eq_(
s.query(Person).first(),
Person(id=5, personname="thename")
)

return asserter

def _test_update(self, person_cls):
Person = person_cls

s = Session()
s.add(Person(id=5, personname="thename"))
s.commit()

s.bulk_update_mappings(
Person, [{"id": 5, "personname": "newname"}]
)
with self.sql_execution_asserter(testing.db) as asserter:
s.bulk_update_mappings(
Person, [{"id": 5, "personname": "newname"}]
)

eq_(
s.query(Person).first(),
Person(id=5, personname="newname")
)

return asserter


class BulkInheritanceTest(BulkTest, fixtures.MappedTest):
@classmethod
Expand Down

0 comments on commit e991684

Please sign in to comment.