Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[#3664] Refactor bulk incrementer to clear its keys after taking sna…
…pshots
  • Loading branch information
Xylus authored and emeroad committed Jan 26, 2018
1 parent 6f96aa9 commit 8267cf1
Show file tree
Hide file tree
Showing 11 changed files with 657 additions and 573 deletions.
Expand Up @@ -34,7 +34,6 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -46,6 +45,7 @@
* @author netspider
* @author emeroad
* @author jaehong.kim
* @author HyunGil Jeong
*/
@Repository
public class HbaseMapResponseTimeDao implements MapResponseTimeDao {
Expand All @@ -65,17 +65,15 @@ public class HbaseMapResponseTimeDao implements MapResponseTimeDao {
private TimeSlot timeSlot;

@Autowired
@Qualifier("selfMerge")
private RowKeyMerge rowKeyMerge;
@Qualifier("selfBulkIncrementer")
private BulkIncrementer bulkIncrementer;

@Autowired
@Qualifier("statisticsSelfRowKeyDistributor")
private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;

private final boolean useBulk;

private final BulkCounter bulkCounter = new BulkCounter();

public HbaseMapResponseTimeDao() {
this(true);
}
Expand Down Expand Up @@ -106,8 +104,7 @@ public void received(String applicationName, ServiceType applicationServiceType,
final ColumnName selfColumnName = new ResponseColumnName(agentId, slotNumber);
if (useBulk) {
TableName mapStatisticsSelfTableName = tableNameProvider.getTableName(MAP_STATISTICS_SELF_VER2_STR);
RowInfo rowInfo = new DefaultRowInfo(selfRowKey, selfColumnName);
bulkCounter.increment(mapStatisticsSelfTableName, rowInfo);
bulkIncrementer.increment(mapStatisticsSelfTableName, selfRowKey, selfColumnName);
} else {
final byte[] rowKey = getDistributedKey(selfRowKey.getRowKey());
// column name is the name of caller app.
Expand All @@ -134,20 +131,7 @@ public void flushAll() {
throw new IllegalStateException("useBulk is " + useBulk);
}

// update statistics by rowkey and column for now. need to update it by rowkey later.
Map<TableName, List<Increment>> incrementMap = new HashMap<>();

Map<TableName, Map<RowInfo, Long>> tableCounterMap = bulkCounter.getAndReset();
for (Map.Entry<TableName, Map<RowInfo, Long>> e : tableCounterMap.entrySet()) {
final Map<RowInfo, Long> counters = e.getValue();

final List<Increment> mergedIncrements = rowKeyMerge.createBulkIncrement(counters, rowKeyDistributorByHashPrefix);
if (!mergedIncrements.isEmpty()) {
final TableName tableName = e.getKey();
incrementMap.put(tableName, mergedIncrements);
}
}

Map<TableName, List<Increment>> incrementMap = bulkIncrementer.getIncrements(rowKeyDistributorByHashPrefix);
for (Map.Entry<TableName, List<Increment>> e : incrementMap.entrySet()) {
TableName tableName = e.getKey();
List<Increment> increments = e.getValue();
Expand Down
Expand Up @@ -37,7 +37,6 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -46,6 +45,7 @@
*
* @author netspider
* @author emeroad
* @author HyunGil Jeong
*/
@Repository
public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao {
Expand All @@ -65,17 +65,15 @@ public class HbaseMapStatisticsCalleeDao implements MapStatisticsCalleeDao {
private TimeSlot timeSlot;

@Autowired
@Qualifier("calleeMerge")
private RowKeyMerge rowKeyMerge;
@Qualifier("calleeBulkIncrementer")
private BulkIncrementer bulkIncrementer;

@Autowired
@Qualifier("statisticsCalleeRowKeyDistributor")
private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;

private final boolean useBulk;

private final BulkCounter bulkCounter = new BulkCounter();

public HbaseMapStatisticsCalleeDao() {
this(true);
}
Expand All @@ -84,7 +82,6 @@ public HbaseMapStatisticsCalleeDao(boolean useBulk) {
this.useBulk = useBulk;
}


@Override
public void update(String calleeApplicationName, ServiceType calleeServiceType, String callerApplicationName, ServiceType callerServiceType, String callerHost, int elapsed, boolean isError) {
if (callerApplicationName == null) {
Expand Down Expand Up @@ -112,8 +109,7 @@ public void update(String calleeApplicationName, ServiceType calleeServiceType,

if (useBulk) {
TableName mapStatisticsCallerTableName = tableNameProvider.getTableName(MAP_STATISTICS_CALLER_VER2_STR);
RowInfo rowInfo = new DefaultRowInfo(calleeRowKey, callerColumnName);
bulkCounter.increment(mapStatisticsCallerTableName, rowInfo);
bulkIncrementer.increment(mapStatisticsCallerTableName, calleeRowKey, callerColumnName);
} else {
final byte[] rowKey = getDistributedKey(calleeRowKey.getRowKey());

Expand All @@ -140,18 +136,7 @@ public void flushAll() {
throw new IllegalStateException();
}

Map<TableName, List<Increment>> incrementMap = new HashMap<>();

Map<TableName, Map<RowInfo, Long>> tableCounterMap = bulkCounter.getAndReset();
for (Map.Entry<TableName, Map<RowInfo, Long>> e : tableCounterMap.entrySet()) {
final Map<RowInfo, Long> counters = e.getValue();

final List<Increment> mergedIncrements = rowKeyMerge.createBulkIncrement(counters, rowKeyDistributorByHashPrefix);
if (!mergedIncrements.isEmpty()) {
final TableName tableName = e.getKey();
incrementMap.put(tableName, mergedIncrements);
}
}
Map<TableName, List<Increment>> incrementMap = bulkIncrementer.getIncrements(rowKeyDistributorByHashPrefix);

for (Map.Entry<TableName, List<Increment>> e : incrementMap.entrySet()) {
TableName tableName = e.getKey();
Expand Down
Expand Up @@ -37,7 +37,6 @@
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Repository;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -46,6 +45,7 @@
*
* @author netspider
* @author emeroad
* @author HyunGil Jeong
*/
@Repository
public class HbaseMapStatisticsCallerDao implements MapStatisticsCallerDao {
Expand All @@ -62,20 +62,18 @@ public class HbaseMapStatisticsCallerDao implements MapStatisticsCallerDao {
private AcceptedTimeService acceptedTimeService;

@Autowired
@Qualifier("callerMerge")
private RowKeyMerge rowKeyMerge;
private TimeSlot timeSlot;

@Autowired
@Qualifier("statisticsCallerRowKeyDistributor")
private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;
@Qualifier("callerBulkIncrementer")
private BulkIncrementer bulkIncrementer;

@Autowired
private TimeSlot timeSlot;
@Qualifier("statisticsCallerRowKeyDistributor")
private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix;

private final boolean useBulk;

private final BulkCounter bulkCounter = new BulkCounter();

public HbaseMapStatisticsCallerDao() {
this(true);
}
Expand Down Expand Up @@ -110,8 +108,7 @@ public void update(String callerApplicationName, ServiceType callerServiceType,
final ColumnName calleeColumnName = new CalleeColumnName(callerAgentid, calleeServiceType.getCode(), calleeApplicationName, calleeHost, calleeSlotNumber);
if (useBulk) {
TableName mapStatisticsCalleeTableName = tableNameProvider.getTableName(MAP_STATISTICS_CALLEE_VER2_STR);
RowInfo rowInfo = new DefaultRowInfo(callerRowKey, calleeColumnName);
bulkCounter.increment(mapStatisticsCalleeTableName, rowInfo);
bulkIncrementer.increment(mapStatisticsCalleeTableName, callerRowKey, calleeColumnName);
} else {
final byte[] rowKey = getDistributedKey(callerRowKey.getRowKey());
// column name is the name of caller app.
Expand All @@ -137,18 +134,7 @@ public void flushAll() {
throw new IllegalStateException();
}
// update statistics by rowkey and column for now. need to update it by rowkey later.
Map<TableName, List<Increment>> incrementMap = new HashMap<>();

Map<TableName, Map<RowInfo, Long>> tableCounterMap = bulkCounter.getAndReset();
for (Map.Entry<TableName, Map<RowInfo, Long>> e : tableCounterMap.entrySet()) {
final Map<RowInfo, Long> counters = e.getValue();

final List<Increment> mergedIncrements = rowKeyMerge.createBulkIncrement(counters, rowKeyDistributorByHashPrefix);
if (!mergedIncrements.isEmpty()) {
final TableName tableName = e.getKey();
incrementMap.put(tableName, mergedIncrements);
}
}
Map<TableName, List<Increment>> incrementMap = bulkIncrementer.getIncrements(rowKeyDistributorByHashPrefix);

for (Map.Entry<TableName, List<Increment>> e : incrementMap.entrySet()) {
TableName tableName = e.getKey();
Expand Down

This file was deleted.

@@ -0,0 +1,51 @@
/*
* Copyright 2018 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.collector.dao.hbase.statistics;

import com.google.common.util.concurrent.AtomicLongMap;
import com.navercorp.pinpoint.collector.util.AtomicLongMapUtils;
import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Increment;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* @author HyunGil Jeong
*/
/**
 * Accumulates per-row/column counts in memory and converts them into
 * batched HBase {@link Increment} operations when a snapshot is drained.
 * Counting is lock-free via Guava's {@link AtomicLongMap}; each call to
 * {@link #getIncrements(RowKeyDistributorByHashPrefix)} atomically removes
 * the accumulated entries so subsequent increments start from zero.
 *
 * @author HyunGil Jeong
 */
public class BulkIncrementer {

    // Merges raw (row, column) -> count entries into per-table Increment lists.
    private final RowKeyMerge rowKeyMerge;

    // Concurrent counter keyed by (table, rowKey, columnName); drained on snapshot.
    private final AtomicLongMap<RowInfo> counter = AtomicLongMap.create();

    public BulkIncrementer(RowKeyMerge rowKeyMerge) {
        this.rowKeyMerge = Objects.requireNonNull(rowKeyMerge, "rowKeyMerge must not be null");
    }

    /**
     * Adds one to the count tracked for the given table, row key and column.
     */
    public void increment(TableName tableName, RowKey rowKey, ColumnName columnName) {
        final RowInfo key = new DefaultRowInfo(tableName, rowKey, columnName);
        counter.incrementAndGet(key);
    }

    /**
     * Atomically drains every accumulated count and merges the snapshot into
     * per-table lists of {@link Increment}s using the configured
     * {@link RowKeyMerge} and the supplied row-key distributor.
     */
    public Map<TableName, List<Increment>> getIncrements(RowKeyDistributorByHashPrefix rowKeyDistributor) {
        final Map<RowInfo, Long> drained = AtomicLongMapUtils.remove(counter);
        return rowKeyMerge.createBulkIncrement(drained, rowKeyDistributor);
    }
}
Expand Up @@ -16,30 +16,37 @@

package com.navercorp.pinpoint.collector.dao.hbase.statistics;

import org.apache.hadoop.hbase.TableName;

import java.util.Objects;

/**
* @author emeroad
* @author HyunGil Jeong
*/
public class DefaultRowInfo implements RowInfo {

private RowKey rowKey;
private ColumnName columnName;
private final TableName tableName;
private final RowKey rowKey;
private final ColumnName columnName;

public DefaultRowInfo(RowKey rowKey, ColumnName columnName) {
if (rowKey == null) {
throw new NullPointerException("rowKey must not be null");
}
if (columnName == null) {
throw new NullPointerException("columnName must not be null");
}
public DefaultRowInfo(TableName tableName, RowKey rowKey, ColumnName columnName) {
this.tableName = Objects.requireNonNull(tableName, "tableName must not be null");
this.rowKey = Objects.requireNonNull(rowKey, "rowKey must not be null");
this.columnName = Objects.requireNonNull(columnName, "columnName must not be null");
}

this.rowKey = rowKey;
this.columnName = columnName;
@Override
public TableName getTableName() {
return tableName;
}

@Override
public RowKey getRowKey() {
return rowKey;
}

@Override
public ColumnName getColumnName() {
return columnName;
}
Expand All @@ -51,15 +58,15 @@ public boolean equals(Object o) {

DefaultRowInfo that = (DefaultRowInfo) o;

if (!columnName.equals(that.columnName)) return false;
if (!tableName.equals(that.tableName)) return false;
if (!rowKey.equals(that.rowKey)) return false;

return true;
return columnName.equals(that.columnName);
}

@Override
public int hashCode() {
int result = rowKey.hashCode();
int result = tableName.hashCode();
result = 31 * result + rowKey.hashCode();
result = 31 * result + columnName.hashCode();
return result;
}
Expand Down

0 comments on commit 8267cf1

Please sign in to comment.