Skip to content

Commit

Permalink
[mqtt] HA: add transformation to incoming topics, where needed (#4990)
Browse files Browse the repository at this point in the history
Signed-off-by: Jochen Klein <git@jochen.susca.de>
  • Loading branch information
jochen314 authored and martinvw committed Mar 19, 2019
1 parent be3f3a0 commit 0ea0bd1
Show file tree
Hide file tree
Showing 22 changed files with 357 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.openhab.binding.mqtt.generic.internal.convention.homeassistant.HaID;
import org.openhab.binding.mqtt.generic.internal.generic.ChannelStateUpdateListener;
import org.openhab.binding.mqtt.generic.internal.generic.MqttChannelTypeProvider;
import org.openhab.binding.mqtt.generic.internal.generic.TransformationServiceProvider;
import org.openhab.binding.mqtt.generic.internal.handler.ThingChannelConstants;
import org.osgi.service.cm.ConfigurationException;
import org.slf4j.Logger;
Expand All @@ -77,6 +78,9 @@ public class HomeAssistantMQTTImplementationTests extends JavaOSGiTest {
@Mock
ChannelStateUpdateListener channelStateUpdateListener;

@Mock
TransformationServiceProvider transformationServiceProvider;

/**
* Create an observer that fails the test as soon as the broker client connection changes its connection state
* to something else then CONNECTED.
Expand Down Expand Up @@ -124,6 +128,8 @@ public void setUp() throws InterruptedException, ExecutionException, TimeoutExce
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).get(200, TimeUnit.MILLISECONDS);

failure = null;

doReturn(null).when(transformationServiceProvider).getTransformationService(any());
}

@After
Expand Down Expand Up @@ -162,7 +168,7 @@ public void parseHATree() throws InterruptedException, ExecutionException, Timeo

ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(4);
DiscoverComponents discover = spy(new DiscoverComponents(ThingChannelConstants.testHomeAssistantThing,
scheduler, channelStateUpdateListener, gson));
scheduler, channelStateUpdateListener, gson, transformationServiceProvider));

// The DiscoverComponents object calls ComponentDiscovered callbacks.
// In the following implementation we add the found component to the `haComponents` map
Expand Down Expand Up @@ -195,8 +201,8 @@ public void parseHATree() throws InterruptedException, ExecutionException, Timeo
verify(channelTypeProvider, times(1)).setChannelType(any(), any());

// We expect a switch component with an OnOff channel with the initial value UNDEF:
State value = haComponents.get(haID.getChannelGroupID()).channelTypes()
.get(ComponentSwitch.switchChannelID).channelState.getCache().getChannelState();
State value = haComponents.get(haID.getChannelGroupID()).channelTypes().get(ComponentSwitch.switchChannelID)
.getState().getCache().getChannelState();
assertThat(value, is(UnDefType.UNDEF));

haComponents.values().stream().map(e -> e.start(connection, scheduler, 100))
Expand All @@ -209,8 +215,8 @@ public void parseHATree() throws InterruptedException, ExecutionException, Timeo
verify(channelStateUpdateListener, times(1)).updateChannelState(any(), any());

// Value should be ON now.
value = haComponents.get(haID.getChannelGroupID()).channelTypes()
.get(ComponentSwitch.switchChannelID).channelState.getCache().getChannelState();
value = haComponents.get(haID.getChannelGroupID()).channelTypes().get(ComponentSwitch.switchChannelID)
.getState().getCache().getChannelState();
assertThat(value, is(OnOffType.ON));

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.junit.Test;
import org.mockito.Mock;
import org.openhab.binding.mqtt.generic.internal.convention.homeassistant.DiscoverComponents.ComponentDiscovered;
import org.openhab.binding.mqtt.generic.internal.generic.TransformationServiceProvider;
import org.openhab.binding.mqtt.generic.internal.handler.ThingChannelConstants;

import com.google.gson.Gson;
Expand All @@ -46,6 +47,9 @@ public class DiscoverComponentsTests extends JavaOSGiTest {
@Mock
ComponentDiscovered discovered;

@Mock
TransformationServiceProvider transformationServiceProvider;

@Before
public void setUp() {
initMocks(this);
Expand All @@ -57,6 +61,7 @@ public void setUp() {
doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any());
doReturn(CompletableFuture.completedFuture(true)).when(connection).publish(any(), any(), anyInt(),
anyBoolean());
doReturn(null).when(transformationServiceProvider).getTransformationService(any());
}

@Test
Expand All @@ -66,8 +71,8 @@ public void discoveryTimeTest() throws InterruptedException, ExecutionException,

Gson gson = new GsonBuilder().registerTypeAdapterFactory(new ChannelConfigurationTypeAdapterFactory()).create();

DiscoverComponents discover = spy(
new DiscoverComponents(ThingChannelConstants.testHomeAssistantThing, scheduler, null, gson));
DiscoverComponents discover = spy(new DiscoverComponents(ThingChannelConstants.testHomeAssistantThing,
scheduler, null, gson, transformationServiceProvider));

discover.startDiscovery(connection, 50, new HaID("homeassistant", "object", "node", "component"), discovered)
.get(100, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@

public class HAConfigurationTests {

private Gson gson = new GsonBuilder().registerTypeAdapterFactory(new ChannelConfigurationTypeAdapterFactory()).create();
private Gson gson = new GsonBuilder().registerTypeAdapterFactory(new ChannelConfigurationTypeAdapterFactory())
.create();

@Test
public void testAbbreviations() {
Expand Down Expand Up @@ -103,7 +104,8 @@ public void testTildeSubstritution() {
+ " \"~\":\"D/\"\n" //
+ "}";

ComponentSwitch.ChannelConfiguration config = BaseChannelConfiguration.fromString(json, gson, ComponentSwitch.ChannelConfiguration.class);
ComponentSwitch.ChannelConfiguration config = BaseChannelConfiguration.fromString(json, gson,
ComponentSwitch.ChannelConfiguration.class);

assertThat(config.availability_topic, is("D/E"));
assertThat(config.state_topic, is("O/D/"));
Expand Down Expand Up @@ -134,7 +136,8 @@ public void testSampleFanConfig() {
+ " ]\n" //
+ "}";

ComponentFan.ChannelConfiguration config = BaseChannelConfiguration.fromString(json, gson, ComponentFan.ChannelConfiguration.class);
ComponentFan.ChannelConfiguration config = BaseChannelConfiguration.fromString(json, gson,
ComponentFan.ChannelConfiguration.class);
assertThat(config.name, is("Bedroom Fan"));

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ protected void unsetChannelProvider(MqttChannelTypeProvider provider) {
} else if (thingTypeUID.equals(MqttBindingConstants.HOMIE300_MQTT_THING)) {
return new HomieThingHandler(thing, typeProvider, 1500, 200);
} else if (thingTypeUID.equals(MqttBindingConstants.HOMEASSISTANT_MQTT_THING)) {
return new HomeAssistantThingHandler(thing, typeProvider, 1500, 200);
return new HomeAssistantThingHandler(thing, typeProvider, this, 1500, 200);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.smarthome.core.thing.ChannelGroupUID;
import org.eclipse.smarthome.core.thing.type.ChannelDefinition;
import org.eclipse.smarthome.core.thing.type.ChannelDefinitionBuilder;
import org.eclipse.smarthome.core.thing.type.ChannelGroupType;
import org.eclipse.smarthome.core.thing.type.ChannelGroupTypeBuilder;
import org.eclipse.smarthome.core.thing.type.ChannelGroupTypeUID;
import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.binding.mqtt.generic.internal.MqttBindingConstants;
import org.openhab.binding.mqtt.generic.internal.convention.homeassistant.CFactory.ComponentConfiguration;
import org.openhab.binding.mqtt.generic.internal.generic.MqttChannelTypeProvider;
import org.openhab.binding.mqtt.generic.internal.values.Value;

/**
* A HomeAssistant component is comparable to an ESH channel group.
Expand All @@ -41,6 +42,7 @@
@NonNullByDefault
public abstract class AbstractComponent<C extends BaseChannelConfiguration> {
// Component location fields
private final ComponentConfiguration componentConfiguration;
protected final ChannelGroupTypeUID channelGroupTypeUID;
protected final ChannelGroupUID channelGroupUID;
protected final HaID haID;
Expand All @@ -62,6 +64,7 @@ public abstract class AbstractComponent<C extends BaseChannelConfiguration> {
* @param gson A Gson instance
*/
public AbstractComponent(CFactory.ComponentConfiguration componentConfiguration, Class<C> clazz) {
this.componentConfiguration = componentConfiguration;
this.haID = componentConfiguration.getHaID();
this.channelGroupTypeUID = new ChannelGroupTypeUID(MqttBindingConstants.BINDING_ID,
haID.getChannelGroupTypeID());
Expand All @@ -72,6 +75,10 @@ public AbstractComponent(CFactory.ComponentConfiguration componentConfiguration,
this.configHash = channelConfigurationJson.hashCode();
}

protected CChannel.Builder buildChannel(String channelID, Value valueState, String label) {
return new CChannel.Builder(this, componentConfiguration, channelID, valueState, label);
}

/**
* Subscribes to all state channels of the component and adds all channels to the provided channel type provider.
*
Expand All @@ -82,7 +89,7 @@ public AbstractComponent(CFactory.ComponentConfiguration componentConfiguration,
*/
public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
int timeout) {
return channels.values().stream().map(v -> v.channelState.start(connection, scheduler, timeout))
return channels.values().stream().map(v -> v.start(connection, scheduler, timeout))
.reduce(CompletableFuture.completedFuture(null), (f, v) -> f.thenCompose(b -> v));
}

Expand All @@ -93,8 +100,8 @@ public AbstractComponent(CFactory.ComponentConfiguration componentConfiguration,
* exceptionally on errors.
*/
public CompletableFuture<@Nullable Void> stop() {
return channels.values().stream().map(v -> v.channelState.stop())
.reduce(CompletableFuture.completedFuture(null), (f, v) -> f.thenCompose(b -> v));
return channels.values().stream().map(v -> v.stop()).reduce(CompletableFuture.completedFuture(null),
(f, v) -> f.thenCompose(b -> v));
}

/**
Expand All @@ -103,7 +110,7 @@ public AbstractComponent(CFactory.ComponentConfiguration componentConfiguration,
* @param channelTypeProvider The channel type provider
*/
public void addChannelTypes(MqttChannelTypeProvider channelTypeProvider) {
channels.values().forEach(v -> channelTypeProvider.setChannelType(v.channelTypeUID, v.type));
channels.values().forEach(v -> v.addChannelTypes(channelTypeProvider));
}

/**
Expand All @@ -113,7 +120,7 @@ public void addChannelTypes(MqttChannelTypeProvider channelTypeProvider) {
* @param channelTypeProvider The channel type provider
*/
public void removeChannelTypes(MqttChannelTypeProvider channelTypeProvider) {
channels.values().forEach(v -> channelTypeProvider.removeChannelType(v.channelTypeUID));
channels.values().forEach(v -> v.removeChannelTypes(channelTypeProvider));
}

/**
Expand Down Expand Up @@ -167,8 +174,7 @@ public int getConfigHash() {
* Return the channel group type.
*/
public ChannelGroupType type() {
final List<ChannelDefinition> channelDefinitions = channels.values().stream()
.map(c -> new ChannelDefinitionBuilder(c.channelUID.getId(), c.channelTypeUID).build())
final List<ChannelDefinition> channelDefinitions = channels.values().stream().map(c -> c.type())
.collect(Collectors.toList());
return ChannelGroupTypeBuilder.instance(channelGroupTypeUID, name()).withChannelDefinitions(channelDefinitions)
.build();
Expand All @@ -179,7 +185,7 @@ public ChannelGroupType type() {
* to the MQTT broker got lost.
*/
public void resetState() {
channels.values().forEach(c -> c.channelState.getCache().resetState());
channels.values().forEach(c -> c.resetState());
}

}
Loading

0 comments on commit 0ea0bd1

Please sign in to comment.