Skip to content

Commit

Permalink
Add ability to write avscs directly to streams/writers (#540)
Browse files Browse the repository at this point in the history
Co-authored-by: Karthik Ramgopal <kramgopa@linkedin.com>
  • Loading branch information
karthikrg and li-kramgopa committed Jan 18, 2024
1 parent eea7e61 commit f46f720
Show file tree
Hide file tree
Showing 25 changed files with 667 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.OutputStream;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -196,7 +198,25 @@ default Schema newEnumSchema(String name, String doc, String namespace, List<Str
* @param schema a schema to print out as (exploded/fully-defined) avsc
* @return the given schema as avsc
*/
String toAvsc(Schema schema, AvscGenerationConfig config);
default String toAvsc(Schema schema, AvscGenerationConfig config) {
StringWriter writer = new StringWriter();
writeAvsc(schema, config, writer);
return writer.toString();
}

/**
* "serialize" a {@link Schema} to avsc format and write it to the given writer.
* @param schema a schema to print out as (exploded/fully-defined) avsc
* @param writer the writer to write the serialized schema to.
*/
void writeAvsc(Schema schema, AvscGenerationConfig config, Writer writer);

/**
* "serialize" a {@link Schema} to avsc format and write it to the given output stream.
* @param schema a schema to print out as (exploded/fully-defined) avsc
* @param stream the output stream to write the serialized schema to.
*/
void writeAvsc(Schema schema, AvscGenerationConfig config, OutputStream stream);

/***
* Instantiates and returns com.linkedin.avroutil1.compatibility.AvroWriter for the provided configs, plugin list
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
import com.linkedin.avroutil1.compatibility.backports.AvroNames;
import com.linkedin.avroutil1.normalization.AvscWriterPlugin;
import java.io.IOException;
import java.io.OutputStream;
import java.io.StringWriter;
import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -137,24 +139,37 @@ protected AvscWriter(boolean pretty, boolean preAvro702, boolean addAliasesForAv
this._plugins = plugins == null ? Collections.emptyList() : plugins;
}

public String toAvsc(Schema schema) {
try {
AvroNames names = new AvroNames();
StringWriter writer = new StringWriter();
G gen = createJsonGenerator(writer);
if(isLegacy) {
toJsonLegacy(schema, names, "", "", gen);
} else {
toJson(schema, names, "", "", gen);
}
public void writeAvsc(Schema schema, OutputStream outputStream) {
try (G generator = createJsonGenerator(outputStream)) {
writeAvsc(schema, generator);
} catch (IOException e) {
throw new AvroRuntimeException(e);
}
}

gen.flush();
return writer.toString();
public void writeAvsc(Schema schema, Writer writer) {
try (G generator = createJsonGenerator(writer)) {
writeAvsc(schema, generator);
} catch (IOException e) {
throw new AvroRuntimeException(e);
}
}

private void writeAvsc(Schema schema, G generator) throws IOException {
AvroNames names = new AvroNames();
if (isLegacy) {
toJsonLegacy(schema, names, "", "", generator);
} else {
toJson(schema, names, "", "", generator);
}
}

public String toAvsc(Schema schema) {
StringWriter writer = new StringWriter();
writeAvsc(schema, writer);
return writer.toString();
}

protected void toJsonLegacy(
Schema schema,
AvroNames names,
Expand Down Expand Up @@ -728,7 +743,9 @@ protected Avro702Data writeName(

//json generator methods (will vary by jackson version across different avro versions)

protected abstract G createJsonGenerator(StringWriter writer) throws IOException;
protected abstract G createJsonGenerator(Writer writer) throws IOException;

protected abstract G createJsonGenerator(OutputStream outputStream) throws IOException;

//properties are very broken across avro versions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,9 @@ public void writeArrayFieldStart(String fieldName) throws IOException {
public void flush() throws IOException {
delegate.flush();
}

@Override
public void close() throws IOException {
delegate.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,9 @@ public void writeArrayFieldStart(String fieldName) throws IOException {
public void flush() throws IOException {
delegate.flush();
}

@Override
public void close() throws IOException {
delegate.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
/**
* internal abstraction to bridge jackson 1 to 2 JsonGenerator
*/
public interface JsonGeneratorWrapper<T> {
public interface JsonGeneratorWrapper<T> extends AutoCloseable {

T getDelegate();

Expand All @@ -34,4 +34,6 @@ public interface JsonGeneratorWrapper<T> {
void writeArrayFieldStart(String fieldName) throws IOException;

void flush() throws IOException;

void close() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.io.ObjectOutput;
import java.io.OutputStream;
import java.io.StringWriter;
import java.io.Writer;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
Expand Down Expand Up @@ -1140,6 +1141,54 @@ public static String toAvsc(Schema schema, AvscGenerationConfig config) {
return ADAPTER.toAvsc(schema, config);
}

/**
* Given a schema, writes a (exploded, fully-inlined, self-contained, however you want to call it)
* avsc representation of the schema to the given {@link java.io.Writer}.
*
* This is logically the same as {@link Schema#toString(boolean)} except not full of horrible bugs.
* specifically, under all versions of avro, we support:
* <ul>
* <li>output free of avro-702, even under avro 1.4</li>
* <li>escaped characters in docs and default values remain properly escaped (avro-886)</li>
* </ul>
* (unless of course you choose to delegate to vanilla avro, at which point youre at the mercy of
* the runtime version thereof)
* @param schema a schema to serialize to avsc
* @param config configuration for avsc generation. see {@link AvscGenerationConfig} for available knobs
* @param writer The writer to write to.
*/
public static void writeAvsc(Schema schema, AvscGenerationConfig config, Writer writer) {
assertAvroAvailable();
if (config == null) {
throw new IllegalArgumentException("config must be provided");
}
ADAPTER.writeAvsc(schema, config, writer);
}

/**
* Given a schema, writes a (exploded, fully-inlined, self-contained, however you want to call it)
* avsc representation of the schema to the given {@link java.io.OutputStream}.
*
* This is logically the same as {@link Schema#toString(boolean)} except not full of horrible bugs.
* specifically, under all versions of avro, we support:
* <ul>
* <li>output free of avro-702, even under avro 1.4</li>
* <li>escaped characters in docs and default values remain properly escaped (avro-886)</li>
* </ul>
* (unless of course you choose to delegate to vanilla avro, at which point youre at the mercy of
* the runtime version thereof)
* @param schema a schema to serialize to avsc
* @param config configuration for avsc generation. see {@link AvscGenerationConfig} for available knobs
* @param stream The output stream to write to.
*/
public static void writeAvsc(Schema schema, AvscGenerationConfig config, OutputStream stream) {
assertAvroAvailable();
if (config == null) {
throw new IllegalArgumentException("config must be provided");
}
ADAPTER.writeAvsc(schema, config, stream);
}

/***
* Check if Fields in schema are reordered
* @param schema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.linkedin.avroutil1.compatibility.avro110.codec.CompatibleJsonEncoder;
import com.linkedin.avroutil1.compatibility.backports.ObjectInputToInputStreamAdapter;
import com.linkedin.avroutil1.normalization.AvscWriterPlugin;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -491,7 +493,7 @@ public Schema newEnumSchema(String name, String doc, String namespace, List<Stri
}

@Override
public String toAvsc(Schema schema, AvscGenerationConfig config) {
public void writeAvsc(Schema schema, AvscGenerationConfig config, Writer writer) {
boolean useRuntime;
if (!isRuntimeAvroCapableOf(config)) {
if (config.isForceUseOfRuntimeAvro()) {
Expand All @@ -505,13 +507,46 @@ public String toAvsc(Schema schema, AvscGenerationConfig config) {
}

if (useRuntime) {
return schema.toString(config.isPrettyPrint());
try {
writer.write(schema.toString(config.isPrettyPrint()));
} catch (IOException e) {
throw new AvroRuntimeException(e);
}
} else {
//if the user does not specify do whatever runtime avro would (which for 1.10 means produce correct schema)
boolean usePre702Logic = config.getRetainPreAvro702Logic().orElse(Boolean.FALSE);
Avro110AvscWriter avscWriter =
new Avro110AvscWriter(config.isPrettyPrint(), usePre702Logic, config.isAddAvro702Aliases());
avscWriter.writeAvsc(schema, writer);
}
}

@Override
public void writeAvsc(Schema schema, AvscGenerationConfig config, OutputStream stream) {
boolean useRuntime;
if (!isRuntimeAvroCapableOf(config)) {
if (config.isForceUseOfRuntimeAvro()) {
throw new UnsupportedOperationException(
"desired configuration " + config + " is forced yet runtime avro " + supportedMajorVersion()
+ " is not capable of it");
}
useRuntime = false;
} else {
useRuntime = config.isPreferUseOfRuntimeAvro();
}

if (useRuntime) {
try {
stream.write(schema.toString(config.isPrettyPrint()).getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new AvroRuntimeException(e);
}
} else {
//if the user does not specify do whatever runtime avro would (which for 1.10 means produce correct schema)
boolean usePre702Logic = config.getRetainPreAvro702Logic().orElse(Boolean.FALSE);
Avro110AvscWriter writer =
Avro110AvscWriter avscWriter =
new Avro110AvscWriter(config.isPrettyPrint(), usePre702Logic, config.isAddAvro702Aliases());
return writer.toAvsc(schema);
avscWriter.writeAvsc(schema, stream);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
import com.linkedin.avroutil1.compatibility.Jackson2Utils;
import com.linkedin.avroutil1.normalization.AvscWriterPlugin;
import java.io.IOException;
import java.io.StringWriter;
import java.io.OutputStream;
import java.io.Writer;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -51,12 +52,22 @@ public Avro110AvscWriter(boolean pretty, boolean preAvro702, boolean addAliasesF
}

@Override
protected Jackson2JsonGeneratorWrapper createJsonGenerator(StringWriter writer) throws IOException {
protected Jackson2JsonGeneratorWrapper createJsonGenerator(Writer writer) throws IOException {
JsonGenerator gen = FACTORY.createGenerator(writer);
return createWrapper(gen);
}

@Override
protected Jackson2JsonGeneratorWrapper createJsonGenerator(OutputStream stream) throws IOException {
JsonGenerator gen = FACTORY.createGenerator(stream);
return createWrapper(gen);
}

private Jackson2JsonGeneratorWrapper createWrapper(JsonGenerator generator) {
if (pretty) {
gen.useDefaultPrettyPrinter();
generator.useDefaultPrettyPrinter();
}
return new Jackson2JsonGeneratorWrapper(gen);
return new Jackson2JsonGeneratorWrapper(generator);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.linkedin.avroutil1.compatibility.avro111.codec.CompatibleJsonEncoder;
import com.linkedin.avroutil1.compatibility.backports.ObjectInputToInputStreamAdapter;
import com.linkedin.avroutil1.normalization.AvscWriterPlugin;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
Expand Down Expand Up @@ -489,7 +491,7 @@ public Schema newEnumSchema(String name, String doc, String namespace, List<Stri
}

@Override
public String toAvsc(Schema schema, AvscGenerationConfig config) {
public void writeAvsc(Schema schema, AvscGenerationConfig config, Writer writer) {
boolean useRuntime;
if (!isRuntimeAvroCapableOf(config)) {
if (config.isForceUseOfRuntimeAvro()) {
Expand All @@ -503,16 +505,50 @@ public String toAvsc(Schema schema, AvscGenerationConfig config) {
}

if (useRuntime) {
return schema.toString(config.isPrettyPrint());
try {
writer.write(schema.toString(config.isPrettyPrint()));
} catch (IOException e) {
throw new AvroRuntimeException(e);
}
} else {
//if the user does not specify do whatever runtime avro would (which for 1.11 means produce correct schema)
boolean usePre702Logic = config.getRetainPreAvro702Logic().orElse(Boolean.FALSE);
Avro111AvscWriter avscWriter =
new Avro111AvscWriter(config.isPrettyPrint(), usePre702Logic, config.isAddAvro702Aliases());
avscWriter.writeAvsc(schema, writer);
}
}

@Override
public void writeAvsc(Schema schema, AvscGenerationConfig config, OutputStream stream) {
boolean useRuntime;
if (!isRuntimeAvroCapableOf(config)) {
if (config.isForceUseOfRuntimeAvro()) {
throw new UnsupportedOperationException(
"desired configuration " + config + " is forced yet runtime avro " + supportedMajorVersion()
+ " is not capable of it");
}
useRuntime = false;
} else {
useRuntime = config.isPreferUseOfRuntimeAvro();
}

if (useRuntime) {
try {
stream.write(schema.toString(config.isPrettyPrint()).getBytes(StandardCharsets.UTF_8));
} catch (IOException e) {
throw new AvroRuntimeException(e);
}
} else {
//if the user does not specify do whatever runtime avro would (which for 1.11 means produce correct schema)
boolean usePre702Logic = config.getRetainPreAvro702Logic().orElse(Boolean.FALSE);
Avro111AvscWriter writer =
Avro111AvscWriter avscWriter =
new Avro111AvscWriter(config.isPrettyPrint(), usePre702Logic, config.isAddAvro702Aliases());
return writer.toAvsc(schema);
avscWriter.writeAvsc(schema, stream);
}
}


@Override
public AvscWriter getAvscWriter(AvscGenerationConfig config, List<AvscWriterPlugin> schemaPlugins) {
boolean usePre702Logic = config.getRetainPreAvro702Logic().orElse(Boolean.FALSE);
Expand Down
Loading

0 comments on commit f46f720

Please sign in to comment.