Skip to content

Commit

Permalink
Feature/rordev 105 (#484)
Browse files Browse the repository at this point in the history
* changed: disabled stacktrace on forbidden in debug logs

* changed: better solution

* added: module es73x

* changed: porting es73x to scala

* fixes

* added: licenses

* changed: role index searcher wrapper security improvements

* fixed: build.sh

* fixed: build.sh
  • Loading branch information
coutoPL authored and sscarduzio committed Aug 10, 2019
1 parent e0cfb1f commit 48e73ea
Show file tree
Hide file tree
Showing 86 changed files with 2,918 additions and 707 deletions.
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ deploy:
env:
- ROR_TASK="license"
- ROR_TASK="unit"
- ROR_TASK="integration_es73x"
- ROR_TASK="integration_es73x_scala"
- ROR_TASK="integration_es70x"
- ROR_TASK="integration_es70x_scala"
- ROR_TASK="integration_es66x"
Expand Down
14 changes: 14 additions & 0 deletions bin/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ if [[ $TRAVIS != "true" ]] || [[ $ROR_TASK == "unit" ]]; then
./gradlew --stacktrace test ror
fi

if [[ $TRAVIS != "true" ]] || [[ $ROR_TASK == "integration_es73x" ]]; then
echo ">>> es73x => Running testcontainers.."
./gradlew integration-tests:test '-PesModule=es73x' || ( find . |grep hs_err |xargs cat && exit 1 )
fi

if [[ $TRAVIS != "true" ]] || [[ $ROR_TASK == "integration_es73x_scala" ]]; then
echo ">>> es73x => Running testcontainers.."
./gradlew integration-tests-scala:test '-PesModule=es73x' || ( find . |grep hs_err |xargs cat && exit 1 )
fi

if [[ $TRAVIS != "true" ]] || [[ $ROR_TASK == "integration_es70x" ]]; then
echo ">>> es70x => Running testcontainers.."
./gradlew integration-tests:test '-PesModule=es70x' || ( find . |grep hs_err |xargs cat && exit 1 )
Expand Down Expand Up @@ -116,6 +126,10 @@ fi
if [[ $TRAVIS != "true" ]] || [[ $ROR_TASK == "package" ]]; then

echo ">>> ($0) additional builds of ES module for specified ES version"

#es73
./gradlew --stacktrace es73x:ror '-PesVersion=7.3.0'

#es70
./gradlew --stacktrace es70x:ror '-PesVersion=7.0.0'
./gradlew --stacktrace es70x:ror '-PesVersion=7.0.1'
Expand Down
5 changes: 0 additions & 5 deletions core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,6 @@ test {
exceptionFormat = 'full'
events "standardOut", "started", "passed", "skipped", "failed"
}

tags {
exclude 'es70x'
}
}

dependencies {
Expand Down Expand Up @@ -131,7 +127,6 @@ license {
}
}


tasks.withType(ScalaCompile) {
scalaCompileOptions.additionalParameters = ["-Ypartial-unification", "-Ywarn-macros:after", "-feature", "-Xfatal-warnings"]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ public interface RequestInfoShim {

String extractType();

Integer getContentLength();

default Set<String> getExpandedIndices(Set<String> ixsSet) {
if (involvesIndices()) {
Set<String> all = extractAllIndicesAndAliases().stream().flatMap(e -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import tech.beshu.ror.es.IndexJsonContentManager.{ReadError, WriteError}

trait IndexJsonContentManager {

def sourceOf(index: String, `type`: String, id: String): Task[Either[ReadError, java.util.Map[String, Any]]]
def sourceOf(index: String, `type`: String, id: String): Task[Either[ReadError, java.util.Map[String, _]]]

def saveContent(index: String, `type`: String, id: String, content: java.util.Map[String, String]): Task[Either[WriteError, Unit]]
}
Expand Down
18 changes: 18 additions & 0 deletions core/src/main/scala/tech/beshu/ror/utils/ScalaOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package tech.beshu.ror.utils

import cats.Functor
import cats.data.{EitherT, NonEmptyList}
import com.twitter.{util => twitter}
import eu.timepit.refined.types.string.NonEmptyString
Expand Down Expand Up @@ -116,4 +117,21 @@ object ScalaOps {
promise.future
}

implicit class AutoCloseableOps[A <: AutoCloseable](val value: A) extends AnyVal {
def bracket[B](convert: A => B): B = {
try {
convert(value)
} finally {
value.close()
}
}
}

implicit class AutoClosableMOps[A <: AutoCloseable, M[_]: Functor](val value: M[A]) {
def bracket[B](convert: A => B): M[B] = {
import cats.implicits._
value.map(v => AutoCloseableOps(v).bracket(convert))
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,15 @@
/**
* Created by sscarduzio on 14/06/2017.
*/

@Singleton
public class AuditSinkImpl {
public class EsAuditSink {

private static final Logger logger = Loggers.getLogger(AuditSinkImpl.class);
private static final Logger logger = Loggers.getLogger(EsAuditSink.class);
private final BulkProcessor bulkProcessor;

@Inject
public AuditSinkImpl(Client client) {
public EsAuditSink(Client client) {
this.bulkProcessor = BulkProcessor
.builder(client, new AuditSinkBulkProcessorListener())
.setBulkActions(AUDIT_SINK_MAX_ITEMS)
Expand Down Expand Up @@ -92,5 +93,4 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
failure.printStackTrace();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ public EsIndexJsonContentProvider(NodeClient client) {
}

@Override
public Task<Either<ReadError, Map<String, Object>>> sourceOf(String index, String type, String id) {
public Task<Either<ReadError, Map<String, ?>>> sourceOf(String index, String type, String id) {
try {
GetResponse response = client.get(client.prepareGet(index, type, id).request()).actionGet();
Map<String, Object> source = Optional.ofNullable(response.getSourceAsMap()).orElse(Maps.newHashMap());
return Task$.MODULE$
.eval((Function0<Either<ReadError, Map<String, Object>>>) () -> Right$.MODULE$.apply(source))
.eval((Function0<Either<ReadError, Map<String, ?>>>) () -> Right$.MODULE$.apply(source))
.executeOn(Ror$.MODULE$.blockingScheduler(), true);
} catch (ResourceNotFoundException ex) {
return Task$.MODULE$.now(Left$.MODULE$.apply(ContentNotFound$.MODULE$));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public void onPassThrough() {

private AuditSink createAuditSink(Client client) {
return new AuditSink() {
AuditSinkImpl auditSink = new AuditSinkImpl(client);
EsAuditSink auditSink = new EsAuditSink(client);

@Override
public void submit(String indexName, String documentId, String jsonRecord) {
Expand Down
20 changes: 0 additions & 20 deletions es51x/src/main/java/tech/beshu/ror/es/RequestInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,13 @@ public class RequestInfo implements RequestInfoShim {
private final ClusterService clusterService;
private final Long taskId;
private final ThreadPool threadPool;
private final RestChannel channel;
private String content = null;
private Integer contentLength;

RequestInfo(RestChannel channel, Long taskId, String action, ActionRequest actionRequest,
ClusterService clusterService, ThreadPool threadPool) {
this.threadPool = threadPool;
this.request = channel.request();
this.channel = channel;
this.action = action;
this.actionRequest = actionRequest;
this.clusterService = clusterService;
Expand Down Expand Up @@ -169,20 +167,6 @@ public String extractType() {
return actionRequest.getClass().getSimpleName();
}

@Override
public Integer getContentLength() {
if (contentLength == null) {
BytesReference cnt = request.content();
if (cnt == null) {
contentLength = 0;
}
else {
contentLength = request.content().length();
}
}
return contentLength;
}

@Override
public Set<String> extractIndexMetadata(String index) {
SortedMap<String, AliasOrIndex> lookup = clusterService.state().metaData().getAliasAndIndexLookup();
Expand All @@ -194,10 +178,6 @@ public Long extractTaskId() {
return taskId;
}

public RestChannel getChannel() {
return channel;
}

@Override
public Integer extractContentLength() {
if (contentLength == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,107 +17,116 @@

package tech.beshu.ror.es.security;

import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.IndexSearcher;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.threadpool.ThreadPool;
import tech.beshu.ror.Constants;
import tech.beshu.ror.es.RorInstanceSupplier;
import tech.beshu.ror.utils.FilterTransient;

import java.io.IOException;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

/*
* @author Datasweet <contact@datasweet.fr>
*/
public class RoleIndexSearcherWrapper extends IndexSearcherWrapper {

private static final Logger logger = LogManager.getLogger(RoleIndexSearcherWrapper.class);

private final Function<ShardId, QueryShardContext> queryShardContextProvider;
private final ThreadContext threadContext;
private final ThreadPool threadPool;

public RoleIndexSearcherWrapper(IndexService indexService) {
if (indexService == null) {
throw new IllegalArgumentException("Please provide an indexService");
}
logger.debug("Create new RoleIndexSearcher wrapper, [{}]", indexService.getIndexSettings().getIndex().getName());
this.queryShardContextProvider = shardId -> indexService.newQueryShardContext(shardId.id(), null, null);
this.threadContext = indexService.getThreadPool().getThreadContext();
this.threadPool = indexService.getThreadPool();
}

@Override
protected DirectoryReader wrap(DirectoryReader reader) {
if(!RorInstanceSupplier.getInstance().get().isPresent()) {
logger.debug("Document filtering not available. Return default reader");
return reader;
}
return prepareDocumentFilterReader(prepareDocumentFieldReader(reader));
}

private DirectoryReader prepareDocumentFieldReader(DirectoryReader reader) {
// Field level security (FLS)
ThreadContext threadContext = threadPool.getThreadContext();
try {
String fieldsHeader = threadContext.getHeader(Constants.FIELDS_TRANSIENT);
Set<String> fields = Strings.isNullOrEmpty(fieldsHeader) ? null : Sets.newHashSet(fieldsHeader.split(",")).stream().map(String::trim).collect(Collectors.toSet());
if(fields != null) {
reader = DocumentFieldReader.wrap(reader, fields);
if (fieldsHeader != null) {
if (!fieldsHeader.isEmpty()) {
Set<String> fields = Sets.newHashSet(fieldsHeader.split(",")).stream().map(String::trim).collect(
Collectors.toSet());
return DocumentFieldReader.wrap(reader, fields);
}
else {
throw new IllegalStateException("FLS: " + Constants.FIELDS_TRANSIENT + " not found in threadContext");
}
}
else {
return reader;
}
} catch (IOException e) {
throw new IllegalStateException("Couldn't extract FLS fields from threadContext.", e);
throw new IllegalStateException("FLS: Couldn't extract FLS fields from threadContext", e);
}
}

FilterTransient filterTransient = FilterTransient.deserialize(threadContext.getHeader(Constants.FILTER_TRANSIENT));
if (filterTransient == null) {
logger.warn("Couldn't extract filterTransient from threadContext.");
return reader;
}
private DirectoryReader prepareDocumentFilterReader(DirectoryReader reader) {
// Document level security (DLS)
ThreadContext threadContext = threadPool.getThreadContext();
String filterHeader = threadContext.getHeader(Constants.FILTER_TRANSIENT);
if (filterHeader != null) {
FilterTransient filterTransient = FilterTransient.deserialize(filterHeader);
if (filterTransient == null) {
throw new IllegalStateException("DLS: " + Constants.FILTER_TRANSIENT + " present, but cannot be deserialized");
}

ShardId shardId = ShardUtils.extractShardId(reader);
if (shardId == null) {
throw new IllegalStateException(
LoggerMessageFormat.format("Couldn't extract shardId from reader [{}]", reader));
}
String filter = filterTransient.getFilter();
ShardId shardId = ShardUtils.extractShardId(reader);
if (shardId == null) {
throw new IllegalStateException(
LoggerMessageFormat.format("Couldn't extract shardId from reader [{}]", reader));
}
String filter = filterTransient.getFilter();

if (filter == null || filter.equals("")) {
return reader;
}
if (filter == null || filter.equals("")) {
throw new IllegalStateException("DLS: " + Constants.FILTER_TRANSIENT + " present, but contains no value");
}

try {
BooleanQuery.Builder boolQuery = new BooleanQuery.Builder();
boolQuery.setMinimumNumberShouldMatch(1);
QueryShardContext queryShardContext = this.queryShardContextProvider.apply(shardId);
XContentParser parser = XContentFactory.xContent(filter).createParser(filter);
QueryBuilder queryBuilder = queryShardContext.newParseContext(parser).parseInnerQueryBuilder().get();
ParsedQuery parsedQuery = queryShardContext.toFilter(queryBuilder);
boolQuery.add(parsedQuery.query(), BooleanClause.Occur.SHOULD);
return DocumentFilterReader.wrap(reader, new ConstantScoreQuery(boolQuery.build()));
} catch (IOException e) {
logger.error("Unable to setup document security");
throw ExceptionsHelper.convertToElastic(e);
try {
BooleanQuery.Builder boolQuery = new BooleanQuery.Builder();
boolQuery.setMinimumNumberShouldMatch(1);
QueryShardContext queryShardContext = this.queryShardContextProvider.apply(shardId);
XContentParser parser = XContentFactory.xContent(filter).createParser(filter);
QueryBuilder queryBuilder = queryShardContext.newParseContext(parser).parseInnerQueryBuilder().get();
ParsedQuery parsedQuery = queryShardContext.toFilter(queryBuilder);
boolQuery.add(parsedQuery.query(), BooleanClause.Occur.SHOULD);
return DocumentFilterReader.wrap(reader, new ConstantScoreQuery(boolQuery.build()));
} catch (IOException e) {
logger.error("Unable to setup document security");
throw ExceptionsHelper.convertToElastic(e);
}
}
else {
return reader;
}
}

@Override
protected IndexSearcher wrap(IndexSearcher indexSearcher) throws EngineException {
return indexSearcher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@
/**
* Created by sscarduzio on 14/06/2017.
*/
@Singleton
public class EsAuditSink {

@Singleton public class AuditSinkImpl {

private static final Logger logger = Loggers.getLogger(AuditSinkImpl.class);
private static final Logger logger = Loggers.getLogger(EsAuditSink.class);
private final BulkProcessor bulkProcessor;

@Inject
public AuditSinkImpl(Client client) {
public EsAuditSink(Client client) {
this.bulkProcessor = BulkProcessor
.builder(client, new AuditSinkBulkProcessorListener())
.setBulkActions(AUDIT_SINK_MAX_ITEMS)
Expand Down Expand Up @@ -92,4 +92,5 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
failure.printStackTrace();
}
}

}
Loading

0 comments on commit 48e73ea

Please sign in to comment.