Permalink
Browse files

adding tests for GZip/UnGZip; they're currently failing

  • Loading branch information...
1 parent 686fa96 commit b93fc9b02ccf47d4f3f6c2f7baa8ef5828dd9e59 @rfk committed Mar 11, 2011
Showing with 114 additions and 4 deletions.
  1. +5 −0 ChangeLog
  2. +1 −1 filelike/wrappers/__init__.py
  3. +4 −2 filelike/wrappers/compress.py
  4. +104 −1 filelike/wrappers/tests/test_compress.py
View
@@ -1,4 +1,9 @@
+Version 0.4.2
+
+ * Add gzip support to filelike.wrappers.compress.
+ Thanks to timcera for the patch.
+
Version 0.4.1
* Buffer wrapper: support for truncating the underlying file on write-back
@@ -177,7 +177,7 @@ def _truncate(self,size):
from filelike.wrappers.buffer import Buffer, FlushableBuffer
-from filelike.wrappers.compress import BZip2, UnBZip2
+from filelike.wrappers.compress import BZip2, UnBZip2, GZip, UnGZip
from filelike.wrappers.unix import Head
@@ -33,6 +33,8 @@
from filelike.wrappers.buffer import FlushableBuffer
import bz2
+import zlib
+
class Decompress(FileWrapper):
"""Abstract base class for decompressing files.
@@ -174,7 +176,7 @@ def _BZip2_decoder(fileobj):
return f
filelike.open.decoders.append(_BZip2_decoder)
-import zlib
+
class GZipMixin(object):
"""Mixin for Compress/Decompress subclasses using gzip."""
@@ -201,7 +203,7 @@ def decompress(data):
return ""
return d[0].decompress(data)
def d_reset():
- d[0] = zlib.compressobj(16+zlib.MAX_WBITS)
+ d[0] = zlib.decompressobj(16+zlib.MAX_WBITS)
decompress.reset = d_reset
self.decompress = decompress
# These can now be used by superclass constructors
@@ -1,12 +1,13 @@
-from filelike.wrappers import BZip2, UnBZip2
+from filelike.wrappers import BZip2, UnBZip2, GZip, UnGZip
from filelike import tests
from filelike.wrappers.tests.test_buffer import get_buffered_value, def_getvalue_maybe_buffered
import unittest
from StringIO import StringIO
import bz2
+import gzip
class Test_BZip2(tests.Test_ReadWriteSeek):
@@ -95,3 +96,105 @@ def test_resulting_file(self):
finally:
os.unlink(fn)
+
+def gz_compress(data):
+ s = StringIO()
+ f = gzip.GzipFile(fileobj=s,mode="w")
+ f.write(data)
+ f.close()
+ return s.getvalue()
+
+def gz_decompress(data):
+ s = StringIO()
+ s.write(data)
+ s.seek(0)
+ f = gzip.GzipFile(fileobj=s)
+ return f.read()
+
+
+class Test_GZip(tests.Test_ReadWriteSeek):
+ """Tetcases for GZip wrapper class."""
+
+ contents = gz_compress("This is my compressed\n test data")
+ empty_contents = gz_compress("")
+
+ def makeFile(self,contents,mode):
+ s = StringIO(gz_decompress(contents))
+ f = GZip(s,mode)
+ f.getvalue = def_getvalue_maybe_buffered(f,s,gz_compress)
+ return f
+
+ # We can't just write arbitrary text into a BZip stream, so we have
+ # to adjust these tests
+
+ def test_write_read(self):
+ self.file.write(self.contents[0:5])
+ c = self.file.read()
+ self.assertEquals(c,self.contents[5:])
+
+ def test_read_write_read(self):
+ c = self.file.read(5)
+ self.assertEquals(c,self.contents[:5])
+ self.file.write(self.contents[5:10])
+ c = self.file.read(5)
+ self.assertEquals(c,self.contents[10:15])
+
+ def test_read_write_seek(self):
+ c = self.file.read(5)
+ self.assertEquals(c,self.contents[:5])
+ self.file.write(self.contents[5:10])
+ self.file.seek(0)
+ c = self.file.read(10)
+ self.assertEquals(c,self.contents[:10])
+
+ def test_resulting_file(self):
+ """Make sure GZip changes are pushed through to actual file."""
+ import tempfile
+ import os
+ (fd,fn) = tempfile.mkstemp()
+ os.close(fd)
+ try:
+ f = open(fn,"w")
+ f.write("hello world!")
+ f.close()
+ f = GZip(open(fn,"r+"))
+ f.read(6)
+ f.seek(-6,1)
+ f.write(gz_compress("hello Australia!"))
+ f.close()
+ self.assertEquals(open(fn).read(),"hello Australia!")
+ finally:
+ os.unlink(fn)
+
+
+class Test_UnGZip(tests.Test_ReadWrite):
+ """Tetcases for UnGZip wrapper class."""
+
+ contents = "This is my uncompressed\n test data"
+
+ def makeFile(self,contents,mode):
+ s = StringIO(gz_compress(contents))
+ f = UnGZip(s,mode)
+ f.getvalue = def_getvalue_maybe_buffered(f,s,gz_decompress)
+ return f
+
+ def test_resulting_file(self):
+ """Make sure UnBZip2 changes are pushed through to actual file."""
+ import tempfile
+ import os
+ (fd,fn) = tempfile.mkstemp()
+ os.close(fd)
+ try:
+ f = open(fn,"w")
+ f.write(gz_compress("hello world!"))
+ f.close()
+ f = UnGZip(open(fn,"r+"))
+ f.read(6)
+ f.write("Ausralia!")
+ f.seek(-6,1)
+ f.write("tralia!")
+ f.close()
+ self.assertEquals(open(fn).read(),gz_compress("hello Australia!"))
+ finally:
+ os.unlink(fn)
+

0 comments on commit b93fc9b

Please sign in to comment.