7 changes: 6 additions & 1 deletion ci/conda_recipe/meta.yaml
@@ -22,20 +22,25 @@ requirements:
- python
- absl-py>=0.15,<2
- anyio>=3.5.0,<4
- cloudpickle
- fsspec>=2022.11,<=2023.1
- numpy>=1.23,<1.24
- packaging>=23.0,<24
- pyyaml>=6.0,<7
- scipy>=1.9,<2
- snowflake-connector-python
- snowflake-snowpark-python>=1.4.0,<=2
- sqlparse>=0.4,<1
- typing-extensions>=4.1.0,<5

# conda-libmamba-solver is a conda-specific requirement and should not appear in the wheel's dependencies.
- conda-libmamba-solver>=23.1.0,<24

# TODO(snandamuri): Versions of these packages must be exactly the same between the user's workspace and
# the snowpark sandbox. Generic definitions like scikit-learn>=1.1.0,<2 won't work because the snowflake conda
# channel only has a few allowlisted versions of scikit-learn available, so we must force users to use
# scikit-learn versions that are available in the snowflake conda channel. Since there is no way to specify an
# allow list of versions in the requirements file, we are pinning the versions here.
- joblib>=1.0.0,<=1.1.1
- scikit-learn>=1.2.1,<2
- xgboost==1.7.3
about:
9 changes: 6 additions & 3 deletions codegen/sklearn_wrapper_generator.py
@@ -802,9 +802,10 @@ def generate(self) -> "SklearnWrapperGenerator":
if self._is_hist_gradient_boosting_regressor:
self.test_estimator_input_args_list.extend(["min_samples_leaf=1", "max_leaf_nodes=100"])

# TODO(snandamuri): Replace cloudpickle with joblib after the latest version of joblib is added to the snowflake conda channel.
self.fit_sproc_deps = self.predict_udf_deps = (
"f'numpy=={np.__version__}', f'pandas=={pd.__version__}', f'scikit-learn=={sklearn.__version__}', "
"f'xgboost=={xgboost.__version__}', f'joblib=={joblib.__version__}'"
"f'xgboost=={xgboost.__version__}', f'cloudpickle=={cp.__version__}'"
)
self._construct_string_from_lists()
return self
@@ -819,9 +820,10 @@ def generate(self) -> "XGBoostWrapperGenerator":
self.estimator_imports_list.append("import xgboost")
self.test_estimator_input_args_list.extend(["random_state=0", "subsample=1.0", "colsample_bynode=1.0"])
self.fit_sproc_imports = "import xgboost"
# TODO(snandamuri): Replace cloudpickle with joblib after the latest version of joblib is added to the snowflake conda channel.
self.fit_sproc_deps = self.predict_udf_deps = (
"f'numpy=={np.__version__}', f'pandas=={pd.__version__}', f'xgboost=={xgboost.__version__}', "
"f'joblib=={joblib.__version__}'"
"f'cloudpickle=={cp.__version__}'"
)
self._construct_string_from_lists()
return self
@@ -836,9 +838,10 @@ def generate(self) -> "LightGBMWrapperGenerator":
self.estimator_imports_list.append("import lightgbm")
self.test_estimator_input_args_list.extend(["random_state=0"])
self.fit_sproc_imports = "import lightgbm"
# TODO(snandamuri): Replace cloudpickle with joblib after the latest version of joblib is added to the snowflake conda channel.
self.fit_sproc_deps = self.predict_udf_deps = (
"f'numpy=={np.__version__}', f'pandas=={pd.__version__}', f'lightgbm=={lightgbm.__version__}', "
"f'joblib=={joblib.__version__}'"
"f'cloudpickle=={cp.__version__}'"
)
self._construct_string_from_lists()
return self
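Since the generated sprocs and UDFs now pin cloudpickle instead of joblib, the dependency fragments above resolve to concrete version pins when the generated code runs. A minimal sketch of what the rendered string evaluates to, assuming the generated module imports numpy as np, pandas as pd, and cloudpickle as cp (the aliases the templates use); the exact package set varies per generator (scikit-learn and lightgbm variants add their own pins):

import numpy as np
import pandas as pd
import xgboost
import cloudpickle as cp

# Each fragment becomes a concrete pin, e.g. "numpy==1.23.4" or "cloudpickle==2.0.0",
# and the resulting list is passed as the package requirements of the fit sproc / predict UDF.
deps = [
    f"numpy=={np.__version__}",
    f"pandas=={pd.__version__}",
    f"xgboost=={xgboost.__version__}",
    f"cloudpickle=={cp.__version__}",
]
print(deps)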
64 changes: 50 additions & 14 deletions codegen/sklearn_wrapper_template.py_template
@@ -6,7 +6,7 @@ import os
from typing import Iterable, Optional, Union, List, Any, Dict, Callable
from uuid import uuid4

import joblib
import cloudpickle as cp
import pandas as pd
import numpy as np
{transform.estimator_imports}
@@ -183,7 +183,8 @@ class {transform.original_class_name}(BaseTransformer):

# Create a temp file and dump the transform to that file.
local_transform_file_name = get_temp_file_path()
joblib.dump(self._sklearn_object, local_transform_file_name)
with open(local_transform_file_name, mode="w+b") as local_transform_file:
cp.dump(self._sklearn_object, local_transform_file)

# Create temp stage to run fit.
transform_stage_name = "SNOWML_TRANSFORM_{{safe_id}}".format(safe_id=self.id)
@@ -214,7 +215,13 @@ class {transform.original_class_name}(BaseTransformer):
custom_tags=dict([("autogen", True)]),
)
# Put locally serialized transform on stage.
session.file.put(local_transform_file_name, stage_transform_file_name, auto_compress=False, overwrite=True, statement_params=statement_params)
session.file.put(
local_transform_file_name,
stage_transform_file_name,
auto_compress=False,
overwrite=True,
statement_params=statement_params
)

@sproc(
is_permanent=False,
@@ -233,7 +240,7 @@ class {transform.original_class_name}(BaseTransformer):
label_cols: List[str],
sample_weight_col: Optional[str]
) -> str:
import joblib
import cloudpickle as cp
import numpy as np
import os
import pandas
@@ -251,7 +258,12 @@ class {transform.original_class_name}(BaseTransformer):

session.file.get(stage_transform_file_name, local_transform_file_name, statement_params=statement_params)

estimator = joblib.load(os.path.join(local_transform_file_name, os.listdir(local_transform_file_name)[0]))
local_transform_file_path = os.path.join(
local_transform_file_name,
os.listdir(local_transform_file_name)[0]
)
with open(local_transform_file_path, mode="r+b") as local_transform_file_obj:
estimator = cp.load(local_transform_file_obj)

argspec = inspect.getfullargspec(estimator.fit)
args = {{'X': df[input_cols]}}
@@ -268,12 +280,20 @@ class {transform.original_class_name}(BaseTransformer):
local_result_file_name = local_result_file.name
local_result_file.close()

joblib_dump_files = joblib.dump(estimator, local_result_file_name)
session.file.put(local_result_file_name, stage_result_file_name, auto_compress = False, overwrite = True, statement_params=statement_params)
with open(local_result_file_name, mode="w+b") as local_result_file_obj:
cp.dump(estimator, local_result_file_obj)

session.file.put(
local_result_file_name,
stage_result_file_name,
auto_compress = False,
overwrite = True,
statement_params=statement_params
)

# Note: you can add something like + "|" + str(df) to the return string
# to pass debug information to the caller.
return str(os.path.basename(joblib_dump_files[0]))
return str(os.path.basename(local_result_file_name))

# Call fit sproc
statement_params = telemetry.get_function_usage_statement_params(
@@ -302,8 +322,13 @@ class {transform.original_class_name}(BaseTransformer):
if len(fields) > 1:
print("\n".join(fields[1:]))

session.file.get(os.path.join(stage_result_file_name, sproc_export_file_name), local_result_file_name, statement_params=statement_params)
self._sklearn_object = joblib.load(os.path.join(local_result_file_name, sproc_export_file_name))
session.file.get(
os.path.join(stage_result_file_name, sproc_export_file_name),
local_result_file_name,
statement_params=statement_params
)
with open(os.path.join(local_result_file_name, sproc_export_file_name), mode="r+b") as result_file_obj:
self._sklearn_object = cp.load(result_file_obj)

cleanup_temp_files([local_transform_file_name, local_result_file_name])

@@ -843,7 +868,8 @@ class {transform.original_class_name}(BaseTransformer):

# Create a temp file and dump the score to that file.
local_score_file_name = get_temp_file_path()
joblib.dump(self._sklearn_object, local_score_file_name)
with open(local_score_file_name, mode="w+b") as local_score_file:
cp.dump(self._sklearn_object, local_score_file)

# Create temp stage to run score.
score_stage_name = "SNOWML_SCORE_{{safe_id}}".format(safe_id=self.id)
@@ -872,7 +898,13 @@ class {transform.original_class_name}(BaseTransformer):
custom_tags=dict([("autogen", True)]),
)
# Put locally serialized score on stage.
session.file.put(local_score_file_name, stage_score_file_name, auto_compress=False, overwrite=True, statement_params=statement_params)
session.file.put(
local_score_file_name,
stage_score_file_name,
auto_compress=False,
overwrite=True,
statement_params=statement_params
)

@sproc(
is_permanent=False,
@@ -890,7 +922,7 @@ class {transform.original_class_name}(BaseTransformer):
label_cols: List[str],
sample_weight_col: Optional[str]
) -> float:
import joblib
import cloudpickle as cp
import numpy as np
import os
import pandas
@@ -905,7 +937,11 @@ class {transform.original_class_name}(BaseTransformer):
local_score_file.close()

session.file.get(stage_score_file_name, local_score_file_name, statement_params=statement_params)
estimator = joblib.load(os.path.join(local_score_file_name, os.listdir(local_score_file_name)[0]))

local_score_file_name_path = os.path.join(local_score_file_name, os.listdir(local_score_file_name)[0])
with open(local_score_file_name_path, mode="r+b") as local_score_file_obj:
estimator = cp.load(local_score_file_obj)

argspec = inspect.getfullargspec(estimator.score)
if "X" in argspec.args:
args = {{'X': df[input_cols]}}
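The template changes above replace joblib.dump/joblib.load with explicit file handles plus cloudpickle. A minimal sketch of the round trip the generated fit and score sprocs perform, assuming an open snowflake.snowpark Session; the connection parameters and the stage name are placeholders for illustration, not part of the template:

import os
import tempfile

import cloudpickle as cp
from sklearn.linear_model import LinearRegression
from snowflake.snowpark import Session

# Placeholder credentials; fill in your own account settings.
connection_parameters = {"account": "<account>", "user": "<user>", "password": "<password>"}
session = Session.builder.configs(connection_parameters).create()

# Serialize the estimator to a local temp file with cloudpickle (previously joblib.dump).
estimator = LinearRegression()
local_path = os.path.join(tempfile.mkdtemp(), "transform.pkl")
with open(local_path, mode="w+b") as local_file:
    cp.dump(estimator, local_file)

# Upload the serialized object to a stage, uncompressed, as the template's fit() does.
stage_location = "@SNOWML_TRANSFORM_DEMO"  # hypothetical stage name
session.file.put(local_path, stage_location, auto_compress=False, overwrite=True)

# Inside the sproc the file is fetched back and deserialized the same way (previously joblib.load).
download_dir = tempfile.mkdtemp()
session.file.get(stage_location, download_dir)
with open(os.path.join(download_dir, os.listdir(download_dir)[0]), mode="r+b") as result_file:
    restored = cp.load(result_file)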
3 changes: 2 additions & 1 deletion conda-env-snowflake.yml
@@ -25,6 +25,7 @@ dependencies:
- lightgbm==3.3.5
- networkx==2.8.4
- numpy==1.23.4
- packaging==23.0
- pandas==1.4.4
- pytest==7.1.2
- python==3.8.13
@@ -35,6 +36,6 @@ dependencies:
- scikit-learn==1.2.2
- snowflake-snowpark-python==1.4.0
- sqlparse==0.4.3
- typing-extensions==4.3.0
- typing-extensions==4.5.0
- xgboost==1.7.3
- mypy==0.981 # not a package dependency.
3 changes: 2 additions & 1 deletion conda-env.yml
@@ -22,6 +22,7 @@ dependencies:
- mypy==0.981
- networkx==2.8.4
- numpy==1.23.4
- packaging==23.0
- pandas==1.4.4
- pytest==7.1.2
- python==3.8.13
@@ -36,5 +37,5 @@ dependencies:
- torchdata==0.4.1
- transformers==4.27.1
- types-PyYAML==6.0.12
- typing-extensions==4.3.0
- typing-extensions==4.5.0
- xgboost==1.7.3
4 changes: 3 additions & 1 deletion snowflake/ml/BUILD.bazel
@@ -38,13 +38,16 @@ snowml_wheel(
requires = [
"absl-py>=0.15,<2",
"anyio>=3.5.0,<4",
"cloudpickle", # Version range is specified by snowpark. We are implicitly depending on it.
"fsspec[http]>=2022.11,<=2023.1",
"numpy>=1.23,<1.24",
"packaging>=23.0,<24",
"pyyaml>=6.0,<7",
"scipy>=1.9,<2",
"snowflake-connector-python[pandas]",
"snowflake-snowpark-python>=1.4.0,<2",
"sqlparse>=0.4,<1",
"typing-extensions>=4.1.0,<5",

# TODO(snandamuri): Versions of these packages must be exactly the same between the user's workspace and
# the snowpark sandbox. Generic definitions like scikit-learn>=1.1.0,<2 won't work because the snowflake conda channel
@@ -53,7 +56,6 @@ snowml_wheel(
# versions in the requirements file, we are pinning the versions here.
"scikit-learn>=1.2.1,<2",
"xgboost==1.7.3",
"joblib>=1.0.0,<=1.1.1", # All the release versions between 1.0.0 and 1.1.1 are available in SF Conda channel.
],
version = VERSION,
deps = [
1 change: 1 addition & 0 deletions snowflake/ml/_internal/BUILD.bazel
@@ -46,6 +46,7 @@ py_test(
srcs = ["env_utils_test.py"],
deps = [
":env_utils",
":env",
"//snowflake/ml/test_utils:mock_data_frame",
"//snowflake/ml/test_utils:mock_session",
],
65 changes: 51 additions & 14 deletions snowflake/ml/_internal/env_utils.py
@@ -11,6 +11,7 @@
from snowflake.ml._internal.utils import query_result_checker
from snowflake.snowpark import session

_INFO_SCHEMA_PACKAGES_HAS_RUNTIME_VERSION: Optional[bool] = None
_SNOWFLAKE_CONDA_PACKAGE_CACHE: Dict[str, List[version.Version]] = {}


@@ -219,13 +220,16 @@ def relax_requirement_version(req: requirements.Requirement) -> requirements.Req
return new_req


def resolve_conda_environment(packages: List[requirements.Requirement], channels: List[str]) -> Optional[List[str]]:
def resolve_conda_environment(
packages: List[requirements.Requirement], channels: List[str], python_version: str
) -> Optional[List[str]]:
"""Use conda api to check if given packages are resolvable in given channels. Only work when conda is
locally installed.

Args:
packages: Packages to be installed.
channels: Anaconda channels (name or url) where conda should search into.
python_version: A string of the Python version where the model is run.

Returns:
List of frozen dependencies represented in PEP 508 form if resolvable, None otherwise.
@@ -234,7 +238,7 @@ def resolve_conda_environment(packages: List[requirements.Requirement], channels
from conda_libmamba_solver import solver

package_names = list(map(lambda x: x.name, packages))
specs = list(map(str, packages))
specs = list(map(str, packages)) + [f"python=={python_version}"]

conda_solver = solver.LibMambaSolver("snow-env", channels=channels, specs_to_add=specs)
try:
@@ -252,18 +256,38 @@
)


def _check_runtime_version_column_existence(session: session.Session) -> bool:
sql = textwrap.dedent(
"""
SHOW COLUMNS
LIKE 'runtime_version'
IN TABLE information_schema.packages;
"""
)
result = session.sql(sql).count()
return result == 1


def validate_requirements_in_snowflake_conda_channel(
session: session.Session, reqs: List[requirements.Requirement]
session: session.Session, reqs: List[requirements.Requirement], python_version: str
) -> Optional[List[str]]:
"""Search the snowflake anaconda channel for packages with version meet the specifier.

Args:
session: Snowflake connection session.
reqs: List of requirement specifiers.
python_version: A string of the Python version where the model is run.

Raises:
ValueError: Raised when the specifier cannot be supported when creating UDF.

Returns:
A list of the pinned latest versions that are available in the Snowflake Anaconda channel and meet the version specifiers.
"""
global _INFO_SCHEMA_PACKAGES_HAS_RUNTIME_VERSION

if _INFO_SCHEMA_PACKAGES_HAS_RUNTIME_VERSION is None:
_INFO_SCHEMA_PACKAGES_HAS_RUNTIME_VERSION = _check_runtime_version_column_existence(session)
ret_list = []
reqs_to_request = []
for req in reqs:
@@ -273,14 +297,26 @@ def validate_requirements_in_snowflake_conda_channel(
pkg_names_str = " OR ".join(
f"package_name = '{req_name}'" for req_name in sorted(req.name for req in reqs_to_request)
)
sql = textwrap.dedent(
f"""
SELECT PACKAGE_NAME, VERSION
FROM information_schema.packages
WHERE ({pkg_names_str})
AND language = 'python';
"""
)
if _INFO_SCHEMA_PACKAGES_HAS_RUNTIME_VERSION:
parsed_python_version = version.Version(python_version)
sql = textwrap.dedent(
f"""
SELECT PACKAGE_NAME, VERSION
FROM information_schema.packages
WHERE ({pkg_names_str})
AND language = 'python'
AND runtime_version = '{parsed_python_version.major}.{parsed_python_version.minor}';
"""
)
else:
sql = textwrap.dedent(
f"""
SELECT PACKAGE_NAME, VERSION
FROM information_schema.packages
WHERE ({pkg_names_str})
AND language = 'python';
"""
)

try:
result = (
@@ -301,10 +337,11 @@
except snowflake.connector.DataError:
return None
for req in reqs:
available_versions = list(req.specifier.filter(_SNOWFLAKE_CONDA_PACKAGE_CACHE.get(req.name, [])))
if len(req.specifier) > 1 or any(spec.operator != "==" for spec in req.specifier):
raise ValueError("At most one version specifier using the == operator is supported without a local conda resolver.")
available_versions = list(req.specifier.filter(set(_SNOWFLAKE_CONDA_PACKAGE_CACHE.get(req.name, []))))
if not available_versions:
return None
else:
latest_version = max(available_versions)
ret_list.append(f"{req.name}=={latest_version}")
ret_list.append(str(req))
return sorted(ret_list)