/
anova.py
124 lines (100 loc) · 3.42 KB
/
anova.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
"""
Example of using pycirculate with a simple Flask RESTful API.
Make sure to send requests with the HTTP header "Content-Type: application/json".
"""
import logging
import os
import sys

from flask import Flask, request, jsonify, abort, make_response, render_template
from flask.ext.cors import CORS
from pycirculate.anova import AnovaController
# Turn on debug logging for flask_cors.  Use setLevel() rather than assigning
# to the `.level` attribute: setLevel() is the documented API and performs the
# logger's own level handling instead of bypassing it.
logging.getLogger('flask_cors').setLevel(logging.DEBUG)

# NOTE(review): CORS is imported at the top of the file, but nothing in the
# visible code applies it to `app` (there is no CORS(app) call) -- confirm
# whether that is intentional.
app = Flask(__name__)

# MAC address handed to AnovaController by every endpoint.  It can be
# overridden with the ANOVA_MAC_ADDRESS environment variable so the file does
# not need editing per device; the previous hard-coded value stays the default.
ANOVA_MAC_ADDRESS = os.environ.get("ANOVA_MAC_ADDRESS", "F4:B8:5E:AF:F8:D6")
# Error handlers
@app.errorhandler(400)
def bad_request(error):
    """Answer HTTP 400 errors with a small JSON error document.

    The ``error`` argument supplied by Flask is not used.
    """
    payload = jsonify({'error': 'Bad request.'})
    return make_response(payload, 400)
@app.errorhandler(404)
def not_found(error):
    """Answer HTTP 404 errors with a small JSON error document.

    The ``error`` argument supplied by Flask is not used.
    """
    payload = jsonify({'error': 'Not found.'})
    return make_response(payload, 404)
@app.errorhandler(500)
def server_error(error):
    """Answer HTTP 500 errors with a small JSON error document.

    The ``error`` argument supplied by Flask is not used.
    """
    payload = jsonify({'error': 'Server error.'})
    return make_response(payload, 500)
def make_error(status_code, message, sub_code=None, action=None, **kwargs):
    """
    Build a JSON error response carrying a custom message.

    The body always contains ``status`` and ``message``.  ``action`` and
    ``sub_code`` are included only when they are truthy, and any extra
    keyword arguments are merged in last (so they can overwrite earlier keys).
    The response's HTTP status is set to ``status_code``.
    """
    body = dict(status=status_code, message=message)
    # Optional fields, in the order they are added to the body.
    optional = (('action', action), ('sub_code', sub_code))
    body.update((key, value) for key, value in optional if value)
    body.update(kwargs)

    reply = jsonify(body)
    reply.status_code = status_code
    return reply
# REST endpoints
@app.route('/', methods=["GET","POST"])
def index():
    # No docstring here: help() publishes every view's __doc__ through
    # /api/help, so adding one would change that endpoint's output.
    #
    # Report the controller's anova_status(); any exception raised while
    # talking to it is logged and returned as a JSON 500 error.
    try:
        with AnovaController(ANOVA_MAC_ADDRESS) as anova:
            status = anova.anova_status()
    except Exception as err:
        app.logger.error(err)
        detail = "{0}: {1}".format(repr(err), str(err))
        return make_error(500, detail)
    return jsonify({"anova_status": status})
@app.route('/temp', methods=["GET"])
def get_temp():
    # No docstring here: help() publishes every view's __doc__ through
    # /api/help, so adding one would change that endpoint's output.
    #
    # Query the controller (same call order as before: read_temp,
    # read_set_temp, read_unit); any exception is logged and returned as a
    # JSON 500 error.
    try:
        with AnovaController(ANOVA_MAC_ADDRESS) as anova:
            current = anova.read_temp()
            target = anova.read_set_temp()
            unit = anova.read_unit()
    except Exception as err:
        app.logger.error(err)
        detail = "{0}: {1}".format(repr(err), str(err))
        return make_error(500, detail)
    return jsonify({"current_temp": current, "set_temp": target, "unit": unit})
@app.route('/temp', methods=["POST"])
def set_temp():
    """Pass the ``temp`` value from the JSON request body to the controller.

    Expects a JSON body such as ``{"temp": 60.5}``.

    Responses:
      * 400 when the body has no ``temp`` key, is not a JSON object, or the
        value cannot be converted to ``float``.
      * 500 (JSON error built by ``make_error``) when the controller raises.
      * otherwise ``{"set_temp": <result of ctrl.set_temp(temp)>}``.
    """
    # Parse and convert inside one guarded step: a non-numeric value makes
    # float() raise ValueError, which must be a client error (400), not an
    # unhandled exception that surfaces as a 500.
    try:
        temp = float(request.get_json()['temp'])
    except (KeyError, TypeError, ValueError):
        abort(400)

    # Same failure handling as the other controller-backed endpoints in this
    # file (index, get_temp): log, then answer with a JSON 500 error.
    try:
        with AnovaController(ANOVA_MAC_ADDRESS) as ctrl:
            output = {"set_temp": ctrl.set_temp(temp)}
    except Exception as exc:
        app.logger.error(exc)
        return make_error(500, "{0}: {1}".format(repr(exc), str(exc)))
    return jsonify(output)
@app.route('/stop', methods=["POST"])
def stop_anova():
    """Call ``stop_anova()`` on the controller and return its result.

    Responds with ``{"status": <result of ctrl.stop_anova()>}``.  If the
    controller raises, the error is logged and a JSON 500 error built by
    ``make_error`` is returned instead.
    """
    # Same failure handling as index and get_temp: log, then answer with a
    # JSON 500 error rather than letting the exception escape the view.
    try:
        with AnovaController(ANOVA_MAC_ADDRESS) as ctrl:
            output = {"status": ctrl.stop_anova()}
    except Exception as exc:
        app.logger.error(exc)
        return make_error(500, "{0}: {1}".format(repr(exc), str(exc)))
    return jsonify(output)
@app.route('/start', methods=["POST"])
def start_anova():
    """Call ``start_anova()`` on the controller and return its result.

    Responds with ``{"status": <result of ctrl.start_anova()>}``.  If the
    controller raises, the error is logged and a JSON 500 error built by
    ``make_error`` is returned instead.
    """
    # Same failure handling as index and get_temp: log, then answer with a
    # JSON 500 error rather than letting the exception escape the view.
    try:
        with AnovaController(ANOVA_MAC_ADDRESS) as ctrl:
            output = {"status": ctrl.start_anova()}
    except Exception as exc:
        app.logger.error(exc)
        return make_error(500, "{0}: {1}".format(repr(exc), str(exc)))
    return jsonify(output)
@app.route('/api', methods=['GET'])
def this_func():
    """This is a function. It does nothing."""
    # The docstring above is kept verbatim: help() returns it to clients
    # through /api/help.
    empty_result = {'result': ''}
    return jsonify(empty_result)
@app.route('/api/help', methods=['GET'])
def help():
    """Print available functions."""
    # Map each registered URL rule (except Flask's 'static' endpoint) to the
    # docstring of its view function.  Rules sharing the same path collapse
    # into one key, with the rule iterated last winning.
    documented = {
        rule.rule: app.view_functions[rule.endpoint].__doc__
        for rule in app.url_map.iter_rules()
        if rule.endpoint != 'static'
    }
    return jsonify(documented)
@app.route('/control')
def control():
    # No docstring here: help() publishes every view's __doc__ through
    # /api/help, so adding one would change that endpoint's output.
    #
    # Serve the rendered 'control.html' template.
    page = render_template('control.html')
    return page
if __name__ == '__main__':
    # Attach a stdout handler (threshold INFO, timestamped format) to the
    # application's logger before starting the server.
    log_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(log_format)
    stdout_handler.setLevel(logging.INFO)
    app.logger.addHandler(stdout_handler)

    # host='0.0.0.0' makes the server listen on all network interfaces.
    app.run(host='0.0.0.0')