From 65a4d71ed4f19c98a78b4aacbd55bbe3edc0d865 Mon Sep 17 00:00:00 2001 From: "Cody J. Hanson" Date: Mon, 26 Sep 2022 13:52:00 -0500 Subject: [PATCH 1/8] Correct migration path for state table --- src/meltano/core/job_state.py | 10 +- src/meltano/migrations/db.lock | 2 +- ...28cc5b1a4f_create_dedicated_state_table.py | 367 ++++++++++++++++++ ...a9492f_create_dedicated_job_state_table.py | 5 + 4 files changed, 378 insertions(+), 6 deletions(-) create mode 100644 src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py diff --git a/src/meltano/core/job_state.py b/src/meltano/core/job_state.py index 1157bd073c..fa5a658f8c 100644 --- a/src/meltano/core/job_state.py +++ b/src/meltano/core/job_state.py @@ -26,7 +26,7 @@ class JobState(SystemModel): """ __tablename__ = "state" - job_name = Column(types.String, unique=True, primary_key=True, nullable=False) + state_id = Column(types.String, unique=True, primary_key=True, nullable=False) updated_at = Column(types.DATETIME, onupdate=datetime.now) @@ -34,12 +34,12 @@ class JobState(SystemModel): completed_state = Column(MutableDict.as_mutable(JSONEncodedDict)) @classmethod - def from_job_history(cls, session: Session, job_name: str): + def from_job_history(cls, session: Session, state_id: str): """Build JobState from job run history. Args: session: the session to use in finding job history - job_name: the name of the job to build JobState for + state_id: state_id to build JobState for Returns: JobState built from job run history @@ -47,7 +47,7 @@ def from_job_history(cls, session: Session, job_name: str): completed_state: dict[Any, Any] = {} partial_state: dict[Any, Any] = {} incomplete_since = None - finder = JobFinder(job_name) + finder = JobFinder(state_id) # Get the state for the most recent completed job. # Do not consider dummy jobs create via add_state. @@ -69,7 +69,7 @@ def from_job_history(cls, session: Session, job_name: str): partial_state = merge(incomplete_state_job.payload, partial_state) return cls( - job_name=job_name, + state_id=state_id, partial_state=partial_state, completed_state=completed_state, ) diff --git a/src/meltano/migrations/db.lock b/src/meltano/migrations/db.lock index f5d999e631..3d9a8d6eea 100644 --- a/src/meltano/migrations/db.lock +++ b/src/meltano/migrations/db.lock @@ -1 +1 @@ -f4c225a9492f +6828cc5b1a4f diff --git a/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py b/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py new file mode 100644 index 0000000000..2a3f95d7e7 --- /dev/null +++ b/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py @@ -0,0 +1,367 @@ +"""Create dedicated state table + +Revision ID: 6828cc5b1a4f +Revises: 5b43800443d1 +Create Date: 2022-09-26 12:47:53.512069 + +""" +from __future__ import annotations + +import uuid +from datetime import datetime, timedelta +from enum import Enum + +import sqlalchemy as sa +from alembic import op +from sqlalchemy import Column, MetaData, func, types +from sqlalchemy.engine.reflection import Inspector +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.ext.mutable import MutableDict +from sqlalchemy.orm.session import Session + +from meltano.core.sqlalchemy import GUID, IntFlag, JSONEncodedDict +from meltano.core.utils import merge +from meltano.migrations.utils.dialect_typing import ( + get_dialect_name, + max_string_length_for_dialect, +) + +# revision identifiers, used by Alembic. 
+revision = "6828cc5b1a4f" +down_revision = ("5b43800443d1", "f4c225a9492f") +branch_labels = None +depends_on = None + +SystemMetadata = MetaData() +SystemModel = declarative_base(metadata=SystemMetadata) + +# Copied from core/job_state.py +class JobState(SystemModel): + """Model class that represents the current state of a given job. + + Modified during `meltano elt` or `meltano run` runs whenever a + STATE message is emitted by a Singer target. Also written and read + by `meltano state` CLI invocations. Only holds the _current_ state + for a given job_name. Full job run history is held by the Job model. + """ + + __tablename__ = "state" + state_id = Column(types.String, unique=True, primary_key=True, nullable=False) + + updated_at = Column( + types.TIMESTAMP, server_default=func.now(), onupdate=func.current_timestamp() + ) + + partial_state = Column(MutableDict.as_mutable(JSONEncodedDict)) + completed_state = Column(MutableDict.as_mutable(JSONEncodedDict)) + + @classmethod + def from_job_history(cls, session: Session, state_id: str): + """Build JobState from job run history. + + Args: + session: the session to use in finding job history + job_name: the name of the job to build JobState for + + Returns: + JobState built from job run history + """ + completed_state = {} + partial_state = {} + incomplete_since = None + finder = JobFinder(state_id) + + # Get the state for the most recent completed job. + # Do not consider dummy jobs create via add_state. + state_job = finder.latest_with_payload(session, flags=Payload.STATE) + if state_job: + incomplete_since = state_job.ended_at + if "singer_state" in state_job.payload: + merge(state_job.payload, partial_state) + + # If there have been any incomplete jobs since the most recent completed jobs, + # merge the state emitted by those jobs into the state for the most recent + # completed job. If there are no completed jobs, get the full history of + # incomplete jobs and use the most recent state emitted per stream + incomplete_state_jobs = finder.with_payload( + session, flags=Payload.INCOMPLETE_STATE, since=incomplete_since + ) + for incomplete_state_job in incomplete_state_jobs: + if "singer_state" in incomplete_state_job.payload: + partial_state = merge(incomplete_state_job.payload, partial_state) + return cls( + state_id=state_id, + partial_state=partial_state, + completed_state=completed_state, + ) + + +# Copied from core/job/finder.py +class JobFinder: + """Query builder for the `Job` model for a certain `elt_uri`.""" + + def __init__(self, state_id: str): + """Initialize the JobFinder. + + Args: + state_id: the state_id to build queries for. + """ + self.state_id = state_id + + def latest(self, session): + """Get the latest state for this instance's state ID. + + Args: + session: the session to use in querying the db + + Returns: + The latest state for this instance's state ID + """ + return ( + session.query(Job) + .filter(Job.job_name == self.state_id) + .order_by(Job.started_at.desc()) + .first() + ) + + def successful(self, session): + """Get all successful jobs for this instance's state ID. + + Args: + session: the session to use in querying the db + + Returns: + All successful jobs for this instance's state ID + """ + return session.query(Job).filter( + (Job.job_name == self.state_id) # noqa: WPS465 + & (Job.state == State.SUCCESS) # noqa: WPS465 + & Job.ended_at.isnot(None) + ) + + def running(self, session): + """Find states in the running state. 
+ + Args: + session: the session to use in querying the db + + Returns: + All runnings states for state_id. + """ + return session.query(Job).filter( + (Job.job_name == self.state_id) # noqa: WPS465 + & (Job.state == State.RUNNING) + ) + + def latest_success(self, session): + """Get the latest successful state for this instance's state ID. + + Args: + session: the session to use in querying the db + + Returns: + The latest successful state for this instance's state ID + """ + return self.successful(session).order_by(Job.ended_at.desc()).first() + + def latest_running(self, session): + """Find the most recent state in the running state, if any. + + Args: + session: the session to use in querying the db + + Returns: + The latest running state for this instance's state ID + """ + return self.running(session).order_by(Job.started_at.desc()).first() + + def with_payload(self, session, flags=0, since=None, state=None): + """Get all states for this instance's state ID matching the given args. + + Args: + session: the session to use in querying the db + flags: only return states with these flags + since: only return states which ended after this time + state: only include states with state matching this state + + Returns: + All states matching these args. + """ + query = ( + session.query(Job) + .filter( + (Job.job_name == self.state_id) # noqa: WPS465 + & (Job.payload_flags != 0) # noqa: WPS465 + & (Job.payload_flags.op("&")(flags) == flags) # noqa: WPS465 + & Job.ended_at.isnot(None) + ) + .order_by(Job.ended_at.asc()) + ) + + if since: + query = query.filter(Job.ended_at > since) + if state: + query = query.filter(Job.state == state) + return query + + def latest_with_payload(self, session, **kwargs): + """Return the latest state matching the given kwargs. + + Args: + session: the session to use to query the db + kwargs: keyword args to pass to with_payload + + Returns: + The most recent state returned by with_payload(kwargs) + """ + return ( + self.with_payload(session, **kwargs) + .order_by(None) # Reset ascending order + .order_by(Job.ended_at.desc()) + .first() + ) + + @classmethod + def all_stale(cls, session): + """Return all stale states. + + Args: + session: the session to use to query the db + + Returns: + All stale states with any state ID + """ + now = datetime.utcnow() + last_valid_heartbeat_at = now - timedelta(minutes=HEARTBEAT_VALID_MINUTES) + last_valid_started_at = now - timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS) + + return session.query(Job).filter( + (Job.state == State.RUNNING) # noqa: WPS465 + & ( + ( + Job.last_heartbeat_at.isnot(None) # noqa: WPS465 + & (Job.last_heartbeat_at < last_valid_heartbeat_at) + ) + | ( + Job.last_heartbeat_at.is_(None) # noqa: WPS465 + & (Job.started_at < last_valid_started_at) + ) + ) + ) + + def stale(self, session): + """Return stale states with the instance's state ID. + + Args: + session: the session to use in querying the db + + Returns: + All stale states with instance's state ID + """ + return self.all_stale(session).filter(Job.job_name == self.state_id) + + def get_all(self, session: object, since=None): + """Return all state with the instance's state ID. 
+ + Args: + session: the session to use in querying the db + since: only return state which ended after this datetime + + Returns: + All state with instance's state ID which ended after 'since' + """ + query = ( + session.query(Job) + .filter(Job.job_name == self.state_id) + .order_by(Job.ended_at.asc()) + ) + + if since: + query = query.filter(Job.ended_at > since) + return query + + +# Copied from core/job/job.py +HEARTBEATLESS_JOB_VALID_HOURS = 24 +HEARTBEAT_VALID_MINUTES = 5 + + +class State(Enum): + """Represents status of a Job.""" + + IDLE = (0, ("RUNNING", "FAIL")) + RUNNING = (1, ("SUCCESS", "FAIL")) + SUCCESS = (2, ()) + FAIL = (3, ("RUNNING",)) + DEAD = (4, ()) + STATE_EDIT = (5, ()) + + +class Payload(IntFlag): + """Flag indicating whether a Job has state in its payload field.""" + + STATE = 1 + INCOMPLETE_STATE = 2 + + +class Job(SystemModel): # noqa: WPS214 + """Model class that represents a `meltano elt` run in the system database. + + Includes State.STATE_EDIT rows which represent CLI invocations of the + `meltano state` command which wrote state to the db. Queries that are + meant to return only actual job runs should filter out records with + state == State.STATE_EDIT. + """ + + __tablename__ = "runs" + + id = Column(types.Integer, primary_key=True) + job_name = Column(types.String(1024)) + run_id = Column(GUID, nullable=False, default=uuid.uuid4) + _state = Column(name="state", type_=types.String(10)) + started_at = Column(types.DateTime) + last_heartbeat_at = Column(types.DateTime) + ended_at = Column(types.DateTime) + payload = Column(MutableDict.as_mutable(JSONEncodedDict)) + payload_flags = Column(IntFlag, default=0) + trigger = Column(types.String, default="") + + +def upgrade(): + # Create state table + dialect_name = get_dialect_name() + max_string_length = max_string_length_for_dialect(dialect_name) + conn = op.get_bind() + inspector = Inspector.from_engine(conn) + if "state" in inspector.get_table_names(): + + op.drop_table("state") + op.create_table( + "state", + sa.Column("state_id", sa.String(900), nullable=False), + sa.Column( + "partial_state", + MutableDict.as_mutable(JSONEncodedDict(max_string_length)), + ), + sa.Column( + "completed_state", + MutableDict.as_mutable(JSONEncodedDict(max_string_length)), + ), + sa.Column( + "updated_at", + sa.types.TIMESTAMP if dialect_name == "postgresql" else sa.types.DATETIME, + onupdate=datetime.now, + ), + sa.PrimaryKeyConstraint("state_id"), + sa.UniqueConstraint("state_id"), + ) + session = Session(bind=conn) + for state_id in set([job_run.job_name for job_run in session.query(Job).all()]): + session.add(JobState.from_job_history(session, state_id)) + session.commit() + + +def downgrade(): + # Remove job_state table + # Job run history is still maintained, so no need to copy + op.drop_table("state") diff --git a/src/meltano/migrations/versions/f4c225a9492f_create_dedicated_job_state_table.py b/src/meltano/migrations/versions/f4c225a9492f_create_dedicated_job_state_table.py index ffbe3399f0..27769db01c 100644 --- a/src/meltano/migrations/versions/f4c225a9492f_create_dedicated_job_state_table.py +++ b/src/meltano/migrations/versions/f4c225a9492f_create_dedicated_job_state_table.py @@ -14,6 +14,7 @@ import sqlalchemy as sa from alembic import op from sqlalchemy import Column, MetaData, func, types +from sqlalchemy.engine.reflection import Inspector from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.mutable import MutableDict from sqlalchemy.orm.session import Session @@ -371,6 +372,10 @@ def 
upgrade(): # Create state table dialect_name = get_dialect_name() max_string_length = max_string_length_for_dialect(dialect_name) + conn = op.get_bind() + inspector = Inspector.from_engine(conn) + if "state" in inspector.get_table_names(): + return op.create_table( "state", sa.Column("job_name", sa.String(900), nullable=False), From f34e71c1e06054d384467145740225015356ef23 Mon Sep 17 00:00:00 2001 From: "Cody J. Hanson" Date: Mon, 26 Sep 2022 14:29:49 -0500 Subject: [PATCH 2/8] job name -> state id --- src/meltano/core/job_state.py | 2 +- src/meltano/core/state_service.py | 8 ++-- src/meltano/core/state_store.py | 74 +++++++++++++++---------------- 3 files changed, 42 insertions(+), 42 deletions(-) diff --git a/src/meltano/core/job_state.py b/src/meltano/core/job_state.py index fa5a658f8c..6050385fda 100644 --- a/src/meltano/core/job_state.py +++ b/src/meltano/core/job_state.py @@ -22,7 +22,7 @@ class JobState(SystemModel): Modified during `meltano elt` or `meltano run` runs whenever a STATE message is emitted by a Singer target. Also written and read by `meltano state` CLI invocations. Only holds the _current_ state - for a given job_name. Full job run history is held by the Job model. + for a given state_id. Full job run history is held by the Job model. """ __tablename__ = "state" diff --git a/src/meltano/core/state_service.py b/src/meltano/core/state_service.py index 39d9cb0e27..fdf01abebb 100644 --- a/src/meltano/core/state_service.py +++ b/src/meltano/core/state_service.py @@ -49,8 +49,8 @@ def list_state(self, state_id_pattern: str | None = None): A dict with state_ids as keys and state payloads as values. """ return { - job_name: self.get_state(job_name) - for job_name in self.state_store_manager.get_job_names(state_id_pattern) + state_id: self.get_state(state_id) + for state_id in self.state_store_manager.get_state_ids(state_id_pattern) } def _get_or_create_job(self, job: Job | str) -> Job: @@ -126,7 +126,7 @@ def add_state( f"Added to state {state_to_add_to.job_name} state payload {new_state_dict}" ) self.state_store_manager.set( - job_name=state_to_add_to.job_name, + state_id=state_to_add_to.state_id, state=json.dumps(new_state_dict), complete=(payload_flags == Payload.STATE), ) @@ -140,7 +140,7 @@ def get_state(self, state_id: str): Returns: Dict representing state that would be used in the next run. """ - return self.state_store_manager.get(job_name=state_id) + return self.state_store_manager.get(state_id=state_id) def set_state(self, state_id: str, new_state: str | None, validate: bool = True): """Set the state for the state_id. diff --git a/src/meltano/core/state_store.py b/src/meltano/core/state_store.py index 92f8b38f36..f88bc011a2 100644 --- a/src/meltano/core/state_store.py +++ b/src/meltano/core/state_store.py @@ -23,11 +23,11 @@ def __init__(self, **kwargs): ... @abstractmethod - def set(self, job_name: str, state: str, complete: bool): - """Set the job state for the given job_name. + def set(self, state_id: str, state: str, complete: bool): + """Set the job state for the given state_id. Args: - job_name: the name of the job to set state for. + state_id: the name of the job to set state for. state: the state to set. complete: true if the state being set is for a complete run, false if partial @@ -37,11 +37,11 @@ def set(self, job_name: str, state: str, complete: bool): ... @abstractmethod - def get(self, job_name): - """Get the job state for the given job_name. + def get(self, state_id): + """Get the job state for the given state_id. 
Args: - job_name: the name of the job to get state for. + state_id: the name of the job to get state for. Raises: NotImplementedError: always, this is an abstract method @@ -49,17 +49,17 @@ def get(self, job_name): ... @abstractmethod - def clear(self, job_name): - """Clear state for the given job_name. + def clear(self, state_id): + """Clear state for the given state_id. Args: - job_name: the job name to clear state for + state_id: the state_id to clear state for """ ... @abstractmethod - def get_job_names(self, pattern=None): - """Get all job names available in this state store manager. + def get_state_ids(self, pattern=None): + """Get all state_ids available in this state store manager. Args: pattern: glob-style pattern to filter by @@ -67,20 +67,20 @@ def get_job_names(self, pattern=None): ... @abstractmethod - def acquire_lock(self, job_name): + def acquire_lock(self, state_id): """Acquire a naive lock for the given job's state. Args: - job_name: the job name to lock + state_id: the state_id to lock """ ... @abstractmethod - def release_lock(self, job_name): + def release_lock(self, state_id): """Release lock for given job's state. Args: - job_name: the job name to unlock + state_id: the state_id to unlock """ ... @@ -99,16 +99,16 @@ def __init__(self, *args, session: Session, **kwargs): super().__init__(*args, **kwargs) self.session = session - def set(self, job_name: str, state: str, complete: bool) -> None: - """Set the job state for the given job_name. + def set(self, state_id: str, state: str, complete: bool) -> None: + """Set the job state for the given state_id. Args: - job_name: the name of the job to set state for. + state_id: the name of the job to set state for. state: the state to set. complete: true if the state being set is for a complete run, false if partial """ existing_job_state = ( - self.session.query(JobState).filter(JobState.job_name == job_name).first() + self.session.query(JobState).filter(JobState.state_id == state_id).first() ) partial_state = {} if complete else json.loads(state) completed_state = json.loads(state) if complete else {} @@ -118,7 +118,7 @@ def set(self, job_name: str, state: str, complete: bool) -> None: if not complete: completed_state = existing_job_state.completed_state new_job_state = JobState( - job_name=job_name, + state_id=state_id, partial_state=partial_state, completed_state=completed_state, ) @@ -127,17 +127,17 @@ def set(self, job_name: str, state: str, complete: bool) -> None: self.session.add(new_job_state) self.session.commit() - def get(self, job_name): - """Get the job state for the given job_name. + def get(self, state_id): + """Get the job state for the given state_id. Args: - job_name: the name of the job to get state for + state_id: the name of the job to get state for Returns: The current state for the given job """ job_state: JobState | None = ( - self.session.query(JobState).filter(JobState.job_name == job_name).first() + self.session.query(JobState).filter(JobState.state_id == state_id).first() ) return ( merge(job_state.partial_state, job_state.completed_state) @@ -145,21 +145,21 @@ def get(self, job_name): else {} ) - def clear(self, job_name): - """Clear state for the given job_name. + def clear(self, state_id): + """Clear state for the given state_id. 
Args: - job_name: the job name to clear state for + state_id: the state_id to clear state for """ job_state: JobState | None = ( - self.session.query(JobState).filter(JobState.job_name == job_name).first() + self.session.query(JobState).filter(JobState.state_id == state_id).first() ) if job_state: self.session.delete(job_state) self.session.commit() - def get_job_names(self, pattern: str | None = None): - """Get all job names available in this state store manager. + def get_state_ids(self, pattern: str | None = None): + """Get all state_ids available in this state store manager. Args: pattern: glob-style pattern to filter by @@ -169,34 +169,34 @@ def get_job_names(self, pattern: str | None = None): """ if pattern: return ( - job_state.job_name + job_state.state_id for job_state in self.session.query(JobState) - .filter(JobState.job_name.like(pattern.replace("*", "%"))) + .filter(JobState.state_id.like(pattern.replace("*", "%"))) .all() ) return ( record[0] - for record in self.session.execute(select(JobState.job_name)).all() + for record in self.session.execute(select(JobState.state_id)).all() ) - def acquire_lock(self, job_name): + def acquire_lock(self, state_id): """Acquire a naive lock for the given job's state. For DBStateStoreManager, the db manages transactions. This does nothing. Args: - job_name: the job name to lock + state_id: the state_id to lock """ ... - def release_lock(self, job_name): + def release_lock(self, state_id): """Release the lock for the given job's state. For DBStateStoreManager, the db manages transactions. This does nothing. Args: - job_name: the job name to unlock + state_id: the state_id to unlock """ ... From d2f3cb446ba71a0c02cca1890be223e3674545c1 Mon Sep 17 00:00:00 2001 From: "Cody J. Hanson" Date: Mon, 26 Sep 2022 14:40:43 -0500 Subject: [PATCH 3/8] Make first job state migration a noop --- ...28cc5b1a4f_create_dedicated_state_table.py | 1 - ...a9492f_create_dedicated_job_state_table.py | 383 +----------------- 2 files changed, 1 insertion(+), 383 deletions(-) diff --git a/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py b/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py index 2a3f95d7e7..bf085389f0 100644 --- a/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py +++ b/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py @@ -334,7 +334,6 @@ def upgrade(): conn = op.get_bind() inspector = Inspector.from_engine(conn) if "state" in inspector.get_table_names(): - op.drop_table("state") op.create_table( "state", diff --git a/src/meltano/migrations/versions/f4c225a9492f_create_dedicated_job_state_table.py b/src/meltano/migrations/versions/f4c225a9492f_create_dedicated_job_state_table.py index 27769db01c..54bc9896b7 100644 --- a/src/meltano/migrations/versions/f4c225a9492f_create_dedicated_job_state_table.py +++ b/src/meltano/migrations/versions/f4c225a9492f_create_dedicated_job_state_table.py @@ -7,24 +7,7 @@ """ from __future__ import annotations -import uuid -from datetime import datetime, timedelta -from enum import Enum - -import sqlalchemy as sa from alembic import op -from sqlalchemy import Column, MetaData, func, types -from sqlalchemy.engine.reflection import Inspector -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.ext.mutable import MutableDict -from sqlalchemy.orm.session import Session - -from meltano.core.sqlalchemy import GUID, IntFlag, JSONEncodedDict -from meltano.core.utils import merge -from 
meltano.migrations.utils.dialect_typing import ( - get_dialect_name, - max_string_length_for_dialect, -) # revision identifiers, used by Alembic. revision = "f4c225a9492f" @@ -33,373 +16,9 @@ depends_on = None -SystemMetadata = MetaData() -SystemModel = declarative_base(metadata=SystemMetadata) - -# Copied from core/job_state.py -class JobState(SystemModel): - """Model class that represents the current state of a given job. - - Modified during `meltano elt` or `meltano run` runs whenever a - STATE message is emitted by a Singer target. Also written and read - by `meltano state` CLI invocations. Only holds the _current_ state - for a given job_name. Full job run history is held by the Job model. - """ - - __tablename__ = "state" - job_name = Column(types.String, unique=True, primary_key=True, nullable=False) - - updated_at = Column( - types.TIMESTAMP, server_default=func.now(), onupdate=func.current_timestamp() - ) - - partial_state = Column(MutableDict.as_mutable(JSONEncodedDict)) - completed_state = Column(MutableDict.as_mutable(JSONEncodedDict)) - - @classmethod - def from_job_history(cls, session: Session, job_name: str): - """Build JobState from job run history. - - Args: - session: the session to use in finding job history - job_name: the name of the job to build JobState for - - Returns: - JobState built from job run history - """ - completed_state = {} - partial_state = {} - incomplete_since = None - finder = JobFinder(job_name) - - # Get the state for the most recent completed job. - # Do not consider dummy jobs create via add_state. - state_job = finder.latest_with_payload(session, flags=Payload.STATE) - if state_job: - incomplete_since = state_job.ended_at - if "singer_state" in state_job.payload: - merge(state_job.payload, partial_state) - - # If there have been any incomplete jobs since the most recent completed jobs, - # merge the state emitted by those jobs into the state for the most recent - # completed job. If there are no completed jobs, get the full history of - # incomplete jobs and use the most recent state emitted per stream - incomplete_state_jobs = finder.with_payload( - session, flags=Payload.INCOMPLETE_STATE, since=incomplete_since - ) - for incomplete_state_job in incomplete_state_jobs: - if "singer_state" in incomplete_state_job.payload: - partial_state = merge(incomplete_state_job.payload, partial_state) - return cls( - job_name=job_name, - partial_state=partial_state, - completed_state=completed_state, - ) - - -# Copied from core/job/finder.py -class JobFinder: - """Query builder for the `Job` model for a certain `elt_uri`.""" - - def __init__(self, state_id: str): - """Initialize the JobFinder. - - Args: - state_id: the state_id to build queries for. - """ - self.state_id = state_id - - def latest(self, session): - """Get the latest state for this instance's state ID. - - Args: - session: the session to use in querying the db - - Returns: - The latest state for this instance's state ID - """ - return ( - session.query(Job) - .filter(Job.job_name == self.state_id) - .order_by(Job.started_at.desc()) - .first() - ) - - def successful(self, session): - """Get all successful jobs for this instance's state ID. 
- - Args: - session: the session to use in querying the db - - Returns: - All successful jobs for this instance's state ID - """ - return session.query(Job).filter( - (Job.job_name == self.state_id) # noqa: WPS465 - & (Job.state == State.SUCCESS) # noqa: WPS465 - & Job.ended_at.isnot(None) - ) - - def running(self, session): - """Find states in the running state. - - Args: - session: the session to use in querying the db - - Returns: - All runnings states for state_id. - """ - return session.query(Job).filter( - (Job.job_name == self.state_id) # noqa: WPS465 - & (Job.state == State.RUNNING) - ) - - def latest_success(self, session): - """Get the latest successful state for this instance's state ID. - - Args: - session: the session to use in querying the db - - Returns: - The latest successful state for this instance's state ID - """ - return self.successful(session).order_by(Job.ended_at.desc()).first() - - def latest_running(self, session): - """Find the most recent state in the running state, if any. - - Args: - session: the session to use in querying the db - - Returns: - The latest running state for this instance's state ID - """ - return self.running(session).order_by(Job.started_at.desc()).first() - - def with_payload(self, session, flags=0, since=None, state=None): - """Get all states for this instance's state ID matching the given args. - - Args: - session: the session to use in querying the db - flags: only return states with these flags - since: only return states which ended after this time - state: only include states with state matching this state - - Returns: - All states matching these args. - """ - query = ( - session.query(Job) - .filter( - (Job.job_name == self.state_id) # noqa: WPS465 - & (Job.payload_flags != 0) # noqa: WPS465 - & (Job.payload_flags.op("&")(flags) == flags) # noqa: WPS465 - & Job.ended_at.isnot(None) - ) - .order_by(Job.ended_at.asc()) - ) - - if since: - query = query.filter(Job.ended_at > since) - if state: - query = query.filter(Job.state == state) - return query - - def latest_with_payload(self, session, **kwargs): - """Return the latest state matching the given kwargs. - - Args: - session: the session to use to query the db - kwargs: keyword args to pass to with_payload - - Returns: - The most recent state returned by with_payload(kwargs) - """ - return ( - self.with_payload(session, **kwargs) - .order_by(None) # Reset ascending order - .order_by(Job.ended_at.desc()) - .first() - ) - - @classmethod - def all_stale(cls, session): - """Return all stale states. - - Args: - session: the session to use to query the db - - Returns: - All stale states with any state ID - """ - now = datetime.utcnow() - last_valid_heartbeat_at = now - timedelta(minutes=HEARTBEAT_VALID_MINUTES) - last_valid_started_at = now - timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS) - - return session.query(Job).filter( - (Job.state == State.RUNNING) # noqa: WPS465 - & ( - ( - Job.last_heartbeat_at.isnot(None) # noqa: WPS465 - & (Job.last_heartbeat_at < last_valid_heartbeat_at) - ) - | ( - Job.last_heartbeat_at.is_(None) # noqa: WPS465 - & (Job.started_at < last_valid_started_at) - ) - ) - ) - - def stale(self, session): - """Return stale states with the instance's state ID. - - Args: - session: the session to use in querying the db - - Returns: - All stale states with instance's state ID - """ - return self.all_stale(session).filter(Job.job_name == self.state_id) - - def get_all(self, session: object, since=None): - """Return all state with the instance's state ID. 
- - Args: - session: the session to use in querying the db - since: only return state which ended after this datetime - - Returns: - All state with instance's state ID which ended after 'since' - """ - query = ( - session.query(Job) - .filter(Job.job_name == self.state_id) - .order_by(Job.ended_at.asc()) - ) - - if since: - query = query.filter(Job.ended_at > since) - return query - - -# Copied from core/job/job.py -HEARTBEATLESS_JOB_VALID_HOURS = 24 -HEARTBEAT_VALID_MINUTES = 5 - - -class State(Enum): - """Represents status of a Job.""" - - IDLE = (0, ("RUNNING", "FAIL")) - RUNNING = (1, ("SUCCESS", "FAIL")) - SUCCESS = (2, ()) - FAIL = (3, ("RUNNING",)) - DEAD = (4, ()) - STATE_EDIT = (5, ()) - - -class Payload(IntFlag): - """Flag indicating whether a Job has state in its payload field.""" - - STATE = 1 - INCOMPLETE_STATE = 2 - - -class Job(SystemModel): # noqa: WPS214 - """Model class that represents a `meltano elt` run in the system database. - - Includes State.STATE_EDIT rows which represent CLI invocations of the - `meltano state` command which wrote state to the db. Queries that are - meant to return only actual job runs should filter out records with - state == State.STATE_EDIT. - """ - - __tablename__ = "runs" - - id = Column(types.Integer, primary_key=True) - job_name = Column(types.String(1024)) - run_id = Column(GUID, nullable=False, default=uuid.uuid4) - _state = Column(name="state", type_=types.String(10)) - started_at = Column(types.DateTime) - last_heartbeat_at = Column(types.DateTime) - ended_at = Column(types.DateTime) - payload = Column(MutableDict.as_mutable(JSONEncodedDict)) - payload_flags = Column(IntFlag, default=0) - trigger = Column(types.String, default="") - - @classmethod - def from_job_history(cls, session: Session, job_name: str): - """Build JobState from job run history. - - Args: - session: the session to use in finding job history - job_name: the name of the job to build JobState for - - Returns: - JobState built from job run history - """ - completed_state = {} - partial_state = {} - incomplete_since = None - finder = JobFinder(job_name) - - # Get the state for the most recent completed job. - # Do not consider dummy jobs create via add_state. - state_job = finder.latest_with_payload(session, flags=Payload.STATE) - if state_job: - incomplete_since = state_job.ended_at - if "singer_state" in state_job.payload: - merge(state_job.payload, partial_state) - - # If there have been any incomplete jobs since the most recent completed jobs, - # merge the state emitted by those jobs into the state for the most recent - # completed job. 
If there are no completed jobs, get the full history of - # incomplete jobs and use the most recent state emitted per stream - incomplete_state_jobs = finder.with_payload( - session, flags=Payload.INCOMPLETE_STATE, since=incomplete_since - ) - for incomplete_state_job in incomplete_state_jobs: - if "singer_state" in incomplete_state_job.payload: - partial_state = merge(incomplete_state_job.payload, partial_state) - return cls( - job_name=job_name, - partial_state=partial_state, - completed_state=completed_state, - ) - - def upgrade(): # Create state table - dialect_name = get_dialect_name() - max_string_length = max_string_length_for_dialect(dialect_name) - conn = op.get_bind() - inspector = Inspector.from_engine(conn) - if "state" in inspector.get_table_names(): - return - op.create_table( - "state", - sa.Column("job_name", sa.String(900), nullable=False), - sa.Column( - "partial_state", - MutableDict.as_mutable(JSONEncodedDict(max_string_length)), - ), - sa.Column( - "completed_state", - MutableDict.as_mutable(JSONEncodedDict(max_string_length)), - ), - sa.Column( - "updated_at", - sa.types.TIMESTAMP if dialect_name == "postgresql" else sa.types.DATETIME, - onupdate=datetime.now, - ), - sa.PrimaryKeyConstraint("job_name"), - sa.UniqueConstraint("job_name"), - ) - - session = Session(bind=op.get_bind()) - for job_run in set(session.query(Job).all()): - session.add(JobState.from_job_history(session, job_run.job_name)) - session.commit() + pass def downgrade(): From b08210177f0ad97de1616dd1259182bea8b8655f Mon Sep 17 00:00:00 2001 From: "Cody J. Hanson" Date: Mon, 26 Sep 2022 14:57:25 -0500 Subject: [PATCH 4/8] correct state_id ref --- src/meltano/core/state_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meltano/core/state_service.py b/src/meltano/core/state_service.py index fdf01abebb..a89e3546c7 100644 --- a/src/meltano/core/state_service.py +++ b/src/meltano/core/state_service.py @@ -126,7 +126,7 @@ def add_state( f"Added to state {state_to_add_to.job_name} state payload {new_state_dict}" ) self.state_store_manager.set( - state_id=state_to_add_to.state_id, + state_id=state_to_add_to.job_name, state=json.dumps(new_state_dict), complete=(payload_flags == Payload.STATE), ) From 72d6bd52015438340c5815da10c6defebe693306 Mon Sep 17 00:00:00 2001 From: "Cody J. Hanson" Date: Mon, 26 Sep 2022 15:11:48 -0500 Subject: [PATCH 5/8] Don't need to branch now that first migration is noop --- .../versions/6828cc5b1a4f_create_dedicated_state_table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py b/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py index bf085389f0..8bb7aceb31 100644 --- a/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py +++ b/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py @@ -28,7 +28,7 @@ # revision identifiers, used by Alembic. 
revision = "6828cc5b1a4f" -down_revision = ("5b43800443d1", "f4c225a9492f") +down_revision = "f4c225a9492f" branch_labels = None depends_on = None From 38045817365adc86ffb9a7c334177d9819715e08 Mon Sep 17 00:00:00 2001 From: Florian Hines Date: Mon, 26 Sep 2022 15:12:29 -0500 Subject: [PATCH 6/8] test: add basic migrations test (#6793) * Migrations test - maybe * Use installed target-sqlite --- .github/workflows/integration_tests.yml | 1 + .gitignore | 2 ++ .../meltano-migrations/.meltano/meltano.db | Bin 0 -> 77824 bytes .../meltano-migrations/ending-meltano.yml | 23 ++++++++++++++++ .../meltano-migrations/index.md | 26 ++++++++++++++++++ .../meltano-migrations/meltano.yml | 23 ++++++++++++++++ 6 files changed, 75 insertions(+) create mode 100644 docs/example-library/meltano-migrations/.meltano/meltano.db create mode 100644 docs/example-library/meltano-migrations/ending-meltano.yml create mode 100644 docs/example-library/meltano-migrations/index.md create mode 100644 docs/example-library/meltano-migrations/meltano.yml diff --git a/.github/workflows/integration_tests.yml b/.github/workflows/integration_tests.yml index d655f4e472..4c248a82ab 100644 --- a/.github/workflows/integration_tests.yml +++ b/.github/workflows/integration_tests.yml @@ -19,6 +19,7 @@ jobs: - { integration_test: "meltano-basics", needs_postgres: false } - { integration_test: "meltano-run", needs_postgres: true } - { integration_test: "meltano-objects", needs_postgres: false } + - { integration_test: "meltano-migrations", needs_postgres: false } fail-fast: false runs-on: ubuntu-latest diff --git a/.gitignore b/.gitignore index 9e194d8f0b..1693fb36e6 100644 --- a/.gitignore +++ b/.gitignore @@ -153,3 +153,5 @@ src/meltano/api/static/js # sqlite *.db +# allow integration tests to ship/store/use sqlite +!docs/example-library/**/.meltano/meltano.db diff --git a/docs/example-library/meltano-migrations/.meltano/meltano.db b/docs/example-library/meltano-migrations/.meltano/meltano.db new file mode 100644 index 0000000000000000000000000000000000000000..147ff63ce5e30c9486d663279e5736543f69ecb7 GIT binary patch literal 77824 zcmeI*&2QW09S3kqew8gpd8w;Ia^MsgwiC!JmkaY z`TU;e5f$la9zNKz1H#u`uVnz{tIFulrMO@7a17BYWJe!p;K;lI;Aq`#g2>)e0le?ON>t<7GWx|{y<A?Dk7QeT7 zzrvD%alNE!y=gT3enr-JTDvN(7`yO_EYS<)0?%$Vh?hG)xoP-8 z^(isDphk=!UV&flZr(5NKIZRL9`h@~>(gjvv{% z5|N5z^}zb|iiml{wA(guf_@cOBvoCF976VKb~QL?A1jU=hW zo#mJwH5e6-E3iW!ZErq!RN+?!^$S!YU4O6>`W-&GG?QIgO1zv1{Ya}uuNSyq z62~9j&P4QvbH?{d2E-e!~GO7GIE{BGrLWw)}uQF+LB`$Bh=f1aMn=JSc8 zMhN4%O;T+WuVwo_n<(S^3lXk(?iji7(5PH|#G0-XkY|JG^yf6LS8AkjEUotzaQJk7 zCVTT{;$?QwIKRpoAGWKvnI36UFJnw=PKf6T4LoTI8V=3a+Oy^lAN;yFzLyH;fTL~) zXGOE;+(_fbv??}iz0Q9+JDtrhCZ2~G_Omcb;R`E>wLR5qN$&!i+aFavvdQa1+ zk!IxXZ@81}RX*EeeK*pXM(%o8uHQDy=#3FOZ{KKk==;ak&K30TAf4c2 zyU9rGW0@@Wz0;~rr!|-k=nZ@1K5b)X`{CYhnZ8Hzu@b87FHJrYzoGJ2?2_YR7cWeV zB}~xxgad1VZX_~4aP$uz5P$##AOHafKmY;|fB*y_009VG5P?gHi!?3QJzvR7>YgB!!atwq=(4)lp;gmuwd}w@K1Iz-`ko|| z6tSer>w+NM4o}X#cwrcYdO@=4Mbj`8onA*JmY}J1T~Z}O&~&qDkJg&Dr>Log!qJV% zH|CK_g0`-Rg0ASX3(*{%)>YC}?XB?$X5Ao#LctXDno_IRYLcnf1d+(9shO%(9Odb@ zM=Of5baZ3ljd^Imq;**mHCc-BgrSslQP$+Q##1cFnxdFBT`&a2Q1zlLsRGfclOSuB zS{UW&wx^&71?BLAjw?GAnEB8VYNkJ6qhOXc(qZjiKBm>@PMsRYRG$uE7F$8pm00z=IhvqIbY>V-f5UJbH{j_M;RSqmi35K; z9hLaXGh%kawIKZRar!b(m)$ijo}F4_>*qU;wN8%>Ji7GG^`eKijDzsNrv_c_S0lvX zZHHd3$yRF|kbo}3q0!S`*tb zU5~C`4c2KWik4zY3Y`oM8UV#8ND49Prl^UMt`W(ip7su;AOHafKmY;|fB*y_ z009U<00I!Wzyj>||M>iWfdh=DK>z{}fB*y_009U<00Izz00gK2zW+xzfB*y_009U< z00Izz00bZa0SH`t0et^|@neh@LI45~fB*y_009U<00Izz00i*)AAJA<5P$##AOHaf 
zKmY;|fB*y_aPbA$@BfpTe{h*^=?fkZfB*y_009U<00Izz00bZa0SLSc0@>swcYVnt zjZV|>?)UyZfk!^^-cL+&*=56O*-k&X%y!uK|H;gET;@mmf(Ha3009U<00Izz00bZa z0SG_<0`I!O{KWM{?{5Z@vlH1=_?H3r|Np(~VMYBR009U<00Izz00bZa0SG_<0}Q|00Izz00bZa0SG_<0uVT70{Hj;&RIZE9SA@G z0uX=z1Rwwb2tWV=5P-lL2;lqwGeAX=5P$##AOHafKmY;|fB*y_0D*HRfbaj$SwK)7 z2tWV=5P$##AOHafKmY;|fWR3D;Pd|(prS|!KmY;|fB*y_009U<00Izzz&R86A5Q8| A+yDRo literal 0 HcmV?d00001 diff --git a/docs/example-library/meltano-migrations/ending-meltano.yml b/docs/example-library/meltano-migrations/ending-meltano.yml new file mode 100644 index 0000000000..398ca7acc4 --- /dev/null +++ b/docs/example-library/meltano-migrations/ending-meltano.yml @@ -0,0 +1,23 @@ +version: 1 +default_environment: dev +project_id: abd310b5-698b-4244-a109-9490047fa2ec +send_anonymous_usage_stats: false +plugins: + extractors: + - name: tap-gitlab + variant: meltanolabs + pip_url: git+https://github.com/MeltanoLabs/tap-gitlab.git + config: + projects: meltano/meltano + start_date: '2022-04-25T00:00:00Z' + select: + - commits.* + - '!commits.stats.*' + loaders: + - name: target-sqlite + variant: meltanolabs + pip_url: git+https://github.com/MeltanoLabs/target-sqlite.git +environments: +- name: dev +- name: staging +- name: prod diff --git a/docs/example-library/meltano-migrations/index.md b/docs/example-library/meltano-migrations/index.md new file mode 100644 index 0000000000..00eb8b5625 --- /dev/null +++ b/docs/example-library/meltano-migrations/index.md @@ -0,0 +1,26 @@ +# Basic meltano migrations test + +A quick test of how meltano applies migrations. + +## Setup + +This test ships with a sample meltano database from Meltano v2.5.0 already in `.meltano`. To utilize it run: + +```shell +meltano install +``` + +## Trigger a migration event + +```shell +meltano --environment=dev state list +``` + +## Validate that a new run completes successfully + +Validate that we can perform an EL task post migration: + +```shell +meltano --environment=dev run tap-gitlab target-sqlite +meltano --environment=dev state list +``` diff --git a/docs/example-library/meltano-migrations/meltano.yml b/docs/example-library/meltano-migrations/meltano.yml new file mode 100644 index 0000000000..398ca7acc4 --- /dev/null +++ b/docs/example-library/meltano-migrations/meltano.yml @@ -0,0 +1,23 @@ +version: 1 +default_environment: dev +project_id: abd310b5-698b-4244-a109-9490047fa2ec +send_anonymous_usage_stats: false +plugins: + extractors: + - name: tap-gitlab + variant: meltanolabs + pip_url: git+https://github.com/MeltanoLabs/tap-gitlab.git + config: + projects: meltano/meltano + start_date: '2022-04-25T00:00:00Z' + select: + - commits.* + - '!commits.stats.*' + loaders: + - name: target-sqlite + variant: meltanolabs + pip_url: git+https://github.com/MeltanoLabs/target-sqlite.git +environments: +- name: dev +- name: staging +- name: prod From cbac32af7f0e16ca7a614df8eebfbea62b22af47 Mon Sep 17 00:00:00 2001 From: "Cody J. 
Hanson" Date: Mon, 26 Sep 2022 15:15:26 -0500 Subject: [PATCH 7/8] Lint --- .../versions/6828cc5b1a4f_create_dedicated_state_table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py b/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py index 8bb7aceb31..9f91e4b563 100644 --- a/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py +++ b/src/meltano/migrations/versions/6828cc5b1a4f_create_dedicated_state_table.py @@ -355,7 +355,7 @@ def upgrade(): sa.UniqueConstraint("state_id"), ) session = Session(bind=conn) - for state_id in set([job_run.job_name for job_run in session.query(Job).all()]): + for state_id in {job_run.job_name for job_run in session.query(Job).all()}: session.add(JobState.from_job_history(session, state_id)) session.commit() From 6668dcedd3d843b745040b20a7aec6d3ba645167 Mon Sep 17 00:00:00 2001 From: "Cody J. Hanson" Date: Mon, 26 Sep 2022 15:25:29 -0500 Subject: [PATCH 8/8] job_name -> state_id in tests --- tests/meltano/core/test_state_store.py | 32 +++++++++++++------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/tests/meltano/core/test_state_store.py b/tests/meltano/core/test_state_store.py index 3ad5ba9d34..1f086aca97 100644 --- a/tests/meltano/core/test_state_store.py +++ b/tests/meltano/core/test_state_store.py @@ -28,52 +28,52 @@ def test_set_state(self, subject: DBStateStoreManager): second_partial = {"singer_state": {"partial_2": 2}} mock_state_with_only_partial = JobState( - job_name="mock_state_with_only_partial", partial_state=partial_state + state_id="mock_state_with_only_partial", partial_state=partial_state ) mock_state_with_only_complete = JobState( - job_name="mock_state_with_only_complete", completed_state=complete_state + state_id="mock_state_with_only_complete", completed_state=complete_state ) subject.set( - mock_state_with_only_partial.job_name, json.dumps(partial_state), False + mock_state_with_only_partial.state_id, json.dumps(partial_state), False ) - assert subject.get(mock_state_with_only_partial.job_name) == partial_state + assert subject.get(mock_state_with_only_partial.state_id) == partial_state subject.set( - mock_state_with_only_complete.job_name, json.dumps(complete_state), True + mock_state_with_only_complete.state_id, json.dumps(complete_state), True ) - assert subject.get(mock_state_with_only_complete.job_name) == complete_state + assert subject.get(mock_state_with_only_complete.state_id) == complete_state subject.set( - mock_state_with_only_complete.job_name, json.dumps(partial_state), False + mock_state_with_only_complete.state_id, json.dumps(partial_state), False ) - assert subject.get(mock_state_with_only_complete.job_name) == merge( + assert subject.get(mock_state_with_only_complete.state_id) == merge( partial_state, complete_state ) subject.set( - mock_state_with_only_complete.job_name, + mock_state_with_only_complete.state_id, json.dumps(overwritten_complete), True, ) assert ( - subject.get(mock_state_with_only_complete.job_name) == overwritten_complete + subject.get(mock_state_with_only_complete.state_id) == overwritten_complete ) subject.set( - mock_state_with_only_partial.job_name, + mock_state_with_only_partial.state_id, json.dumps(overwritten_partial), False, ) - assert subject.get(mock_state_with_only_partial.job_name) == overwritten_partial + assert subject.get(mock_state_with_only_partial.state_id) == overwritten_partial subject.set( - 
mock_state_with_only_partial.job_name, json.dumps(second_partial), False + mock_state_with_only_partial.state_id, json.dumps(second_partial), False ) - assert subject.get(mock_state_with_only_partial.job_name) == merge( + assert subject.get(mock_state_with_only_partial.state_id) == merge( overwritten_partial, second_partial ) - def test_get_job_names(self, subject: DBStateStoreManager, state_ids_with_jobs): - assert set(subject.get_job_names()) == set(state_ids_with_jobs.keys()) + def test_get_state_ids(self, subject: DBStateStoreManager, state_ids_with_jobs): + assert set(subject.get_state_ids()) == set(state_ids_with_jobs.keys())
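
Note on the idempotency guard used in this series: the migrations above inspect the bound connection for a pre-existing `state` table before (re)creating it, which is what makes re-running the corrected migration path safe. The sketch below shows that same guard outside Alembic, with a plain engine in place of `op.get_bind()`. It is illustrative only: it assumes SQLAlchemy 1.4+ (`sa.inspect()` instead of the deprecated `Inspector.from_engine()`), a throwaway SQLite URL, and plain `Text` columns standing in for Meltano's `JSONEncodedDict` — it is not the project's actual migration code.

```python
# Minimal sketch of the "create the state table only if it is missing" guard.
# Assumptions: SQLAlchemy 1.4+, a placeholder sqlite:///example.db database,
# and Text columns instead of Meltano's MutableDict/JSONEncodedDict types.
import sqlalchemy as sa

engine = sa.create_engine("sqlite:///example.db")

with engine.begin() as conn:
    # sa.inspect(conn) returns an Inspector bound to the live connection,
    # the modern equivalent of Inspector.from_engine(conn) used in the diffs.
    inspector = sa.inspect(conn)
    if "state" not in inspector.get_table_names():
        # Only create the table when it does not already exist, mirroring the
        # guard in the 6828cc5b1a4f upgrade() above.
        sa.Table(
            "state",
            sa.MetaData(),
            sa.Column("state_id", sa.String(900), primary_key=True, nullable=False),
            sa.Column("partial_state", sa.Text),
            sa.Column("completed_state", sa.Text),
            sa.Column("updated_at", sa.TIMESTAMP),
        ).create(conn)
```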