Skip to content

Commit

Permalink
[wrapper.py] 労務管理データを登録するメソッドを追加 (#273)
Browse files Browse the repository at this point in the history
* version

* [wraper.py] 労務管理データを登録するメソッドを追加

* [wraper.py] 労務管理データを登録するメソッドを追加

* version up

* poetr update
  • Loading branch information
yuji38kwmt committed Jan 31, 2021
1 parent f95f40e commit 6f15650
Show file tree
Hide file tree
Showing 5 changed files with 298 additions and 77 deletions.
2 changes: 1 addition & 1 deletion annofabapi/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.41.2"
__version__ = "0.42.0"
171 changes: 170 additions & 1 deletion annofabapi/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ def _first_true(iterable, default=None, pred=None):
return next(filter(pred, iterable), default)


def _hour_to_millisecond(hour: Optional[float]) -> Optional[int]:
return int(hour * 3600_000) if hour is not None else None


_ORGNIZATION_ID_FOR_AVAILABILITY = "___plannedWorktime___"
"""予定稼働時間用の組織ID"""


class Wrapper:
"""
AnnofabApiのラッパー.
Expand Down Expand Up @@ -1971,10 +1979,171 @@ def _to_new_data(labor: Dict[str, Any]) -> Dict[str, Any]:
return labor

query_params = {
"organization_id": "___plannedWorktime___",
"organization_id": _ORGNIZATION_ID_FOR_AVAILABILITY,
"account_id": account_id,
"from": from_date,
"to": to_date,
}
labor_list, _ = self.api.get_labor_control(query_params)
return [_to_new_data(e) for e in labor_list]

def put_labor_control_actual_worktime(
self,
organization_id: str,
project_id: str,
account_id: str,
date: str,
actual_worktime: Optional[float],
working_description: Optional[str],
) -> Dict[str, Any]:
"""
労務管理の実績作業時間を登録する。
Args:
organization_id: 組織ID
project_id: プロジェクトID
account_id: account_id
date: 登録対象日
actual_worktime: 実績作業時間[hour]
working_description: 実績に関するコメント
Returns:
登録後の労務管理データ
"""
labor_list, _ = self.api.get_labor_control(
query_params={
"organization_id": organization_id,
"project_id": project_id,
"account_id": account_id,
"from": date,
"to": date,
}
)
target_working_time_by_user = {
"description": working_description,
"results": _hour_to_millisecond(actual_worktime),
}

if len(labor_list) > 0:
assert len(labor_list) == 1, "get_labor_control APIで取得した労務管理データが2件以上存在した。"
labor = labor_list[0]
request_body = labor
request_body["last_updated_datetime"] = labor["updated_datetime"]

if "working_time_by_user" in request_body["values"]:
request_body["values"]["working_time_by_user"].update(target_working_time_by_user)
else:
request_body["values"]["working_time_by_user"] = target_working_time_by_user
else:
request_body = {
"organization_id": organization_id,
"project_id": project_id,
"account_id": account_id,
"date": date,
"values": {"working_time_by_user": target_working_time_by_user},
}
content, _ = self.api.put_labor_control(request_body=request_body)
return content

def put_labor_control_plan_worktime(
self,
organization_id: str,
project_id: str,
account_id: str,
date: str,
plan_worktime: Optional[float],
) -> Dict[str, Any]:
"""
労務管理の予定作業時間を登録する。
Args:
organization_id: 組織ID
project_id: プロジェクトID
account_id: account_id
date: 登録対象日
plan_worktime: 予定作業時間[hour]
Returns:
登録後の労務管理データ
"""
labor_list, _ = self.api.get_labor_control(
query_params={
"organization_id": organization_id,
"project_id": project_id,
"account_id": account_id,
"from": date,
"to": date,
}
)
target_working_time_by_user = {"plans": _hour_to_millisecond(plan_worktime)}

if len(labor_list) > 0:
assert len(labor_list) == 1, "get_labor_control APIで取得した労務管理データが2件以上存在した。"
labor = labor_list[0]
request_body = labor
request_body["last_updated_datetime"] = labor["updated_datetime"]

if "working_time_by_user" in request_body["values"]:
request_body["values"]["working_time_by_user"].update(target_working_time_by_user)
else:
request_body["values"]["working_time_by_user"] = target_working_time_by_user

else:
request_body = {
"organization_id": organization_id,
"project_id": project_id,
"account_id": account_id,
"date": date,
"values": {"working_time_by_user": target_working_time_by_user},
}
content, _ = self.api.put_labor_control(request_body=request_body)
return content

def put_labor_control_availability(
self,
account_id: str,
date: str,
availability: Optional[float],
) -> Dict[str, Any]:
"""
労務管理の予定稼働時間を登録する
Args:
account_id: account_id
date: 登録対象日
availability: 予定稼働時間[hour]
Returns:
登録後の労務管理データ
"""

organization_id = _ORGNIZATION_ID_FOR_AVAILABILITY

labor_list, _ = self.api.get_labor_control(
query_params={"organization_id": organization_id, "account_id": account_id, "from": date, "to": date}
)
target_working_time_by_user = {"plans": _hour_to_millisecond(availability)}

if len(labor_list) > 0:
assert len(labor_list) == 1, "get_labor_control APIで取得した労務管理データが2件以上存在した。"
labor = labor_list[0]
request_body = labor
request_body["last_updated_datetime"] = labor["updated_datetime"]

if "working_time_by_user" in request_body["values"]:
request_body["values"]["working_time_by_user"].update(target_working_time_by_user)
else:
request_body["values"]["working_time_by_user"] = target_working_time_by_user

else:
request_body = {
"organization_id": organization_id,
"account_id": account_id,
"date": date,
"values": {"working_time_by_user": target_working_time_by_user},
}

content, _ = self.api.put_labor_control(request_body=request_body)
return content

0 comments on commit 6f15650

Please sign in to comment.