Commit 481285e

Merge 183e171 into 60888a8
simonwoerpel committed Jul 13, 2023
2 parents: 60888a8 + 183e171
Showing 49 changed files with 2,589 additions and 547 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/python.yml
@@ -5,9 +5,9 @@ name: Python

on:
push:
branches: [ "main" ]
branches: [ "develop" ]
pull_request:
branches: [ "main" ]
branches: [ "main", "develop" ]

jobs:
test:
@@ -30,15 +30,15 @@ jobs:
- name: set PY
run: echo "PY=$(python -VV | sha256sum | cut -d' ' -f1)" >> $GITHUB_ENV
- name: Set up poetry cache
uses: actions/cache@v1
uses: actions/cache@v3
with:
path: .venv
key: venv-${{ runner.os }}-${{ env.PY }}-${{ hashFiles('**/poetry.lock') }}
- name: Ensure cache is healthy
if: steps.cache.outputs.cache-hit == 'true'
run: poetry run pip --version >/dev/null 2>&1 || rm -rf .venv
- name: Set up pre-commit cache
uses: actions/cache@v1
uses: actions/cache@v3
with:
path: ~/.cache/pre-commit
key: pre-commit-${{ runner.os }}-${{ env.PY }}-${{ hashFiles('.pre-commit-config.yaml') }}
1 change: 1 addition & 0 deletions .gitignore
@@ -1,3 +1,4 @@
.prefect
.test
data*
*.store
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -29,6 +29,7 @@ repos:
rev: 5.12.0
hooks:
- id: isort
args: ["--profile", "black"]

- repo: https://github.com/psf/black
rev: 23.1.0
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,10 @@
# investigraph-etl

## CHANGELOG

### 0.1.0 (2023-07-13)
- breaking changes in:
- cli invocation for `investigraph run`
- `config.yaml` structure
- make all three stages customizable via *bring your own code*
- many small fixes and improvements
3 changes: 2 additions & 1 deletion Dockerfile
@@ -14,8 +14,9 @@ RUN pip install -q asyncpg

COPY investigraph /investigraph/investigraph
COPY setup.py /investigraph/
COPY setup.cfg /investigraph/
COPY pyproject.toml /investigraph/
COPY VERSION /investigraph/
COPY README.md /investigraph/

RUN pip install -q /investigraph

2 changes: 1 addition & 1 deletion Makefile
@@ -16,7 +16,7 @@ pre-commit:

test:
rm -rf .test
poetry run pytest tests -v -s --cov=investigraph --cov-report term-missing
poetry run pytest tests -v --capture=sys --cov=investigraph --cov-report term-missing
rm -rf .test

typecheck:
14 changes: 8 additions & 6 deletions README.md
@@ -1,4 +1,4 @@
[![Python test and package](https://github.com/investigativedata/investigraph-etl/actions/workflows/python.yml/badge.svg)](https://github.com/investigativedata/investigraph-etl/actions/workflows/python.yml) [![Build docker container](https://github.com/investigativedata/investigraph-etl/actions/workflows/build-docker.yml/badge.svg)](https://github.com/investigativedata/investigraph-etl/actions/workflows/build-docker.yml) [![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit)](https://github.com/pre-commit/pre-commit) [![Coverage Status](https://coveralls.io/repos/github/investigativedata/investigraph-etl/badge.svg?branch=main)](https://coveralls.io/github/investigativedata/investigraph-etl?branch=main)
[![investigraph on pypi](https://img.shields.io/pypi/v/investigraph)](https://pypi.org/project/investigraph/) [![Python test and package](https://github.com/investigativedata/investigraph-etl/actions/workflows/python.yml/badge.svg)](https://github.com/investigativedata/investigraph-etl/actions/workflows/python.yml) [![Build docker container](https://github.com/investigativedata/investigraph-etl/actions/workflows/build-docker.yml/badge.svg)](https://github.com/investigativedata/investigraph-etl/actions/workflows/build-docker.yml) [![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit)](https://github.com/pre-commit/pre-commit) [![Coverage Status](https://coveralls.io/repos/github/investigativedata/investigraph-etl/badge.svg?branch=main)](https://coveralls.io/github/investigativedata/investigraph-etl?branch=main) [![MIT License](https://img.shields.io/pypi/l/investigraph)](./LICENSE)

# investigraph

@@ -26,19 +26,21 @@ There is a dedicated [repo](https://github.com/investigativedata/investigraph-da

## run locally

Clone repo first.

Install app and dependencies (use a virtualenv):

pip install -e .
pip install investigraph

Or, e.g. when using [poetry](https://python-poetry.org/):

poetry add investigraph

After installation, `investigraph` as a command should be available:

investigraph --help

Quick run a local dataset definition:

investigraph run <dataset_name> -c ./path/to/config.yml
investigraph run -c ./path/to/config.yml

Register a local datasets block:

@@ -50,7 +52,7 @@ Register github datasets block:

Run a dataset pipeline from a dataset defined in a registered block:

investigraph run ec_meetings
investigraph run -d ec_meetings -b github/investigraph-datasets

View prefect dashboard:

2 changes: 1 addition & 1 deletion investigraph/cache.py
@@ -14,7 +14,7 @@

class Cache:
"""
This is an extremly simple cache interface for sharing tasks data
This is an extremely simple cache interface for sharing tasks data
efficiently via redis (or fakeredis during development)
it creates (prefixed) random keys during data set to cache.
4 changes: 3 additions & 1 deletion investigraph/cli.py
@@ -26,13 +26,14 @@

@cli.command("run")
def cli_run(
dataset: str,
dataset: Annotated[Optional[str], typer.Option("-d")] = None,
block: Annotated[Optional[str], typer.Option("-b")] = None,
config: Annotated[Optional[str], typer.Option("-c")] = None,
index_uri: Annotated[Optional[str], typer.Option(...)] = None,
fragments_uri: Annotated[Optional[str], typer.Option(...)] = None,
entities_uri: Annotated[Optional[str], typer.Option(...)] = None,
aggregate: Annotated[Optional[bool], typer.Option(...)] = True,
chunk_size: Annotated[Optional[int], typer.Option(...)] = None,
):
"""
Execute a dataset pipeline
@@ -45,6 +46,7 @@ def cli_run(
fragments_uri=fragments_uri,
entities_uri=entities_uri,
aggregate=aggregate,
chunk_size=chunk_size,
)
run(options)

29 changes: 18 additions & 11 deletions investigraph/inspect.py
@@ -15,7 +15,6 @@
from investigraph.model.config import Config, get_config
from investigraph.model.context import init_context
from investigraph.model.source import Source
from investigraph.util import get_func


def print_error(msg: str):
@@ -26,7 +25,7 @@ def get_records(source: Source) -> list[dict[str, Any]]:
records: list[dict[str, Any]] = []
print("Fetching `%s` ..." % source.uri)
res = fetch_source(source)
for ix, rec in enumerate(iter_records(res)):
for ix, rec in enumerate(iter_records(res), 1):
records.append(rec)
if ix == 5:
return records
@@ -35,21 +34,30 @@ def get_records(source: Source) -> list[dict[str, Any]]:
def inspect_config(p: PathLike) -> Config:
config = get_config(path=p)
try:
func = get_func(config.parse_module_path)
if not callable(func):
if not callable(config.extract.get_handler()):
print_error(f"module not found or not callable: `{config.extract.handler}`")
except ModuleNotFoundError:
print_error(f"no custom extract module: `{config.extract.handler}`")
try:
if not callable(config.transform.get_handler()):
print_error(
f"module not found or not callable: `{config.parse_module_path}`"
f"module not found or not callable: `{config.transform.handler}`"
)
except ModuleNotFoundError:
print_error(f"no parsing function: `{config.parse_module_path}`")
print_error(f"no custom transform module: `{config.transform.handler}`")
try:
if not callable(config.load.get_handler()):
print_error(f"module not found or not callable: `{config.load.handler}`")
except ModuleNotFoundError:
print_error(f"no custom load module: `{config.load.handler}`")
return config


def inspect_extract(config: Config) -> Generator[tuple[str, pd.DataFrame], None, None]:
"""
Preview fetched & extracted records in tabular format
"""
for source in config.pipeline.sources:
for source in config.extract.sources:
df = pd.DataFrame(get_records(source))
yield source.name, df

@@ -58,11 +66,10 @@ def inspect_transform(config: Config) -> Generator[tuple[str, CE], None, None]:
"""
Preview first proxies
"""
func = get_func(config.parse_module_path)
for source in config.pipeline.sources:
for source in config.extract.sources:
ctx = init_context(config, source)
proxies: list[CE] = []
for rec in get_records(source):
for proxy in func(ctx, rec):
for ix, rec in enumerate(get_records(source)):
for proxy in ctx.config.transform.handle(ctx, rec, ix):
proxies.append(proxy)
yield source.name, proxies
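The `get_records` change above switches `enumerate` to start at 1, so the preview stops after exactly five records instead of six. A stdlib-only sketch of that pattern (the function name and default limit are illustrative, not part of the investigraph API):

```python
def preview_records(records, limit=5):
    """Collect at most `limit` records from a (possibly endless) iterator."""
    collected = []
    # start=1 so the counter equals the number of records collected so far
    for ix, rec in enumerate(records, 1):
        collected.append(rec)
        if ix == limit:
            break
    return collected


# usage: cap an infinite generator at five items
import itertools

print(preview_records(itertools.count()))  # → [0, 1, 2, 3, 4]
```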
5 changes: 3 additions & 2 deletions investigraph/logging.py
@@ -3,14 +3,15 @@
from typing import List

import structlog
from banal import ensure_list
from structlog.contextvars import merge_contextvars
from structlog.stdlib import get_logger as get_raw_logger
from structlog.types import Processor


def configure_logging(
level: int = logging.INFO,
extra_processors: List[Processor] = [],
extra_processors: List[Processor] = None,
) -> None:
"""Configure log levels and structured logging."""
processors: List[Processor] = [
@@ -23,7 +24,7 @@ def configure_logging(
structlog.processors.UnicodeDecoder(),
merge_contextvars,
]
processors.extend(extra_processors)
processors.extend(ensure_list(extra_processors))
renderer = structlog.dev.ConsoleRenderer(
exception_formatter=structlog.dev.plain_traceback
)
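The logging change replaces a mutable default argument (`extra_processors: List[Processor] = []`) with `None` plus `banal.ensure_list`. A minimal sketch of why that matters, with a simplified stand-in for `ensure_list` (the real banal helper handles more input types):

```python
def ensure_list(value):
    """Normalize None, a single item, or a collection into a list.

    Simplified stand-in for banal.ensure_list; the real helper also
    handles generators and other iterables.
    """
    if value is None:
        return []
    if isinstance(value, (list, tuple, set)):
        return list(value)
    return [value]


def configure(extra_processors=None):
    # A None default avoids the classic shared-mutable-default bug:
    # with `= []`, every call would extend the *same* list object.
    processors = ["base"]
    processors.extend(ensure_list(extra_processors))
    return processors


print(configure())            # → ['base']
print(configure("renderer"))  # → ['base', 'renderer']
```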
24 changes: 12 additions & 12 deletions investigraph/logic/aggregate.py
@@ -5,29 +5,29 @@
import logging
from uuid import uuid4

from ftmq.io import smart_read_proxies
from ftmstore import get_dataset

from investigraph.cache import get_cache
from investigraph.model import Context
from investigraph.types import CEGenerator
from investigraph.util import smart_iter_proxies

log = logging.getLogger(__name__)


def get_smart_proxies(uri: str) -> CEGenerator:
def get_smart_proxies(ctx: Context, uri: str) -> CEGenerator:
"""
see if we have parts in cache during run time
(mimics efficient globbing for remote sources)
"""
cache = get_cache()
uris = cache.smembers(uri)
uris = cache.smembers(ctx.make_cache_key(uri))
if uris:
for uri in uris:
yield from smart_iter_proxies(uri)
yield from smart_read_proxies(uri)
return

yield from smart_iter_proxies(uri)
yield from smart_read_proxies(uri)


def in_memory(ctx: Context, in_uri: str) -> tuple[int, int]:
@@ -40,14 +40,14 @@ def in_memory(ctx: Context, in_uri: str) -> tuple[int, int]:
"""
fragments = 0
buffer = {}
for proxy in get_smart_proxies(in_uri):
for proxy in get_smart_proxies(ctx, in_uri):
fragments += 1
if proxy.id in buffer:
buffer[proxy.id].merge(proxy)
else:
buffer[proxy.id] = proxy

ctx.entities_loader.write(buffer.values(), serialize=True)
ctx.load_entities(buffer.values(), serialize=True)
return fragments, len(buffer.values())


@@ -58,17 +58,17 @@ def in_db(ctx: Context, in_uri: str) -> tuple[int, int]:
"""
dataset = get_dataset("aggregate_%s" % uuid4().hex)
bulk = dataset.bulk()
for ix, proxy in enumerate(get_smart_proxies(in_uri)):
if ix % 10000 == 0:
for ix, proxy in enumerate(get_smart_proxies(ctx, in_uri)):
if ix % 10_000 == 0:
log.info("Write [%s]: %s entities", dataset.name, ix)
bulk.put(proxy, fragment=str(ix))
bulk.flush()
proxies = []
for ox, proxy in enumerate(dataset.iterate()):
proxies.append(proxy)
if ox % 10000 == 0:
ctx.entities_loader.write(proxies, serialize=True)
if ox % 10_000 == 0:
ctx.load_entities(proxies, serialize=True)
proxies = []
ctx.entities_loader.write(proxies, serialize=True)
ctx.load_entities(proxies, serialize=True)
dataset.drop()
return ix, ox
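The `in_memory` aggregation above merges fragments that share an entity id into a single proxy before writing. A rough stdlib-only sketch of the idea, using plain dicts instead of followthemoney proxies (the real code calls `proxy.merge(proxy)` and writes via the context):

```python
def aggregate_in_memory(fragments):
    """Merge entity fragments sharing an id, preserving first-seen order.

    Fragments here are plain dicts ({'id': ..., 'props': {...}}), a
    stand-in for the followthemoney proxies the pipeline actually merges.
    """
    buffer = {}
    seen = 0
    for frag in fragments:
        seen += 1
        if frag["id"] in buffer:
            # merge: union the property values of both fragments
            for key, values in frag["props"].items():
                merged = buffer[frag["id"]]["props"].setdefault(key, [])
                merged.extend(v for v in values if v not in merged)
        else:
            buffer[frag["id"]] = frag
    return seen, list(buffer.values())


fragments = [
    {"id": "a", "props": {"name": ["Alice"]}},
    {"id": "a", "props": {"name": ["Alice", "A. Smith"]}},
    {"id": "b", "props": {"name": ["Bob"]}},
]
seen, entities = aggregate_in_memory(fragments)
print(seen, len(entities))  # → 3 2
```

This trades memory for speed; the neighbouring `in_db` path exists for datasets too large to buffer, flushing every 10,000 proxies instead.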
24 changes: 19 additions & 5 deletions investigraph/logic/extract.py
@@ -4,10 +4,12 @@

from io import BytesIO, StringIO

import orjson
import pandas as pd
from pantomime import types

from investigraph.model import Source
from investigraph.model.context import Context
from investigraph.model.source import TResponse
from investigraph.types import RecordGenerator

TABULAR = [types.CSV, types.EXCEL, types.XLS, types.XLSX]
@@ -26,13 +28,14 @@ def read_pandas(mimetype: str, content: str | bytes, **kwargs) -> pd.DataFrame:

def yield_pandas(df: pd.DataFrame) -> RecordGenerator:
for _, row in df.iterrows():
yield dict(row)
row = {k: v if not pd.isna(v) else None for k, v in row.items()}
yield row


def iter_records(res: Source) -> RecordGenerator:
def iter_records(res: TResponse) -> RecordGenerator:
if res.mimetype in TABULAR:
kwargs = {**{"dtype": str}, **res.extract_kwargs}
if res.is_stream:
if res.stream:
# yield pandas chunks to be able to apply extract_kwargs
# doesn't work for excel here
lines = []
@@ -46,7 +49,7 @@ def iter_records(res: Source) -> RecordGenerator:
# fix initial kwargs for next chunk
kwargs["names"] = df.columns
kwargs["header"] = 0
kwargs.pop("skiprows") # FIXME what else?
kwargs.pop("skiprows", None)
yield from yield_pandas(df)
if lines:
content = b"\r".join(lines)
Expand All @@ -57,4 +60,15 @@ def iter_records(res: Source) -> RecordGenerator:
yield from yield_pandas(df)
return

if res.mimetype == types.JSON:
if res.stream:
for line in res.iter_lines():
yield orjson.loads(line)
return

raise NotImplementedError("unsupported mimetype: `%s`" % res.mimetype)


# entrypoint
def handle(ctx: Context, *args, **kwargs) -> RecordGenerator:
yield from iter_records(*args, **kwargs)
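The `yield_pandas` fix converts pandas `NaN` cells to `None` so empty spreadsheet cells don't leak into records as floats. A stdlib-only sketch of the same cleaning step (`pd.isna` in the real code also covers `NaT` and `pd.NA`):

```python
import math


def clean_row(row):
    """Replace float NaN cells with None, mirroring the yield_pandas fix.

    Stdlib-only: handles float('nan'), which is what pandas emits for
    empty cells; pandas' own pd.isna() covers additional sentinel types.
    """
    return {
        k: None if isinstance(v, float) and math.isnan(v) else v
        for k, v in row.items()
    }


row = {"name": "ACME", "country": float("nan")}
print(clean_row(row))  # → {'name': 'ACME', 'country': None}
```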
7 changes: 3 additions & 4 deletions investigraph/logic/fetch.py
@@ -14,14 +14,13 @@
def fetch_source(source: Source) -> Literal[HttpSourceResponse, SmartSourceResponse]:
if source.is_http:
head = source.head()
stream = head.should_stream()
res = requests.get(source.uri, stream=stream)
source.stream = head.should_stream()
res = requests.get(source.uri, stream=source.stream)
assert res.ok
return HttpSourceResponse(
**source.dict(),
header=slugified_dict(res.headers),
response=res,
is_stream=stream,
)
return SmartSourceResponse(**source.dict())

@@ -38,4 +37,4 @@ def get_cache_key(_, params) -> str:
return f"{url}-{head.etag}"
if head.last_modified:
return f"{url}-{head.last_modified}"
return f"{url}-{datetime.now()}" # actually don't cache
return datetime.now().isoformat() # actually don't cache
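The rewritten `get_cache_key` prefers HTTP validators (ETag, then Last-Modified) and otherwise returns a timestamp, which is unique per call and so effectively disables caching. A sketch under the assumption that the head object simply exposes those two fields (`Head` here is a stand-in, not investigraph's model):

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class Head:
    """Stand-in for the HEAD-response metadata the real code inspects."""
    etag: Optional[str] = None
    last_modified: Optional[str] = None


def get_cache_key(url: str, head: Head) -> str:
    # Prefer strong validators: same ETag (or Last-Modified) means the
    # remote content is unchanged, so the cached result can be reused.
    if head.etag:
        return f"{url}-{head.etag}"
    if head.last_modified:
        return f"{url}-{head.last_modified}"
    # No validator: a fresh timestamp per call, so nothing ever hits cache.
    return datetime.now().isoformat()


print(get_cache_key("https://example.org/data.csv", Head(etag="abc123")))
# → https://example.org/data.csv-abc123
```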