Skip to content

Commit

Permalink
Support Unpickling CachedQueryResults Created By Py2 (#3234)
Browse files Browse the repository at this point in the history
This is a rabbit hole, as pickling across python versions is a terrible idea.

However, we can hack around a few of the major issues that come up. Start by adding a repro unit test that tries to load+unpickle a value I got from the datastore admin tool for something written by the py2 app.

Next, we need to fix:
 - we changed the import path of models, so we'll need to rewrite those on the fly. We can do this with a custom `Unpickler`. See [this answer](https://stackoverflow.com/a/40916570).
 - bytestrings are handled totally differently between python 2 and 3, so we need to make sure that the py3 reader will unpickle using `encoding="bytes"`. We can do this in our custom unpickler too. Plus there's this whole thing about unpickling `datetimes`. See [this answer](https://stackoverflow.com/a/28218598).
 - Finally, there's an ndb incompatibility. The legacy app uses a [custom pickling format](https://github.com/GoogleCloudPlatform/datastore-ndb-python/blob/master/ndb/model.py#L2964-L2970) (a serialized protobuf), which it does by defining `__getstate__` and `__setstate__` on the base `Model` class. Since the py3 compatible ndb library does not do this, unpickling them will fail. I filed googleapis/python-ndb#587 in hope of working with upstream to fix this, but if not, we can potentially hack around it ourselves by manually deserializing the protobuf and setting the field ourselves.


**WARNING**: After this, we should be able to read data written by the legacy app, but we can not yet write data that the legacy app can read. So after this, when we re-enable `CachedQueryResult`, we'll have to make sure that we don't write anything to the datastore.
  • Loading branch information
phil-lopreiato committed Jan 1, 2021
1 parent 4c04276 commit 10c7a61
Show file tree
Hide file tree
Showing 20 changed files with 988 additions and 2 deletions.
1 change: 1 addition & 0 deletions .flake8
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ exclude =
./subtrees/
./venv/
./node_modules/
./pyre
application_import_names = backend
import-order-style = edited

Expand Down
126 changes: 125 additions & 1 deletion src/backend/common/models/cached_model.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,22 @@
from typing import Dict, Optional, Set
import datetime
from typing import Any, Dict, Optional, Set

from google.cloud import ndb
from google.cloud.datastore.helpers import GeoPoint
from google.cloud.ndb._datastore_types import BlobKey
from google.cloud.ndb._legacy_entity_pb import (
EntityProto,
Property as ProtoProperty,
PropertyValue as ProtoPropertyValue,
)
from google.cloud.ndb.model import _CompressedValue


TAffectedReferences = Dict[str, Set[ndb.Key]]

_EPOCH = datetime.datetime.utcfromtimestamp(0)
_MEANING_URI_COMPRESSED = "ZLIB"


class CachedModel(ndb.Model):
"""
Expand Down Expand Up @@ -43,3 +55,115 @@ def __init__(self, *args, **kwargs):
# The initialization path is different for models vs those created via
# constructors, so make sure we have a common set of properties defined
self._fix_up_properties()

"""
From the legacy NDB model implementation:
```
def __getstate__(self):
return self._to_pb().Encode()
def __setstate__(self, serialized_pb):
pb = entity_pb.EntityProto(serialized_pb)
self.__init__()
self.__class__._from_pb(pb, set_key=False, ent=self)
```
See https://github.com/googleapis/python-ndb/issues/587 about fixing upstream
"""

    def __setstate__(self, state: Any) -> None:
        """Restore this model from a legacy py2 pickle payload.

        The legacy GAE ndb library pickled a Model as its serialized
        EntityProto bytes (via ``__getstate__``), so ``state`` here is an
        encoded protobuf rather than the usual attribute dict.
        """
        pb = EntityProto()
        pb.MergePartialFromString(state)

        # Mirror the legacy implementation: re-initialize to get a clean
        # instance, then populate fields from the decoded protobuf.
        self.__init__()
        self._set_state_from_pb(pb)

    def _set_state_from_pb(self, pb: EntityProto) -> None:
        """Populate this model's attributes from a decoded legacy EntityProto.

        Rebuilds the entity key (if present) and every property carried by
        the proto, then applies them in one ``_set_attributes`` call.
        """
        deserialized_props = {}
        # Reconstruct the ndb.Key only if the proto actually carries a path.
        if len(pb.key().path().element_list()):
            key_ref = pb.key()
            app = key_ref.app().decode()
            namespace = key_ref.name_space()
            # Each path element is a (kind, id-or-name) pair; ids are ints,
            # names are bytes that must be decoded.
            pairs = [
                (elem.type().decode(), elem.id() or elem.name().decode())
                for elem in key_ref.path().element_list()
            ]
            deserialized_props["key"] = ndb.Key(
                pairs=pairs, app=app, namespace=namespace
            )

        for pb_prop in pb.property_list():
            prop_name = pb_prop.name().decode()

            # There are some fields we did not port to py3, skip them
            # StructuredProperty uses "." in field names, so skip those
            if not hasattr(self, prop_name) and "." not in prop_name:
                continue

            prop_value = self._get_prop_value(pb_prop.value(), pb_prop)
            if not hasattr(self, prop_name) and "." in prop_name:
                # "super.sub" names belong to a StructuredProperty: lazily
                # create the nested model instance, then set the sub-field.
                supername, subname = prop_name.split(".", 1)
                structprop = getattr(self.__class__, supername, None)
                prop_type = structprop._model_class
                if getattr(self, supername) is None:
                    self._set_attributes({supername: prop_type()})
                getattr(self, supername)._set_attributes({subname: prop_value})

                # Repeated structured properties are not implemented yet.
                if pb_prop.multiple():
                    raise Exception("TODO multiple structured property")

                continue

            # Repeated properties with a single stored value still need to
            # surface as a list.
            if pb_prop.multiple() and not isinstance(prop_value, list):
                prop_value = [prop_value]
            deserialized_props[prop_name] = prop_value
        self._set_attributes(deserialized_props)

    @staticmethod
    def _get_prop_value(v: ProtoPropertyValue, p: ProtoProperty) -> Any:
        """Convert a legacy protobuf property value to its py3 ndb value.

        Dispatches on which value field is set and on the property's
        "meaning" tag (blob key, compressed blob, datetime, etc.).
        Returns None when no value field is set (legacy null).
        """
        # roughly based on https://github.com/GoogleCloudPlatform/datastore-ndb-python/blob/cf4cab3f1f69cd04e1a9229871be466b53729f3f/ndb/model.py#L2647
        if v.has_stringvalue():
            sval = v.stringvalue()
            meaning = p.meaning()

            if meaning == ProtoProperty.BLOBKEY:
                sval = BlobKey(sval)
            elif meaning == ProtoProperty.BLOB:
                if p.meaning_uri() == _MEANING_URI_COMPRESSED:
                    # Wrap so ndb knows the payload is zlib-compressed.
                    sval = _CompressedValue(sval)
            elif meaning == ProtoProperty.ENTITY_PROTO:
                # Nested serialized entities are not supported yet.
                raise Exception("ENTITY_PROTO meaning implementation")
            elif meaning != ProtoProperty.BYTESTRING:
                # Anything not explicitly a byte string is text: try ascii,
                # then utf-8; if both fail, fall through and keep raw bytes.
                try:
                    sval = sval.decode("ascii")
                    # If this passes, don't return unicode.
                except UnicodeDecodeError:
                    try:
                        sval = str(sval.decode("utf-8"))
                    except UnicodeDecodeError:
                        pass
            return sval
        elif v.has_int64value():
            ival = v.int64value()
            if p.meaning() == ProtoProperty.GD_WHEN:
                # Legacy datetimes were stored as microseconds since epoch.
                return _EPOCH + datetime.timedelta(microseconds=ival)
            return ival
        elif v.has_booleanvalue():
            return bool(v.booleanvalue())
        elif v.has_doublevalue():
            return v.doublevalue()
        elif v.has_referencevalue():
            # Key references: rebuild an ndb.Key from app/namespace/path.
            rv = v.referencevalue()
            app = rv.app().decode()
            namespace = rv.name_space()
            pairs = [
                (elem.type().decode(), elem.id() or elem.name().decode())
                for elem in rv.pathelement_list()
            ]
            return ndb.Key(pairs=pairs, app=app, namespace=namespace)
        elif v.has_pointvalue():
            pv = v.pointvalue()
            return GeoPoint(pv.x(), pv.y())
        else:
            # A missing value implies null.
            return None
46 changes: 45 additions & 1 deletion src/backend/common/models/cached_query_result.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,57 @@
import io
import pickle
from typing import Any

from google.cloud import ndb


class ImportFixingPickler(pickle.Pickler):
    """Pickler counterpart to ImportFixingUnpickler.

    Currently a plain passthrough; exists so the write path has a hook if
    module-path rewriting is ever needed on serialization too.
    """

    pass


class ImportFixingUnpickler(pickle.Unpickler):
    """Unpickler that rewrites legacy py2 module paths on the fly.

    The py2 app imported models from a top-level ``models`` package; the
    py3 app moved them under ``backend.common``, so pickled class paths
    are remapped before class lookup.
    """

    def find_class(self, module, name):
        # Only legacy "models.*" paths need the new package prefix.
        if module.startswith("models."):
            module = "backend.common." + module
        return super().find_class(module, name)


class ImportFixingPickleProperty(ndb.BlobProperty):
    """A PickleProperty replacement that can read py2-era pickles.

    Uses ImportFixingPickler/ImportFixingUnpickler so legacy module paths
    and py2 bytestrings round-trip through the datastore correctly.
    """

    def _to_base_type(self, value: Any) -> bytes:
        """Convert a value to the "base" value type for this property.

        Args:
            value (Any): The value to be converted.
        Returns:
            bytes: The pickled ``value``.
        """
        buffer = io.BytesIO()
        # Protocol 2 + fix_imports keeps the payload as py2-friendly as
        # pickle allows.
        pickler = ImportFixingPickler(buffer, protocol=2, fix_imports=True)
        pickler.dump(value)
        return buffer.getvalue()

    def _from_base_type(self, value: bytes) -> Any:
        """Convert a value from the "base" value type for this property.

        Args:
            value (bytes): The value to be converted.
        Returns:
            Any: The unpickled ``value``.
        """
        # encoding="bytes" is required to decode py2-written bytestrings.
        unpickler = ImportFixingUnpickler(
            io.BytesIO(value), encoding="bytes", fix_imports=True
        )
        return unpickler.load()


class CachedQueryResult(ndb.Model):
"""
A CachedQueryResult stores the result of an NDB query
"""

# Only one of result or result_dict should ever be populated for one model
result = ndb.PickleProperty(compressed=True) # Raw models
result = ImportFixingPickleProperty(compressed=True) # Raw models
result_dict = ndb.JsonProperty() # Dict version of models

created = ndb.DateTimeProperty(auto_now_add=True)
Expand Down

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions stubs/google/cloud/datastore/__init__.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from google.cloud.datastore.batch import Batch as Batch
from google.cloud.datastore.client import Client as Client
from google.cloud.datastore.entity import Entity as Entity
from google.cloud.datastore.key import Key as Key
from google.cloud.datastore.query import Query as Query
from google.cloud.datastore.transaction import Transaction as Transaction
6 changes: 6 additions & 0 deletions stubs/google/cloud/datastore/_app_engine_key_pb2.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from google.protobuf import descriptor_pb2 as descriptor_pb2
from typing import Any

DESCRIPTOR: Any
Reference: Any
Path: Any
6 changes: 6 additions & 0 deletions stubs/google/cloud/datastore/_gapic.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from google.cloud._helpers import make_secure_channel as make_secure_channel
from google.cloud._http import DEFAULT_USER_AGENT as DEFAULT_USER_AGENT
from google.cloud.datastore_v1.gapic import datastore_client as datastore_client
from typing import Any

def make_datastore_api(client: Any): ...
20 changes: 20 additions & 0 deletions stubs/google/cloud/datastore/_http.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from google.cloud import exceptions as exceptions
from google.rpc import status_pb2 as status_pb2
from typing import Any, Optional

DATASTORE_API_HOST: str
API_BASE_URL: Any
API_VERSION: str
API_URL_TEMPLATE: str

def build_api_url(project: Any, method: Any, base_url: Any): ...

class HTTPDatastoreAPI:
    """Stub for google.cloud.datastore._http.HTTPDatastoreAPI (typing only)."""

    client: Any = ...
    def __init__(self, client: Any) -> None: ...
    def lookup(self, project_id: Any, keys: Any, read_options: Optional[Any] = ...): ...
    def run_query(self, project_id: Any, partition_id: Any, read_options: Optional[Any] = ..., query: Optional[Any] = ..., gql_query: Optional[Any] = ...): ...
    def begin_transaction(self, project_id: Any, transaction_options: Optional[Any] = ...): ...
    def commit(self, project_id: Any, mode: Any, mutations: Any, transaction: Optional[Any] = ...): ...
    def rollback(self, project_id: Any, transaction: Any): ...
    def allocate_ids(self, project_id: Any, keys: Any): ...
19 changes: 19 additions & 0 deletions stubs/google/cloud/datastore/batch.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from google.cloud.datastore import helpers as helpers
from typing import Any, Optional

class Batch:
    """Stub for google.cloud.datastore.batch.Batch (typing only)."""

    def __init__(self, client: Any) -> None: ...
    def current(self): ...
    @property
    def project(self): ...
    @property
    def namespace(self): ...
    @property
    def mutations(self): ...
    def put(self, entity: Any) -> None: ...
    def delete(self, key: Any) -> None: ...
    def begin(self) -> None: ...
    def commit(self, retry: Optional[Any] = ..., timeout: Optional[Any] = ...) -> None: ...
    def rollback(self) -> None: ...
    def __enter__(self): ...
    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: ...
43 changes: 43 additions & 0 deletions stubs/google/cloud/datastore/client.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from google.api_core.gapic_v1 import client_info as client_info
from google.auth.credentials import AnonymousCredentials as AnonymousCredentials
from google.cloud.client import ClientWithProject as ClientWithProject
from google.cloud.datastore import helpers as helpers
from google.cloud.datastore._gapic import make_datastore_api as make_datastore_api
from google.cloud.datastore._http import HTTPDatastoreAPI as HTTPDatastoreAPI
from google.cloud.datastore.batch import Batch as Batch
from google.cloud.datastore.entity import Entity as Entity
from google.cloud.datastore.key import Key as Key
from google.cloud.datastore.query import Query as Query
from google.cloud.datastore.transaction import Transaction as Transaction
from typing import Any, Optional

DATASTORE_EMULATOR_HOST: str
DATASTORE_DATASET: str
DISABLE_GRPC: str

class Client(ClientWithProject):
    """Stub for google.cloud.datastore.client.Client (typing only)."""

    SCOPE: Any = ...
    namespace: Any = ...
    def __init__(self, project: Optional[Any] = ..., namespace: Optional[Any] = ..., credentials: Optional[Any] = ..., client_info: Any = ..., client_options: Optional[Any] = ..., _http: Optional[Any] = ..., _use_grpc: Optional[Any] = ...) -> None: ...
    @property
    def base_url(self): ...
    @base_url.setter
    def base_url(self, value: Any) -> None: ...
    @property
    def current_batch(self): ...
    @property
    def current_transaction(self): ...
    def get(self, key: Any, missing: Optional[Any] = ..., deferred: Optional[Any] = ..., transaction: Optional[Any] = ..., eventual: bool = ..., retry: Optional[Any] = ..., timeout: Optional[Any] = ...): ...
    def get_multi(self, keys: Any, missing: Optional[Any] = ..., deferred: Optional[Any] = ..., transaction: Optional[Any] = ..., eventual: bool = ..., retry: Optional[Any] = ..., timeout: Optional[Any] = ...): ...
    def put(self, entity: Any, retry: Optional[Any] = ..., timeout: Optional[Any] = ...) -> None: ...
    def put_multi(self, entities: Any, retry: Optional[Any] = ..., timeout: Optional[Any] = ...) -> None: ...
    def delete(self, key: Any, retry: Optional[Any] = ..., timeout: Optional[Any] = ...) -> None: ...
    def delete_multi(self, keys: Any, retry: Optional[Any] = ..., timeout: Optional[Any] = ...) -> None: ...
    def allocate_ids(self, incomplete_key: Any, num_ids: Any, retry: Optional[Any] = ..., timeout: Optional[Any] = ...): ...
    def key(self, *path_args: Any, **kwargs: Any): ...
    def batch(self): ...
    def transaction(self, **kwargs: Any): ...
    def query(self, **kwargs: Any): ...
    def reserve_ids_sequential(self, complete_key: Any, num_ids: Any, retry: Optional[Any] = ..., timeout: Optional[Any] = ...) -> None: ...
    def reserve_ids(self, complete_key: Any, num_ids: Any, retry: Optional[Any] = ..., timeout: Optional[Any] = ...): ...
    def reserve_ids_multi(self, complete_keys: Any, retry: Optional[Any] = ..., timeout: Optional[Any] = ...) -> None: ...
12 changes: 12 additions & 0 deletions stubs/google/cloud/datastore/entity.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from typing import Any, Optional

class Entity(dict):
    """Stub for google.cloud.datastore.entity.Entity (typing only)."""

    key: Any = ...
    exclude_from_indexes: Any = ...
    def __init__(self, key: Optional[Any] = ..., exclude_from_indexes: Any = ...) -> None: ...
    def __eq__(self, other: Any) -> Any: ...
    def __ne__(self, other: Any) -> Any: ...
    @property
    def kind(self): ...
    @property
    def id(self): ...
19 changes: 19 additions & 0 deletions stubs/google/cloud/datastore/helpers.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from google.cloud.datastore.entity import Entity as Entity
from google.cloud.datastore.key import Key as Key
from google.cloud.datastore_v1.proto import datastore_pb2 as datastore_pb2, entity_pb2 as entity_pb2
from google.protobuf import struct_pb2 as struct_pb2
from google.type import latlng_pb2 as latlng_pb2
from typing import Any

# Stubs for the datastore protobuf <-> entity conversion helpers (typing only).
def entity_from_protobuf(pb: Any): ...
def entity_to_protobuf(entity: Any): ...
def get_read_options(eventual: Any, transaction_id: Any): ...
def key_from_protobuf(pb: Any): ...

class GeoPoint:
    """Stub for google.cloud.datastore.helpers.GeoPoint (typing only)."""

    latitude: Any = ...
    longitude: Any = ...
    def __init__(self, latitude: Any, longitude: Any) -> None: ...
    def to_protobuf(self): ...
    def __eq__(self, other: Any) -> Any: ...
    def __ne__(self, other: Any) -> Any: ...
32 changes: 32 additions & 0 deletions stubs/google/cloud/datastore/key.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import Any, Optional

class Key:
    """Stub for google.cloud.datastore.key.Key (typing only)."""

    def __init__(self, *path_args: Any, **kwargs: Any) -> None: ...
    def __eq__(self, other: Any) -> Any: ...
    def __ne__(self, other: Any) -> Any: ...
    def __hash__(self) -> Any: ...
    def completed_key(self, id_or_name: Any): ...
    def to_protobuf(self): ...
    def to_legacy_urlsafe(self, location_prefix: Optional[Any] = ...): ...
    @classmethod
    def from_legacy_urlsafe(cls, urlsafe: Any): ...
    @property
    def is_partial(self): ...
    @property
    def namespace(self): ...
    @property
    def path(self): ...
    @property
    def flat_path(self): ...
    @property
    def kind(self): ...
    @property
    def id(self): ...
    @property
    def name(self): ...
    @property
    def id_or_name(self): ...
    @property
    def project(self): ...
    @property
    def parent(self): ...

0 comments on commit 10c7a61

Please sign in to comment.