Permalink
Browse files

Added reactive JSON encoder.

RB=1265252
G=si-dev
R=kbalasub,mlamure,mnchen
A=mlamure,mnchen
  • Loading branch information...
Xiao Ma
Xiao Ma committed Mar 8, 2018
1 parent 54726a5 commit 5b8ce7ceeedacd4eea10068af4d74acb24a8eaf0
Showing with 711 additions and 203 deletions.
  1. +4 −1 CHANGELOG
  2. +0 −1 data-testutils/build.gradle
  3. +97 −0 data-testutils/src/main/java/com/linkedin/data/ChunkedByteStringCollector.java
  4. +4 −5 data-testutils/src/main/java/com/linkedin/data/{ByteChunkWriter.java → ChunkedByteStringWriter.java}
  5. +0 −145 data-testutils/src/test/java/com/linkedin/data/TestByteChunkWriter.java
  6. +70 −0 data-testutils/src/test/java/com/linkedin/data/TestChunkedByteStringWriter.java
  7. +19 −0 data/src/main/java/com/linkedin/data/ByteString.java
  8. +3 −2 data/src/main/java/com/linkedin/data/codec/entitystream/DataDecoder.java
  9. +25 −0 data/src/main/java/com/linkedin/data/codec/entitystream/DataEncoder.java
  10. +3 −13 data/src/main/java/com/linkedin/data/codec/entitystream/JacksonJsonDataDecoder.java
  11. +352 −0 data/src/main/java/com/linkedin/data/codec/entitystream/JacksonJsonDataEncoder.java
  12. +22 −0 data/src/main/java/com/linkedin/data/codec/entitystream/JsonDataEncoder.java
  13. +0 −2 data/src/test/java/com/linkedin/data/codec/TestJacksonCodec.java
  14. +3 −7 data/src/test/java/com/linkedin/data/codec/entitystream/TestJacksonJsonDataDecoder.java
  15. +60 −0 data/src/test/java/com/linkedin/data/codec/entitystream/TestJacksonJsonDataEncoder.java
  16. +0 −1 entity-stream/build.gradle
  17. +40 −16 entity-stream/src/main/java/com/linkedin/entitystream/CollectingReader.java
  18. +5 −6 entity-stream/src/test/java/com/linkedin/reactivestreams/TestSingletonWriter.java
  19. +2 −2 ...linkedin/restli/examples/greetings/server/GreetingUnstructuredDataCollectionResourceReactive.java
  20. +2 −2 restli-server/src/test/java/com/linkedin/restli/server/twitter/FeedDownloadResourceReactive.java
View
@@ -18,10 +18,13 @@ This change fix the NPE and throw ServiceUnavailableException instead.
(RB=1275957)
Fix distributionBased ring creation with an empty pointsMap
(RB=1265252)
Added implementation for reactive streaming JSON encoder.
20.0.16
------
(RB=1269478)
Temprorily downgrade timings warning to debug to satisfy EKG
Temporarily downgrade timings warning to debug to satisfy EKG
20.0.15
------
@@ -1,5 +1,4 @@
dependencies {
compile project(':pegasus-common')
compile project(':entity-stream')
compile project(':data')
@@ -0,0 +1,97 @@
/*
Copyright (c) 2018 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.linkedin.data;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
/**
* A {@link Collector} of {@link ByteString}. It concatenates the bytes.
*/
public class ChunkedByteStringCollector implements Collector<ByteString, ChunkedByteStringCollector.ResultContainer, ChunkedByteStringCollector.Result>
{
@Override
public Supplier<ResultContainer> supplier()
{
return ResultContainer::new;
}
@Override
public BiConsumer<ResultContainer, ByteString> accumulator()
{
return this::accumulate;
}
private void accumulate(ResultContainer tmpResult, ByteString data)
{
try
{
tmpResult.os.write(data.copyBytes());
tmpResult.chunkCount++;
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
@Override
public BinaryOperator<ResultContainer> combiner()
{
throw new UnsupportedOperationException();
}
@Override
public Function<ResultContainer, Result> finisher()
{
return tmpResult -> new Result(tmpResult.os.toByteArray(), tmpResult.chunkCount);
}
@Override
public Set<Characteristics> characteristics()
{
return Collections.emptySet();
}
static class ResultContainer
{
ByteArrayOutputStream os = new ByteArrayOutputStream();
int chunkCount = 0;
}
public static class Result
{
public byte[] data;
public int chunkCount;
Result(byte[] data, int chunkCount)
{
this.data = data;
this.chunkCount = chunkCount;
}
}
}
@@ -25,25 +25,24 @@
/**
* This {@link Writer} implementation writes the byte chunks as @{link ByteString}s of a fixed chunk size. This is
* This {@link Writer} implementation writes the bytes as @{link ByteString}s of a fixed size. This is
* useful in testing to allow {@link Reader}s to receive data in multiple chunks.
*
*/
public class ByteChunkWriter implements Writer<ByteString>
public class ChunkedByteStringWriter implements Writer<ByteString>
{
private byte[] _bytes;
private int _offset;
private int _chunkSize;
private WriteHandle<? super ByteString> _writeHandle;
public ByteChunkWriter(byte[] bytes, int chunkSize)
public ChunkedByteStringWriter(byte[] bytes, int chunkSize)
{
_bytes = bytes;
_chunkSize = chunkSize;
_offset = 0;
}
public ByteChunkWriter(String s, int chunkSize)
public ChunkedByteStringWriter(String s, int chunkSize)
{
this(s.getBytes(StandardCharsets.UTF_8), chunkSize);
}

This file was deleted.

Oops, something went wrong.
@@ -0,0 +1,70 @@
/*
Copyright (c) 2018 LinkedIn Corp.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package com.linkedin.data;
import com.linkedin.entitystream.CollectingReader;
import com.linkedin.entitystream.EntityStream;
import com.linkedin.entitystream.EntityStreams;
import com.linkedin.entitystream.Writer;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
public class TestChunkedByteStringWriter
{
private static final byte[] DATA = new byte[256];
static
{
for (int i = 0; i < 256; i++)
{
DATA[i] = (byte) i;
}
}
@DataProvider
public Object[][] data()
{
return new Object[][]
{
{1},
{127},
{128},
{255},
{256},
{512},
};
}
@Test(dataProvider = "data")
public void testWrite(int chunkSize)
throws InterruptedException, ExecutionException, TimeoutException
{
Writer<ByteString> writer = new ChunkedByteStringWriter(DATA, chunkSize);
EntityStream<ByteString> stream = EntityStreams.newEntityStream(writer);
CollectingReader<ByteString, ?, ChunkedByteStringCollector.Result> reader = new CollectingReader<>(new ChunkedByteStringCollector());
stream.setReader(reader);
ChunkedByteStringCollector.Result result = reader.getResult().toCompletableFuture().get();
Assert.assertEquals(result.data, DATA);
Assert.assertEquals(result.chunkCount, (DATA.length - 1) / chunkSize + 1);
}
}
@@ -79,6 +79,25 @@ public static ByteString unsafeWrap(byte[] bytes)
return bytes.length == 0 ? empty() : new ByteString(bytes);
}
/**
* Returns a new {@link ByteString} that wraps the supplied bytes. Changes to the supplied bytes will be reflected
* in the returned {@link ByteString}.
*
* WARNING: Please exercise caution when using this. Care must be taken to ensure that bytes are not changed
* after construction.
*
* @param bytes the bytes to back the ByteString.
* @param offset the offset of the actual data.
* @param length the length of the actual data.
* @return a {@link ByteString} that wraps the supplied bytes.
* @throws NullPointerException if {@code bytes} is {@code null}.
*/
public static ByteString unsafeWrap(byte[] bytes, int offset, int length)
{
ArgumentUtil.notNull(bytes, "bytes");
return bytes.length == 0 ? empty() : new ByteString(bytes, offset, length);
}
/**
* Returns a new {@link ByteString} that wraps a copy of the supplied bytes. Changes to the supplied bytes
* will not be reflected in the returned {@link ByteString}.
Oops, something went wrong.

0 comments on commit 5b8ce7c

Please sign in to comment.