Skip to content

Commit

Permalink
MT step
Browse files Browse the repository at this point in the history
  • Loading branch information
peak3d committed Aug 10, 2017
1 parent a50d2b9 commit 82d1d10
Show file tree
Hide file tree
Showing 10 changed files with 104 additions and 65 deletions.
1 change: 0 additions & 1 deletion lib/libbento4/Core/Ap4ByteStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ class AP4_ByteStream : public AP4_Referenceable
virtual AP4_Result CopyTo(AP4_ByteStream& stream, AP4_LargeSize size);
virtual AP4_Result Buffer() { return AP4_SUCCESS; }
virtual AP4_Result Flush() { return AP4_SUCCESS; }
virtual const AP4_UI08 *GetBuffer(AP4_Size bytes_to_read) { return nullptr; };
private:
AP4_ByteStreamObserver *observer_;
};
Expand Down
50 changes: 37 additions & 13 deletions lib/mpegts/tsDemuxer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,38 +249,44 @@ STREAM_TYPE AVContext::get_stream_type(uint8_t pes_type)

int AVContext::configure_ts()
{
size_t data_size = AV_CONTEXT_PACKETSIZE;
uint64_t pos = av_pos;
int fluts[][2] = {
{FLUTS_NORMAL_TS_PACKETSIZE, 0},
{FLUTS_M2TS_TS_PACKETSIZE, 0},
{FLUTS_DVB_ASI_TS_PACKETSIZE, 0},
{FLUTS_ATSC_TS_PACKETSIZE, 0}
};

uint8_t data[AV_CONTEXT_PACKETSIZE];
size_t data_size(0);

int nb = sizeof (fluts) / (2 * sizeof (int));
int score = TS_CHECK_MIN_SCORE;

for (int i = 0; i < MAX_RESYNC_SIZE; i++)
{
const unsigned char* data = m_demux->ReadAV(pos, data_size);
if (!data)
if (!data_size)
data_size = m_demux->ReadAV(pos, data, AV_CONTEXT_PACKETSIZE) ? AV_CONTEXT_PACKETSIZE : 0;

if (!data_size)
return AVCONTEXT_IO_ERROR;
if (data[0] == 0x47)

if (data[AV_CONTEXT_PACKETSIZE - data_size] == 0x47)
{
int count, found;
for (int t = 0; t < nb; t++) // for all fluts
{
const unsigned char* ndata;
unsigned char ndata;
uint64_t npos = pos;
int do_retry = score; // Reach for score
do
{
--do_retry;
npos += fluts[t][0];
if (!(ndata = m_demux->ReadAV(npos, data_size)))
if (!m_demux->ReadAV(npos, &ndata, 1))
return AVCONTEXT_IO_ERROR;
}
while (ndata[0] == 0x47 && (++fluts[t][1]) && do_retry);
while (ndata == 0x47 && (++fluts[t][1]) && do_retry);
}
// Is score reached ?
count = found = 0;
Expand Down Expand Up @@ -308,10 +314,16 @@ int AVContext::configure_ts()
break;
// None: Bad sync. Shift and retry
else
{
--data_size;
pos++;
}
}
else
{
--data_size;
pos++;
}
}

DBG(DEMUX_DBG_ERROR, "%s: invalid stream\n", __FUNCTION__);
Expand All @@ -327,18 +339,30 @@ int AVContext::TSResync()
return ret;
is_configured = true;
}

size_t data_size(0);

for (int i = 0; i < MAX_RESYNC_SIZE; i++)
{
const unsigned char* data = m_demux->ReadAV(av_pos, av_pkt_size);
if (!data)
if (!data_size)
data_size = m_demux->ReadAV(av_pos, av_buf, av_pkt_size) ? av_pkt_size : 0;

if (!data_size)
return AVCONTEXT_IO_ERROR;
if (data[0] == 0x47)

if (av_buf[av_pkt_size - data_size] == 0x47)
{
memcpy(av_buf, data, av_pkt_size);
Reset();
return AVCONTEXT_CONTINUE;
if (data_size != av_pkt_size)
data_size = m_demux->ReadAV(av_pos, av_buf, av_pkt_size) ? av_pkt_size : 0;

if (data_size)
{
Reset();
return AVCONTEXT_CONTINUE;
}
}
av_pos++;
--data_size;
}

return AVCONTEXT_TS_NOSYNC;
Expand Down
2 changes: 1 addition & 1 deletion lib/mpegts/tsDemuxer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace TSDemux
class TSDemuxer
{
public:
virtual const unsigned char* ReadAV(uint64_t pos, size_t len) = 0;
virtual bool ReadAV(uint64_t pos, unsigned char* buffer, size_t len) = 0;
};

enum {
Expand Down
4 changes: 2 additions & 2 deletions src/TSReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ TSReader::~TSReader()
m_AVContext = nullptr;
}

const unsigned char* TSReader::ReadAV(uint64_t pos, size_t len)
bool TSReader::ReadAV(uint64_t pos, unsigned char * data, size_t len)
{
m_stream->Seek(pos);
return m_stream->GetBuffer(len);
return AP4_SUCCEEDED(m_stream->Read(data, len));
}

void TSReader::Reset(bool resetPackets)
Expand Down
2 changes: 1 addition & 1 deletion src/TSReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TSReader : public TSDemux::TSDemuxer

bool Initialize();

virtual const unsigned char* ReadAV(uint64_t pos, size_t len) override;
virtual bool ReadAV(uint64_t pos, unsigned char * data, size_t len) override;

void Reset(bool resetPackets = true);
bool StartStreaming(AP4_UI32 typeMask);
Expand Down
90 changes: 54 additions & 36 deletions src/common/AdaptiveStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ using namespace adaptive;
AdaptiveStream::AdaptiveStream(AdaptiveTree &tree, AdaptiveTree::StreamType type)
:tree_(tree)
, type_(type)
, observer_(0)
, current_period_(tree_.periods_.empty() ? 0 : tree_.periods_[0])
, current_adp_(0)
, current_rep_(0)
, observer_(nullptr)
, current_period_(tree_.periods_.empty() ? nullptr : tree_.periods_[0])
, current_adp_(nullptr)
, current_rep_(nullptr)
, current_seg_(nullptr)
, loading_seg_(nullptr)
, thread_data_(nullptr)
{
}
Expand Down Expand Up @@ -125,15 +127,30 @@ bool AdaptiveStream::download_segment()
void AdaptiveStream::worker()
{
do {
std::unique_lock<std::mutex> lck(thread_data_->mutex_dl_);
thread_data_->signal_dl_.wait(lck);
std::unique_lock<std::mutex> lckdl(thread_data_->mutex_dl_);
thread_data_->signal_dl_.wait(lckdl);

bool ret = download_segment();

//Signal finished download
{
std::lock_guard<std::mutex> lckrw(thread_data_->mutex_rw_);
loading_seg_ = nullptr;
if (!ret)
stopped_ = true;
}
thread_data_->signal_rw_.notify_one();

} while (!thread_data_->thread_stop_);
}


bool AdaptiveStream::write_data(const void *buffer, size_t buffer_size)
{
segment_buffer_ += std::string((const char *)buffer, buffer_size);
{
std::lock_guard<std::mutex> lckrw(thread_data_->mutex_rw_);
segment_buffer_ += std::string((const char *)buffer, buffer_size);
}
thread_data_->signal_rw_.notify_one();
return true;
}

Expand Down Expand Up @@ -237,55 +254,56 @@ bool AdaptiveStream::ensureSegment()
if (stopped_)
return false;

if (segment_read_pos_ >= segment_buffer_.size())
if (!loading_seg_ && segment_read_pos_ >= segment_buffer_.size())
{
current_seg_ = current_rep_->get_next_segment(current_seg_);
if (!download_segment() || segment_buffer_.empty())
if (current_seg_)
{
stopped_ = true;
return false;
loading_seg_ = current_seg_;
thread_data_->signal_dl_.notify_one();
}
else
return false;
}
return true;
}


uint32_t AdaptiveStream::read(void* buffer, uint32_t bytesToRead)
{
if (ensureSegment() && bytesToRead)
{
uint32_t avail = segment_buffer_.size() - segment_read_pos_;
if (avail > bytesToRead)
avail = bytesToRead;
memcpy(buffer, segment_buffer_.data() + segment_read_pos_, avail);

segment_read_pos_ += avail;
absolute_position_ += avail;
return avail;
}
return 0;
}
std::unique_lock<std::mutex> lckrw(thread_data_->mutex_rw_);

const uint8_t *AdaptiveStream::getBuffer(uint32_t bytesToRead)
{
const uint8_t *ret(nullptr);
if (ensureSegment() && bytesToRead)
{
uint32_t avail = segment_buffer_.size() - segment_read_pos_;
if (avail > bytesToRead)
avail = bytesToRead;
while (true)
{
uint32_t avail = segment_buffer_.size() - segment_read_pos_;
if (avail < bytesToRead && loading_seg_)
{
thread_data_->signal_rw_.wait(lckrw);
continue;
}

if (avail >= bytesToRead)
{
if (avail > bytesToRead)
avail = bytesToRead;

if (avail == bytesToRead)
ret = reinterpret_cast<const uint8_t*>(segment_buffer_.data() + segment_read_pos_);
memcpy(buffer, segment_buffer_.data() + segment_read_pos_, avail);

segment_read_pos_ += avail;
absolute_position_ += avail;
segment_read_pos_ += avail;
absolute_position_ += avail;
return avail;
}
return 0;
}
}
return ret;
return 0;
}

bool AdaptiveStream::seek(uint64_t const pos)
{
std::lock_guard<std::mutex> lckrw(thread_data_->mutex_rw_);
// we seek only in the current segment
if (pos >= absolute_position_ - segment_read_pos_)
{
Expand Down
6 changes: 2 additions & 4 deletions src/common/AdaptiveStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,7 @@ namespace adaptive
unsigned int get_type()const{ return type_; };

bool ensureSegment();
uint32_t read(void* buffer,
uint32_t bytesToRead);
const uint8_t *getBuffer(uint32_t bytesToRead);
uint32_t read(void* buffer, uint32_t bytesToRead);
uint64_t tell(){ read(0, 0); return absolute_position_; };
bool seek(uint64_t const pos);
bool seek_time(double seek_seconds, double current_seconds, bool &needReset);
Expand Down Expand Up @@ -113,7 +111,7 @@ namespace adaptive
const AdaptiveTree::Period *current_period_;
const AdaptiveTree::AdaptationSet *current_adp_;
const AdaptiveTree::Representation *current_rep_;
const AdaptiveTree::Segment *current_seg_;
const AdaptiveTree::Segment *current_seg_, *loading_seg_;
//We assume that a single segment can build complete frames
std::string segment_buffer_;
std::map<std::string, std::string> media_headers_;
Expand Down
2 changes: 2 additions & 0 deletions src/common/AdaptiveTree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ namespace adaptive

void AdaptiveTree::set_download_speed(double speed)
{
std::lock_guard<std::mutex> lck(m_mutex);

download_speed_ = speed;
if (!average_download_speed_)
average_download_speed_ = download_speed_;
Expand Down
3 changes: 3 additions & 0 deletions src/common/AdaptiveTree.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <map>
#include <inttypes.h>
#include "expat.h"
#include <mutex>

namespace adaptive
{
Expand Down Expand Up @@ -280,6 +281,8 @@ namespace adaptive
virtual bool download(const char* url, const std::map<std::string, std::string> &manifestHeaders);
virtual bool write_data(void *buffer, size_t buffer_size) = 0;
void SortRepresentations();
private:
std::mutex m_mutex;
};

}
9 changes: 2 additions & 7 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,6 @@ class KodiHost : public SSD::SSD_HOST
static_cast<kodi::addon::CInstanceVideoCodec*>(instance)->ReleaseFrameBuffer(buffer);
}


private:
std::string m_strProfilePath, m_strLibraryPath;
}kodihost;
Expand Down Expand Up @@ -210,10 +209,6 @@ class AP4_DASHStream : public AP4_ByteStream
/* unimplemented */
return AP4_ERROR_NOT_SUPPORTED;
};
const AP4_UI08 *GetBuffer(AP4_Size bytes_to_read) override
{
return stream_->getBuffer(bytes_to_read);
}
// AP4_Referenceable methods
void AddReference() override {};
void Release()override {};
Expand Down Expand Up @@ -276,9 +271,9 @@ bool KodiAdaptiveStream::download(const char* url, const std::map<std::string, s
file.CURLOpen(OpenFileFlags::READ_CHUNKED | OpenFileFlags::READ_NO_CACHE | OpenFileFlags::READ_AUDIO_VIDEO);

// read the file
char *buf = (char*)malloc(1024*1024);
char *buf = (char*)malloc(32*1024);
size_t nbRead, nbReadOverall = 0;
while ((nbRead = file.Read(buf, 1024 * 1024)) > 0 && ~nbRead && write_data(buf, nbRead)) nbReadOverall+= nbRead;
while ((nbRead = file.Read(buf, 32 * 1024)) > 0 && ~nbRead && write_data(buf, nbRead)) nbReadOverall+= nbRead;
free(buf);

if (!nbReadOverall)
Expand Down

0 comments on commit 82d1d10

Please sign in to comment.