Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Correct migration path for state table #6795

Merged
merged 10 commits into from Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/integration_tests.yml
Expand Up @@ -19,6 +19,7 @@ jobs:
- { integration_test: "meltano-basics", needs_postgres: false }
- { integration_test: "meltano-run", needs_postgres: true }
- { integration_test: "meltano-objects", needs_postgres: false }
- { integration_test: "meltano-migrations", needs_postgres: false }
fail-fast: false

runs-on: ubuntu-latest
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -153,3 +153,5 @@ src/meltano/api/static/js

# sqlite
*.db
# allow integration tests to ship/store/use sqlite
!docs/example-library/**/.meltano/meltano.db
Binary file not shown.
23 changes: 23 additions & 0 deletions docs/example-library/meltano-migrations/ending-meltano.yml
@@ -0,0 +1,23 @@
version: 1
default_environment: dev
project_id: abd310b5-698b-4244-a109-9490047fa2ec
send_anonymous_usage_stats: false
plugins:
extractors:
- name: tap-gitlab
variant: meltanolabs
pip_url: git+https://github.com/MeltanoLabs/tap-gitlab.git
config:
projects: meltano/meltano
start_date: '2022-04-25T00:00:00Z'
select:
- commits.*
- '!commits.stats.*'
loaders:
- name: target-sqlite
variant: meltanolabs
pip_url: git+https://github.com/MeltanoLabs/target-sqlite.git
environments:
- name: dev
- name: staging
- name: prod
26 changes: 26 additions & 0 deletions docs/example-library/meltano-migrations/index.md
@@ -0,0 +1,26 @@
# Basic meltano migrations test

A quick test of how meltano applies migrations.

## Setup

This test ships with a sample Meltano database from Meltano v2.5.0 already in `.meltano`. To use it, run:

```shell
meltano install
```

## Trigger a migration event

```shell
meltano --environment=dev state list
```

## Validate that a new run completes successfully

Validate that we can perform an EL task post migration:

```shell
meltano --environment=dev run tap-gitlab target-sqlite
meltano --environment=dev state list
```
23 changes: 23 additions & 0 deletions docs/example-library/meltano-migrations/meltano.yml
@@ -0,0 +1,23 @@
version: 1
default_environment: dev
project_id: abd310b5-698b-4244-a109-9490047fa2ec
send_anonymous_usage_stats: false
plugins:
extractors:
- name: tap-gitlab
variant: meltanolabs
pip_url: git+https://github.com/MeltanoLabs/tap-gitlab.git
config:
projects: meltano/meltano
start_date: '2022-04-25T00:00:00Z'
select:
- commits.*
- '!commits.stats.*'
loaders:
- name: target-sqlite
variant: meltanolabs
pip_url: git+https://github.com/MeltanoLabs/target-sqlite.git
environments:
- name: dev
- name: staging
- name: prod
12 changes: 6 additions & 6 deletions src/meltano/core/job_state.py
Expand Up @@ -22,32 +22,32 @@ class JobState(SystemModel):
Modified during `meltano elt` or `meltano run` runs whenever a
STATE message is emitted by a Singer target. Also written and read
by `meltano state` CLI invocations. Only holds the _current_ state
for a given job_name. Full job run history is held by the Job model.
for a given state_id. Full job run history is held by the Job model.
"""

__tablename__ = "state"
job_name = Column(types.String, unique=True, primary_key=True, nullable=False)
state_id = Column(types.String, unique=True, primary_key=True, nullable=False)

updated_at = Column(types.DATETIME, onupdate=datetime.now)

partial_state: Mapped[Any] = Column(MutableDict.as_mutable(JSONEncodedDict))
completed_state: Mapped[Any] = Column(MutableDict.as_mutable(JSONEncodedDict))

@classmethod
def from_job_history(cls, session: Session, job_name: str):
def from_job_history(cls, session: Session, state_id: str):
"""Build JobState from job run history.

Args:
session: the session to use in finding job history
job_name: the name of the job to build JobState for
state_id: state_id to build JobState for

Returns:
JobState built from job run history
"""
completed_state: dict[Any, Any] = {}
partial_state: dict[Any, Any] = {}
incomplete_since = None
finder = JobFinder(job_name)
finder = JobFinder(state_id)

# Get the state for the most recent completed job.
# Do not consider dummy jobs created via add_state.
Expand All @@ -69,7 +69,7 @@ def from_job_history(cls, session: Session, job_name: str):
partial_state = merge(incomplete_state_job.payload, partial_state)

return cls(
job_name=job_name,
state_id=state_id,
partial_state=partial_state,
completed_state=completed_state,
)
8 changes: 4 additions & 4 deletions src/meltano/core/state_service.py
Expand Up @@ -49,8 +49,8 @@ def list_state(self, state_id_pattern: str | None = None):
A dict with state_ids as keys and state payloads as values.
"""
return {
job_name: self.get_state(job_name)
for job_name in self.state_store_manager.get_job_names(state_id_pattern)
state_id: self.get_state(state_id)
for state_id in self.state_store_manager.get_state_ids(state_id_pattern)
}

def _get_or_create_job(self, job: Job | str) -> Job:
Expand Down Expand Up @@ -126,7 +126,7 @@ def add_state(
f"Added to state {state_to_add_to.job_name} state payload {new_state_dict}"
)
self.state_store_manager.set(
job_name=state_to_add_to.job_name,
state_id=state_to_add_to.job_name,
state=json.dumps(new_state_dict),
complete=(payload_flags == Payload.STATE),
)
Expand All @@ -140,7 +140,7 @@ def get_state(self, state_id: str):
Returns:
Dict representing state that would be used in the next run.
"""
return self.state_store_manager.get(job_name=state_id)
return self.state_store_manager.get(state_id=state_id)

def set_state(self, state_id: str, new_state: str | None, validate: bool = True):
"""Set the state for the state_id.
Expand Down
74 changes: 37 additions & 37 deletions src/meltano/core/state_store.py
Expand Up @@ -23,11 +23,11 @@ def __init__(self, **kwargs):
...

@abstractmethod
def set(self, job_name: str, state: str, complete: bool):
"""Set the job state for the given job_name.
def set(self, state_id: str, state: str, complete: bool):
"""Set the job state for the given state_id.

Args:
job_name: the name of the job to set state for.
state_id: the state_id to set state for.
state: the state to set.
complete: true if the state being set is for a complete run, false if partial

Expand All @@ -37,50 +37,50 @@ def set(self, job_name: str, state: str, complete: bool):
...

@abstractmethod
def get(self, job_name):
"""Get the job state for the given job_name.
def get(self, state_id):
"""Get the job state for the given state_id.

Args:
job_name: the name of the job to get state for.
state_id: the state_id to get state for.

Raises:
NotImplementedError: always, this is an abstract method
"""
...

@abstractmethod
def clear(self, job_name):
"""Clear state for the given job_name.
def clear(self, state_id):
"""Clear state for the given state_id.

Args:
job_name: the job name to clear state for
state_id: the state_id to clear state for
"""
...

@abstractmethod
def get_job_names(self, pattern=None):
"""Get all job names available in this state store manager.
def get_state_ids(self, pattern=None):
"""Get all state_ids available in this state store manager.

Args:
pattern: glob-style pattern to filter by
"""
...

@abstractmethod
def acquire_lock(self, job_name):
def acquire_lock(self, state_id):
"""Acquire a naive lock for the given job's state.

Args:
job_name: the job name to lock
state_id: the state_id to lock
"""
...

@abstractmethod
def release_lock(self, job_name):
def release_lock(self, state_id):
"""Release lock for given job's state.

Args:
job_name: the job name to unlock
state_id: the state_id to unlock
"""
...

Expand All @@ -99,16 +99,16 @@ def __init__(self, *args, session: Session, **kwargs):
super().__init__(*args, **kwargs)
self.session = session

def set(self, job_name: str, state: str, complete: bool) -> None:
"""Set the job state for the given job_name.
def set(self, state_id: str, state: str, complete: bool) -> None:
"""Set the job state for the given state_id.

Args:
job_name: the name of the job to set state for.
state_id: the state_id to set state for.
state: the state to set.
complete: true if the state being set is for a complete run, false if partial
"""
existing_job_state = (
self.session.query(JobState).filter(JobState.job_name == job_name).first()
self.session.query(JobState).filter(JobState.state_id == state_id).first()
)
partial_state = {} if complete else json.loads(state)
completed_state = json.loads(state) if complete else {}
Expand All @@ -118,7 +118,7 @@ def set(self, job_name: str, state: str, complete: bool) -> None:
if not complete:
completed_state = existing_job_state.completed_state
new_job_state = JobState(
job_name=job_name,
state_id=state_id,
partial_state=partial_state,
completed_state=completed_state,
)
Expand All @@ -127,39 +127,39 @@ def set(self, job_name: str, state: str, complete: bool) -> None:
self.session.add(new_job_state)
self.session.commit()

def get(self, job_name):
"""Get the job state for the given job_name.
def get(self, state_id):
"""Get the job state for the given state_id.

Args:
job_name: the name of the job to get state for
state_id: the state_id to get state for

Returns:
The current state for the given job
"""
job_state: JobState | None = (
self.session.query(JobState).filter(JobState.job_name == job_name).first()
self.session.query(JobState).filter(JobState.state_id == state_id).first()
)
return (
merge(job_state.partial_state, job_state.completed_state)
if job_state
else {}
)

def clear(self, job_name):
"""Clear state for the given job_name.
def clear(self, state_id):
"""Clear state for the given state_id.

Args:
job_name: the job name to clear state for
state_id: the state_id to clear state for
"""
job_state: JobState | None = (
self.session.query(JobState).filter(JobState.job_name == job_name).first()
self.session.query(JobState).filter(JobState.state_id == state_id).first()
)
if job_state:
self.session.delete(job_state)
self.session.commit()

def get_job_names(self, pattern: str | None = None):
"""Get all job names available in this state store manager.
def get_state_ids(self, pattern: str | None = None):
"""Get all state_ids available in this state store manager.

Args:
pattern: glob-style pattern to filter by
Expand All @@ -169,34 +169,34 @@ def get_job_names(self, pattern: str | None = None):
"""
if pattern:
return (
job_state.job_name
job_state.state_id
for job_state in self.session.query(JobState)
.filter(JobState.job_name.like(pattern.replace("*", "%")))
.filter(JobState.state_id.like(pattern.replace("*", "%")))
.all()
)
return (
record[0]
for record in self.session.execute(select(JobState.job_name)).all()
for record in self.session.execute(select(JobState.state_id)).all()
)

def acquire_lock(self, job_name):
def acquire_lock(self, state_id):
"""Acquire a naive lock for the given job's state.

For DBStateStoreManager, the db manages transactions.
This does nothing.

Args:
job_name: the job name to lock
state_id: the state_id to lock
"""
...

def release_lock(self, job_name):
def release_lock(self, state_id):
"""Release the lock for the given job's state.

For DBStateStoreManager, the db manages transactions.
This does nothing.

Args:
job_name: the job name to unlock
state_id: the state_id to unlock
"""
...
2 changes: 1 addition & 1 deletion src/meltano/migrations/db.lock
@@ -1 +1 @@
f4c225a9492f
6828cc5b1a4f