Skip to content

Commit

Permalink
Add notification support to some send methods. Send ack requests to s…
Browse files Browse the repository at this point in the history
…erver and maintain a sending queue.
  • Loading branch information
tfar authored and Sergey Il'inykh committed Jul 17, 2014
1 parent 6d9fca8 commit 0e98128
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 24 deletions.
66 changes: 66 additions & 0 deletions src/xmpp/xmpp-core/protocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,7 @@ void CoreProtocol::init()
compress_started = false;
sm_started = false;
sm_receive_count = 0;
sm_server_last_handled = 0;
}

void CoreProtocol::reset()
Expand All @@ -641,6 +642,24 @@ void CoreProtocol::reset()
init();
}

void CoreProtocol::sendStanza(const QDomElement &e, bool notify) {
if (isStreamManagementActive()) {
#ifdef XMPP_TEST
if (notify) qDebug() << "Want notification for stanza";
#endif
sm_send_queue.push_back(qMakePair(e, notify));
qDebug() << "sm_send_queue: ";
for (QList<QPair<QDomElement, bool> >::iterator i = sm_send_queue.begin(); i != sm_send_queue.end(); ++i) {
QPair<QDomElement, bool> entry = *i;
qDebug() << "\t" << entry.first.tagName() << " : " << entry.second;
}
if (sm_send_queue.length() > 9 && sm_send_queue.length() % 5 == 0) requestSMAcknowlegement();
sm_ack_last_requested.start();
}
qDebug() << "CoreProtocol::sendStanza";
BasicProtocol::sendStanza(e);
}

void CoreProtocol::startClientOut(const Jid &_jid, bool _oldOnly, bool tlsActive, bool _doAuth, bool _doCompress)
{
jid_ = _jid;
Expand Down Expand Up @@ -909,6 +928,11 @@ bool CoreProtocol::streamManagementHandleStanza(const QDomElement &e)
send(e);
event = ESend;
return true;
} else if (s == "a") {
qWarning() << "Received ack response from server";
processSMAcknowlegement(e.attribute("h").toULong());
event = EAck;
return true;
} else {
need = NNotify;
notify |= NRecv;
Expand Down Expand Up @@ -966,6 +990,38 @@ void CoreProtocol::markLastMessageStanzaAcked() {
}
}

bool CoreProtocol::isStreamManagementActive() const {
return sm_started;
}

void CoreProtocol::requestSMAcknowlegement() {
qDebug() << "Now I'd request acknowledgement from the server.";
sm_ack_last_requested.start();
sendDirect(QString("<r xmlns='urn:xmpp:sm:2'/>"));
notify |= NTimeout;
need = NNotify;
timeout_sec = 30;
}

int CoreProtocol::getNotableStanzasAcked() {
return sm_stanzas_notify;
}

void CoreProtocol::processSMAcknowlegement(unsigned long last_handled) {
int handled_stanzas = 0;
int notifies = 0;
if (sm_server_last_handled == 0) handled_stanzas = last_handled + 1;
else handled_stanzas = last_handled - sm_server_last_handled;
sm_server_last_handled = last_handled;

for (int n = 0; n < handled_stanzas && !sm_send_queue.isEmpty() ; ++n) {
QPair<QDomElement, bool> entry = sm_send_queue.first();
sm_send_queue.pop_front();
if (entry.second) notifies++;
}
sm_stanzas_notify = notifies;
}

bool CoreProtocol::grabPendingItem(const Jid &to, const Jid &from, int type, DBItem *item)
{
for(QList<DBItem>::Iterator it = dbpending.begin(); it != dbpending.end(); ++it) {
Expand Down Expand Up @@ -1767,12 +1823,22 @@ bool CoreProtocol::normalStep(const QDomElement &e)
if(e.namespaceURI() == NS_STREAM_MANAGEMENT && e.localName() == "enabled") {
qWarning() << "Stream Management enabled";
sm_started = true;
sm_ack_last_requested.start();
notify |= NTimeout;
need = NNotify;
timeout_sec = 30;
event = EReady;
step = Done;
return true;
}
}

if (isStreamManagementActive()) {
if (sm_ack_last_requested.elapsed() >= 30000) {
requestSMAcknowlegement();
}
}

if(isReady() && !e.isNull()) {
if(isValidStanza(e)) {
stanzaToRecv = e;
Expand Down
19 changes: 17 additions & 2 deletions src/xmpp/xmpp-core/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
#ifndef PROTOCOL_H
#define PROTOCOL_H

#include <qpair.h>
#include <QObject>
//Added by qt3to4:
#include <QList>
#include <QPair>
#include <QTimer>

#include "xmlprotocol.h"
#include "xmpp.h"

Expand Down Expand Up @@ -126,6 +128,7 @@ namespace XMPP
EStanzaReady, // a stanza was received
EStanzaSent, // a stanza was sent
EReady, // stream is ready for stanza use
EAck, // received SM ack response from server
ECustom = XmlProtocol::ECustom+10
};
enum Error {
Expand Down Expand Up @@ -261,6 +264,9 @@ namespace XMPP

void reset();

// reimplemented to do SM
void sendStanza(const QDomElement &e, bool notify = false);

void startClientOut(const Jid &jid, bool oldOnly, bool tlsActive, bool doAuth, bool doCompression);
void startServerOut(const QString &to);
void startDialbackOut(const QString &to, const QString &from);
Expand All @@ -282,6 +288,9 @@ namespace XMPP
void markStanzaHandled(unsigned long id);
void markLastMessageStanzaAcked();

bool isStreamManagementActive() const;
int getNotableStanzasAcked();

// input
QString user, host;

Expand All @@ -302,7 +311,6 @@ namespace XMPP
QString key, id;
bool ok;
};

private:
enum Step {
Start,
Expand Down Expand Up @@ -331,7 +339,12 @@ namespace XMPP
QList<DBItem> dbrequests, dbpending, dbvalidated;

QList<QPair<unsigned long, bool> > sm_receive_queue;
QList<QPair<QDomElement, bool> > sm_send_queue;
unsigned long sm_receive_count;
QTime sm_ack_last_requested;
unsigned long sm_server_last_handled;
int sm_stanzas_notify;

bool server, dialback, dialback_verify;
int step;

Expand All @@ -358,6 +371,8 @@ namespace XMPP
bool dialbackStep(const QDomElement &e);

unsigned long getSMLastHandledId();
void requestSMAcknowlegement();
void processSMAcknowlegement(unsigned long last_handled);

// reimplemented
bool stepAdvancesParser() const;
Expand Down
29 changes: 25 additions & 4 deletions src/xmpp/xmpp-core/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
#include "td.h"
#endif

//#define XMPP_DEBUG
#define XMPP_DEBUG

using namespace XMPP;

Expand Down Expand Up @@ -233,6 +233,7 @@ class ClientStream::Private

QList<Stanza*> in;

QTimer timeout_timer;
QTimer noopTimer;
int noop_time;
};
Expand Down Expand Up @@ -281,6 +282,8 @@ ClientStream::ClientStream(const QString &host, const QString &defRealm, ByteStr
//d->state = Connecting;
//d->jid = Jid();
//d->server = QString();

connect(&(d->timeout_timer), SIGNAL(timeout()), SLOT(sm_timeout()));
}

ClientStream::~ClientStream()
Expand Down Expand Up @@ -579,10 +582,10 @@ Stanza ClientStream::read()
}
}

void ClientStream::write(const Stanza &s)
void ClientStream::write(const Stanza &s, bool notify)
{
if(d->state == Active) {
d->client.sendStanza(s.element());
d->client.sendStanza(s.element(), notify);
processNext();
}
}
Expand Down Expand Up @@ -974,6 +977,13 @@ void ClientStream::processNext()
continue;
return;
}
if (d->client.notify & CoreProtocol::NTimeout ) {
qDebug() << "Time = " + d->client.timeout_sec;
d->timeout_timer.start(d->client.timeout_sec * 1000);
d->timeout_timer.setSingleShot(true);
d->client.notify &= ~ CoreProtocol::NTimeout;
qDebug() << "\tNTimeout received";
}

int event = d->client.event;
d->notify = 0;
Expand Down Expand Up @@ -1083,6 +1093,9 @@ void ClientStream::processNext()
delayedCloseFinished();
return;
}
case CoreProtocol::EAck: {
qDebug() << "Received ack response: " << d->client.getNotableStanzasAcked();
}
}
}
}
Expand Down Expand Up @@ -1224,6 +1237,14 @@ int ClientStream::convertedSASLCond() const
return 0;
}

void ClientStream::sm_timeout() {
#ifdef XMPP_DEBUG
printf("ClientStream::sm_timeout()\n");
#endif
d->client.timeout_sec = 0;
processNext();
}

void ClientStream::doNoop()
{
if(d->state == Active) {
Expand All @@ -1237,7 +1258,7 @@ void ClientStream::doNoop()

// SM stuff
bool ClientStream::isStreamManagementActive() {
return true; //TODO: do actual detection here
return d->client.isStreamManagementActive();
}

void ClientStream::ackLastMessageStanza() {
Expand Down
5 changes: 3 additions & 2 deletions src/xmpp/xmpp-core/xmlprotocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ namespace XMPP
};
enum Notify {
NSend = 0x01, // need to know if data has been written
NRecv = 0x02 // need incoming data
NRecv = 0x02, // need incoming data
NTimeout = 0x04 // need to know when time passed
};

XmlProtocol();
Expand All @@ -68,7 +69,7 @@ namespace XMPP
bool processStep();

// set these before returning from a step
int need, event, errorCode, notify;
int need, event, errorCode, notify, timeout_sec;

inline bool isIncoming() const { return incoming; }
QString xmlEncoding() const;
Expand Down
4 changes: 3 additions & 1 deletion src/xmpp/xmpp-core/xmpp_clientstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ namespace XMPP
void close();
bool stanzaAvailable() const;
Stanza read();
void write(const Stanza &s);
void write(const Stanza &s, bool notify = false);

int errorCondition() const;
QString errorText() const;
Expand Down Expand Up @@ -190,6 +190,8 @@ namespace XMPP
void sasl_authenticated();
void sasl_error();

void sm_timeout();

void doNoop();
void doReadyRead();

Expand Down
2 changes: 1 addition & 1 deletion src/xmpp/xmpp-core/xmpp_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ namespace XMPP
virtual void close()=0;
virtual bool stanzaAvailable() const=0;
virtual Stanza read()=0;
virtual void write(const Stanza &s)=0;
virtual void write(const Stanza &s, bool notify = false)=0;

virtual int errorCondition() const=0;
virtual QString errorText() const=0;
Expand Down
17 changes: 11 additions & 6 deletions src/xmpp/xmpp-im/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -408,7 +408,8 @@ QString Client::groupChatNick(const QString &host, const QString &room) const
}

bool Client::isStreamManagementActive() const {
return qobject_cast<ClientStream*>(d->stream);
ClientStream *cs = qobject_cast<ClientStream*>(d->stream);
return cs->isStreamManagementActive();
}


Expand Down Expand Up @@ -603,7 +604,7 @@ void Client::distribute(const QDomElement &x)
}
}

void Client::send(const QDomElement &x)
void Client::send(const QDomElement &x, bool want_notify)
{
if(!d->stream)
return;
Expand All @@ -622,13 +623,13 @@ void Client::send(const QDomElement &x)
//printf("bad stanza??\n");
return;
}

QString out = s.toString();
qWarning() << "Out: " << out;
debug(QString("Client: outgoing: [\n%1]\n").arg(out));
emit xmlOutgoing(out);

//printf("x[%s] x2[%s] s[%s]\n", Stream::xmlToString(x).toLatin1(), Stream::xmlToString(e).toLatin1(), s.toString().toLatin1());
d->stream->write(s);
d->stream->write(s, want_notify);
}

void Client::send(const QString &str)
Expand Down Expand Up @@ -988,9 +989,9 @@ void Client::importRosterItem(const RosterItem &item)
debug(dstr + str);
}

void Client::sendMessage(const Message &m)
void Client::sendMessage(const Message &m, bool want_notify)
{
JT_Message *j = new JT_Message(rootTask(), m);
JT_Message *j = new JT_Message(rootTask(), m, want_notify);
j->go(true);
}

Expand Down Expand Up @@ -1169,6 +1170,10 @@ void Client::handleIncoming(BSConnection *c)
d->ftman->stream_incomingReady(c);
}

void Client::handleSMAckResponse(int h) {
qDebug() << "handleSMAckResponse: h = " << h;
}

//---------------------------------------------------------------------------
// LiveRosterItem
//---------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 0e98128

Please sign in to comment.