Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport multi-threaded zstd to 4.14.x to support multi-threaded zstd compression on centos 8 #2130

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ AC_ARG_ENABLE([zstd],
[enable_zstd=auto])

AS_IF([test "x$enable_zstd" != "xno"], [
PKG_CHECK_MODULES([ZSTD], [libzstd], [have_zstd=yes], [have_zstd=no])
PKG_CHECK_MODULES([ZSTD], [libzstd >= 1.3.8], [have_zstd=yes], [have_zstd=no])
AS_IF([test "$enable_zstd" = "yes"], [
if test "$have_zstd" = "no"; then
AC_MSG_ERROR([--enable-zstd specified, but not available])
Expand Down
11 changes: 6 additions & 5 deletions macros.in
Original file line number Diff line number Diff line change
Expand Up @@ -383,11 +383,12 @@ package or when debugging this package.\
#%packager

# Compression type and level for source/binary package payloads.
# "w9.gzdio" gzip level 9 (default).
# "w9.bzdio" bzip2 level 9.
# "w6.xzdio" xz level 6, xz's default.
# "w7T16.xzdio" xz level 7 using 16 thread (xz only)
# "w6.lzdio" lzma-alone level 6, lzma's default
# "w9.gzdio" gzip level 9 (default).
# "w9.bzdio" bzip2 level 9.
# "w6.xzdio" xz level 6, xz's default.
# "w7T16.xzdio" xz level 7 using 16 threads
# "w19T8.zstdio" zstd level 19 using 8 threads
# "w6.lzdio" lzma-alone level 6, lzma's default
#
#%_source_payload w9.gzdio
#%_binary_payload w9.gzdio
Expand Down
87 changes: 58 additions & 29 deletions rpmio/rpmio.c
Original file line number Diff line number Diff line change
Expand Up @@ -632,7 +632,7 @@ static off_t gzdTell(FDSTACK_t fps)
gzdSetError(fps);
#else
pos = -2;
#endif
#endif
}
return pos;
}
Expand Down Expand Up @@ -1070,6 +1070,7 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode)
char *t = stdio;
char *te = t + sizeof(stdio) - 2;
int c;
int threads = 0;

switch ((c = *s++)) {
case 'a':
Expand Down Expand Up @@ -1098,7 +1099,14 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode)
flags &= ~O_ACCMODE;
flags |= O_RDWR;
continue;
break;
case 'T':
if (*s >= '0' && *s <= '9') {
threads = strtol(s, (char **)&s, 10);
/* T0 means automatic detection */
if (threads == 0)
threads = sysconf(_SC_NPROCESSORS_ONLN);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as in xz. Not sure if this is the right thing to do. getcpunumber macro is not available in 4.14.x

}
continue;
default:
if (c >= (int)'0' && c <= (int)'9') {
level = strtol(s-1, (char **)&s, 10);
Expand Down Expand Up @@ -1132,10 +1140,15 @@ static rpmzstd rpmzstdNew(int fdno, const char *fmode)
}
nb = ZSTD_DStreamInSize();
} else { /* compressing */
if ((_stream = (void *) ZSTD_createCStream()) == NULL
|| ZSTD_isError(ZSTD_initCStream(_stream, level))) {
if ((_stream = (void *) ZSTD_createCCtx()) == NULL
|| ZSTD_isError(ZSTD_CCtx_setParameter(_stream, ZSTD_c_compressionLevel, level))) {
return NULL;
}

if (threads > 0) {
if (ZSTD_isError (ZSTD_CCtx_setParameter(_stream, ZSTD_c_nbWorkers, threads)))
rpmlog(RPMLOG_DEBUG, "zstd library does not support multi-threading\n");
}
nb = ZSTD_CStreamOutSize();
}

Expand Down Expand Up @@ -1173,16 +1186,24 @@ assert(zstd);
rc = 0;
} else { /* compressing */
/* close frame */
zstd->zob.dst = zstd->b;
zstd->zob.size = zstd->nb;
zstd->zob.pos = 0;
int xx = ZSTD_flushStream(zstd->_stream, &zstd->zob);
if (ZSTD_isError(xx))
fps->errcookie = ZSTD_getErrorName(xx);
else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp))
fps->errcookie = "zstdFlush fwrite failed.";
else
rc = 0;
int xx;
do {
ZSTD_inBuffer zib = { NULL, 0, 0 };
zstd->zob.dst = zstd->b;
zstd->zob.size = zstd->nb;
zstd->zob.pos = 0;
xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_flush);
if (ZSTD_isError(xx)) {
fps->errcookie = ZSTD_getErrorName(xx);
break;
}
else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp)) {
fps->errcookie = "zstdClose fwrite failed.";
break;
}
else
rc = 0;
} while (xx != 0);
}
return rc;
}
Expand Down Expand Up @@ -1227,7 +1248,7 @@ assert(zstd);
zstd->zob.pos = 0;

/* Compress next chunk. */
int xx = ZSTD_compressStream(zstd->_stream, &zstd->zob, &zib);
int xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_continue);
if (ZSTD_isError(xx)) {
fps->errcookie = ZSTD_getErrorName(xx);
return -1;
Expand Down Expand Up @@ -1256,17 +1277,25 @@ assert(zstd);
ZSTD_freeDStream(zstd->_stream);
} else { /* compressing */
/* close frame */
zstd->zob.dst = zstd->b;
zstd->zob.size = zstd->nb;
zstd->zob.pos = 0;
int xx = ZSTD_endStream(zstd->_stream, &zstd->zob);
if (ZSTD_isError(xx))
fps->errcookie = ZSTD_getErrorName(xx);
else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp))
fps->errcookie = "zstdClose fwrite failed.";
else
rc = 0;
ZSTD_freeCStream(zstd->_stream);
int xx;
do {
ZSTD_inBuffer zib = { NULL, 0, 0 };
zstd->zob.dst = zstd->b;
zstd->zob.size = zstd->nb;
zstd->zob.pos = 0;
xx = ZSTD_compressStream2(zstd->_stream, &zstd->zob, &zib, ZSTD_e_end);
if (ZSTD_isError(xx)) {
fps->errcookie = ZSTD_getErrorName(xx);
break;
}
else if (zstd->zob.pos != fwrite(zstd->b, 1, zstd->zob.pos, zstd->fp)) {
fps->errcookie = "zstdClose fwrite failed.";
break;
}
else
rc = 0;
} while (xx != 0);
ZSTD_freeCCtx(zstd->_stream);
}

if (zstd->fp && fileno(zstd->fp) > 2)
Expand Down Expand Up @@ -1338,7 +1367,7 @@ ssize_t Fwrite(const void *buf, size_t size, size_t nmemb, FD_t fd)
if (fd != NULL) {
FDSTACK_t fps = fdGetFps(fd);
fdio_write_function_t _write = FDIOVEC(fps, write);

fdstat_enter(fd, FDSTAT_WRITE);
do {
rc = (_write ? _write(fps, buf, size * nmemb) : -2);
Expand Down Expand Up @@ -1647,7 +1676,7 @@ int Fileno(FD_t fd)
if (rc != -1)
break;
}

DBGIO(fd, (stderr, "==> Fileno(%p) rc %d %s\n", (fd ? fd : NULL), rc, fdbg(fd)));
return rc;
}
Expand Down Expand Up @@ -1702,7 +1731,7 @@ int rpmioSlurp(const char * fn, uint8_t ** bp, ssize_t * blenp)

exit:
if (fd) (void) Fclose(fd);

if (rc) {
if (b) free(b);
b = NULL;
Expand Down