Skip to content

Commit

Permalink
Refactor PrometheusSplit
Browse files Browse the repository at this point in the history
Store remote URI as String to make split memory accounting simpler as
accounting memory used by the URI object is rather complex due to many
"caching" fields for various URI parts inside the URI object
  • Loading branch information
arhimondr authored and losipiuk committed Dec 23, 2021
1 parent 411258d commit 8e3a396
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.trino.spi.connector.RecordSet;
import io.trino.spi.type.Type;

import java.net.URI;
import java.util.List;

import static java.util.Objects.requireNonNull;
Expand All @@ -42,7 +43,7 @@ public PrometheusRecordSet(PrometheusClient prometheusClient, PrometheusSplit sp
}
this.columnTypes = types.build();

this.byteSource = ByteSource.wrap(prometheusClient.fetchUri(split.getUri()));
this.byteSource = ByteSource.wrap(prometheusClient.fetchUri(URI.create(split.getUri())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,19 @@
public class PrometheusSplit
implements ConnectorSplit
{
private final URI uri;
private final String uri;
private final List<HostAddress> addresses;

@JsonCreator
public PrometheusSplit(
@JsonProperty("uri") URI uri)
public PrometheusSplit(@JsonProperty("uri") String uri)
{
this.uri = requireNonNull(uri, "uri is null");

addresses = ImmutableList.of(HostAddress.fromUri(uri));
addresses = ImmutableList.of(HostAddress.fromUri(URI.create(uri)));
}

@JsonProperty
public URI getUri()
public String getUri()
{
return uri;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public ConnectorSplitSource getSplits(
prometheusURI,
time,
table.getName(),
queryChunkSizeDuration));
queryChunkSizeDuration).toString());
}
catch (URISyntaxException e) {
throw new TrinoException(PROMETHEUS_UNKNOWN_ERROR, "split URI invalid: " + e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.net.URI;

import static io.trino.plugin.prometheus.PrometheusClient.TIMESTAMP_COLUMN_TYPE;
import static io.trino.plugin.prometheus.PrometheusQueryRunner.createPrometheusClient;
import static io.trino.spi.type.BigintType.BIGINT;
Expand Down Expand Up @@ -123,7 +121,7 @@ public void testDropTableTable()
@Test
public void testGetColumnTypes()
{
URI dataUri = server.getUri();
String dataUri = server.getUri().toString();
RecordSet recordSet = new PrometheusRecordSet(
client,
new PrometheusSplit(dataUri),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.net.URI;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -42,7 +41,7 @@
public class TestPrometheusRecordSet
{
private PrometheusHttpServer prometheusHttpServer;
private URI dataUri;
private String dataUri;

@Test
public void testCursorSimple()
Expand Down Expand Up @@ -96,7 +95,7 @@ public void testCursorSimple()
public void setUp()
{
prometheusHttpServer = new PrometheusHttpServer();
dataUri = prometheusHttpServer.resolve("/prometheus-data/up_matrix_response.json");
dataUri = prometheusHttpServer.resolve("/prometheus-data/up_matrix_response.json").toString();
}

@AfterClass(alwaysRun = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.net.URI;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;
Expand All @@ -42,14 +41,14 @@
public class TestPrometheusRecordSetProvider
{
private PrometheusHttpServer prometheusHttpServer;
private URI dataUri;
private String dataUri;
private PrometheusClient client;

@BeforeClass
public void setUp()
{
prometheusHttpServer = new PrometheusHttpServer();
dataUri = prometheusHttpServer.resolve("/prometheus-data/up_matrix_response.json");
dataUri = prometheusHttpServer.resolve("/prometheus-data/up_matrix_response.json").toString();
client = new PrometheusClient(new PrometheusConnectorConfig(), METRIC_CODEC, TESTING_TYPE_MANAGER);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
public class TestPrometheusSplit
{
private PrometheusHttpServer prometheusHttpServer;
private final PrometheusSplit split = new PrometheusSplit(URI.create("http://127.0.0.1/test.file"));
private final PrometheusSplit split = new PrometheusSplit("http://127.0.0.1/test.file");
private static final int NUMBER_MORE_THAN_EXPECTED_NUMBER_SPLITS = 100;

@BeforeClass
Expand All @@ -80,22 +80,22 @@ public void setUp()
public void testAddresses()
{
// http split with default port
PrometheusSplit httpSplit = new PrometheusSplit(URI.create("http://prometheus.com/prometheus"));
PrometheusSplit httpSplit = new PrometheusSplit("http://prometheus.com/prometheus");
assertEquals(httpSplit.getAddresses(), ImmutableList.of(HostAddress.fromString("prometheus.com")));
assertTrue(httpSplit.isRemotelyAccessible());

// http split with custom port
httpSplit = new PrometheusSplit(URI.create("http://prometheus.com:8080/prometheus"));
httpSplit = new PrometheusSplit("http://prometheus.com:8080/prometheus");
assertEquals(httpSplit.getAddresses(), ImmutableList.of(HostAddress.fromParts("prometheus.com", 8080)));
assertTrue(httpSplit.isRemotelyAccessible());

// http split with default port
PrometheusSplit httpsSplit = new PrometheusSplit(URI.create("https://prometheus.com/prometheus"));
PrometheusSplit httpsSplit = new PrometheusSplit("https://prometheus.com/prometheus");
assertEquals(httpsSplit.getAddresses(), ImmutableList.of(HostAddress.fromString("prometheus.com")));
assertTrue(httpsSplit.isRemotelyAccessible());

// http split with custom port
httpsSplit = new PrometheusSplit(URI.create("https://prometheus.com:8443/prometheus"));
httpsSplit = new PrometheusSplit("https://prometheus.com:8443/prometheus");
assertEquals(httpsSplit.getAddresses(), ImmutableList.of(HostAddress.fromParts("prometheus.com", 8443)));
assertTrue(httpsSplit.isRemotelyAccessible());
}
Expand Down Expand Up @@ -128,7 +128,7 @@ public void testQueryWithTableNameNeedingURLEncodeInSplits()
null,
(DynamicFilter) null);
PrometheusSplit split = (PrometheusSplit) splits.getNextBatch(NOT_PARTITIONED, 1).getNow(null).getSplits().get(0);
String queryInSplit = split.getUri().getQuery();
String queryInSplit = URI.create(split.getUri()).getQuery();
String timeShouldBe = decimalSecondString(now.toEpochMilli() -
config.getMaxQueryRangeDuration().toMillis() +
config.getQueryChunkSizeDuration().toMillis() -
Expand All @@ -154,7 +154,7 @@ public void testQueryDividedIntoSplitsFirstSplitHasRightTime()
null,
(DynamicFilter) null);
PrometheusSplit split = (PrometheusSplit) splits.getNextBatch(NOT_PARTITIONED, 1).getNow(null).getSplits().get(0);
String queryInSplit = split.getUri().getQuery();
String queryInSplit = URI.create(split.getUri()).getQuery();
String timeShouldBe = decimalSecondString(now.toEpochMilli() -
config.getMaxQueryRangeDuration().toMillis() +
config.getQueryChunkSizeDuration().toMillis() -
Expand Down Expand Up @@ -182,7 +182,7 @@ public void testQueryDividedIntoSplitsLastSplitHasRightTime()
List<ConnectorSplit> splits = splitsMaybe.getNextBatch(NOT_PARTITIONED, NUMBER_MORE_THAN_EXPECTED_NUMBER_SPLITS).getNow(null).getSplits();
int lastSplitIndex = splits.size() - 1;
PrometheusSplit lastSplit = (PrometheusSplit) splits.get(lastSplitIndex);
String queryInSplit = lastSplit.getUri().getQuery();
String queryInSplit = URI.create(lastSplit.getUri()).getQuery();
String timeShouldBe = decimalSecondString(now.toEpochMilli());
URI uriAsFormed = new URI("http://doesnotmatter:9090/api/v1/query?query=up[" +
getQueryChunkSizeDurationAsPrometheusCompatibleDurationString(config) + "]" +
Expand All @@ -205,9 +205,9 @@ public void testQueryDividedIntoSplitsShouldHaveCorrectSpacingBetweenTimes()
null,
(DynamicFilter) null);
PrometheusSplit split1 = (PrometheusSplit) splits.getNextBatch(NOT_PARTITIONED, 1).getNow(null).getSplits().get(0);
Map<String, String> paramsMap1 = parse(split1.getUri(), StandardCharsets.UTF_8).stream().collect(Collectors.toMap(NameValuePair::getName, NameValuePair::getValue));
Map<String, String> paramsMap1 = parse(URI.create(split1.getUri()), StandardCharsets.UTF_8).stream().collect(Collectors.toMap(NameValuePair::getName, NameValuePair::getValue));
PrometheusSplit split2 = (PrometheusSplit) splits.getNextBatch(NOT_PARTITIONED, 1).getNow(null).getSplits().get(0);
Map<String, String> paramsMap2 = parse(split2.getUri(), StandardCharsets.UTF_8).stream().collect(Collectors.toMap(NameValuePair::getName, NameValuePair::getValue));
Map<String, String> paramsMap2 = parse(URI.create(split2.getUri()), StandardCharsets.UTF_8).stream().collect(Collectors.toMap(NameValuePair::getName, NameValuePair::getValue));
assertEquals(paramsMap1.get("query"), "up[1d]");
assertEquals(paramsMap2.get("query"), "up[1d]");
long diff = Double.valueOf(paramsMap2.get("time")).longValue() - Double.valueOf(paramsMap1.get("time")).longValue();
Expand Down

0 comments on commit 8e3a396

Please sign in to comment.