Merge branch 'zinggAI:main' into zDocumenter
navinrathore committed May 6, 2022
2 parents 108d710 + 9496679, commit d136d94
Showing 13 changed files with 491 additions and 15 deletions.
115 changes: 115 additions & 0 deletions api/scala/FebrlExample.scala
@@ -0,0 +1,115 @@
import zingg.client._;
import java.util.ArrayList;

//setting silent mode for Argument creation only
:silent

//build the arguments for zingg
val args = new Arguments();
//set field definitions
val fname = new FieldDefinition();
fname.setFieldName("fname");
fname.setDataType("\"string\"");
fname.setMatchType(MatchType.FUZZY);
fname.setFields("fname");

val lname = new FieldDefinition();
lname.setFieldName("lname");
lname.setDataType("\"string\"");
lname.setMatchType(MatchType.FUZZY);
lname.setFields("lname");

val stNo = new FieldDefinition();
stNo.setFieldName("stNo");
stNo.setDataType("\"string\"");
stNo.setMatchType(MatchType.FUZZY);
stNo.setFields("stNo");

val add1 = new FieldDefinition();
add1.setFieldName("add1");
add1.setDataType("\"string\"");
add1.setMatchType(MatchType.FUZZY);
add1.setFields("add1");

val add2 = new FieldDefinition();
add2.setFieldName("add2");
add2.setDataType("\"string\"");
add2.setMatchType(MatchType.FUZZY);
add2.setFields("add2");

val city = new FieldDefinition();
city.setFieldName("city");
city.setDataType("\"string\"");
city.setMatchType(MatchType.FUZZY);
city.setFields("city");

val areacode = new FieldDefinition();
areacode.setFieldName("areacode");
areacode.setDataType("\"string\"");
areacode.setMatchType(MatchType.FUZZY);
areacode.setFields("areacode");

val state = new FieldDefinition();
state.setFieldName("state");
state.setDataType("\"string\"");
state.setMatchType(MatchType.FUZZY);
state.setFields("state");

val dob = new FieldDefinition();
dob.setFieldName("dob");
dob.setDataType("\"string\"");
dob.setMatchType(MatchType.FUZZY);
dob.setFields("dob");

val ssn = new FieldDefinition();
ssn.setFieldName("ssn");
ssn.setDataType("\"string\"");
ssn.setMatchType(MatchType.FUZZY);
ssn.setFields("ssn");
:silent

val fieldDef = new ArrayList[FieldDefinition]();
fieldDef.add(fname);
fieldDef.add(lname);
fieldDef.add(stNo);
fieldDef.add(add1);
fieldDef.add(add2);
fieldDef.add(city);
fieldDef.add(areacode);
fieldDef.add(state);
fieldDef.add(dob);
fieldDef.add(ssn);

args.setFieldDefinition(fieldDef);
//set the model id and the zingg dir
args.setModelId("100");
args.setZinggDir("models");
args.setNumPartitions(4);
args.setLabelDataSampleSize(0.5f);

//reading the dataset into inputPipe and setting it up in 'args'
//the line below is not required if your data is already an in-memory dataset;
//in that case, pass your existing DataFrame in place of df
val df = spark.read.option("header", "true").csv("examples/febrl/test.csv")
import zingg.client.pipe.InMemoryPipe;
import java.util.HashMap

val inputPipe = new InMemoryPipe(df);
inputPipe.setProps(new HashMap[String, String]());
val pipes = Array[zingg.client.pipe.Pipe](inputPipe);
args.setData(pipes);

//setting the output pipe in 'args'
val outputPipe = new InMemoryPipe();
//outputPipe.setProps(new HashMap[String, String]());
val pipes = Array[zingg.client.pipe.Pipe](outputPipe);
args.setOutput(pipes);

val options = new ClientOptions("--phase", "match", "--conf", "dummy", "--license", "dummy", "--email", "xxx@yyy.com");

//Zingg execution for the given phase
val client = new Client(args, options);
client.init();
client.execute();
//the output is in outputPipe.getRecords
outputPipe.getRecords().show()
34 changes: 29 additions & 5 deletions assembly/dependency-reduced-pom.xml
@@ -347,14 +347,38 @@
     <scope>provided</scope>
   </dependency>
   <dependency>
-    <groupId>junit</groupId>
-    <artifactId>junit</artifactId>
-    <version>4.13.1</version>
+    <groupId>org.junit.jupiter</groupId>
+    <artifactId>junit-jupiter-engine</artifactId>
+    <version>5.8.1</version>
     <scope>test</scope>
     <exclusions>
       <exclusion>
-        <artifactId>hamcrest-core</artifactId>
-        <groupId>org.hamcrest</groupId>
+        <artifactId>junit-platform-engine</artifactId>
+        <groupId>org.junit.platform</groupId>
       </exclusion>
+      <exclusion>
+        <artifactId>apiguardian-api</artifactId>
+        <groupId>org.apiguardian</groupId>
+      </exclusion>
     </exclusions>
   </dependency>
+  <dependency>
+    <groupId>org.junit.jupiter</groupId>
+    <artifactId>junit-jupiter-api</artifactId>
+    <version>5.8.1</version>
+    <scope>test</scope>
+    <exclusions>
+      <exclusion>
+        <artifactId>opentest4j</artifactId>
+        <groupId>org.opentest4j</groupId>
+      </exclusion>
+      <exclusion>
+        <artifactId>junit-platform-commons</artifactId>
+        <groupId>org.junit.platform</groupId>
+      </exclusion>
+      <exclusion>
+        <artifactId>apiguardian-api</artifactId>
+        <groupId>org.apiguardian</groupId>
+      </exclusion>
+    </exclusions>
+  </dependency>
4 changes: 3 additions & 1 deletion client/src/main/java/zingg/client/pipe/Format.java
@@ -20,7 +20,9 @@ public enum Format implements Serializable{
PARQUET("PARQUET"),
AVRO("avro"),
SNOWFLAKE("net.snowflake.spark.snowflake"),
TEXT("text");
TEXT("text"),
BIGQUERY("bigquery"),
INMEMORY("inMemory");

String type;
static Map<String, Format> map;
32 changes: 32 additions & 0 deletions client/src/main/java/zingg/client/pipe/InMemoryPipe.java
@@ -0,0 +1,32 @@
package zingg.client.pipe;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class InMemoryPipe extends Pipe{
Dataset <Row> dataset;

public InMemoryPipe(Dataset<Row> ds) {
dataset = ds;
}

public InMemoryPipe() {
}

public Dataset <Row> getRecords() {
return dataset;
}

public void setRecords(Dataset <Row> ds) {
dataset = ds;
}

public InMemoryPipe(Pipe p) {
clone(p);
}

@Override
public Format getFormat() {
return Format.INMEMORY;
}
}
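A minimal usage sketch of the new pipe, condensed from the FebrlExample.scala added above; a SparkSession `spark` and an `Arguments` instance `args` configured with field definitions as in that example are assumed:

//minimal sketch, condensed from FebrlExample.scala above
//assumes `spark` and a configured Arguments instance `args`
import zingg.client.pipe.{InMemoryPipe, Pipe}
import java.util.HashMap

val df = spark.read.option("header", "true").csv("examples/febrl/test.csv")

val input = new InMemoryPipe(df)    //wrap an existing DataFrame instead of pointing at external storage
input.setProps(new HashMap[String, String]())
args.setData(Array[Pipe](input))

val output = new InMemoryPipe()     //match results are held in memory on this pipe
args.setOutput(Array[Pipe](output))

//after client.init() and client.execute(), the matched records are available via
//output.getRecords().show()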
2 changes: 2 additions & 0 deletions client/src/main/java/zingg/client/pipe/PipeFactory.java
@@ -36,6 +36,8 @@ private static final Pipe getPipe(Pipe p) {
 			return p;
 		case JDBC:
 			return new JdbcPipe(p);
+		case INMEMORY:
+			return new InMemoryPipe(p);
 		default:
 			break;
 	}
10 changes: 10 additions & 0 deletions core/src/main/java/zingg/util/PipeUtil.java
@@ -28,6 +28,7 @@
 import zingg.client.pipe.ElasticPipe;
 import zingg.client.pipe.FilePipe;
 import zingg.client.pipe.Format;
+import zingg.client.pipe.InMemoryPipe;
 import zingg.client.pipe.Pipe;
 import scala.Option;
 import scala.collection.JavaConverters;
@@ -71,6 +72,10 @@ private static DataFrameReader getReader(SparkSession spark, Pipe p) {
 	private static Dataset<Row> read(DataFrameReader reader, Pipe p, boolean addSource) {
 		Dataset<Row> input = null;
 		LOG.warn("Reading " + p);
+
+		if (p.getFormat() == Format.INMEMORY) {
+			return ((InMemoryPipe) p).getRecords();
+		}
 		if (p.getProps().containsKey(FilePipe.LOCATION)) {
 			input = reader.load(p.get(FilePipe.LOCATION));
 		}
@@ -186,6 +191,11 @@ public static void write(Dataset<Row> toWriteOrig, Arguments args, JavaSparkCont

LOG.warn("Writing output " + p);

if (p.getFormat() == Format.INMEMORY) {
((InMemoryPipe) p).setRecords(toWriteOrig);
return;
}

if (p.getMode() != null) {
writer.mode(p.getMode());
}
134 changes: 134 additions & 0 deletions core/src/test/java/zingg/util/TestDSUtil.java
@@ -0,0 +1,134 @@
package zingg.util;

import static org.apache.spark.sql.functions.col;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import zingg.BaseSparkTest;
import zingg.client.Arguments;
import zingg.client.FieldDefinition;
import zingg.client.MatchType;
import zingg.client.util.ColName;

public class TestDSUtil extends BaseSparkTest{
public static final Log LOG = LogFactory.getLog(TestDSUtil.class);

@Test
public void testGetFieldDefColumnsWhenShowConciseIsTrue() {
FieldDefinition def1 = new FieldDefinition();
def1.setFieldName("field_fuzzy");
def1.setDataType("\"string\"");
def1.setMatchType(MatchType.FUZZY);
def1.setFields("field_fuzzy");

FieldDefinition def2 = new FieldDefinition();
def2.setFieldName("field_match_type_DONT_USE");
def2.setDataType("\"string\"");
def2.setMatchType(MatchType.DONT_USE);
def2.setFields("field_match_type_DONT_USE");

FieldDefinition def3 = new FieldDefinition();
def3.setFieldName("field_str_DONTspaceUSE");
def3.setDataType("\"string\"");
def3.setMatchType(MatchType.getMatchType("DONT USE"));
def3.setFields("field_str_DONTspaceUSE");

List<FieldDefinition> fieldDef = new ArrayList<FieldDefinition>();
fieldDef.add(def1);
fieldDef.add(def2);
fieldDef.add(def3);
Arguments args = null;
try {
args = new Arguments();
args.setFieldDefinition(fieldDef);
} catch (Throwable e) {
e.printStackTrace();
}
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField(def1.getFieldName(), def1.getDataType(), false),
DataTypes.createStructField(def2.getFieldName(), def2.getDataType(), false),
DataTypes.createStructField(def3.getFieldName(), def3.getDataType(), false),
DataTypes.createStructField(ColName.SOURCE_COL, DataTypes.StringType, false)
});
List<Row> list = Arrays.asList(RowFactory.create("1", "first", "one", "Junit"), RowFactory.create("2", "second", "two", "Junit"),
RowFactory.create("3", "third", "three", "Junit"), RowFactory.create("4", "forth", "Four", "Junit"));
Dataset<Row> ds = spark.createDataFrame(list, schema);

List<String> expectedColumns = new ArrayList<String>();
expectedColumns.add("field_fuzzy");
expectedColumns.add(ColName.SOURCE_COL);
List<Column> colList = DSUtil.getFieldDefColumns (ds, args, false, true);
assertTrue(expectedColumns.size() == colList.size());
for (int i = 0; i < expectedColumns.size(); i++) {
assertTrue(expectedColumns.get(i).equals(colList.get(i).toString()));
};
}

@Test
public void testGetFieldDefColumnsWhenShowConciseIsFalse() {
FieldDefinition def1 = new FieldDefinition();
def1.setFieldName("field_fuzzy");
def1.setDataType("\"string\"");
def1.setMatchType(MatchType.FUZZY);
def1.setFields("field_fuzzy");

FieldDefinition def2 = new FieldDefinition();
def2.setFieldName("field_match_type_DONT_USE");
def2.setDataType("\"string\"");
def2.setMatchType(MatchType.DONT_USE);
def2.setFields("field_match_type_DONT_USE");

FieldDefinition def3 = new FieldDefinition();
def3.setFieldName("field_str_DONTspaceUSE");
def3.setDataType("\"string\"");
def3.setMatchType(MatchType.getMatchType("DONT USE"));
def3.setFields("field_str_DONTspaceUSE");

List<FieldDefinition> fieldDef = new ArrayList<FieldDefinition>();
fieldDef.add(def1);
fieldDef.add(def2);
fieldDef.add(def3);
Arguments args = null;
try {
args = new Arguments();
args.setFieldDefinition(fieldDef);
} catch (Throwable e) {
e.printStackTrace();
}
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField(def1.getFieldName(), def1.getDataType(), false),
DataTypes.createStructField(def2.getFieldName(), def2.getDataType(), false),
DataTypes.createStructField(def3.getFieldName(), def3.getDataType(), false),
DataTypes.createStructField(ColName.SOURCE_COL, DataTypes.StringType, false)
});
List<Row> list = Arrays.asList(RowFactory.create("1", "first", "one", "Junit"), RowFactory.create("2", "second", "two", "Junit"),
RowFactory.create("3", "third", "three", "Junit"), RowFactory.create("4", "forth", "Four", "Junit"));
Dataset<Row> ds = spark.createDataFrame(list, schema);

List<Column> colListTest2 = DSUtil.getFieldDefColumns (ds, args, false, false);
List<String> expectedColumnsTest2 = new ArrayList<String>();
expectedColumnsTest2.add("field_fuzzy");
expectedColumnsTest2.add("field_match_type_DONT_USE");
expectedColumnsTest2.add("field_str_DONTspaceUSE");
expectedColumnsTest2.add(ColName.SOURCE_COL);

assertTrue(expectedColumnsTest2.size() == colListTest2.size());
for (int i = 0; i < expectedColumnsTest2.size(); i++) {
assertTrue(expectedColumnsTest2.get(i).contains(colListTest2.get(i).toString()));
};
}
}
1 change: 1 addition & 0 deletions docs/SUMMARY.md
@@ -23,6 +23,7 @@
 * [MongoDB](dataSourcesAndSinks/mongodb.md)
 * [Neo4j](dataSourcesAndSinks/neo4j.md)
 * [Parquet](dataSourcesAndSinks/parquet.md)
+* [BigQuery](dataSourcesAndSinks/bigquery.md)
 * [Running Zingg on Cloud](running/running.md)
 * [Running on AWS](running/aws.md)
 * [Running on Azure](running/azure.md)
