Skip to content

Commit

Permalink
Extract plugin for JMX connector
Browse files Browse the repository at this point in the history
  • Loading branch information
electrum committed Jul 20, 2015
1 parent bf49106 commit 484b7cb
Show file tree
Hide file tree
Showing 17 changed files with 181 additions and 21 deletions.
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -61,6 +61,7 @@

<modules>
<module>presto-spi</module>
<module>presto-jmx</module>
<module>presto-kafka</module>
<module>presto-cassandra</module>
<module>presto-blackhole</module>
Expand Down
68 changes: 68 additions & 0 deletions presto-jmx/pom.xml
@@ -0,0 +1,68 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-root</artifactId>
<version>0.112-SNAPSHOT</version>
</parent>

<artifactId>presto-jmx</artifactId>
<description>Presto - JMX Connector</description>
<packaging>presto-plugin</packaging>

<properties>
<air.main.basedir>${project.parent.basedir}</air.main.basedir>
</properties>

<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>

<!-- Presto SPI -->
<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-spi</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>slice</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<scope>provided</scope>
</dependency>

<!-- for testing -->
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>testing</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Expand Up @@ -13,8 +13,8 @@
*/
package com.facebook.presto.connector.jmx;

import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ColumnMetadata;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ConnectorTableHandle;
import com.facebook.presto.spi.ConnectorTableMetadata;
Expand All @@ -40,11 +40,11 @@
import java.util.Map;
import java.util.NoSuchElementException;

import static com.facebook.presto.connector.jmx.Types.checkType;
import static com.facebook.presto.spi.type.BigintType.BIGINT;
import static com.facebook.presto.spi.type.BooleanType.BOOLEAN;
import static com.facebook.presto.spi.type.DoubleType.DOUBLE;
import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static com.facebook.presto.util.Types.checkType;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Locale.ENGLISH;
import static javax.management.ObjectName.WILDCARD;
Expand Down
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.connector.jmx;

import com.facebook.presto.spi.ConnectorFactory;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableList;

import javax.inject.Inject;
import javax.management.MBeanServer;

import java.util.List;

import static java.lang.management.ManagementFactory.getPlatformMBeanServer;
import static java.util.Objects.requireNonNull;

public class JmxPlugin
implements Plugin
{
private final MBeanServer mBeanServer;
private NodeManager nodeManager;

public JmxPlugin()
{
this(getPlatformMBeanServer());
}

public JmxPlugin(MBeanServer mBeanServer)
{
this.mBeanServer = requireNonNull(mBeanServer, "mBeanServer is null");
}

@Inject
public synchronized void setNodeManager(NodeManager nodeManager)
{
this.nodeManager = nodeManager;
}

@Override
public synchronized <T> List<T> getServices(Class<T> type)
{
if (type == ConnectorFactory.class) {
return ImmutableList.of(type.cast(new JmxConnectorFactory(mBeanServer, nodeManager)));
}
return ImmutableList.of();
}
}
Expand Up @@ -37,7 +37,7 @@
import java.util.Map.Entry;
import java.util.Set;

import static com.facebook.presto.util.Types.checkType;
import static com.facebook.presto.connector.jmx.Types.checkType;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.stream.Collectors.toMap;
Expand Down
Expand Up @@ -30,13 +30,13 @@

import java.util.List;

import static com.facebook.presto.connector.jmx.Types.checkType;
import static com.facebook.presto.spi.TupleDomain.withFixedValues;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static com.facebook.presto.util.Types.checkType;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.airlift.slice.Slices.utf8Slice;
import static java.util.stream.Collectors.toList;

public class JmxSplitManager
implements ConnectorSplitManager
Expand Down Expand Up @@ -76,15 +76,15 @@ public ConnectorSplitSource getPartitionSplits(ConnectorTableHandle table, List<
//TODO is there a better way to get the node column?
JmxColumnHandle nodeColumnHandle = tableHandle.getColumns().get(0);

ImmutableList<ConnectorSplit> splits = nodeManager.getActiveNodes()
List<ConnectorSplit> splits = nodeManager.getActiveNodes()
.stream()
.filter(node -> {
TupleDomain<ColumnHandle> exactNodeMatch = withFixedValues(ImmutableMap.of(nodeColumnHandle, utf8Slice(node.getNodeIdentifier())));
return predicate.overlaps(exactNodeMatch);
}
)
.map(node -> new JmxSplit(tableHandle, ImmutableList.of(node.getHostAndPort())))
.collect(toImmutableList());
.collect(toList());

return new FixedSplitSource(connectorId, splits);
}
Expand Down
@@ -0,0 +1,33 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.connector.jmx;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

final class Types
{
private Types() {}

public static <A, B extends A> B checkType(A value, Class<B> target, String name)
{
checkNotNull(value, "%s is null", name);
checkArgument(target.isInstance(value),
"%s must be of type %s, not %s",
name,
target.getName(),
value.getClass().getName());
return target.cast(value);
}
}
Expand Up @@ -11,12 +11,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.split;
package com.facebook.presto.connector.jmx;

import com.facebook.presto.connector.jmx.JmxColumnHandle;
import com.facebook.presto.connector.jmx.JmxConnectorId;
import com.facebook.presto.connector.jmx.JmxSplitManager;
import com.facebook.presto.connector.jmx.JmxTableHandle;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPartitionResult;
import com.facebook.presto.spi.ConnectorSplit;
Expand All @@ -35,11 +31,11 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;

import static com.facebook.presto.spi.type.VarcharType.VARCHAR;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableSet;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static java.lang.String.format;
import static java.util.stream.Collectors.toSet;
import static org.testng.Assert.assertEquals;

public class TestJmxSplitManager
Expand Down Expand Up @@ -80,7 +76,7 @@ public void testNoPredicate()
List<ConnectorSplit> allSplits = getAllSplits(splitSource);
assertEquals(allSplits.size(), nodes.size());

Set<String> actualNodes = nodes.stream().map(Node::getNodeIdentifier).collect(toImmutableSet());
Set<String> actualNodes = nodes.stream().map(Node::getNodeIdentifier).collect(toSet());
Set<String> expectedNodes = new HashSet<>();
for (ConnectorSplit split : allSplits) {
List<HostAddress> addresses = split.getAddresses();
Expand All @@ -91,11 +87,11 @@ public void testNoPredicate()
}

private static List<ConnectorSplit> getAllSplits(ConnectorSplitSource splitSource)
throws InterruptedException
throws InterruptedException, ExecutionException
{
ImmutableList.Builder<ConnectorSplit> splits = ImmutableList.builder();
while (!splitSource.isFinished()) {
List<ConnectorSplit> batch = getFutureValue(splitSource.getNextBatch(1000));
List<ConnectorSplit> batch = splitSource.getNextBatch(1000).get();
splits.addAll(batch);
}
return splits.build();
Expand Down
1 change: 1 addition & 0 deletions presto-main/etc/config.properties
Expand Up @@ -20,6 +20,7 @@ query.client.timeout=5m
query.max-age=30m

plugin.bundles=\
../presto-jmx/pom.xml,\
../presto-raptor/pom.xml,\
../presto-hive-cdh4/pom.xml,\
../presto-example-http/pom.xml,\
Expand Down
Expand Up @@ -18,7 +18,6 @@
import com.facebook.presto.client.QueryResults;
import com.facebook.presto.connector.ConnectorManager;
import com.facebook.presto.connector.informationSchema.InformationSchemaModule;
import com.facebook.presto.connector.jmx.JmxConnectorFactory;
import com.facebook.presto.connector.system.SystemTablesModule;
import com.facebook.presto.event.query.QueryCompletionEvent;
import com.facebook.presto.event.query.QueryCreatedEvent;
Expand Down Expand Up @@ -242,9 +241,6 @@ protected void setup(Binder binder)
binder.bind(ConnectorManager.class).in(Scopes.SINGLETON);
MapBinder<String, ConnectorFactory> connectorFactoryBinder = newMapBinder(binder, String.class, ConnectorFactory.class);

// jmx connector
connectorFactoryBinder.addBinding("jmx").to(JmxConnectorFactory.class);

// information schema
binder.install(new InformationSchemaModule());

Expand Down
6 changes: 6 additions & 0 deletions presto-server/src/main/provisio/presto.xml
Expand Up @@ -26,6 +26,12 @@
</artifactSet>

<!-- Plugins -->
<artifactSet to="plugin/jmx">
<artifact id="${project.groupId}:presto-jmx:zip:${project.version}">
<unpack />
</artifact>
</artifactSet>

<artifactSet to="plugin/cassandra">
<artifact id="${project.groupId}:presto-cassandra:zip:${project.version}">
<unpack />
Expand Down

0 comments on commit 484b7cb

Please sign in to comment.