Improvements to Pig support #48

Open · wants to merge 7 commits · +451 −59
7 contrib/hadoop/src/java/voldemort/hadoop/VoldemortInputFormat.java
@@ -28,6 +28,7 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
import voldemort.VoldemortException;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
@@ -39,6 +40,8 @@
public class VoldemortInputFormat extends InputFormat<ByteArray, Versioned<byte[]>> {
+ private final Logger logger = Logger.getLogger(VoldemortInputFormat.class);
+
/**
* Create a new connection to admin client and give it to RecordReader.
* Called on the TaskTracker
@@ -77,15 +80,15 @@
throw new VoldemortException("Store '" + storeName + "' not found");
}
- // Generate splits
+ // Generate one split per node.
+ // Should consider a config setting allowing one split per partition.
Iterator<Node> nodeIter = cluster.getNodes().iterator();
List<InputSplit> splits = new ArrayList<InputSplit>();
while(nodeIter.hasNext()) {
Node currentNode = nodeIter.next();
VoldemortInputSplit split = new VoldemortInputSplit(storeName, currentNode);
splits.add(split);
}
-
adminClient.stop();
return splits;
}
9 contrib/hadoop/src/java/voldemort/hadoop/VoldemortInputSplit.java
@@ -40,12 +40,15 @@ public VoldemortInputSplit(String storeName, Node node) {
}
/**
- * Is used to order the splits so that the largest get processed first, in
- * an attempt to minimize the job runtime...Voldemort doesn't care!
+ * Pig orders the splits so that the largest are processed first. That has no
+ * consequence for Voldemort, but newer versions of Pig will also try to combine
+ * splits whose size falls below the <i>pig.maxCombinedSplitSize</i> setting.
+ * Combining does not map well to Voldemort's per-node fetches, so we return
+ * Long.MAX_VALUE to opt out of split combination altogether.
*/
@Override
public long getLength() throws IOException, InterruptedException {
- return 0;
+ return Long.MAX_VALUE;
}
public String getStoreName() {
4 contrib/hadoop/src/java/voldemort/hadoop/VoldemortRecordReader.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.log4j.Logger;
import voldemort.client.protocol.admin.AdminClient;
import voldemort.client.protocol.admin.AdminClientConfig;
import voldemort.utils.ByteArray;
@@ -33,6 +34,8 @@
public class VoldemortRecordReader extends RecordReader<ByteArray, Versioned<byte[]>> {
+ private final Logger logger = Logger.getLogger(VoldemortRecordReader.class);
+
private AdminClient adminClient;
private Iterator<Pair<ByteArray, Versioned<byte[]>>> iter = null;
private Pair<ByteArray, Versioned<byte[]>> currentPair = null;
@@ -67,6 +70,7 @@ public void initialize(InputSplit split, TaskAttemptContext context) throws IOEx
partitionIds.addAll(adminClient.getAdminClientCluster()
.getNodeById(voldemortSplit.getNodeId())
.getPartitionIds());
+ logger.info("Initializing split for node " + voldemortSplit.getNodeId() + ", partitions " + partitionIds);
this.iter = adminClient.fetchEntries(voldemortSplit.getNodeId(),
voldemortSplit.getStoreName(),
partitionIds,
66 .../voldemort/hadoop/pig/VoldemortStore.java → ...rt/hadoop/pig/AbstractVoldemortStore.java
@@ -1,42 +1,25 @@
-/*
- * Copyright 2010 LinkedIn, Inc
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
package voldemort.hadoop.pig;
-import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-
import voldemort.hadoop.VoldemortHadoopConfig;
import voldemort.hadoop.VoldemortInputFormat;
import voldemort.utils.ByteArray;
import voldemort.versioning.Versioned;
-@SuppressWarnings("unchecked")
-public class VoldemortStore extends LoadFunc {
+import java.io.IOException;
- private RecordReader reader;
+/**
+ * Superclass for Voldemort Pig Stores. The Tuple format is specified by subclasses.
+ */
+public abstract class AbstractVoldemortStore extends LoadFunc {
+ protected RecordReader reader;
@Override
public InputFormat getInputFormat() throws IOException {
@@ -45,6 +28,21 @@ public InputFormat getInputFormat() throws IOException {
}
@Override
+ public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
+ this.reader = reader;
+ }
+
+ @Override
+ public void setLocation(String location, Job job) throws IOException {
+ if(!location.startsWith("tcp://"))
+ throw new IOException("The correct format is tcp://<url:port>/storeName");
+ String[] subParts = location.split("/+");
+ Configuration conf = job.getConfiguration();
+ VoldemortHadoopConfig.setVoldemortURL(conf, subParts[0] + "//" + subParts[1]);
+ VoldemortHadoopConfig.setVoldemortStoreName(conf, subParts[2]);
+ }
+
+ @Override
public Tuple getNext() throws IOException {
ByteArray key = null;
Versioned<byte[]> value = null;
@@ -64,24 +62,8 @@ public Tuple getNext() throws IOException {
return null;
}
- Tuple tuple = TupleFactory.getInstance().newTuple(2);
- tuple.set(0, new DataByteArray(key.get()));
- tuple.set(1, new String(value.getValue()));
- return tuple;
- }
-
- @Override
- public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
- this.reader = reader;
+ return extractTuple(key, value);
}
- @Override
- public void setLocation(String location, Job job) throws IOException {
- if(!location.startsWith("tcp://"))
- throw new IOException("The correct format is tcp://<url:port>/storeName");
- String[] subParts = location.split("/+");
- Configuration conf = job.getConfiguration();
- VoldemortHadoopConfig.setVoldemortURL(conf, subParts[0] + "//" + subParts[1]);
- VoldemortHadoopConfig.setVoldemortStoreName(conf, subParts[2]);
- }
+ protected abstract Tuple extractTuple(ByteArray key, Versioned<byte[]> value) throws ExecException;
}
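Since the tuple layout is now delegated to subclasses, here is a minimal sketch of what an additional subclass could look like, using only the extractTuple contract introduced above. The VersionedVoldemortStore name and its three-field layout are hypothetical illustrations, not part of this patch:

package voldemort.hadoop.pig;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import voldemort.utils.ByteArray;
import voldemort.versioning.Versioned;

/**
 * Hypothetical subclass: exposes the key, the raw value bytes and the value's
 * version (vector clock) as a three-field tuple.
 */
public class VersionedVoldemortStore extends AbstractVoldemortStore {

    @Override
    protected Tuple extractTuple(ByteArray key, Versioned<byte[]> value) throws ExecException {
        Tuple tuple = TupleFactory.getInstance().newTuple(3);
        tuple.set(0, new DataByteArray(key.get()));
        tuple.set(1, new DataByteArray(value.getValue()));
        tuple.set(2, value.getVersion().toString());
        return tuple;
    }
}

Such a store would be used from Pig exactly like BinaryVoldemortStore or StringVoldemortStore below; only the loaded tuple gains a third field.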
57 contrib/hadoop/src/java/voldemort/hadoop/pig/BinaryVoldemortStore.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.hadoop.pig;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import voldemort.hadoop.VoldemortHadoopConfig;
+import voldemort.hadoop.VoldemortInputFormat;
+import voldemort.utils.ByteArray;
+import voldemort.versioning.Versioned;
+
+import java.io.IOException;
+
+/**
+ * Voldemort store which exposes values as DataByteArrays. Useful for loading
+ * binary format data (e.g. protobufs, thrift).
+ *
+ * To use with Twitter's Elephant-Bird:
+ *
+ * <pre>
+ * dataset = LOAD 'tcp://localhost:6666/storename' USING BinaryVoldemortStore();
+ * DEFINE XProtoFormat x.x.x.pig.piggybank.XProtobufBytesToTuple();
+ * result = FOREACH dataset GENERATE $0 as key, XProtoFormat($1).fieldName as fieldName;
+ * </pre>
+ */
+public class BinaryVoldemortStore extends AbstractVoldemortStore {
+ @Override
+ protected Tuple extractTuple(ByteArray key, Versioned<byte[]> value) throws ExecException {
+ Tuple tuple = TupleFactory.getInstance().newTuple(2);
+ tuple.set(0, new DataByteArray(key.get()));
+ tuple.set(1, new DataByteArray(value.getValue()));
+ return tuple;
+
+ }
+}
50 contrib/hadoop/src/java/voldemort/hadoop/pig/StringVoldemortStore.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.hadoop.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+
+import voldemort.hadoop.VoldemortHadoopConfig;
+import voldemort.hadoop.VoldemortInputFormat;
+import voldemort.utils.ByteArray;
+import voldemort.versioning.Versioned;
+
+/**
+ * Voldemort store which exposes values as Strings.
+ */
+public class StringVoldemortStore extends AbstractVoldemortStore {
+
+ @Override
+ protected Tuple extractTuple(ByteArray key, Versioned<byte[]> value) throws ExecException {
+ Tuple tuple = TupleFactory.getInstance().newTuple(2);
+ tuple.set(0, new DataByteArray(key.get()));
+ tuple.set(1, new String(value.getValue()));
+ return tuple;
+ }
+
+}
1 src/java/voldemort/server/ServiceType.java
@@ -10,6 +10,7 @@
SOCKET("socket-service"),
ADMIN("admin-service"),
JMX("jmx-service"),
+ JMX_REMOTE("jmx-remote-service"),
SCHEDULER("scheduler-service"),
STORAGE("storage-service"),
VOLDEMORT("voldemort-server"),
66 src/java/voldemort/server/VoldemortConfig.java
@@ -41,8 +41,8 @@
/**
* Configuration parameters for the voldemort server.
- *
- *
+ *
+ *
*/
public class VoldemortConfig implements Serializable {
@@ -114,6 +114,9 @@
private RequestFormatType requestFormatType;
+ private int jmxRmiRegistryPort;
+ private int jmxRmiServerPort;
+
private boolean enableSlop;
private boolean enableSlopPusherJob;
private boolean enableRepair;
@@ -122,6 +125,7 @@
private boolean enableSocketServer;
private boolean enableAdminServer;
private boolean enableJmx;
+ private boolean enableJmxRemote;
private boolean enablePipelineRoutedStore;
private boolean enableVerboseLogging;
private boolean enableStatTracking;
@@ -262,10 +266,15 @@ public VoldemortConfig(Props props) {
this.clientThreadIdleMs = props.getInt("client.thread.idle.ms", 5000);
this.clientMaxQueuedRequests = props.getInt("client.max.queued.requests", 1000);
+ this.enableJmxRemote = props.getBoolean("jmx.remote.enable", false);
+ this.jmxRmiServerPort = props.getInt("jmx.remote.rmi.server.port", 10001);
+ this.jmxRmiRegistryPort = props.getInt("jmx.remote.rmi.registry.port", 10002);
+
this.enableHttpServer = props.getBoolean("http.enable", true);
this.enableSocketServer = props.getBoolean("socket.enable", true);
this.enableAdminServer = props.getBoolean("admin.enable", true);
this.enableJmx = props.getBoolean("jmx.enable", true);
+
this.enablePipelineRoutedStore = props.getBoolean("enable.pipeline.routed.store", true);
this.enableSlop = props.getBoolean("slop.enable", true);
this.enableSlopPusherJob = props.getBoolean("slop.pusher.enable", true);
@@ -530,7 +539,7 @@ public void setBdbMaxLogFileSize(long bdbMaxLogFileSize) {
/**
* A log file will be cleaned if its utilization percentage is below this
* value, irrespective of total utilization.
- *
+ *
* <ul>
* <li>property: "bdb.cleaner.minFileUtilization"</li>
* <li>default: 5</li>
@@ -551,7 +560,7 @@ public final void setBdbCleanerMinFileUtilization(int minFileUtilization) {
/**
* If true, the checkpointer uses more resources in order to complete the
* checkpoint in a shorter time interval.
- *
+ *
* <ul>
* <li>property: "bdb.checkpointer.high.priority"</li>
* <li>default: false</li>
@@ -568,7 +577,7 @@ public final void setBdbCheckpointerHighPriority(boolean bdbCheckpointerHighPrio
/**
* The maximum number of log files in the cleaner's backlog, or zero if
* there is no limit
- *
+ *
* <ul>
* <li>property: "bdb.cleaner.max.batch.files"</li>
* <li>default: 0</li>
@@ -587,9 +596,9 @@ public final void setBdbCleanerMaxBatchFiles(int bdbCleanerMaxBatchFiles) {
}
/**
- *
+ *
* The number of cleaner threads
- *
+ *
* <ul>
* <li>property: "bdb.cleaner.threads"</li>
* <li>default: 1</li>
@@ -617,11 +626,11 @@ public final void setBdbCleanerLookAheadCacheSize(int bdbCleanerLookAheadCacheSi
}
/**
- *
+ *
* The lock timeout for all transactional and non-transactional operations.
* Value of zero disables lock timeouts i.e. a deadlock scenario will block
* forever
- *
+ *
* <ul>
* <li>property: "bdb.lock.timeout.ms"</li>
* <li>default: 500</li>
@@ -667,10 +676,10 @@ public void setBdbReadUncommitted(boolean bdbReadUncommitted) {
}
/**
- *
+ *
* The cleaner will keep the total disk space utilization percentage above
* this value.
- *
+ *
* <ul>
* <li>property: "bdb.cleaner.minUtilization"</li>
* <li>default: 50</li>
@@ -689,7 +698,7 @@ public final void setBdbCleanerMinUtilization(int minUtilization) {
}
/**
- *
+ *
* The btree node fanout. Given by "bdb.btree.fanout". default: 512
*/
public int getBdbBtreeFanout() {
@@ -820,6 +829,15 @@ public void setEnableJmx(boolean enableJmx) {
this.enableJmx = enableJmx;
}
+ public boolean isJmxRemoteEnabled() {
+ return enableJmxRemote;
+ }
+
+ public void setJmxRemoteEnabled(boolean enableJmxRemote) {
+ this.enableJmxRemote = enableJmxRemote;
+ }
+
+
public boolean isPipelineRoutedStoreEnabled() {
return enablePipelineRoutedStore;
}
@@ -1016,6 +1034,30 @@ public void setClientMaxQueuedRequests(int clientMaxQueuedRequests) {
this.clientMaxQueuedRequests = clientMaxQueuedRequests;
}
+ public int getJmxRmiRegistryPort() {
+ return jmxRmiRegistryPort;
+ }
+
+ public void setJmxRmiRegistryPort(int jmxRmiRegistryPort) {
+ this.jmxRmiRegistryPort = jmxRmiRegistryPort;
+ }
+
+ public int getJmxRmiServerPort() {
+ return jmxRmiServerPort;
+ }
+
+ public void setJmxRmiServerPort(int jmxRmiServerPort) {
+ this.jmxRmiServerPort = jmxRmiServerPort;
+ }
+
+ public boolean isEnableJmxRemote() {
+ return enableJmxRemote;
+ }
+
+ public void setEnableJmxRemote(boolean enableJmxRemote) {
+ this.enableJmxRemote = enableJmxRemote;
+ }
+
public boolean isSlopEnabled() {
return this.enableSlop;
}
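For reference, a server.properties excerpt that would enable the new connector with the defaults parsed above. The property names and default ports are taken directly from this patch; everything else about the file is assumed to follow the usual server configuration:

# Fixed-port JMX/RMI connector (names and defaults as read by VoldemortConfig above)
jmx.remote.enable=true
jmx.remote.rmi.server.port=10001
jmx.remote.rmi.registry.port=10002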
7 src/java/voldemort/server/VoldemortServer.java
@@ -32,6 +32,7 @@
import voldemort.cluster.Node;
import voldemort.server.gossip.GossipService;
import voldemort.server.http.HttpService;
+import voldemort.server.jmx.JmxRemoteService;
import voldemort.server.jmx.JmxService;
import voldemort.server.niosocket.NioSocketService;
import voldemort.server.protocol.RequestHandlerFactory;
@@ -198,6 +199,12 @@ public AsyncOperationService getAsyncRunner() {
services.add(new GossipService(this.metadata, scheduler, voldemortConfig));
}
+ if(voldemortConfig.isJmxRemoteEnabled())
+ services.add(new JmxRemoteService(
+ voldemortConfig.getJmxRmiRegistryPort(),
+ voldemortConfig.getJmxRmiServerPort())
+ );
+
if(voldemortConfig.isJmxEnabled())
services.add(new JmxService(this, this.metadata.getCluster(), storeRepository, services));
243 src/java/voldemort/server/jmx/JmxRemoteService.java
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2008-2009 LinkedIn, Inc
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package voldemort.server.jmx;
+
+
+import org.apache.log4j.Logger;
+import voldemort.server.AbstractService;
+import voldemort.server.ServiceType;
+
+import javax.management.MBeanServer;
+import javax.management.remote.JMXConnectorServer;
+import javax.management.remote.JMXConnectorServerFactory;
+import javax.management.remote.JMXServiceURL;
+import javax.management.remote.rmi.RMIConnectorServer;
+import javax.rmi.ssl.SslRMIClientSocketFactory;
+import javax.rmi.ssl.SslRMIServerSocketFactory;
+import java.io.IOException;
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.net.MalformedURLException;
+import java.net.Socket;
+import java.rmi.RemoteException;
+import java.rmi.registry.LocateRegistry;
+import java.rmi.server.RMIClientSocketFactory;
+import java.rmi.server.RMIServerSocketFactory;
+import java.util.HashMap;
+
+/**
+ * This service fixes the ports used by the JMX/RMI server, which makes it much simpler to connect
+ * JConsole or similar tools to a remote Voldemort instance running behind a firewall.
+ * Only the ports are configured via this service; the remainder of the configuration is done through
+ * the standard system properties for configuring JMX.
+ * <p/>
+ * Adapted from org.apache.catalina.mbeans.JmxRemoteLifecycleListener in the Apache Tomcat project.
+ */
+public class JmxRemoteService extends AbstractService {
+
+ private final Logger log = Logger.getLogger(JmxRemoteService.class);
+
+ private int rmiRegistryPort;
+ private int rmiServerPort;
+
+ protected boolean rmiSSL = true;
+ protected String ciphers[] = null;
+ protected String protocols[] = null;
+ protected boolean clientAuth = true;
+ protected boolean authenticate = true;
+ protected String passwordFile = null;
+ protected String accessFile = null;
+ protected boolean useLocalPorts = false;
+
+ protected JMXConnectorServer jmxConnector = null;
+
+ public JmxRemoteService(int rmiRegistryPort, int rmiServerPort) {
+ super(ServiceType.JMX_REMOTE);
+ this.rmiRegistryPort = rmiRegistryPort;
+ this.rmiServerPort = rmiServerPort;
+ }
+
+
+ private void init() {
+ // Get all the other parameters required from the standard system
+ // properties. Only need to get the parameters that affect the creation
+ // of the server port.
+ String rmiSSLValue = System.getProperty(
+ "com.sun.management.jmxremote.ssl", "false");
+ rmiSSL = Boolean.parseBoolean(rmiSSLValue);
+
+ String protocolsValue = System.getProperty(
+ "com.sun.management.jmxremote.ssl.enabled.protocols");
+ if (protocolsValue != null) {
+ protocols = protocolsValue.split(",");
+ }
+
+ String ciphersValue = System.getProperty(
+ "com.sun.management.jmxremote.ssl.enabled.cipher.suites");
+ if (ciphersValue != null) {
+ ciphers = ciphersValue.split(",");
+ }
+
+ String clientAuthValue = System.getProperty(
+ "com.sun.management.jmxremote.ssl.need.client.auth", "false");
+ clientAuth = Boolean.parseBoolean(clientAuthValue);
+
+ String authenticateValue = System.getProperty(
+ "com.sun.management.jmxremote.authenticate", "false");
+ authenticate = Boolean.parseBoolean(authenticateValue);
+
+ passwordFile = System.getProperty(
+ "com.sun.management.jmxremote.password.file",
+ "jmxremote.password");
+
+ accessFile = System.getProperty(
+ "com.sun.management.jmxremote.access.file",
+ "jmxremote.access");
+ }
+
+
+ @Override
+ protected void startInner() {
+ // Configure using standard jmx system properties
+ init();
+
+ // Prevent an attacker guessing the RMI object ID
+ System.setProperty("java.rmi.server.randomIDs", "true");
+
+ // Create the environment
+ HashMap<String, Object> env = new HashMap<String, Object>();
+
+ RMIClientSocketFactory csf = null;
+ RMIServerSocketFactory ssf = null;
+
+ // Configure SSL for RMI connection if required
+ if (rmiSSL) {
+ csf = new SslRMIClientSocketFactory();
+ ssf = new SslRMIServerSocketFactory(ciphers, protocols,
+ clientAuth);
+ }
+
+ // Force the use of local ports if required
+ if (useLocalPorts) {
+ csf = new RmiClientLocalhostSocketFactory(csf);
+ }
+
+ // Populate the env properties used to create the server
+ if (csf != null) {
+ env.put(RMIConnectorServer.RMI_CLIENT_SOCKET_FACTORY_ATTRIBUTE,
+ csf);
+ }
+ if (ssf != null) {
+ env.put(RMIConnectorServer.RMI_SERVER_SOCKET_FACTORY_ATTRIBUTE,
+ ssf);
+ }
+
+
+ // Configure authentication
+ if (authenticate) {
+ env.put("jmx.remote.x.password.file", passwordFile);
+ env.put("jmx.remote.x.access.file", accessFile);
+ }
+
+ // Create the Platform server
+ jmxConnector = createServer(rmiRegistryPort,
+ rmiServerPort, env,
+ ManagementFactory.getPlatformMBeanServer());
+
+ }
+
+ @Override
+ protected void stopInner() {
+ // When the service stops, shut down the JMX connector server
+ destroyServer(jmxConnector);
+ }
+
+
+ private JMXConnectorServer createServer(int theRmiRegistryPort, int theRmiServerPort,
+ HashMap<String, Object> theEnv, MBeanServer theMBeanServer) {
+
+ // Create the RMI registry
+ try {
+ LocateRegistry.createRegistry(theRmiRegistryPort);
+ } catch (RemoteException e) {
+ log.error(
+ "Unable to create RMI registry at " + theRmiRegistryPort, e);
+ return null;
+ }
+
+ // Build the connection string with fixed ports
+ StringBuilder url = new StringBuilder();
+ url.append("service:jmx:rmi://localhost:");
+ url.append(theRmiServerPort);
+ url.append("/jndi/rmi://localhost:");
+ url.append(theRmiRegistryPort);
+ url.append("/jmxrmi");
+ JMXServiceURL serviceUrl;
+ try {
+ serviceUrl = new JMXServiceURL(url.toString());
+ } catch (MalformedURLException e) {
+ log.error("Invalid service URL: " + url.toString(), e);
+ return null;
+ }
+
+ // Start the JMX server with the connection string
+ JMXConnectorServer cs = null;
+ try {
+ cs = JMXConnectorServerFactory.newJMXConnectorServer(
+ serviceUrl, theEnv, theMBeanServer);
+ cs.start();
+ log.info("Started JMX server at RMI registry port " + theRmiRegistryPort + " and RMI server port " + theRmiServerPort);
+ } catch (IOException e) {
+ log.error("Unable to start JMX server", e);
+ }
+ return cs;
+ }
+
+ private void destroyServer(JMXConnectorServer theConnectorServer) {
+ if (theConnectorServer != null) {
+ try {
+ theConnectorServer.stop();
+ } catch (IOException e) {
+ log.error("Unable to stop JMX server", e);
+ }
+ }
+ }
+
+ public static class RmiClientLocalhostSocketFactory
+ implements RMIClientSocketFactory, Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private static final String FORCED_HOST = "localhost";
+
+ private RMIClientSocketFactory factory = null;
+
+ public RmiClientLocalhostSocketFactory(RMIClientSocketFactory theFactory) {
+ factory = theFactory;
+ }
+
+ @Override
+ public Socket createSocket(String host, int port) throws IOException {
+ if (factory == null) {
+ return new Socket(FORCED_HOST, port);
+ } else {
+ return factory.createSocket(FORCED_HOST, port);
+ }
+ }
+
+
+ }
+
+}
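To check the fixed ports from outside a firewall, a client can connect with the same URL shape that createServer() builds. A minimal sketch, assuming a hypothetical host name voldemort-host and the default ports from this patch:

import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class JmxRemoteSmokeTest {
    public static void main(String[] args) throws Exception {
        // Same URL layout as createServer(): RMI server port first, then registry port
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi://voldemort-host:10001/jndi/rmi://voldemort-host:10002/jmxrmi");
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection mbeans = connector.getMBeanServerConnection();
            System.out.println("Registered MBeans: " + mbeans.getMBeanCount());
        } finally {
            connector.close();
        }
    }
}

JConsole accepts the same service:jmx:rmi URL, which is the point of pinning both ports to known values.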