Merge branch 'streamsets' into main
sonalgoyal committed May 5, 2022
2 parents b8994ca + a51d650 commit 9496679
Showing 6 changed files with 190 additions and 6 deletions.
115 changes: 115 additions & 0 deletions api/scala/FebrlExample.scala
@@ -0,0 +1,115 @@
import zingg.client._;
import java.util.ArrayList;

//enable the shell's silent mode for Argument creation only
:silent

//build the arguments for zingg
val args = new Arguments();
//set field definitions
val fname = new FieldDefinition();
fname.setFieldName("fname");
fname.setDataType("\"string\"");
fname.setMatchType(MatchType.FUZZY);
fname.setFields("fname");

val lname = new FieldDefinition();
lname.setFieldName("lname");
lname.setDataType("\"string\"");
lname.setMatchType(MatchType.FUZZY);
lname.setFields("lname");

val stNo = new FieldDefinition();
stNo.setFieldName("stNo");
stNo.setDataType("\"string\"");
stNo.setMatchType(MatchType.FUZZY);
stNo.setFields("stNo");

val add1 = new FieldDefinition();
add1.setFieldName("add1");
add1.setDataType("\"string\"");
add1.setMatchType(MatchType.FUZZY);
add1.setFields("add1");

val add2 = new FieldDefinition();
add2.setFieldName("add2");
add2.setDataType("\"string\"");
add2.setMatchType(MatchType.FUZZY);
add2.setFields("add2");

val city = new FieldDefinition();
city.setFieldName("city");
city.setDataType("\"string\"");
city.setMatchType(MatchType.FUZZY);
city.setFields("city");

val areacode = new FieldDefinition();
areacode.setFieldName("areacode");
areacode.setDataType("\"string\"");
areacode.setMatchType(MatchType.FUZZY);
areacode.setFields("areacode");

val state = new FieldDefinition();
state.setFieldName("state");
state.setDataType("\"string\"");
state.setMatchType(MatchType.FUZZY);
state.setFields("state");

val dob = new FieldDefinition();
dob.setFieldName("dob");
dob.setDataType("\"string\"");
dob.setMatchType(MatchType.FUZZY);
dob.setFields("dob");

val ssn = new FieldDefinition();
ssn.setFieldName("ssn");
ssn.setDataType("\"string\"");
ssn.setMatchType(MatchType.FUZZY);
ssn.setFields("ssn");
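//toggle silent mode back off now that the field definitions are built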
:silent

val fieldDef = new ArrayList[FieldDefinition]();
fieldDef.add(fname);
fieldDef.add(lname);
fieldDef.add(stNo);
fieldDef.add(add1);
fieldDef.add(add2);
fieldDef.add(city);
fieldDef.add(areacode);
fieldDef.add(state);
fieldDef.add(dob);
fieldDef.add(ssn);
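
//the ten definitions above differ only in the field name; as a sketch (not part
//of the original example), the same list could be built with a loop:
//  val names = Seq("fname", "lname", "stNo", "add1", "add2", "city", "areacode", "state", "dob", "ssn")
//  val fieldDefAlt = new ArrayList[FieldDefinition]()
//  for (n <- names) {
//    val f = new FieldDefinition()
//    f.setFieldName(n); f.setDataType("\"string\""); f.setMatchType(MatchType.FUZZY); f.setFields(n)
//    fieldDefAlt.add(f)
//  }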

args.setFieldDefinition(fieldDef);
//set the model id and the zingg dir
args.setModelId("100");
args.setZinggDir("models");
args.setNumPartitions(4);
args.setLabelDataSampleSize(0.5f);

//read the dataset into an input pipe and set it up in 'args'
//the line below is not required if you already have an in-memory dataset;
//in that case, use your input dataframe in place of df
val df = spark.read.option("header", "true").csv("examples/febrl/test.csv")
import zingg.client.pipe.InMemoryPipe;
import java.util.HashMap

val inputPipe = new InMemoryPipe(df);
inputPipe.setProps(new HashMap[String, String]());
val pipes = Array[zingg.client.pipe.Pipe](inputPipe);
args.setData(pipes);

//set the output pipe in 'args'
val outputPipe = new InMemoryPipe();
//outputPipe.setProps(new HashMap[String, String]());
val outputPipes = Array[zingg.client.pipe.Pipe](outputPipe);
args.setOutput(outputPipes);

val options = new ClientOptions("--phase", "match", "--conf", "dummy", "--license", "dummy", "--email", "xxx@yyy.com");

//Zingg execution for the given phase
val client = new Client(args, options);
client.init();
client.execute();
//the output is in outputPipe.getRecords
outputPipe.getRecords().show()
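
//the output can also be inspected with ordinary Dataset operations; a sketch
//(the z_cluster column name is assumed from Zingg's usual output schema):
//  outputPipe.getRecords().groupBy("z_cluster").count().show()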
34 changes: 29 additions & 5 deletions assembly/dependency-reduced-pom.xml
@@ -347,14 +347,38 @@
     <scope>provided</scope>
   </dependency>
   <dependency>
-    <groupId>junit</groupId>
-    <artifactId>junit</artifactId>
-    <version>4.13.1</version>
+    <groupId>org.junit.jupiter</groupId>
+    <artifactId>junit-jupiter-engine</artifactId>
+    <version>5.8.1</version>
     <scope>test</scope>
     <exclusions>
       <exclusion>
-        <artifactId>hamcrest-core</artifactId>
-        <groupId>org.hamcrest</groupId>
+        <artifactId>junit-platform-engine</artifactId>
+        <groupId>org.junit.platform</groupId>
       </exclusion>
+      <exclusion>
+        <artifactId>apiguardian-api</artifactId>
+        <groupId>org.apiguardian</groupId>
+      </exclusion>
     </exclusions>
   </dependency>
+  <dependency>
+    <groupId>org.junit.jupiter</groupId>
+    <artifactId>junit-jupiter-api</artifactId>
+    <version>5.8.1</version>
+    <scope>test</scope>
+    <exclusions>
+      <exclusion>
+        <artifactId>opentest4j</artifactId>
+        <groupId>org.opentest4j</groupId>
+      </exclusion>
+      <exclusion>
+        <artifactId>junit-platform-commons</artifactId>
+        <groupId>org.junit.platform</groupId>
+      </exclusion>
+      <exclusion>
+        <artifactId>apiguardian-api</artifactId>
+        <groupId>org.apiguardian</groupId>
+      </exclusion>
+    </exclusions>
+  </dependency>
3 changes: 2 additions & 1 deletion client/src/main/java/zingg/client/pipe/Format.java
@@ -21,7 +21,8 @@ public enum Format implements Serializable{
AVRO("avro"),
SNOWFLAKE("net.snowflake.spark.snowflake"),
TEXT("text"),
-  BIGQUERY("bigquery");
+  BIGQUERY("bigquery"),
+  INMEMORY("inMemory");

String type;
static Map<String, Format> map;
32 changes: 32 additions & 0 deletions client/src/main/java/zingg/client/pipe/InMemoryPipe.java
@@ -0,0 +1,32 @@
package zingg.client.pipe;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

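/**
 * A Pipe backed by an in-memory Spark Dataset rather than an external store,
 * so callers can supply input and collect output without going through disk.
 */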
public class InMemoryPipe extends Pipe{
Dataset <Row> dataset;

public InMemoryPipe(Dataset<Row> ds) {
dataset = ds;
}

public InMemoryPipe() {
}

public Dataset <Row> getRecords() {
return dataset;
}

public void setRecords(Dataset <Row> ds) {
dataset = ds;
}

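// copy constructor, used by PipeFactory when resolving an INMEMORY pipe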
public InMemoryPipe(Pipe p) {
clone(p);
}

@Override
public Format getFormat() {
return Format.INMEMORY;
}
}
2 changes: 2 additions & 0 deletions client/src/main/java/zingg/client/pipe/PipeFactory.java
@@ -36,6 +36,8 @@ private static final Pipe getPipe(Pipe p) {
return p;
case JDBC:
return new JdbcPipe(p);
case INMEMORY:
return new InMemoryPipe(p);
default:
break;
}
10 changes: 10 additions & 0 deletions core/src/main/java/zingg/util/PipeUtil.java
@@ -28,6 +28,7 @@
import zingg.client.pipe.ElasticPipe;
import zingg.client.pipe.FilePipe;
import zingg.client.pipe.Format;
import zingg.client.pipe.InMemoryPipe;
import zingg.client.pipe.Pipe;
import scala.Option;
import scala.collection.JavaConverters;
@@ -71,6 +72,10 @@ private static DataFrameReader getReader(SparkSession spark, Pipe p) {
private static Dataset<Row> read(DataFrameReader reader, Pipe p, boolean addSource) {
Dataset<Row> input = null;
LOG.warn("Reading " + p);

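// in-memory pipes already hold their Dataset, so bypass the DataFrameReader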
if (p.getFormat() == Format.INMEMORY) {
return ((InMemoryPipe) p).getRecords();
}
if (p.getProps().containsKey(FilePipe.LOCATION)) {
input = reader.load(p.get(FilePipe.LOCATION));
}
@@ -186,6 +191,11 @@ public static void write(Dataset<Row> toWriteOrig, Arguments args, JavaSparkCont

LOG.warn("Writing output " + p);

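// for in-memory output, stash the result on the pipe instead of writing it out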
if (p.getFormat() == Format.INMEMORY) {
((InMemoryPipe) p).setRecords(toWriteOrig);
return;
}

if (p.getMode() != null) {
writer.mode(p.getMode());
}
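
Taken together, read() now hands back the Dataset held by an InMemoryPipe and write() stores the result on the output pipe, so a caller can round-trip data without any external storage. A minimal sketch of that behaviour in a spark-shell (assuming a SparkSession named spark):

import zingg.client.pipe.InMemoryPipe

val in = new InMemoryPipe(spark.range(3).toDF("id"))
in.getRecords().show()            // what PipeUtil.read returns for an INMEMORY pipe
val out = new InMemoryPipe()
out.setRecords(in.getRecords())   // what PipeUtil.write does for an INMEMORY pipe
out.getRecords().show()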
