Commit

Merge 53bbd90 into 6a2d2ab
simonwoerpel committed Aug 21, 2023
2 parents 6a2d2ab + 53bbd90 commit 0657ddc
Showing 29 changed files with 1,655 additions and 291 deletions.
6 changes: 4 additions & 2 deletions .pre-commit-config.yaml
@@ -46,18 +46,20 @@ repos:
    hooks:
      - id: pyproject-flake8
        additional_dependencies: [ flake8-bugbear ]
-       args: [ "--extend-ignore", "E501" ]
+       args: [ "--extend-ignore", "E203, E501" ]
+       exclude: (test_[\w]+\.py|\.csv|\.json|\.lock)$

  - repo: https://github.com/codespell-project/codespell
    rev: v2.2.5
    hooks:
      - id: codespell
-       exclude: \.(csv|json|lock)$
+       exclude: (test_[\w]+\.py|\.csv|\.json|\.lock)$

  - repo: https://github.com/pre-commit/pygrep-hooks
    rev: v1.10.0
    hooks:
      - id: python-check-blanket-noqa
+       exclude: (test_[\w]+\.py)$
      - id: python-check-blanket-type-ignore
      - id: python-no-eval
      - id: python-use-type-annotations
2 changes: 0 additions & 2 deletions Makefile
@@ -13,9 +13,7 @@ typecheck:
 	poetry run mypy --strict ftmq

 test:
-	rm -rf .test
 	poetry run pytest -v --capture=sys --cov=ftmq --cov-report term-missing
-	rm -rf .test

 build:
 	poetry run build
54 changes: 54 additions & 0 deletions benchmark.py
@@ -0,0 +1,54 @@
import os
import time
from contextlib import contextmanager
from shutil import rmtree

from ftmq.io import smart_read_proxies
from ftmq.query import Query
from ftmq.store import get_store

DATASET = "ec_meetings"


def get_proxies():
yield from smart_read_proxies("./tests/fixtures/ec_meetings.ftm.json")


@contextmanager
def measure(*msg: str):
start = time.time()
try:
yield None
finally:
end = time.time()
print(*msg, round(end - start, 2))


def benchmark(uri: str):
store = get_store(uri, dataset=DATASET)
prefix = store.__class__.__name__
print(prefix, uri)

with measure(prefix, "write"):
with store.writer() as bulk:
for proxy in get_proxies():
bulk.add_entity(proxy)

with measure(prefix, "iterate"):
_ = [p for p in store.iterate()]

view = store.query()
q = Query().where(
dataset=DATASET, schema="Event", prop="date", value=2023, operator="gte"
)
with measure(prefix, "query"):
_ = [p for p in view.entities(q)]


if __name__ == "__main__":
os.mkdir(".benchmark")
benchmark("memory:///")
benchmark("leveldb://.benchmark/leveldb")
benchmark("sqlite:///.benchmark/sqlite")
benchmark("postgresql:///ftm")
rmtree(".benchmark", ignore_errors=True)
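A note on running this script (not part of the commit): the postgresql:/// URI assumes a local ftm database already exists, and the leveldb backend assumes LevelDB bindings are installed. A minimal sketch that sticks to the self-contained backends:

    import os
    from shutil import rmtree

    # hypothetical: run from the repository root, importing the script above
    from benchmark import benchmark

    os.makedirs(".benchmark", exist_ok=True)
    try:
        benchmark("memory:///")
        benchmark("sqlite:///.benchmark/sqlite")
    finally:
        rmtree(".benchmark", ignore_errors=True)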
124 changes: 124 additions & 0 deletions ftmq/aleph.py
@@ -0,0 +1,124 @@
from collections import defaultdict
from collections.abc import Generator
from functools import cached_property
from typing import Any
from urllib.parse import urlparse, urlunparse

from alephclient.api import AlephAPI
from alephclient.settings import API_KEY, HOST, MAX_TRIES
from followthemoney.namespace import Namespace
from nomenklatura.dataset import DS
from nomenklatura.entity import CE, CompositeEntity
from nomenklatura.resolver import Resolver
from nomenklatura.statement import Statement
from nomenklatura.store import Store, View, Writer

from ftmq.util import make_proxy

uns = Namespace()


def parse_uri(uri: str) -> tuple[str, str, str]:
"""
http+aleph://host.org
http+aleph://dataset@host.org
https+aleph://dataset:api_key@host.org
"""
api_key = API_KEY
dataset = None
parsed = urlparse(uri)
scheme = parsed.scheme.split("+")[0]
*datasets, host = parsed.netloc.split("@", 1)
host = urlunparse([scheme, host, *parsed[2:]])
if len(datasets) == 1:
dataset, *api_key = datasets[0].split(":", 1)
if len(api_key) == 1:
api_key = api_key[0]

return host, api_key or None, dataset


class AlephStore(Store[CE, DS]):
def __init__(
self,
dataset: DS,
resolver: Resolver,
host: str | None = None,
api_key: str | None = None,
):
super().__init__(dataset, resolver)
self.host = host or HOST
self.api_key = api_key or API_KEY

@cached_property
def api(self):
return AlephAPI(self.host, self.api_key, retries=MAX_TRIES)

@cached_property
def collection(self) -> dict[str, Any]:
return self.api.load_collection_by_foreign_id(self.dataset.name)

def view(self, scope: DS, external: bool = False) -> View[DS, CE]:
return AlephView(self, scope, external=external)

def writer(self) -> Writer[DS, CE]:
return AlephWriter(self)


class AlephView(View[CE, DS]):
def __init__(
self, store: AlephStore[DS, CE], scope: DS, external: bool = False
) -> None:
super().__init__(store, scope, external=external)
self.store: AlephStore[DS, CE] = store

def entities(self, *args) -> Generator[CE, None, None]:
for proxy in self.store.api.stream_entities(self.store.collection):
proxy = make_proxy(proxy, dataset=self.store.dataset.name)
yield uns.apply(proxy)

def get_entity(self, id: str) -> CE | None:
ns = Namespace(self.store.dataset.name)
entity_id = ns.sign(id)
proxy = self.store.api.get_entity(entity_id)
if proxy is not None:
proxy = make_proxy(proxy, self.store.dataset.name)
return uns.apply(proxy)
return None


class AlephWriter(Writer[DS, CE]):
BATCH = 1_000

def __init__(self, store: AlephStore[DS, CE]):
self.store: AlephStore[DS, CE] = store
self.batch: dict[str, set[Statement]] = defaultdict(set)

def flush(self) -> None:
entities = []
if self.batch:
for stmts in self.batch.values():
entities.append(
CompositeEntity.from_statements(self.store.dataset, stmts)
)
self.store.api.write_entities(self.store.collection.get("id"), entities)
self.batch = defaultdict(set)

def add_statement(self, stmt: Statement) -> None:
if stmt.entity_id is None:
return
if len(self.batch) >= self.BATCH:
self.flush()
canonical_id = self.store.resolver.get_canonical(stmt.entity_id)
stmt.canonical_id = canonical_id
self.batch[stmt.canonical_id].add(stmt)

def pop(self, entity_id: str) -> list[Statement]:
# FIXME this actually doesn't delete anything
self.flush()
statements: list[Statement] = []
view = self.store.default_view()
entity = view.get_entity(entity_id)
if entity is not None:
statements = list(entity.statements)
return statements
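For orientation, a sketch of what parse_uri returns for the URI shapes in its docstring (hostnames are hypothetical; when the URI carries no api_key, the fallback comes from alephclient's API_KEY setting):

    from ftmq.aleph import parse_uri

    host, api_key, dataset = parse_uri("https+aleph://my_dataset:secret@aleph.example.org")
    # ("https://aleph.example.org", "secret", "my_dataset")

    host, api_key, dataset = parse_uri("http+aleph://aleph.example.org")
    # ("http://aleph.example.org", None, None) -- assuming API_KEY is unset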
31 changes: 28 additions & 3 deletions ftmq/cli.py
@@ -38,11 +38,26 @@ def cli():
@click.option(
"--schema-include-matchable", is_flag=True, default=False, show_default=True
)
@click.option("--sort", help="Properties to sort for", multiple=True)
@click.option(
"--sort-ascending/--sort-descending",
is_flag=True,
help="Sort in ascending order",
default=True,
show_default=True,
)
@click.option(
"--coverage-uri",
default=None,
show_default=True,
help="If specified, print coverage information to this uri",
)
@click.option(
"--store-dataset",
default=None,
show_default=True,
help="If specified, default dataset for source and target stores",
)
@click.argument("properties", nargs=-1)
def q(
input_uri: str | None = "-",
@@ -51,8 +66,11 @@ def q(
schema: tuple[str] | None = (),
schema_include_descendants: bool | None = False,
schema_include_matchable: bool | None = False,
sort: tuple[str] | None = None,
sort_ascending: bool | None = True,
properties: tuple[str] | None = (),
coverage_uri: str | None = None,
store_dataset: str | None = None,
):
"""
Apply ftmq filter to a json stream of ftm entities.
Expand All @@ -68,13 +86,20 @@ def q(
)
for prop, value, op in parse_unknown_cli_filters(properties):
q = q.where(prop=prop, value=value, operator=op)
if len(sort):
q = q.order_by(*sort, ascending=sort_ascending)

proxies = q.apply_iter(smart_read_proxies(input_uri))
if len(dataset) == 1:
store_dataset = store_dataset or dataset[0]
proxies = smart_read_proxies(input_uri, dataset=store_dataset, query=q)
if coverage_uri:
coverage = Collector()
proxies = coverage.apply(proxies)
smart_write_proxies(output_uri, proxies, serialize=True, dataset=store_dataset)
if coverage_uri:
coverage = Collector.apply(proxies)
coverage = coverage.export()
coverage = orjson.dumps(coverage.dict(), option=orjson.OPT_APPEND_NEWLINE)
smart_write(coverage_uri, coverage)
smart_write_proxies(output_uri, proxies, serialize=True)


@cli.command("apply")
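The reworked q command pushes the query down into smart_read_proxies (via its query= argument) instead of filtering the stream after the fact. Roughly the same flow in Python, with hypothetical file and dataset names:

    from ftmq.io import smart_read_proxies, smart_write_proxies
    from ftmq.query import Query

    q = Query().where(schema="Event", prop="date", value="2023", operator="gte")
    q = q.order_by("date", ascending=False)
    proxies = smart_read_proxies("entities.ftm.json", dataset="my_dataset", query=q)
    smart_write_proxies("out.ftm.json", proxies, serialize=True, dataset="my_dataset")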
26 changes: 24 additions & 2 deletions ftmq/enums.py
@@ -1,5 +1,6 @@
+from collections.abc import Iterable
 from enum import Enum
-from typing import Any, Iterable
+from typing import Any

 from followthemoney import model
 from nomenklatura.dataset.coverage import DataCoverage
@@ -34,5 +35,26 @@ def name(self, name: str):

 Schemata = StrEnum("Schemata", [k for k in model.schemata.keys()])
 Properties = StrEnum("Properties", [n for n in {p.name for p in model.properties}])
-Operators = StrEnum("Operators", ["in", "null", "gt", "gte", "lt", "lte"])
+PropertyTypes = {p.name: p.type for p in model.properties}
+PropertyTypes = Enum("PropertyTypes", PropertyTypes.items())
+Operators = StrEnum(
+    "Operators",
+    [
+        "not",
+        "in",
+        "not_in",
+        "null",
+        "gt",
+        "gte",
+        "lt",
+        "lte",
+        "like",
+        "ilike",
+        "notlike",
+        "notilike",
+        "between",
+        "startswith",
+        "endswith",
+    ],
+)
 Frequencies = StrEnum("Frequencies", [f for f in DataCoverage.FREQUENCIES])
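The expanded operator enum maps onto Query.where's operator keyword, as used in cli.py and benchmark.py above. A minimal sketch (the wildcard pattern for ilike is an assumption about SQL-style matching):

    from ftmq.query import Query

    q = Query().where(schema="Event", prop="date", value="2023", operator="gte")
    q = q.where(prop="name", value="%commission%", operator="ilike")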
4 changes: 4 additions & 0 deletions ftmq/exceptions.py
@@ -1,2 +1,6 @@
 class ValidationError(Exception):
     pass
+
+
+class ImproperlyConfigured(Exception):
+    pass
(diff truncated; the remaining 22 changed files are not shown)
