Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

PEP-8 for the win

  • Loading branch information...
commit 99939451aa3c5b20d0b413f59975c569805709b2 1 parent 487b373
@olleolleolle olleolleolle authored
Showing with 102 additions and 88 deletions.
  1. +43 −43 core.py
  2. +12 −10 core_test.py
  3. +25 −22 smskrupp
  4. +22 −13 web.py
View
86 core.py
@@ -132,11 +132,11 @@ def get_groups(self, phone=None, number=None):
c.execute('select g.id, g.name, g.keyword, g.phone from qq_groupMembers m '
+'join qq_groups g on g.id = m.groupId '
+'where m.number=? and g.phone=? order by g.name asc',
- (number,phone))
+ (number, phone))
elif number:
c.execute('select g.id, g.name, g.keyword, g.phone from qq_groupMembers m '
- +'join qq_groups g on g.id = m.groupId '
- +'where m.number=? order by g.name asc',
+ + 'join qq_groups g on g.id = m.groupId '
+ + 'where m.number=? order by g.name asc',
(number,))
elif phone:
c.execute('select id,name, keyword, phone from qq_groups where phone=? order by name asc',
@@ -151,13 +151,13 @@ def get_send_groups(self, sender, phone=None):
c = self.cursor
if phone:
c.execute('select g.id, g.name, g.keyword, g.phone from qq_groupMembers m '
- +'join qq_groups g on g.id = m.groupId '
- +'where m.number=? and g.phone=? and m.sender=1 order by g.name asc',
- (sender,phone))
+ + 'join qq_groups g on g.id = m.groupId '
+ + 'where m.number=? and g.phone=? and m.sender=1 order by g.name asc',
+ (sender, phone))
else:
c.execute('select g.id, g.name, g.keyword, g.phone from qq_groupMembers m '
- +'join qq_groups g on g.id = m.groupId '
- +'where m.number=? and m.sender=1 order by g.name asc',
+ + 'join qq_groups g on g.id = m.groupId '
+ + 'where m.number=? and m.sender=1 order by g.name asc',
(sender,))
return [{'id':row[0], 'name':row[1], 'keyword':row[2], 'phone':row[3]} for row in c]
@@ -167,17 +167,17 @@ def get_admin_groups(self, sender, phone=None):
c = self.cursor
if phone:
c.execute('select g.id, g.name, g.keyword, g.phone from qq_groupMembers m '
- +'join qq_groups g on g.id = m.groupId '
- +'where m.number=? and g.phone=? and m.admin=1 order by g.name asc',
- (sender,phone))
+ + 'join qq_groups g on g.id = m.groupId '
+ + 'where m.number=? and g.phone=? and m.admin=1 order by g.name asc',
+ (sender, phone))
else:
c.execute('select g.id, g.name, g.keyword, g.phone from qq_groupMembers m '
- +'join qq_groups g on g.id = m.groupId '
- +'where m.number=? and m.admin=1 order by g.name asc',
+ + 'join qq_groups g on g.id = m.groupId '
+ + 'where m.number=? and m.admin=1 order by g.name asc',
(sender,))
return [{'id':row[0], 'name':row[1], 'keyword':row[2], 'phone':row[3]} for row in c]
- def get_group_id(self,name):
+ def get_group_id(self, name):
c = self.cursor
c.execute('select id,name from qq_groups where name=?', (name,))
x = c.fetchone()
@@ -208,11 +208,11 @@ def get_member_ids(self, number):
def get_member_info(self, member_id):
c = self.cursor
c.execute('select m.id,m.number,m.alias,m.groupId,g.name from qq_groupMembers m '
- +'left join qq_groups g on g.id = m.groupId where m.id=?',
+ + 'left join qq_groups g on g.id = m.groupId where m.id=?',
(member_id,))
x = c.fetchone()
if x:
- return {'id': x[0], 'number': x[1], 'alias': x[2], 'groupId':x[3], 'groupName':x[4]}
+ return {'id': x[0], 'number': x[1], 'alias': x[2], 'groupId': x[3], 'groupName': x[4]}
return None
def _calculate_udh_part(self, udh):
@@ -222,51 +222,51 @@ def _calculate_udh_part(self, udh):
i = 1
while i <= length:
# parse one IEI
- iei_id = int(udh[i*2:i*2+2], 16)
+ iei_id = int(udh[i * 2: i * 2 + 2], 16)
i += 1
- iei_len = int(udh[i*2:i*2+2], 16)
+ iei_len = int(udh[i * 2: i * 2 + 2], 16)
if not iei_id == 0:
# not concatenation iei
i += iei_len
continue
i += 1
- ref = int(udh[i*2:i*2+2], 16)
+ ref = int(udh[i * 2: i * 2 + 2], 16)
i += 1
- num_parts = int(udh[i*2:i*2+2], 16)
+ num_parts = int(udh[i * 2: i * 2 + 2], 16)
i += 1
- part = int(udh[i*2:i*2+2], 16)
- return part,num_parts,ref
+ part = int(udh[i * 2: i * 2 + 2], 16)
+ return part, num_parts, ref
def get_unprocessed(self):
c = self.cursor
- c.execute("select ID,SenderNumber,RecipientID,TextDecoded,UDH "+
+ c.execute("select ID,SenderNumber,RecipientID,TextDecoded,UDH " +
"from inbox where Processed='false'")
parts = {}
ret = []
for row in c:
- i,src,phone,text,udh = row
+ i, src, phone, text, udh = row
x = self._calculate_udh_part(udh)
if x:
- part,num_parts,ref = x
- key = src+'-'+str(ref)
+ part, num_parts, ref = x
+ key = src + '-' + str(ref)
if not key in parts:
- parts[key] = src,phone,[],[]
- parts[key][2].append((part,text))
+ parts[key] = src, phone, [], []
+ parts[key][2].append((part, text))
parts[key][3].append(i)
if len(parts[key][2]) == num_parts:
# found all parts
- tot_text = "".join(map(lambda x: x[1], sorted(parts[key][2], key = lambda x: x[0])))
- ret.append({'ids':parts[key][3], 'src':src, 'phone':phone, 'text':tot_text})
+ tot_text = "".join(map(lambda x: x[1], sorted(parts[key][2], key=lambda x: x[0])))
+ ret.append({'ids': parts[key][3], 'src': src, 'phone': phone, 'text': tot_text})
del parts[key]
else:
# single part
- ret.append({'ids':[i], 'src':src, 'phone':phone, 'text':text})
+ ret.append({'ids': [i], 'src': src, 'phone': phone, 'text': text})
return ret
def set_processed(self, msgId, status='true'):
c = self.cursor
- c.execute("update inbox set Processed=? where ID=?", (status,msgId))
+ c.execute("update inbox set Processed=? where ID=?", (status, msgId))
self.conn.commit()
def fake_incoming(self, src, phoneId, msg):
@@ -288,40 +288,40 @@ def cleanup(self):
if self.conn:
self.conn.close()
self.conn = None
-
+
def add_webuser(self, username, pw, privilege):
import bcrypt
c = self.cursor
- h = bcrypt.hashpw(pw,bcrypt.gensalt())
+ h = bcrypt.hashpw(pw, bcrypt.gensalt())
c.execute('insert into qq_webUsers (username,hash,privilege) values (?,?,?)',
- (username,h,privilege))
+ (username, h, privilege))
self.conn.commit()
def get_webusers(self):
c = self.cursor
c.execute('select u.id,u.username,u.privilege,g.id,g.name from qq_webUsers u '
- +'left join qq_webUserGroups wg on wg.userId = u.id '
- +'left join qq_groups g on g.id = wg.groupId '
- +'order by u.id')
+ + 'left join qq_webUserGroups wg on wg.userId = u.id '
+ + 'left join qq_groups g on g.id = wg.groupId '
+ + 'order by u.id')
ret = []
seen = []
for row in c:
if row[0] in seen:
- ret[-1]['groups'].append({'group_id':row[3],'group_name':row[4]})
+ ret[-1]['groups'].append({'group_id': row[3], 'group_name': row[4]})
else:
seen.append(row[0])
- ret.append({'user_id':row[0],'username':row[1],'privilege':row[2],'groups':[]})
+ ret.append({'user_id': row[0], 'username': row[1], 'privilege': row[2], 'groups': []})
if row[3]:
- ret[-1]['groups'].append({'group_id':row[3],'group_name':row[4]})
+ ret[-1]['groups'].append({'group_id': row[3], 'group_name': row[4]})
return ret
def get_webuser_groups(self, webuser_id):
c = self.cursor
c.execute('select groupId,name,keyword from qq_webUserGroups wg '
- +'left join qq_groups g on g.id=wg.groupId where userId=?',
+ + 'left join qq_groups g on g.id=wg.groupId where userId=?',
(webuser_id,))
- return [{'id':row[0],'name':row[1], 'keyword':row[2]} for row in c]
+ return [{'id': row[0], 'name': row[1], 'keyword': row[2]} for row in c]
def set_webuser_group(self, webuser_id, group_id):
c = self.cursor
View
22 core_test.py
@@ -114,11 +114,12 @@ def test_get_unprocessed(self):
assert "phone1" == u[0]['phone']
assert "hello" == u[0]['text']
+
class TestDoer:
def setUp(self):
config.db = config.test_db
config.smsdrc = config.test_smsdrc
- self.sender = FakeSender() #core.Sender()
+ self.sender = FakeSender() # core.Sender()
self.doer = core.Doer(self.sender)
self.data = core.Data()
self.data.setup_db()
@@ -161,7 +162,7 @@ def test_run_admin_command_add_unauthorized(self):
assert not "+4673123" in [x['number'] for x in self.data.get_group_members(gid)]
assert not "+4673123" in [x['number'] for x in self.data.get_group_senders(gid)]
assert not "+4673123" in [x['number'] for x in self.data.get_group_admins(gid)]
- assert len(self.sender.sendouts) == 1 # help message
+ assert len(self.sender.sendouts) == 1 # help message
assert self.sender.sendouts[0][0] == number
def test_run_admin_command_add(self):
@@ -175,7 +176,7 @@ def test_run_admin_command_add(self):
assert "+4673123" in [x['number'] for x in self.data.get_group_members(gid)]
assert not "+4673123" in [x['number'] for x in self.data.get_group_senders(gid)]
assert not "+4673123" in [x['number'] for x in self.data.get_group_admins(gid)]
- assert len(self.sender.sendouts) == 1 # welcome message
+ assert len(self.sender.sendouts) == 1 # welcome message
assert self.sender.sendouts[0][0] == "+4673123"
def test_run_admin_command_add_sender(self):
@@ -189,7 +190,7 @@ def test_run_admin_command_add_sender(self):
assert "+4673123" in [x['number'] for x in self.data.get_group_members(gid)]
assert "+4673123" in [x['number'] for x in self.data.get_group_senders(gid)]
assert not "+4673123" in [x['number'] for x in self.data.get_group_admins(gid)]
- assert len(self.sender.sendouts) == 1 # welcome message
+ assert len(self.sender.sendouts) == 1 # welcome message
assert self.sender.sendouts[0][0] == "+4673123"
def test_run_admin_command_add_admin(self):
@@ -203,7 +204,7 @@ def test_run_admin_command_add_admin(self):
assert "+4673123" in [x['number'] for x in self.data.get_group_members(gid)]
assert "+4673123" in [x['number'] for x in self.data.get_group_senders(gid)]
assert "+4673123" in [x['number'] for x in self.data.get_group_admins(gid)]
- assert len(self.sender.sendouts) == 1 # welcome message
+ assert len(self.sender.sendouts) == 1 # welcome message
assert self.sender.sendouts[0][0] == "+4673123"
def test_run_admin_command_add_keyword(self):
@@ -217,7 +218,7 @@ def test_run_admin_command_add_keyword(self):
assert "+4673123" in [x['number'] for x in self.data.get_group_members(gid)]
assert not "+4673123" in [x['number'] for x in self.data.get_group_senders(gid)]
assert not "+4673123" in [x['number'] for x in self.data.get_group_admins(gid)]
- assert len(self.sender.sendouts) == 1 # welcome message
+ assert len(self.sender.sendouts) == 1 # welcome message
assert self.sender.sendouts[0][0] == "+4673123"
def test_run_admin_command_add_sender_keyword(self):
@@ -231,7 +232,7 @@ def test_run_admin_command_add_sender_keyword(self):
assert "+4673123" in [x['number'] for x in self.data.get_group_members(gid)]
assert "+4673123" in [x['number'] for x in self.data.get_group_senders(gid)]
assert not "+4673123" in [x['number'] for x in self.data.get_group_admins(gid)]
- assert len(self.sender.sendouts) == 1 # welcome message
+ assert len(self.sender.sendouts) == 1 # welcome message
assert self.sender.sendouts[0][0] == "+4673123"
def test_run_admin_command_add_admin_keyword(self):
@@ -245,7 +246,7 @@ def test_run_admin_command_add_admin_keyword(self):
assert "+4673123" in [x['number'] for x in self.data.get_group_members(gid)]
assert "+4673123" in [x['number'] for x in self.data.get_group_senders(gid)]
assert "+4673123" in [x['number'] for x in self.data.get_group_admins(gid)]
- assert len(self.sender.sendouts) == 1 # welcome message
+ assert len(self.sender.sendouts) == 1 # welcome message
assert self.sender.sendouts[0][0] == "+4673123"
def test_run_sendout_unauthorized(self):
@@ -293,7 +294,7 @@ def test_run_sendout(self):
doer = core.Doer(s)
self.data.fake_incoming(number, phone, "test")
doer.run()
- assert 1 == len(s.sendouts) # test message
+ assert 1 == len(s.sendouts) # test message
assert s.sendouts[0][0] == number
def test_run_sendout_prefix(self):
@@ -310,9 +311,10 @@ def test_run_sendout_prefix(self):
assert 1 == len(s.sendouts)
assert (number, "#keyword test") == s.sendouts[0]
+
class FakeSender:
def __init__(self):
self.sendouts = []
def send(self, dest, msg):
- self.sendouts.append((dest,msg))
+ self.sendouts.append((dest, msg))
View
47 smskrupp
@@ -4,16 +4,17 @@ import sys
import core
from config import config
+
def usage():
- print("%s add-group <name> <keyword>"%sys.argv[0])
- print("%s list-groups"%sys.argv[0])
- print("%s list-members <group>"%sys.argv[0])
- print("%s add-member <number> <alias> <group>"%sys.argv[0])
- print("%s rm-member <number> <group|ALL>"%sys.argv[0])
- print("%s set-sender <number> <group>"%sys.argv[0])
- print("%s set-admin <number> <group>"%sys.argv[0])
- print("%s fake-incoming <src> <phone> <msg>"%sys.argv[0])
- print("%s add-webadmin <login> <password>"%sys.argv[0])
+ print("%s add-group <name> <keyword>" % sys.argv[0])
+ print("%s list-groups" % sys.argv[0])
+ print("%s list-members <group>" % sys.argv[0])
+ print("%s add-member <number> <alias> <group>" % sys.argv[0])
+ print("%s rm-member <number> <group|ALL>" % sys.argv[0])
+ print("%s set-sender <number> <group>" % sys.argv[0])
+ print("%s set-admin <number> <group>" % sys.argv[0])
+ print("%s fake-incoming <src> <phone> <msg>" % sys.argv[0])
+ print("%s add-webadmin <login> <password>" % sys.argv[0])
sys.exit(1)
if len(sys.argv) < 2:
@@ -36,17 +37,20 @@ elif sys.argv[1] == 'list-members':
print "no such group!"
sys.exit(1)
for member in data.get_group_members(gid):
- if member['sender']: s = 's'
+ if member['sender']:
+ s = 's'
else:
s = ' '
- if member['admin']: s = 'a'+s
- else: s = ' '+s
- s += ' %s [%s]'%(member['alias'],member['number'])
+ if member['admin']:
+ s = 'a' + s
+ else:
+ s = ' ' + s
+ s += ' %s [%s]' % (member['alias'], member['number'])
print(s)
elif sys.argv[1] == 'add-member':
if len(sys.argv) != 5:
usage()
- number,alias,group = sys.argv[2:]
+ number, alias, group = sys.argv[2:]
number = core.normalize_number(number)
if not number:
print "number error!"
@@ -61,7 +65,7 @@ elif sys.argv[1] == 'add-member':
elif sys.argv[1] == 'rm-member':
if len(sys.argv) != 4:
usage()
- number,group = sys.argv[2:]
+ number, group = sys.argv[2:]
number = core.normalize_number(number)
if not number:
print "number error!"
@@ -80,13 +84,13 @@ elif sys.argv[1] == 'set-sender':
if len(sys.argv) != 4:
usage()
- number,group = sys.argv[2:]
+ number, group = sys.argv[2:]
number = core.normalize_number(number)
group_id = data.get_group_id(group)
if not group_id:
print "group error!"
sys.exit(1)
- mid = data.get_member_id(number, group_id);
+ mid = data.get_member_id(number, group_id)
if not mid:
print "no such number!"
sys.exit(1)
@@ -95,13 +99,13 @@ elif sys.argv[1] == 'set-admin':
if len(sys.argv) != 4:
usage()
- number,group = sys.argv[2:]
+ number, group = sys.argv[2:]
number = core.normalize_number(number)
group_id = data.get_group_id(group)
if not group_id:
print "group error!"
sys.exit(1)
- mid = data.get_member_id(number, group_id);
+ mid = data.get_member_id(number, group_id)
if not mid:
print "no such number!"
sys.exit(1)
@@ -109,12 +113,12 @@ elif sys.argv[1] == 'set-admin':
elif sys.argv[1] == 'fake-incoming':
if len(sys.argv) != 5:
usage()
- src,phone,msg = sys.argv[2:]
+ src, phone, msg = sys.argv[2:]
src = core.normalize_number(src)
if not src:
print "number error!"
sys.exit(1)
- data.fake_incoming(src,phone,msg)
+ data.fake_incoming(src, phone, msg)
elif sys.argv[1] == 'add-webadmin':
if len(sys.argv) != 4:
usage()
@@ -122,4 +126,3 @@ elif sys.argv[1] == 'add-webadmin':
data.add_webuser(login, password, 2)
else:
usage()
-
View
35 web.py
@@ -1,8 +1,8 @@
# -*- coding: utf-8 -*-
import sqlite3
-from flask import Flask,render_template,request,session,flash,redirect, \
- url_for,abort, g
+from flask import Flask, render_template, request, session, flash, redirect, \
+ url_for, abort, g
from config import config
import core
@@ -15,15 +15,18 @@
app.debug = config.debug
app.secret_key = config.flask_key
+
def connect_db():
"""Returns a new connection to the database."""
return sqlite3.connect(config.db)
+
@app.before_request
def before_request():
"""Make sure we are connected to the database each request."""
g.db = connect_db()
+
@app.teardown_request
def teardown_request(exception):
"""Closes the database again at the end of the request."""
@@ -61,7 +64,7 @@ def groups(name=None):
gid = data.add_group(request.form['name'], request.form['keyword'],
config.default_phone)
data.set_webuser_group(session.get('userid'), gid)
- return redirect(url_for('groups')+request.form['name'])
+ return redirect(url_for('groups') + request.form['name'])
members = None
if not session.get('admin'):
@@ -80,7 +83,8 @@ def groups(name=None):
return render_template('group.html', name=name, members=members, groups=groups, error=error)
-@app.route('/removemember/<mid>')
+
+@app.route('/removemember/<mid>')
def remove_member(mid=None):
data = core.Data()
info = data.get_member_info(mid)
@@ -94,7 +98,8 @@ def remove_member(mid=None):
data.remove_number(mid)
return redirect(url_for('groups', name=info['groupName']))
-@app.route('/removegroup/<name>')
+
+@app.route('/removegroup/<name>')
def remove_group(name):
data = core.Data()
gid = data.get_group_id(name)
@@ -109,7 +114,8 @@ def remove_group(name):
data.remove_group(gid)
return redirect(url_for('groups'))
-@app.route("/settings", methods=['POST','GET'])
+
+@app.route("/settings", methods=['POST', 'GET'])
def settings():
if not session.get('admin'):
abort(401)
@@ -125,19 +131,21 @@ def settings():
data.set_webuser_pw(request.form['userid'], request.form['pw'])
groups = data.get_groups()
- return render_template('settings.html', webusers=data.get_webusers(),groups=groups)
+ return render_template('settings.html', webusers=data.get_webusers(), groups=groups)
-@app.route('/removewebusergroup/<uid>/<gid>')
-def remove_webuser_group(uid,gid):
+
+@app.route('/removewebusergroup/<uid>/<gid>')
+def remove_webuser_group(uid, gid):
if not session.get('admin'):
abort(401)
data = core.Data()
- data.remove_webuser_group(uid,gid)
+ data.remove_webuser_group(uid, gid)
return redirect(url_for('settings'))
-@app.route('/removewebuser/<uid>')
+
+@app.route('/removewebuser/<uid>')
def remove_webuser(uid):
if not session.get('admin'):
abort(401)
@@ -146,13 +154,14 @@ def remove_webuser(uid):
data.remove_webuser(uid)
return redirect(url_for('settings'))
+
@app.route("/")
@app.route('/login', methods=['POST', 'GET'])
def login():
error = None
if request.method == 'POST':
data = core.Data()
- userid,privilege = data.check_webuser_login(request.form['username'],
+ userid, privilege = data.check_webuser_login(request.form['username'],
request.form['password'])
if privilege:
session['username'] = request.form['username']
@@ -165,6 +174,7 @@ def login():
error = 'Invalid username/password'
return render_template('login.html', error=error)
+
@app.route('/logout')
def logout():
session.pop('username', None)
@@ -176,4 +186,3 @@ def logout():
if __name__ == '__main__':
app.run()
-
Please sign in to comment.
Something went wrong with that request. Please try again.