Skip to content

Commit

Permalink
refactor: Use timezone-aware datetime objects
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Feb 2, 2024
1 parent cab5812 commit cef07b4
Show file tree
Hide file tree
Showing 21 changed files with 220 additions and 67 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ select = [
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"COM", # flake8-commas
"DTZ", # flake8-datetimez
"E", # pycodestyle (error)
"ERA", # flake8-eradicate
"F", # pyflakes
Expand Down
9 changes: 5 additions & 4 deletions src/meltano/cli/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import re
import typing as t
from datetime import datetime as dt
from datetime import timezone as tz
from functools import partial, reduce
from operator import xor

Expand Down Expand Up @@ -167,7 +168,7 @@ def copy_state(

logger.info(
f"State for {dst_state_id} was successfully copied from " # noqa: G004
f"{src_state_id} at {dt.utcnow():%Y-%m-%d %H:%M:%S}.", # noqa: WPS323
f"{src_state_id} at {dt.now(tz=tz.utc):%Y-%m-%d %H:%M:%S%z}.", # noqa: WPS323
)


Expand Down Expand Up @@ -197,7 +198,7 @@ def move_state(

logger.info(
f"State for {src_state_id} was successfully moved to {dst_state_id} " # noqa: G004
f"at {dt.utcnow():%Y-%m-%d %H:%M:%S}.", # noqa: WPS323
f"at {dt.now(tz=tz.utc):%Y-%m-%d %H:%M:%S%z}.", # noqa: WPS323
)


Expand Down Expand Up @@ -248,7 +249,7 @@ def merge_state(
state_service.merge_state(from_state_id, state_id)
logger.info(
f"State for {state_id} was successfully " # noqa: G004
f"merged at {dt.utcnow():%Y-%m-%d %H:%M:%S}.", # noqa: WPS323
f"merged at {dt.now(tz=tz.utc):%Y-%m-%d %H:%M:%S%z}.", # noqa: WPS323
)


Expand Down Expand Up @@ -289,7 +290,7 @@ def set_state(
state_service.set_state(state_id, state)
logger.info(
f"State for {state_id} was successfully set " # noqa: G004
f"at {dt.utcnow():%Y-%m-%d %H:%M:%S}.", # noqa: WPS323
f"at {dt.now(tz=tz.utc):%Y-%m-%d %H:%M:%S%z}.", # noqa: WPS323
)


Expand Down
4 changes: 2 additions & 2 deletions src/meltano/core/job/finder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from __future__ import annotations

from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone

from .job import HEARTBEAT_VALID_MINUTES, HEARTBEATLESS_JOB_VALID_HOURS, Job, State

Expand Down Expand Up @@ -141,7 +141,7 @@ def all_stale(cls, session):
Returns:
All stale states with any state ID
"""
now = datetime.utcnow()
now = datetime.now(tz=timezone.utc)
last_valid_heartbeat_at = now - timedelta(minutes=HEARTBEAT_VALID_MINUTES)
last_valid_started_at = now - timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS)

Expand Down
29 changes: 19 additions & 10 deletions src/meltano/core/job/job.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Defines Job model class."""

from __future__ import annotations

import asyncio
Expand All @@ -7,7 +8,7 @@
import typing as t
import uuid
from contextlib import asynccontextmanager, contextmanager, suppress
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from enum import Enum
from enum import IntFlag as EnumIntFlag

Expand All @@ -18,7 +19,13 @@

from meltano.core.error import Error
from meltano.core.models import SystemModel
from meltano.core.sqlalchemy import GUIDType, IntFlag, IntPK, JSONEncodedDict
from meltano.core.sqlalchemy import (
DateTimeUTC,
GUIDType,
IntFlag,
IntPK,
JSONEncodedDict,
)

HEARTBEATLESS_JOB_VALID_HOURS = 24
HEARTBEAT_VALID_MINUTES = 5
Expand Down Expand Up @@ -105,9 +112,11 @@ class Job(SystemModel): # noqa: WPS214
job_name: Mapped[t.Optional[str]] # noqa: UP007
run_id: Mapped[GUIDType]
_state: Mapped[t.Optional[str]] = mapped_column(name="state") # noqa: UP007
started_at: Mapped[t.Optional[datetime]] # noqa: UP007
last_heartbeat_at: Mapped[t.Optional[datetime]] # noqa: UP007
ended_at: Mapped[t.Optional[datetime]] # noqa: UP007
started_at: Mapped[t.Optional[datetime]] = mapped_column(DateTimeUTC) # noqa: UP007
last_heartbeat_at: Mapped[t.Optional[datetime]] = mapped_column( # noqa: UP007
DateTimeUTC,
)
ended_at: Mapped[t.Optional[datetime]] = mapped_column(DateTimeUTC) # noqa: UP007
payload: Mapped[dict] = mapped_column(MutableDict.as_mutable(JSONEncodedDict))
payload_flags: Mapped[Payload] = mapped_column(IntFlag, default=0)
trigger: Mapped[t.Optional[str]] = mapped_column( # noqa: UP007
Expand Down Expand Up @@ -184,7 +193,7 @@ def is_stale(self):
timestamp = self.started_at
valid_for = timedelta(hours=HEARTBEATLESS_JOB_VALID_HOURS)

return datetime.utcnow() - timestamp > valid_for
return datetime.now(timezone.utc) - timestamp > valid_for

def has_error(self):
"""Return whether a job has failed.
Expand Down Expand Up @@ -280,7 +289,7 @@ async def run(self, session):

def start(self):
"""Mark the job as having started."""
self.started_at = datetime.utcnow()
self.started_at = datetime.now(timezone.utc)
self.transit(State.RUNNING)

def fail(self, error=None):
Expand All @@ -289,14 +298,14 @@ def fail(self, error=None):
Args:
error: the error to associate with the job's failure
"""
self.ended_at = datetime.utcnow()
self.ended_at = datetime.now(timezone.utc)
self.transit(State.FAIL)
if error:
self.payload.update({"error": str(error)})

def success(self):
"""Mark the job as having succeeded."""
self.ended_at = datetime.utcnow()
self.ended_at = datetime.now(timezone.utc)
self.transit(State.SUCCESS)

def fail_stale(self):
Expand Down Expand Up @@ -345,7 +354,7 @@ def save(self, session):

def _heartbeat(self):
"""Update last_heartbeat_at for this job in the db."""
self.last_heartbeat_at = datetime.utcnow()
self.last_heartbeat_at = datetime.now(timezone.utc)

async def _heartbeater(self, session):
"""Heartbeat to the db every second.
Expand Down
6 changes: 4 additions & 2 deletions src/meltano/core/plugin/singer/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import json
import logging
import typing as t
from datetime import datetime
from datetime import datetime, timezone

from meltano.core.behavior.hookable import hook
from meltano.core.job import Job, Payload
Expand Down Expand Up @@ -86,7 +86,9 @@ def writeline(self, line: str):
"incremental state has not been updated",
)
else:
logger.info(f"Incremental state has been updated at {datetime.utcnow()}.") # noqa: G004
logger.info(
f"Incremental state has been updated at {datetime.now(tz=timezone.utc)}.", # noqa: E501, G004
)
logger.debug(f"Incremental state: {new_state}") # noqa: G004


Expand Down
4 changes: 2 additions & 2 deletions src/meltano/core/schedule_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import logging
import typing as t
from datetime import date, datetime
from datetime import date, datetime, timezone

from croniter import croniter

Expand Down Expand Up @@ -202,7 +202,7 @@ def default_start_date(self, session, extractor: str) -> datetime:
if isinstance(start_date, datetime):
return start_date

return iso8601_datetime(start_date) or datetime.utcnow()
return iso8601_datetime(start_date) or datetime.now(tz=timezone.utc)

def add_schedule(self, schedule: Schedule) -> Schedule:
"""Add a schedule to the project.
Expand Down
39 changes: 38 additions & 1 deletion src/meltano/core/sqlalchemy.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from __future__ import annotations

import datetime
import json
import typing as t
import uuid

from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.orm import mapped_column
from sqlalchemy.types import CHAR, INTEGER, VARCHAR, TypeDecorator
from sqlalchemy.types import CHAR, INTEGER, VARCHAR, DateTime, TypeDecorator
from typing_extensions import Annotated


Expand Down Expand Up @@ -94,6 +95,42 @@ def process_result_value( # noqa: D102
return value


class DateTimeUTC(TypeDecorator):
    """Store datetimes as naive UTC values and load them back as UTC-aware.

    On the way into the database, values are normalized to UTC and stripped
    of their timezone before being handed to the underlying ``DateTime``
    type. On the way out, values are tagged with ``datetime.timezone.utc``.
    """

    impl = DateTime
    cache_ok = True

    def process_bind_param(
        self,
        value: datetime.datetime | None,
        _dialect: object,
    ) -> datetime.datetime | None:
        """Convert the datetime value to UTC and remove the timezone.

        Args:
            value: The datetime value to convert. Naive values are stored
                unchanged, i.e. they are taken to already be in UTC.
            _dialect: The dialect in use (unused).

        Returns:
            The naive datetime to store, or ``None`` if no value was given.
        """
        if value is None:
            return None

        # A datetime is only aware when its UTC offset is known. Checking
        # ``utcoffset()`` rather than the truthiness of ``tzinfo`` keeps
        # values whose tzinfo yields no offset on the "assume UTC" path
        # instead of letting ``astimezone`` interpret them as local time.
        if value.utcoffset() is not None:
            value = value.astimezone(datetime.timezone.utc)
        return value.replace(tzinfo=None)

    def process_result_value(
        self,
        value: datetime.datetime | None,
        _dialect: object,
    ) -> datetime.datetime | None:
        """Attach the UTC timezone to a datetime loaded from the database.

        Args:
            value: The datetime value to convert.
            _dialect: The dialect in use (unused).

        Returns:
            The same wall-clock value marked as UTC, or ``None`` if the
            stored value was ``None``.
        """
        if value is None:
            return None
        return value.replace(tzinfo=datetime.timezone.utc)


GUIDType = Annotated[uuid.UUID, mapped_column(GUID, default=uuid.uuid4)]
StateType = Annotated[
t.Dict[str, str],
Expand Down
16 changes: 11 additions & 5 deletions src/meltano/core/state_store/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from abc import abstractmethod, abstractproperty
from base64 import b64decode, b64encode
from contextlib import contextmanager
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
from functools import reduce
from pathlib import Path
from time import sleep
Expand Down Expand Up @@ -214,9 +214,15 @@ def is_locked(self, state_id: str) -> bool:
lock_path = self.get_lock_path(state_id)
try:
with self.get_reader(lock_path) as reader:
locked_at = datetime.fromtimestamp(float(reader.read()))
if locked_at and locked_at < datetime.utcnow() - timedelta(
seconds=self.lock_timeout_seconds,
locked_at = datetime.fromtimestamp(
float(reader.read()),
tz=timezone.utc,
)
if locked_at and locked_at < (
datetime.now(timezone.utc)
- timedelta(
seconds=self.lock_timeout_seconds,
)
):
self.delete(lock_path)
return False
Expand Down Expand Up @@ -255,7 +261,7 @@ def acquire_lock(self, state_id: str, retry_seconds: int = 1) -> Iterator[None]:
while self.is_locked(state_id):
sleep(retry_seconds)
with self.get_writer(lock_path) as writer:
writer.write(str(datetime.utcnow().timestamp()))
writer.write(str(datetime.now(timezone.utc).timestamp()))
yield
finally:
self.delete(lock_path)
Expand Down
4 changes: 2 additions & 2 deletions src/meltano/core/tracking/contexts/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import uuid
from collections import defaultdict
from contextlib import suppress
from datetime import datetime
from datetime import datetime, timezone
from functools import cached_property
from pathlib import Path
from warnings import warn
Expand Down Expand Up @@ -135,7 +135,7 @@ def get_process_timestamp(process: psutil.Process) -> str:
Returns:
An ISO 8601 timestamp formatted string.
"""
return f"{datetime.utcfromtimestamp(process.create_time()).isoformat()}Z"
return f"{datetime.fromtimestamp(process.create_time(), tz=timezone.utc).isoformat()}" # noqa: E501

@cached_property
def process_info(self) -> dict[str, t.Any]:
Expand Down
8 changes: 4 additions & 4 deletions src/meltano/core/tracking/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import typing as t
import uuid
from contextlib import contextmanager, suppress
from datetime import datetime
from datetime import datetime, timezone
from enum import Enum, auto
from urllib.parse import urlparse
from warnings import warn
Expand Down Expand Up @@ -535,19 +535,19 @@ def track_exit_event(self):
return
cli.exit_code_reported = True

start_time = datetime.utcfromtimestamp(Process().create_time())
start_time = datetime.fromtimestamp(Process().create_time(), tz=timezone.utc)

# This is the reported "end time" for this process, though in reality
# the process will end a short time after this time as it takes time
# to emit the event.
now = datetime.utcnow()
now = datetime.now(timezone.utc)

self.track_unstruct_event(
SelfDescribingJson(
ExitEventSchema.url,
{
"exit_code": cli.exit_code,
"exit_timestamp": f"{now.isoformat()}Z",
"exit_timestamp": f"{now.isoformat()}",
"process_duration_microseconds": int(
(now - start_time).total_seconds() * MICROSECONDS_PER_SECOND,
),
Expand Down
9 changes: 6 additions & 3 deletions src/meltano/core/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import unicodedata
from contextlib import suppress
from copy import copy, deepcopy
from datetime import date, datetime, time
from datetime import date, datetime, time, timezone
from enum import IntEnum
from functools import reduce
from operator import setitem
Expand Down Expand Up @@ -347,20 +347,23 @@ def iso8601_datetime(d: str) -> datetime:
... # noqa: WPS428


def iso8601_datetime(d):
def iso8601_datetime(d: str | None):
if d is None:
return None

isoformats = [
"%Y-%m-%dT%H:%M:%SZ", # noqa: WPS323
"%Y-%m-%dT%H:%M:%S+00:00", # noqa: WPS323
"%Y-%m-%dT%H:%M:%S", # noqa: WPS323
"%Y-%m-%d %H:%M:%S", # noqa: WPS323
"%Y-%m-%d", # noqa: WPS323
]

for format_string in isoformats:
with suppress(ValueError):
return coerce_datetime(datetime.strptime(d, format_string))
return coerce_datetime(
datetime.strptime(d, format_string).replace(tzinfo=timezone.utc),
)
raise ValueError(f"{d} is not a valid UTC date.")


Expand Down
Loading

0 comments on commit cef07b4

Please sign in to comment.