-
Notifications
You must be signed in to change notification settings - Fork 0
/
todolist.py
101 lines (74 loc) · 2.43 KB
/
todolist.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from bottle import Bottle, request, response, HTTPError, HTTP_CODES
from dao import ToDoSqlDAO
import os
path = os.path.abspath(os.path.dirname(__file__))
dao = ToDoSqlDAO(os.path.join(path, 'todo.db'))
app = Bottle()
process = {}
@app.get('/')
def index():
return "ToDo app is up and running"
@app.get('/tasks')
def task_list():
tasks = dao.find_tasks()
result = {'tasks': [get_task_dict(t) for t in tasks]}
return result
@app.get('/tasks/<task_id>')
def get_task(task_id):
task = dao.find_task_by_id(task_id)
if task:
return get_task_dict(task)
else:
return HTTPError(404, HTTP_CODES[404])
@app.post('/tasks')
def create_task():
params = request.json
description = params['description']
task_id = dao.create_task(description)
response.status = 201
return {'task_id': task_id}
@app.delete('/tasks/<task_id>')
def remove_task(task_id):
dao.delete_task(task_id)
response.status = 204
@app.put('/tasks/<task_id>')
def modify_task(task_id):
description = request.json['description'] if 'description' in request.json else None
status = request.json['status'] if 'status' in request.json else None
dao.modify_task(task_id, description, status)
response.status = 204
@app.post('/execution')
def create_process():
from multiprocessing import Process, Event
stop_event = Event()
p = Process(group=None, target=infinite_proccess, name="infinite", args=(stop_event,), kwargs={})
p.daemon = False
p.start()
process[p.pid] = p, stop_event
process_json = dict([(p.pid, p.is_alive()) for pid, (p, se) in process.iteritems()])
return {'process_id': p.pid, 'processes': process_json}
@app.delete('/execution/<processid>')
def kill_process(processid):
pid = int(processid)
if pid in process:
p, se = process[pid]
if p.is_alive():
se.set()
p.join()
del process[pid]
return {'process_id': p.pid, 'exitcode': p.exitcode}
@app.get('/execution')
def get_executions():
process_json = dict([(p.pid, p.is_alive()) for pid, (p, es) in process.iteritems()])
return {'processes': process_json}
def infinite_proccess(stop_event):
import time
i = 0
while not stop_event.is_set():
time.sleep(10)
print i
i += 1
def get_task_dict(task_as_list):
return {'id': task_as_list[0],
'description': task_as_list[1],
'status': task_as_list[2]}