Skip to content

Commit

Permalink
Retrieving logs using bloom cache
Browse files Browse the repository at this point in the history
  • Loading branch information
ajlopez committed Mar 27, 2019
1 parent 2e682f5 commit 20e770a
Show file tree
Hide file tree
Showing 18 changed files with 671 additions and 17 deletions.
12 changes: 10 additions & 2 deletions rskj-core/src/main/java/co/rsk/core/RskFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import co.rsk.crypto.Keccak256;
import co.rsk.db.RepositoryImpl;
import co.rsk.metrics.BlockHeaderElement;
import co.rsk.logfilter.BlocksBloomStore;
import co.rsk.metrics.HashRateCalculator;
import co.rsk.metrics.HashRateCalculatorMining;
import co.rsk.metrics.HashRateCalculatorNonMining;
Expand Down Expand Up @@ -205,7 +206,8 @@ public Web3 getWeb3(Rsk rsk,
BlockProcessor nodeBlockProcessor,
HashRateCalculator hashRateCalculator,
ConfigCapabilities configCapabilities,
BuildInfo buildInfo) {
BuildInfo buildInfo,
BlocksBloomStore blocksBloomStore) {
return new Web3RskImpl(
rsk,
blockchain,
Expand All @@ -229,7 +231,8 @@ public Web3 getWeb3(Rsk rsk,
nodeBlockProcessor,
hashRateCalculator,
configCapabilities,
buildInfo
buildInfo,
blocksBloomStore
);
}

Expand Down Expand Up @@ -284,6 +287,11 @@ public BlockChainImpl getBlockchain(BlockChainLoader blockChainLoader) {
return blockChainLoader.loadBlockchain();
}

@Bean
public BlocksBloomStore getBlocksBloomStore() {
return new BlocksBloomStore(64, 20);
}

@Bean
public TransactionPool getTransactionPool(org.ethereum.db.BlockStore blockStore,
ReceiptStore receiptStore,
Expand Down
64 changes: 64 additions & 0 deletions rskj-core/src/main/java/co/rsk/logfilter/BlocksBloom.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* This file is part of RskJ
* Copyright (C) 2017 RSK Labs Ltd.
* (derived from ethereumJ library, Copyright (c) 2016 <ether.camp>)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package co.rsk.logfilter;

import org.ethereum.core.Bloom;

/**
* Created by ajlopez on 29/01/2019.
*/
public class BlocksBloom {
private final Bloom bloom = new Bloom();
private long fromBlock = -1;
private long toBlock = -1;

public Bloom getBloom() { return this.bloom; }

public long fromBlock() { return this.fromBlock; }

public long toBlock() { return this.toBlock; }

public long size() {
if (this.fromBlock == -1) {
return 0;
}

return this.toBlock - this.fromBlock + 1;
}

public void addBlockBloom(long blockNumber, Bloom blockBloom) {
if (fromBlock == -1) {
fromBlock = blockNumber;
toBlock = blockNumber;
}
else if (blockNumber == toBlock + 1) {
toBlock = blockNumber;
}
else {
throw new UnsupportedOperationException("Block out of sequence");
}

this.bloom.or(blockBloom);
}

public boolean matches(Bloom bloom) {
return this.bloom.matches(bloom);
}
}
63 changes: 63 additions & 0 deletions rskj-core/src/main/java/co/rsk/logfilter/BlocksBloomStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* This file is part of RskJ
* Copyright (C) 2017 RSK Labs Ltd.
* (derived from ethereumJ library, Copyright (c) 2016 <ether.camp>)
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package co.rsk.logfilter;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Created by ajlopez on 05/02/2019.
*/
public class BlocksBloomStore {
private final int noBlocks;
private final int noConfirmations;
private final Map<Long, BlocksBloom> blocksBloom = new ConcurrentHashMap<>();

public BlocksBloomStore(int noBlocks, int noConfirmations) {
this.noBlocks = noBlocks;
this.noConfirmations = noConfirmations;
}

public boolean hasBlockNumber(long blockNumber) {
return this.blocksBloom.containsKey(this.firstNumberInRange(blockNumber));
}

public BlocksBloom getBlocksBloomByNumber(long number) {
return this.blocksBloom.get(firstNumberInRange(number));
}

public void setBlocksBloom(BlocksBloom blocksBloom) {
this.blocksBloom.put(blocksBloom.fromBlock(), blocksBloom);
}

public long firstNumberInRange(long number) {
return number - (number % this.noBlocks);
}

public long lastNumberInRange(long number) {
return firstNumberInRange(number) + this.noBlocks - 1;
}

public int getNoBlocks() {
return this.noBlocks;
}

public int getNoConfirmations() { return this.noConfirmations; }
}
6 changes: 4 additions & 2 deletions rskj-core/src/main/java/co/rsk/rpc/Web3RskImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import co.rsk.config.RskSystemProperties;
import co.rsk.core.NetworkStateExporter;
import co.rsk.logfilter.BlocksBloomStore;
import co.rsk.metrics.HashRateCalculator;
import co.rsk.mine.*;
import co.rsk.net.BlockProcessor;
Expand Down Expand Up @@ -83,11 +84,12 @@ public Web3RskImpl(Ethereum eth,
BlockProcessor nodeBlockProcessor,
HashRateCalculator hashRateCalculator,
ConfigCapabilities configCapabilities,
BuildInfo buildInfo) {
BuildInfo buildInfo,
BlocksBloomStore blocksBloomStore) {
super(eth, blockchain, transactionPool, blockStore, receiptStore, properties, minerClient, minerServer,
personalModule, ethModule, evmModule, txPoolModule, mnrModule, debugModule,
channelManager, repository, peerScoringManager, peerServer, nodeBlockProcessor,
hashRateCalculator, configCapabilities, buildInfo);
hashRateCalculator, configCapabilities, buildInfo, blocksBloomStore);

this.networkStateExporter = networkStateExporter;
this.blockStore = blockStore;
Expand Down
4 changes: 4 additions & 0 deletions rskj-core/src/main/java/co/rsk/vm/BitSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,8 @@ public boolean get(int position) {
public int size() {
return this.size;
}

protected byte[] getBytes() {
return this.bytes;
}
}
3 changes: 2 additions & 1 deletion rskj-core/src/main/java/org/ethereum/core/Bloom.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@
*/

public class Bloom {
public static final int BLOOM_BYTES = 256;

static final int _8STEPS = 8;
static final int _3LOW_BITS = 7;
static final int ENSURE_BYTE = 255;

byte[] data = new byte[256];
byte[] data = new byte[BLOOM_BYTES];


public Bloom() {
Expand Down
51 changes: 45 additions & 6 deletions rskj-core/src/main/java/org/ethereum/rpc/LogFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.ethereum.rpc;

import co.rsk.core.RskAddress;
import co.rsk.logfilter.BlocksBloom;
import co.rsk.logfilter.BlocksBloomStore;
import org.ethereum.core.*;
import org.ethereum.db.TransactionInfo;
import org.ethereum.vm.LogInfo;
Expand Down Expand Up @@ -102,7 +104,7 @@ public void newPendingTx(Transaction tx) {
//empty method
}

public static LogFilter fromFilterRequest(Web3.FilterRequest fr, Blockchain blockchain) throws Exception {
public static LogFilter fromFilterRequest(Web3.FilterRequest fr, Blockchain blockchain, BlocksBloomStore blocksBloomStore) throws Exception {
RskAddress[] addresses;

// Now, there is an array of array of topics
Expand Down Expand Up @@ -175,12 +177,12 @@ public static LogFilter fromFilterRequest(Web3.FilterRequest fr, Blockchain bloc

LogFilter filter = new LogFilter(addressesTopicsFilter, blockchain, fromLatestBlock, toLatestBlock);

retrieveHistoricalData(fr, blockchain, filter);
retrieveHistoricalData(fr, blockchain, filter, blocksBloomStore);

return filter;
}

private static void retrieveHistoricalData(Web3.FilterRequest fr, Blockchain blockchain, LogFilter filter) throws Exception {
private static void retrieveHistoricalData(Web3.FilterRequest fr, Blockchain blockchain, LogFilter filter, BlocksBloomStore blocksBloomStore) throws Exception {
Block blockFrom = isBlockWord(fr.fromBlock) ? null : Web3Impl.getBlockByNumberOrStr(fr.fromBlock, blockchain);
Block blockTo = isBlockWord(fr.toBlock) ? null : Web3Impl.getBlockByNumberOrStr(fr.toBlock, blockchain);

Expand All @@ -192,15 +194,52 @@ private static void retrieveHistoricalData(Web3.FilterRequest fr, Blockchain blo
// need to add historical data
blockTo = blockTo == null ? blockchain.getBestBlock() : blockTo;

for (long blockNum = blockFrom.getNumber(); blockNum <= blockTo.getNumber(); blockNum++) {
filter.onBlock(blockchain.getBlockByNumber(blockNum));
}
processBlocks(blockFrom.getNumber(), blockTo.getNumber(), filter, blockchain, blocksBloomStore);
}
else if ("latest".equalsIgnoreCase(fr.fromBlock)) {
filter.onBlock(blockchain.getBestBlock());
}
}

private static void processBlocks(long fromBlockNumber, long toBlockNumber, LogFilter filter, Blockchain blockchain, BlocksBloomStore blocksBloomStore) {
BlocksBloom auxiliaryBlocksBloom = null;
long bestBlockNumber = blockchain.getBestBlock().getNumber();

for (long blockNum = fromBlockNumber; blockNum <= toBlockNumber; blockNum++) {
boolean isConfirmedBlock = blockNum <= bestBlockNumber - blocksBloomStore.getNoConfirmations();

if (isConfirmedBlock) {
if (blocksBloomStore.firstNumberInRange(blockNum) == blockNum) {
if (blocksBloomStore.hasBlockNumber(blockNum)) {
BlocksBloom blocksBloom = blocksBloomStore.getBlocksBloomByNumber(blockNum);

if (!filter.addressesTopicsFilter.matchBloom(blocksBloom.getBloom())) {
blockNum = blocksBloomStore.lastNumberInRange(blockNum);
continue;
}
}

auxiliaryBlocksBloom = new BlocksBloom();
}

Block block = blockchain.getBlockByNumber(blockNum);

if (auxiliaryBlocksBloom != null) {
auxiliaryBlocksBloom.addBlockBloom(blockNum, new Bloom(block.getLogBloom()));
}

if (auxiliaryBlocksBloom != null && blocksBloomStore.lastNumberInRange(blockNum) == blockNum) {
blocksBloomStore.setBlocksBloom(auxiliaryBlocksBloom);
}

filter.onBlock(block);
}
else {
filter.onBlock(blockchain.getBlockByNumber(blockNum));
}
}
}

private static boolean isBlockWord(String id) {
return "latest".equalsIgnoreCase(id) || "pending".equalsIgnoreCase(id) || "earliest".equalsIgnoreCase(id);
}
Expand Down
9 changes: 7 additions & 2 deletions rskj-core/src/main/java/org/ethereum/rpc/Web3Impl.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import co.rsk.core.RskAddress;
import co.rsk.core.bc.AccountInformationProvider;
import co.rsk.crypto.Keccak256;
import co.rsk.logfilter.BlocksBloomStore;
import co.rsk.metrics.HashRateCalculator;
import co.rsk.mine.MinerClient;
import co.rsk.mine.MinerServer;
Expand Down Expand Up @@ -97,6 +98,8 @@ public class Web3Impl implements Web3 {
private final FilterManager filterManager;
private final BuildInfo buildInfo;

private final BlocksBloomStore blocksBloomStore;

private final PersonalModule personalModule;
private final EthModule ethModule;
private final EvmModule evmModule;
Expand Down Expand Up @@ -126,7 +129,8 @@ protected Web3Impl(
BlockProcessor nodeBlockProcessor,
HashRateCalculator hashRateCalculator,
ConfigCapabilities configCapabilities,
BuildInfo buildInfo) {
BuildInfo buildInfo,
BlocksBloomStore blocksBloomStore) {
this.eth = eth;
this.blockchain = blockchain;
this.blockStore = blockStore;
Expand All @@ -150,6 +154,7 @@ protected Web3Impl(
this.config = config;
filterManager = new FilterManager(eth);
this.buildInfo = buildInfo;
this.blocksBloomStore = blocksBloomStore;
initialBlockNumber = this.blockchain.getBestBlock().getNumber();

personalModule.init(this.config);
Expand Down Expand Up @@ -841,7 +846,7 @@ public String eth_newFilter(FilterRequest fr) throws Exception {
String str = null;

try {
Filter filter = LogFilter.fromFilterRequest(fr, blockchain);
Filter filter = LogFilter.fromFilterRequest(fr, blockchain, blocksBloomStore);
int id = filterManager.registerFilter(filter);

return str = toJsonHex(id);
Expand Down
2 changes: 1 addition & 1 deletion rskj-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -312,4 +312,4 @@ cache {
# precaution we'll set the limit to 100K entries.
max-elements: 100000
}
}
}
Loading

0 comments on commit 20e770a

Please sign in to comment.