
Commit

Merge 8ae2892 into 007b845
simonwoerpel committed Jul 28, 2023
2 parents 007b845 + 8ae2892 commit 1468092
Showing 42 changed files with 1,694 additions and 1,799 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
@@ -4,7 +4,7 @@ commit = True
tag = True
message = 🔖 Bump version: {current_version} → {new_version}

[bumpversion:file:investigraph/__init__.py]
[bumpversion:file:investigraph/settings.py]

[bumpversion:file:VERSION]

2 changes: 1 addition & 1 deletion .gitignore
@@ -1,6 +1,6 @@
.prefect
.test
data*
data
*.store
# Byte-compiled / optimized / DLL files
__pycache__/
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
@@ -25,6 +25,11 @@ repos:
- id: pyupgrade
args: [ "--py37-plus" ]

- repo: https://github.com/MarcoGorelli/absolufy-imports
rev: v0.3.1
hooks:
- id: absolufy-imports

- repo: https://github.com/pycqa/isort
rev: 5.12.0
hooks:
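For context: absolufy-imports rewrites relative imports into absolute, package-rooted ones — the same change that shows up by hand further down in this commit (e.g. in catalog.py and cli.py). A minimal before/after sketch, using an import path from this repo:

```python
# before: relative import (flagged by the new absolufy-imports hook)
from .util import data_checksum

# after: rewritten to an absolute import
from investigraph.util import data_checksum
```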
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,9 @@

## CHANGELOG

### 0.3.0 (2023-07-27)
- implement incremental task caching

### 0.2.0 (2023-07-19)
- integrate [runpandarun](https://github.com/simonwoerpel/runpandarun) into extract stage

5 changes: 4 additions & 1 deletion investigraph/__init__.py
@@ -1 +1,4 @@
__version__ = "0.2.0"
from investigraph.model import Context, Resolver, Source, TaskContext
from investigraph.settings import VERSION as __version__

__all__ = [__version__, Context, TaskContext, Source, Resolver]
24 changes: 13 additions & 11 deletions investigraph/cache.py
@@ -1,23 +1,23 @@
import logging
from functools import cache
from typing import Any, Iterable, Set

import fakeredis
import redis
import shortuuid
from cachelib.serializers import RedisSerializer

from investigraph import settings
from investigraph.logging import get_logger
from investigraph.util import data_checksum

log = get_logger(__name__)
log = logging.getLogger(__name__)


class Cache:
"""
This is an extremely simple cache interface for sharing tasks data
efficiently via redis (or fakeredis during development)
it creates (prefixed) random keys during data set to cache.
it creates (prefixed) keys based on input data
it mimics redis GETDEL so that after fetching data from cache the key is
deleted (turned off by `delete=False`)
@@ -28,19 +28,21 @@ class Cache:
def __init__(self):
if settings.DEBUG:
con = fakeredis.FakeStrictRedis()
con.ping()
log.info("Redis connected: `fakeredis`")
else:
con = redis.from_url(settings.REDIS_URL)
con.ping()
log.info("Redis initialized", url=settings.REDIS_URL)
con.ping()
log.info("Redis connected: `{settings.REDIS_URL}`")
self.cache = con

def set(self, data: Any, key: str | None = None) -> str:
key = key or data_checksum(data)
data = self.serializer.dumps(data)
key = key or shortuuid.uuid()
self.cache.set(self.get_key(key), data)
return key

def get(self, key: str, delete: bool | None = True) -> Any:
def get(self, key: str, delete: bool | None = False) -> Any:
key = self.get_key(key)
res = self.cache.get(key)
if delete:
@@ -50,12 +52,12 @@ def get(self, key: str, delete: bool | None = True) -> Any:
return data

def sadd(self, *values: Iterable[Any], key: str | None = None) -> str:
key = key or shortuuid.uuid()
values = [str(v) for v in values]
key = key or data_checksum(values)
self.cache.sadd(self.get_key(key) + "#SET", *values)
return key

def smembers(self, key: str, delete: bool | None = True) -> Set[str]:
def smembers(self, key: str, delete: bool | None = False) -> Set[str]:
key = self.get_key(key) + "#SET"
res: Set[bytes] = self.cache.smembers(key)
if delete:
@@ -67,7 +69,7 @@ def flushall(self):

@staticmethod
def get_key(key: str) -> str:
return f"{settings.CACHE_PREFIX}:{key}"
return f"{settings.REDIS_PREFIX}:{key}"


@cache
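A rough usage sketch of the reworked cache, pieced together from the diff above: keys now default to a checksum of the input data instead of a random uuid, and `get()`/`smembers()` no longer delete by default. The `get_cache()` factory, key prefix and `#SET` suffix are visible in the diff; everything else here is an assumption, not part of the commit.

```python
# Hypothetical usage sketch based on the diff above -- not part of the commit.
from investigraph.cache import get_cache

cache = get_cache()  # fakeredis when settings.DEBUG, otherwise redis via settings.REDIS_URL

records = [{"name": "ACME Inc.", "jurisdiction": "de"}]
key = cache.set(records)            # key defaults to data_checksum(records)
same = cache.get(key)               # delete now defaults to False, the key survives
gone = cache.get(key, delete=True)  # opt back in to GETDEL-style consumption

# set semantics, e.g. for collecting fragment uris of a run:
run_key = cache.sadd("uri-1", "uri-2", key="my-run")
uris = cache.smembers(run_key)      # {"uri-1", "uri-2"}, stored under "<REDIS_PREFIX>:my-run#SET"
```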
13 changes: 6 additions & 7 deletions investigraph/catalog.py
@@ -1,17 +1,16 @@
import logging
from datetime import datetime
from typing import Any, Dict, Generator, Optional, Union

import requests
import yaml
from nomenklatura.dataset import DataCatalog
from nomenklatura.util import datetime_iso
from zavod.dataset import ZavodDataset

from investigraph.model.dataset import DEFAULT_CATALOG
from investigraph.model.dataset import NKCatalog as DataCatalog
from investigraph.util import PathLike

from .logging import get_logger

log = get_logger(__name__)
log = logging.getLogger(__name__)


def flatten_catalog(url: str) -> Generator[Dict[str, Any], None, None]:
@@ -50,12 +49,12 @@ def build_catalog(catalog_in: PathLike) -> DataCatalog:
```
"""
log.info("building catalog", catalog=str(catalog_in))
log.info("building catalog: `{catalog_in}`")
seen = set()
with open(catalog_in) as fh:
catalog_in_data = yaml.safe_load(fh)
catalog_in = str(catalog_in) # for logging
catalog = DataCatalog(ZavodDataset, {})
catalog = DEFAULT_CATALOG
catalog.updated_at = datetime_iso(datetime.utcnow())
for ds_data in catalog_in_data["datasets"]:
include_url: Optional[str] = ds_data.pop("include", None)
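A sketch of how the reworked `build_catalog` might be driven. Only the function signature and the `datasets` / `include` keys are taken from the diff; the concrete YAML layout and the `catalog.datasets` attribute are assumptions based on nomenklatura's `DataCatalog`.

```python
# Hypothetical sketch -- the YAML layout is assumed, only the `datasets` list
# and the optional `include` url are visible in the diff above.
from investigraph.catalog import build_catalog

# catalog.yml:
# datasets:
#   - include: https://example.org/catalog.json   # pull datasets from a remote catalog
#   - name: my_dataset
#     title: My Dataset

catalog = build_catalog("catalog.yml")
print(catalog.updated_at)                   # stamped with the current UTC time
print([d.name for d in catalog.datasets])   # assuming nomenklatura's DataCatalog interface
```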
13 changes: 5 additions & 8 deletions investigraph/cli.py
@@ -17,10 +17,6 @@
from investigraph.pipeline import run
from investigraph.settings import DATASETS_BLOCK, DATASETS_REPO

from .logging import configure_logging

configure_logging()

cli = typer.Typer()


@@ -87,8 +83,8 @@ def cli_inspect(
):
config = inspect_config(config_path)
print(f"[bold green]OK[/bold green] `{config_path}`")
print(f"[bold]dataset:[/bold] {config.dataset}")
print(f"[bold]title:[/bold] {config.metadata.get('title')}")
print(f"[bold]dataset:[/bold] {config.dataset.name}")
print(f"[bold]title:[/bold] {config.dataset.title}")

if extract:
for name, df in inspect_extract(config):
@@ -128,8 +124,9 @@ def cli_catalog(
data = orjson.dumps(catalog.to_dict(), option=orjson.OPT_APPEND_NEWLINE)
if uri == "-":
sys.stdout.write(data.decode())
with open(uri, "wb") as fh:
fh.write(data)
else:
with open(uri, "wb") as fh:
fh.write(data)


@cli.command("reset")
54 changes: 0 additions & 54 deletions investigraph/logging.py

This file was deleted.

51 changes: 27 additions & 24 deletions investigraph/logic/aggregate.py
@@ -2,53 +2,56 @@
aggregate fragments
"""

import logging
from uuid import uuid4

from ftmq.io import smart_read_proxies
from ftmstore import get_dataset

from investigraph.cache import get_cache
from investigraph.model import Context
from investigraph.types import CEGenerator
from investigraph.settings import CHUNK_SIZE
from investigraph.types import CE

log = logging.getLogger(__name__)
COMMON_SCHEMAS = ("Organization", "LegalEntity")


def get_smart_proxies(ctx: Context, uri: str) -> CEGenerator:
    """
    see if we have parts in cache during run time
    (mimics efficient globbing for remote sources)
    """
    cache = get_cache()
    uris = cache.smembers(ctx.make_cache_key(uri))
    if uris:
        for uri in uris:
            yield from smart_read_proxies(uri)
        return

    yield from smart_read_proxies(uri)
def merge(ctx: Context, p1: CE, p2: CE) -> CE:
    try:
        p1.merge(p2)
        return p1
    except Exception as e:
        # try common schemata, this will probably "downgrade" entities
        # as in, losing some schema specific properties
        for schema in COMMON_SCHEMAS:
            if p1.schema.is_a(schema) and p2.schema.is_a(schema):
                p1 = ctx.make(schema, **p1.to_dict()["properties"])
                p1.id = p2.id
                p2 = ctx.make(schema, **p2.to_dict()["properties"])
                p1.merge(p2)
                return p1

        ctx.log.warn(f"{e}, id: `{p1.id}`")


def in_memory(ctx: Context, in_uri: str) -> tuple[int, int]:
def in_memory(ctx: Context, *uris: tuple[str]) -> tuple[int, int]:
"""
aggregate in memory: read fragments from `in_uri` and write aggregated
proxies to `out_uri`
as `smart_open` is used here, `in_uri` and `out_uri` can be any local or
remote locations
"""
fragments = 0
ix = 0
buffer = {}
for proxy in get_smart_proxies(ctx, in_uri):
fragments += 1
for ix, proxy in enumerate(smart_read_proxies(uris), 1):
if ix % (CHUNK_SIZE * 10) == 0:
ctx.log.info("reading in proxy %d ..." % ix)
if proxy.id in buffer:
buffer[proxy.id].merge(proxy)
buffer[proxy.id] = merge(ctx, buffer[proxy.id], proxy)
else:
buffer[proxy.id] = proxy

ctx.load_entities(buffer.values(), serialize=True)
return fragments, len(buffer.values())
return ix, len(buffer.values())


def in_db(ctx: Context, in_uri: str) -> tuple[int, int]:
@@ -58,9 +61,9 @@ def in_db(ctx: Context, in_uri: str) -> tuple[int, int]:
"""
dataset = get_dataset("aggregate_%s" % uuid4().hex)
bulk = dataset.bulk()
for ix, proxy in enumerate(get_smart_proxies(ctx, in_uri)):
for ix, proxy in enumerate(smart_read_proxies(in_uri)):
if ix % 10_000 == 0:
log.info("Write [%s]: %s entities", dataset.name, ix)
ctx.log.info("Write [%s]: %s entities", dataset.name, ix)
bulk.put(proxy, fragment=str(ix))
bulk.flush()
proxies = []
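The interesting new piece here is `merge()`: when followthemoney refuses to merge two fragments because their schemata are incompatible, both sides are re-made with the first matching schema from `COMMON_SCHEMAS` and merged there, at the cost of schema-specific properties. A standalone illustration of that idea, using followthemoney directly (the commit routes this through `ctx.make()`; the sample entities are made up):

```python
# Illustration only -- not the commit's code path; followthemoney is used
# directly here to show why the downgrade fallback is needed.
from followthemoney import model

p1 = model.make_entity("Company")
p1.id = "acme"
p1.add("name", "ACME Inc.")

p2 = model.make_entity("PublicBody")
p2.id = "acme"
p2.add("name", "ACME Inc.")

try:
    p1.merge(p2)  # expected to raise InvalidData: no common schema
except Exception:
    # merge() in the diff falls back to COMMON_SCHEMAS ("Organization",
    # "LegalEntity"): both proxies are rebuilt on the shared ancestor and
    # merged there, losing Company-/PublicBody-only properties.
    downgraded = model.make_entity("Organization")
    downgraded.id = p1.id
    downgraded.add("name", p1.get("name") + p2.get("name"))
    print(downgraded.schema.name)  # Organization
```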
7 changes: 5 additions & 2 deletions investigraph/logic/extract.py
@@ -2,8 +2,10 @@
Extract sources to iterate objects to dict records
"""

import numpy as np
import pandas as pd
from pantomime import types
from runpandarun.io import guess_handler_from_mimetype

from investigraph.model.context import Context
from investigraph.model.resolver import Resolver
@@ -15,15 +17,16 @@

def yield_pandas(df: pd.DataFrame) -> RecordGenerator:
for _, row in df.iterrows():
row = {k: v if not pd.isna(v) else None for k, v in row.items()}
yield row
yield dict(row.replace(np.nan, None))


def extract_pandas(
resolver: Resolver, chunk_size: int | None = 10_000
) -> RecordGenerator:
play = resolver.source.pandas
play.read.uri = resolver.source.uri
if play.read.handler is None:
play.read.handler = f"read_{guess_handler_from_mimetype(resolver.mimetype)}"
for ix, chunk in enumerate(resolver.iter(chunk_size)):
df = play.read.handle(chunk)
if resolver.mimetype == types.CSV:
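Two small things change in the pandas path: NaN cells are nulled via `Series.replace` instead of a per-key comprehension, and a missing read handler is now guessed from the source mimetype (e.g. `read_csv` for CSV). A tiny sketch of the NaN handling with made-up data; the exact NaN-to-None semantics of `replace` vary a bit across pandas versions:

```python
# Made-up sample data; mirrors the NaN handling of the new yield_pandas().
import numpy as np
import pandas as pd

df = pd.DataFrame({"name": ["ACME Inc.", "Foo Ltd."], "country": ["de", np.nan]})

records = [dict(row.replace(np.nan, None)) for _, row in df.iterrows()]
# e.g. [{'name': 'ACME Inc.', 'country': 'de'}, {'name': 'Foo Ltd.', 'country': None}]
# (on older pandas, df.replace({np.nan: None}) is the unambiguous spelling)
```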
(remaining changed files not loaded)
