Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions Lib/test/test_free_threading/test_lzma.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import unittest

from test.support import import_helper, threading_helper
from test.support.threading_helper import run_concurrently

lzma = import_helper.import_module("lzma")
from lzma import LZMACompressor, LZMADecompressor

from test.test_lzma import INPUT


NTHREADS = 10


@threading_helper.requires_working_threading()
class TestLZMA(unittest.TestCase):
def test_compressor(self):
lzc = LZMACompressor()

# First compress() outputs LZMA header
header = lzc.compress(INPUT)
self.assertGreater(len(header), 0)

def worker():
# it should return empty bytes as it buffers data internally
data = lzc.compress(INPUT)
self.assertEqual(data, b"")

run_concurrently(worker_func=worker, nthreads=NTHREADS - 1)
full_compressed = header + lzc.flush()
decompressed = lzma.decompress(full_compressed)
# The decompressed data should be INPUT repeated NTHREADS times
self.assertEqual(decompressed, INPUT * NTHREADS)

def test_decompressor(self):
chunk_size = 128
chunks = [bytes([ord("a") + i]) * chunk_size for i in range(NTHREADS)]
input_data = b"".join(chunks)
compressed = lzma.compress(input_data)

lzd = LZMADecompressor()
output = []

def worker():
data = lzd.decompress(compressed, chunk_size)
self.assertEqual(len(data), chunk_size)
output.append(data)

run_concurrently(worker_func=worker, nthreads=NTHREADS)
self.assertEqual(len(output), NTHREADS)
# Verify the expected chunks (order doesn't matter due to append race)
self.assertSetEqual(set(output), set(chunks))


if __name__ == "__main__":
unittest.main()
46 changes: 13 additions & 33 deletions Modules/_lzmamodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,6 @@ OutputBuffer_OnError(_BlocksOutputBuffer *buffer)
}


#define ACQUIRE_LOCK(obj) do { \
if (!PyThread_acquire_lock((obj)->lock, 0)) { \
Py_BEGIN_ALLOW_THREADS \
PyThread_acquire_lock((obj)->lock, 1); \
Py_END_ALLOW_THREADS \
} } while (0)
#define RELEASE_LOCK(obj) PyThread_release_lock((obj)->lock)

typedef struct {
PyTypeObject *lzma_compressor_type;
Expand Down Expand Up @@ -111,7 +104,7 @@ typedef struct {
lzma_allocator alloc;
lzma_stream lzs;
int flushed;
PyThread_type_lock lock;
PyMutex mutex;
} Compressor;

typedef struct {
Expand All @@ -124,7 +117,7 @@ typedef struct {
char needs_input;
uint8_t *input_buffer;
size_t input_buffer_size;
PyThread_type_lock lock;
PyMutex mutex;
} Decompressor;

#define Compressor_CAST(op) ((Compressor *)(op))
Expand Down Expand Up @@ -617,14 +610,14 @@ _lzma_LZMACompressor_compress_impl(Compressor *self, Py_buffer *data)
{
PyObject *result = NULL;

ACQUIRE_LOCK(self);
PyMutex_Lock(&self->mutex);
if (self->flushed) {
PyErr_SetString(PyExc_ValueError, "Compressor has been flushed");
}
else {
result = compress(self, data->buf, data->len, LZMA_RUN);
}
RELEASE_LOCK(self);
PyMutex_Unlock(&self->mutex);
return result;
}

Expand All @@ -644,14 +637,14 @@ _lzma_LZMACompressor_flush_impl(Compressor *self)
{
PyObject *result = NULL;

ACQUIRE_LOCK(self);
PyMutex_Lock(&self->mutex);
if (self->flushed) {
PyErr_SetString(PyExc_ValueError, "Repeated call to flush()");
} else {
self->flushed = 1;
result = compress(self, NULL, 0, LZMA_FINISH);
}
RELEASE_LOCK(self);
PyMutex_Unlock(&self->mutex);
return result;
}

Expand Down Expand Up @@ -820,12 +813,7 @@ Compressor_new(PyTypeObject *type, PyObject *args, PyObject *kwargs)
self->alloc.free = PyLzma_Free;
self->lzs.allocator = &self->alloc;

self->lock = PyThread_allocate_lock();
if (self->lock == NULL) {
Py_DECREF(self);
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
return NULL;
}
self->mutex = (PyMutex){0};

self->flushed = 0;
switch (format) {
Expand Down Expand Up @@ -867,10 +855,8 @@ static void
Compressor_dealloc(PyObject *op)
{
Compressor *self = Compressor_CAST(op);
assert(!PyMutex_IsLocked(&self->mutex));
lzma_end(&self->lzs);
if (self->lock != NULL) {
PyThread_free_lock(self->lock);
}
PyTypeObject *tp = Py_TYPE(self);
tp->tp_free(self);
Py_DECREF(tp);
Expand Down Expand Up @@ -1146,12 +1132,12 @@ _lzma_LZMADecompressor_decompress_impl(Decompressor *self, Py_buffer *data,
{
PyObject *result = NULL;

ACQUIRE_LOCK(self);
PyMutex_Lock(&self->mutex);
if (self->eof)
PyErr_SetString(PyExc_EOFError, "Already at end of stream");
else
result = decompress(self, data->buf, data->len, max_length);
RELEASE_LOCK(self);
PyMutex_Unlock(&self->mutex);
return result;
}

Expand Down Expand Up @@ -1244,12 +1230,7 @@ _lzma_LZMADecompressor_impl(PyTypeObject *type, int format,
self->lzs.allocator = &self->alloc;
self->lzs.next_in = NULL;

self->lock = PyThread_allocate_lock();
if (self->lock == NULL) {
Py_DECREF(self);
PyErr_SetString(PyExc_MemoryError, "Unable to allocate lock");
return NULL;
}
self->mutex = (PyMutex){0};

self->check = LZMA_CHECK_UNKNOWN;
self->needs_input = 1;
Expand Down Expand Up @@ -1304,14 +1285,13 @@ static void
Decompressor_dealloc(PyObject *op)
{
Decompressor *self = Decompressor_CAST(op);
assert(!PyMutex_IsLocked(&self->mutex));

if(self->input_buffer != NULL)
PyMem_Free(self->input_buffer);

lzma_end(&self->lzs);
Py_CLEAR(self->unused_data);
if (self->lock != NULL) {
PyThread_free_lock(self->lock);
}
PyTypeObject *tp = Py_TYPE(self);
tp->tp_free(self);
Py_DECREF(tp);
Expand Down
Loading