In [None]:
%reload_ext autoreload
%autoreload 2

from flowquery import Runner

Create virtual node test

In [None]:
query: str = """
    create virtual (:Fact) as {
        unwind range(0,2) as i
        load json from "https://catfact.ninja/fact" as item
        return i as id, item.fact
    }
"""

runner: Runner = Runner(query)
await runner.run()

Query virtual graph node

In [None]:
runner: Runner = Runner("match (n:Fact) return n")
await runner.run()
for record in runner.results:
    print(record)

Test extensibility

In [None]:
from flowquery.extensibility import (
    Function,
    FunctionDef,
    AggregateFunction,
    ReducerElement,
    AsyncFunction,
    PredicateFunction
)
import aiohttp
import json
from typing import Any, List, Iterator, Union, Dict

@FunctionDef({
    "description": "Converts a string to uppercase",
    "category": "string",
    "parameters": [
        {"name": "text", "description": "String to convert", "type": "string"}
    ],
    "output": {"description": "Uppercase string", "type": "string"}
})
class UpperCase(Function):
    def __init__(self):
        super().__init__("uppercase")
        self._expected_parameter_count = 1

    def value(self) -> str:
        return str(self.get_children()[0].value()).upper()
    
@FunctionDef({
    "description": "Extracts nodes from a collection",
    "category": "scalar",
    "parameters": [
        {"name": "collection", "description": "Collection to extract nodes from", "type": "any[]"}
    ],
    "output": {"description": "List of nodes extracted from the collection", "type": "node[]"}
})
class Nodes(Function):
    def __init__(self):
        super().__init__("nodes")
        self._expected_parameter_count = 1

    def value(self) -> List[Dict[str, Any]]:
        pattern: List[Dict[str, Any]] = self.get_children()[0].value()
        return list(self._nodes(pattern))

    def _nodes(self, pattern: List[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
        for element in pattern:
            if isinstance(element, dict) and "id" in element:
                yield element
    

class ProductElement(ReducerElement):
    def __init__(self):
        self._value: float = 1.0

    @property
    def value(self) -> float:
        return self._value

    @value.setter
    def value(self, v: float) -> None:
        self._value = v
    
@FunctionDef({
    "description": "Calculates the product of a list of numbers",
    "category": "aggregate",
    "parameters": [
        {"name": "numbers", "description": "List of numbers to multiply", "type": "number[]"}
    ],
    "output": {"description": "Product of the numbers", "type": "number"}
})
class Product(AggregateFunction):
    def __init__(self):
        super().__init__("product")

    def reduce(self, element: ReducerElement) -> None:
        element.value *= self.first_child().value()

    def element(self) -> ReducerElement:
        return ProductElement()

@FunctionDef({
    "description": "Asynchronous function that fetches data from a URL",
    "category": "async",
    "parameters": [
        {"name": "url", "description": "URL to fetch data from", "type": "string"}
    ],
    "output": {"description": "Fetched data", "type": "string"}
})
class get(AsyncFunction):
    async def generate(self, url: str):
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                yield await response.json()

@FunctionDef({
    "description": "Fetch json data from a file path",
    "category": "async",
    "parameters": [
        {"name": "path", "description": "File path to fetch data from", "type": "string"}
    ],
    "output": {"description": "Fetched data", "type": "string"}
})
class json_file(AsyncFunction):
    async def generate(self, path: str):
        with open(path, "r") as file:
            yield json.load(file)

@FunctionDef({
    "description": "Extracts values from an array with optional filtering. Uses list comprehension syntax: extract(variable IN array [WHERE condition] | expression)",
    "category": "predicate",
    "parameters": [
        {"name": "variable", "description": "Variable name to bind each element", "type": "string"},
        {"name": "array", "description": "Array to iterate over", "type": "array"},
        {"name": "expression", "description": "Expression to return for each element", "type": "any"},
        {"name": "where", "description": "Optional filter condition", "type": "boolean", "required": False}
    ],
    "output": {"description": "Extracted values from the array after applying the optional filter", "type": "array", "example": [2, 4]},
    "examples": [
        "WITH [1, 2, 3] AS nums RETURN extract(n IN nums | n)",
        "WITH [1, 2, 3, 4] AS nums RETURN extract(n IN nums WHERE n > 1 | n * 2)"
    ]
})
class PredicateExtract(PredicateFunction):
    """PredicateExtract function.
    
    Extracts values from an array with optional filtering.
    """

    def __init__(self):
        super().__init__("extract")

    def value(self) -> List[Any]:
        return list(self._extract())
    
    def _extract(self) -> Iterator[Any]:
        self.reference.referred = self._value_holder
        array = self.array.value()
        if array is None or not isinstance(array, list):
            raise ValueError("Invalid array for extract function")
        
        for item in array:
            self._value_holder.holder = item
            if self.where is None or self.where.value():
                yield self._return.value()

@FunctionDef({
    "description": "Checks if any element in the array satisfies the condition. Uses list comprehension syntax: any(variable IN array [WHERE condition])",
    "category": "predicate",
    "parameters": [
        {"name": "variable", "description": "Variable name to bind each element", "type": "string"},
        {"name": "array", "description": "Array to iterate over", "type": "array"},
        {"name": "where", "description": "Condition to check for each element", "type": "boolean", "required": False}
    ],
    "output": {"description": "True if any element satisfies the condition, otherwise false", "type": "boolean", "example": True},
    "examples": [
        "WITH [1, 2, 3] AS nums RETURN any(n IN nums | n > 2)",
        "WITH [1, 2, 3] AS nums RETURN any(n IN nums | n > 5)"
    ]
})
class Any(PredicateFunction):
    """Any function.
    
    Returns true if any element in the array satisfies the condition.
    """

    def __init__(self):
        super().__init__("any")

    def value(self) -> bool:
        return any(self._any())
    
    def _any(self) -> Iterator[bool]:
        self.reference.referred = self._value_holder
        array = self.array.value()
        if array is None or not isinstance(array, list):
            raise ValueError("Invalid array for any function")
        
        for item in array:
            self._value_holder.holder = item
            if self.where is None or self.where.value():
                yield True

Test functions just created

In [None]:
runner: Runner = Runner("""
    return uppercase("hello world") as uppercased
""")
await runner.run()
for record in runner.results:
    print(record)

runner: Runner = Runner("""
    unwind [1, 2, 3, 4, 5] as num
    return product(num) as total_product
""")
await runner.run()
for record in runner.results:
    print(record)

runner: Runner = Runner("""
    load json from get("https://catfact.ninja/fact") as result
    return result.fact as cat_fact
""")
await runner.run()
for record in runner.results:
    print(record)

runner: Runner = Runner("""
    load json from json_file("../misc/data/test.json") as result
    unwind result as entry
    return entry
""")
await runner.run()
for record in runner.results:
    print(record)

runner: Runner = Runner("""
    with [
        {"age": 25, "name": "Alice"},
        {"age": 30, "name": "Bob"},
        {"age": 22, "name": "Charlie"},
        {"age": 28, "name": "Diana"}
    ] as people
    return extract(p IN people | p.name WHERE p.age > 25) as names_over_25
""")
await runner.run()
for record in runner.results:
    print(record)

runner: Runner = Runner("""
    with [1, 2, 3, 4, 5] as numbers
    return any(n IN numbers | n where n > 6) as has_greater_than_3
""")
await runner.run()
for record in runner.results:
    print(record)

List functions

In [None]:
runner: Runner = Runner("""
    unwind functions() as func
    return func
""")
await runner.run()
for record in runner.results:
    print(record)

Test virtual graph

In [None]:
await Runner("""
    create virtual (:User) as {
        load json from json_file(
            "../misc/data/users.json"
        ) as users
        unwind users as user
        return user.id as id,
            user.name as name,
            user.title as title,
            user.department as department,
            user.email as email,
            user.managerId as managerId
    }
""").run()

await Runner("""
    create virtual (:User)-[:MANAGED_BY]->(:User) as {
        load json from json_file(
            "../misc/data/users.json"
        ) as users
        unwind users as user
        return user.id as left_id, user.managerId as right_id
    }
""").run()

runner: Runner = Runner("""
    MATCH p=(u:User)-[:MANAGED_BY*]->(ceo:User)
    WHERE NOT (ceo)-[:MANAGED_BY]->(:User)
    and any(n IN nodes(p) | n where n.department = "Litigation")
    RETURN
        u.name as employee,
        extract(n IN nodes(p) | n.name) as management_chain
""")
await runner.run()
print(f"Total results: {len(runner.results)}")
for record in runner.results:
    print(record)