[Feature Store] Performance improvements for preview and hist calculation in Spark engine #1860
Conversation
mlrun/data_types/spark.py

```python
pass
hist_columns.append(col)

# We may need multiple queries here. See above comment for reasoning.
```
"above comment" is ambiguous.
It's the only comment above this one 😄
mlrun/data_types/spark.py

```python
hist_columns.append(col)

# We may need multiple queries here. See above comment for reasoning.
max_columns_per_query = int(MAX_HISTOGRAM_COLUMNS_IN_QUERY // num_bins)
```
I think `//` already results in an int, so the cast is redundant.
I thought that too, but you'll be surprised to find out it doesn't. If you do `//` between two floats (or a float and an int), the result is a float. See for example: https://stackoverflow.com/questions/1282945/why-does-integer-division-yield-a-float-instead-of-another-integer. So the cast is not redundant.
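This behavior is easy to check in a REPL:

```python
# Floor division returns a float whenever either operand is a float,
# even though the value is mathematically an integer.
print(7 // 2)         # 3 (int)
print(7.0 // 2)       # 3.0 (float)
print(int(7.0 // 2))  # 3 (int) -- the explicit cast is needed
```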
mlrun/data_types/spark.py

```python
# how many histograms will be calculated in a single query. By default we're using 20 bins per histogram, so
# using 500 will calculate histograms over 25 columns in a single query. If there are more, more queries will
# be executed.
MAX_HISTOGRAM_COLUMNS_IN_QUERY = 500
```
Make it configurable?
Sure, why not?
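For illustration, the batching arithmetic behind this constant can be sketched as follows (the field count of 60 is an arbitrary example, not from the PR):

```python
import math

MAX_HISTOGRAM_COLUMNS_IN_QUERY = 500  # total bin-columns allowed per query
num_bins = 20                         # default bins per histogram

# Each histogram contributes num_bins columns, so this many histograms
# fit in a single query.
max_hists_per_query = int(MAX_HISTOGRAM_COLUMNS_IN_QUERY // num_bins)
print(max_hists_per_query)  # 25

# With e.g. 60 numeric fields needing histograms, the work is split into
# ceil(60 / 25) = 3 queries instead of 60 single-column queries.
num_hist_fields = 60
num_queries = math.ceil(num_hist_fields / max_hists_per_query)
print(num_queries)  # 3
```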
When ingesting a feature-set using the Spark engine, by default the code produces a preview (the first 20 rows of the data-frame) and a histogram (calculated over the whole DF) for int & float columns. The existing code was naive in its approach to these calculations, basically running a `select` on the DF for each column, which may result in tens of queries. This poses no issue when the DF is a basic tabular DF, and would complete in negligible time. However, when aggregations are added, these queries are executed on top of the query that calculates the aggregations, which contains summaries and group-by expressions. Running these queries so many times accumulates into very long execution times.
This PR reduces the number of queries performed in both of these stages. It uses a single query with `sample` for the preview (not sure why the previous approach was needed; it was needlessly complex). For histogram calculation it basically adds a column per histogram bin, and to keep the number of columns from becoming too large, it puts a limit on the number of histogram columns calculated per query (500 columns, which are 25 fields to calculate, since we're using 20 bins per histogram). If more fields need histograms, more queries will be performed. Still, this reduces the number of queries by a factor of 25. The max number of columns is configurable by passing an environment variable to the Spark runtime and passing this `config` to the `ingest` call.

This also fixes a bug in the existing preview code that used `zip` to align lists of values: when there are `null`s in columns, the results would come out empty. This is often the case when doing aggregations in the usual emit-per-period mode, since the calculation fields only have values for a given window per row, and if multiple windows are used there are always `null` values in some fields.
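A minimal illustration of the `zip` pitfall (the column data here is made up, not taken from mlrun): once `null`s are dropped per column, `zip` truncates to the shortest list, so a single all-`null` column empties the whole preview:

```python
# Hypothetical per-column value lists, after nulls were dropped
# column-by-column.
col_a = [1, 2, 3]   # column with a value in every row
col_b = []          # column that was all nulls for this window

# zip stops at the shortest iterable, so the preview rows come out empty.
rows = list(zip(col_a, col_b))
print(rows)  # []
```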