-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.py
215 lines (178 loc) · 6.24 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
import MySQLdb
import onetimepass as otp
import sys, string, random, base64, bcrypt, qrcode, crypt
from datetime import datetime
#DB connection settings
conhost = 'localhost'
conuser = 'ece458'
condb = 'ece458'
#use seperate class to manage MySQL queries
class DB:
conn = None
#create connection
def connect(self):
self.conn = MySQLdb.connect(host=conhost,user=conuser,db=condb)
#query DB and return cursor for control
def query(self, sql, param):
try:
self.connect()
cursor = self.conn.cursor(MySQLdb.cursors.DictCursor)
cursor.execute(sql,param)
# if query is not a select statement, commit to DB
if string.find(sql,'select',0,6) == -1:
print 'sql commit'
self.conn.commit()
status = True
except MySQLdb.Error, e:
print "Error %d: %s" % (e.args[0],e.args[1])
status = False
return cursor, status
#check if user already exists
def checkuser(email):
db = DB()
cur, status = db.query("select email from Users where email=%s",(email,))
if status == False:
print "MySQL error"
return "MySQL error"
#check for duplicate users
if cur.fetchone() != None:
print "User exists already"
return "Username exists already, please choose different username"
else:
return True
#create user and assign random secret for two-factor authentication
def createuser(email,pw):
db = DB()
#assign secret to user, hash pw, insert into db
secret = base64.b32encode(''.join(random.choice(string.replace(string.ascii_uppercase + string.digits, '8', '')) for x in range(10)))
hashpw = bcrypt.hashpw(pw, bcrypt.gensalt())
cur, status = db.query("insert into Users(email,password,secret,type) values (%s,%s,%s,'user')",(email,hashpw,secret))
if status == False:
print "MySQL error"
return "MySQL error", -1
else:
qrimg = qrcode.make('otpauth://totp/' + email + '?secret=' + secret)
qrimg.save('qrcodes/' + secret +'.png')
print "User successfully added"
return True, secret
#check for valid username and password, returns True if valid, False if not valid, None for SQL error
def login(email,pw):
if pw == '' or email == '':
return False, None, None;
db = DB()
cur, status = db.query("select Id,password,type from Users where email=%s",(email,))
if status == True:
row = cur.fetchone()
if row != None:
dbpw = row['password']
if dbpw == bcrypt.hashpw(pw,dbpw):
return True, row['Id'], row['type']
return False, None, None
else:
return None, None, None
#verify two-factor token
def verifyuser(userid,token):
db = DB()
cur, status = db.query("select email, secret from Users where Id=%s",(userid,))
row = cur.fetchone()
if row == None:
#should never happen
print "Invalid user entered"
return False, None
else:
#print otp.valid_totp(token, row['secret'])
return otp.valid_totp(token, row['secret']), row['email']
#create poll
def createPoll(title):
db = DB()
startDate = datetime.now()
endDate = datetime.now()
cur, status = db.query("INSERT INTO polls(title, startDate, endDate, salt) VALUES (%s, %s, %s, SUBSTRING(MD5(RAND()), -16))", (title, startDate.strftime('%Y-%m-%d %H:%M:%S'), endDate.strftime('%Y-%m-%d %H:%M:%S')))
if status == False:
print "MySQL error"
return "MySQL error"
else:
print "Poll successfully added"
return True
#read poll salt
def readPollSalt(pollId):
db = DB()
cur, status = db.query("SELECT salt FROM polls WHERE Id='%s'", (pollId))
row = cur.fetchone()
if row == None:
return False
else:
return row['salt']
#create option
def createOption(pollId, text):
db = DB()
cur, status = db.query("INSERT INTO options(pollId, num, text) VALUES (%s, %s, %s)", (pollId, numOptions(pollId), text))
if status == False:
print "MySQL error"
return "MySQL error"
else:
print "Option successfully added"
return True
#get options in poll
def numOptions(pollId):
db = DB()
cur, status = db.query("SELECT count(*) AS count FROM options WHERE pollId=%s", (pollId))
return cur.fetchone()['count']+1
#create vote
def createVote(pollId, optionId, userId):
db = DB()
currentTime = datetime.now()
if verifyVote(pollId, userId) == False:
cur, status = db.query("INSERT INTO votes(pollId, optionId, userId, timestamp) VALUES (%s, %s, ENCRYPT(%s, %s), %s)", (pollId, optionId, userId, readPollSalt(pollId), currentTime.strftime('%Y-%m-%d %H:%M:%S')))
if status == False:
print "MySQL error"
return "MySQL error"
else:
print "Vote successfully added"
return True
else:
return False
#verify vote
def verifyVote(pollId, userId):
db = DB()
cryptUserId = crypt.crypt(str(userId), readPollSalt(pollId))
cur, status = db.query("SELECT count(*) AS votecount FROM votes WHERE pollId=%s and userId=%s", (pollId, str(cryptUserId)))
res = cur.fetchone()
if res['votecount'] > 0:
return True;
else:
return False;
#get polls
def getPolls():
db = DB()
cur, status = db.query("SELECT title, Id FROM polls", None)
entries = cur.fetchall()
return entries
#get poll
def getPoll(pollId):
db = DB()
cur, status = db.query("SELECT title FROM polls WHERE Id=%s", (pollId))
poll = cur.fetchone()
return poll
#get options
def getOptions(pollId):
db = DB()
cur, status = db.query("SELECT text, Id FROM options WHERE pollId=%s", (pollId))
options = cur.fetchall()
return options
#get poll results
def getPollResults(pollId):
db = DB()
cur, status = db.query("SELECT COUNT(*) AS total FROM votes WHERE pollId=%s", (pollId))
total = cur.fetchone()['total']
cur, status = db.query("SELECT votesCount.count, o.num, o.text FROM (SELECT v.optionId, COUNT(v.optionId) AS count FROM votes v WHERE v.pollId=%s GROUP BY v.optionId) AS votesCount LEFT JOIN options o ON votesCount.optionId = o.Id", (pollId))
entries = cur.fetchall()
for rows in entries:
rows['percent']=100 * float(rows['count'])/float(total)
return entries
#get poll votes
def getPollHistory(pollId):
db = DB()
cur, status = db.query("SELECT o.num, v.userId, v.timestamp FROM votes v LEFT JOIN options o ON v.optionId = o.Id WHERE v.pollId=%s ORDER BY timestamp DESC", (pollId))
entries = cur.fetchall()
return entries