-
Notifications
You must be signed in to change notification settings - Fork 0
/
thermoplexer.py
148 lines (129 loc) · 4.87 KB
/
thermoplexer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import gpio_setup, time
import BBIO.GPIO as GPIO
from spi import SPI
import logging
#import cpickle
import datetime
#import matplotlib.pyplot as plt
import smtplib
logging.basicConfig(format='%(levelname)s:%(message)s', level =
logging.WARNING)
CONNECTSTR = "dbname=will user=levlab host=levlabserver2.stanford.edu"
class Thermoplexer():
def __init__(self, dbname, maxtemp = 200):
# self.logfilename = logfilename
self.tempreader = SPI(1,0)
self.tempreader.msh = 1000000
self.dbname = dbname
self.maxtemp = maxtemp
# self.init_log()
def choose_tc(self, num):
if num > 8 or num < 1:
logging.warning("invalid choice")
return
bits = "{0:03b}".format(num - 1)
logging.debug('bits: %s'%bits)
GPIO.output("P8_12", int(bits[0]))
GPIO.output("P8_14", int(bits[1]))
GPIO.output("P8_16", int(bits[2]))
def read_temp(self, num):
self.choose_tc(num)
out = self.tempreader.readbytes(2)
byte1 = '{0:08b}'.format(out[0])
byte2 = '{0:08b}'.format(out[1])
tempbits = byte1[1:] + byte2[:-3]
return int(tempbits, 2)*0.25
def read_all_tcs(self):
for num in range(1,9):
print("TC # %d:"%num)
temp = self.read_temp(num)
if temp > 1000: #presumably thermocouple is railing
logging.warning('TC %d Failure'%num)
else:
print("%2.0f C"%temp)
time.sleep(0.5)
def save_temps(self):
conn = psycopg2.connect("dbname=will user=levlab host=levlabserver2.stanford.edu")
cur = conn.cursor()
for TCnum in self.TCnums:
now = datetime.datetime.now()
temp = self.read_temp(TCnum)
self.check_overheat(temp)
cur.execute("INSERT INTO data VALUES (%s, %s, %s);",(TCnum, timestamp, temp))
time.sleep(0.5)
conn.commit()
cur.close()
conn.close()
def check_overheat(self, temp):
if temp > self.maxtemp and temp <= 1000: #overheat condition
conn = psycopg2.connect(CONNECTSTR)
cur = conn.cursor()
cur.execute("SELECT sensor.fault FROM sensors WHERE sensor.id = %s;",tcnum)
faultbit = cur.fetchall()
if faultbit == 0:
cur.execute("UPDATE sensors SET fault = 1 WHERE sensor.id = %s;",tcnum)
logging.warning('TC %d OVERHEATED!'%num)
conn.commit()
# self.email_warning()
cur.close()
conn.close()
elif temp > 1000: #presumably thermocouple is railing
conn = psycopg2.connect(CONNECTSTR)
cur = conn.cursor()
cur.execute("SELECT sensor.fault FROM sensors WHERE sensor.id = %s;",tcnum)
faultbit = cur.fetchall()
if faultbit == 0:
cur.execute("UPDATE sensors SET fault = 1 WHERE sensor.id = %s;",tcnum)
logging.warning('TC %d Failure'%num)
conn.commit()
cur.close()
conn.close()
def email_warning(self, tcname, temp):
sender = 'thermoplexer@stanford.edu'
receivers = ['rwturner@stanford.edu']
message = '''From: From Thermoplexer <thermoplexer@stanford.edu>
To: To Human <ACM>
Subject: Overheating Warning!!
The thermocouple labeled '%s' has reached a temperature of %d!
The maximum acceptable temperature is %d.
Love,
Thermoplexer
'''%(tcname, temp, self.maxtemp)
try:
smtpObj = smtplib.SMTP('smtp.stanford.edu', 25)
smtpObj.starttls()
smtpObj.sendmail(sender, receivers, message)
print("Successfully sent email")
except SMTPException:
print("Error: unable to send email")
smtpObj.quit()
# cPickle implementation is deprecated
# def save_temps(self):
# f = open(self.logfilename, 'w')
# data = cpickle.load(f)
# for i in range(1,9):
# temp = self.read_temp(i)
# now = datetime.datetime.now()
# data[now] = [i, temp]
# time.sleep(0.5)
# cpickle.dump(data, f)
# f.close()
#
# def init_log(self):
# f = open(self.logfilename, 'w')
# data = {}
# cpickle.dump(data, f)
# f.close()
#
# def plot_temp_log(self):
# f = open(self.logfilename, 'r')
# data = cpickle.load(f)
# plt.plot_date(data.keys(), data.values(), xdate = True, ydate = False)
if __name__ == "__main__":
dbname = 'data'
# import thplxview
thplx = Thermoplexer(dbname)
# thplx.save_temps()
thplx.email_warning('testtc', 230)
# viewer = thplxview.ThermoplexerViewer(dbname)
# viewer.plot_all_TCs()