Skip to content
This repository has been archived by the owner on Feb 11, 2019. It is now read-only.

Commit

Permalink
Merge branch 'feature/issue-10-fancy-acks' into develop
Browse files Browse the repository at this point in the history
  • Loading branch information
annasborysova committed Jul 6, 2015
2 parents 44acfdb + b076689 commit 7c7fe12
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 28 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Expand Up @@ -5,6 +5,8 @@ python:
env:
# Test against the latest version of Twisted.
- TWISTED_VERSION="Twisted"
services:
- redis-server
matrix:
include:
# Test against the oldest version of Twisted that we claim to support
Expand Down
28 changes: 23 additions & 5 deletions vxyowsup/tests/test_whatsapp.py
Expand Up @@ -14,8 +14,8 @@
from yowsup.stacks import YowStackBuilder
from yowsup.layers.logger import YowLoggerLayer
from yowsup.layers import YowLayer
from yowsup.layers.protocol_messages.protocolentities import (
TextMessageProtocolEntity)
from yowsup.layers.protocol_messages.protocolentities import TextMessageProtocolEntity
from yowsup.layers.protocol_acks.protocolentities import AckProtocolEntity


@staticmethod
Expand Down Expand Up @@ -53,15 +53,24 @@ def setUp(self):
'cc': '27',
'phone': '27000000000',
'password': base64.b64encode("xxx"),
#'redis_manager': {'key_prefix': "vumi:whatsapp", 'db': 1},
}

self.transport = yield self.tx_helper.get_transport(self.config)
self.testing_layer = self.transport.stack_client.network_layer

def assert_id_format_correct(self, node):
uuid, _sep, count = node["id"].partition('-')
self.assertEqual(len(uuid), 10)
self.assertTrue(int(count) > 0)

def assert_nodes_equal(self, node1, node2):
# TODO: test id explicitly
node2["id"] = node1["id"]
self.assertEqual(node1.toString(), node2.toString())
self.assert_id_format_correct(node1)
self.assert_id_format_correct(node2)
id_stub = node1["id"].split('-')[0]
xml1 = node1.toString().replace(node1["id"], id_stub)
xml2 = node2.toString().replace(node2["id"], id_stub)
self.assertEqual(xml1, xml2)

def assert_messages_equal(self, message1, message2):
'''
Expand All @@ -71,11 +80,18 @@ def assert_messages_equal(self, message1, message2):
self.assertEqual(message1['to_addr'], message2['to_addr'])
self.assertEqual(message1['from_addr'], message2['from_addr'])

def assert_ack(self, ack, message, whatsapp_id):
self.assertEqual(ack.payload['event_type'], 'ack')
self.assertEqual(ack.payload['user_message_id'], message['message_id'])
self.assertEqual(ack.payload['sent_message_id'], whatsapp_id)

@inlineCallbacks
def test_outbound(self):
message_sent = yield self.tx_helper.make_dispatch_outbound(content='fail!', to_addr='double fail!')
node_received = yield self.testing_layer.data_received.get()
[ack] = yield self.tx_helper.wait_for_dispatched_events(1)
self.assert_nodes_equal(TUMessage_to_PTNode(message_sent), node_received)
self.assert_ack(ack, message_sent, node_received['id'])

@inlineCallbacks
def test_publish(self):
Expand Down Expand Up @@ -111,6 +127,8 @@ def send(self, data):
send to lower (no lower in this layer)
'''
reactor.callFromThread(self.data_received.put, data)
ack = AckProtocolEntity(_id=data['id'], _class='message')
self.receive(ack.toProtocolTreeNode())

def send_to_transport(self, text, from_address):
'''method to be used in testing'''
Expand Down
84 changes: 61 additions & 23 deletions vxyowsup/whatsapp.py
Expand Up @@ -4,13 +4,11 @@
from twisted.internet.threads import deferToThread

from vumi.transports.base import Transport
from vumi.config import ConfigText
from vumi.config import ConfigText, ConfigDict, ConfigInt
from vumi import log
from vumi.message import TransportUserMessage
from vumi.persist.txredis_manager import TxRedisManager

from yowsup.layers.network import YowNetworkLayer

from yowsup.layers import YowLayerEvent
from yowsup.stacks import YowStackBuilder

from yowsup.layers.interface import YowInterfaceLayer, ProtocolEntityCallback
Expand All @@ -29,6 +27,11 @@ class WhatsAppTransportConfig(Transport.CONFIG_CLASS):
password = ConfigText(
'Password received from WhatsApp on yowsup registration',
static=True)
redis_manager = ConfigDict(
'How to connect to Redis', required=True, static=True)
ack_timeout = ConfigInt(
'Length of time (integer) redis will store message ids in seconds (timeout for receiving acks)',
default=60*60*24, static=True)


class WhatsAppClientDone(Exception):
Expand All @@ -40,13 +43,17 @@ class WhatsAppTransport(Transport):
CONFIG_CLASS = WhatsAppTransportConfig
transport_type = 'whatsapp'

@defer.inlineCallbacks
def setup_transport(self):
config = self.get_static_config()
config = self.config = self.get_static_config()
log.info('Transport starting with: %s' % (config,))

self.redis = yield TxRedisManager.from_config(config.redis_manager)
self.redis = self.redis.sub_manager(self.transport_name)

CREDENTIALS = (config.phone, config.password)

stack_client = self.stack_client = StackClient(CREDENTIALS, self)

self.client_d = deferToThread(stack_client.client_start)
self.client_d.addErrback(self.catch_exit)
self.client_d.addErrback(self.print_error)
Expand All @@ -56,16 +63,27 @@ def teardown_transport(self):
print "Stopping client ..."
self.stack_client.client_stop()
yield self.client_d
yield self.redis._close()
print "Loop done."

def handle_outbound_message(self, message):
# message is a vumi.message.TransportUserMessage
log.info('Sending %r' % (message.to_json(),))
self.stack_client.send_to_stack(
message['content'], message['to_addr'] + '@s.whatsapp.net')
# TODO: set the remote-message-id to something more useful
return self.publish_ack(
message['message_id'], 'remote-message-id')
msg = TextMessageProtocolEntity(message['content'], to=message['to_addr'] + '@s.whatsapp.net')
self.redis.setex(msg.getId(), self.config.ack_timeout, message['message_id'])
self.stack_client.send_to_stack(msg)

@defer.inlineCallbacks
def _send_ack(self, whatsapp_id):
vumi_id = yield self.redis.get(whatsapp_id)
yield self.publish_ack(user_message_id=vumi_id, sent_message_id=whatsapp_id)

@defer.inlineCallbacks
def _send_delivery_report(self, whatsapp_id):
vumi_id = yield self.redis.get(whatsapp_id)
yield self.publish_delivery_report(user_message_id=vumi_id, delivery_status='delivered')
# safe to remove key from redis here?
# would be nice to remove to prevent sending delivery report for both delivered and 'read'

def catch_exit(self, f):
f.trap(WhatsAppClientDone)
Expand Down Expand Up @@ -93,8 +111,7 @@ def __init__(self, credentials, transport):

def client_start(self):

self.stack.broadcastEvent(YowLayerEvent(
YowNetworkLayer.EVENT_STATE_CONNECT))
self.whatsapp_interface.connect()

self.stack.loop(discrete=0, count=1, timeout=1)

Expand All @@ -103,18 +120,17 @@ def client_stop(self):

def _stop():
print "Sending disconnect ..."
self.stack.broadcastEvent(YowLayerEvent(
YowNetworkLayer.EVENT_STATE_DISCONNECT))
self.whatsapp_interface.disconnect()

def _kill():
raise WhatsAppClientDone("We are exiting NOW!")

self.stack.execDetached(_stop)
self.stack.execDetached(_kill)

def send_to_stack(self, text, to_address):
def send_to_stack(self, msg):
def send():
self.whatsapp_interface.send_to_human(text, to_address)
self.whatsapp_interface.send_to_human(msg)
self.stack.execDetached(send)


Expand All @@ -124,9 +140,8 @@ def __init__(self, transport):
super(WhatsAppInterface, self).__init__()
self.transport = transport

def send_to_human(self, text, to_address):
message = TextMessageProtocolEntity(text, to=to_address)
self.toLower(message)
def send_to_human(self, msg):
self.toLower(msg)

@ProtocolEntityCallback("message")
def onMessage(self, messageProtocolEntity):
Expand All @@ -137,16 +152,39 @@ def onMessage(self, messageProtocolEntity):
messageProtocolEntity.getId(),
from_address + '@s.whatsapp.net', 'read',
messageProtocolEntity.getParticipant())

self.toLower(receipt)

print "You have received a message, and thusly sent a receipt"
print "You are now sending a reply"
# self.send_to_human('iiii', from_address + '@s.whatsapp.net', "this transport's gone rogue")

reactor.callFromThread(self.transport.publish_message,
from_addr=from_address, content=body, to_addr=None,
transport_type=self.transport.transport_type,
to_addr_type=TransportUserMessage.AT_MSISDN)

@ProtocolEntityCallback("receipt")
def onReceipt(self, entity):
ack = OutgoingAckProtocolEntity(
entity.getId(), "receipt", "delivery", entity.getFrom())
'''receives confirmation of delivery to human'''
# shady?
print "The user you attempted to contact has received the message"
print "You are sending an acknowledgement of their accomplishment"
print entity.getType()
ack = OutgoingAckProtocolEntity(entity.getId(), "receipt", entity.getType(), entity.getFrom())
self.toLower(ack)
# if receipt means that it got delivered to the whatsapp user then
# entity.getType() is None
# if receipt means that the user has opened the message then
# entity.getType() is 'read'
# when it is delivered and read simultaneously,
# only one receipt is sent and entity.getType() is 'read'
reactor.callFromThread(self.transport._send_delivery_report, entity.getId())

@ProtocolEntityCallback("ack")
def onAck(self, ack):
'''receives confirmation of delivery to server'''
# sent_message_id: whatsapp id
# user_message_id: vumi_id
print "WhatsApp acknowledges your " + ack.getClass()
if ack.getClass() == "message":
reactor.callFromThread(self.transport._send_ack, ack.getId())

0 comments on commit 7c7fe12

Please sign in to comment.