Commit c5cb162

add loading JSON and column.contains for filtering #103

donaldcampbelljr committed Oct 27, 2023
1 parent a73a30a commit c5cb162

Showing 3 changed files with 143 additions and 35 deletions.
63 changes: 63 additions & 0 deletions pipestat/backends/db_backend/db_helpers.py
@@ -1,6 +1,7 @@
# DB Specific imports
from typing import Any, Dict, List, Optional, Tuple, Union
from urllib.parse import quote_plus
import json

try:
import sqlalchemy.orm
@@ -88,3 +89,65 @@ def _unpack_tripartite(x):
            statement = statement.where(getattr(ORM, col) == value)

    return statement

def selection_filter(
    ORM: Any,
    statement: Any,
    filter_conditions: Optional[List[Tuple[str, str, Union[str, List[str]]]]] = None,
    json_filter_conditions: Optional[List[Tuple[str, str, str]]] = None,
) -> Any:
    """
    Return a filtered query based on the given conditions.

    :param sqlalchemy.orm.DeclarativeMeta ORM: the ORM model to filter on
    :param sqlalchemy.sql.Select statement: the select statement to extend
    :param [(key, operator, value)] filter_conditions: e.g. [("id", "eq", 1)]; supported operators:
        - eq for ==
        - lt for <
        - ge for >=
        - in for in_
        - like for like
    :param [(col, key, value)] json_filter_conditions: conditions for querying a JSONB column;
        the value string is loaded as JSON and matched by containment (column.contains),
        e.g. [("output_image", "eq", '{"path": "path_to_39"}')]. The middle element is
        currently unused.
    :return: the filtered statement
    """

    def _unpack_tripartite(x):
        if not (isinstance(x, List) or isinstance(x, Tuple)):
            raise TypeError("Wrong filter class; a List or Tuple is required")
        if len(x) != 3:
            raise ValueError(
                f"Invalid filter value: {x}. The filter must be a tripartite iterable"
            )
        return tuple(x)

    if filter_conditions is not None:
        for filter_condition in filter_conditions:
            key, op, value = _unpack_tripartite(filter_condition)
            column = getattr(ORM, key, None)
            if column is None:
                raise ValueError(f"Selected filter column does not exist: {key}")
            if op == "in":
                filt = column.in_(value if isinstance(value, list) else value.split(","))
            else:
                # resolve the operator name to a matching column attribute,
                # e.g. "ge" -> "__ge__", "like" -> "like"
                attr = next(
                    filter(lambda a: hasattr(column, a), [op, op + "_", f"__{op}__"]),
                    None,
                )
                if attr is None:
                    raise ValueError(f"Invalid filter operator: {op}")
                if value == "null":
                    # allow filtering on NULL via the literal string "null"
                    value = None
                filt = getattr(column, attr)(value)
            statement = statement.where(filt)

    if json_filter_conditions is not None:
        for json_filter_condition in json_filter_conditions:
            col, key, value = _unpack_tripartite(json_filter_condition)
            column = getattr(ORM, col)
            # parse the JSON string and filter on JSONB containment rather than simple equality
            value = json.loads(value)
            statement = statement.where(column.contains(value))

    return statement
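
For reference, a minimal usage sketch of the new helper (not part of this commit; the Record model and the filter values below are illustrative assumptions, not pipestat's actual ORM classes):

from sqlalchemy import Integer, String, select
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

from pipestat.backends.db_backend.db_helpers import selection_filter


class Base(DeclarativeBase):
    pass


class Record(Base):
    # stand-in SQLAlchemy 2.0-style model used only for this sketch
    __tablename__ = "records"
    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    record_identifier: Mapped[str] = mapped_column(String)
    output_image: Mapped[dict] = mapped_column(JSONB)


stmt = select(Record)
stmt = selection_filter(
    ORM=Record,
    statement=stmt,
    # (column, operator, value) triples; operator names resolve to column methods
    filter_conditions=[("id", "ge", 10), ("id", "lt", 25)],
    # the JSON string is parsed and matched via JSONB containment (column.contains)
    json_filter_conditions=[("output_image", "eq", '{"path": "path_to_39"}')],
)
print(stmt)  # inspect the composed SELECT ... WHERE statement

Relative to the old dynamic_filter, the JSONB condition is no longer a plain equality check: the value string is parsed with json.loads and matched with column.contains, so a partial document can match a larger stored object.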
2 changes: 1 addition & 1 deletion pipestat/backends/db_backend/dbbackend.py
@@ -664,7 +664,7 @@ def select_records(
if cursor is not None:
    statement = statement.where(ORM.id > cursor)

statement = dynamic_filter(
statement = selection_filter(
    ORM=ORM,
    statement=statement,
    filter_conditions=filter_conditions,
113 changes: 79 additions & 34 deletions tests/test_pipestat.py
@@ -639,46 +639,91 @@ def test_select_records(

for i in range(100):
    r_id = "sample" + str(i)
    val = {"md5sum": "hash" + str(i)}
    val = {
        "md5sum": "hash" + str(i),
        "number_of_things": i * 10,
        "switch_value": bool(i % 2),
        "output_image": {
            "path": "path_to_" + str(i),
            "thumbnail_path": "thumbnail_path" + str(i),
            "title": "title_string" + str(i),
        },
    }

    psm.report(record_identifier=r_id, values=val, force_overwrite=True)

# Gets one or many records
result4 = psm.retrieve_one(record_identifier="sample1")

result5 = psm.retrieve_many(["sample1", "sample3"])
# result4 = psm.retrieve_one(record_identifier="sample1")
#
# result5 = psm.retrieve_many(["sample1", "sample3"])

# Gets everything, need to implement paging
result6 = psm.select_records()

# Attempt filtering with columns and filter conditions
result11 = psm.backend.select_records(
    columns=["name_of_something", "record_identifier", "md5sum"],
    filter_conditions=[("record_identifier", "eq", "sample4")],
)

result12 = psm.backend.select_records(
    columns=["name_of_something", "record_identifier", "md5sum"],
    filter_conditions=[("record_identifier", "eq", "sample4")],
    cursor=4,
)

result13 = psm.backend.select_records(
    columns=["name_of_something", "record_identifier", "md5sum"],
    limit=10,
)

next_cursor = result13["next_page_token"]

result14 = psm.backend.select_records(
    columns=["name_of_something", "record_identifier", "md5sum"],
    cursor=next_cursor,
    limit=10,
)

result15 = psm.backend.select_records(
    columns=["name_of_something", "record_identifier", "md5sum"],
    cursor=95,
    limit=100,
#
# # Attempt filtering with columns and filter conditions
# result11 = psm.backend.select_records(
# columns=["name_of_something", "record_identifier", "md5sum"],
# filter_conditions=[("record_identifier", "eq", "sample4")],
# )
#
# result12 = psm.backend.select_records(
# columns=["name_of_something", "record_identifier", "md5sum"],
# filter_conditions=[("record_identifier", "eq", "sample4")],
# cursor=4,
# )
#
# result13 = psm.backend.select_records(
# columns=["name_of_something", "record_identifier", "md5sum"],
# limit=10,
# )
#
# next_cursor = result13["next_page_token"]
#
# result14 = psm.backend.select_records(
# columns=["md5sum", "record_identifier"],
# cursor=next_cursor,
# limit=50,
# )
#
# result15 = psm.backend.select_records(
# columns=["name_of_something", "record_identifier", "md5sum"],
# cursor=95,
# limit=100,
# )
#
#
# result16 = psm.backend.select_records(
# columns=["name_of_something", "record_identifier", "md5sum", "number_of_things"],
# filter_conditions=[("id", "ge", "0"),("id", "lt", "25") ],
# limit=50,
# )
#
# result17 = psm.backend.select_records(
# columns=["name_of_something", "record_identifier", "md5sum", "number_of_things"],
# filter_conditions=[("id", "ge", "0"),("id", "lt", "25") ],
# limit=50,
# )
# json_entry = '{"path": "path_to_39", "title": "title_string39", "thumbnail_path": "thumbnail_path39"}'
#
# result18 = psm.backend.select_records(
# #columns=["name_of_something", "record_identifier", "md5sum", "number_of_things"],
# json_filter_conditions=[("output_image", "ge", json_entry)],
# limit=50,
# )

# # This should not return any results
# tuple_example = ("number_of_things", "eq", 390)
# result19 = psm.backend.select_records(
#     # columns=["name_of_something", "record_identifier", "md5sum", "number_of_things"],
#     filter_conditions=[("output_image", "eq", tuple_example)],
#     limit=50,
# )

# This works for filtering based on items within the JSONB!
json_entry = '{"path": "path_to_39"}'
result19 = psm.backend.select_records(
    # columns=["name_of_something", "record_identifier", "md5sum", "number_of_things"],
    json_filter_conditions=[("output_image", "eq", json_entry)],
    limit=50,
)

print("Done")
