Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Temporal Scoring Fixes for Lifelog Retrieval #229

Merged
merged 7 commits into from
Nov 5, 2021
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.eclipse.jetty.websocket.api.Session;
Expand All @@ -39,6 +40,8 @@
import org.vitrivr.cineast.core.db.dao.reader.MediaSegmentMetadataReader;
import org.vitrivr.cineast.core.db.dao.reader.MediaSegmentReader;
import org.vitrivr.cineast.core.util.LogHelper;
import org.vitrivr.cineast.core.util.QueryIDGenerator;
import org.vitrivr.cineast.core.util.TimeHelper;
import org.vitrivr.cineast.standalone.config.Config;
import org.vitrivr.cineast.standalone.config.ConstrainedQueryConfig;

Expand Down Expand Up @@ -87,6 +90,7 @@ public final void handle(Session session, T message) {
qconf.setMaxResults(max);
final int resultsPerModule = Math.min(qconf.getRawResultsPerModule() == -1 ? Config.sharedConfig().getRetriever().getMaxResultsPerModule() : qconf.getResultsPerModule(), Config.sharedConfig().getRetriever().getMaxResultsPerModule());
qconf.setResultsPerModule(resultsPerModule);
String qid = uuid.substring(0, 3);
Thread.currentThread().setName("query-msg-handler-" + uuid.substring(0, 3));
try {
/* Begin of Query: Send QueryStart Message to Client.
Expand All @@ -95,7 +99,7 @@ public final void handle(Session session, T message) {
*/
this.write(session, new QueryStart(uuid));
/* Execute actual query. */
LOGGER.trace("Executing query from message {}", message);
LOGGER.trace("Executing query with id {} from message {}", qid, message);
final Set<String> segmentIdsForWhichMetadataIsFetched = new HashSet<>();
final Set<String> objectIdsForWhichMetadataIsFetched = new HashSet<>();
this.execute(session, qconf, message, segmentIdsForWhichMetadataIsFetched, objectIdsForWhichMetadataIsFetched);
Expand Down Expand Up @@ -129,10 +133,18 @@ public final void handle(Session session, T message) {
* @return List of found {@link MediaSegmentDescriptor}
*/
protected List<MediaSegmentDescriptor> loadSegments(List<String> segmentIds) {
final Map<String, MediaSegmentDescriptor> map = this.mediaSegmentReader.lookUpSegments(segmentIds);
final ArrayList<MediaSegmentDescriptor> sdList = new ArrayList<>(map.size());
segmentIds.stream().filter(map::containsKey).forEach(s -> sdList.add(map.get(s)));
return sdList;
return loadSegments(segmentIds, QueryIDGenerator.generateQueryID());
}

protected List<MediaSegmentDescriptor> loadSegments(List<String> segmentIds, String queryID) {
queryID = queryID + "-loadseg";
LOGGER.trace("Loading segment information for {} segmentIDs, qid {}", segmentIds.size(), queryID);
return TimeHelper.timeCall(() -> {
final Map<String, MediaSegmentDescriptor> map = this.mediaSegmentReader.lookUpSegments(segmentIds);
final ArrayList<MediaSegmentDescriptor> sdList = new ArrayList<>(map.size());
segmentIds.stream().filter(map::containsKey).forEach(s -> sdList.add(map.get(s)));
return sdList;
}, "loading segment information, qid " + queryID, Level.TRACE);
}

/**
Expand All @@ -142,10 +154,18 @@ protected List<MediaSegmentDescriptor> loadSegments(List<String> segmentIds) {
* @return List of found {@link MediaObjectDescriptor}
*/
protected List<MediaObjectDescriptor> loadObjects(List<String> objectIds) {
final Map<String, MediaObjectDescriptor> map = this.mediaObjectReader.lookUpObjects(objectIds);
final ArrayList<MediaObjectDescriptor> vdList = new ArrayList<>(map.size());
objectIds.stream().filter(map::containsKey).forEach(s -> vdList.add(map.get(s)));
return vdList;
return loadObjects(objectIds, QueryIDGenerator.generateQueryID());
}

protected List<MediaObjectDescriptor> loadObjects(List<String> objectIds, String queryID) {
queryID = queryID + "-loadobj";
LOGGER.trace("Loading object information for {} segmentIDs, qid {}", objectIds.size(), queryID);
return TimeHelper.timeCall(() -> {
final Map<String, MediaObjectDescriptor> map = this.mediaObjectReader.lookUpObjects(objectIds);
final ArrayList<MediaObjectDescriptor> vdList = new ArrayList<>(map.size());
objectIds.stream().filter(map::containsKey).forEach(s -> vdList.add(map.get(s)));
return vdList;
}, "loading object information, qid " + queryID, Level.TRACE);
}

/**
Expand Down Expand Up @@ -193,27 +213,38 @@ protected synchronized List<Thread> loadAndWriteSegmentMetadata(Session session,
if (segmentIds.isEmpty()) {
return new ArrayList<>();
}
List<Thread> threads = new ArrayList<>();
segmentIds.removeAll(segmentIdsForWhichMetadataIsFetched);
segmentIdsForWhichMetadataIsFetched.addAll(segmentIds);
//chunk for memory safety-purposes
if (segmentIds.size() > 100_000) {
return Lists.partition(segmentIds, 100_000).stream().map(list -> loadAndWriteSegmentMetadata(session, queryId, list, segmentIdsForWhichMetadataIsFetched)).flatMap(Collection::stream).collect(Collectors.toList());
}
final List<MediaSegmentMetadataDescriptor> segmentMetadata = this.segmentMetadataReader.lookupMultimediaMetadata(segmentIds);
if (segmentMetadata.isEmpty()) {
return threads;
}
AtomicInteger i = new AtomicInteger(0);
Lists.partition(segmentMetadata, 100_000).forEach(list -> {
Thread writing = new Thread(() -> {
this.write(session, new MediaSegmentMetadataQueryResult(queryId, list)).join();
Thread fetching = new Thread(() -> {
final List<MediaSegmentMetadataDescriptor> segmentMetadata = this.segmentMetadataReader.lookupMultimediaMetadata(segmentIds);
if (segmentMetadata.isEmpty()) {
return;
}
List<Thread> threads = new ArrayList<>();
AtomicInteger i = new AtomicInteger(0);
Lists.partition(segmentMetadata, 100_000).forEach(list -> {
Thread writing = new Thread(() -> {
this.write(session, new MediaSegmentMetadataQueryResult(queryId, list)).join();
});
writing.setName("metadata-ws-write-queryId" + i.getAndIncrement());
writing.start();
threads.add(writing);
});
writing.setName("metadata-ws-write-" + i.getAndIncrement());
writing.start();
threads.add(writing);
for (Thread thread : threads) {
try {
thread.join();
} catch (InterruptedException e) {
LOGGER.error(e);
}
}
});
return threads;
fetching.setName("metadata-load-write-" + queryId);
fetching.start();
return Lists.newArrayList(fetching);
}

/**
Expand All @@ -223,11 +254,19 @@ protected synchronized List<Thread> loadAndWriteSegmentMetadata(Session session,
*/
protected List<String> submitSegmentAndObjectInformation(Session session, String queryId, List<String> segmentIds) {
/* Load segment & object information. */
LOGGER.trace("Loading segment information for {} segments", segmentIds.size());
final List<MediaSegmentDescriptor> segments = this.loadSegments(segmentIds);
LOGGER.trace("Loading segment and object information for submission, {} segments, qid {}", segmentIds.size(), queryId);
final List<MediaSegmentDescriptor> segments = this.loadSegments(segmentIds, queryId);
return submitPrefetchedSegmentAndObjectInformation(session, queryId, segments);
}

protected List<String> submitPrefetchedSegmentAndObjectInformation(Session session, String queryId, List<MediaSegmentDescriptor> segments) {
final List<String> objectIds = segments.stream().map(MediaSegmentDescriptor::getObjectId).collect(Collectors.toList());
final List<MediaObjectDescriptor> objects = this.loadObjects(objectIds);
return submitPrefetchedSegmentandObjectInformationfromIDs(session, queryId, segments, objectIds);
}

List<String> submitPrefetchedSegmentandObjectInformationfromIDs(Session session, String queryId, List<MediaSegmentDescriptor> segments, List<String> objectIds) {
LOGGER.trace("Loading object information");
final List<MediaObjectDescriptor> objects = this.loadObjects(objectIds, queryId.substring(0, 3));

if (segments.isEmpty() || objects.isEmpty()) {
LOGGER.traceEntry("Segment / Objectlist is Empty, ignoring this iteration");
Expand All @@ -246,21 +285,9 @@ protected List<String> submitSegmentAndObjectInformation(Session session, String
*/
void submitSegmentAndObjectInformationFromIds(Session session, String queryId, List<String> segmentIds, List<String> objectIds) {
/* Load segment & object information. */
LOGGER.trace("Loading segment information for {} segments", segmentIds.size());
final List<MediaSegmentDescriptor> segments = this.loadSegments(segmentIds);

LOGGER.trace("Loading object information");
final List<MediaObjectDescriptor> objects = this.loadObjects(objectIds);

if (segments.isEmpty() || objects.isEmpty()) {
LOGGER.traceEntry("Segment / Objectlist is Empty, ignoring this iteration");
}

LOGGER.trace("Writing results to the websocket");

/* Write segments, objects and similarity search data to stream. */
this.write(session, new MediaObjectQueryResult(queryId, objects));
this.write(session, new MediaSegmentQueryResult(queryId, segments));
LOGGER.trace("Loading segment and object information for submission, {} segments {} objects", segmentIds.size(), objectIds.size());
final List<MediaSegmentDescriptor> segments = this.loadSegments(segmentIds, queryId.substring(0, 3));
submitPrefetchedSegmentandObjectInformationfromIDs(session, queryId, segments, objectIds);
}

/**
Expand Down
Loading