Skip to content

Commit

Permalink
[#noissue] fix testcase
Browse files Browse the repository at this point in the history
fix testcase
  • Loading branch information
koo-taejin committed Jun 20, 2017
1 parent e8899eb commit 5e6837d
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 79 deletions.
Expand Up @@ -28,6 +28,7 @@
import com.navercorp.pinpoint.rpc.packet.ResponsePacket;
import com.navercorp.pinpoint.rpc.packet.SendPacket;
import com.navercorp.pinpoint.rpc.util.ControlMessageEncodingUtils;
import com.navercorp.pinpoint.rpc.util.IOUtils;
import com.navercorp.pinpoint.rpc.util.MapUtils;
import com.navercorp.pinpoint.rpc.util.PinpointRPCTestUtils;
import org.jboss.netty.buffer.ChannelBuffer;
Expand Down Expand Up @@ -174,25 +175,20 @@ private void sendRegisterPacket(OutputStream outputStream, Map<String, Object> p
ControlHandshakePacket packet = new ControlHandshakePacket(1, payload);

ByteBuffer bb = packet.toBuffer().toByteBuffer(0, packet.toBuffer().writerIndex());
sendData(outputStream, bb.array());
IOUtils.write(outputStream, bb.array());
}

private void sendSimpleRequestPacket(OutputStream outputStream) throws ProtocolException, IOException {
RequestPacket packet = new RequestPacket(new byte[0]);
packet.setRequestId(10);

ByteBuffer bb = packet.toBuffer().toByteBuffer(0, packet.toBuffer().writerIndex());
sendData(outputStream, bb.array());
}

private void sendData(OutputStream outputStream, byte[] payload) throws IOException {
outputStream.write(payload);
outputStream.flush();
IOUtils.write(outputStream, bb.array());
}

private ControlHandshakeResponsePacket receiveRegisterConfirmPacket(InputStream inputStream) throws ProtocolException, IOException {

byte[] payload = readData(inputStream);
byte[] payload = IOUtils.read(inputStream, 50, 3000);
ChannelBuffer cb = ChannelBuffers.wrappedBuffer(payload);

short packetType = cb.readShort();
Expand All @@ -202,7 +198,7 @@ private ControlHandshakeResponsePacket receiveRegisterConfirmPacket(InputStream
}

private ResponsePacket readSimpleResponsePacket(InputStream inputStream) throws ProtocolException, IOException {
byte[] payload = readData(inputStream);
byte[] payload = IOUtils.read(inputStream, 50, 3000);
ChannelBuffer cb = ChannelBuffers.wrappedBuffer(payload);

short packetType = cb.readShort();
Expand All @@ -211,30 +207,6 @@ private ResponsePacket readSimpleResponsePacket(InputStream inputStream) throws
return packet;
}

private byte[] readData(InputStream inputStream) throws IOException {
int availableSize = 0;

for (int i = 0; i < 3; i++) {
availableSize = inputStream.available();

if (availableSize > 0) {
break;
}

try {
Thread.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

byte[] payload = new byte[availableSize];
inputStream.read(payload);

return payload;
}

class SimpleListener implements ServerMessageListener {

@Override
Expand Down
Expand Up @@ -18,18 +18,20 @@

import com.navercorp.pinpoint.rpc.common.SocketStateCode;
import com.navercorp.pinpoint.rpc.control.ProtocolException;
import com.navercorp.pinpoint.rpc.packet.*;
import com.navercorp.pinpoint.rpc.packet.ControlHandshakePacket;
import com.navercorp.pinpoint.rpc.packet.ControlHandshakeResponsePacket;
import com.navercorp.pinpoint.rpc.packet.RequestPacket;
import com.navercorp.pinpoint.rpc.packet.ResponsePacket;
import com.navercorp.pinpoint.rpc.server.handler.ServerStateChangeEventHandler;
import com.navercorp.pinpoint.rpc.util.ControlMessageEncodingUtils;
import com.navercorp.pinpoint.rpc.util.IOUtils;
import com.navercorp.pinpoint.rpc.util.MapUtils;
import com.navercorp.pinpoint.rpc.util.PinpointRPCTestUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.SocketUtils;

import java.io.IOException;
Expand All @@ -44,10 +46,8 @@
*/
public class EventHandlerTest {

private final Logger logger = LoggerFactory.getLogger(this.getClass());

private static int bindPort;

@BeforeClass
public static void setUp() throws IOException {
bindPort = SocketUtils.findAvailableTcpPort();
Expand Down Expand Up @@ -77,11 +77,11 @@ public void registerAgentSuccessTest() throws Exception {
if (socket != null) {
socket.close();
}

PinpointRPCTestUtils.close(serverAcceptor);
}
}

@Test
public void registerAgentFailTest() throws Exception {
ThrowExceptionEventHandler eventHandler = new ThrowExceptionEventHandler();
Expand All @@ -95,13 +95,13 @@ public void registerAgentFailTest() throws Exception {
try {
socket = new Socket("127.0.0.1", bindPort);
sendAndReceiveSimplePacket(socket);

Assert.assertTrue(eventHandler.getErrorCount() > 0);
} finally {
if (socket != null) {
socket.close();
}

PinpointRPCTestUtils.close(serverAcceptor);
}
}
Expand All @@ -125,25 +125,20 @@ private void sendRegisterPacket(OutputStream outputStream, Map<String, Object> p
ControlHandshakePacket packet = new ControlHandshakePacket(1, payload);

ByteBuffer bb = packet.toBuffer().toByteBuffer(0, packet.toBuffer().writerIndex());
sendData(outputStream, bb.array());
IOUtils.write(outputStream, bb.array());
}

private void sendSimpleRequestPacket(OutputStream outputStream) throws ProtocolException, IOException {
RequestPacket packet = new RequestPacket(new byte[0]);
packet.setRequestId(10);

ByteBuffer bb = packet.toBuffer().toByteBuffer(0, packet.toBuffer().writerIndex());
sendData(outputStream, bb.array());
}

private void sendData(OutputStream outputStream, byte[] payload) throws IOException {
outputStream.write(payload);
outputStream.flush();
IOUtils.write(outputStream, bb.array());
}

private ControlHandshakeResponsePacket receiveRegisterConfirmPacket(InputStream inputStream) throws ProtocolException, IOException {

byte[] payload = readData(inputStream);
byte[] payload = IOUtils.read(inputStream, 50, 3000);
ChannelBuffer cb = ChannelBuffers.wrappedBuffer(payload);

short packetType = cb.readShort();
Expand All @@ -153,7 +148,7 @@ private ControlHandshakeResponsePacket receiveRegisterConfirmPacket(InputStream
}

private ResponsePacket readSimpleResponsePacket(InputStream inputStream) throws ProtocolException, IOException {
byte[] payload = readData(inputStream);
byte[] payload = IOUtils.read(inputStream, 50, 3000);
ChannelBuffer cb = ChannelBuffers.wrappedBuffer(payload);

short packetType = cb.readShort();
Expand All @@ -162,30 +157,6 @@ private ResponsePacket readSimpleResponsePacket(InputStream inputStream) throws
return packet;
}

private byte[] readData(InputStream inputStream) throws IOException {
int availableSize = 0;

for (int i = 0; i < 3; i++) {
availableSize = inputStream.available();

if (availableSize > 0) {
break;
}

try {
Thread.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

byte[] payload = new byte[availableSize];
inputStream.read(payload);

return payload;
}

class EventHandler implements ServerStateChangeEventHandler {

private SocketStateCode code;
Expand All @@ -194,7 +165,7 @@ class EventHandler implements ServerStateChangeEventHandler {
public void eventPerformed(PinpointServer pinpointServer, SocketStateCode stateCode) {
this.code = stateCode;
}

@Override
public void exceptionCaught(PinpointServer pinpointServer, SocketStateCode stateCode, Throwable e) {
}
Expand All @@ -203,11 +174,11 @@ public SocketStateCode getCode() {
return code;
}
}

class ThrowExceptionEventHandler implements ServerStateChangeEventHandler {

private int errorCount = 0;

@Override
public void eventPerformed(PinpointServer pinpointServer, SocketStateCode stateCode) throws Exception {
throw new Exception("always error.");
Expand Down
61 changes: 61 additions & 0 deletions rpc/src/test/java/com/navercorp/pinpoint/rpc/util/IOUtils.java
@@ -0,0 +1,61 @@
/*
* Copyright 2017 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.navercorp.pinpoint.rpc.util;

import com.navercorp.pinpoint.rpc.TestAwaitTaskUtils;
import com.navercorp.pinpoint.rpc.TestAwaitUtils;
import org.junit.Assert;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* @author Taejin Koo
*/
public final class IOUtils {

public static byte[] read(final InputStream inputStream, long waitUnitTime, long maxWaitTime) throws IOException {
boolean isReceived = TestAwaitUtils.await(new TestAwaitTaskUtils() {
@Override
public boolean checkCompleted() {
try {
int availableSize = inputStream.available();
return availableSize > 0;
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
}, 100, 1000);

if (!isReceived) {
Assert.fail("no available data");
}

int availableSize = inputStream.available();
byte[] payload = new byte[availableSize];
inputStream.read(payload);
return payload;
}

public static void write(OutputStream outputStream, byte[] payload) throws IOException {
outputStream.write(payload);
outputStream.flush();
}

}

0 comments on commit 5e6837d

Please sign in to comment.