Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrations: Implement bookkeeping #487

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
514a3d0
WIP: Implement `Bookkeeper` class for recording migrations
eecavanna Feb 19, 2024
525e25b
Simplify schema version indicator document in Mongo view
eecavanna Feb 19, 2024
0a72a8d
Standardize terminology
eecavanna Feb 21, 2024
386d3ba
Refrain from relying on `StrEnum` type introduced in Python 3.11
eecavanna Feb 21, 2024
7ef1762
Add comment demonstrating timestamp format
eecavanna Feb 21, 2024
aa565ee
Create "Stakeholders" list, which migration notebooks can reference
eecavanna Feb 29, 2024
1fa8aa5
WIP: Start creating an example migration notebook
eecavanna Mar 1, 2024
fba09c5
Merge branch 'main' into 432-migrations-implement-bookkeeping-so-data…
eecavanna Mar 5, 2024
fde50d3
Continue implementing the example migration notebook
eecavanna Mar 5, 2024
831677c
Update example migration notebook to use a `Bookkeeper`
eecavanna Mar 5, 2024
941add8
Update bookkeeper to fall back to target version defined in migrator
eecavanna Mar 5, 2024
99de020
Extract `Bookkeeper` class definition into own module
eecavanna Mar 5, 2024
6099895
Clarify `import` statements
eecavanna Mar 5, 2024
047ad80
style: reformat
invalid-email-address Mar 5, 2024
babdb95
Reformat code to avoid "obfuscation" by `black`
eecavanna Mar 5, 2024
8ae73ff
Add unit tests targeting the `Config` class
eecavanna Mar 5, 2024
c021d08
Add unit test targeting part of `Bookkeeper` class
eecavanna Mar 5, 2024
f58807a
Check correct variable
eecavanna Mar 5, 2024
d597104
Add unit tests targeting the core bookkeeper functionality
eecavanna Mar 5, 2024
741803a
Test the Mongo view more thoroughly
eecavanna Mar 5, 2024
9e892c5
Use absolute imports instead of relative ones
eecavanna Mar 5, 2024
d29271c
Get Mongo connection parameters from environment variables
eecavanna Mar 5, 2024
c61e127
Get database name from environment variable
eecavanna Mar 5, 2024
169ee0b
Use same environment variable name as in `.env.test`
eecavanna Mar 6, 2024
62d2fc5
Rename manual test file to opt out of automated testing
eecavanna Mar 6, 2024
c6fbc61
Fix typo in comment
eecavanna Mar 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions demo/metadata_migration/notebooks/bookkeeper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from typing import Optional
from enum import Enum
from datetime import datetime, timezone

from pymongo import MongoClient
from nmdc_schema.migrators.migrator_base import MigratorBase


class MigrationEvent(str, Enum):
    r"""
    The closed set of migration events that can be recorded.

    Each member is also a `str` (the class mixes in `str` rather than relying on `StrEnum`),
    so a member compares equal to its own value.
    Reference: https://docs.python.org/3.10/library/enum.html#others

    >>> MigrationEvent.MIGRATION_STARTED.value
    'MIGRATION_STARTED'
    >>> MigrationEvent.MIGRATION_COMPLETED.value
    'MIGRATION_COMPLETED'
    >>> MigrationEvent.MIGRATION_COMPLETED == 'MIGRATION_COMPLETED'
    True
    """

    # A migration has begun.
    MIGRATION_STARTED = "MIGRATION_STARTED"

    # A migration has finished.
    MIGRATION_COMPLETED = "MIGRATION_COMPLETED"


class Bookkeeper:
    r"""
    A class you can use to record migration-related events in a Mongo database.

    Each event is stored as a document in a collection. A MongoDB view derived from that collection
    exposes a single document, `{ "schema_version": ... }`, based upon the most recently recorded event.
    """

    def __init__(
        self,
        mongo_client: MongoClient,
        database_name: str = "nmdc",
        collection_name: str = "_migration_events",
        view_name: str = "_migration_latest_schema_version",
    ):
        r"""
        Stores references to the database and collection, and ensures the view exists.

        :param mongo_client: Client through which the database will be accessed
        :param database_name: Name of the database containing the collection and view
        :param collection_name: Name of the collection in which migration events will be recorded
        :param view_name: Name of the view that indicates the current schema version
        """
        self.database_name = database_name
        self.collection_name = collection_name
        self.view_name = view_name

        # Store references to the database and collection.
        self.db = mongo_client[self.database_name]
        self.collection = self.db.get_collection(self.collection_name)

        # Ensure the MongoDB view that indicates the current schema version, exists.
        self.ensure_current_schema_version_view_exists()

    @staticmethod
    def get_current_timestamp() -> str:
        r"""
        Returns an ISO 8601 timestamp (string) representing the current time in UTC.

        The string carries no UTC offset suffix and always includes six fractional digits,
        so every timestamp has the same fixed-width format (e.g. "2024-02-21T04:31:03.115107").
        """
        # Note: `datetime.utcnow()` is deprecated as of Python 3.12. Instead, get a timezone-aware datetime
        #       and then drop its `tzinfo`, so the resulting string keeps the same offset-less format.
        utc_now = datetime.now(timezone.utc).replace(tzinfo=None)

        # Note: Without an explicit `timespec`, `isoformat()` omits the fractional part
        #       whenever the `microsecond` component happens to be 0.
        return utc_now.isoformat(timespec="microseconds")

    def record_migration_event(
        self,
        migrator: MigratorBase,
        event: MigrationEvent,
        to_schema_version: Optional[str] = None,
    ) -> None:
        r"""
        Records a migration event in the collection.

        The `to_schema_version` parameter is independent of the `migrator` parameter because, even though the migrator
        does have a `.get_destination_version()` method, the string returned by that method is the one defined when the
        migrator was _written_, which is sometimes more generic than the one used for data validation when the migrator
        is _run_ (e.g. "1.2" as opposed to "1.2.3"). So, this method provides a means by which the calling code can,
        optionally, specify a more precise version identifier.

        :param migrator: Migrator whose origin version (and, by default, destination version) will be recorded
        :param event: The kind of event being recorded
        :param to_schema_version: Optional version identifier to record instead of the migrator's destination version
        """

        # If a custom schema version identifier was specified, use that; otherwise, use the one built into the migrator.
        to_schema_version_str = (
            to_schema_version
            if to_schema_version is not None
            else migrator.get_destination_version()
        )

        document = dict(
            created_at=self.get_current_timestamp(),
            event=event.value,
            from_schema_version=migrator.get_origin_version(),
            to_schema_version=to_schema_version_str,
            migrator_module=migrator.__module__,  # name of the Python module in which the `Migrator` class is defined
        )
        self.collection.insert_one(document)

    def ensure_current_schema_version_view_exists(self) -> None:
        r"""
        Ensures the MongoDB view that indicates the current schema version, exists.

        If something having the view's name already exists in the database, this method leaves it alone.

        References:
        - https://www.mongodb.com/community/forums/t/is-there-anyway-to-create-view-using-python/161363/2
        - https://www.mongodb.com/docs/manual/reference/method/db.createView/#mongodb-method-db.createView
        - https://pymongo.readthedocs.io/en/stable/api/pymongo/database.html#pymongo.database.Database.create_collection
        """

        # Note: `list_collection_names()` returns the names of both collections and views.
        if self.view_name in self.db.list_collection_names():
            return

        agg_pipeline = [
            # Sort the documents so the most recent one is first.
            {"$sort": {"created_at": -1}},
            # Only preserve the first document.
            {"$limit": 1},
            # Derive a simplified document for the view.
            # Examples: `{ "schema_version": "1.2.3" }` or `{ "schema_version": null }`
            {
                "$project": {
                    "_id": 0,  # omit the `_id` field
                    "schema_version": {  # add this field based upon the migration status
                        "$cond": {
                            "if": {
                                "$eq": [
                                    "$event",
                                    MigrationEvent.MIGRATION_COMPLETED.value,
                                ]
                            },
                            # Database conforms to this version of the NMDC Schema.
                            "then": "$to_schema_version",
                            # Database doesn't necessarily conform to any version of the NMDC Schema.
                            "else": None,
                        }
                    },
                }
            },
        ]
        self.db.create_collection(
            name=self.view_name,
            viewOn=self.collection_name,
            pipeline=agg_pipeline,
            check_exists=True,  # only create the view if it doesn't already exist
            comment="The version of the NMDC Schema to which the database conforms.",
        )
186 changes: 186 additions & 0 deletions demo/metadata_migration/notebooks/manual_test_bookkeeper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# Note: This module's name starts with `manual_test_` instead of `test_` to signify that its author (currently) expects
# people to run it manually as opposed to via automated test infrastructure. Its author chose that route after a
# GitHub Actions workflow that runs tests in `test_` modules kept failing due to a database access issue.

from typing import Optional
import unittest
import re
import os
from datetime import datetime, timedelta

from pymongo import MongoClient, timeout
from pymongo.database import Database
from nmdc_schema.migrators.migrator_base import MigratorBase

from demo.metadata_migration.notebooks.bookkeeper import Bookkeeper, MigrationEvent

# Read the MongoDB connection settings from environment variables, falling back to defaults where unset.
MONGO_HOST: str = os.environ.get("MONGO_HOST", "localhost")
MONGO_USER: Optional[str] = os.environ.get("MONGO_USERNAME")
MONGO_PASS: Optional[str] = os.environ.get("MONGO_PASSWORD")
MONGO_DATABASE_NAME: str = os.environ.get("MONGO_TEST_DBNAME", "test-migration-bookkeeper")

# Timeout duration, in seconds.
MONGO_TIMEOUT_DURATION: int = 3


class FakeMigrator(MigratorBase):
    r"""A migrator stand-in for tests: it has placeholder version identifiers and its upgrade does nothing."""

    _from_version = "X.Y.Z"
    _to_version = "A.B.C"

    def upgrade(self):
        r"""Does nothing (the tests only need a migrator to hand to the bookkeeper)."""
        return None


class TestBookkeeper(unittest.TestCase):
    r"""
    Tests targeting the `Bookkeeper` class.

    You can format this file like this:
    $ python -m black demo/metadata_migration/notebooks/manual_test_bookkeeper.py

    You can start up a containerized MongoDB server like this:
    $ docker run --rm --detach --name mongo-test-migration-bookkeeper -p 27017:27017 mongo

    Once that's running, other containers will be able to access it via:
    - host.docker.internal:27017

    You can run these tests like this:
    $ python -m unittest -v demo/metadata_migration/notebooks/manual_test_bookkeeper.py

    Reference: https://docs.python.org/3/library/unittest.html#basic-example
    """

    mongo_client: Optional[MongoClient] = None
    db: Optional[Database] = None

    def setUp(self) -> None:
        r"""
        Connects to the MongoDB server and gets a reference to the database.

        Raises a `KeyError` if the database already contains any collections.

        Note: This function runs before each test starts.
        """

        # Connect to the MongoDB server and store a reference to the connection.
        self.mongo_client = MongoClient(
            host=MONGO_HOST,
            username=MONGO_USER,
            password=MONGO_PASS,
        )

        # Close the connection once the test is over. A cleanup function (unlike `tearDown`) runs
        # even if the remainder of `setUp` raises, so the connection is never leaked.
        self.addCleanup(self.mongo_client.close)

        with timeout(MONGO_TIMEOUT_DURATION):
            # Try connecting to the database server.
            _ = self.mongo_client.server_info()
            db = self.mongo_client[MONGO_DATABASE_NAME]

            # Ensure the database contains no collections.
            if db.list_collection_names():
                raise KeyError(f"Database is not empty: {MONGO_DATABASE_NAME}")

        # Store a reference to the database.
        self.db = db

    def tearDown(self) -> None:
        r"""
        Drops all collections in the database.

        The connection to the MongoDB server is closed afterward, by the cleanup function registered in `setUp`.

        Note: This function runs after each test finishes.
        """

        # Drop all collections in the database.
        for collection_name in self.db.list_collection_names():
            self.db.drop_collection(collection_name)

    def test_get_current_timestamp(self):
        r"""Verifies the timestamp is a well-formed string representing (approximately) the current time in UTC."""
        from datetime import timezone  # local import; only this test needs it

        pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}$"
        ts_str = Bookkeeper.get_current_timestamp()

        # Verify the timestamp is a string having a valid format.
        self.assertIsInstance(ts_str, str)
        self.assertRegex(ts_str, pattern)

        # Verify the moment represented by the timestamp is within one minute of the current time.
        # Note: The timestamp represents a time in UTC, so compare it to the current time in UTC (as opposed to the
        #       local time). Use the absolute difference so a timestamp in the future doesn't pass by accident.
        ts = datetime.fromisoformat(ts_str)
        now_ts = datetime.now(timezone.utc).replace(tzinfo=None)
        time_difference = abs(now_ts - ts)
        self.assertLess(time_difference, timedelta(minutes=1))

    def test_init_method(self):
        r"""Verifies that instantiating a `Bookkeeper` creates the view."""

        # Confirm the view does not exist yet.
        view_name = "test_view"
        self.assertNotIn(view_name, self.db.list_collection_names())

        # Instantiate the class-under-test.
        _ = Bookkeeper(
            mongo_client=self.mongo_client,
            database_name=MONGO_DATABASE_NAME,
            view_name=view_name,
        )

        # Confirm the view exists now.
        self.assertIn(view_name, self.db.list_collection_names())

    def test_record_migration_event(self):
        r"""Verifies that events get recorded and that the view reflects the most recent one."""

        # Instantiate a bookkeeper.
        bk = Bookkeeper(
            mongo_client=self.mongo_client,
            database_name=MONGO_DATABASE_NAME,
        )

        # Verify the collection is empty.
        collection = self.db.get_collection(bk.collection_name)
        self.assertEqual(collection.count_documents({}), 0)

        # Record a "migration started" event.
        migrator = FakeMigrator()
        bk.record_migration_event(
            migrator=migrator, event=MigrationEvent.MIGRATION_STARTED
        )

        # Verify the migration event was recorded.
        self.assertEqual(collection.count_documents({}), 1)
        doc = collection.find({})[0]
        self.assertIsInstance(doc["created_at"], str)
        self.assertIsInstance(doc["event"], str)
        self.assertEqual(doc["event"], MigrationEvent.MIGRATION_STARTED.value)
        self.assertIsInstance(doc["from_schema_version"], str)
        self.assertEqual(doc["from_schema_version"], migrator.get_origin_version())
        self.assertIsInstance(doc["to_schema_version"], str)
        self.assertEqual(doc["to_schema_version"], migrator.get_destination_version())
        self.assertIsInstance(doc["migrator_module"], str)

        # Verify the document in the view says the schema version is `null`.
        # Note: That's what I expect, since no "migration complete" events have been recorded yet.
        view = self.db.get_collection(bk.view_name)
        self.assertEqual(view.count_documents({}), 1)
        view_doc = view.find({})[0]
        self.assertIsNone(view_doc["schema_version"])

        # Record a "migration completed" event.
        bk.record_migration_event(
            migrator=migrator, event=MigrationEvent.MIGRATION_COMPLETED
        )

        # Verify the migration event was recorded.
        self.assertEqual(collection.count_documents({}), 2)

        # Verify the document in the view says the schema version matches the one recorded.
        view = self.db.get_collection(bk.view_name)
        self.assertEqual(view.count_documents({}), 1)
        view_doc = view.find({})[0]
        self.assertEqual(view_doc["schema_version"], migrator.get_destination_version())

        # Finally, record another "migration started" event.
        bk.record_migration_event(
            migrator=migrator, event=MigrationEvent.MIGRATION_STARTED
        )

        # Confirm the document in the view once again says the schema version is `null`.
        self.assertEqual(view.count_documents({}), 1)
        view_doc = view.find({})[0]
        self.assertIsNone(view_doc["schema_version"])


# Run the tests in this module when the file is executed directly (as opposed to being imported).
if __name__ == "__main__":
    unittest.main()
Loading
Loading