Skip to content
This repository has been archived by the owner on Dec 9, 2020. It is now read-only.

Commit

Permalink
Merge branch 'sockets' into test
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyBanks committed Apr 29, 2014
2 parents e1bee51 + c57c404 commit 74927e5
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 100 deletions.
1 change: 1 addition & 0 deletions examples/example.py
100644 → 100755
@@ -1,3 +1,4 @@
#!/usr/bin/env python
import getpass
import logging
import os
Expand Down
3 changes: 1 addition & 2 deletions setup.py
Expand Up @@ -3,7 +3,7 @@

setuptools.setup(
name='ChatExchange',
version='0.0.0dev5',
version='0.0.0dev6',
url='https://github.com/Manishearth/ChatExchange',
packages=[
'chatexchange'
Expand All @@ -14,7 +14,6 @@
install_requires=[
'BeautifulSoup==3.2.1',
'httmock==1.2.2',
'pprintpp==0.2.1',
'pytest-capturelog==0.7',
'pytest==2.5.2',
'requests==2.2.1',
Expand Down
153 changes: 102 additions & 51 deletions src/chatexchange/browser.py
@@ -1,6 +1,9 @@
import json
import logging
import re
import sys
import threading
import time

from BeautifulSoup import BeautifulSoup
import requests
Expand All @@ -15,8 +18,10 @@ def __init__(self):
self.session = requests.Session()
self.rooms = {}
self.sockets = {}
self.polls = {}
self.chatfkey = ""
self.chatroot = "http://chat.stackexchange.com"
self.logger = logging.getLogger(str(self))

def loginSEOpenID(self, user, password):
"""
Expand Down Expand Up @@ -146,7 +151,6 @@ def _handle_se_openid_prompt_if_neccessary(self, prompt_response):

return response


def loginChatSE(self):
chatlogin = self.getSoup("http://stackexchange.com/users/chat-login")
authToken = chatlogin.find('input', {"name": "authToken"})['value']
Expand All @@ -170,7 +174,7 @@ def updateChatFkey(self):
self.chatfkey = fkey
return True
except Exception as e:
print "Error updating fkey:", e
self.logger.error("Error updating fkey: %s", e)
return False

def postSomething(self, relurl, data):
Expand All @@ -187,70 +191,117 @@ def getSomething(self, relurl):
def getSoup(self, url):
return BeautifulSoup(self.session.get(url).content)

def initSocket(self, roomno, func):
"""
Experimenta. Use polling of /events
"""
eventtime = self.postSomething(
"/chats/"+str(roomno)+"/events",
{"since": 0, "mode": "Messages", "msgCount": 100})['time']
print eventtime

wsurl = self.postSomething(
"/ws-auth",
{"roomid":roomno}
)['url']+"?l="+str(eventtime)
print wsurl

self.sockets[roomno] = {"url":wsurl}
self.sockets[roomno]['ws'] = websocket.create_connection(
wsurl, origin=self.chatroot)

def runner():
print roomno
#look at wsdump.py later to handle opcodes
while True:
a = self.sockets[roomno]['ws'].recv()
print "a", a
if a != None and a != "":
func(a)

self.sockets[roomno]['thread']=threading.Thread(target=runner)
self.sockets[roomno]['thread'].setDaemon(True)
self.sockets[roomno]['thread'].start()

def post(self, url, data):
return self.session.post(url,data)

def joinRoom(self, roomid):
roomid = str(roomid)
self.rooms[roomid] = {}
def joinRoom(self, room_id):
room_id = str(room_id)
self.rooms[room_id] = {}
result = self.postSomething(
"/chats/"+str(roomid)+"/events",
"/chats/"+str(room_id)+"/events",
{"since": 0, "mode": "Messages", "msgCount": 100})
eventtime = result['time']
self.rooms[roomid]["eventtime"] = eventtime
self.rooms[room_id]["eventtime"] = eventtime

def watch_room_socket(self, room_id, on_activity):
"""
Watches for raw activity in a room using WebSockets.
def pokeRoom(self, roomid):
roomid = str(roomid)
if not self.rooms[roomid]:
return false
This starts a new daemon thread.
"""
socket_watcher = RoomSocketWatcher(self, room_id, on_activity)
self.sockets[room_id] = socket_watcher
socket_watcher.start()

pokeresult = self.postSomething("/events",{"r"+roomid:self.rooms[roomid]['eventtime']})
def watch_room_http(self, room_id, on_activity, interval):
"""
Watches for raw activity in a room using HTTP polling.
try:
roomresult = pokeresult["r"+str(roomid)]
newtime = roomresult["t"]
self.rooms[roomid]["eventtime"]=newtime
except KeyError:
"NOP"
return pokeresult
This starts a new daemon thread.
"""
http_watcher = RoomPollingWatcher(self, room_id, on_activity, interval)
self.polls[room_id] = http_watcher
http_watcher.start()

def getURL(self, rel):
if rel[0] != "/":
rel = "/"+rel
return self.chatroot+rel


class RoomSocketWatcher(object):
def __init__(self, browser, room_id, on_activity):
self.browser = browser
self.room_id = str(room_id)
self.thread = None
self.logger = logging.getLogger(str(self))
self.on_activity = on_activity

def start(self):
events_data = self.browser.postSomething(
'/chats/%s/events' % (self.room_id,),
{'since': 0, 'mode': 'Messages', 'msgCount': 100}
)
eventtime = events_data['events'][0]['time_stamp']
self.logger.debug('eventtime == %r', eventtime)

ws_auth_data = self.browser.postSomething(
'/ws-auth',
{'roomid': self.room_id}
)
wsurl = ws_auth_data['url'] + '?l=%s' % (eventtime,)
self.logger.debug('wsurl == %r', wsurl)

self.ws = websocket.create_connection(
wsurl, origin=self.browser.chatroot)

self.thread = threading.Thread(target=self._runner)
self.thread.setDaemon(True)
self.thread.start()

def _runner(self):
#look at wsdump.py later to handle opcodes
while True:
a = self.ws.recv()
self.logger.debug("a == %r", a)

if a != None and a != "":
self.on_activity(json.loads(a))


class RoomPollingWatcher(object):
def __init__(self, browser, room_id, on_activity, interval):
self.browser = browser
self.room_id = str(room_id)
self.thread = None
self.logger = logging.getLogger(str(self))
self.on_activity = on_activity
self.interval = interval

def start(self):
self.thread = threading.Thread(target=self._runner)
self.thread.setDaemon(True)
self.thread.start()

def _runner(self):
while(True):
last_event_time = self.browser.rooms[self.room_id]['eventtime']

activity = self.browser.postSomething(
'/events',
{'r' + self.room_id: last_event_time})

try:
room_result = activity['r' + self.room_id]
eventtime = room_result['t']
self.browser.rooms[self.room_id]['eventtime'] = eventtime
except KeyError as ex:
pass # no updated time from room

self.on_activity(activity)

time.sleep(self.interval)


class LoginError(Exception):
pass
65 changes: 34 additions & 31 deletions src/chatexchange/wrapper.py
Expand Up @@ -68,9 +68,9 @@ def logout(self):
self.logger.info("Logged out.")
self.logged_in = False

def sendMessage(self, room, text):
self.message_queue.put((room, text))
self.logger.info("Queued message %r for room #%r.", text, room)
def sendMessage(self, room_id, text):
self.message_queue.put((room_id, text))
self.logger.info("Queued message %r for room_id #%r.", text, room_id)
self.logger.info("Queue length: %d.", self.message_queue.qsize())

def __del__(self):
Expand All @@ -91,11 +91,11 @@ def _worker(self):
return
else:
self.messages += 1
room, text = next
room_id, text = next
self.logger.info(
"Now serving customer %d, %r for room #%s.",
self.messages, text, room)
self._actuallySendMessage(room, text) # also blocking.
self.messages, text, room_id)
self._actuallySendMessage(room_id, text) # also blocking.
self.message_queue.task_done()

# Appeasing the rate limiter gods is hard.
Expand All @@ -104,8 +104,8 @@ def _worker(self):

# When told to wait n seconds, wait n * BACKOFF_MULTIPLIER + BACKOFF_ADDER

def _actuallySendMessage(self, room, text):
room = str(room)
def _actuallySendMessage(self, room_id, text):
room_id = str(room_id)
sent = False
attempt = 0
if text == self._previous:
Expand All @@ -115,7 +115,7 @@ def _actuallySendMessage(self, room, text):
attempt += 1
self.logger.debug("Attempt %d: start.", attempt)
response = self.br.postSomething(
"/chats/"+room+"/messages/new",
"/chats/"+room_id+"/messages/new",
{"text": text})
if isinstance(response, str):
match = re.match(TOO_FAST_RE, response)
Expand Down Expand Up @@ -150,25 +150,28 @@ def _actuallySendMessage(self, room, text):

time.sleep(wait)

def joinRoom(self, roomid):
self.br.joinRoom(roomid)

def watchRoom(self, roomid, func, interval):
def pokeMe():
while(True):
try:
pokeresult = self.br.pokeRoom(roomid)
events = pokeresult["r"+str(roomid)]["e"]
for event in events:
func(event,self)
except KeyError:
"NOP"
finally:
time.sleep(interval)
thethread = threading.Thread(target=pokeMe)
thethread.setDaemon(True)
thethread.start()
return thethread

def joinWatchSocket(self,roomid,func):
self.br.initSocket(roomid,func)
def joinRoom(self, room_id):
self.br.joinRoom(room_id)

def _room_events(self, activity, room_id):
"""
Returns a list of events associated with a particular room,
given an activity message from the server.
"""
room_activity = activity.get('r' + room_id, {})
room_events = room_activity.get('e', [])
return room_events

def watchRoom(self, room_id, on_event, interval):
def on_activity(activity):
for event in self._room_events(activity, room_id):
on_event(event, self)

self.br.watch_room_http(room_id, on_activity, interval)

def watchRoomSocket(self, room_id, on_event):
def on_activity(activity):
for event in self._room_events(activity, room_id):
on_event(event, self)

self.br.watch_room_socket(room_id, on_activity)
38 changes: 22 additions & 16 deletions test/test_live_messages.py
Expand Up @@ -25,10 +25,10 @@
os.environ.get('TRAVIS_COMMIT')):
TEST_MESSAGE_FORMAT = (
"[ [ChatExchange@Travis](https://travis-ci.org/"
"{0[TRAVIS_REPO_SLUG]}/builds/{0[TRAVIS_BUILD_ID]} \"This is"
"a test message for ChatExchange using the nonce {{0}}.\") ] This"
" is a test of [{0[TRAVIS_REPO_SLUG]}@{short_commit}](https://"
"github.com/{0[TRAVIS_REPO_SLUG]}/commit/{0[TRAVIS_COMMIT]})."
"{0[TRAVIS_REPO_SLUG]}/builds/{0[TRAVIS_BUILD_ID]} \"This is "
"a test message for ChatExchange using the nonce {{0}}.\") ] "
"This is a test of [{0[TRAVIS_REPO_SLUG]}@{short_commit}]("
"https://github.com/{0[TRAVIS_REPO_SLUG]}/commit/{0[TRAVIS_COMMIT]})."
).format(os.environ, short_commit=os.environ['TRAVIS_COMMIT'][:8])
else:
TEST_MESSAGE_FORMAT = (
Expand All @@ -53,30 +53,36 @@ def test_se_message_echo(host_id, room_id):
test_message_nonce = uuid.uuid4().hex
test_message = TEST_MESSAGE_FORMAT.format(test_message_nonce)

replied = [False]
seen_message_with_socket = []
seen_message_with_polling = []

def on_message(message, wrapper):
def on_socket_message(message, wrapper):
if test_message_nonce in message['content']:
replied[0] = True
logger.debug("Saw expected echoed test chat message!")
else:
logger.debug(
"Ignoring unexpected message: %s", message)
seen_message_with_socket.append(message)
logger.debug("Saw test message in socket")

def on_polling_message(message, wrapper):
if test_message_nonce in message['content']:
seen_message_with_polling.append(message)
logger.debug("Saw test message in polling")

logger.debug("Joining chat")
wrapper.joinRoom(room_id)

wrapper.watchRoom(room_id, on_message, 1)
wrapper.watchRoom(room_id, on_polling_message, 1)
wrapper.watchRoomSocket(room_id, on_socket_message)

time.sleep(2) # Avoid race conditions
logger.debug("Sending test message")
wrapper.sendMessage(room_id, test_message)

timeout_time = time.time() + 30
timeout_time = time.time() + 15.0

while time.time() < timeout_time and replied[0] == False:
while time.time() < timeout_time and not (
seen_message_with_socket and seen_message_with_polling):
time.sleep(1)

if not replied[0]:
raise Exception("did not see expected chat reply in time")
assert seen_message_with_polling, "didn't see own message using HTTP polling"
assert seen_message_with_socket, "didn't see own message using WebSockets"

wrapper.logout()

0 comments on commit 74927e5

Please sign in to comment.