Permalink
Browse files

sconex: Add experimental gzip stream, and dependency on libz

  • Loading branch information...
sconemad committed Aug 24, 2015
1 parent ccc1efd commit 1ad68d88783c8c7a5fc0ce857019f0d5cbfa8de2
Showing with 383 additions and 0 deletions.
  1. +3 −0 config.h.in
  2. +4 −0 configure.in
  3. +284 −0 sconex/GzipStream.cpp
  4. +71 −0 sconex/GzipStream.h
  5. +2 −0 sconex/Makefile.am
  6. +19 −0 server/ServerModule.cpp
@@ -85,6 +85,9 @@
/* Define to 1 if you have the `socket' library (-lsocket). */
#undef HAVE_LIBSOCKET

/* Define to 1 if you have the `z' library (-lz). */
#undef HAVE_LIBZ

/* Define to 1 if you have the <limits.h> header file. */
#undef HAVE_LIMITS_H

@@ -42,6 +42,10 @@ CXXFLAGS="$CXXFLAGS -Wall"
# Libraries
AC_CHECK_LIB([dl],[dlopen])

AC_CHECK_LIB([z],[deflate],,
[AC_MSG_ERROR([library 'libz' is required])]
)

AC_CHECK_LIB([pthread],[pthread_create])
AC_CHECK_LIB([c_r],[pthread_create],
[LIBS="$LIBS -pthread"
@@ -0,0 +1,284 @@
/* SconeServer (http://www.sconemad.com)
Compression/decompression stream using Gzip
Copyright (c) 2000-2015 Andrew Wedgbury <wedge@sconemad.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program (see the file COPYING); if not, write to the
Free Software Foundation, Inc.,
59 Temple Place - Suite 330, Boston, MA 02111-1307, USA */

#include <sconex/GzipStream.h>
namespace scx {

//=============================================================================
GzipStream::GzipStream(
int read_buffer_size,
int write_buffer_size
) : Stream("gzip"),
m_read_in(read_buffer_size),
m_read_out(read_buffer_size),
m_read_zs(0),
m_write_in(write_buffer_size),
m_write_out(write_buffer_size),
m_write_zs(0)
{
DEBUG_COUNT_CONSTRUCTOR(GzipStream);

DEBUG_ASSERT(
read_buffer_size >= 0,
"GzipStream() Buffer size invalid");
DEBUG_ASSERT(
read_buffer_size <= GzipStream_MAX_BUFFER,
"GzipStream() Buffer size too large");

DEBUG_ASSERT(
write_buffer_size >= 0,
"GzipStream() Buffer size invalid");
DEBUG_ASSERT(
write_buffer_size <= GzipStream_MAX_BUFFER,
"GzipStream() Buffer size too large");

if (read_buffer_size > 0) {
m_read_zs = new z_stream;
memset(m_read_zs,0,sizeof(z_stream));
inflateInit2(m_read_zs, 32+15);
}

if (write_buffer_size > 0) {
m_write_zs = new z_stream;
memset(m_write_zs,0,sizeof(z_stream));
deflateInit2(m_write_zs,
Z_DEFAULT_COMPRESSION,Z_DEFLATED,
16+15,8,Z_DEFAULT_STRATEGY);
}
}

//=============================================================================
GzipStream::~GzipStream()
{
DEBUG_ASSERT(
m_read_in.used() == 0,
"~GzipStream() Read input buffer still contains data");
DEBUG_ASSERT(
m_read_out.used() == 0,
"~GzipStream() Read output buffer still contains data");
DEBUG_ASSERT(
m_write_in.used() == 0,
"~GzipStream() Write input buffer still contains data");
DEBUG_ASSERT(
m_write_out.used() == 0,
"~GzipStream() Write output buffer still contains data");

if (m_read_zs) {
inflateEnd(m_read_zs);
delete m_read_zs;
}

if (m_write_zs) {
deflateEnd(m_write_zs);
delete m_write_zs;
}

DEBUG_COUNT_DESTRUCTOR(GzipStream);
}

//=============================================================================
Condition GzipStream::read(void* buffer,int n,int& na)
{
if (!m_read_zs) return Stream::read(buffer,n,na);
DEBUG_ASSERT(n>0,"write() Zero bytes specified");
DEBUG_ASSERT(buffer!=0,"write() Null buffer passed");
na = 0;
Condition c = Ok;

// Can the request be fullfilled by the buffer?
if (n <= m_read_out.used()) {
na = m_read_out.pop_to(buffer, n);
enable_event(Stream::SendReadable,m_read_out.used());
return Ok;
}

m_read_in.compact();
if (m_read_in.free()) {
int nr = 0;
c = Stream::read(m_read_in.tail(),m_read_in.free(),nr);
if (nr > 0) m_read_in.push(nr);
}

inflate_buffer(Z_FULL_FLUSH);

n = std::min(n, m_read_out.used());
if (n > 0) {
na = m_read_out.pop_to(buffer,n);
enable_event(Stream::SendReadable,m_read_out.used());
return Ok;
}

if (m_read_in.used()) {
return Wait;
}

return c;
}

//=============================================================================
Condition GzipStream::write(const void* buffer,int n,int& na)
{
if (!m_write_zs) return Stream::write(buffer,n,na);
DEBUG_ASSERT(n>0,"write() Zero bytes specified");
DEBUG_ASSERT(buffer!=0,"write() Null buffer passed");
na = 0;

// Ensure there is enough space in the input buffer
if (n > m_write_in.free()) {
m_write_in.compact();
if (n > m_write_in.free()) {
int extra = n - m_write_in.free();
DEBUG_LOG("Increasing input buffer by " << extra);
m_write_in.resize(m_write_in.size() + extra);
//XXX could make this more efficient?
}
}

na = m_write_in.push_from(buffer,n);

deflate_buffer(Z_FULL_FLUSH);

// Attempt to write the data from the output buffer
int nw = 0;
Condition c = Stream::write(m_write_out.head(),m_write_out.used(),nw);
if (nw>0) m_write_out.pop(nw);
enable_event(Stream::Writeable,m_write_out.used() > 0);
return c;
}

//=============================================================================
Condition GzipStream::event(Event e)
{
switch (e) {

case Stream::Closing: {
if (!m_write_zs) break;

if (m_write_in.used() > 0) {
deflate_buffer(Z_FULL_FLUSH);
} else {
deflate_buffer(Z_FINISH);
}

// Don't let the stream dissapear if there is any data left to process.
if (m_write_in.used() > 0 || m_write_out.used() > 0) {
enable_event(Stream::Writeable,true);
return Wait;
}
} break;

case Stream::Writeable: {
if (m_write_in.used() > 0) {
deflate_buffer(Z_FULL_FLUSH);
}
int n = m_write_out.used();
if (n > 0) {
int nw = 0;
Condition c = Stream::write(m_write_out.head(),n,nw);
if (nw>0) m_write_out.pop(nw);

if (c==scx::Error) {
// Went wrong
return scx::Error;

} else if (nw<n) {
// Wrote some of the buffer
enable_event(Stream::Writeable,true);
return scx::Wait;
}
}

// Wrote everything, cancel writeable notifications now
enable_event(Stream::Writeable,false);
} break;

default:
break;
}

return Ok;
}

//=============================================================================
std::string GzipStream::stream_status() const
{
std::ostringstream oss;
if (m_read_zs) {
oss << "ri:" << m_read_in.status_string()
<< " ro:" << m_read_out.status_string();
}
if (m_write_zs) {
if (m_read_zs) oss << " ";
oss << "wi:" << m_write_in.status_string()
<< " wo:" << m_write_out.status_string();
}
return oss.str();
}

//=============================================================================
int GzipStream::inflate_buffer(int flush)
{
int prev_in = m_read_zs->total_in;
m_read_zs->next_in = (Bytef*)m_read_in.head();
m_read_zs->avail_in = m_read_in.used();

int prev_out = m_read_zs->total_out;
m_read_zs->next_out = (Bytef*)m_read_out.tail();
m_read_zs->avail_out = m_read_out.free();

int ret = inflate(m_read_zs,flush);

int in = m_read_zs->total_in - prev_in;
if (in) m_read_in.pop(in);

int out = m_read_zs->total_out - prev_out;
if (out) m_read_out.push(out);

DEBUG_LOG("inflate(" << flush << "): " << ret <<
" in:" << in << " out:" << out);
return ret;
}

//=============================================================================
int GzipStream::deflate_buffer(int flush)
{
int prev_in = m_write_zs->total_in;
m_write_zs->next_in = (Bytef*)m_write_in.head();
m_write_zs->avail_in = m_write_in.used();

int prev_out = m_write_zs->total_out;
m_write_zs->next_out = (Bytef*)m_write_out.tail();
m_write_zs->avail_out = m_write_out.free();

int ret = deflate(m_write_zs,flush);

int in = m_write_zs->total_in - prev_in;
if (in) m_write_in.pop(in);

int out = m_write_zs->total_out - prev_out;
if (out) m_write_out.push(out);

DEBUG_LOG("deflate(" << flush << "): " << ret <<
" in:" << in << " out:" << out);
return ret;
}

};
@@ -0,0 +1,71 @@
/* SconeServer (http://www.sconemad.com)
Compression/decompression stream using Gzip
Copyright (c) 2000-2015 Andrew Wedgbury <wedge@sconemad.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2, or (at your option)
any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program (see the file COPYING); if not, write to the
Free Software Foundation, Inc.,
59 Temple Place - Suite 330, Boston, MA 02111-1307, USA */

#ifndef scxGzipStream_h
#define scxGzipStream_h

#include <sconex/sconex.h>
#include <sconex/Stream.h>
#include <sconex/Buffer.h>
#include <zlib.h>
namespace scx {

#define GzipStream_MAX_BUFFER (10*1048576)
#define GzipStream_DEFAULT_BUFFER 1024

//=============================================================================
class SCONEX_API GzipStream : public Stream {

public:

GzipStream(
int read_buffer_size = GzipStream_DEFAULT_BUFFER,
int write_buffer_size = GzipStream_DEFAULT_BUFFER
);

virtual ~GzipStream();

virtual Condition read(void* buffer,int n,int& na);
virtual Condition write(const void* buffer,int n,int& na);

virtual Condition event(Event e);

virtual std::string stream_status() const;

protected:

int inflate_buffer(int flush);
int deflate_buffer(int flush);

Buffer m_read_in;
Buffer m_read_out;
z_streamp m_read_zs;

Buffer m_write_in;
Buffer m_write_out;
z_streamp m_write_zs;

private:

};

};
#endif
@@ -25,6 +25,7 @@ File.cpp \
FileDir.cpp \
FilePath.cpp \
FileStat.cpp \
GzipStream.cpp \
Job.cpp \
Kernel.cpp \
LineBuffer.cpp \
@@ -87,6 +88,7 @@ File.h \
FileDir.h \
FilePath.h \
FileStat.h \
GzipStream.h \
IOBase.h \
Job.h \
Kernel.h \
Oops, something went wrong.

0 comments on commit 1ad68d8

Please sign in to comment.