Skip to content

Commit

Permalink
ISPN-6505 Remote continuous query on a cache with compat mode enabled…
Browse files Browse the repository at this point in the history
… leads to CCE
  • Loading branch information
anistor authored and tristantarrant committed Apr 29, 2016
1 parent ab1f8c2 commit e797dae
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 32 deletions.
Expand Up @@ -103,7 +103,8 @@ public ClientEntryListener(SerializationContext serializationContext, Continuous
@ClientCacheEntryRemoved @ClientCacheEntryRemoved
@ClientCacheEntryExpired @ClientCacheEntryExpired
public void handleClientCacheEntryCreatedEvent(ClientCacheEntryCustomEvent<byte[]> event) throws IOException { public void handleClientCacheEntryCreatedEvent(ClientCacheEntryCustomEvent<byte[]> event) throws IOException {
ContinuousQueryResult cqr = ProtobufUtil.fromByteArray(serializationContext, event.getEventData(), ContinuousQueryResult.class); byte[] eventData = event.getEventData();
ContinuousQueryResult cqr = (ContinuousQueryResult) ProtobufUtil.fromWrappedByteArray(serializationContext, eventData);
Object key = ProtobufUtil.fromWrappedByteArray(serializationContext, cqr.getKey()); Object key = ProtobufUtil.fromWrappedByteArray(serializationContext, cqr.getKey());
Object value = cqr.getValue() != null ? ProtobufUtil.fromWrappedByteArray(serializationContext, cqr.getValue()) : cqr.getProjection(); Object value = cqr.getValue() != null ? ProtobufUtil.fromWrappedByteArray(serializationContext, cqr.getValue()) : cqr.getProjection();
if (cqr.isJoining()) { if (cqr.isJoining()) {
Expand Down
Expand Up @@ -49,7 +49,7 @@
* @author anistor@redhat.com * @author anistor@redhat.com
* @since 9.0 * @since 9.0
*/ */
@Test(enabled = false, description = "To be fixed by https://issues.jboss.org/browse/ISPN-6505", groups = "functional", testName = "client.hotrod.event.EmbeddedCompatContinuousQueryTest") @Test(groups = "functional", testName = "client.hotrod.event.EmbeddedCompatContinuousQueryTest")
public class EmbeddedCompatContinuousQueryTest extends MultiHotRodServersTest { public class EmbeddedCompatContinuousQueryTest extends MultiHotRodServersTest {


private final int NUM_NODES = 5; private final int NUM_NODES = 5;
Expand Down Expand Up @@ -105,7 +105,7 @@ protected org.infinispan.client.hotrod.configuration.ConfigurationBuilder create
/** /**
* Using grouping and aggregation with continuous query is not allowed. * Using grouping and aggregation with continuous query is not allowed.
*/ */
@Test(enabled = false, description = "To be fixed by https://issues.jboss.org/browse/ISPN-6505", expectedExceptions = HotRodClientException.class, expectedExceptionsMessageRegExp = ".*ISPN000411:.*") @Test(expectedExceptions = HotRodClientException.class, expectedExceptionsMessageRegExp = ".*ISPN000411:.*")
public void testDisallowGroupingAndAggregation() { public void testDisallowGroupingAndAggregation() {
Query query = Search.getQueryFactory(remoteCache).from(UserPB.class) Query query = Search.getQueryFactory(remoteCache).from(UserPB.class)
.select(max("age")) .select(max("age"))
Expand Down
Expand Up @@ -36,8 +36,6 @@ public abstract class BaseMatcher<TypeMetadata, AttributeMetadata, AttributeId e


private final Lock write = readWriteLock.writeLock(); private final Lock write = readWriteLock.writeLock();


protected final Map<String, FilterRegistry<TypeMetadata, AttributeMetadata, AttributeId>> filtersByTypeName = new HashMap<>();

protected final Map<TypeMetadata, FilterRegistry<TypeMetadata, AttributeMetadata, AttributeId>> filtersByType = new HashMap<>(); protected final Map<TypeMetadata, FilterRegistry<TypeMetadata, AttributeMetadata, AttributeId>> filtersByType = new HashMap<>();


protected final ObjectPropertyHelper<TypeMetadata> propertyHelper; protected final ObjectPropertyHelper<TypeMetadata> propertyHelper;
Expand Down Expand Up @@ -185,10 +183,9 @@ public FilterSubscription registerFilter(String jpaQuery, Map<String, Object> na


write.lock(); write.lock();
try { try {
FilterRegistry<TypeMetadata, AttributeMetadata, AttributeId> filterRegistry = filtersByTypeName.get(parsingResult.getTargetEntityName()); FilterRegistry<TypeMetadata, AttributeMetadata, AttributeId> filterRegistry = filtersByType.get(parsingResult.getTargetEntityMetadata());
if (filterRegistry == null) { if (filterRegistry == null) {
filterRegistry = new FilterRegistry<>(createMetadataAdapter(parsingResult.getTargetEntityMetadata()), true); filterRegistry = new FilterRegistry<>(createMetadataAdapter(parsingResult.getTargetEntityMetadata()), true);
filtersByTypeName.put(parsingResult.getTargetEntityName(), filterRegistry);
filtersByType.put(filterRegistry.getMetadataAdapter().getTypeMetadata(), filterRegistry); filtersByType.put(filterRegistry.getMetadataAdapter().getTypeMetadata(), filterRegistry);
} }
return filterRegistry.addFilter(jpaQuery, namedParameters, parsingResult.getWhereClause(), parsingResult.getProjections(), parsingResult.getProjectedTypes(), parsingResult.getSortFields(), callback, eventType); return filterRegistry.addFilter(jpaQuery, namedParameters, parsingResult.getWhereClause(), parsingResult.getProjections(), parsingResult.getProjectedTypes(), parsingResult.getSortFields(), callback, eventType);
Expand All @@ -205,17 +202,16 @@ private void disallowGroupingAndAggregations(FilterParsingResult<TypeMetadata> p


@Override @Override
public void unregisterFilter(FilterSubscription filterSubscription) { public void unregisterFilter(FilterSubscription filterSubscription) {
FilterSubscriptionImpl filterSubscriptionImpl = (FilterSubscriptionImpl) filterSubscription; FilterSubscriptionImpl<TypeMetadata, AttributeMetadata, AttributeId> filterSubscriptionImpl = (FilterSubscriptionImpl<TypeMetadata, AttributeMetadata, AttributeId>) filterSubscription;
write.lock(); write.lock();
try { try {
FilterRegistry<TypeMetadata, AttributeMetadata, AttributeId> filterRegistry = filtersByTypeName.get(filterSubscriptionImpl.getEntityTypeName()); FilterRegistry<TypeMetadata, AttributeMetadata, AttributeId> filterRegistry = filtersByType.get(filterSubscriptionImpl.getMetadataAdapter().getTypeMetadata());
if (filterRegistry != null) { if (filterRegistry != null) {
filterRegistry.removeFilter(filterSubscription); filterRegistry.removeFilter(filterSubscription);
} else { } else {
throw new IllegalStateException("Reached illegal state"); throw new IllegalStateException("Reached illegal state");
} }
if (filterRegistry.getFilterSubscriptions().isEmpty()) { if (filterRegistry.getFilterSubscriptions().isEmpty()) {
filtersByTypeName.remove(filterRegistry.getMetadataAdapter().getTypeName());
filtersByType.remove(filterRegistry.getMetadataAdapter().getTypeMetadata()); filtersByType.remove(filterRegistry.getMetadataAdapter().getTypeMetadata());
} }
} finally { } finally {
Expand Down
Expand Up @@ -48,7 +48,7 @@ protected ProtobufMatcherEvalContext startSingleTypeContext(Object userContext,


@Override @Override
protected FilterRegistry<Descriptor, FieldDescriptor, Integer> getFilterRegistryForType(Descriptor entityType) { protected FilterRegistry<Descriptor, FieldDescriptor, Integer> getFilterRegistryForType(Descriptor entityType) {
return filtersByTypeName.get(entityType.getFullName()); return filtersByType.get(entityType);
} }


@Override @Override
Expand Down
@@ -1,5 +1,6 @@
package org.infinispan.query.continuous.impl; package org.infinispan.query.continuous.impl;


import org.infinispan.Cache;
import org.infinispan.commons.CacheException; import org.infinispan.commons.CacheException;
import org.infinispan.commons.io.UnsignedNumeric; import org.infinispan.commons.io.UnsignedNumeric;
import org.infinispan.commons.marshall.AbstractExternalizer; import org.infinispan.commons.marshall.AbstractExternalizer;
Expand Down Expand Up @@ -40,7 +41,7 @@ public class JPAContinuousQueryCacheEventFilterConverter<K, V, C> extends Abstra
/** /**
* The implementation class of the Matcher component to lookup and use. * The implementation class of the Matcher component to lookup and use.
*/ */
protected final Class<? extends Matcher> matcherImplClass; protected Class<? extends Matcher> matcherImplClass;


/** /**
* Optional cache for query objects. * Optional cache for query objects.
Expand Down Expand Up @@ -82,7 +83,7 @@ public Map<String, Object> getNamedParameters() {
* Acquires a Matcher instance from the ComponentRegistry of the given Cache object. * Acquires a Matcher instance from the ComponentRegistry of the given Cache object.
*/ */
@Inject @Inject
protected void injectDependencies(ComponentRegistry componentRegistry) { protected void injectDependencies(ComponentRegistry componentRegistry, Cache c) {
queryCache = componentRegistry.getComponent(QueryCache.class); queryCache = componentRegistry.getComponent(QueryCache.class);
matcher = componentRegistry.getComponent(matcherImplClass); matcher = componentRegistry.getComponent(matcherImplClass);
if (matcher == null) { if (matcher == null) {
Expand Down
Expand Up @@ -17,7 +17,7 @@
* @author anistor@redhat.com * @author anistor@redhat.com
* @since 7.0 * @since 7.0
*/ */
final class CompatibilityReflectionMatcher extends ReflectionMatcher { public final class CompatibilityReflectionMatcher extends ReflectionMatcher {


private final SerializationContext serializationContext; private final SerializationContext serializationContext;


Expand Down
Expand Up @@ -17,4 +17,5 @@ public interface ExternalizerIds {
Integer JPA_PROTOBUF_FILTER_AND_CONVERTER = 1702; Integer JPA_PROTOBUF_FILTER_AND_CONVERTER = 1702;
Integer JPA_CONTINUOUS_QUERY_CACHE_EVENT_FILTER_CONVERTER = 1703; Integer JPA_CONTINUOUS_QUERY_CACHE_EVENT_FILTER_CONVERTER = 1703;
Integer JPA_BINARY_PROTOBUF_FILTER_AND_CONVERTER = 1704; Integer JPA_BINARY_PROTOBUF_FILTER_AND_CONVERTER = 1704;
Integer JPA_CONTINUOUS_QUERY_RESULT = 1705;
} }
Expand Up @@ -27,6 +27,7 @@
import org.infinispan.objectfilter.impl.ProtobufMatcher; import org.infinispan.objectfilter.impl.ProtobufMatcher;
import org.infinispan.protostream.SerializationContext; import org.infinispan.protostream.SerializationContext;
import org.infinispan.query.remote.ProtobufMetadataManager; import org.infinispan.query.remote.ProtobufMetadataManager;
import org.infinispan.query.remote.impl.filter.ContinuousQueryResultExternalizer;
import org.infinispan.query.remote.impl.filter.JPABinaryProtobufFilterAndConverter; import org.infinispan.query.remote.impl.filter.JPABinaryProtobufFilterAndConverter;
import org.infinispan.query.remote.impl.filter.JPAContinuousQueryProtobufCacheEventFilterConverter; import org.infinispan.query.remote.impl.filter.JPAContinuousQueryProtobufCacheEventFilterConverter;
import org.infinispan.query.remote.impl.filter.JPAProtobufCacheEventFilterConverter; import org.infinispan.query.remote.impl.filter.JPAProtobufCacheEventFilterConverter;
Expand Down Expand Up @@ -58,6 +59,7 @@ public void cacheManagerStarting(GlobalComponentRegistry gcr, GlobalConfiguratio
externalizerMap.put(ExternalizerIds.JPA_PROTOBUF_FILTER_AND_CONVERTER, new JPAProtobufFilterAndConverter.Externalizer()); externalizerMap.put(ExternalizerIds.JPA_PROTOBUF_FILTER_AND_CONVERTER, new JPAProtobufFilterAndConverter.Externalizer());
externalizerMap.put(ExternalizerIds.JPA_CONTINUOUS_QUERY_CACHE_EVENT_FILTER_CONVERTER, new JPAContinuousQueryProtobufCacheEventFilterConverter.Externalizer()); externalizerMap.put(ExternalizerIds.JPA_CONTINUOUS_QUERY_CACHE_EVENT_FILTER_CONVERTER, new JPAContinuousQueryProtobufCacheEventFilterConverter.Externalizer());
externalizerMap.put(ExternalizerIds.JPA_BINARY_PROTOBUF_FILTER_AND_CONVERTER, new JPABinaryProtobufFilterAndConverter.Externalizer()); externalizerMap.put(ExternalizerIds.JPA_BINARY_PROTOBUF_FILTER_AND_CONVERTER, new JPABinaryProtobufFilterAndConverter.Externalizer());
externalizerMap.put(ExternalizerIds.JPA_CONTINUOUS_QUERY_RESULT, new ContinuousQueryResultExternalizer());
} }


@Override @Override
Expand Down
@@ -0,0 +1,77 @@
package org.infinispan.query.remote.impl.filter;

import org.infinispan.commons.marshall.AbstractExternalizer;
import org.infinispan.query.remote.client.ContinuousQueryResult;
import org.infinispan.query.remote.impl.ExternalizerIds;

import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Collections;
import java.util.Set;

/**
* A 'remote' ContinuousQueryResult needs jboss-marshalling serializability between nodes when running in compat mode.
* It will only be marshalled using protobuf before passing it to the remote client.
*
* @author anistor@redhat.com
* @since 9.0
*/
public final class ContinuousQueryResultExternalizer extends AbstractExternalizer<ContinuousQueryResult> {

@Override
public void writeObject(ObjectOutput output, ContinuousQueryResult continuousQueryResult) throws IOException {
output.writeBoolean(continuousQueryResult.isJoining());
output.writeInt(continuousQueryResult.getKey().length);
output.write(continuousQueryResult.getKey());
if (continuousQueryResult.isJoining()) {
Object[] projection = continuousQueryResult.getProjection();
if (projection == null) {
output.writeInt(continuousQueryResult.getValue().length);
output.writeObject(continuousQueryResult.getValue());
} else {
// skip serializing the instance if there is a projection
output.writeInt(-1);
int projLen = projection.length;
output.writeInt(projLen);
for (Object prj : projection) {
output.writeObject(prj);
}
}
}
}

@Override
public ContinuousQueryResult readObject(ObjectInput input) throws IOException, ClassNotFoundException {
boolean isJoining = input.readBoolean();
int keyLen = input.readInt();
byte[] key = new byte[keyLen];
input.readFully(key);
byte[] value = null;
Object[] projection = null;
if (isJoining) {
int valueLen = input.readInt();
if (valueLen == -1) {
int projLen = input.readInt();
projection = new Object[projLen];
for (int i = 0; i < projLen; i++) {
projection[i] = input.readObject();
}
} else {
value = new byte[valueLen];
input.readFully(value);
}
}
return new ContinuousQueryResult(isJoining, key, value, projection);
}

@Override
public Integer getId() {
return ExternalizerIds.JPA_CONTINUOUS_QUERY_RESULT;
}

@Override
public Set<Class<? extends ContinuousQueryResult>> getTypeClasses() {
return Collections.singleton(ContinuousQueryResult.class);
}
}
@@ -1,10 +1,11 @@
package org.infinispan.query.remote.impl.filter; package org.infinispan.query.remote.impl.filter;


import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.io.UnsignedNumeric; import org.infinispan.commons.io.UnsignedNumeric;
import org.infinispan.commons.marshall.AbstractExternalizer; import org.infinispan.commons.marshall.AbstractExternalizer;
import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.Configuration;
import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.ComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.metadata.Metadata; import org.infinispan.metadata.Metadata;
import org.infinispan.notifications.cachelistener.filter.EventType; import org.infinispan.notifications.cachelistener.filter.EventType;
import org.infinispan.objectfilter.Matcher; import org.infinispan.objectfilter.Matcher;
Expand All @@ -13,6 +14,7 @@
import org.infinispan.protostream.SerializationContext; import org.infinispan.protostream.SerializationContext;
import org.infinispan.query.continuous.impl.JPAContinuousQueryCacheEventFilterConverter; import org.infinispan.query.continuous.impl.JPAContinuousQueryCacheEventFilterConverter;
import org.infinispan.query.remote.client.ContinuousQueryResult; import org.infinispan.query.remote.client.ContinuousQueryResult;
import org.infinispan.query.remote.impl.CompatibilityReflectionMatcher;
import org.infinispan.query.remote.impl.ExternalizerIds; import org.infinispan.query.remote.impl.ExternalizerIds;
import org.infinispan.query.remote.impl.ProtobufMetadataManagerImpl; import org.infinispan.query.remote.impl.ProtobufMetadataManagerImpl;
import org.infinispan.query.remote.impl.indexing.ProtobufValueWrapper; import org.infinispan.query.remote.impl.indexing.ProtobufValueWrapper;
Expand All @@ -29,25 +31,32 @@
* @author anistor@redhat.com * @author anistor@redhat.com
* @since 8.0 * @since 8.0
*/ */
public final class JPAContinuousQueryProtobufCacheEventFilterConverter extends JPAContinuousQueryCacheEventFilterConverter<Object, Object, byte[]> { public final class JPAContinuousQueryProtobufCacheEventFilterConverter extends JPAContinuousQueryCacheEventFilterConverter<Object, Object, Object> {


private SerializationContext serCtx; private SerializationContext serCtx;


private boolean usesValueWrapper; private boolean usesValueWrapper;


private boolean isCompatMode;

public JPAContinuousQueryProtobufCacheEventFilterConverter(String jpaQuery, Map<String, Object> namedParameters, Class<? extends Matcher> matcherImplClass) { public JPAContinuousQueryProtobufCacheEventFilterConverter(String jpaQuery, Map<String, Object> namedParameters, Class<? extends Matcher> matcherImplClass) {
super(jpaQuery, namedParameters, matcherImplClass); super(jpaQuery, namedParameters, matcherImplClass);
} }


@Inject @Override
@SuppressWarnings("unused") protected void injectDependencies(ComponentRegistry componentRegistry, Cache c) {
protected void injectDependencies(EmbeddedCacheManager cacheManager, Configuration cfg) { serCtx = ProtobufMetadataManagerImpl.getSerializationContextInternal(c.getCacheManager());
serCtx = ProtobufMetadataManagerImpl.getSerializationContextInternal(cacheManager); Configuration cfg = c.getCacheConfiguration();
usesValueWrapper = cfg.indexing().index().isEnabled() && !cfg.compatibility().enabled(); isCompatMode = cfg.compatibility().enabled();
usesValueWrapper = cfg.indexing().index().isEnabled() && !isCompatMode;
if (isCompatMode) {
matcherImplClass = CompatibilityReflectionMatcher.class;
}
super.injectDependencies(componentRegistry, c);
} }


@Override @Override
public byte[] filterAndConvert(Object key, Object oldValue, Metadata oldMetadata, Object newValue, Metadata newMetadata, EventType eventType) { public Object filterAndConvert(Object key, Object oldValue, Metadata oldMetadata, Object newValue, Metadata newMetadata, EventType eventType) {
if (usesValueWrapper) { if (usesValueWrapper) {
oldValue = oldValue != null ? ((ProtobufValueWrapper) oldValue).getBinary() : null; oldValue = oldValue != null ? ((ProtobufValueWrapper) oldValue).getBinary() : null;
newValue = newValue != null ? ((ProtobufValueWrapper) newValue).getBinary() : null; newValue = newValue != null ? ((ProtobufValueWrapper) newValue).getBinary() : null;
Expand All @@ -61,18 +70,33 @@ public byte[] filterAndConvert(Object key, Object oldValue, Metadata oldMetadata
ObjectFilter objectFilter = getObjectFilter(); ObjectFilter objectFilter = getObjectFilter();
ObjectFilter.FilterResult f1 = oldValue == null ? null : objectFilter.filter(oldValue); ObjectFilter.FilterResult f1 = oldValue == null ? null : objectFilter.filter(oldValue);
ObjectFilter.FilterResult f2 = newValue == null ? null : objectFilter.filter(newValue); ObjectFilter.FilterResult f2 = newValue == null ? null : objectFilter.filter(newValue);
ContinuousQueryResult result;
if (f1 == null && f2 != null) { if (f1 == null && f2 != null) {
result = new ContinuousQueryResult(true, (byte[]) key, f2.getProjection() == null ? (byte[]) newValue : null, f2.getProjection()); return makeFilterResult(true, key, f2.getProjection() == null ? newValue : null, f2.getProjection());
} else if (f1 != null && f2 == null) { } else if (f1 != null && f2 == null) {
result = new ContinuousQueryResult(false, (byte[]) key, null, null); return makeFilterResult(false, key, null, null);
} else { } else {
return null; return null;
} }
}

protected Object makeFilterResult(boolean isJoining, Object key, Object value, Object[] projection) {
try { try {
return ProtobufUtil.toByteArray(serCtx, result); if (isCompatMode) {
key = ProtobufUtil.toWrappedByteArray(serCtx, key);
if (value != null) {
value = ProtobufUtil.toWrappedByteArray(serCtx, value);
}
}

Object result = new ContinuousQueryResult(isJoining, (byte[]) key, (byte[]) value, projection);

if (!isCompatMode) {
result = ProtobufUtil.toWrappedByteArray(serCtx, result);
}

return result;
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new CacheException(e);
} }
} }


Expand Down
@@ -1,5 +1,7 @@
package org.infinispan.query.remote.impl.filter; package org.infinispan.query.remote.impl.filter;


import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Inject;
import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.cachelistener.filter.FilterIndexingServiceProvider; import org.infinispan.notifications.cachelistener.filter.FilterIndexingServiceProvider;
Expand All @@ -24,9 +26,12 @@ public final class JPAContinuousQueryProtobufFilterIndexingServiceProvider exten


private SerializationContext serCtx; private SerializationContext serCtx;


private boolean isCompatMode;

@Inject @Inject
protected void injectDependencies(EmbeddedCacheManager cacheManager) { protected void injectDependencies(EmbeddedCacheManager cacheManager, Cache c) {
serCtx = ProtobufMetadataManagerImpl.getSerializationContextInternal(cacheManager); serCtx = ProtobufMetadataManagerImpl.getSerializationContextInternal(cacheManager);
isCompatMode = c.getCacheConfiguration().compatibility().enabled();
} }


@Override @Override
Expand All @@ -36,11 +41,24 @@ public boolean supportsFilter(IndexedFilter<?, ?, ?> indexedFilter) {


@Override @Override
protected Object makeFilterResult(Object userContext, Object eventType, Object key, Object instance, Object[] projection, Comparable[] sortProjection) { protected Object makeFilterResult(Object userContext, Object eventType, Object key, Object instance, Object[] projection, Comparable[] sortProjection) {
boolean isJoining = Boolean.TRUE.equals(eventType);
try { try {
return ProtobufUtil.toByteArray(serCtx, new ContinuousQueryResult(isJoining, (byte[]) key, (byte[]) instance, projection)); if (isCompatMode) {
key = ProtobufUtil.toWrappedByteArray(serCtx, key);
if (instance != null) {
instance = ProtobufUtil.toWrappedByteArray(serCtx, instance);
}
}

boolean isJoining = Boolean.TRUE.equals(eventType);
Object result = new ContinuousQueryResult(isJoining, (byte[]) key, (byte[]) instance, projection);

if (!isCompatMode) {
result = ProtobufUtil.toWrappedByteArray(serCtx, result);
}

return result;
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new CacheException(e);
} }
} }
} }

0 comments on commit e797dae

Please sign in to comment.