Make pipelines aware of a timezone configuration #249

Open
wants to merge 1 commit into
base: master
18 changes: 18 additions & 0 deletions butterfree/pipelines/feature_set_pipeline.py
@@ -1,4 +1,6 @@
"""FeatureSetPipeline entity."""
import os

14.4

import time
from typing import List

from butterfree.clients import SparkClient
@@ -16,6 +18,8 @@ class FeatureSetPipeline:
feature_set: feature set composed by features and context metadata.
sink: sink used to write the output dataframe in the desired locations.
spark_client: client used to access Spark connection.
timezone: timestamp feature transformations will assume this timezone
when they don't have a tz suffix.
Comment on lines +21 to +22
Contributor

I'd just mention here that the Spark conf setting (spark.sql.session.timeZone) and an environment variable (TZ) will be set to this value.
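
To make the two side effects mentioned in this comment concrete, here is a minimal sketch using only the standard library. `FakeConf` and `apply_timezone` are hypothetical names introduced for illustration (they are not part of butterfree), and `FakeConf` only stands in for `spark_client.conn.conf`. It assumes a Unix system, since `time.tzset()` is not available on Windows.

```python
import os
import time


class FakeConf:
    """Hypothetical stand-in for spark_client.conn.conf; it only records settings."""

    def __init__(self):
        self.settings = {}

    def set(self, key, value):
        self.settings[key] = value


def apply_timezone(conf, value):
    """Apply the two side effects the setter in this PR performs."""
    # 1. Spark session time zone (recorded by the fake conf in this sketch).
    conf.set("spark.sql.session.timeZone", value)
    # 2. Process-wide TZ variable, re-read by the C library via tzset (Unix only).
    os.environ["TZ"] = value
    time.tzset()


# Simulate a process that starts in a non-UTC zone (POSIX TZ string, UTC-5).
os.environ["TZ"] = "EST5"
time.tzset()
hour_before = time.localtime(0).tm_hour  # local hour at the Unix epoch

conf = FakeConf()
apply_timezone(conf, "UTC")
hour_after = time.localtime(0).tm_hour  # same instant, now read as UTC
```

Comparing `hour_before` and `hour_after` shows that the same instant is rendered differently once `TZ` changes, which is the process-wide behaviour the docstring could spell out alongside the Spark setting.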


Example:
This is an example regarding the feature set pipeline definition. All
@@ -122,11 +126,25 @@ def __init__(
        feature_set: FeatureSet,
        sink: Sink,
        spark_client: SparkClient = None,
        timezone: str = "UTC",
    ):
        self.source = source
        self.feature_set = feature_set
        self.sink = sink
        self.spark_client = spark_client
        self.timezone = timezone

    @property
    def timezone(self) -> str:
        return self._timezone

    @timezone.setter
    def timezone(self, value: str):
        if value:
            self.spark_client.conn.conf.set("spark.sql.session.timeZone", value)
            os.environ["TZ"] = value
            time.tzset()
        self._timezone = value

@property
def source(self) -> Source: