DM-16227: Move collection integrity constraint into DB and fix transactions #117
Conversation
Looks good, a few minor comments.
```python
elif isinstance(v, str):
    message.update(v.encode('utf8'))
elif isinstance(v, datetime.datetime):
    message.update(v.isoformat().encode('utf8'))
```
Does `isoformat` produce an unambiguous representation (I'm thinking about timezones)? Would it be easier to just take the number of seconds in UTC?
I think it does, but I'm not certain, and I didn't see an easy way to get the number of seconds in UTC from a Python `datetime`. In any case, I expect this all to get fixed in DM-15890, which will switch all of the datetimes we use in the database to TAI MJD to avoid these kinds of problems entirely.
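To illustrate the ambiguity being discussed, here is a small stdlib-only sketch (the example timestamp is arbitrary, not from the codebase): `isoformat()` on a naive `datetime` carries no timezone marker, while an aware one does, and `timestamp()` gives the unambiguous seconds-in-UTC the reviewer suggests.

```python
from datetime import datetime, timezone

naive = datetime(2018, 11, 1, 12, 0, 0)
aware = naive.replace(tzinfo=timezone.utc)

# A naive datetime's isoformat() string has no timezone marker, so it is
# ambiguous on its own; an aware datetime includes the UTC offset.
print(naive.isoformat())   # 2018-11-01T12:00:00
print(aware.isoformat())   # 2018-11-01T12:00:00+00:00

# An unambiguous alternative: seconds since the epoch, in UTC.
print(aware.timestamp())   # 1541073600.0
```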
```python
    argument to update the hash.
    """
    for k, v in self.items():
        message.update(k.encode('utf8'))
```
`utf-8` is the default for `encode()`; maybe just drop it rather than repeating it everywhere?
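A quick stdlib check of the point being made here: `str.encode()` with no argument already uses UTF-8, and `'utf8'`/`'utf-8'` are aliases of the same codec, so dropping the argument changes nothing.

```python
# str.encode() defaults to UTF-8, so the explicit argument is redundant
# ('utf8' and 'utf-8' are aliases of the same codec).
key = "dataset_type_name"
assert key.encode() == key.encode('utf8') == key.encode('utf-8')
print(key.encode())  # b'dataset_type_name'
```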
"""Add existing Datasets to a collection, implicitly creating the | ||
collection if it does not already exist. | ||
|
||
If a DatasetRef with the same exact `dataset_id`` is already in a |
One more backtick needed on the left.
```python
        return None
    if not isinstance(value, bytes):
        raise TypeError(f"Base64Bytes fields require 'bytes' values; got {value}")
    return b64encode(value).decode('utf8')
```
Nitpick: base64 is guaranteed to produce ASCII bytes.
```diff
@@ -13,6 +13,7 @@ schema:
   -
     name: dataset_type_name
     type: string
+    length: 128
```
For future extension I'd probably suggest doing something like:

```yaml
type: string
typeKwargs: {length: 128}
```

but with just two of these new keywords this may be overkill. It can wait until we add a third 🙂
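To sketch what the suggested `typeKwargs` shape could buy: the schema loader would only need a type-name lookup plus a pass-through of the keyword dict, with no per-keyword special cases. This is purely hypothetical illustration; `FakeString` and `TYPE_MAP` stand in for a real column type (e.g. `sqlalchemy.String`) and the real loader, neither of which is shown in this PR.

```python
# Hypothetical sketch of the proposed YAML shape: type-specific keyword
# arguments live under "typeKwargs" and are forwarded unchanged.
class FakeString:
    # Stand-in for a real column type such as sqlalchemy.String.
    def __init__(self, length=None):
        self.length = length

TYPE_MAP = {"string": FakeString}

spec = {"name": "dataset_type_name",
        "type": "string",
        "typeKwargs": {"length": 128}}

column_type = TYPE_MAP[spec["type"]](**spec.get("typeKwargs", {}))
print(column_type.length)  # 128
```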
I'd prefer to just get the schema definition out of YAML (DM-17154) and into direct SQLAlchemy Python code; we're on the road to trying to map the full complexity of SQLAlchemy schema definition to a home-brew YAML format, and I think we need to get off that road, especially before getting serious about an Oracle implementation.
config/schema.yaml (Outdated)
```yaml
    nbytes: 32
    nullable: false
    doc: >
      Secture hash of the data ID (i.e. dimension link values) and
```
Secure
config/schema.yaml (Outdated)
```yaml
    primary_key: true
    doc: >
      Name of the Instrument with which this filter is associated.
  -
    name: physical_filter
    type: string
    length: 8
```
Is 8 characters enough for everyone?
Most filter names are single characters, and while physical filter names may contain instrument prefixes or version suffixes, they should still be pretty small. That said, maybe the size of the instrument field plus 8 would be better (i.e. 16).
```python
        If a DatasetRef with the same exact `dataset_id`` is already in a
        collection nothing is changed. If a `DatasetRef` with the same
        `DatasetType1` and dimension values but with different ``dataset_id``
```
DatasetType1?
```python
    impl = String

    def __init__(self, nbytes, *args, **kwds):
        length = 4*ceil(nbytes/3)
```
Should it divide by 4?
I don't think so: https://stackoverflow.com/questions/13378815/base64-length-calculation
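For the record, the formula in the diff can be verified directly with the stdlib: base64 turns every 3 input bytes into 4 output characters, padding the final group with `=`, so the encoded length is exactly `4*ceil(nbytes/3)` (and, per the nitpick above, the output is always ASCII).

```python
from base64 import b64encode
from math import ceil

# Every 3 input bytes become 4 output characters (the last group is
# padded with '='), so encoded length == 4*ceil(nbytes/3).
for nbytes in (1, 2, 3, 16, 32):
    encoded = b64encode(bytes(nbytes))
    assert len(encoded) == 4 * ceil(nbytes / 3)
    encoded.decode('ascii')  # base64 output is guaranteed ASCII
    print(nbytes, len(encoded))
```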
OK, I was confused about what `nbytes` means; I thought it was the stored data size. Never mind.
```python
        if value is None:
            return None
        if not isinstance(value, bytes):
            raise TypeError(f"Base64Bytes fields require 'bytes' values; got {value}")
```
Maybe `type(value)`?
I'll include both the value and its type.
Force-pushed from e7124ac to 40a530b.
I think the latest `associate()` update should be OK for SQLite; for non-SQLite backends we probably need a different strategy.
```python
                    ref.datasetType, ref.dataId, collection
                )
            )
        # If the same Dataset is already in this collection, do nothing.
```
I suppose this (select+check+insert) will work OK for SQLite because SQLite locks the whole database for the duration of a transaction. For other backends which support concurrent transactions I think there is a race condition: if two clients do the `select` at the same instant, they both get an empty set and will both try to insert (and one of them will fail).

I think a more or less portable strategy for this kind of update is to just try insert+commit and analyse the DuplicateKey error that could happen on commit. Some backends support `select ... for update` for row-level locking (and index gap locking), but it also needs special care, so I would not recommend that as a portable fix.
Interesting: I think this indicates a fundamental misunderstanding on my part about what transactions guarantee in general, which means I need to spend some time thinking about what those guarantees actually are.

I don't think the "analyzing the `DuplicateKey` error" approach can really be done portably, either, because we don't have any guarantee about what kind of information the error message might contain (and we need to know which constraint failed), so ultimately I think this (and several other `Registry` methods) need to be left pure-abstract in `SqlRegistry` so they must be implemented by DB-specific derived classes.

As I said on Jira, I think it's best not to do that now because (for now) a solution that's subject to race conditions is better than no solution at all. I'll be sure to time the future changes carefully with NCSA to make sure we don't have a regression in Oracle support at an awkward time.
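The pure-abstract arrangement described above might look roughly like this. This is a hypothetical sketch with heavily simplified names and signatures, not the actual `SqlRegistry` API; it only illustrates forcing each backend to supply its own concurrency-safe implementation.

```python
from abc import ABC, abstractmethod

class SqlRegistry(ABC):
    # Hypothetical simplification: the race-prone operation is left
    # abstract so every database backend must implement it itself.
    @abstractmethod
    def associate(self, collection, refs):
        """Add existing Datasets to a collection."""

class SqliteRegistry(SqlRegistry):
    def associate(self, collection, refs):
        # SQLite serializes writers, so select-then-insert is safe here;
        # an Oracle/PostgreSQL subclass would need a different strategy.
        return f"associated {len(refs)} refs with {collection!r}"

print(SqliteRegistry().associate("run1", [101, 102]))
```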
By "analyzing" I meant that you would need to select from the database at that point, after the failure. Relying on exception content is not possible, of course.
"collection": collection} for ref in refs]) | ||
elif row.dataset_id != ref.id: | ||
# A different Dataset with this DatasetType and Data ID already | ||
# exist in this collection. |
exists
Force-pushed from de1cc8d to e85c780.
This has been fully supplanted by SqlRegistryDatabaseDict, which handles transactions better.
It seems the SQLite Python module doesn't actually begin transactions when you ask it to; this can be worked around via SQLAlchemy event listeners that emit BEGIN statements directly. The recipe is from the SQLAlchemy docs: https://docs.sqlalchemy.org/en/latest/dialects/sqlite.html

In order to make that work, we also need to make sure all Registry operations go through a single connection (i.e. disable SQLAlchemy's connection pooling and don't call engine.connect() multiple times). That's actually the right thing to do given our usage pattern for all DBs.
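The driver-level behavior underlying that recipe can be shown with the stdlib `sqlite3` module alone (a sketch, not the SQLAlchemy listener itself): setting `isolation_level = None` stops the module from issuing and deferring BEGIN behind our back, after which we emit BEGIN explicitly, which is exactly what the SQLAlchemy event listener does at the connection level.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Disable the sqlite3 module's implicit transaction management so that
# BEGIN/COMMIT/ROLLBACK happen only when we say so.
conn.isolation_level = None

conn.execute("CREATE TABLE demo (id INTEGER PRIMARY KEY)")
conn.execute("BEGIN")                    # explicit transaction start
conn.execute("INSERT INTO demo VALUES (1)")
conn.execute("ROLLBACK")                 # and it really takes effect

count = conn.execute("SELECT COUNT(*) FROM demo").fetchone()[0]
print(count)  # 0: the rollback undid the insert
```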
This was always part of the SqlRegistry implementation, and should not have been considered part of the core butler package.
Force-pushed from e85c780 to a806dc4.