-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtask_session_api.py
142 lines (113 loc) · 4.25 KB
/
task_session_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import asyncio
import time
import aiofiles
from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from sqlalchemy.orm import Query
from task_core.task_manager import task_manager
from data import SessionInfo, dataManager
from utils.api_base_func import token_requie
from utils.api_utils import HttpState, make_response
from utils.file import output_store_path
from utils.logger import logger
from utils.websocket import websocket_on_recive
class SessionQuery(BaseModel):
    """Search criteria accepted by the session ``/find`` endpoint.

    Every field may be omitted. The explicit ``None`` defaults keep the
    fields optional under Pydantic v2 as well, where an ``X | None``
    annotation without a default is a *required* nullable field.
    """

    # Exclusive lower bound compared against SessionInfo.start_time.
    # Presumably a Unix timestamp in seconds (the handler falls back to
    # time.time() for the upper bound) - confirm against the stored values.
    starttime: float | None = None
    # Inclusive upper bound compared against SessionInfo.finish_time.
    endtime: float | None = None
    # Task ids to restrict the search to; None means no task filter.
    task_id: list[str] | None = None
# Router for every "/session" endpoint; the token_requie dependency runs
# for each route registered on it.
session_api = APIRouter(
    prefix="/session",
    dependencies=[Depends(token_requie)],
)
@session_api.get("/all")
async def get_all_session(start: int, num: int):
    """Return one page of sessions together with the total session count.

    Args:
        start: Row offset applied to the session query.
        num: Maximum number of rows to return (query limit).

    Returns:
        The ``make_response`` result carrying ``all_nums`` (count of all
        sessions) and ``sessions`` (``to_dict()`` of each row in the page).
    """
    with dataManager.session as db:
        base_query = db.query(SessionInfo)
        page = base_query.offset(start).limit(num).all()
        # Count runs on the unpaginated query, so it covers every session.
        total = base_query.count()
        serialized = [row.to_dict() for row in page]
        return make_response(all_nums=total, sessions=serialized)
@session_api.get("/find")
async def find_session(form: SessionQuery):
    """Find the sessions of the given task ids inside the given time window.

    Args:
        form: Search criteria. ``task_id`` restricts the result to those
            task ids when it is not None. ``starttime`` falls back to 0
            and ``endtime`` to the current time when they are None.

    Returns:
        The ``make_response`` result with ``session`` set to the list of
        ``to_dict()`` values of the matching rows. A row matches when its
        ``start_time`` is greater than the lower bound and its
        ``finish_time`` is less than or equal to the upper bound.
    """
    # Resolve the defaults into locals rather than writing them back into
    # the caller's request model.
    lower_bound = 0 if form.starttime is None else form.starttime
    upper_bound = time.time() if form.endtime is None else form.endtime

    with dataManager.session as sess:
        query_exp: Query = sess.query(SessionInfo)
        if form.task_id is not None:
            query_exp = query_exp.filter(SessionInfo.task_id.in_(form.task_id))

        matches: list[SessionInfo] = query_exp.filter(
            SessionInfo.start_time > lower_bound,
            SessionInfo.finish_time <= upper_bound,
        ).all()
        # Serialize while the session is still open.
        return make_response(session=[x.to_dict() for x in matches])
@session_api.get("/output")
async def get_session_output(session_id: str):
    """Return the captured output and the command input of one session.

    For a running session the output is read from the live executor's
    ``stdout``; otherwise it is read from ``<session id>.out`` under
    ``output_store_path``. When neither source is available the output is
    the placeholder text ``"output missing"``.

    Args:
        session_id: Id of the session to look up.

    Returns:
        A ``CANT_FIND`` response when no session has this id, otherwise a
        response with ``session_id``, ``finish``, ``output`` and ``input``.
    """
    with dataManager.session as db:
        record = db.query(SessionInfo).filter(SessionInfo.id == session_id).first()
        if record is None:
            return make_response(code=HttpState.CANT_FIND)

        captured = "output missing"
        if not record.running:
            # Not running: look for the stored output file.
            stored = output_store_path / f"{record.id}.out"
            if stored.exists():
                async with aiofiles.open(stored.as_posix()) as handle:
                    captured = await handle.read()
        else:
            # Still running: take whatever the executor has collected so far.
            live = task_manager.get_exector(session_id)
            if live is not None:
                captured = live.stdout

        return make_response(
            session_id=session_id,
            finish=record.finish,
            output=captured,
            input=record.command_input,
        )
@session_api.get("/info")
async def get_sesion_info(session_id: str):
    """Return the stored record of a single session.

    Args:
        session_id: Id of the session to look up.

    Returns:
        A response with ``session`` set to the record's ``to_dict()``, or a
        ``CANT_FIND`` response when no session has this id.
    """
    with dataManager.session as db:
        lookup = db.query(SessionInfo).filter(SessionInfo.id == session_id)
        record = lookup.first()
        if record is not None:
            return make_response(session=record.to_dict())
        return make_response(code=HttpState.CANT_FIND)
@session_api.get("/stop")
async def stop_session(session_id: str):
    """Kill the executor registered for ``session_id``.

    Args:
        session_id: Id of the session whose executor should be killed.

    Returns:
        A ``CANT_FIND`` response when the task manager has no executor for
        this id; otherwise the default response after ``kill()`` has been
        awaited.
    """
    target = task_manager.get_exector(session_id)
    if target is not None:
        await target.kill()
        return make_response()
    return make_response(code=HttpState.CANT_FIND)
@session_api.websocket("/communicate")
async def session_communicate(session_id: str, *, socket: WebSocket):
    """Stream a task's output over a websocket and forward client input.

    The connection is closed without being accepted when the task manager
    has no executor for ``session_id``. Otherwise the handler waits for one
    initial client message, starts a background receiver that passes
    incoming data to ``on_recive``, and then sends each line read from the
    executor until the executor reports it has finished, followed by a
    final ``task-<session_id> over`` message.

    Args:
        session_id: Id of the session whose executor is attached.
        socket: The websocket connection for this request.
    """
    task_exector = task_manager.get_exector(session_id)
    if task_exector is None:
        await socket.close()
        return
    await socket.accept()

    def on_recive(data: dict):
        """Forward the ``input`` entry of a received message to the executor."""
        logger.debug("websocket recive data: %s", data)
        if "input" in data:
            task_exector.append_input(data["input"])

    receiver: asyncio.Task | None = None
    try:
        # The first client message is consumed and discarded before
        # streaming starts.
        await socket.receive_text()
        # Keep a reference to the receiver task: the event loop only holds
        # weak references, so an unreferenced task may be garbage-collected
        # before it completes.
        receiver = asyncio.get_running_loop().create_task(
            websocket_on_recive(socket, on_recive)
        )
        while True:
            output_line = await task_exector.readline()
            await socket.send_text(output_line)
            if task_exector.finished:
                break
        await socket.send_text(f"task-{session_id} over")
    except WebSocketDisconnect:
        # The client went away; there is nothing left to send or close.
        pass
    else:
        await socket.close()
    finally:
        # Stop the background receiver on every exit path so it does not
        # outlive this handler.
        if receiver is not None:
            receiver.cancel()