Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
implemented broker settings retrieving
  • Loading branch information
tjamakeev committed Apr 12, 2018
1 parent 8a9f05e commit bc515e5
Show file tree
Hide file tree
Showing 15 changed files with 153 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
<artifactId>identity-manager-api</artifactId>
</dependency>

<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>hub-manager-api</artifactId>
</dependency>

<dependency>
<groupId>com.nimbusds</groupId>
<artifactId>nimbus-jose-jwt</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.subutai.core.environment.metadata.impl;


public class BrokerSettingException extends Exception
{

public BrokerSettingException( final String message )
{
super( message );
}


public BrokerSettingException( final Exception e )
{
super( e );
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package io.subutai.core.environment.metadata.impl;


import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -13,10 +19,12 @@
import io.subutai.common.peer.HostNotFoundException;
import io.subutai.core.environment.api.EnvironmentManager;
import io.subutai.core.environment.metadata.api.EnvironmentMetadataManager;
import io.subutai.core.hubmanager.api.HubManager;
import io.subutai.core.identity.api.IdentityManager;
import io.subutai.core.identity.api.exception.TokenCreateException;
import io.subutai.core.peer.api.PeerManager;
import io.subutai.hub.share.Utils;
import io.subutai.hub.share.dto.BrokerSettingsDto;
import io.subutai.hub.share.dto.environment.EnvironmentInfoDto;
import io.subutai.hub.share.event.Event;
import io.subutai.hub.share.json.JsonUtil;
Expand All @@ -31,16 +39,17 @@ public class EnvironmentMetadataManagerImpl implements EnvironmentMetadataManage
private final IdentityManager identityManager;
private PeerManager peerManager;
private EnvironmentManager environmentManager;
private String brokerURL;
private BrokerSettings brokerSettings;
private HubManager hubManager;


public EnvironmentMetadataManagerImpl( PeerManager peerManager, EnvironmentManager environmentManager,
IdentityManager identityManager, final String brokerURL )
IdentityManager identityManager, HubManager hubManager )
{
this.peerManager = peerManager;
this.environmentManager = environmentManager;
this.identityManager = identityManager;
this.brokerURL = brokerURL;
this.hubManager = hubManager;
}


Expand Down Expand Up @@ -96,18 +105,57 @@ public void pushEvent( final Event event )
String jsonEvent = JsonUtil.toJson( event );
LOG.debug( "Event received: {} {}", event, jsonEvent );
LOG.debug( "OS: {}", event.getCustomMetaByKey( "OS" ) );
LOG.debug( "Nature: {}", event.getPayload().getNature() );
String destination = "events." + event.getOrigin().getId();

thread( new EventProducer( brokerURL, destination, jsonEvent ), true );
checkBrokerSettings( false );
// TODO: 4/12/18 need to implement connections pool something like below; while creating connection
// every time with random URI
// ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory(
// "vm://broker1?marshal=false&broker.persistent=false&broker.useJmx=false");
// JmsPoolConnectionFactory cf = new JmsPoolConnectionFactory();
// cf.setConnectionFactory(amq);
// cf.setMaxConnections(3);
thread( new EventProducer( this.brokerSettings.getBroker(), destination, jsonEvent ), true );
}
catch ( JsonProcessingException e )
catch ( JsonProcessingException | URISyntaxException | BrokerSettingException e )
{
LOG.error( e.getMessage(), e );
}
}


private void checkBrokerSettings( boolean retrieve ) throws BrokerSettingException
{
if ( !( brokerSettings == null || retrieve ) )
{
return;
}

try
{
final BrokerSettingsDto response = hubManager.getBrokers();
if ( response == null )
{
throw new BrokerSettingException( "Could not retrieve broker settings." );
}
List<URI> list = new ArrayList<>();
for ( String s : response.getBrokers() )
{
list.add( new URI( s ) );
}
if ( list.size() == 0 )
{
throw new BrokerSettingException( "Broker URI list is empty." );
}
this.brokerSettings = new BrokerSettings( list );
}
catch ( URISyntaxException e )
{
throw new BrokerSettingException( e );
}
}


private void thread( Runnable runnable, boolean daemon )
{
Thread brokerThread = new Thread( runnable );
Expand All @@ -121,5 +169,23 @@ private void placeTokenIntoContainer( ContainerHost containerHost, String token
containerHost.executeAsync( new RequestBuilder(
String.format( "mkdir -p /etc/subutai/ ; echo '%s' > /etc/subutai/jwttoken", token ) ) );
}


private class BrokerSettings
{
List<URI> uriList;


public BrokerSettings( final List<URI> uriList )
{
this.uriList = uriList;
}


public URI getBroker()
{
return uriList.get( new Random().nextInt( uriList.size() ) );
}
}
}

Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package io.subutai.core.environment.metadata.impl;


import java.net.URI;
import java.net.URISyntaxException;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
Expand All @@ -21,17 +24,17 @@ public class EventProducer implements Runnable
private final String destination;
private String message;

private String brokerURL;
private URI uri;
private ActiveMQConnectionFactory connectionFactory;


public EventProducer( final String brokerURL, final String destination, final String message )
public EventProducer( final URI uri, final String destination, final String message ) throws URISyntaxException
{
this.brokerURL = brokerURL;
this.uri = uri;
this.destination = destination;
this.message = message;
// Create a ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory( this.brokerURL );
connectionFactory = new ActiveMQConnectionFactory( uri );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
<reference id="identityManager"
interface="io.subutai.core.identity.api.IdentityManager" />

<reference id="hubManager"
interface="io.subutai.core.hubmanager.api.HubManager" />

<bean id="environmentMetadataManager" scope="singleton" init-method="init" destroy-method="dispose"
class="io.subutai.core.environment.metadata.impl.EnvironmentMetadataManagerImpl">
<argument ref="peerManager" />
<argument ref="environmentManager" />
<argument ref="identityManager" />
<argument value="tcp://brokerHost:61616" />
<argument ref="hubManager" />
</bean>

<service ref="environmentMetadataManager"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import io.subutai.core.hubmanager.api.exception.HubManagerException;
import io.subutai.core.hubmanager.api.model.Config;
import io.subutai.hub.share.dto.BrokerSettingsDto;


public interface HubManager
Expand Down Expand Up @@ -50,4 +51,6 @@ public interface HubManager
boolean hasHubTasksInAction();

void notifyHubThatPeerIsOffline();

BrokerSettingsDto getBrokers();
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import io.subutai.core.security.api.SecurityManager;
import io.subutai.core.systemmanager.api.SystemManager;
import io.subutai.hub.share.common.HubEventListener;
import io.subutai.hub.share.dto.BrokerSettingsDto;
import io.subutai.hub.share.dto.PeerDto;
import io.subutai.hub.share.dto.PeerProductDataDto;
import io.subutai.hub.share.dto.UserDto;
Expand Down Expand Up @@ -807,4 +808,16 @@ public void onRhDisconnected( final ResourceHostInfo resourceHostInfo )
log.error( "Error sending peer metrics", e );
}
}


@Override
public BrokerSettingsDto getBrokers()
{
final HubRestClient client = new HubRestClient( configManager );

final RestResult<BrokerSettingsDto> response =
client.get( "/rest/v1/brokers/" + localPeer.getId(), BrokerSettingsDto.class );

return response.getEntity();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.subutai.hub.share.dto;


import java.util.List;


public class BrokerSettingsDto
{
private List<String> brokers;


public BrokerSettingsDto( final List<String> brokers )
{
this.brokers = brokers;
}


public BrokerSettingsDto()
{
}


public List<String> getBrokers()
{
return brokers;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.subutai.hub.share.event.meta;


import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

Expand All @@ -13,8 +12,6 @@
@JsonSubTypes.Type( value = SourceMeta.class, name = "source" ),
@JsonSubTypes.Type( value = OriginMeta.class, name = "origin" )
} )
@JsonAutoDetect( fieldVisibility = JsonAutoDetect.Visibility.ANY, getterVisibility = JsonAutoDetect.Visibility.NONE,
setterVisibility = JsonAutoDetect.Visibility.NONE )
public interface Meta
{
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package io.subutai.hub.share.event.meta;


import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;

import io.subutai.hub.share.Utils;


@JsonAutoDetect( fieldVisibility = JsonAutoDetect.Visibility.ANY, getterVisibility = JsonAutoDetect.Visibility.NONE,
setterVisibility = JsonAutoDetect.Visibility.NONE )

public class OriginMeta implements Meta
{
@JsonProperty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,6 @@ public CustomPayload( final String message )
}


@Override
public Nature getNature()
{
return Nature.CUSTOM;
}


public String getMessage()
{
return message;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,6 @@ public LogPayload( final String source, final String message, final Level level
}


@Override
public Nature getNature()
{
return Nature.LOG;
}


public String getSource()
{
return source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,4 @@ public enum Nature
{
CUSTOM, LOG, PROGRESS
}


public abstract Nature getNature();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ private ProgressPayload()
}


@Override
public Nature getNature()
{
return Nature.PROGRESS;
}


public String getStep()
{
return step;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void setup()

logPayload = new LogPayload("source", "initiated", LogPayload.Level.INFO);

origin = new OriginMeta( "subutaiPeer:subutaiContainer:subutaiEnvironment" );
origin = new OriginMeta( "subutaiPeer.subutaiContainer.subutaiEnvironment" );

source = new SourceMeta( "cassandra-blueprint", SourceMeta.Type.BLUEPRINT );
}
Expand Down

0 comments on commit bc515e5

Please sign in to comment.