Skip to content

Commit

Permalink
More work on remote source
Browse files Browse the repository at this point in the history
  • Loading branch information
dragorn committed Jun 15, 2017
1 parent 51e3db2 commit 1b48871
Showing 1 changed file with 151 additions and 1 deletion.
152 changes: 151 additions & 1 deletion datasourcetracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include "config.hpp"

#include <string.h>

#include "configfile.h"
#include "getopt.h"
#include "datasourcetracker.h"
Expand All @@ -31,6 +33,7 @@
#include "pcapng_stream_ringbuf.h"
#include "streamtracker.h"
#include "kis_httpd_registry.h"
#include "endian_magic.h"

DST_DatasourceProbe::DST_DatasourceProbe(GlobalRegistry *in_globalreg,
string in_definition, SharedTrackerElement in_protovec) {
Expand Down Expand Up @@ -1655,7 +1658,7 @@ dst_incoming_remote::dst_incoming_remote(GlobalRegistry *in_globalreg,
timerid =
timetracker->RegisterTimer(SERVER_TIMESLICES_SEC * 10, NULL, 0,
[this] (int) -> int {
_MSG("Remote source connected but never sent a NEWSOURCE control, "
_MSG("Remote source connected but didn't send a NEWSOURCE control, "
"closing connection.", MSGFLAG_ERROR);
rbuf_handler->ProtocolError();
delete(this);
Expand All @@ -1664,6 +1667,7 @@ dst_incoming_remote::dst_incoming_remote(GlobalRegistry *in_globalreg,
}

dst_incoming_remote::~dst_incoming_remote() {
fprintf(stderr, "~dst_incoming_remote()\n");
shared_ptr<Timetracker> timetracker = globalreg->FetchGlobalAs<Timetracker>("TIMETRACKER");

// Kill the error timer
Expand All @@ -1677,9 +1681,155 @@ dst_incoming_remote::~dst_incoming_remote() {

void dst_incoming_remote::BufferAvailable(size_t in_amt) {
fprintf(stderr, "debug - dst - incoming remote - buffer available %lu\n", in_amt);
// Handle reading raw frames off the incoming buffer, but we only look for the
// NEWSOURCE command; any other frame is an error.

simple_cap_proto_frame_t *frame;
uint8_t *buf;
uint32_t frame_sz;
uint32_t header_checksum, data_checksum, calc_checksum;

string definition;
string srctype;

while (1) {
if (rbuf_handler == NULL)
return;

size_t buffamt = rbuf_handler->GetReadBufferUsed();
if (buffamt < sizeof(simple_cap_proto_t)) {
return;
}

// Allocate as much as we can and peek it from the buffer
buf = new uint8_t[buffamt];
rbuf_handler->PeekReadBufferData(buf, buffamt);

// Turn it into a frame header
frame = (simple_cap_proto_frame_t *) buf;

if (kis_ntoh32(frame->header.signature) != KIS_CAP_SIMPLE_PROTO_SIG) {
delete[] buf;
_MSG("Got an invalid remote data source connection, disconnecting.",
MSGFLAG_ERROR);
rbuf_handler->ProtocolError();
return;
}

// Get the frame header checksum and validate it; to validate we need to clear
// both the frame and the data checksum fields so remember them both now
header_checksum = kis_ntoh32(frame->header.header_checksum);
data_checksum = kis_ntoh32(frame->header.data_checksum);

// Zero the checksum field in the packet
frame->header.header_checksum = 0;
frame->header.data_checksum = 0;

// Calc the checksum of the header
calc_checksum = Adler32Checksum((const char *) frame,
sizeof(simple_cap_proto_t));

// Compare to the saved checksum
if (calc_checksum != header_checksum) {
delete[] buf;

_MSG("Got an invalid remote data source connection, invalid checksum, "
"disconnecting.", MSGFLAG_ERROR);
rbuf_handler->ProtocolError();

return;
}

// Get the size of the frame
frame_sz = kis_ntoh32(frame->header.packet_sz);

if (frame_sz > buffamt) {
// Nothing we can do right now, not enough data to
// make up a complete packet.
delete[] buf;
return;
}

// Calc the checksum of the rest
calc_checksum = Adler32Checksum((const char *) buf, frame_sz);

// Compare to the saved checksum
if (calc_checksum != data_checksum) {
delete[] buf;

_MSG("Got an invalid remote data source connection, invalid checksum, "
"disconnecting.", MSGFLAG_ERROR);
rbuf_handler->ProtocolError();

return;
}

// Consume the packet in the ringbuf
rbuf_handler->GetReadBufferData(NULL, frame_sz);

// Check the header type
if (strncmp(frame->header.type, "NEWSOURCE", 16) != 0) {
delete[] buf;

_MSG("Got an invalid remote data source connection, invalid frame "
"(expected NEWSOURCE), disconnecting.", MSGFLAG_ERROR);
rbuf_handler->ProtocolError();

return;
}

size_t data_offt = 0;
for (unsigned int kvn = 0; kvn < kis_ntoh32(frame->header.num_kv_pairs); kvn++) {

if (frame_sz < sizeof(simple_cap_proto_t) +
sizeof(simple_cap_proto_kv_t) + data_offt) {
delete[] buf;

_MSG("Got an invalid remote data source connection, invalid frame "
"(KV too long for frame), disconnecting.", MSGFLAG_ERROR);
rbuf_handler->ProtocolError();

return;
}

simple_cap_proto_kv_t *pkv =
(simple_cap_proto_kv_t *) &((frame->data)[data_offt]);

data_offt +=
sizeof(simple_cap_proto_kv_h_t) + kis_ntoh32(pkv->header.obj_sz);

// We only care about 2 KV types but will skip the rest
if (strncmp(pkv->header.key, "DEFINITION", 16) == 0) {
definition = string((char *) pkv->object, kis_ntoh32(pkv->header.obj_sz));
} else if (strncmp(pkv->header.key, "SOURCETYPE", 16) == 0) {
srctype = string((char *) pkv->object, kis_ntoh32(pkv->header.obj_sz));
}
}

delete[] buf;

if (definition == "") {
_MSG("Got an invalid remote data source connection, invalid frame "
"(missing DEFINITION kv), disconnecting.", MSGFLAG_ERROR);
rbuf_handler->ProtocolError();

return;
}

if (srctype == "") {
_MSG("Got an invalid remote data source connection, invalid frame "
"(missing DEFINITION kv), disconnecting.", MSGFLAG_ERROR);
rbuf_handler->ProtocolError();

return;

}

}
}

void dst_incoming_remote::BufferError(string in_error) {
fprintf(stderr, "debug - dst - incoming remote - buffer error - %s\n", in_error.c_str());
delete(this);
}

0 comments on commit 1b48871

Please sign in to comment.