forked from sbujlab/c200_controls
-
Notifications
You must be signed in to change notification settings - Fork 0
/
c200_pid.py
146 lines (107 loc) · 4.56 KB
/
c200_pid.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import serial, time, io, datetime, math, os
from serial.tools.list_ports import comports
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.animation as ani
import matplotlib.dates as mdates
from multiprocessing import Process, Pipe, Array
# URL that pid_loop POSTs the emergency-shutdown message to (via curl).
# Empty by default, so it must be filled in for the notification to be delivered.
slack_url = ""
# Length of the SSR average-power window. pid_loop divides this by
# pid_cycle_time to get a sample count, so it is in the same unit as
# pid_cycle_time (seconds): 30 s of nominal cycles, not one minute.
avg_time = 30.0  # seconds (nominal) of moving average
# Value handed to time.sleep() at the end of every control pass, i.e. seconds.
pid_cycle_time = 0.001  # seconds (1 ms) slept per PID cycle
# When True: set_ssr() does nothing, no serial ports are opened, and the
# emergency notification is only printed.
debug = False
# Any thermocouple reading above this value (and below 10000.0) makes pid_loop
# force every SSR off. NOTE(review): unit presumably deg C -- confirm.
hard_temp_limit = 240.0
# Rate ceiling compared against tc_rate_min in pid_loop; heater demand is
# zeroed above it. 5.0/60.0 is 5 deg/min expressed per second.
# NOTE(review): confirm tc_rate_min is published in deg/s, otherwise this
# limit is off by a factor of 60.
hard_rate_limit = 5.0/60.0 # 5 deg/min
def set_ssr( port, chan, val ):
    """Drive one solid-state-relay channel through a serial control line.

    port -- indexable collection of objects exposing ``dtr`` and ``rts``
            attributes (pid_loop passes its list of open serial ports)
    chan -- channel number; 0 and 1 map to DTR and RTS of port[0],
            2 and 3 map to DTR and RTS of port[1]; other values do nothing
    val  -- value assigned to the selected control line

    Nothing is touched while the module-level ``debug`` flag is set.
    """
    if debug:
        return
    # Position in this table is the channel number.
    wiring = ((0, "dtr"), (0, "rts"), (1, "dtr"), (1, "rts"))
    for number, (index, line) in enumerate(wiring):
        if chan == number:
            setattr(port[index], line, val)
def _port_device(port_info):
    """Return the device name to hand to serial.Serial for one comports() entry."""
    device = getattr(port_info, "device", None)
    if device is not None:
        return device
    if isinstance(port_info, (tuple, list)):
        # Legacy form: (device, description, hwid).
        return port_info[0]
    return port_info


def _limit_heating(setpoint, rate_fraction):
    """Throttle heater demand as the heating rate approaches its limit.

    rate_fraction is measured rate / allowed rate. Demand is untouched up to
    0.75, tapers linearly to zero between 0.75 and 1.0, and is zero above 1.0.
    """
    if rate_fraction > 1.0:
        return 0.0
    if rate_fraction > 0.75:
        return setpoint * (1.0 - rate_fraction) / 0.25
    return setpoint


def _limit_cooling(setpoint, rate_fraction):
    """Raise heater demand as the cooling rate approaches its limit.

    rate_fraction is measured rate / allowed (negative) rate. Demand is
    untouched up to 0.75, is blended linearly towards full power between 0.75
    and 1.0, and is full power (1.0) above 1.0.
    """
    if rate_fraction > 1.0:
        return 1.0
    if rate_fraction > 0.75:
        blend = (rate_fraction - 0.75) / 0.25
        base = min(setpoint, 1.0)
        return base + (1.0 - base) * blend
    return setpoint


def _notify_emergency_off():
    """Announce an emergency shutdown: curl POST to slack_url, or a print in debug mode."""
    if not debug:
        os.system("curl -X POST -H \'Content-type: application/json\' --data \'{\"channel\": \"#c200\", \"text\":\"C200 shutting down! Temperature saftey threshold met\"}\' %s"% slack_url)
    else:
        print("Would push emergency off notification to slack")


def pid_loop(T_setp, prop_setp, assigned_tc, T_ramp_state, T_ramp, tc_data, tc_rate_min, tc_rate_hour, ssr_off, ssr_state, ssr_rb, pidctrl_state, ssr_avg_power):
    """Run the heater control loop forever; this function never returns.

    All arguments are indexable containers that are read (and, where noted,
    written) on every pass, so another party can update them while the loop
    runs. "per SSR" means indexed by SSR number, "per TC" by thermocouple.

    T_setp        -- per SSR: temperature setpoint
    prop_setp     -- per SSR: proportional gain applied to (setpoint - temperature)
    assigned_tc   -- per SSR: index of the thermocouple that SSR regulates on
    T_ramp_state  -- per SSR: truthy to enable the user ramp-rate limit
    T_ramp        -- per SSR: signed ramp-rate limit (>0 heating, <0 cooling);
                     magnitudes of 1e-2 or less disable the limit
    tc_data       -- per TC: current temperature
    tc_rate_min   -- per TC: current rate of temperature change
                     NOTE(review): unit not visible here; it is compared
                     directly with T_ramp and hard_rate_limit -- confirm
    tc_rate_hour  -- not used by this function
    ssr_off       -- per SSR: truthy forces the output off; WRITTEN (set True
                     for every SSR) while an over-temperature trip is latched
    ssr_state     -- per SSR: requested on/off state; read only here
    ssr_rb        -- per SSR: WRITTEN with the computed demand in percent
                     (0..100) whenever that SSR is under PID control
    pidctrl_state -- per SSR: truthy to compute the PID demand for that SSR
    ssr_avg_power -- per SSR: WRITTEN with the moving average of ssr_state

    Unless ``debug`` is set, the serial ports at positions 1 and 2 of
    comports() are opened and their DTR/RTS lines drive the four SSRs.
    Raises IndexError when fewer than three serial ports are present.
    """
    from collections import deque

    n_ssr = len(ssr_state)
    # Same sample count the original list-based window settled at, but never
    # less than one sample so the average below cannot divide by zero.
    window = max(1, int(float(avg_time) / pid_cycle_time))
    # A deque plus running total keeps the moving average O(1) per cycle
    # instead of re-summing tens of thousands of samples every millisecond.
    history = [deque() for _ in range(n_ssr)]
    totals = [0] * n_ssr

    ssr_port = []
    if not debug:
        ports = list(comports())
        for port_info in ports:
            print(port_info)
        if len(ports) < 3:
            raise IndexError("SSR control needs the serial ports at index 1 and 2, but only %d port(s) were found" % len(ports))
        for port_info in ports[1:3]:
            print("Opening  %s  for SSR control" % (port_info,))
            # serial.Serial expects a device-name string, not the port-info object.
            link = serial.Serial(_port_device(port_info), 9600, timeout= 1, parity=serial.PARITY_NONE, xonxoff=0, rtscts=False, dsrdtr=False, bytesize=8, stopbits=1)
            link.close()
            link.open()
            # Start with both relay lines de-asserted.
            link.dtr = False
            link.rts = False
            ssr_port.append(link)

    all_off = False
    while 1:
        # The trip latch is released once any SSR has had its off flag
        # cleared again (someone manually turned it back on).
        if all_off and not all(ssr_off):
            all_off = False

        tripped_now = False
        for tcval in tc_data:
            # NOTE(review): readings of 10000.0 or more are skipped, presumably
            # because they mark an invalid/disconnected thermocouple -- confirm.
            if hard_temp_limit < tcval < 10000.0 and not all_off:
                print("TC val of  %s  found" % (tcval,))
                all_off = True
                tripped_now = True

        for ssr in range(n_ssr):
            if all_off:
                ssr_off[ssr] = True

            sample = ssr_state[ssr]
            samples = history[ssr]
            samples.append(sample)
            totals[ssr] += sample
            if len(samples) > window:
                totals[ssr] -= samples.popleft()
            ssr_avg_power[ssr] = totals[ssr] / float(len(samples))

            if pidctrl_state[ssr]:
                tc = assigned_tc[ssr]
                rate = tc_rate_min[tc]
                error = tc_data[tc] - T_setp[ssr]
                # Proportional-only: demand power only while below the setpoint.
                demand = -prop_setp[ssr] * error if error < 0 else 0.0

                if T_ramp_state[ssr]:
                    ramp_limit = T_ramp[ssr]
                    ramp_x = 0.0
                    if abs(ramp_limit) > 1e-2:
                        # Equals rate / ramp_limit.
                        ramp_x = 1.0 - (ramp_limit - rate) / ramp_limit
                    if ramp_limit > 0.0:
                        demand = _limit_heating(demand, ramp_x)
                    if ramp_limit < 0.0:
                        demand = _limit_cooling(demand, ramp_x)

                # NOTE(review): the hard rate limit is applied to every
                # PID-controlled SSR, whether or not its user ramp limit is
                # enabled -- confirm this is the intended scope.
                ramp_hard_x = 1.0 - (hard_rate_limit - rate) / hard_rate_limit
                demand = _limit_heating(demand, ramp_hard_x)

                demand = min(max(demand, 0.0), 1.0)
                ssr_rb[ssr] = demand * 100.0

            set_ssr(ssr_port, ssr, bool(ssr_state[ssr] and not ssr_off[ssr]))

        # Notify only after the outputs above have been switched off: the
        # curl call blocks, and must not delay de-energising the heaters.
        if tripped_now:
            _notify_emergency_off()

        time.sleep(pid_cycle_time)