Skip to content

Commit

Permalink
twitch low latency prototype
Browse files Browse the repository at this point in the history
  • Loading branch information
rakslice committed Sep 20, 2018
1 parent 8a75b31 commit b4b5fa6
Show file tree
Hide file tree
Showing 8 changed files with 437 additions and 4 deletions.
2 changes: 2 additions & 0 deletions orion.pro
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ SOURCES += src/main.cpp\
src/model/channel.cpp \
src/util/fileutils.cpp \
src/network/networkmanager.cpp \
src/network/prefetchstream.cpp \
src/model/game.cpp \
src/util/jsonparser.cpp \
src/model/channellistmodel.cpp \
Expand All @@ -43,6 +44,7 @@ HEADERS += src/model/channel.h \
src/model/channelmanager.h \
src/util/fileutils.h \
src/network/networkmanager.h \
src/network/prefetchstream.h \
src/model/game.h \
src/util/jsonparser.h \
src/model/channellistmodel.h \
Expand Down
2 changes: 2 additions & 0 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ int main(int argc, char *argv[])
QNetworkProxyFactory::setUseSystemConfiguration(true);
NetworkManager::initialize(engine.networkAccessManager());

HttpServer::getInstance()->setNetworkAccessManager(engine.networkAccessManager());

#ifndef Q_OS_ANDROID
//Single application solution
QLockFile lockfile(QDir::temp().absoluteFilePath("wz0dPKqHv3vX0BBsUFZt.lock"));
Expand Down
36 changes: 36 additions & 0 deletions src/network/httpserver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ HttpServer *HttpServer::getInstance() {
return instance;
}

void HttpServer::setNetworkAccessManager(QNetworkAccessManager * networkAccessManager) {
this->networkAccessManager = networkAccessManager;
}

QString HttpServer::port() {
return m_port;
}
Expand Down Expand Up @@ -43,6 +47,13 @@ void HttpServer::start() {
}

void HttpServer::stop() {
while (!prefetchStreams.empty()) {
PrefetchStream * stream = prefetchStreams.last();
prefetchStreams.pop_back();
stream->stop();
stream->deleteLater();
}

if (server) {
qDebug() << "Stopping server";
server->deleteLater();
Expand All @@ -56,6 +67,16 @@ void HttpServer::onConnect() {
connect(socket, &QTcpSocket::readyRead, this, &HttpServer::onRead);
}

static const QString PREFETCHSTREAM_PATH = "/prefetchstream/";


void HttpServer::prefetchStreamDied() {
PrefetchStream* deadStream = qobject_cast<PrefetchStream *>(sender());
qDebug() << "prefetch stream died, deleting";
prefetchStreams.removeAll(deadStream);
deadStream->deleteLater();
}

void HttpServer::onRead() {
qDebug() << "Reading request...";
QTcpSocket *socket = (QTcpSocket*) this->sender();
Expand All @@ -68,6 +89,21 @@ void HttpServer::onRead() {
if (tokens.length() >= 1) {
QString params = tokens[1];

if (params.startsWith(PREFETCHSTREAM_PATH)) {
// for prefetch streams don't just delete the socket on disconnect
disconnect(socket, &QTcpSocket::disconnected, socket, &QObject::deleteLater);

QString prefetchStreamUrl = params.mid(PREFETCHSTREAM_PATH.length());

PrefetchStream * newPrefetchStream = new PrefetchStream(socket, prefetchStreamUrl, networkAccessManager, this);
prefetchStreams.append(newPrefetchStream);
connect(newPrefetchStream, &PrefetchStream::died, this, &HttpServer::prefetchStreamDied);
newPrefetchStream->start();
return;
}

qDebug() << "GET params" << params;

//params to map
QMap<QString,QString> map;
params = params.mid(params.indexOf("?")+1);
Expand Down
11 changes: 11 additions & 0 deletions src/network/httpserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
#include <QDataStream>
#include <QMap>
#include <QDebug>
#include <QList>

#include "../model/singletonprovider.h"
#include "prefetchstream.h"

class HttpServer: public QObject
{
Expand All @@ -22,13 +24,19 @@ class HttpServer: public QObject
static HttpServer *instance;

explicit HttpServer(QObject *parent = 0);

QList<PrefetchStream *> prefetchStreams;

QNetworkAccessManager * networkAccessManager;
public:
static HttpServer *getInstance();

Q_INVOKABLE QString port();

bool isOk() const;

void setNetworkAccessManager(QNetworkAccessManager * networkAccessManager);

public slots:
// starts server
void start();
Expand All @@ -40,6 +48,9 @@ public slots:

void onRead();

private slots:
void prefetchStreamDied();

signals:
void codeReceived(QString code);
void error();
Expand Down
265 changes: 265 additions & 0 deletions src/network/prefetchstream.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,265 @@
#include "prefetchstream.h"

#include <QtNetwork/QNetworkReply>
#include <QtNetwork/QNetworkAccessManager>
#include <QPointer>

PrefetchStream::PrefetchStream(QTcpSocket * socket, QString streamUrl, QNetworkAccessManager *operation, QObject * parent) : QObject(parent),
socket(socket), streamUrl(QUrl(streamUrl)), operation(operation),
alive(true), consecutiveEmptyPlaylists(0), readBlockSize(4096), currentFragmentBytesSoFar(0)
{
nextPlaylistTimer.setInterval(1900);
nextPlaylistTimer.setSingleShot(true);

connect(&nextPlaylistTimer, &QTimer::timeout, this, &PrefetchStream::timeForNextPlaylist);
}

// FIXME there is a multi-free bug related to the upstream socket or
// downstream network request around here somewhere
// I've disabled a bunch of teardown, so that we leak memory and maybe handles but
// we don't crash, for prototype purposes.

PrefetchStream::~PrefetchStream() {
//stop();
}

void PrefetchStream::start() {
requestPlaylist(true);
}

void PrefetchStream::requestPlaylist(bool first)
{
QNetworkRequest request;

qDebug() << "fetching playlist"; // << streamUrl;

request.setUrl(streamUrl);

// squirrel away a bit of state
request.setAttribute(QNetworkRequest::User, first? 1 : 0);

QNetworkReply *reply = operation->get(request);

connect(reply, &QNetworkReply::finished, this, &PrefetchStream::handlePlaylistResponse);
}

void PrefetchStream::handlePlaylistResponse() {
QNetworkReply* reply = qobject_cast<QNetworkReply *>(sender());

qDebug() << "playlist response";

if (reply->error() != QNetworkReply::NoError) {

qDebug() << "playlist request got HTTP error" << reply->error();
stop();
return;
}

QByteArray data = reply->readAll();

bool firstRequest = reply->request().attribute(QNetworkRequest::User).toULongLong() == 1;

reply->deleteLater();

QStringList fragments = getPrefetchUrls(data);

if (firstRequest) {
/* We still need to decide whether this is a valid request or not and serve headers */
if (fragments.length() > 0) {
/* Looks like a realtime stream, start the merry-go-round */
QString response = "HTTP/1.1 200 OK\r\n";
response += "Content-Type: video/MP2T\r\n";
response += "Connection: Closed\r\n";
response += "\r\n";
trySendData(response.toUtf8());
}
else {
qDebug() << "giving not found error";
/* This isn't a realtime stream as far as we can tell; just return a 404 */
QString response = "HTTP/1.1 404 Not Found\r\n";
response += "Content-Type: text/plain; charset=utf-8\r\n";
response += "Connection: Closed\r\n";
response += "\r\n";
response += "Can't find a real-time stream at that location\r\n";
trySendData(response.toUtf8());
doneWithSocket();
return;
}
}

// now it's a valid stream in progress with the next fragments ready to request

if (fragments.length() == 0) {
consecutiveEmptyPlaylists += 1;
}
else {
consecutiveEmptyPlaylists = 0;
}

if (consecutiveEmptyPlaylists > 3) {
qDebug() << "too many empty playlists in a row, treating this stream as over";
doneWithSocket();
return;
}

// find the position of the last fragment served in the current fragment list
int lastFragmentProcessedPos = fragments.indexOf(lastFragmentProcessed);

qDebug() << "last playlist processed at offset" << lastFragmentProcessedPos;

// process the next fragment we recognize the previous one, or else the first fragment
int nextFragmenttoProcess = lastFragmentProcessedPos + 1;

if (nextFragmenttoProcess < fragments.length()) {
QString fragment = fragments.at(nextFragmenttoProcess);

lastFragmentProcessed = fragment;

//qDebug() << "check pos " << nextFragmenttoProcess << " should be " << fragments.indexOf(lastFragmentProcessed);

requestAndSendFragment(fragment);
return;
}

// there was nothing to do, we'll need to set a timer so we check the playlist again
setupNextPlaylistTimer();
}

void PrefetchStream::trySendData(const QByteArray & data) {
if (alive) {
qint64 result = socket->write(data);
if (result == -1) {
qDebug() << "an error occurred while writing";
stop();
}
}
}

void PrefetchStream::requestAndSendFragment(const QString &url) {
currentFragmentBytesSoFar = 0;

QNetworkRequest request;

request.setUrl(QUrl(url));

QNetworkReply * reply = operation->get(request);

connect(reply, &QNetworkReply::readyRead, this, &PrefetchStream::handleFragmentPart);
connect(reply, &QNetworkReply::finished, this, &PrefetchStream::handleFragmentFinished);
connect(reply, &QNetworkReply::aboutToClose, this, &PrefetchStream::handleFragmentAboutToClose);
connect(reply, QOverload<QNetworkReply::NetworkError>::of(&QNetworkReply::error), this, &PrefetchStream::handleFragmentError);
}


void PrefetchStream::handleFragmentPart() {

//qDebug() << "block at offset" << currentFragmentBytesSoFar;

QPointer<QNetworkReply> reply = qobject_cast<QNetworkReply *>(sender());

if (reply->error() != QNetworkReply::NoError) {

qDebug() << "fragment reply HTTP error" << reply->error();
stop();
return;
}

QByteArray data = reply->read(readBlockSize);

int length = data.length();
//qDebug() << "Cur data is" << length;

//qDebug() << "alive?" << alive;

currentFragmentBytesSoFar += length;

//qDebug() << "socket is open?" << socket->isOpen();

trySendData(data);
}

void PrefetchStream::handleFragmentFinished() {

qDebug() << "finished fragment" << currentFragmentBytesSoFar << "bytes";

QNetworkReply* reply = qobject_cast<QNetworkReply *>(sender());

if (alive) {
/* Based on ~ 2s fragments we are shooting for ~ 60ms blocks */
if (currentFragmentBytesSoFar / 32 > readBlockSize) {
qDebug() << "read block size too small, adjusting";
while (currentFragmentBytesSoFar / 32 > readBlockSize) {
readBlockSize *= 2;
}
qDebug() << "new read block size" << readBlockSize;
}

// continue the stream by requesting the latest playlist
requestPlaylist(false);
}

reply->deleteLater();
}

void PrefetchStream::handleFragmentAboutToClose() {
qDebug() << "fragment about to close";
}

void PrefetchStream::handleFragmentError(QNetworkReply::NetworkError code) {
qDebug() << "fragment error handler code" << code;
}

void PrefetchStream::setupNextPlaylistTimer() {
qDebug() << "setting timer for next playlist";
nextPlaylistTimer.start();
}

void PrefetchStream::timeForNextPlaylist() {
qDebug() << "hit next playlist timer";
requestPlaylist(false);
}

QStringList PrefetchStream::getPrefetchUrls(const QByteArray & data) {
QStringList out;

const QByteArray PREFETCH_PREFIX = "#EXT-X-TWITCH-PREFETCH:";

for (const QByteArray & line: data.split('\n')) {
if (line.startsWith(PREFETCH_PREFIX)) {
QByteArray url = line.mid(PREFETCH_PREFIX.length());
out.append(url);
}
}

qDebug() << "got" << out.length() << "prefetch urls";

return out;
}

void PrefetchStream::doneWithSocket() {
if (alive) {
qDebug() << "prefetch stream client socket ending normally";

socket->waitForBytesWritten();
socket->disconnectFromHost();
alive = false;

emit died();
}
}


void PrefetchStream::stop() {
if (alive) {
qDebug() << "prefetch stream stopped";
// FIXME for now let's just close the socket unannounced
alive = false;

socket->close();

emit died();
}
}



Loading

0 comments on commit b4b5fa6

Please sign in to comment.