forked from farmOS/farmOS-aggregator
-
Notifications
You must be signed in to change notification settings - Fork 0
/
farms.py
115 lines (95 loc) · 2.54 KB
/
farms.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from typing import List
from fastapi import APIRouter, Body, Depends, Security, HTTPException, Query
from sqlalchemy.orm import Session
from app import crud
from app.api.utils.db import get_db
from app.api.utils.farms import get_farm_client, ClientError, get_farms_url_or_list, get_farm_by_id
from app.api.utils.security import get_farm_access
from app.schemas.farm import Farm, FarmCreate, FarmUpdate
from app.core.celery_app import celery_app
# Router instance that the path operations in this module register on
# through the @router.<method> decorators.
router = APIRouter()

# /farms/ endpoints for farmOS instances
@router.get(
    "/",
    response_model=List[Farm],
    dependencies=[Security(get_farm_access, scopes=["farm:read"])],
)
def read_farms(
    farms: List[Farm] = Depends(get_farms_url_or_list),
):
    """
    Retrieve farms.

    Access is guarded by the ``get_farm_access`` security dependency with
    the ``farm:read`` scope. The list itself is produced by the
    ``get_farms_url_or_list`` dependency and returned unchanged.
    """
    # All the work happens in the dependency; this handler only hands the
    # resolved list to FastAPI for serialization as List[Farm].
    return farms
@router.get(
    "/{farm_id}",
    response_model=Farm,
    dependencies=[Security(get_farm_access, scopes=["farm:read"])],
)
def read_farm_by_id(
    farm: Farm = Depends(get_farm_by_id),
):
    """
    Get a specific farm by id.

    Access is guarded by the ``get_farm_access`` security dependency with
    the ``farm:read`` scope. The farm is resolved by the ``get_farm_by_id``
    dependency and returned unchanged.
    """
    # NOTE(review): handling of an unknown farm_id is presumably done inside
    # get_farm_by_id -- confirm in app.api.utils.farms.
    return farm
@router.post(
    "/",
    response_model=Farm,
    dependencies=[Security(get_farm_access, scopes=["farm:create"])],
)
async def create_farm(
    *,
    db: Session = Depends(get_db),
    farm_in: FarmCreate
):
    """
    Create new farm.

    Access is guarded by the ``get_farm_access`` security dependency with
    the ``farm:create`` scope.

    Responds with HTTP 409 when a farm with the same URL is already stored.
    Otherwise the farm is created, an ``app.worker.ping_farm`` task is sent
    for its id, and the new farm is returned.
    """
    # Refuse duplicates before writing anything: the URL lookup doubles as
    # the uniqueness check.
    if crud.farm.get_by_url(db, farm_url=farm_in.url):
        raise HTTPException(
            status_code=409,
            detail="A farm with this URL already exists.",
        )

    new_farm = crud.farm.create(db, farm_in=farm_in)

    # Hand the new farm's id to the worker task by name.
    celery_app.send_task("app.worker.ping_farm", [new_farm.id])
    return new_farm
@router.put(
    "/{farm_id}",
    response_model=Farm,
    dependencies=[Security(get_farm_access, scopes=["farm:update"])],
)
async def update_farm(
    *,
    db: Session = Depends(get_db),
    farm: Farm = Depends(get_farm_by_id),
    farm_in: FarmUpdate,
):
    """
    Update farm.

    Access is guarded by the ``get_farm_access`` security dependency with
    the ``farm:update`` scope. The farm to update is resolved by the
    ``get_farm_by_id`` dependency.

    When ``farm_in.url`` is provided, responds with HTTP 409 if a
    *different* farm already uses that URL. Sending the farm's own current
    URL is not a conflict and the update proceeds.
    """
    if farm_in.url is not None:
        existing_farm = crud.farm.get_by_url(db, farm_url=farm_in.url)
        # Only a URL owned by another farm is a conflict. Without the id
        # comparison, a PUT that resends the farm's unchanged URL would
        # match the farm itself and be rejected with 409.
        if existing_farm and existing_farm.id != farm.id:
            raise HTTPException(
                status_code=409,
                detail="A farm with this URL already exists.",
            )

    farm = crud.farm.update(db, farm=farm, farm_in=farm_in)
    return farm
@router.delete(
    "/{farm_id}",
    response_model=Farm,
    dependencies=[Security(get_farm_access, scopes=["farm:delete"])],
)
async def delete_farm(
    farm_id: int,
    db: Session = Depends(get_db),
    farm: Farm = Depends(get_farm_by_id),
):
    """
    Delete farm.

    Access is guarded by the ``get_farm_access`` security dependency with
    the ``farm:delete`` scope. Returns whatever ``crud.farm.delete``
    returns for the given ``farm_id``.
    """
    # The resolved `farm` value is not read here; the dependency is declared
    # so that it runs before the delete.
    # NOTE(review): presumably it rejects unknown ids -- confirm in
    # app.api.utils.farms.
    return crud.farm.delete(db, farm_id=farm_id)