[Data Store] My Sql - target, source and driver(storey) #2407

Closed · wants to merge 120 commits

Changes shown from 81 of 120 commits.

Commits
7f7899c
fix bug for loging extra data in all kinds of artifacts
davesh0812 Mar 10, 2022
484af50
fix bug for https://jira.iguazeng.com/browse/ML-1936
davesh0812 Apr 7, 2022
ddb83bd
Merge branch 'mlrun:development' into development
davesh0812 Apr 7, 2022
2c3ebf9
Merge branch 'mlrun:development' into development
davesh0812 Apr 11, 2022
97890dc
black+isort
davesh0812 Apr 20, 2022
ff6fe5e
Merge branch 'mlrun:development' into development
davesh0812 May 9, 2022
ea7438f
Merge branch 'mlrun:development' into development
davesh0812 May 11, 2022
a7f10ee
support filter for story engine
davesh0812 May 11, 2022
1287266
support filter for spark engine
davesh0812 May 11, 2022
c21b21f
Revert "support filter for spark engine"
davesh0812 May 11, 2022
184cbbc
Revert "support filter for story engine"
davesh0812 May 11, 2022
7ca059d
method to local function
davesh0812 May 12, 2022
4cd0189
Merge branch 'mlrun:development' into development
davesh0812 May 25, 2022
08574ff
mongodb (pandas engine only)
davesh0812 May 26, 2022
8c0625c
Revert "method to local function"
davesh0812 May 26, 2022
1dfbf60
undo del _id
davesh0812 May 26, 2022
62740c2
_id to string
davesh0812 May 26, 2022
5366114
_id to string + DB
davesh0812 May 26, 2022
4043de4
one line func
davesh0812 Jun 1, 2022
5be56f7
Merge remote-tracking branch 'origin/development' into development
davesh0812 Jun 1, 2022
4846ec8
to static method
davesh0812 Jun 1, 2022
6173ec6
to_step template
davesh0812 Jun 1, 2022
b85103b
mongodb
davesh0812 Jun 1, 2022
f3ad4bc
to_step works without time filter
davesh0812 Jun 1, 2022
3c40df5
comments
davesh0812 Jun 6, 2022
b92f853
lint
davesh0812 Jun 8, 2022
ddbd4a4
mongodb test
davesh0812 Jun 8, 2022
2a6c80d
pymongo to requirements.txt
davesh0812 Jun 9, 2022
78ffea9
imports
davesh0812 Jun 9, 2022
53b1bc2
imports
davesh0812 Jun 9, 2022
2461267
docs
davesh0812 Jun 13, 2022
3a2fcc3
Merge branch 'development' into mongodb
davesh0812 Jun 13, 2022
0fef892
mongodb target init
davesh0812 Jun 16, 2022
70a8114
mongodb target init - override option
davesh0812 Jun 16, 2022
295be8f
target to test
davesh0812 Jun 20, 2022
cb68f6e
target to test
davesh0812 Jun 20, 2022
015beda
comments
davesh0812 Jun 20, 2022
4113150
write_df
davesh0812 Jun 20, 2022
a26cea2
change names
davesh0812 Jun 20, 2022
a4cca77
change names
davesh0812 Jun 20, 2022
bfa604d
lint
davesh0812 Jun 20, 2022
0a358d8
revert steps.py
davesh0812 Jun 20, 2022
0606ba8
imports
davesh0812 Jun 20, 2022
396bf2b
lint
davesh0812 Jun 20, 2022
fe91801
lint
davesh0812 Jun 20, 2022
06ac2a9
test
davesh0812 Jun 20, 2022
f31159b
test
davesh0812 Jun 20, 2022
f39aaa7
system test
davesh0812 Jun 27, 2022
921cf4e
Merge branch 'development' into development
davesh0812 Jun 27, 2022
b52b087
lint
davesh0812 Jun 27, 2022
4eb30e9
lint
davesh0812 Jun 27, 2022
710ebfe
save attr
davesh0812 Jun 27, 2022
59795d6
save attr
davesh0812 Jun 27, 2022
19f7f95
test2
davesh0812 Jun 27, 2022
a718117
try
davesh0812 Jun 28, 2022
6c9e12f
try2
davesh0812 Jun 28, 2022
b57ebcd
try2
davesh0812 Jun 28, 2022
63e7788
Merge branch 'mlrun:development' into development
davesh0812 Jun 28, 2022
dbdcab2
is online
davesh0812 Jun 28, 2022
fdb869b
table
davesh0812 Jun 28, 2022
5f32605
Update tests/system/feature_store/test_feature_store.py
davesh0812 Jul 4, 2022
cb1af4e
review request commit
davesh0812 Jul 4, 2022
3b4f445
review request commit
davesh0812 Jul 4, 2022
3406ade
storey driver and target
davesh0812 Jul 5, 2022
04032c1
fix bug when using "as" if the feature list
davesh0812 Jul 11, 2022
f775d96
for driver without aggregate
davesh0812 Jul 11, 2022
fb344e7
test mongodb get online features
davesh0812 Jul 11, 2022
b7ec79c
lint
davesh0812 Jul 11, 2022
640e6a2
Merge branch 'development' into mongodb-target2
davesh0812 Jul 11, 2022
d7b0981
lint
davesh0812 Jul 11, 2022
9947c51
lint
davesh0812 Jul 11, 2022
aec1825
requirements
davesh0812 Jul 11, 2022
6a99c13
lint
davesh0812 Jul 11, 2022
e92e53b
req
davesh0812 Jul 11, 2022
7f1e421
req
davesh0812 Jul 11, 2022
e227791
req
davesh0812 Jul 11, 2022
a4f15b7
req
davesh0812 Jul 11, 2022
7ad38b7
Merge branch 'mlrun:development' into development
davesh0812 Jul 12, 2022
8708ec3
code review
davesh0812 Jul 14, 2022
4bbd570
code review 2
davesh0812 Jul 19, 2022
79ccc14
end day
davesh0812 Jul 25, 2022
fa02e6e
pr changes
davesh0812 Aug 7, 2022
a6b36f1
save
davesh0812 Aug 17, 2022
7b16655
Merge branch 'mlrun:development' into development
davesh0812 Aug 17, 2022
0f5f673
init test for sql
davesh0812 Aug 21, 2022
5eda29b
test and change storey driver
davesh0812 Aug 22, 2022
19cf7b4
comments + lint
davesh0812 Aug 23, 2022
5c80013
code style
davesh0812 Aug 23, 2022
2e4dcef
code style
davesh0812 Aug 23, 2022
15e93c4
Merge branch 'mlrun:development' into development
davesh0812 Aug 24, 2022
5935914
lint + comments (split temp from tempFromStorey.py to 2 diffrent files)
davesh0812 Aug 25, 2022
228127a
Merge branch 'development' into mongodb-target2
davesh0812 Aug 25, 2022
f9c722a
Copyright + lint
davesh0812 Aug 25, 2022
b912beb
Merge remote-tracking branch 'origin/mongodb-target2' into mongodb-ta…
davesh0812 Aug 25, 2022
b679520
lint
davesh0812 Aug 25, 2022
d7062ce
requirements.txt
davesh0812 Aug 25, 2022
3884740
Merge branch 'mlrun:development' into development
davesh0812 Aug 25, 2022
2131276
Merge branch 'mlrun:development' into development
davesh0812 Aug 28, 2022
b1dbedd
Merge branch 'mlrun:development' into development
davesh0812 Aug 30, 2022
5838380
Merge branch 'mlrun:development' into development
davesh0812 Aug 31, 2022
5bf8bc6
Merge branch 'mlrun:development' into development
davesh0812 Sep 1, 2022
f5b8da7
Merge branch 'development' into mongodb-target2
davesh0812 Sep 6, 2022
188fcdd
Merge branch 'mlrun:development' into development
davesh0812 Sep 6, 2022
25a3f87
review
davesh0812 Sep 12, 2022
5c44a62
only sql db
davesh0812 Sep 19, 2022
8bd2433
lint
davesh0812 Sep 19, 2022
8239217
review
davesh0812 Oct 11, 2022
3465111
review
davesh0812 Oct 11, 2022
00ac3e5
Merge branch 'mlrun:development' into development
davesh0812 Oct 11, 2022
084bdd4
test + create table not in the init
davesh0812 Oct 12, 2022
4e6f532
Merge branch 'development' into sql-target-and-driver
davesh0812 Oct 12, 2022
0cfec0c
del storey from mlrun
davesh0812 Oct 18, 2022
c6c79f6
test + update option
davesh0812 Oct 18, 2022
126f385
lint
davesh0812 Oct 18, 2022
f23603a
if_exists
davesh0812 Oct 18, 2022
923b525
del unnecessary function
davesh0812 Oct 19, 2022
8bd0c04
SqlDB -> Sql
davesh0812 Oct 19, 2022
61006b5
SqlDB -> SQL
davesh0812 Oct 19, 2022
6d4dab5
lint
davesh0812 Oct 19, 2022
69eadfc
SqlDB -> SQL
davesh0812 Oct 23, 2022
143 changes: 143 additions & 0 deletions mlrun/datastore/sources.py
@@ -18,6 +18,7 @@
from datetime import datetime
from typing import Dict, List, Optional, Union

import pandas as pd
import v3io
import v3io.dataplane
from nuclio import KafkaTrigger
@@ -49,6 +50,29 @@ def get_source_step(source, key_fields=None, time_field=None, context=None):
return source.to_step(key_fields, time_field, context)


class _SqlDBIterator:
def __init__(self, collection, iter_chunksize):
"""
Iterate over given Sql collection

:param iter_chunksize: number of rows per chunk
:param collection: sql collection
"""
self.collection = collection
self.iter_chunksize = iter_chunksize
self.keys = self.collection.keys()

def __iter__(self):
return self

def __next__(self):
chunk = self.collection.fetchmany(self.iter_chunksize)
if chunk:
return pd.DataFrame(chunk, columns=self.keys)
else:
raise StopIteration


class BaseSourceDriver(DataSource):
support_spark = False
support_storey = False
@@ -826,6 +850,124 @@ def add_nuclio_trigger(self, function):
return func


class SqlDBSource(BaseSourceDriver):

kind = "sqldb"
support_storey = True
support_spark = False
_SQL_DB_PATH_STRING_ENV_VAR = "SQL_DB_PATH_STRING"

def __init__(
self,
name: str = "",
chunksize: int = None,
key_field: str = None,
time_field: str = None,
schedule: str = None,
start_time: Optional[Union[datetime, str]] = None,
end_time: Optional[Union[datetime, str]] = None,
db_path: str = None,
collection_name: str = None,
spark_options: dict = None,
):
"""
Reads SqlDB as input source for a flow.

example::
db_path = "sqlite:///stockmarket.db"
source = SqlDBSource(
collection_name='source_name', db_path=self.db, key_field='key'
)

:param name: source name
:param chunksize: number of rows per chunk (default large single chunk)
:param key_field: the column to be used as the key for the collection.
:param time_field: the column to be parsed as the timestamp for events. Defaults to None
:param start_time: filters out data before this time
:param end_time: filters out data after this time
:param schedule: string to configure scheduling of the ingestion job. For example '*/30 * * * *' will
cause the job to run every 30 minutes
:param db_path: url string connection to sql database.
If not set, the SQL_DB_PATH_STRING environment variable will be used.
:param collection_name: the name of the collection to access,
from the current database
:param spark_options: additional spark read options
"""

db_path = db_path or os.getenv(self._SQL_DB_PATH_STRING_ENV_VAR)
if db_path is None:
raise mlrun.errors.MLRunInvalidArgumentError(
f"cannot initialize SqlDBSource without the db_path argument "
f"or the {self._SQL_DB_PATH_STRING_ENV_VAR} environment variable"
)
attrs = {
"chunksize": chunksize,
"spark_options": spark_options,
"collection_name": collection_name,
"db_path": db_path,
}
attrs = {key: value for key, value in attrs.items() if value is not None}
super().__init__(
name,
attributes=attrs,
key_field=key_field,
time_field=time_field,
schedule=schedule,
start_time=start_time,
end_time=end_time,
)

def to_dataframe(self):
import sqlalchemy as db

db_path = self.attributes.get("db_path")
collection_name = self.attributes.get("collection_name")
chunksize = self.attributes.get("chunksize")
if collection_name and db_path:
engine = db.create_engine(db_path)
metadata = db.MetaData()
connection = engine.connect()
collection = db.Table(
[Review comment · Collaborator]
Why call a table a "collection"? Where does this terminology come from? It looks like the term collection only exists in Oracle's PL/SQL, and it doesn't mean a table there either.

collection_name, metadata, autoload=True, autoload_with=engine
)
results = connection.execute(db.select([collection]))
if chunksize:
return _SqlDBIterator(
collection=results, iter_chunksize=chunksize
)
else:
results = results.fetchall()
df = pd.DataFrame(results)
df.columns = results[0].keys()
connection.close()
return df
else:
raise mlrun.errors.MLRunInvalidArgumentError(
"collection_name and db_name args must be specified"
)

def to_step(self, key_field=None, time_field=None, context=None):
from mlrun.datastore.storeySourse import SqlDBSourceStorey

attributes = self.attributes or {}
if context:
attributes["context"] = context

return SqlDBSourceStorey(
key_field=self.key_field or key_field,
time_field=self.time_field or time_field,
# storage_options=self._get_store().get_storage_options(),
end_filter=self.end_time,
start_filter=self.start_time,
filter_column=self.time_field or time_field,
**attributes,
)

def is_iterator(self):
return bool(self.attributes.get("chunksize"))


# map of sources (exclude DF source which is not serializable)
source_kind_to_driver = {
"": BaseSourceDriver,
@@ -837,4 +979,5 @@ def add_nuclio_trigger(self, function):
CustomSource.kind: CustomSource,
BigQuerySource.kind: BigQuerySource,
SnowflakeSource.kind: SnowflakeSource,
SqlDBSource.kind: SqlDBSource,
}
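
For reviewers who want to exercise the new source end-to-end, here is a minimal usage sketch (not part of the diff). It assumes the mlrun feature-store ingest API; the table name, entity column, and connection string are illustrative.

    # Usage sketch: read a SQL table through SqlDBSource and ingest it into a
    # feature set. Names below (stocks, ticker) are illustrative assumptions.
    import mlrun.feature_store as fstore
    from mlrun.datastore.sources import SqlDBSource

    db_path = "sqlite:///stockmarket.db"  # any SQLAlchemy-style connection URL
    source = SqlDBSource(
        name="stocks-source",
        db_path=db_path,
        collection_name="stocks",  # the table to read
        key_field="ticker",        # primary-key column
        chunksize=500,             # optional: read the table in chunks
    )

    stocks_set = fstore.FeatureSet("stocks", entities=[fstore.Entity("ticker")])
    fstore.ingest(stocks_set, source)  # runs the ingestion flow with this source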
136 changes: 136 additions & 0 deletions mlrun/datastore/storeyDriver.py
@@ -0,0 +1,136 @@
# Copyright 2018 Iguazio
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List

import storey


class SqlDBDriver(storey.Driver):
"""
Database connection to Sql data basw.
:param db_path: url string connection to sql database.
:param primary_key: the primary key of the collection.
"""

def __init__(
self,
primary_key: str,
db_path: str,
aggregation_attribute_prefix: str = "aggr_",
aggregation_time_attribute_prefix: str = "_",
):
self._db_path = db_path
self._sql_connection = None
self._primary_key = primary_key
self._mtime_name = "$_mtime_"
self._storey_key = "storey_key"

self._aggregation_attribute_prefix = aggregation_attribute_prefix
self._aggregation_time_attribute_prefix = aggregation_time_attribute_prefix

def _lazy_init(self):
import sqlalchemy as db

self._closed = False
if not self._sql_connection:
self._engine = db.create_engine(self._db_path)
self._sql_connection = self._engine.connect()

def collection(self, table_path):
import sqlalchemy as db

metadata = db.MetaData()
return db.Table(
table_path[3:].split("/")[1],
metadata,
autoload=True,
autoload_with=self._engine,
)

async def _save_schema(self, container, table_path, schema):
# the table schema is not persisted for SQL targets
self._lazy_init()
return None

async def _load_schema(self, container, table_path):
# the table schema is not persisted for SQL targets
self._lazy_init()
return None

async def _save_key(
self, container, table_path, key, aggr_item, partitioned_by_key, additional_data
):
import sqlalchemy as db

self._lazy_init()

collection = self.collection(table_path)
return_val = None
try:
return_val = self._sql_connection.execute(
collection.insert(), [additional_data]
)
except db.exc.IntegrityError:
# the key already exists in the table; keep the existing row
pass
return return_val

async def _load_aggregates_by_key(self, container, table_path, key):
self._lazy_init()
collection = self.collection(table_path)
try:
agg_val, values = await self._get_all_fields(key, collection)
if not agg_val:
agg_val = None
if not values:
values = None
return [agg_val, values]
except Exception:
return [None, None]

async def _load_by_key(self, container, table_path, key, attribute):
self._lazy_init()
collection = self.collection(table_path)
if attribute == "*":
_, values = await self._get_all_fields(key, collection)
else:
values = None
return values

async def close(self):
pass

async def _get_all_fields(self, key, collection):
try:
# the key value is interpolated via repr() so that string keys are quoted
my_query = f"SELECT * FROM {collection} where {self._primary_key}={key!r}"
results = self._sql_connection.execute(my_query).fetchall()
except Exception as e:
raise RuntimeError(f"Failed to get key {key}. Response error was: {e}")

return None, {
results[0]._fields[i]: results[0][i] for i in range(len(results[0]))
}

async def _get_specific_fields(self, key: str, collection, attributes: List[str]):
try:
# the key value is interpolated via repr() so that string keys are quoted
my_query = (
f"SELECT {','.join(attributes)} FROM {collection} "
f"where {self._primary_key}={key!r}"
)
results = self._sql_connection.execute(my_query).fetchall()
except Exception as e:
raise RuntimeError(f"Failed to get key {key}. Response error was: {e}")

return None, {
results[0]._fields[i]: results[0][i] for i in range(len(results[0]))
}

def supports_aggregations(self):
return False
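
To make the driver's persistence model concrete, here is a standalone sketch (not part of the diff) of the SQLAlchemy pattern it builds on: reflect an existing table with autoload (as collection() does), insert a row (as _save_key does), and select by the primary key (as _load_by_key does). The table name and columns are illustrative, and the legacy select([...]) call style matches the SQLAlchemy version used elsewhere in this PR.

    import sqlalchemy as db

    engine = db.create_engine("sqlite:///stockmarket.db")
    connection = engine.connect()
    metadata = db.MetaData()

    # reflect the existing table schema from the database,
    # exactly as SqlDBDriver.collection() does
    stocks = db.Table("stocks", metadata, autoload=True, autoload_with=engine)

    # _save_key maps to a plain INSERT; a duplicate key raises IntegrityError,
    # which the driver swallows to keep the existing row
    connection.execute(stocks.insert(), [{"ticker": "AAPL", "price": 150.0}])

    # _load_by_key / _get_all_fields map to a SELECT filtered on the primary key
    row = connection.execute(
        db.select([stocks]).where(stocks.c.ticker == "AAPL")
    ).fetchone()
    print(dict(row))

    connection.close()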