diff --git a/common/core/src/main/java/zingg/common/core/data/df/IZFrameEnriched.java b/common/core/src/main/java/zingg/common/core/data/df/IZFrameEnriched.java new file mode 100644 index 00000000..e8b28f6b --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/data/df/IZFrameEnriched.java @@ -0,0 +1,13 @@ +package zingg.common.core.data.df; + +import java.io.Serializable; + +import zingg.common.client.ZFrame; + +public interface IZFrameEnriched extends Serializable{ + + public ZFrame getOriginalDF(); + + public ZFrame getProcessedDF(); + +} diff --git a/common/core/src/main/java/zingg/common/core/data/df/ZData.java b/common/core/src/main/java/zingg/common/core/data/df/ZData.java new file mode 100644 index 00000000..6da22c0d --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/data/df/ZData.java @@ -0,0 +1,96 @@ +package zingg.common.core.data.df; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import zingg.common.client.IArguments; +import zingg.common.client.ZFrame; +import zingg.common.client.ZinggClientException; +import zingg.common.client.cols.SelectedCols; +import zingg.common.client.cols.ZidAndFieldDefSelector; +import zingg.common.client.util.ColName; +import zingg.common.core.context.Context; +import zingg.common.core.data.df.controller.BlockedDataController; +import zingg.common.core.data.df.controller.FieldDefDataController; +import zingg.common.core.data.df.controller.PreprocessorDataController; +import zingg.common.core.data.df.controller.RepartitionDataController; +import zingg.common.core.preprocess.IPreProcessor; + +public class ZData { + + protected ZFrame rawData; + protected IArguments args; + protected Context context; + protected List> preProcessors; + + protected ZFrameEnriched fieldDefFrame; + protected ZFrameEnriched blockedFrame; + protected ZFrameEnriched preprocessedFrame; + protected ZFrameEnriched repartitionFrame; + + public static final Log LOG = LogFactory.getLog(ZData.class); + + public ZData(ZFrame rawData, IArguments args, Context context,List> preProcessors) throws ZinggClientException { + this.rawData = rawData; + this.args = args; + this.context = context; + this.preProcessors = preProcessors; + } + + public ZFrame getRawData() { + return rawData; + } + + public ZFrameEnriched getFieldDefFrame() throws ZinggClientException { + if (fieldDefFrame==null) { + ZFrame originalDF = getRawData(); + FieldDefDataController controller = new FieldDefDataController(args.getFieldDefinition(), + getColSelector()); + this.fieldDefFrame = new ZFrameEnriched(originalDF, controller.process(originalDF)); + } + return fieldDefFrame; + } + + public ZFrameEnriched getPreprocessedFrame() throws ZinggClientException { + if (preprocessedFrame==null) { + ZFrame originalDF = getFieldDefFrame().getProcessedDF(); + PreprocessorDataController controller = new PreprocessorDataController( + preProcessors); + this.preprocessedFrame = new ZFrameEnriched(originalDF, controller.process(originalDF)); + } + return preprocessedFrame; + } + + public ZFrameEnriched getRepartitionFrame() throws ZinggClientException { + if (repartitionFrame==null) { + ZFrame originalDF = getPreprocessedFrame().getProcessedDF(); + RepartitionDataController controller = new RepartitionDataController( + args.getNumPartitions(), ColName.ID_COL); + this.repartitionFrame = new ZFrameEnriched(originalDF, controller.process(originalDF)); + } + return repartitionFrame; + } + + public ZFrameEnriched getBlockedFrame() throws ZinggClientException { + if (blockedFrame==null) { + try { + ZFrame originalDF = getRepartitionFrame().getProcessedDF(); + BlockedDataController controller = new BlockedDataController(args, + context.getBlockingTreeUtil()); + this.blockedFrame = new ZFrameEnriched(originalDF, controller.process(originalDF)); + } catch (ZinggClientException zce) { + throw zce; + } catch (Exception e) { + throw new ZinggClientException(e); + } + } + return blockedFrame; + } + + protected SelectedCols getColSelector() { + return new ZidAndFieldDefSelector(args.getFieldDefinition()); + } + +} diff --git a/common/core/src/main/java/zingg/common/core/data/df/ZDataPair.java b/common/core/src/main/java/zingg/common/core/data/df/ZDataPair.java new file mode 100644 index 00000000..b06647d4 --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/data/df/ZDataPair.java @@ -0,0 +1,40 @@ +package zingg.common.core.data.df; + +import zingg.common.client.ZFrame; +import zingg.common.core.pairs.IPairBuilder; + +public class ZDataPair { + + protected ZFrame data1; + protected ZFrame data2; + protected ZFrame pairs; + protected IPairBuilder iPairBuilder; + + public ZDataPair(ZFrame data1, ZFrame data2, IPairBuilder iPairBuilder) { + this.data1 = data1; + this.data2 = data2; + this.iPairBuilder = iPairBuilder; + } + + public ZFrame getData1() { + return data1; + } + + public ZFrame getData2() { + return data2; + } + + public ZFrame getPairs() throws Exception { + if (pairs==null) { + pairs = iPairBuilder.getPairs(data1, data2); + } + return pairs; + } + + public IPairBuilder getIPairBuilder() { + return iPairBuilder; + } + + + +} diff --git a/common/core/src/main/java/zingg/common/core/data/df/ZFrameEnriched.java b/common/core/src/main/java/zingg/common/core/data/df/ZFrameEnriched.java new file mode 100644 index 00000000..67c1ef7a --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/data/df/ZFrameEnriched.java @@ -0,0 +1,35 @@ +package zingg.common.core.data.df; + +import zingg.common.client.ZFrame; + +public class ZFrameEnriched implements IZFrameEnriched { + private static final long serialVersionUID = 1L; + + protected ZFrame originalDF; + + protected ZFrame processedDF; + + public ZFrameEnriched(ZFrame originalDF) { + this.originalDF = originalDF; + } + + public ZFrameEnriched(ZFrame originalDF, ZFrame processedDF) { + this.originalDF = originalDF; + this.processedDF = processedDF; + } + + @Override + public ZFrame getOriginalDF() { + return originalDF; + } + + @Override + public ZFrame getProcessedDF() { + return processedDF; + } + + public void setProcessedDF(ZFrame processedDF) { + this.processedDF = processedDF; + } + +} diff --git a/common/core/src/main/java/zingg/common/core/data/df/controller/BlockedDataController.java b/common/core/src/main/java/zingg/common/core/data/df/controller/BlockedDataController.java new file mode 100644 index 00000000..65908c74 --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/data/df/controller/BlockedDataController.java @@ -0,0 +1,44 @@ +package zingg.common.core.data.df.controller; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import zingg.common.client.IArguments; +import zingg.common.client.ZFrame; +import zingg.common.client.ZinggClientException; +import zingg.common.client.util.ColName; +import zingg.common.core.block.Canopy; +import zingg.common.core.block.Tree; +import zingg.common.core.util.BlockingTreeUtil; + +public class BlockedDataController implements IDataController { + protected IArguments args; + + protected BlockingTreeUtil blockingTreeUtil; + + public static final Log LOG = LogFactory.getLog(BlockedDataController.class); + + public BlockedDataController(IArguments args, BlockingTreeUtil blockingTreeUtil) throws Exception, ZinggClientException { + this.args = args; + this.blockingTreeUtil = blockingTreeUtil; + } + + protected ZFrame getBlocked(ZFrame originalDF) throws ZinggClientException { + try { + Tree> tree = blockingTreeUtil.readBlockingTree(args); + ZFrame blocked = blockingTreeUtil.getBlockHashes(originalDF, tree); + ZFrame blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL));//.cache(); + return blocked1; + } catch (ZinggClientException e) { + throw e; + } catch (Exception e) { + throw new ZinggClientException(e); + } + } + + @Override + public ZFrame process(ZFrame originalDF) throws ZinggClientException { + return getBlocked(originalDF); + } + +} diff --git a/common/core/src/main/java/zingg/common/core/data/df/controller/FieldDefDataController.java b/common/core/src/main/java/zingg/common/core/data/df/controller/FieldDefDataController.java new file mode 100644 index 00000000..1294cb91 --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/data/df/controller/FieldDefDataController.java @@ -0,0 +1,36 @@ +package zingg.common.core.data.df.controller; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import zingg.common.client.FieldDefinition; +import zingg.common.client.ZFrame; +import zingg.common.client.ZinggClientException; +import zingg.common.client.cols.SelectedCols; +import zingg.common.client.cols.ZidAndFieldDefSelector; + +public class FieldDefDataController implements IDataController { + @Override + public ZFrame process(ZFrame originalDF) throws ZinggClientException { + return originalDF.select(selectedCols.getCols()); + } + + protected SelectedCols selectedCols; + + protected List fieldDefinition; + + public static final Log LOG = LogFactory.getLog(FieldDefDataController.class); + + public FieldDefDataController(List fieldDefinition) { + this(fieldDefinition,new ZidAndFieldDefSelector(fieldDefinition)); + } + + public FieldDefDataController(List fieldDefinition, + SelectedCols selectedCols) { + this.fieldDefinition = fieldDefinition; + this.selectedCols = selectedCols; + } + +} diff --git a/common/core/src/main/java/zingg/common/core/data/df/controller/IDataController.java b/common/core/src/main/java/zingg/common/core/data/df/controller/IDataController.java new file mode 100644 index 00000000..ad021d04 --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/data/df/controller/IDataController.java @@ -0,0 +1,9 @@ +package zingg.common.core.data.df.controller; + +import zingg.common.client.ZFrame; +import zingg.common.client.ZinggClientException; + +public interface IDataController { + + public ZFrame process(ZFrame originalDF) throws ZinggClientException; +} diff --git a/common/core/src/main/java/zingg/common/core/data/df/controller/PreprocessorDataController.java b/common/core/src/main/java/zingg/common/core/data/df/controller/PreprocessorDataController.java new file mode 100644 index 00000000..99dc550d --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/data/df/controller/PreprocessorDataController.java @@ -0,0 +1,32 @@ +package zingg.common.core.data.df.controller; + +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import zingg.common.client.ZFrame; +import zingg.common.client.ZinggClientException; +import zingg.common.core.preprocess.IPreProcessor; + +public class PreprocessorDataController implements IDataController { + + protected List> preProcessors; + + public static final Log LOG = LogFactory.getLog(PreprocessorDataController.class); + + public PreprocessorDataController(List> preProcessors) throws ZinggClientException { + this.preProcessors = preProcessors; + } + + @Override + public ZFrame process(ZFrame originalDF) throws ZinggClientException { + ZFrame processedDF = originalDF; + if (preProcessors != null) { + for (IPreProcessor iPreProcessor : preProcessors) { + processedDF = iPreProcessor.preprocess(processedDF); + } + } + return processedDF; + } +} diff --git a/common/core/src/main/java/zingg/common/core/data/df/controller/RepartitionDataController.java b/common/core/src/main/java/zingg/common/core/data/df/controller/RepartitionDataController.java new file mode 100644 index 00000000..8eb6beb7 --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/data/df/controller/RepartitionDataController.java @@ -0,0 +1,30 @@ +package zingg.common.core.data.df.controller; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import zingg.common.client.IArguments; +import zingg.common.client.ZFrame; +import zingg.common.client.ZinggClientException; + +public class RepartitionDataController implements IDataController { + + protected IArguments args; + + protected int numPartitions; + + protected String partitionCol; + + public static final Log LOG = LogFactory.getLog(RepartitionDataController.class); + + public RepartitionDataController(int numPartitions, String partitionCol) { + this.numPartitions = numPartitions; + this.partitionCol = partitionCol; + } + + @Override + public ZFrame process(ZFrame originalDF) throws ZinggClientException { + return originalDF.repartition(numPartitions,originalDF.col(partitionCol)); + } + +} diff --git a/common/core/src/main/java/zingg/common/core/executor/Linker.java b/common/core/src/main/java/zingg/common/core/executor/Linker.java index d6352641..654dfdb3 100644 --- a/common/core/src/main/java/zingg/common/core/executor/Linker.java +++ b/common/core/src/main/java/zingg/common/core/executor/Linker.java @@ -7,6 +7,8 @@ import zingg.common.client.ZinggClientException; import zingg.common.client.options.ZinggOptions; import zingg.common.client.util.ColName; +import zingg.common.core.data.df.ZData; +import zingg.common.core.data.df.ZDataPair; import zingg.common.core.filter.PredictionFilter; import zingg.common.core.pairs.SelfPairBuilderSourceSensitive; @@ -28,19 +30,19 @@ public ZFrame selectColsFromBlocked(ZFrame blocked) { } @Override - public ZFrame getPairs(ZFrameblocked, ZFramebAll) throws Exception{ + public ZDataPair getPairs(ZFrameblocked, ZFramebAll) throws Exception{ return getPairs(blocked, bAll, new SelfPairBuilderSourceSensitive (getDSUtil(),args)); } @Override - protected ZFrame getActualDupes(ZFrame blocked, ZFrame testData) throws Exception, ZinggClientException{ + protected ZFrame getActualDupes(ZData rawData) throws Exception, ZinggClientException{ PredictionFilter predictionFilter = new PredictionFilter(); SelfPairBuilderSourceSensitive iPairBuilder = new SelfPairBuilderSourceSensitive (getDSUtil(),args); - return getActualDupes(blocked, testData,predictionFilter, iPairBuilder, null); + return getActualDupes(rawData,predictionFilter, iPairBuilder, null); } @Override - public void writeOutput(ZFrame sampleOrginal, ZFrame dupes) throws ZinggClientException { + public void writeOutput( ZData rawData, ZFrame dupes) throws ZinggClientException { try { // input dupes are pairs /// pick ones according to the threshold by user @@ -55,7 +57,7 @@ public void writeOutput(ZFrame sampleOrginal, ZFrame dupes) throws dupesActual = dupesActual.withColumn(ColName.CLUSTER_COLUMN, dupesActual.col(ColName.ID_COL)); dupesActual = getDSUtil().addUniqueCol(dupesActual, ColName.CLUSTER_COLUMN); ZFramedupes2 = getDSUtil().alignLinked(dupesActual, args); - dupes2 = getDSUtil().postprocessLinked(dupes2, sampleOrginal); + dupes2 = getDSUtil().postprocessLinked(dupes2, rawData.getFieldDefFrame().getProcessedDF()); LOG.debug("uncertain output schema is " + dupes2.showSchema()); getPipeUtil().write(dupes2, args.getOutput()); } diff --git a/common/core/src/main/java/zingg/common/core/executor/Matcher.java b/common/core/src/main/java/zingg/common/core/executor/Matcher.java index 483059c4..4dd684ac 100644 --- a/common/core/src/main/java/zingg/common/core/executor/Matcher.java +++ b/common/core/src/main/java/zingg/common/core/executor/Matcher.java @@ -9,16 +9,16 @@ import zingg.common.client.ZFrame; import zingg.common.client.ZinggClientException; import zingg.common.client.cols.PredictionColsSelector; -import zingg.common.client.cols.ZidAndFieldDefSelector; import zingg.common.client.options.ZinggOptions; import zingg.common.client.util.ColName; -import zingg.common.core.block.Canopy; -import zingg.common.core.block.Tree; +import zingg.common.core.data.df.ZData; +import zingg.common.core.data.df.ZDataPair; import zingg.common.core.filter.IFilter; import zingg.common.core.filter.PredictionFilter; import zingg.common.core.model.Model; import zingg.common.core.pairs.IPairBuilder; import zingg.common.core.pairs.SelfPairBuilder; +import zingg.common.core.preprocess.IPreProcessor; import zingg.common.core.preprocess.StopWordsRemover; import zingg.common.core.util.Analytics; import zingg.common.core.util.Metric; @@ -33,32 +33,33 @@ public Matcher() { setZinggOption(ZinggOptions.MATCH); } - public ZFrame getTestData() throws ZinggClientException{ - ZFrame data = getPipeUtil().read(true, true, args.getNumPartitions(), true, args.getData()); - return data; + public ZData getRawData() throws ZinggClientException{ + ZFrame data = readInputData(); + + return getDataSelector(data); } - public ZFrame getFieldDefColumnsDS(ZFrame testDataOriginal) { - ZidAndFieldDefSelector zidAndFieldDefSelector = new ZidAndFieldDefSelector(args.getFieldDefinition()); - return testDataOriginal.select(zidAndFieldDefSelector.getCols()); -// return getDSUtil().getFieldDefColumnsDS(testDataOriginal, args, true); + protected ZData getDataSelector(ZFrame data) throws ZinggClientException { + return new ZData(data,args,context,getPreProcessors()); } + protected List> getPreProcessors() { + List> preProcessors = new ArrayList>(); + preProcessors.add(getStopWords()); + return preProcessors; + } - public ZFrame getBlocked( ZFrame testData) throws Exception, ZinggClientException{ - LOG.debug("Blocking model file location is " + args.getBlockFile()); - Tree> tree = getBlockingTreeUtil().readBlockingTree(args); - ZFrame blocked = getBlockingTreeUtil().getBlockHashes(testData, tree); - ZFrame blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)); //.cache(); - return blocked1; + protected ZFrame readInputData() throws ZinggClientException { + return getPipeUtil().read(true, true, args.getNumPartitions(), true, args.getData()); } + - public ZFrame getPairs(ZFrameblocked, ZFramebAll) throws Exception{ + public ZDataPair getPairs(ZFrameblocked, ZFramebAll) throws Exception{ return getPairs(blocked, bAll, new SelfPairBuilder (getDSUtil(),args)); } - public ZFrame getPairs(ZFrameblocked, ZFramebAll, IPairBuilder iPairBuilder) throws Exception{ - return iPairBuilder.getPairs(blocked, bAll); + public ZDataPair getPairs(ZFrameblocked, ZFramebAll, IPairBuilder iPairBuilder) throws Exception{ + return new ZDataPair(blocked, bAll,iPairBuilder); } protected abstract Model getModel() throws ZinggClientException; @@ -67,27 +68,27 @@ protected ZFrame selectColsFromBlocked(ZFrameblocked) { return blocked.select(ColName.ID_COL, ColName.HASH_COL); } - protected ZFrame predictOnBlocks(ZFrameblocks) throws Exception, ZinggClientException{ + protected ZFrame predictOnBlocks(ZDataPair blocks) throws Exception, ZinggClientException{ if (LOG.isDebugEnabled()) { - LOG.debug("block size" + blocks.count()); + LOG.debug("block size" + blocks.getPairs().count()); } Model model = getModel(); - ZFrame dupes = model.predict(blocks); + ZFrame dupes = model.predict(blocks.getPairs()); if (LOG.isDebugEnabled()) { LOG.debug("Found dupes " + dupes.count()); } return dupes; } - protected ZFrame getActualDupes(ZFrame blocked, ZFrame testData) throws Exception, ZinggClientException{ + protected ZFrame getActualDupes(ZData rawData) throws Exception, ZinggClientException{ PredictionFilter predictionFilter = new PredictionFilter(); SelfPairBuilder iPairBuilder = new SelfPairBuilder (getDSUtil(),args); - return getActualDupes(blocked, testData,predictionFilter, iPairBuilder,new PredictionColsSelector()); + return getActualDupes(rawData,predictionFilter, iPairBuilder,new PredictionColsSelector()); } - protected ZFrame getActualDupes(ZFrame blocked, ZFrame testData, + protected ZFrame getActualDupes(ZData rawData, IFilter predictionFilter, IPairBuilder iPairBuilder, PredictionColsSelector colsSelector) throws Exception, ZinggClientException{ - ZFrame blocks = getPairs(selectColsFromBlocked(blocked), testData, iPairBuilder); + ZDataPair blocks = getPairs(selectColsFromBlocked(rawData.getBlockedFrame().getProcessedDF()), rawData.getRepartitionFrame().getProcessedDF(), iPairBuilder); ZFramedupesActual = predictOnBlocks(blocks); ZFrame filteredData = predictionFilter.filter(dupesActual); if(colsSelector!=null) { @@ -100,32 +101,24 @@ protected ZFrame getActualDupes(ZFrame blocked, ZFrame test public void execute() throws ZinggClientException { try { // read input, filter, remove self joins - ZFrame testDataOriginal = getTestData(); - testDataOriginal = getFieldDefColumnsDS(testDataOriginal); - ZFrame testData = getStopWords().preprocessForStopWords(testDataOriginal); - testData = testData.repartition(args.getNumPartitions(), testData.col(ColName.ID_COL)); - //testData = dropDuplicates(testData); - long count = testData.count(); - LOG.info("Read " + count); - Analytics.track(Metric.DATA_COUNT, count, args.getCollectMetrics()); - - ZFrameblocked = getBlocked(testData); + ZData rawData = getRawData(); + writeAnalytics(rawData); LOG.info("Blocked "); /*blocked = blocked.cache(); blocked.withColumn("partition_id", functions.spark_partition_id()) .groupBy("partition_id").agg(functions.count("z_zid")).as("zid").orderBy("partition_id").toJavaRDD().saveAsTextFile("/tmp/zblockedParts"); */ if (LOG.isDebugEnabled()) { - LOG.debug("Num distinct hashes " + blocked.select(ColName.HASH_COL).distinct().count()); - blocked.show(); + LOG.debug("Num distinct hashes " + rawData.getBlockedFrame().getProcessedDF().select(ColName.HASH_COL).distinct().count()); + rawData.getBlockedFrame().getProcessedDF().show(); } //LOG.warn("Num distinct hashes " + blocked.agg(functions.approx_count_distinct(ColName.HASH_COL)).count()); - ZFrame dupesActual = getActualDupes(blocked, testData); + ZFrame dupesActual = getActualDupes(rawData); //dupesActual.explain(); //dupesActual.toJavaRDD().saveAsTextFile("/tmp/zdupes"); - writeOutput(testDataOriginal, dupesActual); + writeOutput(rawData, dupesActual); } catch (Exception e) { if (LOG.isDebugEnabled()) e.printStackTrace(); @@ -134,18 +127,20 @@ public void execute() throws ZinggClientException { } } + protected void writeAnalytics(ZData rawData) throws Exception, ZinggClientException { + long count = rawData.getBlockedFrame().getOriginalDF().count(); + LOG.info("Read " + count); + Analytics.track(Metric.DATA_COUNT, count, args.getCollectMetrics()); + LOG.debug("Blocking model file location is " + args.getBlockFile()); + } - - - public void writeOutput( ZFrame blocked, ZFrame dupesActual) throws ZinggClientException { + public void writeOutput( ZData rawData, ZFrame dupesActual) throws ZinggClientException { try{ //input dupes are pairs ///pick ones according to the threshold by user - - //all clusters consolidated in one place if (args.getOutput() != null) { - ZFrame graphWithScores = getOutput(blocked, dupesActual); + ZFrame graphWithScores = getOutput(rawData, dupesActual); getPipeUtil().write(graphWithScores, args.getOutput()); } } @@ -157,19 +152,19 @@ public void writeOutput( ZFrame blocked, ZFrame dupesActual) th - protected ZFrame getOutput(ZFrame blocked, ZFrame dupesActual) throws ZinggClientException, Exception { + protected ZFrame getOutput(ZData rawData, ZFrame dupesActual) throws ZinggClientException, Exception { //-1 is initial suggestion, 1 is add, 0 is deletion, 2 is unsure /*blocked = blocked.drop(ColName.HASH_COL); blocked = blocked.drop(ColName.SOURCE_COL); blocked = blocked.cache(); */ - + ZFrame fieldDefColumnsDS = rawData.getFieldDefFrame().getProcessedDF(); dupesActual = dupesActual.cache(); if (LOG.isDebugEnabled()) { LOG.debug("dupes ------------"); dupesActual.show(); } - ZFramegraph = getGraphUtil().buildGraph(blocked, dupesActual).cache(); + ZFramegraph = getGraphUtil().buildGraph(fieldDefColumnsDS, dupesActual).cache(); //graph.toJavaRDD().saveAsTextFile("/tmp/zgraph"); if (LOG.isDebugEnabled()) { diff --git a/common/core/src/main/java/zingg/common/core/preprocess/IPreProcessor.java b/common/core/src/main/java/zingg/common/core/preprocess/IPreProcessor.java new file mode 100644 index 00000000..53699d2a --- /dev/null +++ b/common/core/src/main/java/zingg/common/core/preprocess/IPreProcessor.java @@ -0,0 +1,10 @@ +package zingg.common.core.preprocess; + +import zingg.common.client.ZFrame; +import zingg.common.client.ZinggClientException; + +public interface IPreProcessor { + + public ZFrame preprocess(ZFrame ds) throws ZinggClientException; + +} diff --git a/common/core/src/main/java/zingg/common/core/preprocess/StopWordsRemover.java b/common/core/src/main/java/zingg/common/core/preprocess/StopWordsRemover.java index 9742426c..d367631d 100644 --- a/common/core/src/main/java/zingg/common/core/preprocess/StopWordsRemover.java +++ b/common/core/src/main/java/zingg/common/core/preprocess/StopWordsRemover.java @@ -16,7 +16,7 @@ import zingg.common.client.util.PipeUtilBase; import zingg.common.core.context.Context; -public abstract class StopWordsRemover implements Serializable{ +public abstract class StopWordsRemover implements Serializable, IPreProcessor{ private static final long serialVersionUID = 1L; protected static String name = "zingg.preprocess.StopWordsRemover"; @@ -45,6 +45,11 @@ public ZFrame preprocessForStopWords(ZFrame ds) throws ZinggCl return ds; } + @Override + public ZFrame preprocess(ZFrame ds) throws ZinggClientException { + return preprocessForStopWords(ds); + } + protected ZFrame getStopWords(FieldDefinition def) throws ZinggClientException { PipeUtilBase pipeUtil = getContext().getPipeUtil(); ZFrame stopWords = pipeUtil.read(false, false, pipeUtil.getStopWordsPipe(getArgs(), def.getStopWords()));