Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions darwin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pathlib import Path

import requests.exceptions
from darwin.datatypes import AnnotatorReportGrouping
from rich.console import Console

import darwin.cli_functions as f
Expand Down Expand Up @@ -220,6 +221,19 @@ def _run(args: Namespace, parser: ArgumentParser) -> None:
output=args.output,
)

# Report generation commands
elif args.command == "report":
if args.action == "annotators":
f.report_annotators(
args.datasets,
args.start,
args.stop,
[AnnotatorReportGrouping(value) for value in args.group_by],
args.pretty,
)
elif args.action == "help" or args.action is None:
f.help(parser, "report")


# Allow invoking the CLI module directly (e.g. `python -m darwin.cli`).
if __name__ == "__main__":
    main()
85 changes: 85 additions & 0 deletions darwin/cli_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from darwin.dataset.upload_manager import LocalFile
from darwin.dataset.utils import get_release_path
from darwin.datatypes import (
AnnotatorReportGrouping,
ExportParser,
ImportParser,
NumberLike,
Expand Down Expand Up @@ -1348,6 +1349,90 @@ def post_comment(
console.print(f"[red]{traceback.format_exc()}")


def report_annotators(
    dataset_slugs: List[str],
    start: datetime.datetime,
    stop: datetime.datetime,
    group_by: List[AnnotatorReportGrouping],
    pretty: bool,
) -> None:
    """
    Prints an annotators report in CSV format.

    Parameters
    ----------
    dataset_slugs : List[str]
        Slugs of datasets to include in the report.
    start : datetime.datetime
        Timezone aware report start DateTime.
    stop : datetime.datetime
        Timezone aware report end DateTime.
    group_by: List[AnnotatorReportGrouping]
        Non-empty list of grouping options for the report.
    pretty : bool
        If ``True``, it will print the output in a Rich formatted table.
    """
    client: Client = _load_client()
    console = Console(theme=_console_theme())

    # Resolve slugs to ids via a lookup table instead of removing matched
    # entries from `dataset_slugs`, which mutated the caller's list in place.
    remote_ids_by_slug = {
        dataset.slug: dataset.dataset_id for dataset in client.list_remote_datasets()
    }

    unknown_slugs = [slug for slug in dataset_slugs if slug not in remote_ids_by_slug]
    if unknown_slugs:
        _error(f"Datasets '{unknown_slugs}' do not exist.")

    dataset_ids = [remote_ids_by_slug[slug] for slug in dataset_slugs]

    report: str = client.get_annotators_report(
        dataset_ids,
        start,
        stop,
        group_by,
    ).text

    # the API does not return CSV headers if the report is empty
    if not report:
        report = "timestamp,dataset_id,dataset_name,dataset_slug,workflow_id,workflow_name,current_stage_id,current_stage_name,actor_id,actor_type,actor_email,actor_full_name,active_time,total_annotations,review_pass_rate,total_items_annotated,time_per_annotation,time_per_item\n"

    if not pretty:
        print(report)
        return

    # splitlines() produces no trailing empty entry, so the last data row is
    # preserved even when the report lacks a final newline (split("\n") +
    # pop() would drop it); [1:] removes the CSV header row.
    rows: List[str] = report.splitlines()[1:]

    table: Table = Table(show_header=True, header_style="bold cyan")

    table.add_column("Date")
    for header in [
        "Dataset Id",
        "Dataset Name",
        "Dataset Slug",
        "Workflow Id",
        "Workflow Name",
        "Current Stage Id",
        "Current Stage Name",
        "User Id",
        "User Type",
        "Email",
        "Full Name",
        "Active Time",
        "Total Annotations",
        "Review Pass Rate",
        "Total Items Annotated",
        "Time Per Annotation",
        "Time Per Item",
    ]:
        table.add_column(header, justify="right")

    for row in rows:
        table.add_row(*row.split(","))

    console.print(table)


def help(parser: argparse.ArgumentParser, subparser: Optional[str] = None) -> None:
"""
Prints the help text for the given command.
Expand Down
122 changes: 120 additions & 2 deletions darwin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,36 @@
import logging
import os
import zlib
from datetime import datetime
from logging import Logger
from pathlib import Path
from typing import Dict, Iterator, List, Optional, Union, cast
from requests.exceptions import HTTPError

import requests
from requests import Response
from requests.adapters import HTTPAdapter
from tenacity import RetryCallState, retry, stop_after_attempt, wait_exponential_jitter
from requests.exceptions import HTTPError
from tenacity import (
RetryCallState,
retry,
retry_if_exception_type,
stop_after_attempt,
stop_after_delay,
wait_exponential_jitter,
)
from tenacity.wait import wait_exponential

from darwin.backend_v2 import BackendV2
from darwin.config import Config
from darwin.dataset.identifier import DatasetIdentifier
from darwin.dataset.remote_dataset import RemoteDataset
from darwin.dataset.remote_dataset_v2 import RemoteDatasetV2
from darwin.datatypes import (
AnnotatorReportGrouping,
DarwinVersionNumber,
Feature,
ObjectStore,
ReportJob,
Team,
UnknownType,
)
Expand Down Expand Up @@ -55,6 +68,12 @@
MAX_WAIT = int(os.getenv("DARWIN_RETRY_MAX_WAIT", "300"))
MAX_RETRIES = int(os.getenv("DARWIN_RETRY_MAX_ATTEMPTS", "10"))

HOUR = 60 * 60


class JobPendingException(Exception):
    """Raised when a polled report job is still pending (neither finished nor failed); used to trigger a retry."""

def log_rate_limit_exceeded(retry_state: RetryCallState):
wait_time = retry_state.next_action.sleep
Expand Down Expand Up @@ -633,6 +652,105 @@ def get_report(
the_team_slug,
)

def get_annotators_report(
    self,
    dataset_ids: List[int],
    start: datetime,
    stop: datetime,
    group_by: List[AnnotatorReportGrouping],
    team_slug: Optional[str] = None,
) -> Response:
    """
    Gets the annotators report for the given datasets.

    Parameters
    ----------
    dataset_ids: List[int]
        Ids of the datasets to include in the report.
    start : datetime.datetime
        Timezone aware report start DateTime
    stop : datetime.datetime
        Timezone aware report end DateTime
    group_by: List[AnnotatorReportGrouping]
        Non-empty list of grouping options for the report.
    team_slug: Optional[str]
        Team slug of the team the dataset will belong to. Defaults to None.

    Returns
    ------
    Response
        The raw response of the report (CSV format).

    Raises
    ------
    ValueError
        If no team was found. If start or stop parameters are not timezone aware.
        If no group_by options provided. If the finished report job has no
        download URL.
    """
    if start.utcoffset() is None or stop.utcoffset() is None:
        raise ValueError(
            "start and stop parameters must be timezone aware (e.g. 2024-11-04T00:00:00Z)"
        )

    if not group_by:
        raise ValueError(
            f"At least one group_by option is required, any of: {[option.value for option in AnnotatorReportGrouping]}"
        )

    the_team: Optional[Team] = self.config.get_team(team_slug or self.default_team)

    if not the_team:
        raise ValueError("No team was found.")

    the_team_slug: str = the_team.slug

    # Report generation is asynchronous: create a job, then poll until done.
    response_data = self._post(
        f"/v3/teams/{the_team_slug}/reports/annotator/jobs",
        {
            "start": start.isoformat(timespec="seconds"),
            "stop": stop.isoformat(timespec="seconds"),
            "dataset_ids": dataset_ids,
            "group_by": [option.value for option in group_by],
            "format": "csv",
            "metrics": [
                "active_time",
                "total_annotations",
                "total_items_annotated",
                "time_per_item",
                "time_per_annotation",
                "review_pass_rate",
            ],
        },
        the_team_slug,
    )
    report_job = ReportJob.model_validate(response_data)

    finished_report_job = self.poll_pending_report_job(the_team_slug, report_job.id)
    # An `assert` here would be stripped under `python -O`; validate explicitly.
    if not isinstance(finished_report_job.url, str):
        raise ValueError("Finished report job did not provide a download URL.")

    return self._get_raw_from_full_url(finished_report_job.url, the_team_slug)

@retry(
    reraise=True,
    wait=wait_exponential(max=MAX_WAIT),
    stop=stop_after_delay(2 * HOUR),
    retry=retry_if_exception_type(JobPendingException),
)
def poll_pending_report_job(self, team_slug: str, job_id: str) -> ReportJob:
    """
    Polls an annotator report job until it finishes or fails.

    Retries with exponential backoff (capped at ``MAX_WAIT``) for up to two
    hours overall, as long as the job status is neither "finished" nor
    "failed".

    Parameters
    ----------
    team_slug : str
        Slug of the team the report job belongs to.
    job_id : str
        Id of the report job to poll.

    Returns
    -------
    ReportJob
        The finished report job (including its download URL).

    Raises
    ------
    ValueError
        If the report job failed on the server side.
    JobPendingException
        If the job is still pending once the retry window is exhausted
        (``reraise=True`` surfaces the last pending error to the caller).
    """
    job_status_url = f"/v3/teams/{team_slug}/reports/annotator/jobs/{job_id}"

    response_data = self._get(job_status_url, team_slug)
    report_job = ReportJob.model_validate(response_data)

    if report_job.status == "failed":
        raise ValueError("Building an annotator report failed, try again later.")

    if report_job.status != "finished":
        # Still pending: raising JobPendingException triggers another retry
        # attempt via the decorator above.
        raise JobPendingException(
            f"Polling for generated report results timed out, job status can be requested manually: {urljoin(self.url, job_status_url)}"
        )

    return report_job

def fetch_binary(self, url: str) -> Response:
"""
Fetches binary data from the given url via a stream.
Expand Down
14 changes: 14 additions & 0 deletions darwin/datatypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -1567,3 +1567,17 @@ class StorageKeyDictModel(BaseModel):

class StorageKeyListModel(BaseModel):
storage_keys: List[str]


class ReportJob(BaseModel):
    """Response model for an asynchronous annotator-report job."""

    id: str
    status: str  # e.g. "finished" / "failed"; other values mean still pending
    format: str  # requested output format, e.g. "csv"
    # `Optional[str]` instead of `str | None`: the `X | Y` union syntax
    # requires Python 3.10+, while this module otherwise uses typing
    # generics (e.g. `List[str]` above).
    url: Optional[str]
    team_id: int


class AnnotatorReportGrouping(str, Enum):
    """Dimensions by which annotator report metrics can be grouped."""

    ANNOTATORS = "annotators"
    DATASETS = "datasets"
    STAGES = "stages"
46 changes: 46 additions & 0 deletions darwin/options.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import sys
from argparse import ArgumentParser, Namespace
from datetime import datetime
from typing import Any, Optional, Tuple

import argcomplete
from darwin.datatypes import AnnotatorReportGrouping


class Options:
Expand Down Expand Up @@ -543,6 +545,50 @@ def cpu_default_types(input: Any) -> Optional[int]: # type: ignore
# Help
dataset_action.add_parser("help", help="Show this help message and exit.")

# REPORT
report = subparsers.add_parser(
    "report",
    help="Report related functions.",
    description="Arguments to interact with reports",
)
report_action = report.add_subparsers(dest="action")

# Annotators
parser_annotators = report_action.add_parser(
    "annotators", help="Report about the annotators."
)
parser_annotators.add_argument(
    "--datasets",
    default=[],
    type=lambda csv: [value.strip() for value in csv.split(",")],
    help="List of comma-separated dataset slugs to include in the report.",
)
parser_annotators.add_argument(
    "--start",
    required=True,
    type=lambda dt: datetime.strptime(dt, "%Y-%m-%dT%H:%M:%S%z"),
    help="Report start DateTime in RFC3339 format (e.g. 2020-01-20T14:00:00Z).",
)
parser_annotators.add_argument(
    "--stop",
    required=True,
    type=lambda dt: datetime.strptime(dt, "%Y-%m-%dT%H:%M:%S%z"),
    help="Report end DateTime in RFC3339 format (e.g. 2020-01-20T15:00:00Z).",
)
parser_annotators.add_argument(
    "--group-by",
    required=True,
    type=lambda csv: [value.strip() for value in csv.split(",")],
    # Fixed: a stray "f" before the interpolated list previously rendered
    # literally in the help text ("any of: f['annotators', ...]").
    help=f"Non-empty list of comma-separated grouping options for the report, any of: {[name.value for name in AnnotatorReportGrouping]}.",
)
parser_annotators.add_argument(
    "-r",
    "--pretty",
    action="store_true",
    default=False,
    help="Prints the results formatted in a rich table.",
)

# VERSION
subparsers.add_parser(
"version", help="Check current version of the repository. "
Expand Down
Loading