From 82d1d108a1f6d81698901c29d1946ca02bdd7e0b Mon Sep 17 00:00:00 2001 From: peak3d Date: Thu, 10 Aug 2017 18:42:00 +0200 Subject: [PATCH] MT step --- lib/libbento4/Core/Ap4ByteStream.h | 1 - lib/mpegts/tsDemuxer.cpp | 50 ++++++++++++----- lib/mpegts/tsDemuxer.h | 2 +- src/TSReader.cpp | 4 +- src/TSReader.h | 2 +- src/common/AdaptiveStream.cpp | 90 ++++++++++++++++++------------ src/common/AdaptiveStream.h | 6 +- src/common/AdaptiveTree.cpp | 2 + src/common/AdaptiveTree.h | 3 + src/main.cpp | 9 +-- 10 files changed, 104 insertions(+), 65 deletions(-) diff --git a/lib/libbento4/Core/Ap4ByteStream.h b/lib/libbento4/Core/Ap4ByteStream.h index 558f7d363..f94d75de0 100755 --- a/lib/libbento4/Core/Ap4ByteStream.h +++ b/lib/libbento4/Core/Ap4ByteStream.h @@ -88,7 +88,6 @@ class AP4_ByteStream : public AP4_Referenceable virtual AP4_Result CopyTo(AP4_ByteStream& stream, AP4_LargeSize size); virtual AP4_Result Buffer() { return AP4_SUCCESS; } virtual AP4_Result Flush() { return AP4_SUCCESS; } - virtual const AP4_UI08 *GetBuffer(AP4_Size bytes_to_read) { return nullptr; }; private: AP4_ByteStreamObserver *observer_; }; diff --git a/lib/mpegts/tsDemuxer.cpp b/lib/mpegts/tsDemuxer.cpp index 76dfacb0d..073c6fab4 100644 --- a/lib/mpegts/tsDemuxer.cpp +++ b/lib/mpegts/tsDemuxer.cpp @@ -249,7 +249,6 @@ STREAM_TYPE AVContext::get_stream_type(uint8_t pes_type) int AVContext::configure_ts() { - size_t data_size = AV_CONTEXT_PACKETSIZE; uint64_t pos = av_pos; int fluts[][2] = { {FLUTS_NORMAL_TS_PACKETSIZE, 0}, @@ -257,30 +256,37 @@ int AVContext::configure_ts() {FLUTS_DVB_ASI_TS_PACKETSIZE, 0}, {FLUTS_ATSC_TS_PACKETSIZE, 0} }; + + uint8_t data[AV_CONTEXT_PACKETSIZE]; + size_t data_size(0); + int nb = sizeof (fluts) / (2 * sizeof (int)); int score = TS_CHECK_MIN_SCORE; for (int i = 0; i < MAX_RESYNC_SIZE; i++) { - const unsigned char* data = m_demux->ReadAV(pos, data_size); - if (!data) + if (!data_size) + data_size = m_demux->ReadAV(pos, data, AV_CONTEXT_PACKETSIZE) ? AV_CONTEXT_PACKETSIZE : 0; + + if (!data_size) return AVCONTEXT_IO_ERROR; - if (data[0] == 0x47) + + if (data[AV_CONTEXT_PACKETSIZE - data_size] == 0x47) { int count, found; for (int t = 0; t < nb; t++) // for all fluts { - const unsigned char* ndata; + unsigned char ndata; uint64_t npos = pos; int do_retry = score; // Reach for score do { --do_retry; npos += fluts[t][0]; - if (!(ndata = m_demux->ReadAV(npos, data_size))) + if (!m_demux->ReadAV(npos, &ndata, 1)) return AVCONTEXT_IO_ERROR; } - while (ndata[0] == 0x47 && (++fluts[t][1]) && do_retry); + while (ndata == 0x47 && (++fluts[t][1]) && do_retry); } // Is score reached ? count = found = 0; @@ -308,10 +314,16 @@ int AVContext::configure_ts() break; // None: Bad sync. Shift and retry else + { + --data_size; pos++; + } } else + { + --data_size; pos++; + } } DBG(DEMUX_DBG_ERROR, "%s: invalid stream\n", __FUNCTION__); @@ -327,18 +339,30 @@ int AVContext::TSResync() return ret; is_configured = true; } + + size_t data_size(0); + for (int i = 0; i < MAX_RESYNC_SIZE; i++) { - const unsigned char* data = m_demux->ReadAV(av_pos, av_pkt_size); - if (!data) + if (!data_size) + data_size = m_demux->ReadAV(av_pos, av_buf, av_pkt_size) ? av_pkt_size : 0; + + if (!data_size) return AVCONTEXT_IO_ERROR; - if (data[0] == 0x47) + + if (av_buf[av_pkt_size - data_size] == 0x47) { - memcpy(av_buf, data, av_pkt_size); - Reset(); - return AVCONTEXT_CONTINUE; + if (data_size != av_pkt_size) + data_size = m_demux->ReadAV(av_pos, av_buf, av_pkt_size) ? av_pkt_size : 0; + + if (data_size) + { + Reset(); + return AVCONTEXT_CONTINUE; + } } av_pos++; + --data_size; } return AVCONTEXT_TS_NOSYNC; diff --git a/lib/mpegts/tsDemuxer.h b/lib/mpegts/tsDemuxer.h index a5ab50005..22474956f 100644 --- a/lib/mpegts/tsDemuxer.h +++ b/lib/mpegts/tsDemuxer.h @@ -43,7 +43,7 @@ namespace TSDemux class TSDemuxer { public: - virtual const unsigned char* ReadAV(uint64_t pos, size_t len) = 0; + virtual bool ReadAV(uint64_t pos, unsigned char* buffer, size_t len) = 0; }; enum { diff --git a/src/TSReader.cpp b/src/TSReader.cpp index 469e62c9e..52742de50 100644 --- a/src/TSReader.cpp +++ b/src/TSReader.cpp @@ -47,10 +47,10 @@ TSReader::~TSReader() m_AVContext = nullptr; } -const unsigned char* TSReader::ReadAV(uint64_t pos, size_t len) +bool TSReader::ReadAV(uint64_t pos, unsigned char * data, size_t len) { m_stream->Seek(pos); - return m_stream->GetBuffer(len); + return AP4_SUCCEEDED(m_stream->Read(data, len)); } void TSReader::Reset(bool resetPackets) diff --git a/src/TSReader.h b/src/TSReader.h index 3b8ec3a7c..4d8c4be7c 100644 --- a/src/TSReader.h +++ b/src/TSReader.h @@ -34,7 +34,7 @@ class TSReader : public TSDemux::TSDemuxer bool Initialize(); - virtual const unsigned char* ReadAV(uint64_t pos, size_t len) override; + virtual bool ReadAV(uint64_t pos, unsigned char * data, size_t len) override; void Reset(bool resetPackets = true); bool StartStreaming(AP4_UI32 typeMask); diff --git a/src/common/AdaptiveStream.cpp b/src/common/AdaptiveStream.cpp index 272d46bc1..db1e9d00e 100755 --- a/src/common/AdaptiveStream.cpp +++ b/src/common/AdaptiveStream.cpp @@ -28,10 +28,12 @@ using namespace adaptive; AdaptiveStream::AdaptiveStream(AdaptiveTree &tree, AdaptiveTree::StreamType type) :tree_(tree) , type_(type) - , observer_(0) - , current_period_(tree_.periods_.empty() ? 0 : tree_.periods_[0]) - , current_adp_(0) - , current_rep_(0) + , observer_(nullptr) + , current_period_(tree_.periods_.empty() ? nullptr : tree_.periods_[0]) + , current_adp_(nullptr) + , current_rep_(nullptr) + , current_seg_(nullptr) + , loading_seg_(nullptr) , thread_data_(nullptr) { } @@ -125,15 +127,30 @@ bool AdaptiveStream::download_segment() void AdaptiveStream::worker() { do { - std::unique_lock lck(thread_data_->mutex_dl_); - thread_data_->signal_dl_.wait(lck); + std::unique_lock lckdl(thread_data_->mutex_dl_); + thread_data_->signal_dl_.wait(lckdl); + + bool ret = download_segment(); + + //Signal finished download + { + std::lock_guard lckrw(thread_data_->mutex_rw_); + loading_seg_ = nullptr; + if (!ret) + stopped_ = true; + } + thread_data_->signal_rw_.notify_one(); + } while (!thread_data_->thread_stop_); } - bool AdaptiveStream::write_data(const void *buffer, size_t buffer_size) { - segment_buffer_ += std::string((const char *)buffer, buffer_size); + { + std::lock_guard lckrw(thread_data_->mutex_rw_); + segment_buffer_ += std::string((const char *)buffer, buffer_size); + } + thread_data_->signal_rw_.notify_one(); return true; } @@ -237,14 +254,16 @@ bool AdaptiveStream::ensureSegment() if (stopped_) return false; - if (segment_read_pos_ >= segment_buffer_.size()) + if (!loading_seg_ && segment_read_pos_ >= segment_buffer_.size()) { current_seg_ = current_rep_->get_next_segment(current_seg_); - if (!download_segment() || segment_buffer_.empty()) + if (current_seg_) { - stopped_ = true; - return false; + loading_seg_ = current_seg_; + thread_data_->signal_dl_.notify_one(); } + else + return false; } return true; } @@ -252,40 +271,39 @@ bool AdaptiveStream::ensureSegment() uint32_t AdaptiveStream::read(void* buffer, uint32_t bytesToRead) { - if (ensureSegment() && bytesToRead) - { - uint32_t avail = segment_buffer_.size() - segment_read_pos_; - if (avail > bytesToRead) - avail = bytesToRead; - memcpy(buffer, segment_buffer_.data() + segment_read_pos_, avail); - - segment_read_pos_ += avail; - absolute_position_ += avail; - return avail; - } - return 0; -} + std::unique_lock lckrw(thread_data_->mutex_rw_); -const uint8_t *AdaptiveStream::getBuffer(uint32_t bytesToRead) -{ - const uint8_t *ret(nullptr); if (ensureSegment() && bytesToRead) { - uint32_t avail = segment_buffer_.size() - segment_read_pos_; - if (avail > bytesToRead) - avail = bytesToRead; + while (true) + { + uint32_t avail = segment_buffer_.size() - segment_read_pos_; + if (avail < bytesToRead && loading_seg_) + { + thread_data_->signal_rw_.wait(lckrw); + continue; + } + + if (avail >= bytesToRead) + { + if (avail > bytesToRead) + avail = bytesToRead; - if (avail == bytesToRead) - ret = reinterpret_cast(segment_buffer_.data() + segment_read_pos_); + memcpy(buffer, segment_buffer_.data() + segment_read_pos_, avail); - segment_read_pos_ += avail; - absolute_position_ += avail; + segment_read_pos_ += avail; + absolute_position_ += avail; + return avail; + } + return 0; + } } - return ret; + return 0; } bool AdaptiveStream::seek(uint64_t const pos) { + std::lock_guard lckrw(thread_data_->mutex_rw_); // we seek only in the current segment if (pos >= absolute_position_ - segment_read_pos_) { diff --git a/src/common/AdaptiveStream.h b/src/common/AdaptiveStream.h index b6cd50426..2aa2fe622 100755 --- a/src/common/AdaptiveStream.h +++ b/src/common/AdaptiveStream.h @@ -59,9 +59,7 @@ namespace adaptive unsigned int get_type()const{ return type_; }; bool ensureSegment(); - uint32_t read(void* buffer, - uint32_t bytesToRead); - const uint8_t *getBuffer(uint32_t bytesToRead); + uint32_t read(void* buffer, uint32_t bytesToRead); uint64_t tell(){ read(0, 0); return absolute_position_; }; bool seek(uint64_t const pos); bool seek_time(double seek_seconds, double current_seconds, bool &needReset); @@ -113,7 +111,7 @@ namespace adaptive const AdaptiveTree::Period *current_period_; const AdaptiveTree::AdaptationSet *current_adp_; const AdaptiveTree::Representation *current_rep_; - const AdaptiveTree::Segment *current_seg_; + const AdaptiveTree::Segment *current_seg_, *loading_seg_; //We assume that a single segment can build complete frames std::string segment_buffer_; std::map media_headers_; diff --git a/src/common/AdaptiveTree.cpp b/src/common/AdaptiveTree.cpp index ef4c126e5..b626fdefe 100755 --- a/src/common/AdaptiveTree.cpp +++ b/src/common/AdaptiveTree.cpp @@ -83,6 +83,8 @@ namespace adaptive void AdaptiveTree::set_download_speed(double speed) { + std::lock_guard lck(m_mutex); + download_speed_ = speed; if (!average_download_speed_) average_download_speed_ = download_speed_; diff --git a/src/common/AdaptiveTree.h b/src/common/AdaptiveTree.h index 931e6f924..d39bf19b2 100755 --- a/src/common/AdaptiveTree.h +++ b/src/common/AdaptiveTree.h @@ -23,6 +23,7 @@ #include #include #include "expat.h" +#include namespace adaptive { @@ -280,6 +281,8 @@ namespace adaptive virtual bool download(const char* url, const std::map &manifestHeaders); virtual bool write_data(void *buffer, size_t buffer_size) = 0; void SortRepresentations(); +private: + std::mutex m_mutex; }; } diff --git a/src/main.cpp b/src/main.cpp index 4967c9d09..30c5fdc43 100755 --- a/src/main.cpp +++ b/src/main.cpp @@ -166,7 +166,6 @@ class KodiHost : public SSD::SSD_HOST static_cast(instance)->ReleaseFrameBuffer(buffer); } - private: std::string m_strProfilePath, m_strLibraryPath; }kodihost; @@ -210,10 +209,6 @@ class AP4_DASHStream : public AP4_ByteStream /* unimplemented */ return AP4_ERROR_NOT_SUPPORTED; }; - const AP4_UI08 *GetBuffer(AP4_Size bytes_to_read) override - { - return stream_->getBuffer(bytes_to_read); - } // AP4_Referenceable methods void AddReference() override {}; void Release()override {}; @@ -276,9 +271,9 @@ bool KodiAdaptiveStream::download(const char* url, const std::map 0 && ~nbRead && write_data(buf, nbRead)) nbReadOverall+= nbRead; + while ((nbRead = file.Read(buf, 32 * 1024)) > 0 && ~nbRead && write_data(buf, nbRead)) nbReadOverall+= nbRead; free(buf); if (!nbReadOverall)