forked from clickyotomy/twitter-message-bus
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pull.py
executable file
·151 lines (121 loc) · 4.66 KB
/
pull.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#! /usr/bin/env python2.7
'''
Reads messages from the 'in' queue, decrypts the contents.
'''
import time
import json
from argparse import ArgumentParser
from logging import (NullHandler, getLogger, StreamHandler, Formatter, DEBUG,
INFO)
from pydisque.client import Client
from gist import get
from auth import status, verify, decrypt
# Logging setup. The module's own logger only gets a no-op handler; the root
# logger (LOGGER) is the one main() configures with HANDLER and a level.
FORMATTER = Formatter(
    '%(asctime)s; %(name)s, %(levelname)s; PID: %(process)s; '
    '%(module)s: %(funcName)s; traceback: %(exc_info)s; %(message)s'
)
HANDLER = StreamHandler()
HANDLER.setFormatter(FORMATTER)
LOGGER = getLogger()
getLogger(__name__).addHandler(NullHandler())
# Check stream.py for more information.
VAULT_PATH = 'vault/keys.json'


def load_credentials(path=VAULT_PATH):
    '''
    Load the GitHub credentials from the JSON vault file.

    Args:
        path: location of the vault file; defaults to VAULT_PATH.

    Returns:
        The value stored under the 'github' key of the vault, or None when
        the file cannot be read or parsed (a message is printed in that
        case).
    '''
    gist = None
    try:
        # open() has to live inside the try block: a missing or unreadable
        # vault raises IOError from open() itself, before any read happens.
        with open(path, 'r') as vault_file:
            vault = json.load(vault_file)
        gist = vault['github']
    except IOError:
        print('Unable to read vault-file: {0}.'.format(path))
    except (KeyError, ValueError, TypeError):
        # ValueError: malformed JSON; KeyError: no 'github' entry;
        # TypeError: the document is valid JSON but not an object.
        print('Unable to parse the vault-file.')
    return gist
def _handle_job(job, token, debug=False):
    '''
    Fetch, verify and decrypt the message referenced by a single job.

    Args:
        job: one entry of the list returned by queue.get_job(); its element
            at index 2 is the reference handed to get().
        token: credentials passed through to get().
        debug: forwarded to get(), verify() and decrypt().

    Returns:
        True when the message was verified and decrypted, False otherwise.
    '''
    signed = get(job[2].strip(), token, debug)
    # Check for a valid signature.
    if signed is None:
        LOGGER.error('[gist-fetch] %s not found!', job[2])
        return False

    # Only a verified message is decrypted.
    flag, who, encrypted = verify(signed, debug)
    if not flag:
        LOGGER.error('[keybase-verify] unable to verify')
        return False
    LOGGER.info('[keybase-verify] message signed by %s', who)

    who, text = decrypt(encrypted, debug)
    if who is None:
        LOGGER.error('[keybase-decrypt] un-trusted encryption')
        return False
    LOGGER.info('[keybase-decrypt] message encrypted by %s', who)
    LOGGER.info('[keybase-decrypt] plain-text content: \n%s', text)
    return True


def receive(token, queue, retry, debug=False):
    '''
    Get the message from the queue, display the decrypted text.

    Polls the 'in' queue forever, acknowledging each job and logging the
    decrypted content of every message that verifies.

    Args:
        token: credentials used to fetch the signed message.
        queue: connected queue client providing get_job() and ack_job().
        retry: number of seconds to sleep between polls.
        debug: forwarded to the status/fetch/verify/decrypt helpers.

    Returns:
        None. Returns immediately if status(debug) reports failure, and
        returns after logging if any exception escapes the polling loop.
    '''
    if status(debug):
        LOGGER.info('[keybase-status] client-up; signed-in')
    else:
        LOGGER.error('[keybase-status] client-down/sigend-out')
        return

    try:
        while True:
            jobs = queue.get_job(['in'], count=1, nohang=False)
            # Wait for a valid job; an empty (or missing) result just
            # falls through to the delay below.
            if jobs:
                job = jobs[0]
                # NOTE(review): the job is acknowledged before it is
                # processed, so a failed fetch/verify/decrypt is presumably
                # not redelivered -- confirm this is intended.
                queue.ack_job(job[1])
                LOGGER.info('[received-job]: %s', repr(job))
                if not _handle_job(job, token, debug):
                    # Failed jobs go straight back to polling, skipping
                    # the retry delay.
                    continue
            time.sleep(retry)
    except Exception:
        # LOGGER.exception records the traceback, so the real cause is not
        # hidden behind the fixed message.
        LOGGER.exception('[queue] unable to fetch jobs from \'in\'')
def main():
    '''
    Validate arguments, load credentials and read from the queue.

    Command-line options:
        -s/--sockets  one or more HOST:PORT pairs (default localhost:7711)
        -d/--debug    log at DEBUG instead of INFO
        -r/--retry    delay in seconds handed to receive() (default 8)

    Returns:
        None. Returns early when no credentials could be loaded.
    '''
    message = 'Read messages from the message bus.'
    socket_help = ('a list containing the host, port numbers to listen to; '
                   'defaults to localhost:7711 (for disque)')
    retry_help = 'queue check frequncy (in seconds); defaults to 8'

    parser = ArgumentParser(description=message)
    parser.add_argument('-s', '--sockets', help=socket_help,
                        default=['localhost:7711'], dest='sockets',
                        metavar=('HOST:PORT'), nargs='+')
    parser.add_argument('-d', '--debug', help='enable debugging',
                        action='store_true', default=False)
    parser.add_argument('-r', '--retry', help=retry_help, default=8,
                        type=int, metavar=('DELAY'))
    args = vars(parser.parse_args())

    # Only the level depends on --debug; the handler is attached either way.
    LOGGER.setLevel(DEBUG if args['debug'] else INFO)
    LOGGER.addHandler(HANDLER)

    # Load credentials.
    token = load_credentials()
    if not token:
        LOGGER.error('[load_credentials] unable to load credentials!')
        return

    try:
        # Connect to the redis-queue.
        queue = Client(args['sockets'])
        queue.connect()
        LOGGER.info('[start-daemon]')
        queue_info = json.dumps(queue.info(), indent=4)
        LOGGER.debug('[queue-init]\n%s', queue_info)
        receive(token=token, queue=queue, retry=args['retry'],
                debug=args['debug'])
    except Exception:
        # Keep the traceback: this handler also sees failures from
        # queue.info() and json.dumps(), not just connection errors.
        LOGGER.exception(
            '[error] unable to connect to the redis-queue (disque)!'
        )
    except KeyboardInterrupt:
        LOGGER.critical('[stop-daemon]')


if __name__ == '__main__':
    main()