Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce SMERGE #1056

Merged
merged 4 commits into from Feb 8, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
206 changes: 204 additions & 2 deletions warp10/src/main/java/io/warp10/continuum/gts/GTSHelper.java
@@ -1,5 +1,5 @@
//
// Copyright 2018-2021 SenX S.A.S.
// Copyright 2018-2022 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -40,6 +40,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
Expand Down Expand Up @@ -4197,6 +4198,207 @@ public static GeoTimeSerie sortedMerge(GeoTimeSerie base, GeoTimeSerie gts) {
return merged;
}

/**
* Merge sorted GTS or encoders
*/
public static GTSEncoder sortedMerge(List<Object> params, boolean reversed) throws WarpScriptException {

final class GTSAndIndex {
private GeoTimeSerie gts;
private int idx;
}

List<GeoTimeSerie> series = new ArrayList<GeoTimeSerie>();
List<GTSEncoder> encoders = new ArrayList<GTSEncoder>();

for (int i = 0; i < params.size(); i++) {
if (params.get(i) instanceof GeoTimeSerie) {
if (!GTSHelper.isSorted((GeoTimeSerie) params.get(i))) {
throw new WarpScriptException("GTS " + GTSHelper.buildSelector((GeoTimeSerie) params.get(i), false) + " is not sorted.");
}
if (GTSHelper.isReversed((GeoTimeSerie) params.get(i)) != reversed) {
throw new WarpScriptException("GTS " + GTSHelper.buildSelector((GeoTimeSerie) params.get(i), false) + " is not sorted in the expected order.");
}
series.add((GeoTimeSerie) params.get(i));
} else if (params.get(i) instanceof GTSEncoder) {
encoders.add((GTSEncoder) params.get(i));
} else if (params.get(i) instanceof List) {
for (Object o: (List) params.get(i)) {
if (o instanceof GeoTimeSerie) {
if (!GTSHelper.isSorted((GeoTimeSerie) o)) {
throw new WarpScriptException("GTS " + GTSHelper.buildSelector((GeoTimeSerie) o, false) + " is not sorted.");
}
hbs marked this conversation as resolved.
Show resolved Hide resolved
if (GTSHelper.isReversed((GeoTimeSerie) o) != reversed) {
throw new WarpScriptException("GTS " + GTSHelper.buildSelector((GeoTimeSerie) o, false) + " is not sorted in the expected order.");
}
series.add((GeoTimeSerie) o);
} else if (o instanceof GTSEncoder) {
encoders.add((GTSEncoder) o);
} else {
throw new WarpScriptException("expects a list of Geo Time Series or encoders or of lists therefos.");
hbs marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
}

// Index at which the GTS start
int gtsidx = encoders.size();
BitSet eligible = new BitSet(encoders.size() + series.size());
hbs marked this conversation as resolved.
Show resolved Hide resolved
// Decoders used to scan the ENCODERS
final boolean freversed = reversed;
PriorityQueue<Object> decoders = new PriorityQueue<Object>(new Comparator<Object>() {
@Override
public int compare(Object o1, Object o2) {
Long ts1 = null;
Long ts2 = null;

if (o1 instanceof GTSDecoder) {
ts1 = ((GTSDecoder) o1).getTimestamp();
} else { // GTSAndIndex
GTSAndIndex gtsi = (GTSAndIndex) o1;
if (gtsi.idx < gtsi.gts.size()) {
ts1 = GTSHelper.tickAtIndex(gtsi.gts, gtsi.idx);
}
}

if (o2 instanceof GTSDecoder) {
ts2 = ((GTSDecoder) o2).getTimestamp();
} else { // GTSAndIndex
GTSAndIndex gtsi = (GTSAndIndex) o2;
if (gtsi.idx < gtsi.gts.size()) {
ts2 = GTSHelper.tickAtIndex(gtsi.gts, gtsi.idx);
}
}

if (null == ts1 && null == ts2) {
return 0;
} else if (null == ts1) {
if (freversed) {
return -1;
} else {
return 1;
}
} else if (null == ts2) {
if (freversed) {
return 1;
} else {
return -1;
}
} else {
if (!freversed) {
return Long.compare(ts1, ts2);
} else {
return Long.compare(ts2, ts1);
}
}
}
});

for (int i = 0; i < encoders.size(); i++) {
GTSDecoder decoder = encoders.get(i).getDecoder(true);
// Nullify decoders[i] if the decoder is exhausted
if (decoder.next()) {
decoders.add(decoder);
}
}

for (int i = 0; i < series.size(); i++) {
if (0 == series.get(i).size()) {
continue;
}
GTSAndIndex gtsi = new GTSAndIndex();
gtsi.gts = series.get(i);
gtsi.idx = 0;
decoders.add(gtsi);
}

GTSEncoder merged = new GTSEncoder(0);

if (!encoders.isEmpty()) {
merged.setMetadata(encoders.get(0).getMetadata());
} else if (!series.isEmpty()) {
merged.setMetadata(series.get(0).getMetadata());
}

try {
while(!decoders.isEmpty()) {
Object first = decoders.peek();
Long ts = null;
Long lastTs = null;

if (first instanceof GTSDecoder) {
GTSDecoder decoder = (GTSDecoder) first;
lastTs = decoder.getTimestamp();
} else {
GTSAndIndex gtsi = (GTSAndIndex) first;
if (gtsi.idx < gtsi.gts.size()) {
lastTs = GTSHelper.tickAtIndex(gtsi.gts, gtsi.idx);
}
}
if (null == lastTs) {
break;
}

boolean done = false;
// Iterate over the elements of the Queue
while(!decoders.isEmpty() && !done) {
Object elt = decoders.remove();
if (elt instanceof GTSDecoder) {
GTSDecoder decoder = (GTSDecoder) elt;
ts = decoder.getTimestamp();
if (!lastTs.equals(ts)) { // The first timestamp does not equal lastTs, so we are done for this loop
decoders.add(elt);
done = true;
continue;
}
boolean exhausted = false;
while(lastTs.equals(ts)) {
merged.addValue(ts, decoder.getLocation(), decoder.getElevation(), decoder.getBinaryValue());
if (decoder.next()) {
ts = decoder.getTimestamp();
// Ensure the ENCODER is sorted correctly
if (reversed && ts > lastTs) {
throw new WarpScriptException("ENCODER " + GTSHelper.buildSelector(decoder.getMetadata(), false) + " is not sorted in expected order.");
} else if (!reversed && ts < lastTs) {
throw new WarpScriptException("ENCODER " + GTSHelper.buildSelector(decoder.getMetadata(), false) + " is not sorted in expected order.");
}
} else {
ts = null;
exhausted = true;
}
}
if (!exhausted) {
decoders.add(decoder);
}
} else { // GTSAndIndex
GTSAndIndex gtsi = (GTSAndIndex) elt;
ts = GTSHelper.tickAtIndex(gtsi.gts, gtsi.idx);
if (!lastTs.equals(ts)) {
decoders.add(elt);
done = true;
continue;
}
while(lastTs.equals(ts)) {
merged.addValue(ts, GTSHelper.locationAtIndex(gtsi.gts, gtsi.idx), GTSHelper.elevationAtIndex(gtsi.gts, gtsi.idx), GTSHelper.valueAtIndex(gtsi.gts, gtsi.idx));
gtsi.idx++;
if (gtsi.idx >= gtsi.gts.size()) {
break;
}
ts = GTSHelper.tickAtIndex(gtsi.gts, gtsi.idx);
}
if (gtsi.idx < gtsi.gts.size()) {
decoders.add(gtsi);
}
}
}
}
} catch (IOException ioe) {
throw new WarpScriptException("encountered an error while merging.");
}

return merged;
}

/**
* Fill missing values/locations/elevations in a bucketized GTS with the previously
* encountered one.
Expand Down Expand Up @@ -5714,7 +5916,7 @@ public static Map<String, String> commonAttributes(List<GeoTimeSerie> lgts) {

for (int i = 0; i < lgts.size(); i++) {
Map<String, String> attributes = lgts.get(i).getMetadata().getAttributes();

if (null == attributes || attributes.isEmpty()) {
commonAttributes.clear();
break;
Expand Down
2 changes: 1 addition & 1 deletion warp10/src/main/java/io/warp10/script/WarpScriptLib.java
@@ -1,5 +1,5 @@
//
// Copyright 2019-2021 SenX S.A.S.
// Copyright 2019-2022 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
114 changes: 64 additions & 50 deletions warp10/src/main/java/io/warp10/script/functions/MERGE.java
@@ -1,5 +1,5 @@
//
// Copyright 2018-2021 SenX S.A.S.
// Copyright 2018-2022 SenX S.A.S.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,81 +16,95 @@

package io.warp10.script.functions;

import io.warp10.continuum.gts.GTSDecoder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import io.warp10.continuum.gts.GTSEncoder;
import io.warp10.continuum.gts.GTSHelper;
import io.warp10.continuum.gts.GeoTimeSerie;
import io.warp10.script.NamedWarpScriptFunction;
import io.warp10.script.WarpScriptStackFunction;
import io.warp10.script.WarpScriptException;
import io.warp10.script.WarpScriptStack;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import io.warp10.script.WarpScriptStackFunction;

/**
* Apply merge on GTS instances
*/
public class MERGE extends NamedWarpScriptFunction implements WarpScriptStackFunction {

public MERGE(String name) {
super(name);
}

@Override
public Object apply(WarpScriptStack stack) throws WarpScriptException {
Object top = stack.pop();

Boolean reversed = null;

if (top instanceof Boolean) {
reversed = (Boolean) top;
top = stack.pop();
}

if (!(top instanceof List)) {
throw new WarpScriptException(getName() + " expects a list as input.");
hbs marked this conversation as resolved.
Show resolved Hide resolved
}

List<Object> params = (List<Object>) top;

List<GeoTimeSerie> series = new ArrayList<GeoTimeSerie>();
List<GTSEncoder> encoders = new ArrayList<GTSEncoder>();

for (int i = 0; i < params.size(); i++) {
if (params.get(i) instanceof GeoTimeSerie) {
series.add((GeoTimeSerie) params.get(i));
} else if (params.get(i) instanceof GTSEncoder) {
encoders.add((GTSEncoder) params.get(i));
} else if (params.get(i) instanceof List) {
for (Object o: (List) params.get(i)) {
if (o instanceof GeoTimeSerie) {
series.add((GeoTimeSerie) o);
} else if (o instanceof GTSEncoder) {
encoders.add((GTSEncoder) o);
} else {
throw new WarpScriptException(getName() + " expects a list of Geo Time Series or encoders as first parameter.");

if (null != reversed) {
try {
stack.push(GTSHelper.sortedMerge(params, reversed));
} catch (WarpScriptException wse) {
throw new WarpScriptException(getName() + " encountered an error while merging.", wse);
}
} else {
List<GeoTimeSerie> series = new ArrayList<GeoTimeSerie>();
List<GTSEncoder> encoders = new ArrayList<GTSEncoder>();

for (int i = 0; i < params.size(); i++) {
if (params.get(i) instanceof GeoTimeSerie) {
series.add((GeoTimeSerie) params.get(i));
} else if (params.get(i) instanceof GTSEncoder) {
encoders.add((GTSEncoder) params.get(i));
} else if (params.get(i) instanceof List) {
for (Object o: (List) params.get(i)) {
if (o instanceof GeoTimeSerie) {
series.add((GeoTimeSerie) o);
} else if (o instanceof GTSEncoder) {
encoders.add((GTSEncoder) o);
} else {
throw new WarpScriptException(getName() + " expects a list of Geo Time Series or encoders as first parameter.");
}
}
}
}
}

if (!encoders.isEmpty() && !series.isEmpty()) {
throw new WarpScriptException(getName() + " can only operate on homogeneous lists of Geo Time Series or encoders.");
}

try {
if (encoders.isEmpty()) {
GeoTimeSerie merged = GTSHelper.mergeViaEncoders(series);

stack.push(merged);
} else {
GTSEncoder encoder = new GTSEncoder(0L);

encoder.setMetadata(encoders.get(0).getMetadata());

for (GTSEncoder enc: encoders) {
encoder.merge(enc);
}
}

stack.push(encoder);
if (!encoders.isEmpty() && !series.isEmpty()) {
throw new WarpScriptException(getName() + " can only operate on homogeneous lists of Geo Time Series or encoders.");
}

try {
if (encoders.isEmpty()) {
GeoTimeSerie merged = GTSHelper.mergeViaEncoders(series);

stack.push(merged);
} else {
GTSEncoder encoder = new GTSEncoder(0L);

encoder.setMetadata(encoders.get(0).getMetadata());

for (GTSEncoder enc: encoders) {
encoder.merge(enc);
}

stack.push(encoder);
}
} catch (IOException ioe) {
throw new WarpScriptException(getName() + " failed.", ioe);
}
} catch (IOException ioe) {
throw new WarpScriptException(getName() + " failed.", ioe);
}

return stack;
Expand Down