|
23 | 23 | import java.io.File; |
24 | 24 | import java.io.IOException; |
25 | 25 | import java.nio.ByteBuffer; |
| 26 | +import java.util.HashMap; |
| 27 | +import java.util.Map; |
26 | 28 | import java.util.Set; |
27 | 29 | import java.util.concurrent.atomic.AtomicBoolean; |
28 | 30 | import java.util.concurrent.atomic.AtomicInteger; |
@@ -568,6 +570,68 @@ public void checkpointComplete(Checkpoint checkpoint, |
568 | 570 | storage.gcThread.doCompactEntryLogs(threshold); |
569 | 571 | } |
570 | 572 |
|
| 573 | + /** |
| 574 | + * Test extractMetaFromEntryLogs optimized method to avoid excess memory usage. |
| 575 | + */ |
| 576 | + @Test(timeout = 60000) |
| 577 | + public void testExtractMetaFromEntryLogs() throws Exception { |
| 578 | + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); |
| 579 | + File tmpDir = createTempDir("bkTest", ".dir"); |
| 580 | + File curDir = Bookie.getCurrentDirectory(tmpDir); |
| 581 | + Bookie.checkDirectoryStructure(curDir); |
| 582 | + conf.setLedgerDirNames(new String[] { tmpDir.toString() }); |
| 583 | + |
| 584 | + LedgerDirsManager dirs = new LedgerDirsManager(conf, conf.getLedgerDirs()); |
| 585 | + final Set<Long> ledgers = Collections |
| 586 | + .newSetFromMap(new ConcurrentHashMap<Long, Boolean>()); |
| 587 | + |
| 588 | + LedgerManager manager = getLedgerManager(ledgers); |
| 589 | + |
| 590 | + CheckpointSource checkpointSource = new CheckpointSource() { |
| 591 | + |
| 592 | + @Override |
| 593 | + public Checkpoint newCheckpoint() { |
| 594 | + return null; |
| 595 | + } |
| 596 | + |
| 597 | + @Override |
| 598 | + public void checkpointComplete(Checkpoint checkpoint, |
| 599 | + boolean compact) throws IOException { |
| 600 | + } |
| 601 | + }; |
| 602 | + InterleavedLedgerStorage storage = new InterleavedLedgerStorage(); |
| 603 | + storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE); |
| 604 | + final byte[] KEY = "foobar".getBytes(); |
| 605 | + |
| 606 | + for (long ledger = 0; ledger <= 10; ledger++) { |
| 607 | + ledgers.add(ledger); |
| 608 | + for(int entry = 1; entry <= 50; entry++) { |
| 609 | + try { |
| 610 | + storage.addEntry(genEntry(ledger, entry, ENTRY_SIZE)); |
| 611 | + } catch (IOException e) { |
| 612 | + //ignore exception on failure to add entry. |
| 613 | + } |
| 614 | + }; |
| 615 | + }; |
| 616 | + |
| 617 | + storage.flush(); |
| 618 | + storage.shutdown(); |
| 619 | + |
| 620 | + storage = new InterleavedLedgerStorage(); |
| 621 | + storage.initialize(conf, manager, dirs, dirs, checkpointSource, NullStatsLogger.INSTANCE); |
| 622 | + |
| 623 | + long startingEntriesCount = storage.gcThread.entryLogger.getLeastUnflushedLogId() - storage.gcThread.scannedLogId; |
| 624 | + LOG.info("The old Log Entry count is: " + startingEntriesCount); |
| 625 | + |
| 626 | + Map<Long, EntryLogMetadata> entryLogMetaData = new HashMap<>(); |
| 627 | + Map<Long, EntryLogMetadata> finalEntryLogMetadataMap = storage.gcThread.extractMetaFromEntryLogs(entryLogMetaData); |
| 628 | + long finalEntriesCount = storage.gcThread.entryLogger.getLeastUnflushedLogId() - storage.gcThread.scannedLogId; |
| 629 | + LOG.info("The latest Log Entry count is: " + finalEntriesCount); |
| 630 | + |
| 631 | + assertTrue("The GC did not clean up entries...", startingEntriesCount != finalEntriesCount); |
| 632 | + assertTrue("Entries Count is zero", finalEntriesCount == 0); |
| 633 | + } |
| 634 | + |
571 | 635 | private ByteBuffer genEntry(long ledger, long entry, int size) { |
572 | 636 | ByteBuffer bb = ByteBuffer.wrap(new byte[size]); |
573 | 637 | bb.putLong(ledger); |
|
0 commit comments