Skip to content

Commit

Permalink
Decouple schema parsing from code generation (#532)
Browse files Browse the repository at this point in the history
Remove dead code

Co-authored-by: Karthik Ramgopal <kramgopa@linkedin.com>
  • Loading branch information
karthikrg and li-kramgopa committed Jan 12, 2024
1 parent 39d884c commit cdf7f90
Show file tree
Hide file tree
Showing 15 changed files with 297 additions and 266 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,56 +10,47 @@
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Objects;
import java.util.Set;
import org.apache.avro.Schema;


/**
* common context to all {@link com.linkedin.avroutil1.builder.operations.Operation}s run during an execution
*/
public class OperationContext {
private final HashMap<String, Object> context;
private Set<File> avroFiles;
private Set<AvroSchema> avroSchemas;
private final Set<File> avroFiles;
private final Set<AvroSchema> avroSchemas;
private final SchemaSet lookupSchemaSet;

public OperationContext() {
public OperationContext(Set<AvroSchema> avroSchemas, Set<File> avroFiles, SchemaSet lookupSchemaSet) {
this.context = new HashMap<>();
}

public void addParsedSchemas(Set<AvroSchema> avroSchemas, Set<File> avroFiles) {
if (this.avroFiles != null || this.avroSchemas != null) {
throw new IllegalStateException("Cannot initialize avro files twice");
}

this.avroFiles = Collections.unmodifiableSet(avroFiles);
this.avroSchemas = Collections.unmodifiableSet(avroSchemas);
}

public void addVanillaSchemas(Set<Schema> schemas, Set<File> avroFiles) {
addParsedSchemas(AvroSchemaUtils.schemasToAvroSchemas(schemas), avroFiles);
this.avroSchemas = Collections.unmodifiableSet(Objects.requireNonNull(avroSchemas));
this.avroFiles = Collections.unmodifiableSet(Objects.requireNonNull(avroFiles));
this.lookupSchemaSet = lookupSchemaSet;
}

/**
* Returns an unmodifiable set of all input avro files.
*/
public Set<File> getAvroFiles() {
if (this.avroFiles == null) {
throw new IllegalStateException("Avro files haven't been added yet.");
}
return this.avroFiles;
}

/**
* Returns an unmodifiable set of all parsed avro schemas.
*/
public Set<AvroSchema> getAvroSchemas() {
if (this.avroSchemas == null) {
throw new IllegalStateException("Avro schemas haven't been added yet.");
}

return this.avroSchemas;
}

/**
* Returns the lookup schema set.
*/
public SchemaSet getLookupSchemaSet() {
return this.lookupSchemaSet;
}

public void setConfigValue(String key, Object value) {
if (this.context.containsKey(key)) {
throw new IllegalStateException("Cannot initialize an already-initialized config key.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* See License in the project root for license information.
*/

package com.linkedin.avroutil1.builder.operations.codegen.vanilla;
package com.linkedin.avroutil1.builder.operations;


import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@
*/
public class BuilderPluginContext {

private List<Operation> operations = new ArrayList<>(1);
private OperationContext _operationContext = new OperationContext();
private final List<Operation> operations = new ArrayList<>(1);
private volatile boolean sealed = false;
private final OperationContext operationContext;

public BuilderPluginContext(OperationContext operationContext) {
this.operationContext = operationContext;
}

public void add(Operation op) {
if (sealed) {
Expand All @@ -39,8 +43,12 @@ public void run() throws Exception {
//"seal" any internal state to prevent plugins from trying to do weird things during execution
sealed = true;

for (Operation op : operations) {
op.run(_operationContext);
}
operations.parallelStream().forEach(op -> {
try {
op.run(operationContext);
} catch (Exception e) {
throw new IllegalStateException("Exception running operation", e);
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

package com.linkedin.avroutil1.builder;

import com.linkedin.avroutil1.builder.operations.OperationContext;
import com.linkedin.avroutil1.builder.operations.codegen.CodeGenerator;
import com.linkedin.avroutil1.builder.operations.codegen.own.AvroUtilCodeGenOp;
import com.linkedin.avroutil1.builder.operations.codegen.OperationContextBuilder;
import com.linkedin.avroutil1.builder.operations.codegen.own.AvroUtilCodeGenPlugin;
import com.linkedin.avroutil1.builder.operations.codegen.CodeGenOpConfig;
import com.linkedin.avroutil1.builder.operations.Operation;
import com.linkedin.avroutil1.builder.operations.codegen.own.AvroUtilOperationContextBuilder;
import com.linkedin.avroutil1.builder.operations.codegen.vanilla.VanillaProcessedCodeGenOp;
import com.linkedin.avroutil1.builder.plugins.BuilderPlugin;
import com.linkedin.avroutil1.builder.plugins.BuilderPluginContext;
Expand Down Expand Up @@ -232,8 +234,6 @@ public static void main(String[] args) throws Exception {
plugin.parseAndValidateOptions(options);
}

BuilderPluginContext context = new BuilderPluginContext();

CodeGenOpConfig opConfig = new CodeGenOpConfig(
inputs,
nonImportableSources,
Expand All @@ -254,20 +254,22 @@ public static void main(String[] args) throws Exception {

opConfig.validateParameters();

Operation op;
OperationContextBuilder operationContextBuilder;
switch (opConfig.getGeneratorType()) {
case AVRO_UTIL:
op = new AvroUtilCodeGenOp(opConfig);
operationContextBuilder = new AvroUtilOperationContextBuilder();
plugins.add(new AvroUtilCodeGenPlugin(opConfig));
break;
case VANILLA:
op = new VanillaProcessedCodeGenOp(opConfig);
operationContextBuilder = new VanillaProcessedCodeGenOp();
break;
default:
throw new IllegalStateException("unhandled: " + opConfig.getGeneratorType());
}
context.add(op);
OperationContext opContext = operationContextBuilder.buildOperationContext(opConfig);
BuilderPluginContext context = new BuilderPluginContext(opContext);

//allow plugins to add operations
// Allow other plugins to add operations
for (BuilderPlugin plugin : plugins) {
plugin.createOperations(context);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2024 LinkedIn Corp.
* Licensed under the BSD 2-Clause License (the "License").
* See License in the project root for license information.
*/

package com.linkedin.avroutil1.builder.operations.codegen;

import com.linkedin.avroutil1.builder.operations.OperationContext;


/**
 * Builds operation context.
 *
 * <p>Implementations turn a {@link CodeGenOpConfig} into the {@link OperationContext}
 * that is subsequently handed to the operations of a run.
 */
public interface OperationContextBuilder {

/**
 * Builds and returns the {@link OperationContext}.
 *
 * @param opConfig the code generation configuration to build the context from
 * @return the operation context built for {@code opConfig}
 * @throws Exception if the context cannot be built; the concrete failure types depend on
 *         the implementation and are not constrained by this interface
 */
OperationContext buildOperationContext(CodeGenOpConfig opConfig) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright 2022 LinkedIn Corp.
* Licensed under the BSD 2-Clause License (the "License").
* See License in the project root for license information.
*/

package com.linkedin.avroutil1.builder.operations.codegen.own;

import com.linkedin.avroutil1.builder.operations.OperationContext;
import com.linkedin.avroutil1.builder.operations.SchemaSet;
import com.linkedin.avroutil1.builder.operations.codegen.CodeGenOpConfig;
import com.linkedin.avroutil1.builder.plugins.BuilderPlugin;
import com.linkedin.avroutil1.builder.plugins.BuilderPluginContext;
import com.linkedin.avroutil1.codegen.SpecificRecordClassGenerator;
import com.linkedin.avroutil1.codegen.SpecificRecordGenerationConfig;
import com.linkedin.avroutil1.codegen.SpecificRecordGeneratorUtil;
import com.linkedin.avroutil1.model.AvroJavaStringRepresentation;
import com.linkedin.avroutil1.model.AvroNamedSchema;
import com.linkedin.avroutil1.model.AvroType;
import com.linkedin.avroutil1.model.AvroUnionSchema;
import com.squareup.javapoet.JavaFile;
import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * A code generation plugin using the avro-codegen module of avro-util.
 *
 * <p>The plugin registers a single operation which generates one java source file per named
 * schema found in the {@link OperationContext} (including nested named schemas) and writes the
 * files under {@link CodeGenOpConfig#getOutputSpecificRecordClassesRoot()}.
 */
public class AvroUtilCodeGenPlugin implements BuilderPlugin {
  private static final Logger LOGGER = LoggerFactory.getLogger(AvroUtilCodeGenPlugin.class);

  /** Number of schemas handed to a single parallel code generation task. */
  private static final int SCHEMA_CHUNK_SIZE = 500;

  private final CodeGenOpConfig config;

  /**
   * @param config code generation configuration (output root, string representations, minimum
   *               avro version, classpath-skip behaviour) used by the generated operation
   */
  public AvroUtilCodeGenPlugin(CodeGenOpConfig config) {
    this.config = config;
  }

  @Override
  public Set<Integer> supportedApiVersions() {
    return Collections.singleton(1);
  }

  /**
   * Registers the code generation operation with the given plugin context.
   */
  @Override
  public void createOperations(BuilderPluginContext context) {
    context.add(this::generateCode);
  }

  /**
   * Generates specific record classes for all named schemas in {@code opContext} and writes them
   * to the configured output folder. Schemas are processed in chunks of
   * {@link #SCHEMA_CHUNK_SIZE}, and the chunks are processed in parallel.
   *
   * @param opContext context holding the parsed schemas and the (possibly null) lookup schema set
   * @throws IllegalStateException if the output folder cannot be created or a file cannot be written
   * @throws RuntimeException if class generation fails for a schema
   */
  private void generateCode(OperationContext opContext) {
    // Make sure the output folder exists (checked once, up front).
    final File outputFolder = config.getOutputSpecificRecordClassesRoot();
    ensureDirectoryExists(outputFolder);
    final Path outputDirectoryPath = outputFolder.toPath();

    // The source stream is sequential, so the counter simply numbers the schemas in encounter
    // order and groups them into consecutive chunks.
    final AtomicInteger schemaCounter = new AtomicInteger(0);
    Collection<List<AvroNamedSchema>> schemaChunks = opContext.getAvroSchemas().stream().flatMap(schema -> {
      if (schema instanceof AvroNamedSchema) {
        return Stream.of((AvroNamedSchema) schema);
      } else if (AvroType.UNION.equals(schema.type())) {
        // NOTE(review): union members are cast unconditionally; a top-level union with a
        // non-named member would fail with ClassCastException here - confirm this cannot happen.
        return ((AvroUnionSchema) schema).getTypes()
            .stream()
            .map(schemaOrRef -> (AvroNamedSchema) schemaOrRef.getSchema());
      } else {
        return Stream.empty();
      }
    }).collect(Collectors.groupingBy(it -> schemaCounter.getAndIncrement() / SCHEMA_CHUNK_SIZE)).values();

    long genStart = System.currentTimeMillis();

    final SpecificRecordGenerationConfig generationConfig =
        SpecificRecordGenerationConfig.getBroadCompatibilitySpecificRecordGenerationConfig(
            AvroJavaStringRepresentation.fromJson(config.getStringRepresentation().toString()),
            AvroJavaStringRepresentation.fromJson(config.getMethodStringRepresentation().toString()),
            config.getMinAvroVersion(), config.isUtf8EncodingPutByIndexEnabled());
    final SpecificRecordClassGenerator generator = new SpecificRecordClassGenerator();

    // Shared across ALL chunks: a schema nested in top level schemas that land in different
    // chunks must still be generated (and written to disk) exactly once. A per-chunk set would
    // let two threads generate and concurrently write the same file.
    final Set<String> claimedSchemaNames = Collections.synchronizedSet(new HashSet<>());

    int totalGeneratedClasses = schemaChunks.parallelStream().mapToInt(chunk -> {
      List<JavaFile> generatedSpecificClasses = new ArrayList<>(chunk.size());
      for (AvroNamedSchema namedSchema : chunk) {
        try {
          // top level schema; if it is skipped its nested schemas are not visited either
          if (!claimForGeneration(namedSchema, opContext, claimedSchemaNames)) {
            continue;
          }
          generatedSpecificClasses.add(generator.generateSpecificClass(namedSchema, generationConfig));

          // generate internal schemas if not already present
          List<AvroNamedSchema> internalSchemaList =
              SpecificRecordGeneratorUtil.getNestedInternalSchemaList(namedSchema);
          for (AvroNamedSchema namedInternalSchema : internalSchemaList) {
            if (claimForGeneration(namedInternalSchema, opContext, claimedSchemaNames)) {
              generatedSpecificClasses.add(generator.generateSpecificClass(namedInternalSchema, generationConfig));
            }
          }
        } catch (Exception e) {
          throw new RuntimeException("failed to generate class for " + namedSchema.getFullName(), e);
        }
      }
      writeJavaFilesToDisk(generatedSpecificClasses, outputDirectoryPath);
      return generatedSpecificClasses.size();
    }).sum();

    long genEnd = System.currentTimeMillis();
    LOGGER.info("Generated {} java source files in {} millis", totalGeneratedClasses, genEnd - genStart);
  }

  /**
   * Creates {@code folder} (and any missing parents) unless it already is a directory.
   *
   * @throws IllegalStateException if the folder does not exist and cannot be created
   */
  private static void ensureDirectoryExists(File folder) {
    if (folder.isDirectory()) {
      return;
    }
    // mkdirs() returns false when the directory already exists, e.g. because it was created
    // concurrently; only fail when there still is no directory afterwards.
    if (!folder.mkdirs() && !folder.isDirectory()) {
      throw new IllegalStateException("unable to create destination folder " + folder);
    }
  }

  /**
   * Decides whether the calling thread should generate a class for {@code schema}.
   *
   * @return true if the schema is not skipped because of the classpath lookup and its full name
   *         was not claimed before; the claim itself is atomic, so at most one caller gets true
   *         for a given full name
   */
  private boolean claimForGeneration(AvroNamedSchema schema, OperationContext opContext,
      Set<String> claimedSchemaNames) {
    // skip codegen if schema is on classpath and config says to skip
    if (config.shouldSkipCodegenIfSchemaOnClasspath()
        && doesSchemaExistOnClasspath(schema, opContext.getLookupSchemaSet())) {
      return false;
    }
    return claimedSchemaNames.add(schema.getFullName());
  }

  /**
   * @return true if {@code schemaSet} is non-null and contains a schema with the same full name
   */
  private boolean doesSchemaExistOnClasspath(AvroNamedSchema schema, SchemaSet schemaSet) {
    if (schemaSet == null) {
      return false;
    }

    return schemaSet.getByName(schema.getFullName()) != null;
  }

  /**
   * Writes the given generated sources under {@code outputFolderPath}, in parallel.
   *
   * @throws IllegalStateException if any file fails to be written
   */
  private void writeJavaFilesToDisk(Collection<JavaFile> javaFiles, Path outputFolderPath) {

    long writeStart = System.currentTimeMillis();

    // write out the files we generated
    javaFiles.parallelStream().forEach(javaFile -> {
      try {
        javaFile.writeToPath(outputFolderPath);
      } catch (Exception e) {
        throw new IllegalStateException("while writing file " + javaFile.typeSpec.name, e);
      }
    });
    // Any failure above aborts with an exception, so reaching this point means all were written.
    int filesWritten = javaFiles.size();

    long writeEnd = System.currentTimeMillis();
    LOGGER.info("wrote out {} generated java source files under {} in {} millis", filesWritten, outputFolderPath,
        writeEnd - writeStart);
  }
}
Loading

0 comments on commit cdf7f90

Please sign in to comment.