This repository has been archived by the owner on Sep 21, 2020. It is now read-only.
/
GenericMQTTThingHandler.java
174 lines (159 loc) · 7.76 KB
/
GenericMQTTThingHandler.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
/**
* Copyright (c) 2010-2019 Contributors to the openHAB project
*
* See the NOTICE file(s) distributed with this work for additional
* information.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.openhab.binding.mqtt.generic.internal.handler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang.StringUtils;
import org.eclipse.jdt.annotation.NonNullByDefault;
import org.eclipse.jdt.annotation.Nullable;
import org.eclipse.smarthome.core.thing.Channel;
import org.eclipse.smarthome.core.thing.ChannelUID;
import org.eclipse.smarthome.core.thing.Thing;
import org.eclipse.smarthome.core.thing.ThingStatus;
import org.eclipse.smarthome.core.thing.ThingStatusDetail;
import org.eclipse.smarthome.core.thing.type.ChannelTypeUID;
import org.eclipse.smarthome.core.types.StateDescription;
import org.eclipse.smarthome.io.transport.mqtt.MqttBrokerConnection;
import org.openhab.binding.mqtt.generic.AbstractMQTTThingHandler;
import org.openhab.binding.mqtt.generic.ChannelConfig;
import org.openhab.binding.mqtt.generic.ChannelState;
import org.openhab.binding.mqtt.generic.ChannelStateTransformation;
import org.openhab.binding.mqtt.generic.ChannelStateUpdateListener;
import org.openhab.binding.mqtt.generic.MqttChannelStateDescriptionProvider;
import org.openhab.binding.mqtt.generic.TransformationServiceProvider;
import org.openhab.binding.mqtt.generic.values.Value;
import org.openhab.binding.mqtt.generic.values.ValueFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Thing handler for manually created Things whose channels were added by hand and are linked to MQTT topics.
 *
 * <p>
 * For every channel that could be configured, one {@link ChannelState} is kept in a map keyed by the
 * {@link ChannelUID} of the channel.
 *
 * @author David Graeff - Initial contribution
 */
@NonNullByDefault
public class GenericMQTTThingHandler extends AbstractMQTTThingHandler implements ChannelStateUpdateListener {
    private final Logger logger = LoggerFactory.getLogger(GenericMQTTThingHandler.class);
    /** Channel states created by {@link #initialize()}, looked up by channel UID. */
    final Map<ChannelUID, ChannelState> channelStateByChannelUID = new HashMap<>();
    protected final MqttChannelStateDescriptionProvider stateDescProvider;
    protected final TransformationServiceProvider transformationServiceProvider;

    /**
     * Creates a new Thing handler for generic MQTT channels.
     *
     * @param thing The thing this handler is responsible for
     * @param stateDescProvider Receives the state description of every channel created by this handler
     * @param transformationServiceProvider Handed to each {@link ChannelStateTransformation} that is created
     * @param subscribeTimeout The subscribe timeout, passed on to the super-class
     */
    public GenericMQTTThingHandler(Thing thing, MqttChannelStateDescriptionProvider stateDescProvider,
            TransformationServiceProvider transformationServiceProvider, int subscribeTimeout) {
        super(thing, subscribeTimeout);
        this.stateDescProvider = stateDescProvider;
        this.transformationServiceProvider = transformationServiceProvider;
    }

    /**
     * Looks up the channel state that belongs to a channel.
     *
     * @param channelUID The channel UID
     * @return The registered channel state, or {@code null} if none is known for that UID
     */
    @Override
    public @Nullable ChannelState getChannelState(ChannelUID channelUID) {
        return channelStateByChannelUID.get(channelUID);
    }

    /**
     * Starts every known {@link ChannelState} on the given connection.
     *
     * <p>
     * Once all start futures have completed normally, the thing status is set to ONLINE. If any of them fails, the
     * returned future completes exceptionally and the status is not touched by this method.
     *
     * @param connection A started broker connection
     * @return A future that completes after all channel states have been started and the status was updated
     */
    @Override
    protected CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection) {
        List<CompletableFuture<@Nullable Void>> pending = new ArrayList<>();
        for (ChannelState channelState : channelStateByChannelUID.values()) {
            pending.add(channelState.start(connection, scheduler, 0));
        }
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
                .thenRun(() -> updateStatus(ThingStatus.ONLINE, ThingStatusDetail.NONE));
    }

    /**
     * Resets the cached state of every known channel.
     */
    @Override
    protected void stop() {
        for (ChannelState channelState : channelStateByChannelUID.values()) {
            channelState.getCache().resetState();
        }
    }

    /**
     * Removes the state descriptions registered by this handler, disposes the super-class and finally forgets all
     * channel states.
     */
    @Override
    public void dispose() {
        // Withdraw every state description this handler has published
        for (ChannelUID channelUID : channelStateByChannelUID.keySet()) {
            stateDescProvider.remove(channelUID);
        }
        super.dispose();
        // Known design flaw: the map can only be emptied after super.dispose(), because the super-class still
        // needs the channel states for unsubscribing while it is being disposed.
        channelStateByChannelUID.clear();
    }

    /**
     * Stops every known {@link ChannelState}.
     *
     * @return A future that completes when all stop futures have completed
     */
    @Override
    public CompletableFuture<Void> unsubscribeAll() {
        List<CompletableFuture<?>> stopping = new ArrayList<>();
        for (ChannelState channelState : channelStateByChannelUID.values()) {
            stopping.add(channelState.stop());
        }
        return CompletableFuture.allOf(stopping.toArray(new CompletableFuture[0]));
    }

    /**
     * Creates the {@link ChannelState} for a Thing channel and attaches the configured transformations to it.
     *
     * <p>
     * Both transformation pattern strings of the configuration are split at "∩"; every non-blank part becomes one
     * {@link ChannelStateTransformation}. The incoming ones are added first, then the outgoing ones.
     *
     * @param channelConfig The channel configuration that contains MQTT state and command topic and multiple other
     *            configurations.
     * @param channelUID The channel UID
     * @param valueState The channel value state
     * @return The newly created channel state, with this handler as its update listener
     */
    protected ChannelState createChannelState(ChannelConfig channelConfig, ChannelUID channelUID, Value valueState) {
        final ChannelState state = new ChannelState(channelConfig, channelUID, valueState, this);
        // Incoming value transformations
        parseTransformations(channelConfig.transformationPattern).forEach(state::addTransformation);
        // Outgoing value transformations
        parseTransformations(channelConfig.transformationPatternOut).forEach(state::addTransformationOut);
        return state;
    }

    /**
     * Turns a "∩"-separated pattern string into a lazy stream of transformations, skipping blank parts.
     */
    private Stream<ChannelStateTransformation> parseTransformations(String patterns) {
        return Stream.of(patterns.split("∩")).filter(StringUtils::isNotBlank)
                .map(pattern -> new ChannelStateTransformation(pattern, transformationServiceProvider));
    }

    /**
     * Builds a {@link ChannelState} and a state description for every typed channel of the thing.
     *
     * <p>
     * Channels without a channel type are skipped with a warning. A channel whose setup throws an
     * {@link IllegalArgumentException} is remembered as misconfigured. Only if no channel is misconfigured, the
     * super-class initialisation runs; otherwise the thing is set OFFLINE with a configuration error that lists the
     * affected channel UIDs.
     */
    @Override
    public void initialize() {
        final List<ChannelUID> misconfigured = new ArrayList<>();
        for (Channel channel : thing.getChannels()) {
            final ChannelTypeUID typeUID = channel.getChannelTypeUID();
            if (typeUID == null) {
                logger.warn("Channel {} has no type", channel.getLabel());
                continue;
            }
            final ChannelConfig config = channel.getConfiguration().as(ChannelConfig.class);
            final ChannelUID channelUID = channel.getUID();
            try {
                Value value = ValueFactory.createValueState(config, typeUID.getId());
                channelStateByChannelUID.put(channelUID, createChannelState(config, channelUID, value));
                // A channel without a command topic gets a read-only state description
                boolean readOnly = StringUtils.isBlank(config.commandTopic);
                StateDescription description = value.createStateDescription(config.unit, readOnly);
                stateDescProvider.setDescription(channelUID, description);
            } catch (IllegalArgumentException e) {
                logger.warn("Channel configuration error", e);
                misconfigured.add(channelUID);
            }
        }

        // If some channels could not be set up, put the entire thing offline and show the channels
        // in question to the user.
        if (!misconfigured.isEmpty()) {
            String affected = misconfigured.stream().map(ChannelUID::getAsString).collect(Collectors.joining(","));
            updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR,
                    "Remove and recreate: " + affected);
            return;
        }
        super.initialize();
    }
}