From 082d134a6299d060e3be044cbb930122e5239e53 Mon Sep 17 00:00:00 2001
From: Igor Hoelscher
Date: Wed, 23 Sep 2020 15:26:14 -0300
Subject: [PATCH] add timezone property to pipeline

---
Review note: this revision guards time.tzset() with hasattr() because the
function is only available on Unix, and documents the new property. Hunk
sizes and the diffstat were updated to match; the blob ids on the index
line are those of the original submission.

 butterfree/pipelines/feature_set_pipeline.py | 23 +++++++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/butterfree/pipelines/feature_set_pipeline.py b/butterfree/pipelines/feature_set_pipeline.py
index c5b9b6ac..48c6d55b 100644
--- a/butterfree/pipelines/feature_set_pipeline.py
+++ b/butterfree/pipelines/feature_set_pipeline.py
@@ -1,4 +1,6 @@
 """FeatureSetPipeline entity."""
+import os
+import time
 from typing import List
 
 from butterfree.clients import SparkClient
@@ -16,6 +18,8 @@ class FeatureSetPipeline:
         feature_set: feature set composed by features and context metadata.
         sink: sink used to write the output dataframe in the desired locations.
         spark_client: client used to access Spark connection.
+        timezone: timestamp feature transformations will assume this timezone
+            when they don't have a tz suffix.
 
     Example:
         This an example regarding the feature set pipeline definition. All
@@ -122,11 +126,30 @@ def __init__(
         feature_set: FeatureSet,
         sink: Sink,
         spark_client: SparkClient = None,
+        timezone: str = "UTC",
     ):
         self.source = source
         self.feature_set = feature_set
         self.sink = sink
         self.spark_client = spark_client
+        self.timezone = timezone
+
+    @property
+    def timezone(self) -> str:
+        """Timezone assumed for timestamps that have no tz suffix."""
+        return self._timezone
+
+    @timezone.setter
+    def timezone(self, value: str):
+        if value:
+            # Keep the Spark session and the Python process on the same zone.
+            self.spark_client.conn.conf.set("spark.sql.session.timeZone", value)
+            os.environ["TZ"] = value
+            # time.tzset() is only available on Unix; skip it elsewhere
+            # instead of raising AttributeError.
+            if hasattr(time, "tzset"):
+                time.tzset()
+        self._timezone = value
 
     @property
     def source(self) -> Source: