Skip to content

Commit

Permalink
推送压测增加统计,
Browse files Browse the repository at this point in the history
  • Loading branch information
夜色 committed Dec 21, 2016
1 parent da8f562 commit 44f7289
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 24 deletions.
21 changes: 14 additions & 7 deletions mpush-core/src/main/java/com/mpush/core/push/FastFlowControl.java
Expand Up @@ -19,6 +19,8 @@

package com.mpush.core.push;

import java.util.concurrent.TimeUnit;

/**
* Created by ohun on 16/10/24.
*
Expand All @@ -27,15 +29,16 @@
public final class FastFlowControl implements FlowControl {
private final int limit;
private final int maxLimit;
private final int duration;
private final long duration;
private final long start0 = System.nanoTime();
private int count;
private int total;
private long start;

public FastFlowControl(int limit, int maxLimit, int duration) {
this.limit = limit;
this.maxLimit = maxLimit;
this.duration = duration;
this.duration = TimeUnit.MILLISECONDS.toNanos(duration);
}

public FastFlowControl(int limit) {
Expand All @@ -47,7 +50,7 @@ public FastFlowControl(int limit) {
@Override
public void reset() {
count = 0;
start = System.currentTimeMillis();
start = System.nanoTime();
}

@Override
Expand All @@ -65,7 +68,7 @@ public boolean checkQps() {

if (total > maxLimit) throw new OverFlowException(true);

if (System.currentTimeMillis() - start > duration) {
if (System.nanoTime() - start > duration) {
reset();
total++;
return true;
Expand All @@ -74,13 +77,17 @@ public boolean checkQps() {
}

@Override
public int getRemaining() {
return duration - (int) (System.currentTimeMillis() - start);
public long getRemaining() {
return duration - (System.nanoTime() - start);
}

@Override
public String report() {
return "total:d%, count:%d, qps:d%";
return String.format("total:%d, count:%d, qps:%d", total, count, qps());
}

@Override
public int qps() {
return (int) (TimeUnit.SECONDS.toNanos(total) / (System.nanoTime() - start0));
}
}
Expand Up @@ -34,7 +34,9 @@ public interface FlowControl {

default void end(){};

int getRemaining();
long getRemaining();

int qps();

String report();

Expand Down
Expand Up @@ -19,6 +19,7 @@

package com.mpush.core.push;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -30,21 +31,22 @@
public final class GlobalFlowControl implements FlowControl {
private final int limit;
private final int maxLimit;
private final int duration;
private final long duration;
private final AtomicInteger count = new AtomicInteger();
private final AtomicInteger total = new AtomicInteger();
private final long start0 = System.nanoTime();
private volatile long start;

public GlobalFlowControl(int limit, int maxLimit, int duration) {
this.limit = limit;
this.maxLimit = maxLimit;
this.duration = duration;
this.duration = TimeUnit.MILLISECONDS.toNanos(duration);
}

@Override
public void reset() {
count.set(0);
start = System.currentTimeMillis();
start = System.nanoTime();
}

@Override
Expand All @@ -61,7 +63,7 @@ public boolean checkQps() {

if (maxLimit > 0 && total.get() > maxLimit) throw new OverFlowException(true);

if (System.currentTimeMillis() - start > duration) {
if (System.nanoTime() - start > duration) {
reset();
total.incrementAndGet();
return true;
Expand All @@ -70,12 +72,17 @@ public boolean checkQps() {
}

@Override
public int getRemaining() {
return duration - (int) (System.currentTimeMillis() - start);
public long getRemaining() {
return duration - (System.nanoTime() - start);
}

@Override
public int qps() {
return (int) (TimeUnit.SECONDS.toNanos(total.get()) / (System.nanoTime() - start0));
}

@Override
public String report() {
return "total:d%, count:%d, qps:d%";
return String.format("total:%d, count:%d, qps:%d", total.get(), count.get(), qps());
}
}
4 changes: 2 additions & 2 deletions mpush-core/src/main/java/com/mpush/core/push/PushCenter.java
Expand Up @@ -54,9 +54,9 @@ public void addTask(PushTask task) {
logger.debug("add new task to push center, count={}, task={}", count, task);
}

public void delayTask(int delay, PushTask task) {
public void delayTask(long delay, PushTask task) {
long count = taskNum.incrementAndGet();
executor.schedule(task, delay, TimeUnit.MILLISECONDS);
executor.schedule(task, delay, TimeUnit.NANOSECONDS);
logger.debug("delay task to push center, count={}, task={}", count, task);
}

Expand Down
21 changes: 14 additions & 7 deletions mpush-core/src/main/java/com/mpush/core/push/RedisFlowControl.java
Expand Up @@ -22,6 +22,8 @@
import com.mpush.api.push.BroadcastController;
import com.mpush.common.push.RedisBroadcastController;

import java.util.concurrent.TimeUnit;

/**
* Created by ohun on 16/10/25.
*
Expand All @@ -30,8 +32,8 @@
public final class RedisFlowControl implements FlowControl {

private final BroadcastController controller;

private final int duration = 1000;
private final long start0 = System.nanoTime();
private final long duration = TimeUnit.SECONDS.toNanos(1);
private final int maxLimit;
private int limit;
private int count;
Expand All @@ -47,7 +49,7 @@ public RedisFlowControl(String taskId, int maxLimit) {
@Override
public void reset() {
count = 0;
start = System.currentTimeMillis();
start = System.nanoTime();
}

@Override
Expand All @@ -67,7 +69,7 @@ public boolean checkQps() throws OverFlowException {
throw new OverFlowException(true);
}

if (System.currentTimeMillis() - start > duration) {
if (System.nanoTime() - start > duration) {
reset();
total++;
return true;
Expand All @@ -91,12 +93,17 @@ public void end() {
}

@Override
public int getRemaining() {
return duration - (int) (System.currentTimeMillis() - start);
public long getRemaining() {
return duration - (int) (System.nanoTime() - start);
}

@Override
public String report() {
return "";
return String.format("total:%d, count:%d, qps:%d", total, count, qps());
}

@Override
public int qps() {
return (int) (TimeUnit.SECONDS.toNanos(total) / (System.nanoTime() - start0));
}
}
131 changes: 131 additions & 0 deletions mpush-test/src/main/java/com/mpush/test/push/PushClientTestMain2.java
@@ -0,0 +1,131 @@
/*
* (C) Copyright 2015-2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* ohun@live.cn (夜色)
*/

package com.mpush.test.push;

import com.mpush.api.push.*;
import com.mpush.core.push.FlowControl;
import com.mpush.core.push.GlobalFlowControl;
import com.mpush.core.push.OverFlowException;
import com.mpush.tools.log.Logs;
import org.junit.Test;

import java.time.LocalTime;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;

/**
* Created by ohun on 2016/1/7.
*
* @author ohun@live.cn
*/
public class PushClientTestMain2 {

public static void main(String[] args) throws Exception {
new PushClientTestMain2().testPush();
}

@Test
public void testPush() throws Exception {
Logs.init();
PushSender sender = PushSender.create();
sender.start().join();
Thread.sleep(1000);
Statistics statistics = new Statistics();
FlowControl flowControl = new GlobalFlowControl(3000, Integer.MAX_VALUE, 1000);// qps=1000

ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
System.out.println("time=" + LocalTime.now()
+ ", flowControl=" + flowControl.report()
+ ", statistics=" + statistics
);
}, 1, 1, TimeUnit.SECONDS);

for (int k = 0; k < 100; k++) {
for (int i = 0; i < 1000; i++) {
service.execute(new PushTask(sender, i, service, flowControl, statistics));
}
}

LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(30000));
}

private static class PushTask implements Runnable {
PushSender sender;
private int i;
FlowControl flowControl;
Statistics statistics;
ScheduledExecutorService executor;

public PushTask(PushSender sender, int i, ScheduledExecutorService executor, FlowControl flowControl, Statistics statistics) {
this.sender = sender;
this.i = i;
this.flowControl = flowControl;
this.executor = executor;
this.statistics = statistics;
}

@Override
public void run() {
if (flowControl.checkQps()) {
PushMsg msg = PushMsg.build(MsgType.MESSAGE, "this a first push.");
msg.setMsgId("msgId_" + i);

PushContext context = PushContext.build(msg)
.setAckModel(AckModel.NO_ACK)
.setUserId("user-" + i)
.setBroadcast(false)
.setTimeout(60000)
.setCallback(new PushCallback() {
@Override
public void onResult(PushResult result) {
statistics.add(result.resultCode);
}
});
FutureTask<PushResult> future = sender.send(context);
} else {
executor.schedule(this, flowControl.getRemaining(), TimeUnit.NANOSECONDS);
}
}
}

private static class Statistics {
final AtomicInteger successNum = new AtomicInteger();
final AtomicInteger failureNum = new AtomicInteger();
final AtomicInteger offlineNum = new AtomicInteger();
final AtomicInteger timeoutNum = new AtomicInteger();
AtomicInteger[] counters = new AtomicInteger[]{successNum, failureNum, offlineNum, timeoutNum};

private void add(int code) {
counters[code - 1].incrementAndGet();
}

@Override
public String toString() {
return "{" +
"successNum=" + successNum +
", offlineNum=" + offlineNum +
", timeoutNum=" + timeoutNum +
", failureNum=" + failureNum +
'}';
}
}
}

0 comments on commit 44f7289

Please sign in to comment.