Skip to content

Commit

Permalink
Merge pull request #624 from zinggAI/obviousDupes
Browse files Browse the repository at this point in the history
Obvious dupes condition in config
  • Loading branch information
sonalgoyal committed Jul 27, 2023
2 parents b6a2ff1 + dce9feb commit a8dc931
Show file tree
Hide file tree
Showing 10 changed files with 425 additions and 17 deletions.
10 changes: 5 additions & 5 deletions common/client/src/main/java/zingg/common/client/Arguments.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public class Arguments implements Serializable {
float stopWordsCutoff = 0.1f;
long blockSize = 100L;
String column;
String obviousDupeString;
String obviousDupeCondition;
private static final String ENV_VAR_MARKER_START = "$";
private static final String ENV_VAR_MARKER_END = "$";
private static final String ESC = "\\";
Expand Down Expand Up @@ -677,12 +677,12 @@ public void setColumn(String column) {
this.column = column;
}

public String getObviousDupeString() {
return obviousDupeString;
public String getObviousDupeCondition() {
return obviousDupeCondition;
}

public void setObviousDupeString(String obviousDupeString) {
this.obviousDupeString = obviousDupeString;
public void setObviousDupeCondition(String obviousDupeCondition) {
this.obviousDupeCondition = obviousDupeCondition;
}

public long getBlockSize() {
Expand Down
18 changes: 17 additions & 1 deletion common/client/src/main/java/zingg/common/client/ZFrame.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,20 @@

import java.util.List;


//Dataset, Row, column
public interface ZFrame<D, R, C> {

public static final String RIGHT_JOIN = "right";
public static final String LEFT_JOIN = "left";

public static final String COL_COUNT = "count";
public static final String COL_VALUE = "VALUE";

public static final String orSeperator = "\\|";
public static final String andSeperator = "\\&";


public ZFrame<D, R, C> cache();
public ZFrame<D, R, C> as(String s);
public String[] columns();
Expand All @@ -28,6 +36,7 @@ public interface ZFrame<D, R, C> {
/**doesnt dupe the join col */
public ZFrame<D, R, C> joinOnCol(ZFrame<D, R, C> lines1, String joinColumn);

public ZFrame<D, R, C> joinOnCol(ZFrame<D, R, C> lines1, C joinColumn);

public ZFrame<D, R, C> join(ZFrame<D, R, C> lines1, String joinColumn1, String joinColumn2);

Expand Down Expand Up @@ -145,6 +154,13 @@ public interface ZFrame<D, R, C> {
public ZFrame<D, R, C> filterNotNullCond(String colName);

public ZFrame<D, R, C> filterNullCond(String colName);

public C getObviousDupesFilter(String obviousDupeString);


public C getObviousDupesFilter(ZFrame<D, R, C> dfToJoin, String obviousDupeString);

public C getReverseObviousDupesFilter(String obviousDupeString);

public C getReverseObviousDupesFilter(ZFrame<D, R, C> dfToJoin, String obviousDupeString);

}
57 changes: 53 additions & 4 deletions common/core/src/main/java/zingg/common/core/executor/Matcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public void execute() throws ZinggClientException {
blocked.show();
}
//LOG.warn("Num distinct hashes " + blocked.agg(functions.approx_count_distinct(ColName.HASH_COL)).count());

ZFrame<D,R,C>blocks = getBlocks(selectColsFromBlocked(blocked), testData);
//blocks.explain();
//LOG.info("Blocks " + blocks.count());
Expand All @@ -120,6 +121,14 @@ public void execute() throws ZinggClientException {
//check if all fields equal
//ZFrame<D,R,C>allEqual = getDSUtil().allFieldsEqual(blocks, args);
//allEqual = allEqual.cache();

//get obvious dupes
ZFrame<D, R, C> obvDupePairs = getObvDupePairs(blocked);
if (obvDupePairs != null) {
LOG.info("obvDupePairs count " + obvDupePairs.count());
blocks = removeObvDupesFromBlocks(blocks);
}

//send remaining to model
Model model = getModel();
//blocks.cache().withColumn("partition_id", functions.spark_partition_id())
Expand All @@ -132,6 +141,7 @@ public void execute() throws ZinggClientException {
*/
ZFrame<D,R,C>dupes = model.predict(blocks); //.exceptAll(allEqual));
//allEqual = massageAllEquals(allEqual);
dupes = addObvDupes(obvDupePairs, dupes);
if (LOG.isDebugEnabled()) {
LOG.debug("Found dupes " + dupes.count());
}
Expand All @@ -152,6 +162,40 @@ public void execute() throws ZinggClientException {
}
}

protected ZFrame<D, R, C> addObvDupes(ZFrame<D, R, C> obvDupePairs, ZFrame<D, R, C> dupes) {
if (obvDupePairs != null) {
// unionByName as positions may differ
dupes = dupes.unionByName(obvDupePairs, false);
}
return dupes;
}

protected ZFrame<D, R, C> removeObvDupesFromBlocks(ZFrame<D, R, C> blocks) {
LOG.info("blocks count before removing obvDupePairs " + blocks.count());
C reverseOBVDupeDFFilter = blocks.getReverseObviousDupesFilter(args.getObviousDupeCondition());
if (reverseOBVDupeDFFilter != null) {
// remove dupes as already considered in obvDupePairs
blocks = blocks.filter(reverseOBVDupeDFFilter);
}
LOG.info("blocks count after removing obvDupePairs " + blocks.count());
return blocks;
}

protected ZFrame<D,R,C> getObvDupePairs(ZFrame<D,R,C> blocked) {

ZFrame<D,R,C> prefixedColsDF = getDSUtil().getPrefixedColumnsDS(blocked);
C obvDupeDFFilter = blocked.getObviousDupesFilter(prefixedColsDF,args.getObviousDupeCondition());
if (obvDupeDFFilter == null) {
return null;
}

ZFrame<D, R, C> obvDupePairs = blocked.joinOnCol(prefixedColsDF, obvDupeDFFilter);
obvDupePairs = obvDupePairs.filter(obvDupePairs.gt(ColName.ID_COL));
obvDupePairs = massageAllEquals(obvDupePairs);

return obvDupePairs;
}

public void writeOutput( ZFrame<D,R,C> blocked, ZFrame<D,R,C> dupesActual) throws ZinggClientException {
try{
//input dupes are pairs
Expand All @@ -168,17 +212,22 @@ public void writeOutput( ZFrame<D,R,C> blocked, ZFrame<D,R,C> dupesActual) th

dupesActual = dupesActual.cache();
System.out.println("dupes ------------");
dupesActual.show();
if (LOG.isDebugEnabled()) {
dupesActual.show();
}
ZFrame<D,R,C>graph = getGraphUtil().buildGraph(blocked, dupesActual).cache();
//graph.toJavaRDD().saveAsTextFile("/tmp/zgraph");
System.out.println("graph ------------");
graph.show();
if (LOG.isDebugEnabled()) {
graph.show();
}
//write score
ZFrame<D,R,C>score = getMinMaxScores(dupesActual, graph).cache();
//score.toJavaRDD().coalesce(1).saveAsTextFile("/tmp/zallscoresAvg");
graph = graph.repartition(args.getNumPartitions(), graph.col(ColName.ID_COL)).cache();

score.show();
if (LOG.isDebugEnabled()) {
score.show();
}
ZFrame<D,R,C>graphWithScores = getDSUtil().joinZColFirst(
score, graph, ColName.ID_COL, false).cache();
//graphWithScores.toJavaRDD().saveAsTextFile("/tmp/zgraphWScores");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,15 @@ public void execute() throws ZinggClientException {

blocked = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)).cache();
System.out.println("blocked");
blocked.show(true);
if (LOG.isDebugEnabled()) {
blocked.show(true);
}
ZFrame<D,R,C> blocks = getDSUtil().joinWithItself(blocked, ColName.HASH_COL, true);
blocks = blocks.cache();
System.out.println("blocks");
blocks.show();
if (LOG.isDebugEnabled()) {
blocks.show();
}
//TODO HASH Partition
if (negPairs!= null) negPairs = negPairs.cache();
//train classifier and predict the blocked values from classifier
Expand Down Expand Up @@ -129,7 +133,9 @@ public void execute() throws ZinggClientException {
}

public void writeUncertain(ZFrame<D,R,C> dupesActual, ZFrame<D,R,C> sampleOrginal) throws ZinggClientException {
dupesActual.show(40);
if (LOG.isDebugEnabled()) {
dupesActual.show(40);
}
//input dupes are pairs
dupesActual = getDSUtil().addClusterRowNumber(dupesActual);
dupesActual = getDSUtil().addUniqueCol(dupesActual, ColName.CLUSTER_COLUMN );
Expand Down
6 changes: 4 additions & 2 deletions common/core/src/main/java/zingg/common/core/util/DSUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@ public ZFrame<D, R, C> join(ZFrame<D, R, C> lines, ZFrame<D, R, C> lines1, Strin
}
if (filter) {
pairs = pairs.filter(pairs.gt(ColName.ID_COL));
}
pairs.show(true);
}
if (LOG.isDebugEnabled()) {
pairs.show(true);
}
return pairs;
}

Expand Down
29 changes: 29 additions & 0 deletions docs/stepbystep/configuration/obviousDupes.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
---
description: >-
Defining which field(s) if a match classifies two records as an exact match
---

# Obvious Duplicates

* Certain fields or combination of fields may mean an exact match for two records

* By configuring the obvious dupes condition the user ensures that such records always result in a match and together in a cluster

* This also gives better performance and score

# Configuration

* In the config.json file put the json element like this:

"obviousDupeCondition" : "field1 & field2 | field3 | field4 & field5 & field6"

| => OR condition
& => AND condition

* The two records in above example will be considered an exact match if:

value of both field1 & field2 is exactly same in both records and both are not null
OR
value of field3 is not null and is exactly same in both records (e.g. something like SSN can't be same for two people)
OR
value of all 3 fields field4 & field5 & field6 is exactly same in both records and none of them is null
96 changes: 96 additions & 0 deletions examples/febrl/configObvDupe.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
{
"fieldDefinition":[
{
"fieldName" : "recId",
"matchType" : "dont_use",
"fields" : "recId",
"dataType": "string"
},
{
"fieldName" : "fname",
"matchType" : "fuzzy",
"fields" : "fname",
"dataType": "string"
},
{
"fieldName" : "lname",
"matchType" : "fuzzy",
"fields" : "lname",
"dataType": "string"
},
{
"fieldName" : "stNo",
"matchType": "fuzzy",
"fields" : "stNo",
"dataType": "string"
},
{
"fieldName" : "add1",
"matchType": "fuzzy",
"fields" : "add1",
"dataType": "string"
},
{
"fieldName" : "add2",
"matchType": "fuzzy",
"fields" : "add2",
"dataType": "string"
},
{
"fieldName" : "city",
"matchType": "fuzzy",
"fields" : "city",
"dataType": "string"
},
{
"fieldName" : "areacode",
"matchType": "fuzzy",
"fields" : "areacode",
"dataType": "string"
},
{
"fieldName" : "state",
"matchType": "fuzzy",
"fields" : "state",
"dataType": "string"
},
{
"fieldName" : "dob",
"matchType": "fuzzy",
"fields" : "dob",
"dataType": "string"
},
{
"fieldName" : "ssn",
"matchType": "fuzzy",
"fields" : "ssn",
"dataType": "string"
}
],
"output" : [{
"name":"output",
"format":"csv",
"props": {
"location": "/tmp/zinggOutput",
"delimiter": ",",
"header":true
}
}],
"data" : [{
"name":"test",
"format":"csv",
"props": {
"location": "examples/febrl/test.csv",
"delimiter": ",",
"header":false
},
"schema": "recId string, fname string, lname string, stNo string, add1 string, add2 string, city string, state string, areacode string, dob string, ssn string"
}
],
"obviousDupeCondition" : "FNAME & STNO & ADD1",
"labelDataSampleSize" : 0.5,
"numPartitions":4,
"modelId": 100,
"zinggDir": "models"

}
11 changes: 11 additions & 0 deletions python/zingg/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,17 @@ def setModelId(self, id):
def getModelId(self):
return self.args.getModelId()

def setObviousDupeCondition(self, obviousDupeCondition):
""" Method to set the obviousDupeCondition used for matching
:param id: obviousDupeCondition value
:type id: String
"""
self.args.setObviousDupeCondition(obviousDupeCondition)

def getObviousDupeCondition(self):
return self.args.getObviousDupeCondition()

def setZinggDir(self, f):
""" Method to set the location for Zingg to save its internal computations and models. Please set it to a place where the program has to write access.
Expand Down
Loading

0 comments on commit a8dc931

Please sign in to comment.