Skip to content

Commit

Permalink
Add implementation for schema guessing from the example file in the HTTP
Browse files Browse the repository at this point in the history
Server source.

fixes apache#1637
fixes apache#1518 for streampipes-extensions
  • Loading branch information
obermeier committed May 30, 2023
1 parent 77a34d5 commit a5c599a
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 1 deletion.
Expand Up @@ -257,6 +257,11 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<configuration>
<propertyExpansion>
checkstyle.config.base.path=${project.parent.parent.basedir}/tools/maven
</propertyExpansion>
</configuration>
</plugin>
</plugins>
<finalName>streampipes-connect-adapters-iiot</finalName>
Expand Down
Expand Up @@ -17,19 +17,22 @@
*/
package org.apache.streampipes.connect.iiot.protocol.stream;

import org.apache.streampipes.connect.iiot.utils.FileProtocolUtils;
import org.apache.streampipes.extensions.api.connect.IAdapterPipeline;
import org.apache.streampipes.extensions.api.connect.IFormat;
import org.apache.streampipes.extensions.api.connect.IParser;
import org.apache.streampipes.extensions.api.connect.exception.ParseException;
import org.apache.streampipes.extensions.management.connect.HttpServerAdapterManagement;
import org.apache.streampipes.extensions.management.connect.SendToPipeline;
import org.apache.streampipes.extensions.management.connect.adapter.guess.SchemaGuesser;
import org.apache.streampipes.extensions.management.connect.adapter.model.generic.Protocol;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.model.AdapterType;
import org.apache.streampipes.model.connect.grounding.ProtocolDescription;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.schema.EventProperty;
import org.apache.streampipes.model.schema.EventPropertyPrimitive;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.staticproperty.CollectionStaticProperty;
import org.apache.streampipes.model.staticproperty.StaticProperty;
import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
Expand All @@ -46,9 +49,12 @@
import org.apache.streampipes.sdk.utils.Assets;
import org.apache.streampipes.sdk.utils.Datatypes;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class HttpServerProtocol extends Protocol {

Expand Down Expand Up @@ -112,6 +118,8 @@ public GuessSchema getGuessSchema() throws ParseException {
StaticPropertyExtractor extractor =
StaticPropertyExtractor.from(adapterDescription.getConfig(), new ArrayList<>());
GuessSchemaBuilder schemaBuilder = GuessSchemaBuilder.create();
GuessSchema shema = null;


String selectedImportMode = extractor.selectedAlternativeInternalId(CONFIGURE);

Expand All @@ -123,9 +131,27 @@ public GuessSchema getGuessSchema() throws ParseException {
StaticPropertyExtractor.from(((StaticPropertyGroup) member).getStaticProperties(), new ArrayList<>());
schemaBuilder.property(makeProperty(memberExtractor));
}

shema = schemaBuilder.build();
} else if (selectedImportMode.equals(FILE_IMPORT)){
var fileName = extractor.selectedFilename(FILE);

InputStream dataInputStream = getDataFromEndpoint(fileName);

List<byte[]> dataByte = parser.parseNEvents(dataInputStream, 2);
EventSchema eventSchema = parser.getEventSchema(dataByte);
shema = SchemaGuesser.guessSchema(eventSchema);
}

return schemaBuilder.build();
return shema;
}

public InputStream getDataFromEndpoint(String fileName) throws ParseException {
try {
return FileProtocolUtils.getFileInputStream(fileName);
} catch (IOException e) {
throw new ParseException("Could not find file: " + fileName);
}
}

private EventProperty makeProperty(StaticPropertyExtractor memberExtractor) {
Expand Down

0 comments on commit a5c599a

Please sign in to comment.