
Add an identity_token to the identity key
For the purposes of assisting with sharded setups, add a new
member to the identity key that can be customized. This allows
sharding across databases where the primary key space is shared.

Change-Id: Iae3909f5d4c501b62c10d0371fbceb01abda51db
Fixes: #4137
zzzeek committed Dec 22, 2017
1 parent 0493765 commit 50d9f16
Showing 15 changed files with 388 additions and 177 deletions.
49 changes: 49 additions & 0 deletions doc/build/changelog/migration_12.rst
@@ -567,6 +567,55 @@ to query across the two proxies ``A.c_values``, ``AtoB.c_value``:

:ticket:`3769`

.. _change_4137:

Identity key enhancements to support sharding
---------------------------------------------

The identity key structure used by the ORM now contains an additional
member, so that two identical primary keys that originate from different
contexts can co-exist within the same identity map.

The example at :ref:`examples_sharding` has been updated to illustrate this
behavior. The example shows a sharded class ``WeatherLocation`` that
refers to a dependent ``WeatherReport`` object, where the ``WeatherReport``
class is mapped to a table that stores a simple integer primary key. Two
``WeatherReport`` objects from different databases may have the same
primary key value. The example now illustrates that a new ``identity_token``
field tracks this difference so that the two objects can co-exist in the
same identity map::

    tokyo = WeatherLocation('Asia', 'Tokyo')
    newyork = WeatherLocation('North America', 'New York')
    quito = WeatherLocation('South America', 'Quito')

    tokyo.reports.append(Report(80.0))
    newyork.reports.append(Report(75))

    sess = create_session()

    sess.add_all([tokyo, newyork, quito])

    sess.commit()

    # the Report class uses a simple integer primary key. So across two
    # databases, a primary key will be repeated. The "identity_token" tracks
    # in memory that these two identical primary keys are local to different
    # databases.

    newyork_report = newyork.reports[0]
    tokyo_report = tokyo.reports[0]

    assert inspect(newyork_report).identity_key == (Report, (1, ), "north_america")
    assert inspect(tokyo_report).identity_key == (Report, (1, ), "asia")

    # the token representing the originating shard is also available directly

    assert inspect(newyork_report).identity_token == "north_america"
    assert inspect(tokyo_report).identity_token == "asia"


:ticket:`4137`
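The mechanics behind this can be sketched without SQLAlchemy at all: an identity map keyed by a (class, primary key, token) triple lets two objects with identical primary keys coexist. The following is a minimal illustrative sketch — `IdentityMap`, `put`, and `get` are made-up names, not SQLAlchemy API:

```python
# Sketch of a token-aware identity map: the key is a
# (class, primary-key tuple, token) triple, so two objects with the
# same primary key but different tokens occupy distinct slots.

class Report:
    def __init__(self, id_, temperature):
        self.id = id_
        self.temperature = temperature

class IdentityMap:
    def __init__(self):
        self._store = {}

    def put(self, obj, token):
        # the token is the third member of the identity key
        self._store[(type(obj), (obj.id,), token)] = obj

    def get(self, cls, pk, token):
        return self._store.get((cls, pk, token))

imap = IdentityMap()
imap.put(Report(1, 75.0), "north_america")
imap.put(Report(1, 80.0), "asia")  # same primary key, different shard

assert imap.get(Report, (1,), "north_america").temperature == 75.0
assert imap.get(Report, (1,), "asia").temperature == 80.0
```

Without the third key member, the second `put` would silently replace the first entry; with it, both rows remain addressable.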

New Features and Improvements - Core
====================================

17 changes: 17 additions & 0 deletions doc/build/changelog/unreleased_12/4137.rst
@@ -0,0 +1,17 @@
.. change::
    :tags: orm, feature
    :tickets: 4137

    Added a new data member to the identity key tuple
    used by the ORM's identity map, known as the
    "identity_token". This token defaults to None but
    may be used by database sharding schemes to differentiate
    objects in memory with the same primary key that come
    from different databases. The horizontal sharding
    extension integrates this token, applying the shard
    identifier to it, thus allowing primary keys to be
    duplicated across horizontally sharded backends.

    .. seealso::

        :ref:`change_4137`
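The note above says the token defaults to None and that the sharding extension applies the shard identifier at load time; the stamping idea can be sketched in plain Python (a hedged sketch — `Record` and `load` are hypothetical names, not the extension's API):

```python
# Sketch: a loader that stamps each loaded object with the shard it
# came from, defaulting to None when no sharding is involved
# (hypothetical names; not the SQLAlchemy API).

class Record:
    def __init__(self, pk):
        self.pk = pk
        self.identity_token = None  # default for non-sharded use

def load(pk, shard_id=None):
    obj = Record(pk)
    if shard_id is not None:
        obj.identity_token = shard_id  # shard id doubles as the token
    return obj

plain = load(1)
sharded = load(1, shard_id="asia")

assert plain.identity_token is None
assert sharded.identity_token == "asia"
```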
205 changes: 106 additions & 99 deletions examples/sharding/attribute_shard.py
@@ -1,14 +1,13 @@

# step 1. imports
from sqlalchemy import (create_engine, MetaData, Table, Column, Integer,
String, ForeignKey, Float, DateTime, event)
from sqlalchemy.orm import sessionmaker, mapper, relationship
from sqlalchemy import (create_engine, Table, Column, Integer,
String, ForeignKey, Float, DateTime)
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy.ext.horizontal_shard import ShardedSession
from sqlalchemy.sql import operators, visitors
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import inspect

import datetime

# step 2. databases.
# db1 is used for id generation. The "pool_threadlocal"
# causes the id_generator() to use the same connection as that
# of an ongoing transaction within db1.
@@ -19,61 +18,79 @@
db4 = create_engine('sqlite://', echo=echo)


# step 3. create session function. this binds the shard ids
# create session function. this binds the shard ids
# to databases within a ShardedSession and returns it.
create_session = sessionmaker(class_=ShardedSession)

create_session.configure(shards={
'north_america':db1,
'asia':db2,
'europe':db3,
'south_america':db4
'north_america': db1,
'asia': db2,
'europe': db3,
'south_america': db4
})


# step 4. table setup.
meta = MetaData()
# mappings and tables
Base = declarative_base()

# we need a way to create identifiers which are unique across all
# databases. one easy way would be to just use a composite primary key, where one
# value is the shard id. but here, we'll show something more "generic", an
# id generation function. we'll use a simplistic "id table" stored in database
# #1. Any other method will do just as well; UUID, hilo, application-specific, etc.
# we need a way to create identifiers which are unique across all databases.
# one easy way would be to just use a composite primary key, where one value
# is the shard id. but here, we'll show something more "generic", an id
# generation function. we'll use a simplistic "id table" stored in database
# #1. Any other method will do just as well; UUID, hilo, application-specific,
# etc.

ids = Table('ids', meta,
ids = Table(
'ids', Base.metadata,
Column('nextid', Integer, nullable=False))


def id_generator(ctx):
# in reality, might want to use a separate transaction for this.
c = db1.connect()
nextid = c.execute(ids.select(for_update=True)).scalar()
c.execute(ids.update(values={ids.c.nextid : ids.c.nextid + 1}))
with db1.connect() as conn:
nextid = conn.scalar(ids.select(for_update=True))
conn.execute(ids.update(values={ids.c.nextid: ids.c.nextid + 1}))
return nextid
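The comment above notes that a UUID or hilo scheme would serve equally well; a UUID-based generator is the simplest coordination-free alternative (an illustrative sketch — `uuid_id_generator` is not part of the example):

```python
# Alternative cross-database id strategy mentioned above: UUIDs
# require no shared "id table" or coordinating transaction at all.
import uuid

def uuid_id_generator(ctx=None):
    # ctx mirrors the column-default signature but is unused here
    return uuid.uuid4().hex

a, b = uuid_id_generator(), uuid_id_generator()
assert a != b          # ids are unique without any shared state
assert len(a) == 32    # 128 bits as a hex string
```

The trade-off is that UUIDs are larger and non-sequential, which matters for index locality on some backends.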

# table setup. we'll store a lead table of continents/cities,
# and a secondary table storing locations.
# a particular row will be placed in the database whose shard id corresponds to the
# 'continent'. in this setup, secondary rows in 'weather_reports' will
# be placed in the same DB as that of the parent, but this can be changed
# if you're willing to write more complex sharding functions.

weather_locations = Table("weather_locations", meta,
Column('id', Integer, primary_key=True, default=id_generator),
Column('continent', String(30), nullable=False),
Column('city', String(50), nullable=False)
)

weather_reports = Table("weather_reports", meta,
Column('id', Integer, primary_key=True),
Column('location_id', Integer, ForeignKey('weather_locations.id')),
Column('temperature', Float),
Column('report_time', DateTime, default=datetime.datetime.now),
)
# table setup. we'll store a lead table of continents/cities, and a secondary
# table storing locations. a particular row will be placed in the database
# whose shard id corresponds to the 'continent'. in this setup, secondary rows
# in 'weather_reports' will be placed in the same DB as that of the parent, but
# this can be changed if you're willing to write more complex sharding
# functions.


class WeatherLocation(Base):
__tablename__ = "weather_locations"

id = Column(Integer, primary_key=True, default=id_generator)
continent = Column(String(30), nullable=False)
city = Column(String(50), nullable=False)

reports = relationship("Report", backref='location')

def __init__(self, continent, city):
self.continent = continent
self.city = city


class Report(Base):
__tablename__ = "weather_reports"

id = Column(Integer, primary_key=True)
location_id = Column(
'location_id', Integer, ForeignKey('weather_locations.id'))
temperature = Column('temperature', Float)
report_time = Column(
'report_time', DateTime, default=datetime.datetime.now)

def __init__(self, temperature):
self.temperature = temperature

# create tables
for db in (db1, db2, db3, db4):
meta.drop_all(db)
meta.create_all(db)
Base.metadata.drop_all(db)
Base.metadata.create_all(db)

# establish initial "id" in db1
db1.execute(ids.insert(), nextid=1)
@@ -84,12 +101,13 @@ def id_generator(ctx):
# we'll use a straight mapping of a particular set of "country"
# attributes to shard id.
shard_lookup = {
'North America':'north_america',
'Asia':'asia',
'Europe':'europe',
'South America':'south_america'
'North America': 'north_america',
'Asia': 'asia',
'Europe': 'europe',
'South America': 'south_america'
}


def shard_chooser(mapper, instance, clause=None):
"""shard chooser.
@@ -104,6 +122,7 @@ def shard_chooser(mapper, instance, clause=None):
else:
return shard_chooser(mapper, instance.location)


def id_chooser(query, ident):
"""id chooser.
@@ -116,6 +135,7 @@ def id_chooser(query, ident):
"""
return ['north_america', 'asia', 'europe', 'south_america']


def query_chooser(query):
"""query chooser.
@@ -133,9 +153,9 @@ def query_chooser(query):
# statement column, adjusting for any annotations present.
# (an annotation is an internal clone of a Column object
# and occur when using ORM-mapped attributes like
# "WeatherLocation.continent"). A simpler comparison, though less accurate,
# would be "column.key == 'continent'".
if column.shares_lineage(weather_locations.c.continent):
# "WeatherLocation.continent"). A simpler comparison, though less
# accurate, would be "column.key == 'continent'".
if column.shares_lineage(WeatherLocation.__table__.c.continent):
if operator == operators.eq:
ids.append(shard_lookup[value])
elif operator == operators.in_op:
@@ -146,6 +166,7 @@ def query_chooser(query):
else:
return ids
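The division of labor among `shard_chooser`, `id_chooser`, and `query_chooser` above can be illustrated as plain callbacks over toy data (a hedged sketch of the contract, not the actual `ShardedSession` wiring):

```python
# Toy illustration of the three chooser roles: shard_chooser picks one
# shard for a write; id_chooser narrows shards for a primary-key get;
# query_chooser narrows shards for an arbitrary query.

SHARDS = ["north_america", "asia", "europe", "south_america"]
LOOKUP = {"Asia": "asia", "Europe": "europe"}

def shard_chooser(instance):
    # a flush must place the row on exactly one shard
    return LOOKUP[instance["continent"]]

def id_chooser(ident):
    # with no extra information, a pk lookup must search every shard
    return SHARDS

def query_chooser(criteria):
    # an equality criterion on 'continent' narrows the search
    if "continent" in criteria:
        return [LOOKUP[criteria["continent"]]]
    return SHARDS

assert shard_chooser({"continent": "Asia"}) == "asia"
assert id_chooser((5,)) == SHARDS
assert query_chooser({"continent": "Europe"}) == ["europe"]
assert query_chooser({}) == SHARDS
```

The real example does the same narrowing, but extracts the criteria by walking the query's SQL expression tree rather than reading a dict.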


def _get_query_comparisons(query):
"""Search an orm.Query object for binary expressions.
@@ -185,65 +206,39 @@ def visit_binary(binary):
binary.operator == operators.in_op and \
hasattr(binary.right, 'clauses'):
comparisons.append(
(binary.left, binary.operator,
(
binary.left, binary.operator,
tuple(binds[bind] for bind in binary.right.clauses)
)
)
elif binary.left in clauses and binary.right in binds:
comparisons.append(
(binary.left, binary.operator,binds[binary.right])
(binary.left, binary.operator, binds[binary.right])
)

elif binary.left in binds and binary.right in clauses:
comparisons.append(
(binary.right, binary.operator,binds[binary.left])
(binary.right, binary.operator, binds[binary.left])
)

# here we will traverse through the query's criterion, searching
# for SQL constructs. We will place simple column comparisons
# into a list.
if query._criterion is not None:
visitors.traverse_depthfirst(query._criterion, {},
{'bindparam':visit_bindparam,
'binary':visit_binary,
'column':visit_column
}
visitors.traverse_depthfirst(
query._criterion, {},
{'bindparam': visit_bindparam,
'binary': visit_binary,
'column': visit_column}
)
return comparisons

# further configure create_session to use these functions
create_session.configure(
shard_chooser=shard_chooser,
id_chooser=id_chooser,
query_chooser=query_chooser
)

# step 6. mapped classes.
class WeatherLocation(object):
def __init__(self, continent, city):
self.continent = continent
self.city = city

class Report(object):
def __init__(self, temperature):
self.temperature = temperature

# step 7. mappers
mapper(WeatherLocation, weather_locations, properties={
'reports':relationship(Report, backref='location')
})

mapper(Report, weather_reports)

# step 8 (optional), events. The "shard_id" is placed
# in the QueryContext where it can be intercepted and associated
# with objects, if needed.

def add_shard_id(instance, ctx):
instance.shard_id = ctx.attributes["shard_id"]

event.listen(WeatherLocation, "load", add_shard_id)
event.listen(Report, "load", add_shard_id)
shard_chooser=shard_chooser,
id_chooser=id_chooser,
query_chooser=query_chooser
)

# save and load objects!

@@ -260,21 +255,33 @@ def add_shard_id(instance, ctx):
quito.reports.append(Report(85))

sess = create_session()
for c in [tokyo, newyork, toronto, london, dublin, brasilia, quito]:
sess.add(c)
sess.commit()

tokyo_id = tokyo.id
sess.add_all([tokyo, newyork, toronto, london, dublin, brasilia, quito])

sess.close()
sess.commit()

t = sess.query(WeatherLocation).get(tokyo_id)
t = sess.query(WeatherLocation).get(tokyo.id)
assert t.city == tokyo.city
assert t.reports[0].temperature == 80.0

north_american_cities = sess.query(WeatherLocation).filter(WeatherLocation.continent == 'North America')
assert [c.city for c in north_american_cities] == ['New York', 'Toronto']
north_american_cities = sess.query(WeatherLocation).filter(
WeatherLocation.continent == 'North America')
assert {c.city for c in north_american_cities} == {'New York', 'Toronto'}

asia_and_europe = sess.query(WeatherLocation).filter(
WeatherLocation.continent.in_(['Europe', 'Asia']))
assert {c.city for c in asia_and_europe} == {'Tokyo', 'London', 'Dublin'}

# the Report class uses a simple integer primary key. So across two databases,
# a primary key will be repeated. The "identity_token" tracks in memory
# that these two identical primary keys are local to different databases.
newyork_report = newyork.reports[0]
tokyo_report = tokyo.reports[0]

assert inspect(newyork_report).identity_key == (Report, (1, ), "north_america")
assert inspect(tokyo_report).identity_key == (Report, (1, ), "asia")

asia_and_europe = sess.query(WeatherLocation).filter(WeatherLocation.continent.in_(['Europe', 'Asia']))
assert set([c.city for c in asia_and_europe]) == set(['Tokyo', 'London', 'Dublin'])
# the token representing the originating shard is also available directly

assert inspect(newyork_report).identity_token == "north_america"
assert inspect(tokyo_report).identity_token == "asia"
