Skip to content

Commit

Permalink
Fix atop split serialization
Browse files Browse the repository at this point in the history
Changes to the classloader setup mean that we can no longer use Joda
classes, so instead use ZonedDateTime from JDK. Also remove a hack
that was necessary to restore the timezone after deserialization.
  • Loading branch information
cberner committed Sep 16, 2016
1 parent 059e12b commit 2b7d693
Show file tree
Hide file tree
Showing 10 changed files with 98 additions and 56 deletions.
5 changes: 0 additions & 5 deletions presto-atop/pom.xml
Expand Up @@ -72,11 +72,6 @@
<artifactId>annotations</artifactId> <artifactId>annotations</artifactId>
</dependency> </dependency>


<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>

<dependency> <dependency>
<groupId>javax.inject</groupId> <groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId> <artifactId>javax.inject</artifactId>
Expand Down
Expand Up @@ -17,12 +17,11 @@
import io.airlift.configuration.ConfigDescription; import io.airlift.configuration.ConfigDescription;
import io.airlift.units.Duration; import io.airlift.units.Duration;
import io.airlift.units.MinDuration; import io.airlift.units.MinDuration;
import org.joda.time.DateTimeZone;


import javax.validation.constraints.Min; import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;


import java.util.TimeZone; import java.time.ZoneId;


import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.MINUTES;


Expand All @@ -32,7 +31,7 @@ public class AtopConnectorConfig
public static final String SECURITY_FILE = "file"; public static final String SECURITY_FILE = "file";


private String executablePath = "atop"; private String executablePath = "atop";
private String timeZone = TimeZone.getDefault().getID(); private String timeZone = ZoneId.systemDefault().getId();
private String security = SECURITY_NONE; private String security = SECURITY_NONE;
private Duration readTimeout = new Duration(5, MINUTES); private Duration readTimeout = new Duration(5, MINUTES);
private int concurrentReadersPerNode = 1; private int concurrentReadersPerNode = 1;
Expand Down Expand Up @@ -64,9 +63,9 @@ public AtopConnectorConfig setExecutablePath(String path)
return this; return this;
} }


public DateTimeZone getDateTimeZone() public ZoneId getTimeZoneId()
{ {
return DateTimeZone.forTimeZone(TimeZone.getTimeZone(timeZone)); return ZoneId.of(timeZone);
} }


@NotNull @NotNull
Expand Down
Expand Up @@ -13,9 +13,9 @@
*/ */
package com.facebook.presto.atop; package com.facebook.presto.atop;


import org.joda.time.DateTime; import java.time.ZonedDateTime;


public interface AtopFactory public interface AtopFactory
{ {
Atop create(AtopTable table, DateTime date); Atop create(AtopTable table, ZonedDateTime date);
} }
Expand Up @@ -23,10 +23,10 @@
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import io.airlift.slice.Slice; import io.airlift.slice.Slice;
import org.joda.time.DateTime;


import javax.annotation.Nullable; import javax.annotation.Nullable;


import java.time.ZonedDateTime;
import java.util.List; import java.util.List;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;


Expand All @@ -41,15 +41,15 @@ public class AtopPageSource
private final ConnectorSession session; private final ConnectorSession session;
private final Slice hostIp; private final Slice hostIp;
private final AtopTable table; private final AtopTable table;
private final DateTime date; private final ZonedDateTime date;
private final List<AtopColumn> columns; private final List<AtopColumn> columns;
private final List<Type> types; private final List<Type> types;
private final PageBuilder pageBuilder; private final PageBuilder pageBuilder;
@Nullable @Nullable
private Atop atop; private Atop atop;
private boolean finished; private boolean finished;


public AtopPageSource(Semaphore readerPermits, AtopFactory atopFactory, ConnectorSession session, Slice hostIp, AtopTable table, DateTime date, List<AtopColumn> columns, List<Type> types) public AtopPageSource(Semaphore readerPermits, AtopFactory atopFactory, ConnectorSession session, Slice hostIp, AtopTable table, ZonedDateTime date, List<AtopColumn> columns, List<Type> types)
{ {
this.readerPermits = requireNonNull(readerPermits, "readerPermits is null"); this.readerPermits = requireNonNull(readerPermits, "readerPermits is null");
this.atopFactory = requireNonNull(atopFactory, "atopFactory is null"); this.atopFactory = requireNonNull(atopFactory, "atopFactory is null");
Expand Down
Expand Up @@ -23,32 +23,29 @@
import com.facebook.presto.spi.type.Type; import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager; import com.facebook.presto.spi.type.TypeManager;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;


import javax.inject.Inject; import javax.inject.Inject;


import java.time.ZonedDateTime;
import java.util.List; import java.util.List;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;


import static com.facebook.presto.atop.Types.checkType; import static com.facebook.presto.atop.Types.checkType;
import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.slice.Slices.utf8Slice; import static io.airlift.slice.Slices.utf8Slice;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;


public final class AtopPageSourceProvider public final class AtopPageSourceProvider
implements ConnectorPageSourceProvider implements ConnectorPageSourceProvider
{ {
private final Semaphore readerPermits; private final Semaphore readerPermits;
private final DateTimeZone timeZone;
private final AtopFactory atopFactory; private final AtopFactory atopFactory;
private final TypeManager typeManager; private final TypeManager typeManager;


@Inject @Inject
public AtopPageSourceProvider(AtopConnectorConfig config, AtopFactory atopFactory, TypeManager typeManager) public AtopPageSourceProvider(AtopConnectorConfig config, AtopFactory atopFactory, TypeManager typeManager)
{ {
readerPermits = new Semaphore(requireNonNull(config, "config is null").getConcurrentReadersPerNode()); readerPermits = new Semaphore(requireNonNull(config, "config is null").getConcurrentReadersPerNode());
timeZone = config.getDateTimeZone();
this.atopFactory = requireNonNull(atopFactory, "atopFactory is null"); this.atopFactory = requireNonNull(atopFactory, "atopFactory is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null"); this.typeManager = requireNonNull(typeManager, "typeManager is null");
} }
Expand All @@ -72,9 +69,8 @@ public ConnectorPageSource createPageSource(
types.add(typeManager.getType(atopColumn.getType())); types.add(typeManager.getType(atopColumn.getType()));
} }


// Timezone is not preserved during JSON serialization ZonedDateTime date = atopSplit.getDate();
DateTime date = atopSplit.getDate().withZone(timeZone); checkArgument(date.equals(date.withHour(0).withMinute(0).withSecond(0).withNano(0)), "Expected date to be at beginning of day");
checkState(date.equals(date.withTimeAtStartOfDay()), "Expected date to be at beginning of day");
return new AtopPageSource(readerPermits, atopFactory, session, utf8Slice(atopSplit.getHost().getHostText()), atopSplit.getTable(), date, atopColumns.build(), types.build()); return new AtopPageSource(readerPermits, atopFactory, session, utf8Slice(atopSplit.getHost().getHostText()), atopSplit.getTable(), date, atopColumns.build(), types.build());
} }
} }
Expand Up @@ -18,17 +18,16 @@
import com.google.common.util.concurrent.TimeLimiter; import com.google.common.util.concurrent.TimeLimiter;
import com.google.common.util.concurrent.UncheckedTimeoutException; import com.google.common.util.concurrent.UncheckedTimeoutException;
import io.airlift.units.Duration; import io.airlift.units.Duration;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;


import javax.annotation.PreDestroy; import javax.annotation.PreDestroy;
import javax.inject.Inject; import javax.inject.Inject;


import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
Expand All @@ -45,31 +44,31 @@
public class AtopProcessFactory public class AtopProcessFactory
implements AtopFactory implements AtopFactory
{ {
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormat.forPattern("YYYYMMdd"); private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("YYYYMMdd");
private final String executablePath; private final String executablePath;
private final DateTimeZone timeZone; private final ZoneId timeZone;
private final Duration readTimeout; private final Duration readTimeout;
private final ExecutorService executor; private final ExecutorService executor;


@Inject @Inject
public AtopProcessFactory(AtopConnectorConfig config, AtopConnectorId connectorId) public AtopProcessFactory(AtopConnectorConfig config, AtopConnectorId connectorId)
{ {
this.executablePath = config.getExecutablePath(); this.executablePath = config.getExecutablePath();
this.timeZone = config.getDateTimeZone(); this.timeZone = config.getTimeZoneId();
this.readTimeout = config.getReadTimeout(); this.readTimeout = config.getReadTimeout();
this.executor = newFixedThreadPool(config.getConcurrentReadersPerNode(), daemonThreadsNamed("atop-" + connectorId + "executable-reader-%s")); this.executor = newFixedThreadPool(config.getConcurrentReadersPerNode(), daemonThreadsNamed("atop-" + connectorId + "executable-reader-%s"));
} }


@Override @Override
public Atop create(AtopTable table, DateTime date) public Atop create(AtopTable table, ZonedDateTime date)
{ {
checkArgument(date.getZone().equals(timeZone), "Split date (%s) is not in the local timezone (%s)", date.getZone(), timeZone); checkArgument(date.getZone().getRules().equals(timeZone.getRules()), "Split date (%s) is not in the local timezone (%s)", date.getZone(), timeZone);


ProcessBuilder processBuilder = new ProcessBuilder(executablePath); ProcessBuilder processBuilder = new ProcessBuilder(executablePath);
processBuilder.command().add("-P"); processBuilder.command().add("-P");
processBuilder.command().add(table.getAtopLabel()); processBuilder.command().add(table.getAtopLabel());
processBuilder.command().add("-r"); processBuilder.command().add("-r");
processBuilder.command().add(DATE_FORMATTER.print(date)); processBuilder.command().add(DATE_FORMATTER.format(date));
Process process; Process process;
try { try {
process = processBuilder.start(); process = processBuilder.start();
Expand Down
25 changes: 20 additions & 5 deletions presto-atop/src/main/java/com/facebook/presto/atop/AtopSplit.java
Expand Up @@ -18,29 +18,33 @@
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import org.joda.time.DateTime;


import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.List; import java.util.List;


import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.MoreObjects.toStringHelper;
import static java.time.Instant.ofEpochSecond;
import static java.util.Objects.requireNonNull; import static java.util.Objects.requireNonNull;


public class AtopSplit public class AtopSplit
implements ConnectorSplit implements ConnectorSplit
{ {
private final AtopTable table; private final AtopTable table;
private final HostAddress host; private final HostAddress host;
private final DateTime date; private final ZonedDateTime date;


@JsonCreator @JsonCreator
public AtopSplit( public AtopSplit(
@JsonProperty("table") AtopTable table, @JsonProperty("table") AtopTable table,
@JsonProperty("host") HostAddress host, @JsonProperty("host") HostAddress host,
@JsonProperty("date") DateTime date) @JsonProperty("epochSeconds") long epochSeconds,
@JsonProperty("timeZone") ZoneId timeZone)
{ {
this.table = requireNonNull(table, "table name is null"); this.table = requireNonNull(table, "table name is null");
this.host = requireNonNull(host, "host is null"); this.host = requireNonNull(host, "host is null");
this.date = requireNonNull(date, "date is null"); requireNonNull(timeZone, "timeZone is null");
this.date = ZonedDateTime.ofInstant(ofEpochSecond(epochSeconds), timeZone);
} }


@JsonProperty @JsonProperty
Expand All @@ -56,7 +60,18 @@ public HostAddress getHost()
} }


@JsonProperty @JsonProperty
public DateTime getDate() public long getEpochSeconds()
{
return date.toEpochSecond();
}

@JsonProperty
public ZoneId getTimeZone()
{
return date.getZone();
}

public ZonedDateTime getDate()
{ {
return date; return date;
} }
Expand Down
Expand Up @@ -25,11 +25,11 @@
import com.facebook.presto.spi.predicate.Domain; import com.facebook.presto.spi.predicate.Domain;
import com.facebook.presto.spi.predicate.Range; import com.facebook.presto.spi.predicate.Range;
import com.facebook.presto.spi.predicate.ValueSet; import com.facebook.presto.spi.predicate.ValueSet;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;


import javax.inject.Inject; import javax.inject.Inject;


import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;


Expand All @@ -41,17 +41,15 @@ public class AtopSplitManager
implements ConnectorSplitManager implements ConnectorSplitManager
{ {
private final NodeManager nodeManager; private final NodeManager nodeManager;
private final AtopConnectorId connectorId; private final ZoneId timeZone;
private final DateTimeZone timeZone;
private final int maxHistoryDays; private final int maxHistoryDays;


@Inject @Inject
public AtopSplitManager(NodeManager nodeManager, AtopConnectorConfig config, AtopConnectorId connectorId) public AtopSplitManager(NodeManager nodeManager, AtopConnectorConfig config)
{ {
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.connectorId = requireNonNull(connectorId, "connectorId is null");
requireNonNull(config, "config is null"); requireNonNull(config, "config is null");
timeZone = config.getDateTimeZone(); timeZone = config.getTimeZoneId();
maxHistoryDays = config.getMaxHistoryDays(); maxHistoryDays = config.getMaxHistoryDays();
} }


Expand All @@ -63,16 +61,16 @@ public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHand
AtopTableHandle table = handle.getTableHandle(); AtopTableHandle table = handle.getTableHandle();


List<ConnectorSplit> splits = new ArrayList<>(); List<ConnectorSplit> splits = new ArrayList<>();
DateTime end = DateTime.now().withZone(timeZone); ZonedDateTime end = ZonedDateTime.now(timeZone);
for (Node node : nodeManager.getAllNodes()) { for (Node node : nodeManager.getAllNodes()) {
DateTime start = end.minusDays(maxHistoryDays - 1).withTimeAtStartOfDay(); ZonedDateTime start = end.minusDays(maxHistoryDays - 1).withHour(0).withMinute(0).withSecond(0).withNano(0);
while (start.isBefore(end)) { while (start.isBefore(end)) {
DateTime splitEnd = start.withTime(23, 59, 59, 999); ZonedDateTime splitEnd = start.withHour(23).withMinute(59).withSecond(59).withNano(0);
Domain splitDomain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP_WITH_TIME_ZONE, start.getMillis(), true, splitEnd.getMillis(), true)), false); Domain splitDomain = Domain.create(ValueSet.ofRanges(Range.range(TIMESTAMP_WITH_TIME_ZONE, 1000 * start.toEpochSecond(), true, 1000 * splitEnd.toEpochSecond(), true)), false);
if (handle.getStartTimeConstraint().overlaps(splitDomain) && handle.getEndTimeConstraint().overlaps(splitDomain)) { if (handle.getStartTimeConstraint().overlaps(splitDomain) && handle.getEndTimeConstraint().overlaps(splitDomain)) {
splits.add(new AtopSplit(table.getTable(), node.getHostAndPort(), start)); splits.add(new AtopSplit(table.getTable(), node.getHostAndPort(), start.toEpochSecond(), start.getZone()));
} }
start = start.plusDays(1).withTimeAtStartOfDay(); start = start.plusDays(1).withHour(0).withMinute(0).withSecond(0).withNano(0);
} }
} }


Expand Down
@@ -0,0 +1,40 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.atop;

import com.facebook.presto.spi.HostAddress;
import io.airlift.json.JsonCodec;
import org.testng.annotations.Test;

import java.time.ZoneId;
import java.time.ZonedDateTime;

import static org.testng.Assert.assertEquals;

public class TestAtopSplit
{
@Test
public void testSerialization()
{
JsonCodec<AtopSplit> codec = JsonCodec.jsonCodec(AtopSplit.class);
ZonedDateTime now = ZonedDateTime.now(ZoneId.of("+01:23"));
AtopSplit split = new AtopSplit(AtopTable.DISKS, HostAddress.fromParts("localhost", 123), now.toEpochSecond(), now.getZone());
AtopSplit decoded = codec.fromJson(codec.toJson(split));
assertEquals(decoded.getTable(), split.getTable());
assertEquals(decoded.getHost(), split.getHost());
assertEquals(decoded.getDate(), split.getDate());
assertEquals(decoded.getEpochSeconds(), split.getEpochSeconds());
assertEquals(decoded.getTimeZone(), split.getTimeZone());
}
}
Expand Up @@ -16,12 +16,12 @@
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Splitter; import com.google.common.base.Splitter;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import org.joda.time.DateTime;


import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.time.ZonedDateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
Expand All @@ -33,7 +33,7 @@ public class TestingAtopFactory
implements AtopFactory implements AtopFactory
{ {
@Override @Override
public Atop create(AtopTable table, DateTime date) public Atop create(AtopTable table, ZonedDateTime date)
{ {
InputStream data = TestingAtopFactory.class.getResourceAsStream(table.name().toLowerCase() + ".txt"); InputStream data = TestingAtopFactory.class.getResourceAsStream(table.name().toLowerCase() + ".txt");
requireNonNull(data, format("No data found for %s", table)); requireNonNull(data, format("No data found for %s", table));
Expand All @@ -44,10 +44,10 @@ private static final class TestingAtop
implements Atop implements Atop
{ {
private final BufferedReader reader; private final BufferedReader reader;
private final DateTime date; private final ZonedDateTime date;
private String line; private String line;


private TestingAtop(InputStream dataStream, DateTime date) private TestingAtop(InputStream dataStream, ZonedDateTime date)
{ {
this.date = date; this.date = date;
this.reader = new BufferedReader(new InputStreamReader(dataStream)); this.reader = new BufferedReader(new InputStreamReader(dataStream));
Expand Down Expand Up @@ -86,7 +86,7 @@ public String next()
List<String> fields = new ArrayList<>(Splitter.on(" ").splitToList(currentLine)); List<String> fields = new ArrayList<>(Splitter.on(" ").splitToList(currentLine));


// Timestamps in file are delta encoded // Timestamps in file are delta encoded
long secondsSinceEpoch = date.getMillis() / 1000; long secondsSinceEpoch = date.toEpochSecond();
secondsSinceEpoch += Integer.parseInt(fields.get(2)); secondsSinceEpoch += Integer.parseInt(fields.get(2));
fields.set(2, String.valueOf(secondsSinceEpoch)); fields.set(2, String.valueOf(secondsSinceEpoch));


Expand Down

0 comments on commit 2b7d693

Please sign in to comment.