Skip to content

Commit

Permalink
Merge 48f03b9 into f15a5d7
Browse files Browse the repository at this point in the history
  • Loading branch information
simonwoerpel committed Sep 8, 2023
2 parents f15a5d7 + 48f03b9 commit 1d218da
Show file tree
Hide file tree
Showing 27 changed files with 2,711 additions and 1,870 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build-docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Docker meta
Expand All @@ -37,7 +37,7 @@ jobs:
uses: docker/build-push-action@v4
with:
context: .
# platforms: linux/amd64,linux/arm64
platforms: linux/amd64,linux/arm64
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:
python-version: ["3.10", "3.11"]

steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
Expand Down
10 changes: 5 additions & 5 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ repos:
args: [ "--fix=lf" ]
- id: trailing-whitespace

- repo: https://github.com/asottile/pyupgrade
rev: v3.10.1
hooks:
- id: pyupgrade
args: [ "--py39-plus" ]
# - repo: https://github.com/asottile/pyupgrade
# rev: v3.10.1
# hooks:
# - id: pyupgrade
# args: [ "--py39-plus" ]

- repo: https://github.com/MarcoGorelli/absolufy-imports
rev: v0.3.1
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ RUN apt-get update && apt-get -y upgrade
RUN pip install -q -U pip setuptools

RUN apt-get install -y pkg-config libicu-dev
RUN apt-get install -y libleveldb-dev
RUN pip install -q --no-binary=:pyicu: pyicu
RUN pip install -q psycopg2-binary
RUN pip install -q asyncpg
Expand Down
3 changes: 3 additions & 0 deletions benchmark.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#!/bin/sh

investigraph add-block -b github/investigraph-eu -u https://github.com/investigativedata/investigraph-eu/datasets
9 changes: 5 additions & 4 deletions investigraph/cache.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from collections.abc import Iterable
from functools import cache
from typing import Any, Iterable, Set
from typing import Any, Set

import fakeredis
import redis
Expand Down Expand Up @@ -42,7 +43,7 @@ def set(self, data: Any, key: str | None = None) -> str:
self.cache.set(self.get_key(key), data)
return key

def get(self, key: str, delete: bool | None = False) -> Any:
def get(self, key: str, delete: bool | None = settings.DEBUG) -> Any:
key = self.get_key(key)
res = self.cache.get(key)
if delete:
Expand All @@ -57,15 +58,15 @@ def sadd(self, *values: Iterable[Any], key: str | None = None) -> str:
self.cache.sadd(self.get_key(key) + "#SET", *values)
return key

def smembers(self, key: str, delete: bool | None = False) -> Set[str]:
def smembers(self, key: str, delete: bool | None = settings.DEBUG) -> Set[str]:
key = self.get_key(key) + "#SET"
res: Set[bytes] = self.cache.smembers(key)
if delete:
self.cache.delete(key)
return {v.decode() for v in res} or None

def flushall(self):
self.cache.flushall()
return self.cache.flushall()

@staticmethod
def get_key(key: str) -> str:
Expand Down
13 changes: 11 additions & 2 deletions investigraph/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,18 @@
from investigraph.model.block import get_block
from investigraph.model.flow import FlowOptions
from investigraph.pipeline import run
from investigraph.settings import DATASETS_BLOCK, DATASETS_REPO
from investigraph.settings import DATASETS_BLOCK, DATASETS_REPO, VERSION

cli = typer.Typer()
cli = typer.Typer(no_args_is_help=True)


@cli.callback(invoke_without_command=True)
def cli_version(
version: Annotated[Optional[bool], typer.Option(..., help="Show version")] = False
):
if version:
print(VERSION)
raise typer.Exit()


@cli.command("run")
Expand Down
13 changes: 7 additions & 6 deletions investigraph/logic/aggregate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import TYPE_CHECKING, Literal, TypeAlias
from uuid import uuid4

from followthemoney import model
from ftmq.io import smart_read_proxies
from ftmq.model.coverage import Collector
from ftmstore import get_dataset
Expand All @@ -23,7 +24,8 @@

def merge(ctx: "Context", p1: CE, p2: CE) -> CE:
try:
p1.merge(p2)
p1 = p1.merge(p2)
p1.schema = model.common_schema(p1.schema, p2.schema)
return p1
except Exception as e:
# try common schemata, this will probably "downgrade" entities
Expand All @@ -33,7 +35,7 @@ def merge(ctx: "Context", p1: CE, p2: CE) -> CE:
p1 = ctx.make(schema, **p1.to_dict()["properties"])
p1.id = p2.id
p2 = ctx.make(schema, **p2.to_dict()["properties"])
p1.merge(p2)
p1 = p1.merge(p2)
return p1

ctx.log.warn(f"{e}, id: `{p1.id}`")
Expand All @@ -51,6 +53,7 @@ def __init__(self, ctx: "Context", fragment_uris: list[str]) -> None:
self.fragments = 0

def get_fragments(self) -> CEGenerator:
ix = -1
for ix, proxy in enumerate(smart_read_proxies(self.fragment_uris)):
if ix % self.ctx.config.aggregate.chunk_size == 0:
self.ctx.log.info("reading in proxy %d ..." % ix)
Expand Down Expand Up @@ -80,10 +83,8 @@ def aggregate_memory(self) -> CEGenerator:
def iterate(
self, collector: Collector, handler: Literal["memory", "db"] | None = "memory"
) -> CEGenerator:
iterator = (
self.aggregate_memory() if handler == "memory" else self.aggregate_db()
)
for proxy in iterator:
aggregator = self.aggregate_db if handler == "db" else self.aggregate_memory
for proxy in aggregator():
collector.collect(proxy)
yield proxy

Expand Down
48 changes: 25 additions & 23 deletions investigraph/logic/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,54 +8,56 @@
"""


from urllib.parse import urlencode

import requests
from prefect import flow, get_run_logger, task
from prefect.tasks import task_input_hash

from investigraph import settings
from investigraph.model.source import Source


def get_request_cache_key(*args, **kwargs) -> str:
return f"REQUEST#{task_input_hash(*args, **kwargs)}"
ckey = kwargs.pop("ckey", None)
if ckey is not None:
return ckey
return f"GET#{task_input_hash(*args, **kwargs)}"


@task(
name="get",
retries=settings.TASK_RETRIES,
retry_delay_seconds=settings.TASK_RETRY_DELAY,
cache_key_fn=get_request_cache_key,
persist_result=True,
cache_expiration=settings.TASK_CACHE_EXPIRATION,
refresh_cache=not settings.TASK_CACHE,
)
def http(method: str, url: str, *args, **kwargs):
def _get(url: str, *args, **kwargs):
log = get_run_logger()
log.info(f"{method.upper()} {url}")
func = getattr(requests, method)
return func(url, *args, **kwargs)


@flow(name="requests.head", flow_run_name="head-{url}", version=settings.VERSION)
def head(url: str, *args, **kwargs):
"""
Execute `requests.get` within prefect context.
"""
return http("head", url, *args, **kwargs)
kwargs.pop("ckey", None)
log.info(f"GET {url}?{urlencode(kwargs)}")
res = requests.get(url, *args, **kwargs)
assert res.ok
return res


@flow(name="requests.get", flow_run_name="get-{url}", version=settings.VERSION)
@flow(name="dispatch-get", flow_run_name="get-{url}", version=settings.VERSION)
def get(url: str, *args, **kwargs):
"""
Execute `requests.get` within prefect context.
Do a `head` request beforehand to be able to use cache.
"""
return http("get", url, *args, **kwargs)


@flow(name="requests.post", flow_run_name="post-{url}", version=settings.VERSION)
def post(url: str, *args, **kwargs):
"""
Execute `requests.post` within prefect context.
"""
return http("post", url, *args, **kwargs)
source = Source(uri=url)
head = source.head()
return _get(url, *args, ckey=head.ckey, **kwargs)


# convenience
post = requests.post
head = requests.head
put = requests.put
delete = requests.delete
patch = requests.patch
Response = requests.Response
17 changes: 17 additions & 0 deletions investigraph/logic/seed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""
Seed sources for extraction
"""

from typing import Generator

from fsspec import get_fs_token_paths

from investigraph.model.context import BaseContext
from investigraph.model.source import Source


def handle(ctx: BaseContext) -> Generator[Source, None, None]:
if ctx.config.seed.glob is not None:
_, _, uris = get_fs_token_paths(ctx.config.seed.glob)
for uri in uris:
yield Source(uri=uri)
6 changes: 3 additions & 3 deletions investigraph/logic/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@
if TYPE_CHECKING:
from investigraph.model import Context

from ftmq.io import make_proxy

from investigraph.types import CEGenerator, SDict
from investigraph.util import uplevel_proxy


def map_record(record: SDict, mapping: QueryMapping) -> CEGenerator:
mapping = mapping.get_mapping()
if mapping.source.check_filters(record):
entities = mapping.map(record)
for proxy in entities.values():
proxy = uplevel_proxy(proxy)
yield proxy
yield make_proxy(proxy.to_dict())


def map_ftm(ctx: "Context", data: SDict, ix: int) -> CEGenerator:
Expand Down
6 changes: 6 additions & 0 deletions investigraph/model/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
AggregateStage,
ExtractStage,
LoadStage,
SeedStage,
TransformStage,
)
from investigraph.settings import DATASETS_BLOCK
Expand All @@ -24,6 +25,7 @@
class Config(BaseModel, YamlMixin, RemoteMixin):
dataset: Dataset
base_path: Path | None = Path()
seed: SeedStage | None = SeedStage()
extract: ExtractStage | None = ExtractStage()
transform: TransformStage | None = TransformStage()
load: LoadStage | None = LoadStage()
Expand All @@ -48,6 +50,10 @@ def from_string(cls, data: str, base_path: PathLike | None = ".") -> "Config":
config = cls(**data)

# custom user code
if not is_module(config.seed.handler):
config.seed.handler = str(
absolute_path(config.seed.handler, config.base_path)
)
if not is_module(config.extract.handler):
config.extract.handler = str(
absolute_path(config.extract.handler, config.base_path)
Expand Down
39 changes: 32 additions & 7 deletions investigraph/model/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,24 @@
from pydantic import BaseModel

from investigraph.cache import Cache, get_cache
from investigraph.logic.aggregate import AggregatorResult
from investigraph.logic.aggregate import AggregatorResult, merge
from investigraph.model.config import Config
from investigraph.model.source import Source
from investigraph.types import CEGenerator
from investigraph.util import join_slug, make_proxy


class Context(BaseModel):
class BaseContext(BaseModel):
dataset: str
prefix: str
config: Config
source: Source

def __hash__(self) -> int:
return hash(repr(self.dict()))

def __eq__(self, other) -> bool:
return hash(self) == hash(other)

@property
def cache(self) -> Cache:
return get_cache()
Expand Down Expand Up @@ -77,16 +79,39 @@ def task(self) -> "TaskContext":
def emit(self) -> None:
raise NotImplementedError

def from_source(self, source: Source) -> "Context":
return Context(
dataset=self.config.dataset.name,
prefix=self.config.dataset.prefix,
config=self.config,
source=source,
)

@classmethod
def from_config(cls, config: Config) -> "BaseContext":
return cls(
dataset=config.dataset.name,
prefix=config.dataset.prefix,
config=config,
)


class Context(BaseContext):
source: Source


class TaskContext(Context):
proxies: list[CE] = []
proxies: dict[str, CE] = {}

def __iter__(self) -> CEGenerator:
yield from self.proxies
yield from self.proxies.values()

def emit(self, proxy: CE) -> None:
# mimic zavod
self.proxies.append(proxy)
# mimic zavod api, do merge already
if proxy.id in self.proxies:
self.proxies[proxy.id] = merge(self, self.proxies[proxy.id], proxy)
else:
self.proxies[proxy.id] = proxy


def init_context(config: Config, source: Source) -> Context:
Expand Down
6 changes: 2 additions & 4 deletions investigraph/model/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,8 @@ def get_cache_key(self) -> str:
slug = f"RESOLVE#{slugify(self.source.uri)}"
if self.source.is_http:
self._resolve_head()
if self.head.etag:
return f"{slug}#{self.head.etag}"
if self.head.last_modified:
return f"{slug}#{self.head.last_modified.isoformat()}"
if self.head.ckey:
return f"{slug}#{self.head.ckey}"
if not self.source.stream:
self._resolve_content()
return self.checksum
Expand Down
Loading

0 comments on commit 1d218da

Please sign in to comment.