Skip to content

Commit

Permalink
Better memory handling for streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
kchen115 committed Aug 7, 2013
1 parent 17d967a commit 33e8b9d
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 168 deletions.
3 changes: 1 addition & 2 deletions BlinkAnalysis/BlinkAnalysis.vcxproj
Expand Up @@ -73,6 +73,7 @@
<IgnoreSpecificDefaultLibraries>
</IgnoreSpecificDefaultLibraries>
<ShowProgress>NotSet</ShowProgress>
<Profile>true</Profile>
</Link>
<ProjectReference>
<UseLibraryDependencyInputs>false</UseLibraryDependencyInputs>
Expand Down Expand Up @@ -159,7 +160,6 @@
</ClCompile>
<ClCompile Include="StreamHandler.cpp" />
<ClCompile Include="StreamingManager.cpp" />
<ClCompile Include="StreamTaskQueue.cpp" />
<ClCompile Include="TCPClientListener.cpp" />
<ClCompile Include="CaptureObject.cpp" />
<ClCompile Include="WorldManager.cpp" />
Expand Down Expand Up @@ -229,7 +229,6 @@
<ClInclude Include="StreamSettingsForm.h">
<FileType>CppForm</FileType>
</ClInclude>
<ClInclude Include="StreamTaskQueue.h" />
<ClInclude Include="TCPClientListener.h" />
<ClInclude Include="WizardPages.h" />
<ClInclude Include="CaptureObject.h" />
Expand Down
6 changes: 0 additions & 6 deletions BlinkAnalysis/BlinkAnalysis.vcxproj.filters
Expand Up @@ -109,9 +109,6 @@
<ClCompile Include="StreamHandler.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="StreamTaskQueue.cpp">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="stdafx.h">
Expand Down Expand Up @@ -252,9 +249,6 @@
<ClInclude Include="StreamSettingsForm.h">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="StreamTaskQueue.h">
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ResourceCompile Include="app.rc">
Expand Down
64 changes: 38 additions & 26 deletions BlinkAnalysis/StreamHandler.cpp
Expand Up @@ -12,9 +12,11 @@ namespace BlinkAnalysis
this->ClientSocket = ClientSocket;
this->ClientNumber = ClientNumber;
ascii = gcnew ASCIIEncoding();
queue = gcnew StreamTaskQueue();
frames = new std::queue<std::string>();
sync = gcnew Object();
}


// start listening for client commands
void StreamHandler::Start() {
ContinueProcess = true ;
Expand All @@ -26,34 +28,48 @@ namespace BlinkAnalysis
ContinueProcess = false ;
if ( ClientThread && ClientThread->IsAlive )
ClientThread->Join() ;
if (StreamThread && StreamThread->IsAlive)
StreamThread->Join();
}

bool StreamHandler::Alive() {
return ( ClientThread && ClientThread->IsAlive );
}

void StreamHandler::addFrame(String^ frame)
void StreamHandler::addFrame(std::string frame)
{
queue->Queue(gcnew FrameObject(this, frame));
Monitor::Enter(sync);
frames->push(frame);
Monitor::Exit(sync);
if (!StreamThread)
{
StreamThread = gcnew Thread(gcnew ThreadStart(this, &StreamHandler::Stream));
StreamThread->Start();
}
}

void StreamHandler::addFrameAsync(Object^ frame)
void StreamHandler::Stream()
{
if (!StreamingManager::getInstance()->isStreaming()) return;
try
{
StreamHandler^ h = ((FrameObject^) frame)->handler;
String^ str = ((FrameObject^) frame)->frame;
array<Byte>^ response = h->ascii->GetBytes(str);
h->ClientSocket->GetStream()->Write(response, 0, response->Length);
}
catch(Exception^ ex)
while (streamData)
{
Console::WriteLine("Exception add frame: {0}", ex->Message);
}
finally
{
frame = nullptr;
int count = frames->size();
if (count > 0)
{
try
{
std::string str = frames->front();
array<Byte>^ response = gcnew array<Byte>(str.size());
System::Runtime::InteropServices::Marshal::Copy(IntPtr(&str[0]), response, 0, str.size());
ClientSocket->GetStream()->Write(response, 0, response->Length);
Monitor::Enter(sync);
frames->pop();
Monitor::Exit(sync);
}
catch(Exception^ ex)
{
Console::WriteLine("Exception add frame: {0}", ex->Message);
}
}
}
}

Expand Down Expand Up @@ -132,15 +148,12 @@ namespace BlinkAnalysis
{
case 'q':
ContinueProcess = false;
queue->setProcess(false);
break;
case 's':
streamData = true;
queue->setProcess(true);
break;
case 'd':
streamData = false;
queue->setProcess(false);
break;
case 'h':
streamHeader();
Expand All @@ -155,19 +168,18 @@ namespace BlinkAnalysis
break ;
}
}
catch ( IOException^ ) {
if(!TestConnection())
break ;
} // Timeout
catch ( SocketException^ ) {
Console::WriteLine( "Conection is broken!");
break ;
}
catch ( IOException^ ) {
if(!TestConnection())
break ;
} // Timeout
Thread::Sleep(200);
} // while ( ContinueProcess )
networkStream->Close();
ClientSocket->Close();
queue->setProcess(false);
streamData = false;
}
} // Process()
Expand Down
12 changes: 7 additions & 5 deletions BlinkAnalysis/StreamHandler.h
Expand Up @@ -2,7 +2,7 @@
#define STREAMHANDLER_H

#include <string>
#include "StreamTaskQueue.h"
#include <queue>

namespace BlinkAnalysis
{
Expand All @@ -26,7 +26,9 @@ namespace BlinkAnalysis
Thread^ ClientThread ;
int ClientNumber;
ASCIIEncoding^ ascii;
StreamTaskQueue^ queue;
std::queue<std::string>* frames;
Thread^ StreamThread;
Object^ sync;

private:
// if socket exception, test if client still responsive
Expand All @@ -35,9 +37,11 @@ namespace BlinkAnalysis
void streamHeader();
// main looping thread
void Process();
void Stream();

public:
StreamHandler (TcpClient^ ClientSocket, int ClientNumber);
~StreamHandler() { delete frames; }

void Start();

Expand All @@ -46,9 +50,7 @@ namespace BlinkAnalysis
bool Alive();

bool getStreamData() { return streamData; }
void addFrame(String^ frame);
static void addFrameAsync(Object^ frame);

void addFrame(std::string frame);
};


Expand Down
74 changes: 0 additions & 74 deletions BlinkAnalysis/StreamTaskQueue.cpp

This file was deleted.

52 changes: 0 additions & 52 deletions BlinkAnalysis/StreamTaskQueue.h

This file was deleted.

6 changes: 3 additions & 3 deletions BlinkAnalysis/StreamingManager.cpp
Expand Up @@ -136,6 +136,7 @@ namespace BlinkAnalysis
{
streaming = false;
listener->Stop();
ThreadListen->Join();
}

void StreamingManager::addFrame(std::string frame)
Expand All @@ -158,9 +159,7 @@ namespace BlinkAnalysis
{
if (!frameBuffer.empty())
{
String^ str = gcnew String(frameBuffer.front().c_str());
frameBuffer.pop();

std::string str = frameBuffer.front();
Monitor::Enter(ClientSockets->SyncRoot);
for (int i = 0; i < ClientSockets->Count; i++) {
StreamHandler^ client = (StreamHandler^)ClientSockets->default[i];
Expand All @@ -171,6 +170,7 @@ namespace BlinkAnalysis
}
}
Monitor::Exit(ClientSockets->SyncRoot);
frameBuffer.pop();
}
}
_endthread();
Expand Down

0 comments on commit 33e8b9d

Please sign in to comment.