Skip to content

Commit

Permalink
NIFI-1897:
Browse files Browse the repository at this point in the history
- Fixing default values in properties.
- Starting to fix Spring context to load correctly in standalone mode.
  • Loading branch information
mcgilman committed Jun 3, 2016
1 parent c8a31f0 commit 05cbb92
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 44 deletions.
2 changes: 2 additions & 0 deletions nifi-assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,8 @@ language governing permissions and limitations under the License. -->
<nifi.cluster.node.event.history.size>25</nifi.cluster.node.event.history.size>
<nifi.cluster.node.connection.timeout>5 sec</nifi.cluster.node.connection.timeout>
<nifi.cluster.node.read.timeout>5 sec</nifi.cluster.node.read.timeout>
<nifi.cluster.firewall.file />

<nifi.cluster.request.replication.claim.timeout>15 secs</nifi.cluster.request.replication.claim.timeout>

<!-- nifi.properties: zookeeper properties -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<constructor-arg ref="protocolSocketConfiguration"/>
<constructor-arg ref="protocolContext"/>
<property name="handshakeTimeout">
<bean factory-bean="nifiProperties" factory-method="getClusterProtocolConnectionHandshakeTimeout"/>
<bean factory-bean="nifiProperties" factory-method="getClusterNodeConnectionTimeout"/>
</property>
</bean>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.cluster.spring;

import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor;
import org.apache.nifi.cluster.coordination.heartbeat.HeartbeatMonitor;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
import org.apache.nifi.util.NiFiProperties;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class HeartbeatMonitorFactoryBean implements FactoryBean<HeartbeatMonitor>, ApplicationContextAware {
private ApplicationContext applicationContext;
private NiFiProperties properties;

private HeartbeatMonitor heartbeatMonitor = null;

@Override
public HeartbeatMonitor getObject() throws Exception {
if (heartbeatMonitor == null && properties.isNode()) {
final ClusterCoordinationProtocolSenderListener protocolSenderListener =
applicationContext.getBean("clusterCoordinationProtocolSenderListener", ClusterCoordinationProtocolSenderListener.class);
final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class);

heartbeatMonitor = new ClusterProtocolHeartbeatMonitor(clusterCoordinator, protocolSenderListener, properties);
}

return heartbeatMonitor;
}

@Override
public Class<?> getObjectType() {
return HeartbeatMonitor.class;
}

@Override
public boolean isSingleton() {
return true;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

public void setProperties(NiFiProperties properties) {
this.properties = properties;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.cluster.spring;

import org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator;
import org.apache.nifi.cluster.firewall.ClusterNodeFirewall;
import org.apache.nifi.cluster.protocol.impl.ClusterCoordinationProtocolSenderListener;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.revision.RevisionManager;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

public class NodeClusterCoordinatorFactoryBean implements FactoryBean<NodeClusterCoordinator>, ApplicationContextAware {
private ApplicationContext applicationContext;
private NiFiProperties properties;

private NodeClusterCoordinator nodeClusterCoordinator = null;

@Override
public NodeClusterCoordinator getObject() throws Exception {
if (nodeClusterCoordinator == null && properties.isNode()) {
final ClusterCoordinationProtocolSenderListener protocolSenderListener =
applicationContext.getBean("clusterCoordinationProtocolSenderListener", ClusterCoordinationProtocolSenderListener.class);
final EventReporter eventReporter = applicationContext.getBean("eventReporter", EventReporter.class);
final ClusterNodeFirewall clusterFirewall = applicationContext.getBean("clusterFirewall", ClusterNodeFirewall.class);
final RevisionManager revisionManager = applicationContext.getBean("revisionManager", RevisionManager.class);

nodeClusterCoordinator = new NodeClusterCoordinator(protocolSenderListener, eventReporter, clusterFirewall, revisionManager);
}

return nodeClusterCoordinator;
}

@Override
public Class<?> getObjectType() {
return NodeClusterCoordinator.class;
}

@Override
public boolean isSingleton() {
return true;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

public void setProperties(NiFiProperties properties) {
this.properties = properties;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,34 +17,41 @@

package org.apache.nifi.cluster.spring;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.config.DefaultClientConfig;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.http.replication.RequestCompletionCallback;
import org.apache.nifi.cluster.coordination.http.replication.ThreadPoolRequestReplicator;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.framework.security.util.SslContextFactory;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.util.WebUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import com.sun.jersey.api.client.Client;
import com.sun.jersey.api.client.config.DefaultClientConfig;

public class ThreadPoolRequestReplicatorFactoryBean implements FactoryBean<ThreadPoolRequestReplicator> {
public class ThreadPoolRequestReplicatorFactoryBean implements FactoryBean<ThreadPoolRequestReplicator>, ApplicationContextAware {
private ApplicationContext applicationContext;
private NiFiProperties properties;

private EventReporter eventReporter;
private ClusterCoordinator clusterCoordinator;
private RequestCompletionCallback requestCompletionCallback;
private ThreadPoolRequestReplicator replicator = null;

@Override
public ThreadPoolRequestReplicator getObject() throws Exception {
final int numThreads = properties.getClusterNodeProtocolThreads();
final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(properties));
final String connectionTimeout = properties.getClusterNodeConnectionTimeout();
final String readTimeout = properties.getClusterNodeReadTimeout();
if (replicator == null && properties.isNode()) {
final EventReporter eventReporter = applicationContext.getBean("eventReporter", EventReporter.class);
final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class);
final RequestCompletionCallback requestCompletionCallback = applicationContext.getBean("clusterCoordinator", RequestCompletionCallback.class);

final int numThreads = properties.getClusterNodeProtocolThreads();
final Client jerseyClient = WebUtils.createClient(new DefaultClientConfig(), SslContextFactory.createSslContext(properties));
final String connectionTimeout = properties.getClusterNodeConnectionTimeout();
final String readTimeout = properties.getClusterNodeReadTimeout();

final ThreadPoolRequestReplicator replicator = new ThreadPoolRequestReplicator(numThreads, jerseyClient, clusterCoordinator,
connectionTimeout, readTimeout, requestCompletionCallback, eventReporter);
replicator = new ThreadPoolRequestReplicator(numThreads, jerseyClient, clusterCoordinator,
connectionTimeout, readTimeout, requestCompletionCallback, eventReporter);
}

return replicator;
}
Expand All @@ -59,19 +66,13 @@ public boolean isSingleton() {
return true;
}

public void setProperties(NiFiProperties properties) {
this.properties = properties;
}

public void setEventReporter(final EventReporter eventReporter) {
this.eventReporter = eventReporter;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

public void setClusterCoordinator(final ClusterCoordinator clusterCoordinator) {
this.clusterCoordinator = clusterCoordinator;
public void setProperties(NiFiProperties properties) {
this.properties = properties;
}

public void setRequestCompletionCallback(final RequestCompletionCallback callback) {
this.requestCompletionCallback = callback;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,15 @@
<!-- Request Replicator -->
<bean id="requestReplicator" class="org.apache.nifi.cluster.spring.ThreadPoolRequestReplicatorFactoryBean">
<property name="properties" ref="nifiProperties"/>
<property name="eventReporter" ref="eventReporter" />
<property name="clusterCoordinator" ref="clusterCoordinator" />
<property name="requestCompletionCallback" ref="clusterCoordinator" />
</bean>

<!-- Cluster Coordinator -->
<bean id="clusterCoordinator" class="org.apache.nifi.cluster.coordination.node.NodeClusterCoordinator">
<constructor-arg ref="clusterCoordinationProtocolSenderListener" />
<constructor-arg ref="eventReporter" />
<constructor-arg ref="clusterFirewall" />
<constructor-arg ref="revisionManager" />
<bean id="clusterCoordinator" class="org.apache.nifi.cluster.spring.NodeClusterCoordinatorFactoryBean">
<property name="properties" ref="nifiProperties"/>
</bean>

<!-- Heartbeat Monitor -->
<bean id="heartbeatMonitor" class="org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor">
<constructor-arg ref="clusterCoordinator" />
<constructor-arg ref="clusterCoordinationProtocolSenderListener" />
<constructor-arg ref="nifiProperties" />
<bean id="heartbeatMonitor" class="org.apache.nifi.cluster.spring.HeartbeatMonitorFactoryBean">
<property name="properties" ref="nifiProperties"/>
</bean>
</beans>
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ public class FlowControllerFactoryBean implements FactoryBean, ApplicationContex
private AuditService auditService;
private StringEncryptor encryptor;
private BulletinRepository bulletinRepository;
private HeartbeatMonitor heartbeatMonitor;

@Override
public Object getObject() throws Exception {
Expand All @@ -52,6 +51,7 @@ public Object getObject() throws Exception {

if (properties.isNode()) {
final NodeProtocolSender nodeProtocolSender = applicationContext.getBean("nodeProtocolSender", NodeProtocolSender.class);
final HeartbeatMonitor heartbeatMonitor = applicationContext.getBean("heartbeatMonitor", HeartbeatMonitor.class);
flowController = FlowController.createClusteredInstance(
flowFileEventRepository,
properties,
Expand Down Expand Up @@ -111,7 +111,4 @@ public void setBulletinRepository(final BulletinRepository bulletinRepository) {
this.bulletinRepository = bulletinRepository;
}

public void setHeartbeatMonitor(final HeartbeatMonitor heartbeatMonitor) {
this.heartbeatMonitor = heartbeatMonitor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ public class StandardFlowServiceFactoryBean implements FactoryBean, ApplicationC
public Object getObject() throws Exception {
if (flowService == null) {
final FlowController flowController = applicationContext.getBean("flowController", FlowController.class);
final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class);
final RevisionManager revisionManager = applicationContext.getBean("revisionManager", RevisionManager.class);

if (properties.isNode()) {
final NodeProtocolSenderListener nodeProtocolSenderListener = applicationContext.getBean("nodeProtocolSenderListener", NodeProtocolSenderListener.class);
final ClusterCoordinator clusterCoordinator = applicationContext.getBean("clusterCoordinator", ClusterCoordinator.class);
flowService = StandardFlowService.createClusteredInstance(
flowController,
properties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
<property name="auditService" ref="auditService" />
<property name="encryptor" ref="stringEncryptor" />
<property name="bulletinRepository" ref="bulletinRepository" />
<property name="heartbeatMonitor" ref="heartbeatMonitor" />
</bean>

<!-- flow service -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ nifi.cluster.is.node=${nifi.cluster.is.node}
nifi.cluster.node.address=${nifi.cluster.node.address}
nifi.cluster.node.protocol.port=${nifi.cluster.node.protocol.port}
nifi.cluster.node.protocol.threads=${nifi.cluster.node.protocol.threads}
nifi.cluster.node.event.history.size=${nifi.cluster.manager.node.event.history.size}
nifi.cluster.node.event.history.size=${nifi.cluster.node.event.history.size}
nifi.cluster.node.connection.timeout=${nifi.cluster.node.connection.timeout}
nifi.cluster.node.read.timeout=${nifi.cluster.node.read.timeout}
nifi.cluster.firewall.file=${nifi.cluster.firewall.file}
Expand Down

0 comments on commit 05cbb92

Please sign in to comment.