From e910eb8e0d57b75617226b31ed22aa0bc8035221 Mon Sep 17 00:00:00 2001 From: Vikas Gupta Date: Wed, 6 Mar 2024 17:58:18 +0530 Subject: [PATCH] use IPairBuilder for building pairs --- .../zingg/common/core/executor/Linker.java | 22 +++++--- .../zingg/common/core/executor/Matcher.java | 42 +++++++------- .../zingg/common/core/pairs/IPairBuilder.java | 9 +++ .../common/core/pairs/SelfPairBuilder.java | 55 +++++++++++++++++++ .../pairs/SelfPairBuilderSourceSensitive.java | 26 +++++++++ 5 files changed, 125 insertions(+), 29 deletions(-) create mode 100644 common/core/src/main/java/zingg/common/core/pairs/IPairBuilder.java create mode 100644 common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilder.java create mode 100644 common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilderSourceSensitive.java diff --git a/common/core/src/main/java/zingg/common/core/executor/Linker.java b/common/core/src/main/java/zingg/common/core/executor/Linker.java index ed8c6fad1..d97588d28 100644 --- a/common/core/src/main/java/zingg/common/core/executor/Linker.java +++ b/common/core/src/main/java/zingg/common/core/executor/Linker.java @@ -8,28 +8,27 @@ import zingg.common.client.options.ZinggOptions; import zingg.common.client.util.ColName; import zingg.common.client.util.ColValues; +import zingg.common.core.pairs.IPairBuilder; +import zingg.common.core.pairs.SelfPairBuilderSourceSensitive; public abstract class Linker extends Matcher { + private static final long serialVersionUID = 1L; protected static String name = "zingg.Linker"; public static final Log LOG = LogFactory.getLog(Linker.class); public Linker() { setZinggOption(ZinggOptions.LINK); } - - public ZFrame getBlocks(ZFrame blocked, ZFrame bAll) throws Exception{ - // THIS LOG IS NEEDED FOR PLAN CALCULATION USING COUNT, DO NOT REMOVE - LOG.info("in getBlocks, blocked count is " + blocked.count()); - return getDSUtil().joinWithItselfSourceSensitive(blocked, ColName.HASH_COL, args).cache(); - } - + + @Override public 
ZFrame selectColsFromBlocked(ZFrame blocked) { return blocked; } + @Override public void writeOutput(ZFrame sampleOrginal, ZFrame dupes) throws ZinggClientException { try { // input dupes are pairs @@ -53,12 +52,19 @@ public void writeOutput(ZFrame sampleOrginal, ZFrame dupes) throws } } + @Override public ZFrame getDupesActualForGraph(ZFrame dupes) { ZFrame dupesActual = dupes .filter(dupes.equalTo(ColName.PREDICTION_COL, ColValues.IS_MATCH_PREDICTION)); return dupesActual; } - + @Override + public IPairBuilder getIPairBuilder() { + if(iPairBuilder==null) { + iPairBuilder = new SelfPairBuilderSourceSensitive (getDSUtil(),args); + } + return iPairBuilder; + } } diff --git a/common/core/src/main/java/zingg/common/core/executor/Matcher.java b/common/core/src/main/java/zingg/common/core/executor/Matcher.java index 17109e264..858236f4c 100644 --- a/common/core/src/main/java/zingg/common/core/executor/Matcher.java +++ b/common/core/src/main/java/zingg/common/core/executor/Matcher.java @@ -15,6 +15,8 @@ import zingg.common.core.block.Canopy; import zingg.common.core.block.Tree; import zingg.common.core.model.Model; +import zingg.common.core.pairs.IPairBuilder; +import zingg.common.core.pairs.SelfPairBuilder; import zingg.common.core.preprocess.StopWordsRemover; import zingg.common.core.util.Analytics; import zingg.common.core.util.Metric; @@ -25,6 +27,7 @@ public abstract class Matcher extends ZinggBase{ protected static String name = "zingg.Matcher"; public static final Log LOG = LogFactory.getLog(Matcher.class); + protected IPairBuilder iPairBuilder; public Matcher() { setZinggOption(ZinggOptions.MATCH); @@ -50,26 +53,8 @@ public ZFrame getBlocked( ZFrame testData) throws Exception, Zin return blocked1; } - public ZFrame getBlocks(ZFrameblocked, ZFramebAll) throws Exception{ - ZFramejoinH = getDSUtil().joinWithItself(blocked, ColName.HASH_COL, true).cache(); - /*ZFramejoinH = blocked.as("first").joinOnCol(blocked.as("second"), ColName.HASH_COL) - 
.selectExpr("first.z_zid as z_zid", "second.z_zid as z_z_zid"); - */ - //joinH.show(); - joinH = joinH.filter(joinH.gt(ColName.ID_COL)); - LOG.warn("Num comparisons " + joinH.count()); - joinH = joinH.repartition(args.getNumPartitions(), joinH.col(ColName.ID_COL)); - bAll = bAll.repartition(args.getNumPartitions(), bAll.col(ColName.ID_COL)); - joinH = joinH.joinOnCol(bAll, ColName.ID_COL); - LOG.warn("Joining with actual values"); - //joinH.show(); - bAll = getDSUtil().getPrefixedColumnsDS(bAll); - //bAll.show(); - joinH = joinH.repartition(args.getNumPartitions(), joinH.col(ColName.COL_PREFIX + ColName.ID_COL)); - joinH = joinH.joinOnCol(bAll, ColName.COL_PREFIX + ColName.ID_COL); - LOG.warn("Joining again with actual values"); - //joinH.show(); - return joinH; + public ZFrame getPairs(ZFrameblocked, ZFramebAll) throws Exception{ + return getIPairBuilder().getPairs(blocked, bAll); } protected abstract Model getModel() throws ZinggClientException; @@ -91,7 +76,7 @@ protected ZFrame predictOnBlocks(ZFrameblocks) throws Exception, Z } protected ZFrame getActualDupes(ZFrame blocked, ZFrame testData) throws Exception, ZinggClientException{ - ZFrame blocks = getBlocks(selectColsFromBlocked(blocked), testData); + ZFrame blocks = getPairs(selectColsFromBlocked(blocked), testData); ZFramedupesActual = predictOnBlocks(blocks); return getDupesActualForGraph(dupesActual); } @@ -285,6 +270,21 @@ protected ZFrame selectColsFromDupes(ZFramedupesActual) { protected abstract StopWordsRemover getStopWords(); + /** + * Each subclass of Matcher can inject its own IPairBuilder implementation + * @return the pair builder in use; a SelfPairBuilder is created on first call if none has been set + */ + public IPairBuilder getIPairBuilder() { + if(iPairBuilder==null) { + iPairBuilder = new SelfPairBuilder (getDSUtil(),args); + } + return iPairBuilder; + } + + public void setIPairBuilder(IPairBuilder iPairBuilder) { + this.iPairBuilder = iPairBuilder; + } + } diff --git a/common/core/src/main/java/zingg/common/core/pairs/IPairBuilder.java
b/common/core/src/main/java/zingg/common/core/pairs/IPairBuilder.java new file mode 100644 index 000000000..235483818 --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/pairs/IPairBuilder.java @@ -0,0 +1,9 @@ +package zingg.common.core.pairs; + +import zingg.common.client.ZFrame; + +public interface IPairBuilder { + + public ZFrame getPairs(ZFrameblocked, ZFramebAll) throws Exception; + +} diff --git a/common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilder.java b/common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilder.java new file mode 100644 index 000000000..4d0fff71d --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilder.java @@ -0,0 +1,55 @@ +package zingg.common.core.pairs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import zingg.common.client.IArguments; +import zingg.common.client.ZFrame; +import zingg.common.client.util.ColName; +import zingg.common.client.util.DSUtil; + +public class SelfPairBuilder implements IPairBuilder { + + protected DSUtil dsUtil; + public static final Log LOG = LogFactory.getLog(SelfPairBuilder.class); + protected IArguments args; + + public SelfPairBuilder(DSUtil dsUtil, IArguments args) { + this.dsUtil = dsUtil; + this.args = args; + } + + @Override + public ZFrame getPairs(ZFrameblocked, ZFramebAll) throws Exception { + ZFramejoinH = getDSUtil().joinWithItself(blocked, ColName.HASH_COL, true).cache(); + /*ZFramejoinH = blocked.as("first").joinOnCol(blocked.as("second"), ColName.HASH_COL) + .selectExpr("first.z_zid as z_zid", "second.z_zid as z_z_zid"); + */ + //joinH.show(); + joinH = joinH.filter(joinH.gt(ColName.ID_COL)); + LOG.warn("Num comparisons " + joinH.count()); + joinH = joinH.repartition(args.getNumPartitions(), joinH.col(ColName.ID_COL)); + bAll = bAll.repartition(args.getNumPartitions(), bAll.col(ColName.ID_COL)); + joinH = joinH.joinOnCol(bAll, ColName.ID_COL); + LOG.warn("Joining with actual values"); + 
//joinH.show(); + bAll = getDSUtil().getPrefixedColumnsDS(bAll); + //bAll.show(); + joinH = joinH.repartition(args.getNumPartitions(), joinH.col(ColName.COL_PREFIX + ColName.ID_COL)); + joinH = joinH.joinOnCol(bAll, ColName.COL_PREFIX + ColName.ID_COL); + LOG.warn("Joining again with actual values"); + //joinH.show(); + return joinH; + } + + public DSUtil getDSUtil() { + return dsUtil; + } + + public void setDSUtil(DSUtil dsUtil) { + this.dsUtil = dsUtil; + } + + + +} diff --git a/common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilderSourceSensitive.java b/common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilderSourceSensitive.java new file mode 100644 index 000000000..293eb162c --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/pairs/SelfPairBuilderSourceSensitive.java @@ -0,0 +1,26 @@ +package zingg.common.core.pairs; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import zingg.common.client.IArguments; +import zingg.common.client.ZFrame; +import zingg.common.client.util.ColName; +import zingg.common.client.util.DSUtil; + +public class SelfPairBuilderSourceSensitive extends SelfPairBuilder { + + public static final Log LOG = LogFactory.getLog(SelfPairBuilderSourceSensitive.class); + + public SelfPairBuilderSourceSensitive(DSUtil dsUtil, IArguments args) { + super(dsUtil, args); + } + + @Override + public ZFrame getPairs(ZFrame blocked, ZFrame bAll) throws Exception{ + // THIS LOG IS NEEDED FOR PLAN CALCULATION USING COUNT, DO NOT REMOVE + LOG.info("in getBlocks, blocked count is " + blocked.count()); + return getDSUtil().joinWithItselfSourceSensitive(blocked, ColName.HASH_COL, args).cache(); + } + +}