Skip to content

Commit

Permalink
Joinable subselects (#408)
Browse files Browse the repository at this point in the history
* Minimal implementation of joinable subselects (work in progress)
* Generate subselects in Set.nested_select() instead of BaseAdapter._select()
* Add support for subselects to BaseAdapter._count()
* Prevent table name collisions when applying record versioning filters
* Fix common filters in select with subselects
* Add support for subselect fields to Rows.render() and fix bugs
* Refactor Google datastore adapter and BaseAdapter.get_table()
* Check for table name collisions when building a SELECT query
* When building subselect string, ignore cache if outer_scoped is not empty
* Minor optimization in SQLAdapter._select_wcols()
* Add parameter "correlated" to Select constructor defaults to True. When set to False, the subquery will be self-contained, meaning it will never reference any tables from the parent scope.
* Implement proper expansion of correlated subqueries
* Check that subquery in belongs() expression has exactly 1 column
* Implement expansion of correlated subqueries in count(), update(), delete()
* Update Mongo unit tests to new adapter API
* Move part of Set.nested_select() to adapter code
* Unit tests for the Select class
* Additional unit tests for fixed bugs
* Expand aliased table names using dialect methods
* Fix expansion of correlated subqueries used in UPDATE/DELETE statements
  • Loading branch information
nextghost authored and gi0baro committed Dec 1, 2016
1 parent 13c729a commit 5cd99f5
Show file tree
Hide file tree
Showing 22 changed files with 1,216 additions and 591 deletions.
283 changes: 157 additions & 126 deletions pydal/adapters/base.py

Large diffs are not rendered by default.

17 changes: 9 additions & 8 deletions pydal/adapters/couchdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ def create_table(self, table, migrate=True, fake_migrate=False,
super(CouchDB, self).create_table(
table, migrate, fake_migrate, polymodel)

def _expand(self, expression, field_type=None):
def _expand(self, expression, field_type=None, query_env={}):
if isinstance(expression, Field):
if expression.type == 'id':
return "%s._id" % expression.tablename
return SQLAdapter._expand(self, expression, field_type)
return SQLAdapter._expand(self, expression, field_type,
query_env=query_env)

def insert(self, table, fields):
rid = uuid2int(self.db.uuid())
Expand Down Expand Up @@ -67,7 +68,7 @@ def _select(self, query, fields, left=False, join=False, distinct=False,
new_fields.append(item)

fields = new_fields
tablename = self.get_table(query)
tablename = self.get_table(query)._tablename
fieldnames = [f.name for f in (fields or self.db[tablename])]
colnames = [
'%s.%s' % (tablename, fieldname) for fieldname in fieldnames]
Expand All @@ -88,7 +89,7 @@ def select(self, query, fields, attributes):
processor = attributes.get('processor', self.parse)
return processor(rows, fields, colnames, False)

def update(self, tablename, query, fields):
def update(self, table, query, fields):
from ..drivers import couchdb
if not isinstance(query, Query):
raise SyntaxError("Not Supported")
Expand All @@ -105,7 +106,7 @@ def update(self, tablename, query, fields):
return 1
except couchdb.http.ResourceNotFound:
return 0
tablename = self.get_table(query)
tablename = self.get_table(query)._tablename
rows = self.select(query, [self.db[tablename]._id], {})
ctable = self.connection[tablename]
table = self.db[tablename]
Expand All @@ -121,11 +122,11 @@ def count(self, query, distinct=None):
raise RuntimeError("COUNT DISTINCT not supported")
if not isinstance(query, Query):
raise SyntaxError("Not Supported")
tablename = self.get_table(query)
tablename = self.get_table(query)._tablename
rows = self.select(query, [self.db[tablename]._id], {})
return len(rows)

def delete(self, tablename, query):
def delete(self, table, query):
from ..drivers import couchdb
if not isinstance(query, Query):
raise SyntaxError("Not Supported")
Expand All @@ -139,7 +140,7 @@ def delete(self, tablename, query):
return 1
except couchdb.http.ResourceNotFound:
return 0
tablename = self.get_table(query)
tablename = self.get_table(query)._tablename
rows = self.select(query, [self.db[tablename]._id], {})
ctable = self.connection[tablename]
for row in rows:
Expand Down
47 changes: 24 additions & 23 deletions pydal/adapters/google.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def create_table(self, table, migrate=True, fake_migrate=False,
"polymodel must be None, True, a table or a tablename")
return None

def _expand(self, expression, field_type=None):
def _expand(self, expression, field_type=None, query_env={}):
if expression is None:
return None
elif isinstance(expression, Field):
Expand All @@ -182,9 +182,10 @@ def _expand(self, expression, field_type=None):
return expression.name
elif isinstance(expression, (Expression, Query)):
if expression.second is not None:
return expression.op(expression.first, expression.second)
return expression.op(expression.first, expression.second,
query_env=query_env)
elif expression.first is not None:
return expression.op(expression.first)
return expression.op(expression.first, query_env=query_env)
else:
return expression.op()
elif field_type:
Expand Down Expand Up @@ -243,24 +244,24 @@ def select_raw(self, query, fields=None, attributes=None,

fields = new_fields
if query:
tablename = self.get_table(query)
table = self.get_table(query)
elif fields:
tablename = fields[0].tablename
table = fields[0].table
query = db._adapter.id_query(fields[0].table)
else:
raise SyntaxError("Unable to determine a tablename")
raise SyntaxError("Unable to determine the table")

if query:
if use_common_filters(query):
query = self.common_filter(query, [tablename])
query = self.common_filter(query, [table])

#tableobj is a GAE/NDB Model class (or subclass)
tableobj = db[tablename]._tableobj
tableobj = table._tableobj
filters = self.expand(query)

## DETERMINE PROJECTION
projection = None
if len(db[tablename].fields) == len(fields):
if len(table.fields) == len(fields):
# getting all fields, not a projection query
projection = None
elif args_get('projection') == True:
Expand All @@ -271,18 +272,18 @@ def select_raw(self, query, fields=None, attributes=None,
"text and blob field types not allowed in " +
"projection queries")
else:
projection.append(f.name)
projection.append(f)

elif args_get('filterfields') is True:
projection = []
for f in fields:
projection.append(f.name)
projection.append(f)

# real projection's can't include 'id'.
# it will be added to the result later
if projection and args_get('projection') == True:
query_projection = filter(lambda p: p != db[tablename]._id.name,
projection)
query_projection = [f.name for f in projection
if f.name != table._id.name]
else:
query_projection = None
## DONE WITH PROJECTION
Expand Down Expand Up @@ -339,7 +340,7 @@ def select_raw(self, query, fields=None, attributes=None,
# didn't return all results
if args_get('reusecursor'):
db['_lastcursor'] = cursor
return (items, tablename, projection or db[tablename].fields)
return (items, table, projection or [f for f in table])

def select(self, query, fields, attributes):
"""
Expand All @@ -366,30 +367,30 @@ def select(self, query, fields, attributes):
https://developers.google.com/appengine/docs/python/datastore/queries#Query_Cursors
"""

items, tablename, fields = self.select_raw(query, fields, attributes)
items, table, fields = self.select_raw(query, fields, attributes)
rows = [
[
(t == self.db[tablename]._id.name and item) or
(t == 'nativeRef' and item) or getattr(item, t)
(t.name == table._id.name and item) or
(t.name == 'nativeRef' and item) or getattr(item, t.name)
for t in fields
] for item in items]
colnames = ['%s.%s' % (tablename, t) for t in fields]
colnames = ['%s.%s' % (table._tablename, t.name) for t in fields]
processor = attributes.get('processor', self.parse)
return processor(rows, fields, colnames, False)

def count(self, query, distinct=None, limit=None):
if distinct:
raise RuntimeError("COUNT DISTINCT not supported")
items, tablename, fields = self.select_raw(query, count_only=True)
items, table, fields = self.select_raw(query, count_only=True)
return items[0]

def delete(self, tablename, query):
def delete(self, table, query):
"""
This function was changed on 2010-05-04 because according to
http://code.google.com/p/googleappengine/issues/detail?id=3119
GAE no longer supports deleting more than 1000 records.
"""
items, tablename, fields = self.select_raw(query)
items, table, fields = self.select_raw(query)
# items can be one item or a query
if not isinstance(items, list):
# use a keys_only query to ensure that this runs as a datastore
Expand All @@ -405,8 +406,8 @@ def delete(self, tablename, query):
ndb.delete_multi([item.key for item in items])
return counter

def update(self, tablename, query, update_fields):
items, tablename, fields = self.select_raw(query)
def update(self, table, query, update_fields):
items, table, fields = self.select_raw(query)
counter = 0
for item in items:
for field, value in update_fields:
Expand Down
20 changes: 11 additions & 9 deletions pydal/adapters/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def _parse_data(expression, attribute, value=None):
except (AttributeError, TypeError):
return None

def _expand(self, expression, field_type=None):
def _expand(self, expression, field_type=None, query_env={}):
if isinstance(expression, Field):
if expression.type == 'id':
result = "_id"
Expand All @@ -169,6 +169,7 @@ def _expand(self, expression, field_type=None):
second = self.object_id(expression.second)
op = expression.op
optional_args = expression.optional_args or {}
optional_args['query_env'] = query_env
if second is not None:
result = op(first, second, **optional_args)
elif first is not None:
Expand All @@ -178,7 +179,8 @@ def _expand(self, expression, field_type=None):
else:
result = op(**optional_args)
elif isinstance(expression, Expansion):
expression.query = (self.expand(expression.query, field_type))
expression.query = (self.expand(expression.query, field_type,
query_env=query_env))
result = expression
elif isinstance(expression, (list, tuple)):
result = [self.represent(item, field_type) for item in expression]
Expand Down Expand Up @@ -239,7 +241,7 @@ def __select(self, query, fields, left=False, join=False, distinct=False,
else:
new_fields.append(item)
fields = new_fields
tablename = self.get_table(query, *fields)
tablename = self.get_table(query, *fields)._tablename

if for_update:
self.db.logger.warning(
Expand Down Expand Up @@ -426,7 +428,7 @@ def insert(self, table, fields, safe=None):
else:
return None

def update(self, tablename, query, fields, safe=None):
def update(self, table, query, fields, safe=None):
# return amount of adjusted rows or zero, but no exceptions
# @ related not finding the result
if not isinstance(query, Query):
Expand Down Expand Up @@ -465,7 +467,7 @@ def update(self, tablename, query, fields, safe=None):
raise RuntimeError(
"uncaught exception when updating rows: %s" % e)

def delete(self, tablename, query, safe=None):
def delete(self, table, query, safe=None):
if not isinstance(query, Query):
raise RuntimeError("query type %s is not supported" % type(query))

Expand All @@ -479,19 +481,18 @@ def delete(self, tablename, query, safe=None):

# find references to deleted items
db = self.db
table = db[tablename]
cascade = []
set_null = []
for field in table._referenced_by:
if field.type == 'reference ' + tablename:
if field.type == 'reference ' + table._tablename:
if field.ondelete == 'CASCADE':
cascade.append(field)
if field.ondelete == 'SET NULL':
set_null.append(field)
cascade_list = []
set_null_list = []
for field in table._referenced_by_list:
if field.type == 'list:reference ' + tablename:
if field.type == 'list:reference ' + table._tablename:
if field.ondelete == 'CASCADE':
cascade_list.append(field)
if field.ondelete == 'SET NULL':
Expand Down Expand Up @@ -606,7 +607,8 @@ def __init__(self, adapter, crud, query, fields=(), tablename=None,
self.fields = [self.annotate_expression(f)
for f in (fields or [])]

self.tablename = tablename or adapter.get_table(query, *self.fields)
self.tablename = (tablename or
adapter.get_table(query, *self.fields)._tablename)
if use_common_filters(query):
query = adapter.common_filter(query, [self.tablename])
self.query = self.annotate_expression(query)
Expand Down
7 changes: 3 additions & 4 deletions pydal/adapters/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,13 @@ def select(self, query, fields, attributes):
self.execute('BEGIN IMMEDIATE TRANSACTION;')
return super(SQLite, self).select(query, fields, attributes)

def delete(self, tablename, query):
def delete(self, table, query):
db = self.db
table = db[tablename]
deleted = [x[table._id.name] for x in db(query).select(table._id)]
counter = super(SQLite, self).delete(tablename, query)
counter = super(SQLite, self).delete(table, query)
if counter:
for field in table._referenced_by:
if field.type == 'reference ' + tablename \
if field.type == 'reference ' + table._tablename \
and field.ondelete == 'CASCADE':
db(field.belongs(deleted)).delete()
return counter
Expand Down

0 comments on commit 5cd99f5

Please sign in to comment.