diff --git a/api/scala/FebrlExample.scala b/api/scala/FebrlExample.scala
new file mode 100644
index 000000000..31283aab7
--- /dev/null
+++ b/api/scala/FebrlExample.scala
@@ -0,0 +1,115 @@
+import zingg.client._;
+import java.util.ArrayList;
+
+//setting silent mode for Argument creation only
+:silent
+
+//build the arguments for zingg
+val args = new Arguments();
+//set field definitions
+val fname = new FieldDefinition();
+fname.setFieldName("fname");
+fname.setDataType("\"string\"");
+fname.setMatchType(MatchType.FUZZY);
+fname.setFields("fname");
+
+val lname = new FieldDefinition();
+lname.setFieldName("lname");
+lname.setDataType("\"string\"");
+lname.setMatchType(MatchType.FUZZY);
+lname.setFields("lname");
+
+val stNo = new FieldDefinition();
+stNo.setFieldName("stNo");
+stNo.setDataType("\"string\"");
+stNo.setMatchType(MatchType.FUZZY);
+stNo.setFields("stNo");
+
+val add1 = new FieldDefinition();
+add1.setFieldName("add1");
+add1.setDataType("\"string\"");
+add1.setMatchType(MatchType.FUZZY);
+add1.setFields("add1");
+
+val add2 = new FieldDefinition();
+add2.setFieldName("add2");
+add2.setDataType("\"string\"");
+add2.setMatchType(MatchType.FUZZY);
+add2.setFields("add2");
+
+val city = new FieldDefinition();
+city.setFieldName("city");
+city.setDataType("\"string\"");
+city.setMatchType(MatchType.FUZZY);
+city.setFields("city");
+
+val areacode = new FieldDefinition();
+areacode.setFieldName("areacode");
+areacode.setDataType("\"string\"");
+areacode.setMatchType(MatchType.FUZZY);
+areacode.setFields("areacode");
+
+val state = new FieldDefinition();
+state.setFieldName("state");
+state.setDataType("\"string\"");
+state.setMatchType(MatchType.FUZZY);
+state.setFields("state");
+
+val dob = new FieldDefinition();
+dob.setFieldName("dob");
+dob.setDataType("\"string\"");
+dob.setMatchType(MatchType.FUZZY);
+dob.setFields("dob");
+
+val ssn = new FieldDefinition();
+ssn.setFieldName("ssn");
+ssn.setDataType("\"string\"");
+ssn.setMatchType(MatchType.FUZZY);
+ssn.setFields("ssn");
+:silent
+
+val fieldDef = new ArrayList[FieldDefinition]();
+fieldDef.add(fname);
+fieldDef.add(lname);
+fieldDef.add(stNo);
+fieldDef.add(add1);
+fieldDef.add(add2);
+fieldDef.add(city);
+fieldDef.add(areacode);
+fieldDef.add(state);
+fieldDef.add(dob);
+fieldDef.add(ssn);
+
+args.setFieldDefinition(fieldDef);
+//set the model id and the zingg dir
+args.setModelId("100");
+args.setZinggDir("models");
+args.setNumPartitions(4);
+args.setLabelDataSampleSize(0.5f);
+
+//read the dataset into inputPipe and set it up in 'args';
+//the line below is not needed if you already have an in-memory DataFrame;
+//in that case, pass your own DataFrame to InMemoryPipe instead of df
+val df = spark.read.option("header", "true").csv("examples/febrl/test.csv")
+import zingg.client.pipe.InMemoryPipe;
+import java.util.HashMap
+
+val inputPipe = new InMemoryPipe(df);
+inputPipe.setProps(new HashMap[String, String]());
+val pipes = Array[zingg.client.pipe.Pipe](inputPipe);
+args.setData(pipes);
+
+//setting the output pipe in 'args'
+val outputPipe = new InMemoryPipe();
+//outputPipe.setProps(new HashMap[String, String]());
+val outputPipes = Array[zingg.client.pipe.Pipe](outputPipe);
+args.setOutput(outputPipes);
+
+val options = new ClientOptions("--phase", "match", "--conf", "dummy", "--license", "dummy", "--email", "xxx@yyy.com");
+
+//Zingg execution for the given phase
+val client = new Client(args, options);
+client.init();
+client.execute();
+//the output is in outputPipe.getRecords
+outputPipe.getRecords().show()
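The example above drives the new pipe from the spark-shell. Since the pipe classes themselves are Java, the same in-memory round trip can also be written in plain Java; the following is a minimal sketch using only types that appear in this diff, where buildArguments() is a hypothetical placeholder for the field-definition setup shown in the Scala example:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import zingg.client.Arguments;
import zingg.client.Client;
import zingg.client.ClientOptions;
import zingg.client.pipe.InMemoryPipe;
import zingg.client.pipe.Pipe;

public class FebrlInMemoryExample {

    // hypothetical helper: builds the same Arguments (field definitions,
    // model id, zingg dir, etc.) as the Scala example above
    static Arguments buildArguments() throws Exception {
        return new Arguments();
    }

    public static void runMatch(Dataset<Row> df) throws Exception {
        Arguments args = buildArguments();
        InMemoryPipe input = new InMemoryPipe(df);  // records are read straight from memory
        InMemoryPipe output = new InMemoryPipe();   // populated when Zingg writes its output
        args.setData(new Pipe[] { input });
        args.setOutput(new Pipe[] { output });

        ClientOptions options = new ClientOptions("--phase", "match", "--conf", "dummy",
                "--license", "dummy", "--email", "xxx@yyy.com");
        Client client = new Client(args, options);
        client.init();
        client.execute();

        // the match output never touches disk
        Dataset<Row> results = output.getRecords();
        results.show();
    }
}

Because read() hands back the wrapped dataset and write() only stores the result (see the PipeUtil changes below), the INMEMORY format involves no serialization or disk I/O at all.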
diff --git a/assembly/dependency-reduced-pom.xml b/assembly/dependency-reduced-pom.xml
index 2935bdb84..8f9f23c43 100644
--- a/assembly/dependency-reduced-pom.xml
+++ b/assembly/dependency-reduced-pom.xml
@@ -347,14 +347,38 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>junit</groupId>
-      <artifactId>junit</artifactId>
-      <version>4.13.1</version>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-engine</artifactId>
+      <version>5.8.1</version>
       <scope>test</scope>
       <exclusions>
         <exclusion>
-          <artifactId>hamcrest-core</artifactId>
-          <groupId>org.hamcrest</groupId>
+          <artifactId>junit-platform-engine</artifactId>
+          <groupId>org.junit.platform</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>apiguardian-api</artifactId>
+          <groupId>org.apiguardian</groupId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.junit.jupiter</groupId>
+      <artifactId>junit-jupiter-api</artifactId>
+      <version>5.8.1</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <artifactId>opentest4j</artifactId>
+          <groupId>org.opentest4j</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>junit-platform-commons</artifactId>
+          <groupId>org.junit.platform</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>apiguardian-api</artifactId>
+          <groupId>org.apiguardian</groupId>
         </exclusion>
       </exclusions>
     </dependency>
diff --git a/client/src/main/java/zingg/client/pipe/Format.java b/client/src/main/java/zingg/client/pipe/Format.java
index e4cad8254..912693b3c 100644
--- a/client/src/main/java/zingg/client/pipe/Format.java
+++ b/client/src/main/java/zingg/client/pipe/Format.java
@@ -21,7 +21,8 @@ public enum Format implements Serializable{
 	AVRO("avro"),
 	SNOWFLAKE("net.snowflake.spark.snowflake"),
 	TEXT("text"),
-	BIGQUERY("bigquery");
+	BIGQUERY("bigquery"),
+	INMEMORY("inMemory");
 	
 	String type;
 	static Map<String, Format> map;
diff --git a/client/src/main/java/zingg/client/pipe/InMemoryPipe.java b/client/src/main/java/zingg/client/pipe/InMemoryPipe.java
new file mode 100644
index 000000000..49b588337
--- /dev/null
+++ b/client/src/main/java/zingg/client/pipe/InMemoryPipe.java
@@ -0,0 +1,32 @@
+package zingg.client.pipe;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
+public class InMemoryPipe extends Pipe{
+	Dataset<Row> dataset;
+
+	public InMemoryPipe(Dataset<Row> ds) {
+		dataset = ds;
+	}
+
+	public InMemoryPipe() {
+	}
+
+	public Dataset<Row> getRecords() {
+		return dataset;
+	}
+
+	public void setRecords(Dataset<Row> ds) {
+		dataset = ds;
+	}
+
+	public InMemoryPipe(Pipe p) {
+		clone(p);
+	}
+
+	@Override
+	public Format getFormat() {
+		return Format.INMEMORY;
+	}
+}
\ No newline at end of file
diff --git a/client/src/main/java/zingg/client/pipe/PipeFactory.java b/client/src/main/java/zingg/client/pipe/PipeFactory.java
index 5fa467edc..9f5cc3433 100644
--- a/client/src/main/java/zingg/client/pipe/PipeFactory.java
+++ b/client/src/main/java/zingg/client/pipe/PipeFactory.java
@@ -36,6 +36,8 @@ private static final Pipe getPipe(Pipe p) {
 			return p;
 		case JDBC:
 			return new JdbcPipe(p);
+		case INMEMORY:
+			return new InMemoryPipe(p);
 		default:
 			break;
 		}
diff --git a/core/src/main/java/zingg/util/PipeUtil.java b/core/src/main/java/zingg/util/PipeUtil.java
index 13e75b3fe..88311608d 100644
--- a/core/src/main/java/zingg/util/PipeUtil.java
+++ b/core/src/main/java/zingg/util/PipeUtil.java
@@ -28,6 +28,7 @@
 import zingg.client.pipe.ElasticPipe;
 import zingg.client.pipe.FilePipe;
 import zingg.client.pipe.Format;
+import zingg.client.pipe.InMemoryPipe;
 import zingg.client.pipe.Pipe;
 import scala.Option;
 import scala.collection.JavaConverters;
@@ -71,6 +72,10 @@ private static DataFrameReader getReader(SparkSession spark, Pipe p) {
 	private static Dataset<Row> read(DataFrameReader reader, Pipe p, boolean addSource) {
 		Dataset<Row> input = null;
 		LOG.warn("Reading " + p);
+		
+		if (p.getFormat() == Format.INMEMORY) {
+			return ((InMemoryPipe) p).getRecords();
+		}
 		if (p.getProps().containsKey(FilePipe.LOCATION)) {
 			input = reader.load(p.get(FilePipe.LOCATION));
 		}
@@ -186,6 +191,11 @@ public static void write(Dataset<Row> toWriteOrig, Arguments args, JavaSparkCont
 
 		LOG.warn("Writing output " + p);
 		
+		if (p.getFormat() == Format.INMEMORY) {
+			((InMemoryPipe) p).setRecords(toWriteOrig);
+			return;
+		}
+
 		if (p.getMode() != null) {
 			writer.mode(p.getMode());
 		}
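The pom change above swaps JUnit 4.13.1 for the JUnit 5 (Jupiter) artifacts, so tests covering the new pipe would be written against the Jupiter API. Below is a hypothetical round-trip test, not part of this diff; it assumes a local Spark master is available to the test JVM:

import java.util.Collections;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.Test;

import zingg.client.pipe.Format;
import zingg.client.pipe.InMemoryPipe;

public class TestInMemoryPipe {

	@Test
	public void testRecordsRoundTrip() {
		// a tiny one-row dataset built on a local SparkSession
		SparkSession spark = SparkSession.builder()
				.master("local[1]").appName("TestInMemoryPipe").getOrCreate();
		StructType schema = new StructType(new StructField[] {
				DataTypes.createStructField("fname", DataTypes.StringType, true),
				DataTypes.createStructField("lname", DataTypes.StringType, true) });
		Dataset<Row> df = spark.createDataFrame(
				Collections.singletonList(RowFactory.create("john", "doe")), schema);

		InMemoryPipe pipe = new InMemoryPipe();
		pipe.setRecords(df);

		// the pipe reports the new INMEMORY format and hands back the same dataset instance
		assertEquals(Format.INMEMORY, pipe.getFormat());
		assertSame(df, pipe.getRecords());
		spark.stop();
	}
}

assertSame checks that getRecords() returns the very dataset instance that was set, which is exactly what the INMEMORY short-circuits in PipeUtil rely on.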