Skip to content

Commit

Permalink
Fix ByteBuf memory leak problem when setExplicitLac (apache#3577)
Browse files Browse the repository at this point in the history
* fix set explicitLac no released entry
  • Loading branch information
wenbingshen authored and yaalsn committed Jan 30, 2023
1 parent 759f911 commit 6d8784c
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.util.ReferenceCountUtil;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
Expand Down Expand Up @@ -1005,7 +1006,8 @@ public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, byte[]
}
}

private ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac) {
@VisibleForTesting
public ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac) {
ByteBuf bb = allocator.directBuffer(8 + 8 + 4 + explicitLac.capacity());
bb.writeLong(ledgerId);
bb.writeLong(METAENTRY_ID_LEDGER_EXPLICITLAC);
Expand All @@ -1016,19 +1018,25 @@ private ByteBuf createExplicitLACEntry(long ledgerId, ByteBuf explicitLac) {

public void setExplicitLac(ByteBuf entry, WriteCallback writeCallback, Object ctx, byte[] masterKey)
throws IOException, InterruptedException, BookieException {
ByteBuf explicitLACEntry = null;
try {
long ledgerId = entry.getLong(entry.readerIndex());
LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
synchronized (handle) {
entry.markReaderIndex();
handle.setExplicitLac(entry);
entry.resetReaderIndex();
ByteBuf explicitLACEntry = createExplicitLACEntry(ledgerId, entry);
explicitLACEntry = createExplicitLACEntry(ledgerId, entry);
getJournal(ledgerId).logAddEntry(explicitLACEntry, false /* ackBeforeSync */, writeCallback, ctx);
}
} catch (NoWritableLedgerDirException e) {
stateManager.transitionToReadOnlyMode();
throw new IOException(e);
} finally {
ReferenceCountUtil.safeRelease(entry);
if (explicitLACEntry != null) {
ReferenceCountUtil.safeRelease(explicitLACEntry);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;

import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.nio.charset.StandardCharsets;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.PortManager;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BookieImplTest extends BookKeeperClusterTestCase {
private static final Logger log = LoggerFactory.getLogger(BookieImplTest.class);

private static final int bookiePort = PortManager.nextFreePort();

public BookieImplTest() {
super(0);
}

@Test
public void testWriteLac() throws Exception {
final String metadataServiceUri = zkUtil.getMetadataServiceUri();
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setMetadataServiceUri(metadataServiceUri);

MetadataBookieDriver metadataDriver = BookieResources.createMetadataDriver(
conf, NullStatsLogger.INSTANCE);
RegistrationManager rm = metadataDriver.createRegistrationManager();
TestBookieImpl.Resources resources = new TestBookieImpl.ResourceBuilder(conf)
.withMetadataDriver(metadataDriver).withRegistrationManager(rm).build();
BookieImpl b = new TestBookieImpl(resources);
b.start();

final BookieImpl spyBookie = spy(b);

final long ledgerId = 10;
final long lac = 23;

DigestManager digestManager = DigestManager.instantiate(ledgerId, "".getBytes(StandardCharsets.UTF_8),
BookKeeper.DigestType.toProtoDigestType(BookKeeper.DigestType.CRC32), UnpooledByteBufAllocator.DEFAULT,
baseClientConf.getUseV2WireProtocol());

final ByteBufList toSend = digestManager.computeDigestAndPackageForSendingLac(lac);
ByteString body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes());

final ByteBuf lacToAdd = Unpooled.wrappedBuffer(body.asReadOnlyByteBuffer());
final byte[] masterKey = ByteString.copyFrom("masterKey".getBytes()).toByteArray();

final ByteBuf explicitLACEntry = b.createExplicitLACEntry(ledgerId, lacToAdd);
lacToAdd.resetReaderIndex();

doReturn(explicitLACEntry)
.when(spyBookie)
.createExplicitLACEntry(eq(ledgerId), eq(lacToAdd));

spyBookie.setExplicitLac(lacToAdd, null, null, masterKey);

assertEquals(0, lacToAdd.refCnt());
assertEquals(0, explicitLACEntry.refCnt());

b.shutdown();

}
}

0 comments on commit 6d8784c

Please sign in to comment.