Skip to content

Commit

Permalink
Refactor PhoenixSplit
Browse files Browse the repository at this point in the history
Replace WrappedPhoenixInputSplit with SerializedPhoenixInputSplit to
simplify memory accounting
  • Loading branch information
arhimondr authored and losipiuk committed Dec 23, 2021
1 parent af11c0e commit 4a1624a
Show file tree
Hide file tree
Showing 10 changed files with 136 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ public class PhoenixSplit
extends JdbcSplit
{
private final List<HostAddress> addresses;
private final WrappedPhoenixInputSplit phoenixInputSplit;
private final SerializedPhoenixInputSplit serializedPhoenixInputSplit;

@JsonCreator
public PhoenixSplit(
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("phoenixInputSplit") WrappedPhoenixInputSplit wrappedPhoenixInputSplit)
@JsonProperty("serializedPhoenixInputSplit") SerializedPhoenixInputSplit serializedPhoenixInputSplit)
{
super(Optional.empty());
this.addresses = requireNonNull(addresses, "addresses is null");
this.phoenixInputSplit = requireNonNull(wrappedPhoenixInputSplit, "wrappedPhoenixInputSplit is null");
this.serializedPhoenixInputSplit = requireNonNull(serializedPhoenixInputSplit, "serializedPhoenixInputSplit is null");
}

@JsonProperty
Expand All @@ -48,15 +48,15 @@ public List<HostAddress> getAddresses()
return addresses;
}

@JsonProperty("phoenixInputSplit")
public WrappedPhoenixInputSplit getWrappedPhoenixInputSplit()
@JsonProperty
public SerializedPhoenixInputSplit getSerializedPhoenixInputSplit()
{
return phoenixInputSplit;
return serializedPhoenixInputSplit;
}

@JsonIgnore
public PhoenixInputSplit getPhoenixInputSplit()
{
return phoenixInputSplit.getPhoenixInputSplit();
return serializedPhoenixInputSplit.deserialize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public ConnectorSplitSource getSplits(
.map(PhoenixInputSplit.class::cast)
.map(split -> new PhoenixSplit(
getSplitAddresses(split),
new WrappedPhoenixInputSplit(split)))
SerializedPhoenixInputSplit.serialize(split)))
.collect(toImmutableList());
return new FixedSplitSource(splits);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.phoenix;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.io.ByteStreams;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.mapreduce.PhoenixInputSplit;

import java.io.IOException;
import java.io.UncheckedIOException;

import static java.util.Objects.requireNonNull;

public class SerializedPhoenixInputSplit
{
private final byte[] bytes;

public static SerializedPhoenixInputSplit serialize(PhoenixInputSplit split)
{
return new SerializedPhoenixInputSplit(WritableUtils.toByteArray(split));
}

@JsonCreator
public SerializedPhoenixInputSplit(@JsonProperty("bytes") byte[] bytes)
{
this.bytes = requireNonNull(bytes, "bytes is null");
}

@JsonProperty
public byte[] getBytes()
{
return bytes;
}

public PhoenixInputSplit deserialize()
{
PhoenixInputSplit split = new PhoenixInputSplit();
try {
split.readFields(ByteStreams.newDataInput(bytes));
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
return split;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void testPhoenixSplitJsonRoundtrip()
PhoenixInputSplit phoenixInputSplit = new PhoenixInputSplit(scans);
PhoenixSplit expected = new PhoenixSplit(
addresses,
new WrappedPhoenixInputSplit(phoenixInputSplit));
SerializedPhoenixInputSplit.serialize(phoenixInputSplit));

assertTrue(objectMapper.canSerialize(PhoenixSplit.class));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ public class PhoenixSplit
extends JdbcSplit
{
private final List<HostAddress> addresses;
private final WrappedPhoenixInputSplit phoenixInputSplit;
private final SerializedPhoenixInputSplit serializedPhoenixInputSplit;

@JsonCreator
public PhoenixSplit(
@JsonProperty("addresses") List<HostAddress> addresses,
@JsonProperty("phoenixInputSplit") WrappedPhoenixInputSplit wrappedPhoenixInputSplit)
@JsonProperty("serializedPhoenixInputSplit") SerializedPhoenixInputSplit serializedPhoenixInputSplit)
{
super(Optional.empty());
this.addresses = requireNonNull(addresses, "addresses is null");
this.phoenixInputSplit = requireNonNull(wrappedPhoenixInputSplit, "wrappedPhoenixInputSplit is null");
this.serializedPhoenixInputSplit = requireNonNull(serializedPhoenixInputSplit, "serializedPhoenixInputSplit is null");
}

@JsonProperty
Expand All @@ -48,15 +48,15 @@ public List<HostAddress> getAddresses()
return addresses;
}

@JsonProperty("phoenixInputSplit")
public WrappedPhoenixInputSplit getWrappedPhoenixInputSplit()
@JsonProperty
public SerializedPhoenixInputSplit getSerializedPhoenixInputSplit()
{
return phoenixInputSplit;
return serializedPhoenixInputSplit;
}

@JsonIgnore
public PhoenixInputSplit getPhoenixInputSplit()
{
return phoenixInputSplit.getPhoenixInputSplit();
return serializedPhoenixInputSplit.deserialize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public ConnectorSplitSource getSplits(
.map(PhoenixInputSplit.class::cast)
.map(split -> new PhoenixSplit(
getSplitAddresses(split),
new WrappedPhoenixInputSplit(split)))
SerializedPhoenixInputSplit.serialize(split)))
.collect(toImmutableList());
return new FixedSplitSource(splits);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.phoenix5;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.io.ByteStreams;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.mapreduce.PhoenixInputSplit;

import java.io.IOException;
import java.io.UncheckedIOException;

import static java.util.Objects.requireNonNull;

public class SerializedPhoenixInputSplit
{
private final byte[] bytes;

public static SerializedPhoenixInputSplit serialize(PhoenixInputSplit split)
{
return new SerializedPhoenixInputSplit(WritableUtils.toByteArray(split));
}

@JsonCreator
public SerializedPhoenixInputSplit(@JsonProperty("bytes") byte[] bytes)
{
this.bytes = requireNonNull(bytes, "bytes is null");
}

@JsonProperty
public byte[] getBytes()
{
return bytes;
}

public PhoenixInputSplit deserialize()
{
PhoenixInputSplit split = new PhoenixInputSplit();
try {
split.readFields(ByteStreams.newDataInput(bytes));
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
return split;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void testPhoenixSplitJsonRoundtrip()
PhoenixInputSplit phoenixInputSplit = new PhoenixInputSplit(scans);
PhoenixSplit expected = new PhoenixSplit(
addresses,
new WrappedPhoenixInputSplit(phoenixInputSplit));
SerializedPhoenixInputSplit.serialize(phoenixInputSplit));

assertTrue(objectMapper.canSerialize(PhoenixSplit.class));

Expand Down

0 comments on commit 4a1624a

Please sign in to comment.