
[API] Add ingest endpoint to feature store #876

Merged: 12 commits, Apr 25, 2021
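For orientation, a rough client-side sketch of calling the new endpoint. Everything below that is not in the diff is hypothetical: the server address, the /api prefix, the project/feature-set/reference names, the source/target values and the username. The payload fields mirror the FeatureSetIngest schema added in this PR, and the response shape follows FeatureSetIngestOutput.

import requests

# Hypothetical MLRun API address, project ("demo"), feature set ("stocks") and reference ("latest");
# the route itself matches the one added in this PR, assuming the usual /api prefix.
url = "http://mlrun-api:8080/api/projects/demo/feature-sets/stocks/references/latest/ingest"
payload = {
    "source": {"kind": "csv", "name": "mysource", "path": "v3io:///bigdata/stocks.csv"},
    "targets": [{"kind": "parquet", "name": "mytarget"}],  # no path -> server fills the default prefix
    "infer_options": None,  # server falls back to InferOptions.default()
}
response = requests.post(
    url,
    json=payload,
    # Username header read by the endpoint; the v3io access key is resolved server-side
    # via get_secrets(request), so it is not part of the JSON body.
    headers={"x-remote-user": "jane"},
)
response.raise_for_status()
body = response.json()
print(body["feature_set"]["metadata"]["name"], body["run_object"])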
81 changes: 79 additions & 2 deletions mlrun/api/api/endpoints/feature_sets.py
@@ -1,13 +1,19 @@
from http import HTTPStatus
from typing import List

from fastapi import APIRouter, Depends, Header, Query, Response
from fastapi import APIRouter, Depends, Header, Query, Request, Response
from sqlalchemy.orm import Session

import mlrun.feature_store
from mlrun import mount_v3io
from mlrun.api import schemas
from mlrun.api.api import deps
from mlrun.api.api.utils import parse_reference
from mlrun.api.api.utils import get_secrets, log_and_raise, parse_reference
from mlrun.api.utils.singletons.db import get_db
from mlrun.data_types import InferOptions
from mlrun.datastore.targets import get_default_prefix_for_target
from mlrun.feature_store.api import RunConfig, ingest
from mlrun.model import DataSource, DataTargetBase

router = APIRouter()

@@ -137,6 +143,77 @@ def list_feature_sets(
return feature_sets


def _needs_v3io_mount(data_source, data_targets):
paths = []
for target in data_targets:
# If the target does not have a path (i.e. default target), then retrieve the default path from config.
paths.append(target.path or get_default_prefix_for_target(target.kind))
paths.append(data_source.path)

return any(
path.startswith("v3io://") or path.startswith("v3ios://") for path in paths
)


@router.post(
"/projects/{project}/feature-sets/{name}/references/{reference}/ingest",
response_model=schemas.FeatureSetIngestOutput,
)
def feature_set_ingest(
request: Request,
project: str,
name: str,
reference: str,
ingest_parameters: schemas.FeatureSetIngest,
username: str = Header(None, alias="x-remote-user"),
db_session: Session = Depends(deps.get_db_session),
):
tag, uid = parse_reference(reference)
feature_set_record = get_db().get_feature_set(db_session, project, name, tag, uid)

feature_set = mlrun.feature_store.FeatureSet.from_dict(feature_set_record.dict())
# Need to override the default rundb since we're in the server.
feature_set._override_run_db(db_session)

data_source = DataSource.from_dict(ingest_parameters.source.dict())
data_targets = [
DataTargetBase.from_dict(data_target.dict())
for data_target in ingest_parameters.targets
]

run_config = RunConfig()

# Try to deduce whether the ingest job will need a v3io mount, by analyzing the paths of the source and
# targets. If it does, apply the v3io mount to the run_config. Note that the access-key and username are
# user-context parameters, so we cannot take them from the API context.
if _needs_v3io_mount(data_source, data_targets):
secrets = get_secrets(request)
access_key = secrets.get("V3IO_ACCESS_KEY", None)

if not access_key or not username:
log_and_raise(
HTTPStatus.BAD_REQUEST.value,
reason="Request needs v3io access key and username in header",
)
run_config = run_config.apply(mount_v3io(access_key=access_key, user=username))

infer_options = ingest_parameters.infer_options or InferOptions.default()

run_params = ingest(
feature_set,
data_source,
data_targets,
infer_options=infer_options,
return_df=False,
run_config=run_config,
)
# ingest may modify the feature-set contents, so return the updated feature-set.
result_feature_set = schemas.FeatureSet(**feature_set.to_dict())
return schemas.FeatureSetIngestOutput(
feature_set=result_feature_set, run_object=run_params.to_dict()
)


@router.get("/projects/{project}/features", response_model=schemas.FeaturesOutput)
def list_features(
project: str,
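To make the v3io-mount deduction in feature_set_ingest above concrete, here is a small self-contained sketch that reproduces the same check outside the endpoint; the source/target kinds, names and paths are illustrative.

from mlrun.datastore.targets import get_default_prefix_for_target
from mlrun.model import DataSource, DataTargetBase

source = DataSource.from_dict({"kind": "csv", "name": "src", "path": "s3://bucket/data.csv"})
# A target without an explicit path falls back to the configured default prefix for its kind.
target = DataTargetBase.from_dict({"kind": "parquet", "name": "tgt"})

paths = [target.path or get_default_prefix_for_target(target.kind), source.path]
# The ingest job needs the v3io mount whenever any source/target path is v3io-backed,
# which is the usual case for the default prefixes on an Iguazio deployment.
print(any(path.startswith("v3io://") or path.startswith("v3ios://") for path in paths))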
2 changes: 2 additions & 0 deletions mlrun/api/schemas/__init__.py
@@ -28,6 +28,8 @@
FeatureSet,
FeatureSetDigestOutput,
FeatureSetDigestSpec,
FeatureSetIngest,
FeatureSetIngestOutput,
FeatureSetRecord,
FeatureSetsOutput,
FeatureSetSpec,
29 changes: 29 additions & 0 deletions mlrun/api/schemas/feature_store.py
@@ -113,3 +113,32 @@ class FeatureVectorRecord(ObjectRecord):

class FeatureVectorsOutput(BaseModel):
feature_vectors: List[FeatureVector]


class DataSourceObject(BaseModel):
kind: str
name: str
path: str

class Config:
extra = Extra.allow


class DataTargetObject(BaseModel):
kind: str
name: str
path: Optional[str]

class Config:
extra = Extra.allow


class FeatureSetIngest(BaseModel):
source: DataSourceObject
targets: List[DataTargetObject]
infer_options: Optional[int]


class FeatureSetIngestOutput(BaseModel):
feature_set: FeatureSet
run_object: dict
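
A minimal sketch of building the request body with the new schema classes (the field values are hypothetical). Since both objects set Extra.allow, source/target dicts coming from the client side pass through with any additional fields intact; note these models live under the API package, so constructing them directly requires the API requirements.

from mlrun.api.schemas.feature_store import (
    DataSourceObject,
    DataTargetObject,
    FeatureSetIngest,
)

ingest_request = FeatureSetIngest(
    source=DataSourceObject(kind="csv", name="mysource", path="v3io:///bigdata/stocks.csv"),
    targets=[DataTargetObject(kind="parquet", name="mytarget")],  # path omitted -> server default
    infer_options=None,  # the endpoint substitutes InferOptions.default()
)
print(ingest_request.json())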
14 changes: 9 additions & 5 deletions mlrun/datastore/targets.py
@@ -62,6 +62,14 @@ def get_default_targets():
]


def get_default_prefix_for_target(kind):
data_prefixes = mlrun.mlconf.feature_store.data_prefixes
data_prefix = getattr(data_prefixes, kind, None)
if not data_prefix:
data_prefix = data_prefixes.default
return data_prefix


def validate_target_placement(graph, final_step, targets):
if final_step or graph.is_empty():
return True
@@ -610,11 +618,7 @@ def _get_target_path(driver, resource):
name = resource.metadata.name
version = resource.metadata.tag
project = resource.metadata.project or mlrun.mlconf.default_project
data_prefixes = mlrun.mlconf.feature_store.data_prefixes
data_prefix = getattr(data_prefixes, kind, None)
if not data_prefix:
data_prefix = data_prefixes.default
data_prefix = data_prefix.format(project=project, kind=kind)
data_prefix = get_default_prefix_for_target(kind).format(project=project, kind=kind)
# todo: handle ver tag changes, may need to copy files?
name = f"{name}-{version or 'latest'}"
return f"{data_prefix}/{kind_prefix}/{name}{suffix}"
16 changes: 14 additions & 2 deletions mlrun/db/sqldb.py
@@ -254,9 +254,10 @@ def create_feature_set(self, feature_set, project="", versioned=True):
def get_feature_set(
self, name: str, project: str = "", tag: str = None, uid: str = None
):
return self._transform_db_error(
from_db = self._transform_db_error(
self.db.get_feature_set, self.session, project, name, tag, uid
)
return from_db.dict()

def list_features(
self,
@@ -308,9 +309,20 @@ def list_feature_sets(
)

def store_feature_set(
self, feature_set, name=None, project="", tag=None, uid=None, versioned=True
self,
feature_set: Union[dict, mlrun.api.schemas.FeatureSet],
name=None,
project="",
tag=None,
uid=None,
versioned=True,
):
if isinstance(feature_set, dict):
feature_set = mlrun.api.schemas.FeatureSet(**feature_set)

name = name or feature_set.metadata.name
project = project or feature_set.metadata.project

return self._transform_db_error(
self.db.store_feature_set,
self.session,
24 changes: 21 additions & 3 deletions mlrun/feature_store/feature_set.py
@@ -71,6 +71,7 @@ def __init__(
function=None,
analysis=None,
engine=None,
output_path=None,
):
self._features: ObjectList = None
self._entities: ObjectList = None
@@ -93,6 +94,7 @@ def __init__(
self.function = function
self.analysis = analysis or {}
self.engine = engine
self.output_path = output_path or mlconf.artifact_path

@property
def entities(self) -> List[Entity]:
@@ -238,6 +240,19 @@ def uri(self):
uri += ":" + self._metadata.tag
return uri

def _override_run_db(self, session):
# Import here, since this method only runs in the API context. If this import were global, the client would
# need the API requirements installed and would fail without them.
from ..api.api.utils import get_run_db_instance

self._run_db = get_run_db_instance(session)

def _get_run_db(self):
if self._run_db:
return self._run_db
else:
return mlrun.get_run_db()

def get_target_path(self, name=None):
"""get the url/path for an offline or specified data target"""
target = get_offline_target(self, name=name)
@@ -391,9 +406,9 @@ def to_dataframe(self, columns=None, df_module=None, target_name=None):

def save(self, tag="", versioned=False):
"""save to mlrun db"""
db = mlrun.get_run_db()
db = self._get_run_db()
self.metadata.project = self.metadata.project or mlconf.default_project
tag = tag or self.metadata.tag
tag = tag or self.metadata.tag or "latest"
as_dict = self.to_dict()
as_dict["spec"]["features"] = as_dict["spec"].get(
"features", []
@@ -402,9 +417,12 @@ def save(self, tag="", versioned=False):

def reload(self, update_spec=True):
"""reload/sync the feature vector status and spec from the DB"""
from_db = mlrun.get_run_db().get_feature_set(
from_db = self._get_run_db().get_feature_set(
self.metadata.name, self.metadata.project, self.metadata.tag
)
if isinstance(from_db, dict):
from_db = FeatureSet.from_dict(from_db)

self.status = from_db.status
if update_spec:
self.spec = from_db.spec
5 changes: 4 additions & 1 deletion mlrun/feature_store/ingestion.py
@@ -131,7 +131,10 @@ def run_ingestion_job(name, featureset, run_config, schedule=None):
raise mlrun.errors.MLRunInvalidArgumentError("function image must be specified")

task = mlrun.new_task(
name=name, params=run_config.parameters, handler=run_config.handler
name=name,
params=run_config.parameters,
handler=run_config.handler,
out_path=featureset.spec.output_path,
)
task.spec.secret_sources = run_config.secret_sources
task.set_label("job-type", "feature-ingest").set_label(
7 changes: 4 additions & 3 deletions mlrun/platforms/iguazio.py
@@ -22,6 +22,7 @@
import urllib3

import mlrun.errors
from mlrun.config import config as mlconf

_cached_control_session = None

@@ -99,14 +100,14 @@ def _mount_v3io_extended(task):
)

if not secret:
task = v3io_cred(access_key=access_key)(task)
task = v3io_cred(access_key=access_key, user=user)(task)
return task

return _mount_v3io_extended


def _resolve_mount_user(user=None):
return os.environ.get("V3IO_USERNAME", user)
return user or os.environ.get("V3IO_USERNAME")


def mount_v3io(
@@ -295,7 +296,7 @@ def _use_v3io_cred(task):

from kubernetes import client as k8s_client

web_api = api or environ.get("V3IO_API")
web_api = api or environ.get("V3IO_API") or mlconf.v3io_api
_user = user or environ.get("V3IO_USERNAME")
_access_key = access_key or environ.get("V3IO_ACCESS_KEY")

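The one-line change to _resolve_mount_user flips the precedence: an explicitly passed user now wins over the V3IO_USERNAME environment variable instead of being shadowed by it, so the user=username argument that the new ingest endpoint passes to mount_v3io actually takes effect. A small before/after sketch, with an illustrative env value and user name:

import os

os.environ["V3IO_USERNAME"] = "env-user"

def resolve_before(user=None):
    return os.environ.get("V3IO_USERNAME", user)  # the env var shadows the explicit argument

def resolve_after(user=None):
    return user or os.environ.get("V3IO_USERNAME")  # explicit argument wins; env var is the fallback

print(resolve_before("jane"))  # env-user
print(resolve_after("jane"))   # jane
print(resolve_after())         # env-user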