From 2cc63c26b2ef63b730a31c300aabb067b97aed09 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Mon, 13 Nov 2023 08:47:12 -0600 Subject: [PATCH] refactor: Use timezone-aware datetime objects --- pyproject.toml | 4 +- src/meltano/cli/state.py | 9 ++-- src/meltano/core/job/finder.py | 4 +- src/meltano/core/job/job.py | 13 ++--- src/meltano/core/plugin/singer/target.py | 6 ++- src/meltano/core/schedule_service.py | 4 +- src/meltano/core/state_store/filesystem.py | 16 ++++-- .../core/tracking/contexts/environment.py | 4 +- src/meltano/core/tracking/tracker.py | 6 +-- src/meltano/core/utils/__init__.py | 6 ++- tests/fixtures/core.py | 9 +++- tests/meltano/core/job/test_finder.py | 8 +-- tests/meltano/core/job/test_job.py | 14 ++--- .../meltano/core/job/test_stale_job_failer.py | 6 +-- .../core/plugin/test_plugin_settings.py | 6 +-- tests/meltano/core/test_project_files.py | 51 ++++++++++++++++--- tests/meltano/core/test_schedule_service.py | 4 +- 17 files changed, 112 insertions(+), 58 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 41c3eee0b6..905995d41f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -573,21 +573,21 @@ ignore = [ ] select = [ "ARG", # flake8-unused-arguments - "S", # flake8-bandit "B", # flake8-bugbear "C4", # flake8-comprehensions "COM", # flake8-commas + "DTZ", # flake8-datetimez "E", # pycodestyle (error) "F", # pyflakes "I", # isort "ISC", # flake8-implicit-str-concat "PT", # flake8-pytest-style "RSE", # flake8-raise + "S", # flake8-bandit "SIM", # flake8-simplify "TID", # flake8-tidy-imports "UP", # pyupgrade "W", # pycodestyle (warning) - "TID", # flake8-tidy-imports ] [tool.ruff.per-file-ignores] diff --git a/src/meltano/cli/state.py b/src/meltano/cli/state.py index edfe38c005..2f5ef93d23 100644 --- a/src/meltano/cli/state.py +++ b/src/meltano/cli/state.py @@ -6,6 +6,7 @@ import re import typing as t from datetime import datetime as dt +from datetime import timezone as tz from functools 
import partial, reduce from operator import xor @@ -167,7 +168,7 @@ def copy_state( logger.info( f"State for {dst_state_id} was successfully copied from " - f"{src_state_id} at {dt.utcnow():%Y-%m-%d %H:%M:%S}.", # noqa: WPS323 + f"{src_state_id} at {dt.now(tz=tz.utc):%Y-%m-%d %H:%M:%S}.", # noqa: WPS323 ) @@ -197,7 +198,7 @@ def move_state( logger.info( f"State for {src_state_id} was successfully moved to {dst_state_id} " - f"at {dt.utcnow():%Y-%m-%d %H:%M:%S}.", # noqa: WPS323 + f"at {dt.now(tz=tz.utc):%Y-%m-%d %H:%M:%S}.", # noqa: WPS323 ) @@ -248,7 +249,7 @@ def merge_state( state_service.merge_state(from_state_id, state_id) logger.info( f"State for {state_id} was successfully " - f"merged at {dt.utcnow():%Y-%m-%d %H:%M:%S}.", # noqa: WPS323 + f"merged at {dt.now(tz=tz.utc):%Y-%m-%d %H:%M:%S}.", # noqa: WPS323 ) @@ -289,7 +290,7 @@ def set_state( state_service.set_state(state_id, state) logger.info( f"State for {state_id} was successfully set " - f"at {dt.utcnow():%Y-%m-%d %H:%M:%S}.", # noqa: WPS323 + f"at {dt.now(tz=tz.utc):%Y-%m-%d %H:%M:%S}.", # noqa: WPS323 ) diff --git a/src/meltano/core/job/finder.py b/src/meltano/core/job/finder.py index 6bb7fde296..5d41ed8663 100644 --- a/src/meltano/core/job/finder.py +++ b/src/meltano/core/job/finder.py @@ -2,7 +2,7 @@ from __future__ import annotations -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from .job import HEARTBEAT_VALID_MINUTES, HEARTBEATLESS_JOB_VALID_HOURS, Job, State @@ -141,7 +141,7 @@ def all_stale(cls, session): Returns: All stale states with any state ID """ - now = datetime.utcnow() + now = datetime.now(tz=timezone.utc) last_valid_heartbeat_at = now - timedelta(minutes=HEARTBEAT_VALID_MINUTES) last_valid_started_at = now - timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS) diff --git a/src/meltano/core/job/job.py b/src/meltano/core/job/job.py index e784c9520c..32382c9ed2 100644 --- a/src/meltano/core/job/job.py +++ b/src/meltano/core/job/job.py @@ -1,4 
+1,5 @@ """Defines Job model class.""" + from __future__ import annotations import asyncio @@ -7,7 +8,7 @@ import typing as t import uuid from contextlib import asynccontextmanager, contextmanager, suppress -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from enum import Enum from enum import IntFlag as EnumIntFlag @@ -184,7 +185,7 @@ def is_stale(self): timestamp = self.started_at valid_for = timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS) - return datetime.utcnow() - timestamp > valid_for + return datetime.now(timezone.utc) - timestamp > valid_for def has_error(self): """Return whether a job has failed. @@ -280,7 +281,7 @@ async def run(self, session): def start(self): """Mark the job has having started.""" - self.started_at = datetime.utcnow() + self.started_at = datetime.now(timezone.utc) self.transit(State.RUNNING) def fail(self, error=None): @@ -289,14 +290,14 @@ def fail(self, error=None): Args: error: the error to associate with the job's failure """ - self.ended_at = datetime.utcnow() + self.ended_at = datetime.now(timezone.utc) self.transit(State.FAIL) if error: self.payload.update({"error": str(error)}) def success(self): """Mark the job as having succeeded.""" - self.ended_at = datetime.utcnow() + self.ended_at = datetime.now(timezone.utc) self.transit(State.SUCCESS) def fail_stale(self): @@ -345,7 +346,7 @@ def save(self, session): def _heartbeat(self): """Update last_heartbeat_at for this job in the db.""" - self.last_heartbeat_at = datetime.utcnow() + self.last_heartbeat_at = datetime.now(timezone.utc) async def _heartbeater(self, session): """Heartbeat to the db every second. 
diff --git a/src/meltano/core/plugin/singer/target.py b/src/meltano/core/plugin/singer/target.py index cf6115a1ff..d9fb1c23d3 100644 --- a/src/meltano/core/plugin/singer/target.py +++ b/src/meltano/core/plugin/singer/target.py @@ -3,7 +3,7 @@ import json import logging -from datetime import datetime +from datetime import datetime, timezone from meltano.core.behavior.hookable import hook from meltano.core.job import Job, Payload @@ -80,7 +80,9 @@ def writeline(self, line: str): "incremental state has not been updated", ) else: - logger.info(f"Incremental state has been updated at {datetime.utcnow()}.") + logger.info( + f"Incremental state has been updated at {datetime.now(tz=timezone.utc)}.", # noqa: E501 + ) logger.debug(f"Incremental state: {new_state}") diff --git a/src/meltano/core/schedule_service.py b/src/meltano/core/schedule_service.py index 3acaf45449..f858fc79cb 100644 --- a/src/meltano/core/schedule_service.py +++ b/src/meltano/core/schedule_service.py @@ -4,7 +4,7 @@ import logging import subprocess -from datetime import date, datetime +from datetime import date, datetime, timezone from croniter import croniter @@ -198,7 +198,7 @@ def default_start_date(self, session, extractor: str) -> datetime: if isinstance(start_date, datetime): return start_date - return iso8601_datetime(start_date) or datetime.utcnow() + return iso8601_datetime(start_date) or datetime.now(tz=timezone.utc) def add_schedule(self, schedule: Schedule) -> Schedule: """Add a schedule to the project. 
diff --git a/src/meltano/core/state_store/filesystem.py b/src/meltano/core/state_store/filesystem.py index f93eee1f8c..4c359384a5 100644 --- a/src/meltano/core/state_store/filesystem.py +++ b/src/meltano/core/state_store/filesystem.py @@ -10,7 +10,7 @@ from base64 import b64decode, b64encode from collections.abc import Iterator from contextlib import contextmanager -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from functools import reduce from io import TextIOWrapper from pathlib import Path @@ -211,9 +211,15 @@ def is_locked(self, state_id: str) -> bool: lock_path = self.get_lock_path(state_id) try: with self.get_reader(lock_path) as reader: - locked_at = datetime.fromtimestamp(float(reader.read())) - if locked_at and locked_at < datetime.utcnow() - timedelta( - seconds=self.lock_timeout_seconds, + locked_at = datetime.fromtimestamp( + float(reader.read()), + tz=timezone.utc, + ) + if locked_at and locked_at < ( + datetime.now(timezone.utc) + - timedelta( + seconds=self.lock_timeout_seconds, + ) ): self.delete(lock_path) return False @@ -252,7 +258,7 @@ def acquire_lock(self, state_id: str, retry_seconds: int = 1) -> Iterator[None]: while self.is_locked(state_id): sleep(retry_seconds) with self.get_writer(lock_path) as writer: - writer.write(str(datetime.utcnow().timestamp())) + writer.write(str(datetime.now(timezone.utc).timestamp())) yield finally: self.delete(lock_path) diff --git a/src/meltano/core/tracking/contexts/environment.py b/src/meltano/core/tracking/contexts/environment.py index f251a79413..2fd188ad87 100644 --- a/src/meltano/core/tracking/contexts/environment.py +++ b/src/meltano/core/tracking/contexts/environment.py @@ -8,7 +8,7 @@ import uuid from collections import defaultdict from contextlib import suppress -from datetime import datetime +from datetime import datetime, timezone from functools import cached_property from pathlib import Path from warnings import warn @@ -135,7 +135,7 @@ def 
get_process_timestamp(process: psutil.Process) -> str: Returns: A ISO 8601 timestamp formatted string. """ - return f"{datetime.utcfromtimestamp(process.create_time()).isoformat()}Z" + return f"{datetime.fromtimestamp(process.create_time(), tz=timezone.utc).replace(tzinfo=None).isoformat()}Z" # noqa: E501 @cached_property def process_info(self) -> dict[str, t.Any]: diff --git a/src/meltano/core/tracking/tracker.py b/src/meltano/core/tracking/tracker.py index 133aa6154d..482b5cb2be 100644 --- a/src/meltano/core/tracking/tracker.py +++ b/src/meltano/core/tracking/tracker.py @@ -11,7 +11,7 @@ import uuid from collections.abc import Mapping from contextlib import contextmanager, suppress -from datetime import datetime +from datetime import datetime, timezone from enum import Enum, auto from pathlib import Path from urllib.parse import urlparse @@ -534,12 +534,12 @@ def track_exit_event(self): return cli.exit_code_reported = True - start_time = datetime.utcfromtimestamp(Process().create_time()) + start_time = datetime.fromtimestamp(Process().create_time(), tz=timezone.utc) # This is the reported "end time" for this process, though in reality # the process will end a short time after this time as it takes time # to emit the event. 
- now = datetime.utcnow() + now = datetime.now(timezone.utc) self.track_unstruct_event( SelfDescribingJson( diff --git a/src/meltano/core/utils/__init__.py b/src/meltano/core/utils/__init__.py index e6f9948d30..0327f91570 100644 --- a/src/meltano/core/utils/__init__.py +++ b/src/meltano/core/utils/__init__.py @@ -18,7 +18,7 @@ import unicodedata from contextlib import suppress from copy import copy, deepcopy -from datetime import date, datetime, time +from datetime import date, datetime, time, timezone from enum import IntEnum from functools import reduce from operator import setitem @@ -360,7 +360,9 @@ def iso8601_datetime(d): for format_string in isoformats: with suppress(ValueError): - return coerce_datetime(datetime.strptime(d, format_string)) + return coerce_datetime( + datetime.strptime(d, format_string).replace(tzinfo=timezone.utc), + ) raise ValueError(f"{d} is not a valid UTC date.") diff --git a/tests/fixtures/core.py b/tests/fixtures/core.py index 2aa199fbb3..2a2a1bbb8e 100644 --- a/tests/fixtures/core.py +++ b/tests/fixtures/core.py @@ -2025,7 +2025,7 @@ def elt_schedule( loader=target.name, transform="skip", interval="@daily", - start_date=datetime.datetime.now(), + start_date=datetime.datetime.now(datetime.timezone.utc), ) except ScheduleAlreadyExistsError as err: return err.schedule @@ -2203,7 +2203,12 @@ def state_ids( def mock_time(): def _mock_time(): for idx in itertools.count(): # noqa: WPS526 - yield datetime.datetime(1, 1, 1) + datetime.timedelta(hours=idx) + yield datetime.datetime( + 1, + 1, + 1, + tzinfo=datetime.timezone.utc, + ) + datetime.timedelta(hours=idx) return _mock_time() diff --git a/tests/meltano/core/job/test_finder.py b/tests/meltano/core/job/test_finder.py index f4c6d1d9ff..850b439ddb 100644 --- a/tests/meltano/core/job/test_finder.py +++ b/tests/meltano/core/job/test_finder.py @@ -1,6 +1,6 @@ from __future__ import annotations -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone 
import pytest @@ -35,13 +35,13 @@ def test_all_stale( is_stale, session, ): - started_at = datetime.utcnow() + started_at = datetime.now(timezone.utc) if started_at_hours_ago: started_at -= timedelta(hours=started_at_hours_ago) last_heartbeat_at = None if last_heartbeat_at_minutes_ago is not None: - last_heartbeat_at = datetime.utcnow() - timedelta( + last_heartbeat_at = datetime.now(timezone.utc) - timedelta( minutes=last_heartbeat_at_minutes_ago, ) @@ -60,7 +60,7 @@ def test_all_stale( def test_stale(self, session): job = Job(job_name="test") job.start() - job.last_heartbeat_at = datetime.utcnow() - timedelta(minutes=10) + job.last_heartbeat_at = datetime.now(timezone.utc) - timedelta(minutes=10) job.save(session) assert job in JobFinder(state_id=job.job_name).stale(session) diff --git a/tests/meltano/core/job/test_job.py b/tests/meltano/core/job/test_job.py index a840273447..68cbe8d316 100644 --- a/tests/meltano/core/job/test_job.py +++ b/tests/meltano/core/job/test_job.py @@ -4,7 +4,7 @@ import platform import signal import uuid -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import pytest @@ -43,11 +43,11 @@ def test_transit(self, session): transition = subject.transit(State.RUNNING) assert transition == (State.IDLE, State.RUNNING) - subject.started_at = datetime.utcnow() + subject.started_at = datetime.now(timezone.utc) transition = subject.transit(State.SUCCESS) assert transition == (State.RUNNING, State.SUCCESS) - subject.ended_at = datetime.utcnow() + subject.ended_at = datetime.now(timezone.utc) @pytest.mark.asyncio() async def test_run(self, session): @@ -146,7 +146,7 @@ def test_is_stale(self): # Jobs started more than 25 hours ago without a heartbeat are stale offset = timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS + 1) - job.started_at = datetime.utcnow() - offset + job.started_at = datetime.now(timezone.utc) - offset assert job.is_stale() # Jobs with a recent heartbeat are not stale @@ -155,7 +155,7 @@ def 
test_is_stale(self): # Jobs without a heartbeat for 5 minutes are stale offset = timedelta(minutes=HEARTBEAT_VALID_MINUTES + 1) - job.last_heartbeat_at = datetime.utcnow() - offset + job.last_heartbeat_at = datetime.now(timezone.utc) - offset assert job.is_stale() # Completed jobs are not stale @@ -172,7 +172,7 @@ def test_fail_stale(self): # Fails a stale job without a heartbeat job.start() offset = timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS + 1) - job.started_at = datetime.utcnow() - offset + job.started_at = datetime.now(timezone.utc) - offset assert job.fail_stale() assert job.has_error() @@ -185,7 +185,7 @@ def test_fail_stale(self): job = Job() job.start() offset = timedelta(minutes=HEARTBEAT_VALID_MINUTES + 1) - job.last_heartbeat_at = datetime.utcnow() - offset + job.last_heartbeat_at = datetime.now(timezone.utc) - offset assert job.fail_stale() assert job.has_error() diff --git a/tests/meltano/core/job/test_stale_job_failer.py b/tests/meltano/core/job/test_stale_job_failer.py index ed664edec8..13b30a76a8 100644 --- a/tests/meltano/core/job/test_stale_job_failer.py +++ b/tests/meltano/core/job/test_stale_job_failer.py @@ -1,6 +1,6 @@ from __future__ import annotations -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import pytest @@ -21,7 +21,7 @@ def live_job(self, session): def stale_job(self, session): job = Job(job_name="test") job.start() - job.last_heartbeat_at = datetime.utcnow() - timedelta(minutes=10) + job.last_heartbeat_at = datetime.now(timezone.utc) - timedelta(minutes=10) job.save(session) return job @@ -30,7 +30,7 @@ def stale_job(self, session): def other_stale_job(self, session): job = Job(job_name="other") job.start() - job.last_heartbeat_at = datetime.utcnow() - timedelta(minutes=10) + job.last_heartbeat_at = datetime.now(timezone.utc) - timedelta(minutes=10) job.save(session) return job diff --git a/tests/meltano/core/plugin/test_plugin_settings.py 
b/tests/meltano/core/plugin/test_plugin_settings.py index 7b122a6125..49286cdc79 100644 --- a/tests/meltano/core/plugin/test_plugin_settings.py +++ b/tests/meltano/core/plugin/test_plugin_settings.py @@ -1,7 +1,7 @@ from __future__ import annotations import platform -from datetime import date, datetime +from datetime import datetime, timezone import dotenv import pytest @@ -702,11 +702,11 @@ def test_custom_setting(self, session, subject, env_var): ) def test_date_values(self, subject, monkeypatch): - today = date.today() + today = datetime.now(timezone.utc).date() monkeypatch.setitem(subject.plugin.config, "start_date", today) assert subject.get("start_date") == today.isoformat() - now = datetime.now() + now = datetime.now(timezone.utc) monkeypatch.setitem(subject.plugin.config, "start_date", now) assert subject.get("start_date") == now.isoformat() diff --git a/tests/meltano/core/test_project_files.py b/tests/meltano/core/test_project_files.py index d0eae72506..f799b7b924 100644 --- a/tests/meltano/core/test_project_files.py +++ b/tests/meltano/core/test_project_files.py @@ -83,7 +83,14 @@ def test_resolve_subfiles(self, project_files): "loader": "target-meltano-yml", "transform": "skip", "interval": "@daily", - "start_date": datetime.datetime(2020, 8, 5, 0, 0), # noqa: WPS432 + "start_date": datetime.datetime( + 2020, + 8, + 5, + 0, + 0, + tzinfo=datetime.timezone.utc, + ), }, ], "environments": [ @@ -182,7 +189,12 @@ def test_load(self, project_files): "extractor": "tap-meltano-yml", "loader": "target-meltano-yml", "transform": "skip", - "start_date": datetime.datetime(2020, 8, 5), # noqa: WPS432 + "start_date": datetime.datetime( + 2020, + 8, + 5, + tzinfo=datetime.timezone.utc, + ), "interval": "@daily", }, { @@ -190,7 +202,12 @@ def test_load(self, project_files): "extractor": "tap-subconfig-2-yml", "loader": "target-subconfig-2-yml", "transform": "skip", - "start_date": datetime.datetime(2020, 8, 4), # noqa: WPS432 + "start_date": datetime.datetime( + 2020, 
+ 8, + 4, + tzinfo=datetime.timezone.utc, + ), "interval": "@daily", }, { @@ -198,7 +215,12 @@ def test_load(self, project_files): "extractor": "tap-subconfig-1-yml", "loader": "target-subconfig-1-yml", "transform": "skip", - "start_date": datetime.datetime(2020, 8, 6), # noqa: WPS432 + "start_date": datetime.datetime( + 2020, + 8, + 6, + tzinfo=datetime.timezone.utc, + ), "interval": "@daily", }, ], @@ -279,7 +301,12 @@ def test_update(self, project_files): "interval": "@daily", "loader": "target-meltano-yml", "name": "modified-test-meltano-yml", - "start_date": datetime.datetime(2020, 8, 5), # noqa: WPS432 + "start_date": datetime.datetime( + 2020, + 8, + 5, + tzinfo=datetime.timezone.utc, + ), # noqa: WPS432 "transform": "skip", }, { @@ -287,7 +314,12 @@ def test_update(self, project_files): "interval": "@daily", "loader": "target-subconfig-2-yml", "name": "test-subconfig-2-yml", - "start_date": datetime.datetime(2020, 8, 4), # noqa: WPS432 + "start_date": datetime.datetime( + 2020, + 8, + 4, + tzinfo=datetime.timezone.utc, + ), # noqa: WPS432 "transform": "skip", }, { @@ -295,7 +327,12 @@ def test_update(self, project_files): "interval": "@daily", "loader": "target-subconfig-1-yml", "name": "test-subconfig-1-yml", - "start_date": datetime.datetime(2020, 8, 6), # noqa: WPS432 + "start_date": datetime.datetime( + 2020, + 8, + 6, + tzinfo=datetime.timezone.utc, + ), # noqa: WPS432 "transform": "skip", }, ], diff --git a/tests/meltano/core/test_schedule_service.py b/tests/meltano/core/test_schedule_service.py index f94419d449..1484a8d977 100644 --- a/tests/meltano/core/test_schedule_service.py +++ b/tests/meltano/core/test_schedule_service.py @@ -1,7 +1,7 @@ from __future__ import annotations import platform -from datetime import datetime +from datetime import datetime, timezone import mock import pytest @@ -180,7 +180,7 @@ def add_elt(name, start_date): start_date=start_date, ) - mock_date = datetime(2002, 1, 1) # noqa: WPS432 + mock_date = datetime(2002, 1, 1, 
tzinfo=timezone.utc) # noqa: WPS432 # when a start_date is set, the schedule should use it schedule = add_elt("with_start_date", mock_date)