Skip to content

Commit

Permalink
refactor: Use timezone-aware datetime objects
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Nov 13, 2023
1 parent b573fba commit 2cc63c2
Show file tree
Hide file tree
Showing 17 changed files with 112 additions and 58 deletions.
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -573,21 +573,21 @@ ignore = [
]
select = [
"ARG", # flake8-unused-arguments
"S", # flake8-bandit
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"COM", # flake8-commas
"DTZ", # flake8-datetimez
"E", # pycodestyle (error)
"F", # pyflakes
"I", # isort
"ISC", # flake8-implicit-str-concat
"PT", # flake8-pytest-style
"RSE", # flake8-raise
"S", # flake8-bandit
"SIM", # flake8-simplify
"TID", # flake8-tidy-imports
"UP", # pyupgrade
"W", # pycodestyle (warning)
"TID", # flake8-tidy-imports
]

[tool.ruff.per-file-ignores]
Expand Down
9 changes: 5 additions & 4 deletions src/meltano/cli/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import re
import typing as t
from datetime import datetime as dt
from datetime import timezone as tz
from functools import partial, reduce
from operator import xor

Expand Down Expand Up @@ -167,7 +168,7 @@ def copy_state(

logger.info(
f"State for {dst_state_id} was successfully copied from "
f"{src_state_id} at {dt.utcnow():%Y-%m-%d %H:%M:%S}.", # noqa: WPS323
f"{src_state_id} at {dt.now(tz=tz.utc):%Y-%m-%d %H:%M:%S}.", # noqa: WPS323
)


Expand Down Expand Up @@ -197,7 +198,7 @@ def move_state(

logger.info(
f"State for {src_state_id} was successfully moved to {dst_state_id} "
f"at {dt.utcnow():%Y-%m-%d %H:%M:%S}.", # noqa: WPS323
f"at {dt.now(tz=tz.utc):%Y-%m-%d %H:%M:%S}.", # noqa: WPS323
)


Expand Down Expand Up @@ -248,7 +249,7 @@ def merge_state(
state_service.merge_state(from_state_id, state_id)
logger.info(
f"State for {state_id} was successfully "
f"merged at {dt.utcnow():%Y-%m-%d %H:%M:%S}.", # noqa: WPS323
f"merged at {dt.now(tz=tz.utc):%Y-%m-%d %H:%M:%S}.", # noqa: WPS323
)


Expand Down Expand Up @@ -289,7 +290,7 @@ def set_state(
state_service.set_state(state_id, state)
logger.info(
f"State for {state_id} was successfully set "
f"at {dt.utcnow():%Y-%m-%d %H:%M:%S}.", # noqa: WPS323
f"at {dt.now(tz=tz.utc):%Y-%m-%d %H:%M:%S}.", # noqa: WPS323
)


Expand Down
4 changes: 2 additions & 2 deletions src/meltano/core/job/finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone

from .job import HEARTBEAT_VALID_MINUTES, HEARTBEATLESS_JOB_VALID_HOURS, Job, State

Expand Down Expand Up @@ -141,7 +141,7 @@ def all_stale(cls, session):
Returns:
All stale states with any state ID
"""
now = datetime.utcnow()
now = datetime.now(tz=timezone.utc)
last_valid_heartbeat_at = now - timedelta(minutes=HEARTBEAT_VALID_MINUTES)
last_valid_started_at = now - timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS)

Expand Down
13 changes: 7 additions & 6 deletions src/meltano/core/job/job.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Defines Job model class."""

from __future__ import annotations

import asyncio
Expand All @@ -7,7 +8,7 @@
import typing as t
import uuid
from contextlib import asynccontextmanager, contextmanager, suppress
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from enum import Enum
from enum import IntFlag as EnumIntFlag

Expand Down Expand Up @@ -184,7 +185,7 @@ def is_stale(self):
timestamp = self.started_at
valid_for = timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS)

return datetime.utcnow() - timestamp > valid_for
return datetime.now(timezone.utc) - timestamp > valid_for

def has_error(self):
"""Return whether a job has failed.
Expand Down Expand Up @@ -280,7 +281,7 @@ async def run(self, session):

def start(self):
"""Mark the job has having started."""
self.started_at = datetime.utcnow()
self.started_at = datetime.now(timezone.utc)
self.transit(State.RUNNING)

def fail(self, error=None):
Expand All @@ -289,14 +290,14 @@ def fail(self, error=None):
Args:
error: the error to associate with the job's failure
"""
self.ended_at = datetime.utcnow()
self.ended_at = datetime.now(timezone.utc)
self.transit(State.FAIL)
if error:
self.payload.update({"error": str(error)})

def success(self):
"""Mark the job as having succeeded."""
self.ended_at = datetime.utcnow()
self.ended_at = datetime.now(timezone.utc)
self.transit(State.SUCCESS)

def fail_stale(self):
Expand Down Expand Up @@ -345,7 +346,7 @@ def save(self, session):

def _heartbeat(self):
"""Update last_heartbeat_at for this job in the db."""
self.last_heartbeat_at = datetime.utcnow()
self.last_heartbeat_at = datetime.now(timezone.utc)

async def _heartbeater(self, session):
"""Heartbeat to the db every second.
Expand Down
6 changes: 4 additions & 2 deletions src/meltano/core/plugin/singer/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import json
import logging
from datetime import datetime
from datetime import datetime, timezone

from meltano.core.behavior.hookable import hook
from meltano.core.job import Job, Payload
Expand Down Expand Up @@ -80,7 +80,9 @@ def writeline(self, line: str):
"incremental state has not been updated",
)
else:
logger.info(f"Incremental state has been updated at {datetime.utcnow()}.")
logger.info(
f"Incremental state has been updated at {datetime.now(tz=timezone.utc)}.", # noqa: E501
)
logger.debug(f"Incremental state: {new_state}")


Expand Down
4 changes: 2 additions & 2 deletions src/meltano/core/schedule_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import logging
import subprocess
from datetime import date, datetime
from datetime import date, datetime, timezone

from croniter import croniter

Expand Down Expand Up @@ -198,7 +198,7 @@ def default_start_date(self, session, extractor: str) -> datetime:
if isinstance(start_date, datetime):
return start_date

return iso8601_datetime(start_date) or datetime.utcnow()
return iso8601_datetime(start_date) or datetime.now(tz=timezone.utc)

def add_schedule(self, schedule: Schedule) -> Schedule:
"""Add a schedule to the project.
Expand Down
16 changes: 11 additions & 5 deletions src/meltano/core/state_store/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from base64 import b64decode, b64encode
from collections.abc import Iterator
from contextlib import contextmanager
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from functools import reduce
from io import TextIOWrapper
from pathlib import Path
Expand Down Expand Up @@ -211,9 +211,15 @@ def is_locked(self, state_id: str) -> bool:
lock_path = self.get_lock_path(state_id)
try:
with self.get_reader(lock_path) as reader:
locked_at = datetime.fromtimestamp(float(reader.read()))
if locked_at and locked_at < datetime.utcnow() - timedelta(
seconds=self.lock_timeout_seconds,
locked_at = datetime.fromtimestamp(
float(reader.read()),
tz=timezone.utc,
)
if locked_at and locked_at < (
datetime.now(timezone.utc)
- timedelta(
seconds=self.lock_timeout_seconds,
)
):
self.delete(lock_path)
return False
Expand Down Expand Up @@ -252,7 +258,7 @@ def acquire_lock(self, state_id: str, retry_seconds: int = 1) -> Iterator[None]:
while self.is_locked(state_id):
sleep(retry_seconds)
with self.get_writer(lock_path) as writer:
writer.write(str(datetime.utcnow().timestamp()))
writer.write(str(datetime.now(timezone.utc).timestamp()))
yield
finally:
self.delete(lock_path)
Expand Down
4 changes: 2 additions & 2 deletions src/meltano/core/tracking/contexts/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import uuid
from collections import defaultdict
from contextlib import suppress
from datetime import datetime
from datetime import datetime, timezone
from functools import cached_property
from pathlib import Path
from warnings import warn
Expand Down Expand Up @@ -135,7 +135,7 @@ def get_process_timestamp(process: psutil.Process) -> str:
Returns:
A ISO 8601 timestamp formatted string.
"""
return f"{datetime.utcfromtimestamp(process.create_time()).isoformat()}Z"
return f"{datetime.fromtimestamp(process.create_time(), tz=timezone.utc).isoformat()}Z" # noqa: E501

@cached_property
def process_info(self) -> dict[str, t.Any]:
Expand Down
6 changes: 3 additions & 3 deletions src/meltano/core/tracking/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import uuid
from collections.abc import Mapping
from contextlib import contextmanager, suppress
from datetime import datetime
from datetime import datetime, timezone
from enum import Enum, auto
from pathlib import Path
from urllib.parse import urlparse
Expand Down Expand Up @@ -534,12 +534,12 @@ def track_exit_event(self):
return
cli.exit_code_reported = True

start_time = datetime.utcfromtimestamp(Process().create_time())
start_time = datetime.fromtimestamp(Process().create_time(), tz=timezone.utc)

# This is the reported "end time" for this process, though in reality
# the process will end a short time after this time as it takes time
# to emit the event.
now = datetime.utcnow()
now = datetime.now(timezone.utc)

self.track_unstruct_event(
SelfDescribingJson(
Expand Down
6 changes: 4 additions & 2 deletions src/meltano/core/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import unicodedata
from contextlib import suppress
from copy import copy, deepcopy
from datetime import date, datetime, time
from datetime import date, datetime, time, timezone
from enum import IntEnum
from functools import reduce
from operator import setitem
Expand Down Expand Up @@ -360,7 +360,9 @@ def iso8601_datetime(d):

for format_string in isoformats:
with suppress(ValueError):
return coerce_datetime(datetime.strptime(d, format_string))
return coerce_datetime(
datetime.strptime(d, format_string).replace(tzinfo=timezone.utc),
)
raise ValueError(f"{d} is not a valid UTC date.")


Expand Down
9 changes: 7 additions & 2 deletions tests/fixtures/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2025,7 +2025,7 @@ def elt_schedule(
loader=target.name,
transform="skip",
interval="@daily",
start_date=datetime.datetime.now(),
start_date=datetime.datetime.now(datetime.timezone.utc),
)
except ScheduleAlreadyExistsError as err:
return err.schedule
Expand Down Expand Up @@ -2203,7 +2203,12 @@ def state_ids(
def mock_time():
def _mock_time():
for idx in itertools.count(): # noqa: WPS526
yield datetime.datetime(1, 1, 1) + datetime.timedelta(hours=idx)
yield datetime.datetime(
1,
1,
1,
tzinfo=datetime.timezone.utc,
) + datetime.timedelta(hours=idx)

return _mock_time()

Expand Down
8 changes: 4 additions & 4 deletions tests/meltano/core/job/test_finder.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone

import pytest

Expand Down Expand Up @@ -35,13 +35,13 @@ def test_all_stale(
is_stale,
session,
):
started_at = datetime.utcnow()
started_at = datetime.now(timezone.utc)
if started_at_hours_ago:
started_at -= timedelta(hours=started_at_hours_ago)

last_heartbeat_at = None
if last_heartbeat_at_minutes_ago is not None:
last_heartbeat_at = datetime.utcnow() - timedelta(
last_heartbeat_at = datetime.now(timezone.utc) - timedelta(
minutes=last_heartbeat_at_minutes_ago,
)

Expand All @@ -60,7 +60,7 @@ def test_all_stale(
def test_stale(self, session):
job = Job(job_name="test")
job.start()
job.last_heartbeat_at = datetime.utcnow() - timedelta(minutes=10)
job.last_heartbeat_at = datetime.now(timezone.utc) - timedelta(minutes=10)
job.save(session)

assert job in JobFinder(state_id=job.job_name).stale(session)
Expand Down
14 changes: 7 additions & 7 deletions tests/meltano/core/job/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import platform
import signal
import uuid
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone

import pytest

Expand Down Expand Up @@ -43,11 +43,11 @@ def test_transit(self, session):

transition = subject.transit(State.RUNNING)
assert transition == (State.IDLE, State.RUNNING)
subject.started_at = datetime.utcnow()
subject.started_at = datetime.now(timezone.utc)

transition = subject.transit(State.SUCCESS)
assert transition == (State.RUNNING, State.SUCCESS)
subject.ended_at = datetime.utcnow()
subject.ended_at = datetime.now(timezone.utc)

@pytest.mark.asyncio()
async def test_run(self, session):
Expand Down Expand Up @@ -146,7 +146,7 @@ def test_is_stale(self):

# Jobs started more than 25 hours ago without a heartbeat are stale
offset = timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS + 1)
job.started_at = datetime.utcnow() - offset
job.started_at = datetime.now(timezone.utc) - offset
assert job.is_stale()

# Jobs with a recent heartbeat are not stale
Expand All @@ -155,7 +155,7 @@ def test_is_stale(self):

# Jobs without a heartbeat for 5 minutes are stale
offset = timedelta(minutes=HEARTBEAT_VALID_MINUTES + 1)
job.last_heartbeat_at = datetime.utcnow() - offset
job.last_heartbeat_at = datetime.now(timezone.utc) - offset
assert job.is_stale()

# Completed jobs are not stale
Expand All @@ -172,7 +172,7 @@ def test_fail_stale(self):
# Fails a stale job without a heartbeat
job.start()
offset = timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS + 1)
job.started_at = datetime.utcnow() - offset
job.started_at = datetime.now(timezone.utc) - offset

assert job.fail_stale()
assert job.has_error()
Expand All @@ -185,7 +185,7 @@ def test_fail_stale(self):
job = Job()
job.start()
offset = timedelta(minutes=HEARTBEAT_VALID_MINUTES + 1)
job.last_heartbeat_at = datetime.utcnow() - offset
job.last_heartbeat_at = datetime.now(timezone.utc) - offset

assert job.fail_stale()
assert job.has_error()
Expand Down

0 comments on commit 2cc63c2

Please sign in to comment.