please merge my fork and have a try #1

Closed
wants to merge 4 commits into
from
Jump to file or symbol
Failed to load files and symbols.
+152 −15
Split
View
2 README
@@ -43,7 +43,7 @@ apnsagent的程序包括:
from apnsagent.client import PushClient
client = PushClient(app_key) # 初始化一个推送客户端。
client.register_token(token,develop=True) # 可选,注册一个Token,不注册默认发到生产队列当中。
-client.push(alert,token,badge,sound,custom) # 发送一条推送消息。
+client.push(token,alert,badge,sound,custom) # 发送一条推送消息。
工程依赖
- apns
View
@@ -36,8 +36,8 @@ def register_token(self, token, user_id=None, develop=False):
self.app_key), token):
self.redis.srem('%s:%s' % (constants.INVALID_TOKENS,
self.app_key), token)
-
-
+
+
#TODO 为用户ID和token加上关联。
@@ -56,6 +56,15 @@ def get_target(self, token):
('%s:%s' % (constants.PUSH_JOB_CHANNEL, self.app_key),
'%s:%s' % (constants.PUSH_JOB_FALLBACK, self.app_key))
+ def sent_message_count(self) :
+ return self.redis.hget("counter",app_key)
+
+ def debug_tokens(self):
+ return self.redis.smembers('%s:%s' % (constants.DEBUG_TOKENS,self.app_key))
+
+ def invalid_tokens(self):
+ return self.redis.smembers('%s:%s' % (constants.INVALID_TOKENS,who))
+
def push(self, token=None, alert=None, badge=None,
sound=None, custom=None):
@@ -86,3 +95,29 @@ def push(self, token=None, alert=None, badge=None,
clients = self.redis.publish(channel, payload)
if not clients:
self.redis.sadd(fallback_set, payload) #TODO 加上超时
+
+ def push_batch(self,tokens,alert):
+ """push message in batch
+ """
+ token = tokens[0]
+ channel, fallback_set = self.get_target(token)
+
+ for tk in tokens:
+ d = {'token': tk}
+ if alert:
+ d['alert'] = alert
+ payload = simplejson.dumps(d)
+
+ clients = self.redis.publish(channel, payload)
+ if not clients:
+ self.redis.sadd(fallback_set, payload) #TODO 加上超时
+
+ def stop(self):
+ self.redis.publish("app_watcher",simplejson.dumps({'op':'stop','app_key':self.app_key}))
+
+
+ def valid(self):
+ """valid app_key
+ """
+ pass
+
View
@@ -12,10 +12,16 @@
from optparse import OptionParser
from ConfigParser import ConfigParser
-from apnsagent import *
-from apnsagent.constants import *
-from apnsagent.notification import *
-from apnsagent.logger import log,create_log
+
+from constants import *
+from notification import *
+from logger import log,create_log
+from web_daemon import *
+
+import simplejson
+
+from redis import *
+
class PushGuard(object):
"""推送服务的主程序,主要职责:
@@ -33,7 +39,11 @@ def __init__(self, app_dir, server_info):
self.app_dir = app_dir
self.server_info = server_info
- self.threads = []
+
+ self.rds = redis.Redis(**self.server_info)
+
+ self.threads = {}
+ self.notifiers = {}
def run(self):
"""读取一个目录,遍历下面的app文件夹,每个app启动一到两条线程对此提供服
@@ -51,8 +61,13 @@ def run(self):
if os.path.exists(os.path.join(self.app_dir, app, PRODUCTION_DIR)):
self.make_worker_threads(app)
+
#TODO 启动一条监控的线程,允许动态增加推送应用及移除推送应用。使用
#inotify或队列来实现监听变动。inotify或许不错哦。
+
+ start_web_daemon(self)
+
+ self.watch_app()
log.debug('just wait here,there are %d threads ' % len(self.threads))
@@ -78,7 +93,7 @@ def make_worker_threads(self, app, develop=False):
'key_file': key_file,
'server_info': self.server_info
}
-
+
push_job = threading.Thread(target=self.push, kwargs=kwargs)
feedback_job = threading.Thread(target=self.feedback, kwargs=kwargs)
@@ -88,18 +103,50 @@ def make_worker_threads(self, app, develop=False):
push_job.start()
feedback_job.start()
- self.threads.append(push_job)
- self.threads.append(feedback_job)
-
+ self.threads[app_key + ":push"] = push_job
+ self.threads[app_key + ":feeedback"] = feedback_job
+
+ def stop_worker_thread(self, app):
+ app_key = app
+
+ self.notifiers[app_key + ":push"].alive = False
+ self.notifiers[app_key + ":feeedback"].alive = False
+
+ del self.threads[app_key + ":push"]
+ del self.threads[app_key + ":feeedback"]
+
+ def change_worker_thread(self, develop=False):
+ self.stop_worker_thread(app)
+ self.make_worker_threads(app,develop)
+
+
+ def watch_app(self):
+ self.watcher = threading.Thread(target=self.app_watcher)
+ self.watcher.setDaemon(True)
+ self.watcher.start()
+
+
+ def app_watcher(self):
+ ps = self.rds.pubsub()
+ ps.subscribe("app_watcher")
+ channel = ps.listen()
+ for message in channel:
+ m = simplejson.loads(message["data"])
+ if(m["op"]=="stop"):
+ self.stop_worker_thread(m["app_key"])
+
def push(self, develop, app_key, cer_file, key_file, server_info):
notifier = Notifier('push', develop, app_key,
cer_file, key_file, server_info)
+ self.notifiers[app_key + ":push"] = notifier # XXX
notifier.run()
+
def feedback(self, develop, app_key, cer_file, key_file, server_info):
notifier = Notifier('feedback', develop, app_key,
cer_file, key_file, server_info)
+ self.notifiers[app_key + ":feeedback"] = notifier # XXX
notifier.run()
View
@@ -35,6 +35,8 @@ def __init__(self, job='push', develop=False, app_key=None,
self.cert_file = cer_file
self.key_file = key_file
+ self.alive = True
+
self.apns = APNs(use_sandbox=self.develop,
cert_file=self.cert_file, key_file=self.key_file)
if server_info:
@@ -89,10 +91,10 @@ def _send_message(self,message):
sound = real_message.get('sound', None)
try:
if real_message.get('custom',None):
- payload = Payload(alert=real_message['alert'], sound=sound,
+ payload = Payload(alert=real_message.get('alert',None), sound=sound,
custom=real_message['custom'],badge=badge)
else:
- payload = Payload(alert=real_message['alert'],
+ payload = Payload(alert=real_message.get('alert',None),
sound=sound, badge=badge)
except PayloadTooLargeError, e:
payload = Payload(badge=badge)
@@ -101,7 +103,8 @@ def _send_message(self,message):
self.app_key),
real_message['token']):
# the token is invalid,do nothing
- return
+ return
+ self.rds.hincrby("counter",self.app_key) # XXX
log.debug('will sent a meesage to token %s',real_message['token'])
self.apns.gateway_server.send_notification(real_message['token'],payload)
@@ -144,6 +147,8 @@ def push(self):
log.debug('subscribe push job channel successfully')
redis_channel = pubsub.listen()
for message in redis_channel:
+ if not self.alive:
+ break
self.send_message(message)
View
@@ -0,0 +1,50 @@
+import redis
+from flask import Flask
+app = Flask(__name__)
+
+from guard import *
+
+rds = redis.Redis()
+
+@app.route("/")
+def hello():
+ return "Hello World!"
+
+@app.route("/apps")
+def echo_apps():
+ return str(rds.hkeys("counter"))
+
+@app.route("/msg_count/<who>")
+def echo_msg_count(who):
+ return str(rds.hget("counter",who))
+
+@app.route("/fail_msg_count/<who>")
+def fail_echo_msg_count(who):
+ if(who in msg_counter):
+ return who + ":" + str(fail_msg_counter[who])
+ else:
+ return "no such an app"
+
+@app.route("/bad_tokens/<who>")
+def echo_bad_tokens(who):
+ return str(rds.smembers('%s:%s' % (constants.INVALID_TOKENS,who)))
+
+@app.route("/x")
+def echo():
+ global server
+ return str(len(server.notifiers))
+
+def start_server():
+ app.run()
+
+web_daemon = "start web daemon"
+server = None
+
+def start_web_daemon(serv):
+ global web_daemon
+ global server
+ server = serv
+ print web_daemon
+ web_daemon = threading.Thread(target=start_server)
+ web_daemon.setDaemon(True)
+ web_daemon.start()