Skip to content

Commit

Permalink
Issue #2014 - Unix Socket Client (#2025)
Browse files Browse the repository at this point in the history
There are still problems with this impl (some client tests ignored) and there is still a work around for the JNR bug 50, however this impl is already much better than the unix socket support that is already in the release.  So will merge for now and put more effort in once there is a JNR fix.

* WIP add unix domain sockets support in HttpClient
* move unix socket client part to unix socket module #2014
* some cleanup #2014
* add missing headers #2014
* add TODO
* UnixSocket client refactor
* cleanup test and pom
* minor changes, use LOG.isDebugEnabled() before using debug method
* add UNIX SOCKET http client test with all other tests, push this to see what happen on Jenkins
* fix some unit tests
* fix more tests
* fix load test
* UnixSocket client
* Demonstrate JNR bug
* Worked around JNR bug 50
* close channel on client side as well
* more details in log
* log file path as well
* #2014 disable test per default as doesn't work on some environement
* Revert "#2014 disable test per default as doesn't work on some environement"
* test only on unix
* Allow test of specific transport(s)
* Move unix socket to /tmp
* move test socket to /tmp
* move test socket to /tmp
* ignore failing tests for now
* fix bean name and possible to use sys prop org.eclipse.jetty.http.client.AbstractTest.Transports with mvn cli
* test isBlank as surefire props is not null
* correctly create tmp file with @before
* do not delete file
* use /tmp as build directory doesn't seem to work within docker...
* do not delete sock file on client as it is own by the server
* file must not exist when binding unix socket
* #2014 fix license header
* network specific tests assumed
* Fixed to handle null selector keys
* add assume for tests that assume a network connector

Signed-off-by: olivier lamy <olamy@webtide.com>
Signed-off-by: Greg Wilkins <gregw@webtide.com>
  • Loading branch information
gregw committed Jan 13, 2018
1 parent 0b3a276 commit f4e37b1
Show file tree
Hide file tree
Showing 21 changed files with 825 additions and 87 deletions.
1 change: 0 additions & 1 deletion jetty-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
<version>${project.version}</version>
<optional>true</optional>
</dependency>

<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,8 @@ public void update(Selector selector)
list.add(selector + " keys=" + selector_keys.size());
for (SelectionKey key : selector_keys)
{
if (key==null)
continue;
try
{
list.add(String.format("SelectionKey@%x{i=%d}->%s", key.hashCode(), key.interestOps(), key.attachment()));
Expand Down Expand Up @@ -757,7 +759,7 @@ public void update(Selector selector)
boolean zero = true;
for (SelectionKey key : selector.keys())
{
if (key.isValid())
if (key!=null && key.isValid())
{
Closeable closeable = null;
Object attachment = key.attachment();
Expand Down Expand Up @@ -803,7 +805,7 @@ public void update(Selector selector)
{
for (SelectionKey key : selector.keys())
{
if (key.isValid())
if (key!=null && key.isValid())
{
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
Expand Down
7 changes: 6 additions & 1 deletion jetty-unixsocket/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,12 @@
<dependency>
<groupId>com.github.jnr</groupId>
<artifactId>jnr-unixsocket</artifactId>
<version>0.18</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -55,7 +57,7 @@
/**
*
*/
@ManagedObject("HTTP connector using NIO ByteChannels and Selectors")
@ManagedObject("Connector using UNIX Socket")
public class UnixSocketConnector extends AbstractConnector
{
private static final Logger LOG = Log.getLogger(UnixSocketConnector.class);
Expand Down Expand Up @@ -246,10 +248,18 @@ public void open() throws IOException
UnixServerSocketChannel serverChannel = UnixServerSocketChannel.open();

serverChannel.configureBlocking(getAcceptors()>0);
serverChannel.socket().bind(bindAddress, getAcceptQueueSize());
try
{
serverChannel.socket().bind(bindAddress, getAcceptQueueSize());
}
catch (IOException e)
{
LOG.warn("cannot bind {} exists={} writable={}", file, file.exists(), file.canWrite());
throw e;
}
addBean(serverChannel);

LOG.debug("opened {}",serverChannel);
if (LOG.isDebugEnabled())
LOG.debug("opened {}",serverChannel);
_acceptChannel = serverChannel;
}
}
Expand Down Expand Up @@ -283,7 +293,14 @@ public void close()
}
}

new File(_unixSocket).delete();
try
{
Files.deleteIfExists(Paths.get(_unixSocket));
}
catch ( IOException e )
{
LOG.warn(e);
}
}
}

Expand Down Expand Up @@ -430,9 +447,11 @@ protected boolean isConnectionPending(SelectableChannel channel)
@Override
protected SelectableChannel doAccept(SelectableChannel server) throws IOException
{
LOG.debug("doAccept async {}",server);
if (LOG.isDebugEnabled())
LOG.debug("doAccept async {}",server);
UnixSocketChannel channel = ((UnixServerSocketChannel)server).accept();
LOG.debug("accepted async {}",channel);
if (LOG.isDebugEnabled())
LOG.debug("accepted async {}",channel);
return channel;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;

import org.eclipse.jetty.io.ChannelEndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
Expand All @@ -32,8 +35,9 @@

public class UnixSocketEndPoint extends ChannelEndPoint
{
public final static InetSocketAddress NOIP=new InetSocketAddress(0);
private static final Logger LOG = Log.getLogger(UnixSocketEndPoint.class);
private static final Logger CEPLOG = Log.getLogger(ChannelEndPoint.class);


private final UnixSocketChannel _channel;

Expand Down Expand Up @@ -71,4 +75,50 @@ protected void doShutdownOutput()
LOG.debug(e);
}
}


@Override
public boolean flush(ByteBuffer... buffers) throws IOException
{
// TODO this is a work around for https://github.com/jnr/jnr-unixsocket/issues/50
long flushed=0;
try
{
for (ByteBuffer b : buffers)
{
if (b.hasRemaining())
{
int r=b.remaining();
int p=b.position();
int l=_channel.write(b);
if (l>=0)
{
b.position(p+l);
flushed+=l;
}

if (CEPLOG.isDebugEnabled())
CEPLOG.debug("flushed {}/{} r={} {}", l,r,b.remaining(), this);

if (b.hasRemaining())
break;
}
}

}
catch (IOException e)
{
throw new EofException(e);
}

if (flushed>0)
notIdle();

for (ByteBuffer b : buffers)
if (!BufferUtil.isEmpty(b))
return false;

return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
//
// ========================================================================
// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.unixsocket.client;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.unixsocket.UnixSocketEndPoint;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

import jnr.enxio.channels.NativeSelectorProvider;
import jnr.unixsocket.UnixSocketAddress;
import jnr.unixsocket.UnixSocketChannel;

public class HttpClientTransportOverUnixSockets
extends HttpClientTransportOverHTTP
{
private static final Logger LOG = Log.getLogger( HttpClientTransportOverUnixSockets.class );

private String _unixSocket;
private SelectorManager selectorManager;

private UnixSocketChannel channel;

public HttpClientTransportOverUnixSockets( String unixSocket )
{
if ( unixSocket == null )
{
throw new IllegalArgumentException( "Unix socket file cannot be null" );
}
this._unixSocket = unixSocket;
}

@Override
protected SelectorManager newSelectorManager(HttpClient client)
{
return selectorManager = new UnixSocketSelectorManager(client,getSelectors());
}

@Override
public void connect( InetSocketAddress address, Map<String, Object> context )
{

try
{
InetAddress inet = address.getAddress();
if (!inet.isLoopbackAddress() && !inet.isLinkLocalAddress() && !inet.isSiteLocalAddress())
throw new IOException("UnixSocket cannot connect to "+address.getHostString());

// Open a unix socket
UnixSocketAddress unixAddress = new UnixSocketAddress( this._unixSocket );
channel = UnixSocketChannel.open( unixAddress );

HttpDestination destination = (HttpDestination)context.get(HTTP_DESTINATION_CONTEXT_KEY);
HttpClient client = destination.getHttpClient();

configure(client, channel);

channel.configureBlocking(false);
selectorManager.accept(channel, context);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
// If IPv6 is not deployed, a generic SocketException "Network is unreachable"
// exception is being thrown, so we attempt to provide a better error message.
if (x.getClass() == SocketException.class)
x = new SocketException("Could not connect to " + address).initCause(x);

try
{
if (channel != null)
channel.close();
}
catch (IOException xx)
{
LOG.ignore(xx);
}
finally
{
connectFailed(context, x);
}
}
}

public class UnixSocketSelectorManager extends ClientSelectorManager
{
protected UnixSocketSelectorManager(HttpClient client, int selectors)
{
super(client,selectors);
}

@Override
protected Selector newSelector() throws IOException
{
return NativeSelectorProvider.getInstance().openSelector();
}

@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey key)
{
UnixSocketEndPoint endp = new UnixSocketEndPoint((UnixSocketChannel)channel, selector, key, getScheduler());
endp.setIdleTimeout(getHttpClient().getIdleTimeout());
return endp;
}
}

@Override
protected void doStop()
throws Exception
{
super.doStop();
try
{
if (channel != null)
channel.close();
}
catch (IOException xx)
{
LOG.ignore(xx);
}
}
}
Loading

0 comments on commit f4e37b1

Please sign in to comment.