From 6be86c00aee41db2d00a2f9e7ed65b028c16986d Mon Sep 17 00:00:00 2001 From: Navin Singh Date: Fri, 15 Apr 2022 16:26:02 +0530 Subject: [PATCH 1/3] New InMemoryPipe has been added --- .../main/java/zingg/client/pipe/Format.java | 3 +- .../java/zingg/client/pipe/InMemoryPipe.java | 32 +++++++++++++++++++ .../java/zingg/client/pipe/PipeFactory.java | 2 ++ core/src/main/java/zingg/util/PipeUtil.java | 10 ++++++ 4 files changed, 46 insertions(+), 1 deletion(-) create mode 100644 client/src/main/java/zingg/client/pipe/InMemoryPipe.java diff --git a/client/src/main/java/zingg/client/pipe/Format.java b/client/src/main/java/zingg/client/pipe/Format.java index 661e2eebe..b7be5dfdc 100644 --- a/client/src/main/java/zingg/client/pipe/Format.java +++ b/client/src/main/java/zingg/client/pipe/Format.java @@ -20,7 +20,8 @@ public enum Format implements Serializable{ PARQUET("PARQUET"), AVRO("avro"), SNOWFLAKE("net.snowflake.spark.snowflake"), - TEXT("text"); + TEXT("text"), + INMEMORY("inMemory"); String type; static Map map; diff --git a/client/src/main/java/zingg/client/pipe/InMemoryPipe.java b/client/src/main/java/zingg/client/pipe/InMemoryPipe.java new file mode 100644 index 000000000..49b588337 --- /dev/null +++ b/client/src/main/java/zingg/client/pipe/InMemoryPipe.java @@ -0,0 +1,32 @@ +package zingg.client.pipe; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; + +public class InMemoryPipe extends Pipe{ + Dataset dataset; + + public InMemoryPipe(Dataset ds) { + dataset = ds; + } + + public InMemoryPipe() { + } + + public Dataset getRecords() { + return dataset; + } + + public void setRecords(Dataset ds) { + dataset = ds; + } + + public InMemoryPipe(Pipe p) { + clone(p); + } + + @Override + public Format getFormat() { + return Format.INMEMORY; + } +} \ No newline at end of file diff --git a/client/src/main/java/zingg/client/pipe/PipeFactory.java b/client/src/main/java/zingg/client/pipe/PipeFactory.java index 5fa467edc..9f5cc3433 100644 --- a/client/src/main/java/zingg/client/pipe/PipeFactory.java +++ b/client/src/main/java/zingg/client/pipe/PipeFactory.java @@ -36,6 +36,8 @@ private static final Pipe getPipe(Pipe p) { return p; case JDBC: return new JdbcPipe(p); + case INMEMORY: + return new InMemoryPipe(p); default: break; } diff --git a/core/src/main/java/zingg/util/PipeUtil.java b/core/src/main/java/zingg/util/PipeUtil.java index 5cd08efee..e9520742b 100644 --- a/core/src/main/java/zingg/util/PipeUtil.java +++ b/core/src/main/java/zingg/util/PipeUtil.java @@ -28,6 +28,7 @@ import zingg.client.pipe.ElasticPipe; import zingg.client.pipe.FilePipe; import zingg.client.pipe.Format; +import zingg.client.pipe.InMemoryPipe; import zingg.client.pipe.Pipe; import scala.Option; import scala.collection.JavaConverters; @@ -71,6 +72,10 @@ private static DataFrameReader getReader(SparkSession spark, Pipe p) { private static Dataset read(DataFrameReader reader, Pipe p, boolean addSource) { Dataset input = null; LOG.warn("Reading " + p); + + if (p.getFormat() == Format.INMEMORY) { + return ((InMemoryPipe) p).getRecords(); + } if (p.getProps().containsKey(FilePipe.LOCATION)) { input = reader.load(p.get(FilePipe.LOCATION)); } @@ -186,6 +191,11 @@ public static void write(Dataset toWriteOrig, Arguments args, JavaSparkCont LOG.warn("Writing output " + p); + if (p.getFormat() == Format.INMEMORY) { + ((InMemoryPipe) p).setRecords(toWriteOrig); + return; + } + if (p.getMode() != null) { writer.mode(p.getMode()); } From 2e00336921d0331e6e978c8373f7ac7cce88614e Mon Sep 17 00:00:00 2001 From: Navin Singh Date: Mon, 18 Apr 2022 12:51:18 +0530 Subject: [PATCH 2/3] added FebrlExample.scala to demonstrate how to use zingg from scala REPL --- api/scala/FebrlExample.scala | 110 +++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 api/scala/FebrlExample.scala diff --git a/api/scala/FebrlExample.scala b/api/scala/FebrlExample.scala new file mode 100644 index 000000000..9befce29c --- /dev/null +++ b/api/scala/FebrlExample.scala @@ -0,0 +1,110 @@ +import zingg.client._; +import java.util.ArrayList; + +//setting silent mode for Argument creation only +:silent +val args = new Arguments(); + +val fname = new FieldDefinition(); +fname.setFieldName("fname"); +fname.setDataType("\"string\""); +fname.setMatchType(MatchType.FUZZY); +fname.setFields("fname"); + +val lname = new FieldDefinition(); +lname.setFieldName("lname"); +lname.setDataType("\"string\""); +lname.setMatchType(MatchType.FUZZY); +lname.setFields("lname"); + +val stNo = new FieldDefinition(); +stNo.setFieldName("stNo"); +stNo.setDataType("\"string\""); +stNo.setMatchType(MatchType.FUZZY); +stNo.setFields("stNo"); + +val add1 = new FieldDefinition(); +add1.setFieldName("add1"); +add1.setDataType("\"string\""); +add1.setMatchType(MatchType.FUZZY); +add1.setFields("add1"); + +val add2 = new FieldDefinition(); +add2.setFieldName("add2"); +add2.setDataType("\"string\""); +add2.setMatchType(MatchType.FUZZY); +add2.setFields("add2"); + +val city = new FieldDefinition(); +city.setFieldName("city"); +city.setDataType("\"string\""); +city.setMatchType(MatchType.FUZZY); +city.setFields("city"); + +val areacode = new FieldDefinition(); +areacode.setFieldName("areacode"); +areacode.setDataType("\"string\""); +areacode.setMatchType(MatchType.FUZZY); +areacode.setFields("areacode"); + +val state = new FieldDefinition(); +state.setFieldName("state"); +state.setDataType("\"string\""); +state.setMatchType(MatchType.FUZZY); +state.setFields("state"); + +val dob = new FieldDefinition(); +dob.setFieldName("dob"); +dob.setDataType("\"string\""); +dob.setMatchType(MatchType.FUZZY); +dob.setFields("dob"); + +val ssn = new FieldDefinition(); +ssn.setFieldName("ssn"); +ssn.setDataType("\"string\""); +ssn.setMatchType(MatchType.FUZZY); +ssn.setFields("ssn"); +:silent + +val fieldDef = new ArrayList[FieldDefinition](); +fieldDef.add(fname); +fieldDef.add(lname); +fieldDef.add(stNo); +fieldDef.add(add1); +fieldDef.add(add2); +fieldDef.add(city); +fieldDef.add(areacode); +fieldDef.add(state); +fieldDef.add(dob); +fieldDef.add(ssn); + +args.setFieldDefinition(fieldDef); +args.setModelId("100"); +args.setZinggDir("models"); +args.setNumPartitions(4); +args.setLabelDataSampleSize(0.5f); + +//reading dataset into inputPipe and settint it up in 'args' +//below line should not be required +val df = spark.read.option("header", "true").csv("examples/febrl/test.csv") +import zingg.client.pipe.InMemoryPipe; +import java.util.HashMap + +val inputPipe = new InMemoryPipe(df); +inputPipe.setProps(new HashMap[String, String]()); +val pipes = Array[zingg.client.pipe.Pipe](inputPipe); +args.setData(pipes); + +//setting outputpipe in 'args' +val outputPipe = new InMemoryPipe(); +//outputPipe.setProps(new HashMap[String, String]()); +val pipes = Array[zingg.client.pipe.Pipe](outputPipe); +args.setOutput(pipes); + +val options = new ClientOptions("--phase", "match", "--conf", "dummy", "--license", "dummy", "--email", "xxx@yyy.com"); + +//Zingg execution for the given phase +val client = new Client(args, options); +client.init(); +client.execute(); +outputPipe.getRecords().show() \ No newline at end of file From a51d650cbb6006c9e04a522de24fb8eea3ecd078 Mon Sep 17 00:00:00 2001 From: Sonal Date: Mon, 18 Apr 2022 14:14:37 +0530 Subject: [PATCH 3/3] Update FebrlExample.scala --- api/scala/FebrlExample.scala | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/api/scala/FebrlExample.scala b/api/scala/FebrlExample.scala index 9befce29c..31283aab7 100644 --- a/api/scala/FebrlExample.scala +++ b/api/scala/FebrlExample.scala @@ -3,8 +3,10 @@ import java.util.ArrayList; //setting silent mode for Argument creation only :silent -val args = new Arguments(); +//build the arguments for zingg +val args = new Arguments(); +//set field definitions val fname = new FieldDefinition(); fname.setFieldName("fname"); fname.setDataType("\"string\""); @@ -79,13 +81,15 @@ fieldDef.add(dob); fieldDef.add(ssn); args.setFieldDefinition(fieldDef); +//set the modelid and the zingg dir args.setModelId("100"); args.setZinggDir("models"); args.setNumPartitions(4); args.setLabelDataSampleSize(0.5f); //reading dataset into inputPipe and settint it up in 'args' -//below line should not be required +//below line should not be required if you are reading from in memory dataset +//in that case, replace df with input df val df = spark.read.option("header", "true").csv("examples/febrl/test.csv") import zingg.client.pipe.InMemoryPipe; import java.util.HashMap @@ -107,4 +111,5 @@ val options = new ClientOptions("--phase", "match", "--conf", "dummy", "--licen val client = new Client(args, options); client.init(); client.execute(); -outputPipe.getRecords().show() \ No newline at end of file +//the output is in outputPipe.getRecords +outputPipe.getRecords().show()