Merged

39 commits
2329257  remove unused import (davitbzh, Jul 13, 2022)
e38ca9c  Merge remote-tracking branch 'upstream/master' (davitbzh, Jul 13, 2022)
10ad8b9  Merge remote-tracking branch 'upstream/master' (davitbzh, Jul 13, 2022)
16b2e52  Merge remote-tracking branch 'upstream/master' (davitbzh, Jul 15, 2022)
63b812b  Merge remote-tracking branch 'upstream/master' (davitbzh, Jul 18, 2022)
9fdd925  Merge remote-tracking branch 'upstream/master' (davitbzh, Aug 4, 2022)
3be3d58  Merge remote-tracking branch 'upstream/master' (davitbzh, Sep 7, 2022)
6873412  Merge remote-tracking branch 'upstream/master' (davitbzh, Sep 14, 2022)
c033dad  Merge remote-tracking branch 'upstream/master' (davitbzh, Sep 15, 2022)
1b3d076  Merge remote-tracking branch 'upstream/master' (davitbzh, Sep 15, 2022)
738ce21  Merge remote-tracking branch 'upstream/master' (davitbzh, Sep 19, 2022)
afe2459  Merge remote-tracking branch 'upstream/master' (davitbzh, Sep 20, 2022)
f972c0d  Merge remote-tracking branch 'upstream/master' (davitbzh, Sep 22, 2022)
3145a76  Merge remote-tracking branch 'upstream/master' (davitbzh, Sep 26, 2022)
fc39ad2  Merge remote-tracking branch 'upstream/master' (davitbzh, Sep 28, 2022)
b750dd4  Merge remote-tracking branch 'upstream/master' (davitbzh, Oct 3, 2022)
941d0e3  Merge remote-tracking branch 'upstream/master' (davitbzh, Oct 3, 2022)
3fbbe22  Merge remote-tracking branch 'upstream/master' (davitbzh, Oct 11, 2022)
75b9d89  Merge remote-tracking branch 'upstream/master' (davitbzh, Oct 25, 2022)
ac281ff  Merge remote-tracking branch 'upstream/master' (davitbzh, Oct 26, 2022)
2a493ea  Merge remote-tracking branch 'upstream/master' (davitbzh, Nov 2, 2022)
5a70a55  Merge remote-tracking branch 'upstream/master' (davitbzh, Nov 6, 2022)
20812f4  Merge remote-tracking branch 'upstream/master' (davitbzh, Nov 8, 2022)
737f9f2  use util to get epoch (davitbzh, Nov 8, 2022)
e297749  use util to get epoch (davitbzh, Nov 8, 2022)
3a0854b  use util to get epoch (davitbzh, Nov 8, 2022)
65ec6d6  unit tests (davitbzh, Nov 8, 2022)
1a8b067  util function for date and datetime (davitbzh, Nov 9, 2022)
64544fc  allow only epoch in seconds (davitbzh, Nov 10, 2022)
7ca6b0f  in TrainingDataset use convert_event_time_to_timestamp only once (davitbzh, Nov 10, 2022)
0e5071b  fix tests and add documentation (davitbzh, Nov 10, 2022)
36575d6  divide by 1000 if length 13 (davitbzh, Nov 11, 2022)
2e4c117  Merge remote-tracking branch 'upstream/master' into td_eventtime_sele… (davitbzh, Nov 12, 2022)
9d6e3b7  address comments (davitbzh, Nov 14, 2022)
c469b84  address comments (davitbzh, Nov 15, 2022)
cfab620  address comments (davitbzh, Nov 15, 2022)
dddba08  Update java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java (davitbzh, Nov 15, 2022)
990704e  1000 (davitbzh, Nov 15, 2022)
2791b85  Update java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java (davitbzh, Nov 15, 2022)
30 changes: 24 additions & 6 deletions java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java

@@ -49,6 +49,7 @@
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.functions;
 import org.apache.spark.sql.streaming.DataStreamReader;
 import org.apache.spark.sql.streaming.DataStreamWriter;
 import org.apache.spark.sql.streaming.StreamingQuery;
@@ -57,6 +58,7 @@
 import org.apache.spark.sql.types.BinaryType;
 import org.apache.spark.sql.types.BooleanType;
 import org.apache.spark.sql.types.ByteType;
+import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DateType;
 import org.apache.spark.sql.types.DecimalType;
 import org.apache.spark.sql.types.DoubleType;
@@ -303,27 +305,43 @@ private Dataset<Row>[] timeSeriesSplit(TrainingDataset trainingDataset, Query qu
     int i = 0;
     for (Split split : splits) {
       if (dataset.count() > 0) {
+        String eventTime = query.getLeftFeatureGroup().getEventTime();
         String eventTimeType =
-            query.getLeftFeatureGroup().getFeature(query.getLeftFeatureGroup().getEventTime()).getType();
+            query.getLeftFeatureGroup().getFeature(eventTime).getType();

         if (BIGINT.getType().equals(eventTimeType)) {
+          String tmpEventTime = eventTime + "_hopsworks_tmp";
+          sparkSession.sqlContext()
+              .udf()
+              .register("checkEpochUDF", (Long input) -> {
+                if (Long.toString(input).length() > 10) {
+                  input = input / 1000;
+                  return input.longValue();
+                } else {
+                  return input;
+                }
+              }, DataTypes.LongType);
+          dataset = dataset.withColumn(tmpEventTime, functions.callUDF(
+              "checkEpochUDF", dataset.col(eventTime)));
+
           // event time in second. `getTime()` return in millisecond.
           datasetSplits[i] = dataset.filter(
               String.format(
                   "%d/1000 <= `%s` and `%s` < %d/1000",
                   split.getStartTime().getTime(),
-                  query.getLeftFeatureGroup().getEventTime(),
-                  query.getLeftFeatureGroup().getEventTime(),
+                  tmpEventTime,
+                  tmpEventTime,
                   split.getEndTime().getTime()
               )
-          );
+          ).drop(tmpEventTime);
         } else if (DATE.getType().equals(eventTimeType) || TIMESTAMP.getType().equals(eventTimeType)) {
           // unix_timestamp return in second. `getTime()` return in millisecond.
           datasetSplits[i] = dataset.filter(
               String.format(
                   "%d/1000 <= unix_timestamp(`%s`) and unix_timestamp(`%s`) < %d/1000",
                   split.getStartTime().getTime(),
-                  query.getLeftFeatureGroup().getEventTime(),
-                  query.getLeftFeatureGroup().getEventTime(),
+                  eventTime,
+                  eventTime,
                   split.getEndTime().getTime()
               )
           );
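The registered checkEpochUDF normalizes BIGINT event times before the range filter runs: epochs longer than 10 digits are assumed to be in milliseconds and are divided down to seconds. A minimal Python sketch of that heuristic (the helper name is made up, for illustration only):

    def normalize_epoch(value: int) -> int:
        # Mirrors the Java checkEpochUDF: more than 10 digits is treated as milliseconds.
        if len(str(value)) > 10:
            return value // 1000  # milliseconds -> seconds
        return value

    assert normalize_epoch(1668470400) == 1668470400      # 10 digits: already seconds
    assert normalize_epoch(1668470400000) == 1668470400   # 13 digits: milliseconds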
15 changes: 7 additions & 8 deletions python/hsfs/engine/spark.py

@@ -39,7 +39,7 @@
     concat,
     col,
     from_json,
-    unix_timestamp,
+    udf,
 )
 from pyspark.sql.avro.functions import from_avro, to_avro
 from pyspark.sql.types import (
@@ -492,14 +492,13 @@ def _random_split(self, dataset, training_dataset):
     def _time_series_split(
         self, training_dataset, dataset, event_time, drop_event_time=False
     ):
-        result_dfs = {}
-        ts_type = dataset.select(event_time).dtypes[0][1]
-        ts_col = (
-            unix_timestamp(col(event_time)) * 1000
-            if ts_type in ["date", "timestamp"]
-            # jdbc supports timestamp precision up to second only.
-            else col(event_time) * 1000
+        # registering the UDF
+        _convert_event_time_to_timestamp = udf(
+            util.convert_event_time_to_timestamp, LongType()
         )
+
+        result_dfs = {}
+        ts_col = _convert_event_time_to_timestamp(col(event_time))
        for split in training_dataset.splits:
            result_df = dataset.filter(ts_col >= split.start_time).filter(
                ts_col < split.end_time
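With the UDF in place, each time-series split reduces to a pair of filters on the converted column. A rough, self-contained sketch of that flow, assuming the split bounds are epoch milliseconds (the DataFrame, converter, and bounds here are stand-ins, not the engine code):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import LongType

    def to_millis(event_time):
        # Stand-in for util.convert_event_time_to_timestamp: epoch seconds -> milliseconds.
        return event_time * 1000 if len(str(event_time)) <= 10 else event_time

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([(1, 1668470400), (2, 1700000000)], ["id", "event_time"])

    ts_col = udf(to_millis, LongType())(col("event_time"))
    # Keep rows whose event time falls inside the split window (bounds in milliseconds).
    train_df = df.filter(ts_col >= 1660000000000).filter(ts_col < 1690000000000)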
54 changes: 27 additions & 27 deletions python/hsfs/feature_view.py

@@ -190,9 +190,9 @@ def get_batch_query(

         # Arguments
             start_time: Start event time for the batch query. Optional. Strings should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`,
-                `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`.
+                `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             end_time: End event time for the batch query. Optional. Strings should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`,
-                `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`.
+                `%Y-%m-%d %H:%M:%S`, or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.

         # Returns
             `str`: batch query
@@ -272,10 +272,10 @@ def get_batch_data(
         # Arguments
             start_time: Start event time for the batch query. Optional. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             end_time: End event time for the batch query. Optional. Strings should be
                 formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             read_options: User provided read options. Defaults to `{}`.
         """

@@ -336,10 +336,10 @@ def create_training_data(
         # Arguments
             start_time: Start event time for the training dataset query. Optional. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             end_time: End event time for the training dataset query. Optional. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             storage_connector: Storage connector defining the sink location for the
                 training dataset, defaults to `None`, and materializes training dataset
                 on HopsFS.
@@ -444,16 +444,16 @@ def create_train_test_split(
             test_size: size of test set.
             train_start: Start event time for the train split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             train_end: End event time for the train split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_start: Start event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_end: End event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             storage_connector: Storage connector defining the sink location for the
                 training dataset, defaults to `None`, and materializes training dataset
                 on HopsFS.
@@ -570,22 +570,22 @@ def create_train_validation_test_split(
             test_size: size of test set.
             train_start: Start event time for the train split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             train_end: End event time for the train split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             validation_start: Start event time for the validation split query. Strings
                 should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             validation_end: End event time for the validation split query. Strings
                 should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_start: Start event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_end: End event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             storage_connector: Storage connector defining the sink location for the
                 training dataset, defaults to `None`, and materializes training dataset
                 on HopsFS.
@@ -718,11 +718,11 @@ def training_data(
             start_time: Start event time for the training dataset query. Strings should
                 be formatted in one of the following
                 formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             end_time: End event time for the training dataset query. Strings should be
                 formatted in one of the following
                 formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             description: A string describing the contents of the training dataset to
                 improve discoverability for Data Scientists, defaults to empty string
                 `""`.
@@ -794,13 +794,13 @@ def train_test_split(
                 or `%Y-%m-%d %H:%M:%S.%f`.
             train_end: End event time for the train split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_start: Start event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_end: End event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             description: A string describing the contents of the training dataset to
                 improve discoverability for Data Scientists, defaults to empty string
                 `""`.
@@ -893,22 +893,22 @@ def train_validation_test_split(
             test_size: size of test set. Should be between 0 and 1.
             train_start: Start event time for the train split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             train_end: End event time for the train split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             validation_start: Start event time for the validation split query. Strings
                 should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             validation_end: End event time for the validation split query. Strings
                 should be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_start: Start event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             test_end: End event time for the test split query. Strings should
                 be formatted in one of the following formats `%Y-%m-%d`, `%Y-%m-%d %H`, `%Y-%m-%d %H:%M`, `%Y-%m-%d %H:%M:%S`,
-                or `%Y-%m-%d %H:%M:%S.%f`.
+                or `%Y-%m-%d %H:%M:%S.%f`. Int, i.e. Unix Epoch should be in seconds.
             description: A string describing the contents of the training dataset to
                 improve discoverability for Data Scientists, defaults to empty string
                 `""`.
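Taken together, the docstrings now advertise two interchangeable forms for every event-time argument. A hypothetical call showing both (the `feature_view` instance, the return shape, and the values are all assumed, not taken from the diff):

    # Strings use one of the documented formats ...
    version, job = feature_view.create_training_data(
        start_time="2022-01-01 00:00:00",
        end_time="2022-06-30 23:59:59",
    )

    # ... or ints as Unix epoch in seconds, per the updated convention.
    version, job = feature_view.create_training_data(
        start_time=1640995200,  # 2022-01-01 00:00:00 UTC, epoch in seconds
        end_time=1656633599,    # 2022-06-30 23:59:59 UTC, epoch in seconds
    )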
4 changes: 2 additions & 2 deletions python/hsfs/training_dataset.py

@@ -238,8 +238,8 @@ def _append_time_split(
             TrainingDatasetSplit(
                 name=split_name,
                 split_type=TrainingDatasetSplit.TIME_SERIES_SPLIT,
-                start_time=util.convert_event_time_to_timestamp(start_time),
-                end_time=util.convert_event_time_to_timestamp(end_time),
+                start_time=start_time,
+                end_time=end_time,
             )
         )

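Per the commit "in TrainingDataset use convert_event_time_to_timestamp only once", conversion now happens a single time before the splits are built, so _append_time_split stores the values as given. A simplified sketch of that single conversion point (the converter below is a stand-in, not the library code):

    from datetime import datetime, timezone

    def convert_event_time_to_timestamp(event_time):
        # Simplified stand-in: parse date strings, scale epoch seconds to milliseconds.
        if isinstance(event_time, str):
            dt = datetime.strptime(event_time, "%Y-%m-%d").replace(tzinfo=timezone.utc)
            return int(dt.timestamp() * 1000)
        return event_time * 1000 if len(str(event_time)) <= 10 else event_time

    # Convert once at the boundary; the split object stores ready-made timestamps.
    split = {
        "name": "train",
        "split_type": "TIME_SERIES_SPLIT",
        "start_time": convert_event_time_to_timestamp("2022-01-01"),
        "end_time": convert_event_time_to_timestamp("2022-06-30"),
    }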
2 changes: 1 addition & 1 deletion python/hsfs/util.py

@@ -180,7 +180,7 @@ def convert_event_time_to_timestamp(event_time):
         if event_time == 0:
             raise ValueError("Event time should be greater than 0.")
         # jdbc supports timestamp precision up to second only.
-        if len(str(event_time)) < 13:
+        if len(str(event_time)) <= 10:
             event_time = event_time * 1000
         return event_time
     else:
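The tightened length check matters for values that have already been converted once. A quick contrast of the old and new rules (stand-in functions, reconstructed from the hunk above):

    def convert_old(event_time):
        # Old rule: anything shorter than 13 digits was multiplied by 1000.
        return event_time * 1000 if len(str(event_time)) < 13 else event_time

    def convert_new(event_time):
        # New rule: only 10-digit-or-shorter epochs are treated as seconds.
        return event_time * 1000 if len(str(event_time)) <= 10 else event_time

    already_ms = 99999999999  # 11-digit epoch milliseconds (a 1973 date)
    assert convert_old(already_ms) == 99999999999000  # corrupted on a second pass
    assert convert_new(already_ms) == 99999999999     # left alone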
13 changes: 9 additions & 4 deletions python/tests/core/test_feature_view_engine.py

@@ -134,7 +134,9 @@ def test_save_time_travel_query(self, mocker):
         )

         fv = feature_view.FeatureView(
-            name="fv_name", query=query.as_of(1000), featurestore_id=feature_store_id
+            name="fv_name",
+            query=query.as_of(1000000000),
+            featurestore_id=feature_store_id,
         )

         # Act
@@ -449,7 +451,10 @@ def test_get_batch_query(self, mocker):

         # Act
         fv_engine.get_batch_query(
-            feature_view_obj=fv, start_time=1, end_time=2, with_label=False
+            feature_view_obj=fv,
+            start_time=1000000000,
+            end_time=2000000000,
+            with_label=False,
         )

         # Assert
@@ -486,7 +491,7 @@ def test_get_batch_query_string(self, mocker):

         # Act
         result = fv_engine.get_batch_query_string(
-            feature_view_obj=fv, start_time=1, end_time=2
+            feature_view_obj=fv, start_time=1000000000, end_time=2000000000
         )

         # Assert
@@ -526,7 +531,7 @@ def test_get_batch_query_string_pit_query(self, mocker):

         # Act
         result = fv_engine.get_batch_query_string(
-            feature_view_obj=fv, start_time=1, end_time=2
+            feature_view_obj=fv, start_time=1000000000, end_time=2000000000
         )

         # Assert
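The updated fixtures are deliberately 10-digit values, i.e. valid epoch seconds under the new rule, rather than the tiny placeholders used before. A quick check with the standard library:

    from datetime import datetime, timezone

    print(datetime.fromtimestamp(1000000000, tz=timezone.utc))  # 2001-09-09 01:46:40+00:00
    print(datetime.fromtimestamp(2000000000, tz=timezone.utc))  # 2033-05-18 03:33:20+00:00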