Update PushBank to work with eventlet, and clean up annotations
PushBank previously used gevent. This commit substitutes eventlet for gevent, which resolves an issue caused by the deprecation of sslwrap (which gevent's SSL monkey patching relied on).
This fixes #1.
ssut committed Feb 8, 2015
1 parent 849d934 commit a630c56
Showing 3 changed files with 48 additions and 55 deletions.
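
The substitution below is mostly mechanical: eventlet.monkey_patch() replaces gevent's monkey.patch_all(), eventlet.sleep() replaces gevent's sleep(), and eventlet.spawn() replaces Greenlet.spawn(). An eventlet GreenThread is scheduled as soon as it is spawned, which is why the redundant .start() calls disappear; gevent.joinall() has no direct eventlet equivalent, hence the small joinall() helper added in __init__.py. A minimal standalone sketch of that mapping, not code from this repository:

import eventlet
eventlet.monkey_patch()

def worker(n):
    eventlet.sleep(0)  # cooperative yield, the counterpart of gevent.sleep(0)
    return n * 2

# spawn() returns a GreenThread that is already scheduled to run;
# wait() blocks (cooperatively) until it finishes and returns its result
threads = [eventlet.spawn(worker, n) for n in range(4)]
print([t.wait() for t in threads])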
85 changes: 41 additions & 44 deletions pushbank/__init__.py
@@ -6,13 +6,9 @@
 import sys
 import time
 import traceback
-import gevent
+import eventlet
+eventlet.monkey_patch()
 
-from gevent import monkey
-monkey.patch_all()
-from gevent import Greenlet
-from gevent import queue
-from gevent import sleep
 from threading import current_thread
 from jinja2 import Environment, FileSystemLoader
 
@@ -22,18 +18,25 @@
 
 home = os.getcwd()
 
+# eventlet version of joinall, similar to gevent's
+def joinall(threads):
+    for t in threads:
+        try:
+            t.wait()
+        except:
+            pass
+
 class PushBank(object):
 
     def __init__(self, config):
         self.config = config
-        self.emails = queue.Queue()
+        self.emails = eventlet.queue.Queue()
         self.adapters = {}
         self.cache = None
         self.template = Environment(loader=FileSystemLoader(os.path.join(
             home, 'pushbank', 'template'))).get_template('email.html')
 
-        # pushbank logger
+        # logger
         current_thread().name = 'MAIN'
         root_logger = logging.getLogger()
         root_logger.level = config.log_level
@@ -50,7 +53,7 @@ def __init__(self, config):
         # load adapters
         self.load_adapter()
 
-        # connect smtp server
+        # connect to smtp server
         self.connected = connect_mail(server=config.EMAIL['SMTP_SERVER'],
                                       port=config.EMAIL['SMTP_PORT'],
                                       user=config.EMAIL['SMTP_USER'],
@@ -60,21 +63,19 @@
     @staticmethod
     def execute(config):
         pb = PushBank(config)
-        # waiting for smtp server
+        # waiting for smtp to be ready
         while not pb.connected:
-            sleep(0)
+            eventlet.sleep(0)
 
         tmp_path = os.path.join(home, 'tmp')
         pb.cache = Cache(tmp_path=tmp_path)
-        email_process = Greenlet.spawn(pb.handle_email)
-        email_process.start()
-        bank_process = Greenlet.spawn(pb.handle_bank)
-        bank_process.start()
+        email_process = eventlet.spawn(pb.handle_email)
+        bank_process = eventlet.spawn(pb.handle_bank)
 
         try:
-            gevent.joinall([email_process, bank_process])
+            joinall([email_process, bank_process])
         except KeyboardInterrupt:
-            print >> sys.stdout, 'PushBank has stopped working'
+            print >> sys.stdout, 'PushBank has stopped working.'
             sys.exit(0)

def load_adapter(self):
Expand All @@ -89,76 +90,74 @@ def load_adapter(self):
self.adapters[name] = imp.load_source(name, fn)
except Exception, e:
traceback.print_exc()
print >> sys.stderr, "Error loading %s: %s" % (name, e)
print >> sys.stderr, "Error loading %s: %s." % (name, e)
sys.exit(1)

def handle_adapter(self, adapter, **kwargs):
current_thread().name = adapter.en_name.upper()
try:
result = adapter.query(**kwargs)
except:
logging.warning('%s - Failed to fetch data', adapter.en_name)
logging.warning('%s - Failed to fetch data.', adapter.en_name)
return
if 'history' not in result:
logging.warning('Missing history in query result')
logging.warning('No histories fetched.')
return
result['history'].reverse()

# generate balance, history cache key
# generate keys
balance_key = '%s-balance' % (adapter.en_name)
history_key = '%s-history' % (adapter.en_name)

# get balance, histories from recently result
# get the latest data obtained from the last made
latest_balance = result['balance']
latest_history = result['history']

# get balance from cache
# get the balance from the cache
balance = self.cache.get(balance_key) if self.cache.exists(balance_key) \
else 0

# if balance is already cached and different from the latest balance
# check the difference between old and new balance
if balance > 0 and balance != latest_balance:
latest_history_size = len(latest_history)
# get histories from cache
history = self.cache.get(history_key) if self.cache.exists(
history_key) else []
# check history diffs
diffs = []
# iterate from recently histories
# iterate from histories
for h in latest_history:
# serialize history element (for check of duplicate)
# serialize history element (to find duplicates)
serialized = pickle.dumps(h)
# check exists of history in cached histories
# check if history is already in cached
if serialized not in history:
diffs.append(h)
history.append(serialized)
# remove first element if greather than latest history size
# remove first element if greather than latest_history_size
if len(history) > latest_history_size:
history.pop(0)
# save new history
self.cache.set(balance_key, latest_balance)
self.cache.set(history_key, history)
# add email queue from diffs
# put email data from diffs into queue
for data in diffs:
self.emails.put({
'adapter': adapter,
'data': data,
})
elif balance > 0:
logging.info('%s - No balance changes', adapter.en_name)
logging.info('%s - Balance has not changed.', adapter.en_name)
else:
# else save the balance and histories now
# or save the data
history = []
for h in latest_history:
# serialize history element
serialized = pickle.dumps(h)
# add serialized history to history list
# add serialized history to list
history.append(serialized)
# save to cache store
self.cache.set(balance_key, latest_balance)
self.cache.set(history_key, history)
logging.info("Stored your bank account data such as"
" balance and history. Waiting for save data to disk")
logging.info("Initialized. Saving data in progress.")

def handle_bank(self):
while True:
@@ -167,22 +166,21 @@ def handle_bank(self):
                 if name not in self.config.INCLUDE_BANKS:
                     continue
                 adapter = self.adapters[name]
-                thread = Greenlet.spawn(self.handle_adapter, adapter, **kwargs)
-                thread.start()
+                thread = eventlet.spawn(self.handle_adapter, adapter, **kwargs)
                 bank_threads.append(thread)
 
-            gevent.joinall(bank_threads)
+            joinall(bank_threads)
             for thread in bank_threads:
                 del thread
             del bank_threads
 
-            sleep(self.config.REFRESH_INTERVAL)
+            eventlet.sleep(self.config.REFRESH_INTERVAL)
 
     def handle_email(self):
         while True:
             try:
-                # get recent email item
-                data = self.emails.get()
+                # get the next queued email, if any
+                data = self.emails.get_nowait()
                 adapter = data['adapter']
                 history = data['data']
                 mail_title = self.config.EMAIL['TITLE'].format(
@@ -191,7 +189,6 @@ def handle_email(self):
                 content = self.template.render(**history)
                 send_mail(target=self.config.EMAIL['TARGET'], title=mail_title,
                           content=content, adapter_name=adapter.en_name)
-            except queue.Empty:
-                # sleep zero for yield
-                sleep(0)
+            except eventlet.queue.Empty:
+                eventlet.sleep(1)
+                continue
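
Note the semantic change in handle_email(): the gevent version blocked on emails.get(), while the eventlet version polls with get_nowait() and sleeps for a second when the queue raises eventlet.queue.Empty. A standalone sketch of that polling pattern (the names and the stop sentinel are illustrative, not the project's code):

import eventlet
from eventlet import queue

emails = queue.Queue()

def consume():
    while True:
        try:
            item = emails.get_nowait()
        except queue.Empty:
            eventlet.sleep(1)  # nothing queued; yield, then poll again
            continue
        if item is None:  # hypothetical stop sentinel
            return
        print('sending', item)

consumer = eventlet.spawn(consume)
for i in range(3):
    emails.put(i)
emails.put(None)
consumer.wait()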
13 changes: 5 additions & 8 deletions pushbank/cache.py
@@ -2,11 +2,9 @@
 import os
 import pickle
 import time
+import eventlet
+eventlet.monkey_patch()
 
-from gevent import monkey
-monkey.patch_all()
-from gevent import sleep
-from gevent import Greenlet
 from threading import current_thread
 
 from ._singleton import Singleton
@@ -19,10 +17,9 @@ def __init__(self, tmp_path):
         self.cache_file = os.path.join(tmp_path, 'cache.db')
         self._cache = {}
         self._modified = False
-        self._backup_handler = Greenlet.spawn(self._backup_handler)
+        self._backup_handler = eventlet.spawn(self._backup_handler)
 
         self._reload()
-        self._backup_handler.start()
 
     def _backup_handler(self):
         current_thread().name = 'BACKUP'
@@ -35,7 +32,7 @@ def _backup_handler(self):
                 elapsed = (time.clock() - start) * 1000
                 logging.info('Cache saved on disk: %.3f seconds', elapsed)
             else:
-                sleep(5)
+                eventlet.sleep(5)
 
     def _reload(self):
         if os.path.isfile(self.cache_file):
@@ -51,7 +48,7 @@ def _save(self):
         # waiting for write access permission to destination file
         while os.path.isfile(self.cache_file) and \
                 not os.access(self.cache_file, os.W_OK):
-            sleep(0.1)
+            eventlet.sleep(0.1)
 
         with open(self.cache_file, 'w') as f:
             try:
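
The Cache change is the same spawn substitution: eventlet.spawn() both creates and schedules the green thread, so the later self._backup_handler.start() call is dropped. A condensed sketch of the resulting backup loop, assuming this shape rather than quoting the project:

import eventlet
eventlet.monkey_patch()

class TinyCache(object):
    def __init__(self):
        self._modified = False
        # a spawned green thread needs no separate .start()
        self._backup_handler = eventlet.spawn(self._backup_loop)

    def _backup_loop(self):
        while True:
            if self._modified:
                self._modified = False
                print('saving cache to disk')  # stands in for _save()
            else:
                eventlet.sleep(5)

cache = TinyCache()
cache._modified = True
eventlet.sleep(0)  # yield so the backup loop gets a turn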
5 changes: 2 additions & 3 deletions pushbank/mail.py
@@ -2,12 +2,11 @@
 import logging
 import smtplib
 import sys
+import eventlet
+eventlet.monkey_patch()
 
 from email.mime.text import MIMEText
 from email.header import Header
-from gevent import monkey
-monkey.patch_all()
-from gevent import sleep
 
 session = None
 connect_params = {}
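
mail.py keeps only the import-time eventlet.monkey_patch() call: once socket, ssl, and time are patched, smtplib's blocking calls yield cooperatively inside green threads. A rough standalone illustration (the hostnames are placeholders):

import eventlet
eventlet.monkey_patch()  # must run before smtplib opens any sockets

import smtplib

def probe(host):
    # smtplib uses the patched (green) socket module under the hood
    try:
        smtplib.SMTP(host, 25, timeout=5).quit()
        return True
    except Exception:
        return False

# the two probes run concurrently instead of back to back
threads = [eventlet.spawn(probe, h) for h in ('localhost', 'mail.example.com')]
print([t.wait() for t in threads])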
