Documenter refactoring, handled error 'Path does not exist' #199
navinrathore committed Apr 27, 2022
1 parent ec9799e commit 108d710
Showing 7 changed files with 283 additions and 168 deletions.
core/src/main/java/zingg/Documenter.java (170 changes: 12 additions, 158 deletions)
@@ -1,181 +1,35 @@
package zingg;

import static org.apache.spark.sql.functions.desc;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.split;

import java.io.File;
import java.io.FileWriter;
import java.io.Writer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import freemarker.template.Configuration;
import freemarker.template.Template;
import freemarker.template.TemplateExceptionHandler;
import zingg.client.FieldDefinition;
import zingg.client.MatchType;
import zingg.client.ZinggClientException;
import zingg.client.ZinggOptions;
import zingg.client.util.ColName;
import zingg.util.DSUtil;
import zingg.util.PipeUtil;
import zingg.util.RowWrapper;
import zingg.documenter.ModelDocumenter;
import zingg.documenter.ColumnDocumenter;

public class Documenter extends ZinggBase {

protected static String name = "zingg.Documenter";
public static final Log LOG = LogFactory.getLog(Documenter.class);
public static Configuration config;

private final String CSV_TEMPLATE = "stopWordsCSV.ftlh";
private final String HTML_TEMPLATE = "stopWordsHTML.ftlh";

public Documenter() {
setZinggOptions(ZinggOptions.GENERATE_DOCS);
config = createConfigurationObject();
}
}

public void execute() throws ZinggClientException {
try {
LOG.info("Document generation starts");
Dataset<Row> markedRecords = PipeUtil.read(spark, false, false, PipeUtil.getTrainingDataMarkedPipe(args));
markedRecords = markedRecords.cache();
//List<Column> displayCols = DSUtil.getFieldDefColumns(markedRecords, args, false);
List<Row> clusterIDs = markedRecords.select(ColName.CLUSTER_COLUMN).distinct().collectAsList();
int totalPairs = clusterIDs.size();
/* Create a data-model */
Map<String, Object> root = new HashMap<String, Object>();
root.put("modelId", args.getModelId());
root.put("clusters", markedRecords.collectAsList());
root.put("numColumns", markedRecords.columns().length);
root.put("columns", markedRecords.columns());
root.put("fieldDefinitionCount", args.getFieldDefinition().size());
buildAndWriteHTML(root);
extractStopWords();
LOG.info("Document generation completes");
} catch (Exception e) {
e.printStackTrace();
throw new ZinggClientException(e.getMessage());
}
}

public void buildAndWriteHTML(Map<String, Object> root) throws Exception {

Configuration cfg = getTemplateConfig();

/* Get the template (uses cache internally) */
Template temp = cfg.getTemplate("model.ftlh");

/* Merge data-model with template */
// Writer out = new OutputStreamWriter(System.out);
Writer file = new FileWriter(new File(args.getZinggDocFile()));
// StringWriter writer = new StringWriter();
temp.process(root, file);
// Note: Depending on what `out` is, you may need to call `out.close()`.
// This is usually the case for file output, but not for servlet output.
// file.flush();

// List<String> textList = Collections.singletonList(writer.toString());

// Dataset<Row> data = spark.createDataset(textList, Encoders.STRING()).toDF();

// PipeUtil.write(data, args, ctx, PipeUtil.getModelDocumentationPipe(args));
file.close();
// LOG.warn("written documentation at " + args.getZinggDocFile());
}

public Configuration getTemplateConfig() {
if (config == null) {
config = createConfigurationObject();
}
return config;
}

private Configuration createConfigurationObject() {
/* ------------------------------------------------------------------------ */
/* You should do this ONLY ONCE in the whole application life-cycle: */

/* Create and adjust the configuration singleton */
Configuration cfg = new Configuration(Configuration.VERSION_2_3_29);
cfg.setClassForTemplateLoading(this.getClass(), "/");
// cfg.setDirectoryForTemplateLoading(new File("/where/you/store/templates"));
// Recommended settings for new projects:
cfg.setDefaultEncoding("UTF-8");
cfg.setTemplateExceptionHandler(TemplateExceptionHandler.RETHROW_HANDLER);
cfg.setLogTemplateExceptions(false);
cfg.setWrapUncheckedExceptions(true);
cfg.setFallbackOnNullLoopVariable(false);
cfg.setObjectWrapper(new RowWrapper(cfg.getIncompatibleImprovements()));

/* ------------------------------------------------------------------------ */
/* You usually do these MULTIPLE TIMES in the application life-cycle: */
return cfg;
}

private void extractStopWords() throws ZinggClientException {
LOG.info("Stop words generation starts");
Dataset<Row> data = PipeUtil.read(spark, false, false, args.getData());
LOG.warn("Read input data : " + data.count());

String stopWordsDir = args.getZinggDocDir() + "/stopWords/";
String columnsDir = args.getZinggDocDir() + "/columns/";
checkAndCreateDir(stopWordsDir);
checkAndCreateDir(columnsDir);

List<FieldDefinition> fields = DSUtil.getFieldDefinitionFiltered(args, MatchType.DONT_USE);
for (FieldDefinition field : fields) {
findAndWriteStopWords(data, field.fieldName, stopWordsDir, columnsDir);
}
findAndWriteStopWords(spark.emptyDataFrame(), ColName.SCORE_COL, stopWordsDir, columnsDir);
findAndWriteStopWords(spark.emptyDataFrame(), ColName.SOURCE_COL, stopWordsDir, columnsDir);

LOG.info("Stop words generation finishes");
}

private void findAndWriteStopWords(Dataset<Row> data, String fieldName, String stopWordsDir, String columnsDir) throws ZinggClientException {
LOG.debug("Field: " + fieldName);
if(!data.isEmpty()) {
data = data.select(split(data.col(fieldName), "\\s+").as("split"));
data = data.select(explode(data.col("split")).as("word"));
data = data.filter(data.col("word").notEqual(""));
data = data.groupBy("word").count().orderBy(desc("count"));
data = data.limit(Math.round(data.count()*args.getStopWordsCutoff()));
}

Map<String, Object> root = new HashMap<String, Object>();
root.put("modelId", args.getModelId());
root.put("stopWords", data.collectAsList());
String filenameCSV = stopWordsDir + fieldName + ".csv";
String filenameHTML = columnsDir + fieldName + ".html";
writeStopWords(CSV_TEMPLATE, root, filenameCSV);
writeStopWords(HTML_TEMPLATE, root, filenameHTML);
}

private void checkAndCreateDir(String dirName) {
File directory = new File(dirName);
if (!directory.exists()) {
directory.mkdirs();
}
}

public void writeStopWords(String template, Map<String, Object> root, String fileName)
throws ZinggClientException {
try {
Configuration cfg = getTemplateConfig();
Template temp = cfg.getTemplate(template);
Writer file = new FileWriter(new File(fileName));
temp.process(root, file);
file.close();
LOG.info("Documenter starts");
// Marked records details
ModelDocumenter colDoc = new ModelDocumenter(spark, args);
colDoc.process();
// Stop Words generation
ColumnDocumenter stopWordDoc = new ColumnDocumenter(spark, args);
stopWordDoc.process();
LOG.info("Documenter finishes");
} catch (Exception e) {
e.printStackTrace();
throw new ZinggClientException(e.getMessage());
}
}
}
}
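Because the hunk above runs the removed body of the old class together with the handful of added lines, the post-commit shape of Documenter.java is easy to miss. Pieced together from the retained declarations and the additions (the two zingg.documenter imports and the new execute() body), the slimmed-down class comes out roughly as the sketch below; blank lines, ordering, and indentation are a guess, but every statement is taken from the hunk.

package zingg;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.client.ZinggClientException;
import zingg.client.ZinggOptions;
import zingg.documenter.ColumnDocumenter;
import zingg.documenter.ModelDocumenter;

public class Documenter extends ZinggBase {

    protected static String name = "zingg.Documenter";
    public static final Log LOG = LogFactory.getLog(Documenter.class);

    public Documenter() {
        setZinggOptions(ZinggOptions.GENERATE_DOCS);
    }

    public void execute() throws ZinggClientException {
        try {
            LOG.info("Documenter starts");
            // Marked records details: delegated to the new ModelDocumenter
            ModelDocumenter colDoc = new ModelDocumenter(spark, args);
            colDoc.process();
            // Stop words generation: delegated to the new ColumnDocumenter
            ColumnDocumenter stopWordDoc = new ColumnDocumenter(spark, args);
            stopWordDoc.process();
            LOG.info("Documenter finishes");
        } catch (Exception e) {
            e.printStackTrace();
            throw new ZinggClientException(e.getMessage());
        }
    }
}

The FreeMarker setup and the stop-word logic move into the new zingg.documenter classes shown below; the marked-records report moves to ModelDocumenter, presumably among the other changed files not shown in this excerpt.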
core/src/main/java/zingg/documenter/ColumnDocumenter.java (113 changes: 113 additions, 0 deletions)
@@ -0,0 +1,113 @@
package zingg.documenter;

import static org.apache.spark.sql.functions.desc;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.split;

import java.io.File;
import java.io.FileWriter;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import freemarker.template.Configuration;
import freemarker.template.Template;
import zingg.client.Arguments;
import zingg.client.FieldDefinition;
import zingg.client.MatchType;
import zingg.client.ZinggClientException;
import zingg.client.util.ColName;
import zingg.util.PipeUtil;

public class ColumnDocumenter extends DocumenterBase {
protected static String name = "zingg.ColumnDocumenter";
public static final Log LOG = LogFactory.getLog(ColumnDocumenter.class);

private final String CSV_TEMPLATE = "stopWordsCSV.ftlh";
private final String HTML_TEMPLATE = "stopWordsHTML.ftlh";

public ColumnDocumenter(SparkSession spark, Arguments args) {
super(spark, args);
}

public void process() throws ZinggClientException {
createColumnDocuments();
}

private void createColumnDocuments() throws ZinggClientException {
LOG.info("Column Documents generation starts");

Dataset<Row> data = PipeUtil.read(spark, false, false, args.getData());
LOG.warn("Read input data : " + data.count());

String stopWordsDir = args.getZinggDocDir() + "/stopWords/";
String columnsDir = args.getZinggDocDir() + "/columns/";
checkAndCreateDir(stopWordsDir);
checkAndCreateDir(columnsDir);

for (FieldDefinition field: args.getFieldDefinition()) {
if ((field.getMatchType() == null || field.getMatchType().equals(MatchType.DONT_USE))) {
prepareAndWriteColumnDocument(spark.emptyDataFrame(), field.fieldName, stopWordsDir, columnsDir);
continue;
}
prepareAndWriteColumnDocument(data, field.fieldName, stopWordsDir, columnsDir);
}

prepareAndWriteColumnDocument(spark.emptyDataFrame(), ColName.SCORE_COL, stopWordsDir, columnsDir);
prepareAndWriteColumnDocument(spark.emptyDataFrame(), ColName.SOURCE_COL, stopWordsDir, columnsDir);

LOG.info("Column Documents generation finishes");
}

private void prepareAndWriteColumnDocument(Dataset<Row> data, String fieldName, String stopWordsDir, String columnsDir) throws ZinggClientException {
Map<String, Object> root = new HashMap<String, Object>();
root.put("title", fieldName);
root.put("modelId", args.getModelId());
root = addStopWords(data, fieldName, root);

String filenameCSV = stopWordsDir + fieldName + ".csv";
String filenameHTML = columnsDir + fieldName + ".html";
writeColumnDocument(CSV_TEMPLATE, root, filenameCSV);
writeColumnDocument(HTML_TEMPLATE, root, filenameHTML);
}

public void writeColumnDocument(String template, Map<String, Object> root, String fileName)
throws ZinggClientException {
try {
Configuration cfg = getTemplateConfig();
Template temp = cfg.getTemplate(template);
Writer file = new FileWriter(new File(fileName));
temp.process(root, file);
file.close();
} catch (Exception e) {
e.printStackTrace();
throw new ZinggClientException(e.getMessage());
}
}

private void checkAndCreateDir(String dirName) {
File directory = new File(dirName);
if (!directory.exists()) {
directory.mkdirs();
}
}

public Map<String, Object> addStopWords(Dataset<Row> data, String fieldName, Map<String, Object> params) {
LOG.debug("Field: " + fieldName);
if(!data.isEmpty()) {
data = data.select(split(data.col(fieldName), "\\s+").as("split"));
data = data.select(explode(data.col("split")).as("word"));
data = data.filter(data.col("word").notEqual(""));
data = data.groupBy("word").count().orderBy(desc("count"));
data = data.limit(Math.round(data.count()*args.getStopWordsCutoff()));
}
params.put("stopWords", data.collectAsList());

return params;
}
}
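ColumnDocumenter carries over the old extractStopWords/findAndWriteStopWords logic. Note the guard in createColumnDocuments: fields whose match type is null or DONT_USE, plus the ColName.SCORE_COL and ColName.SOURCE_COL columns, are documented from spark.emptyDataFrame() instead of the input data, so addStopWords skips the word-count aggregation and the CSV/HTML files are still written with an empty stop-word list; this may be related to the "Path does not exist" handling named in the commit title. A minimal usage sketch, assuming `spark` and `args` are the live SparkSession and Arguments that Documenter.execute() already holds:

// Generate per-column stop-word documents for the configured model
ColumnDocumenter colDoc = new ColumnDocumenter(spark, args);
// process() throws ZinggClientException, so call it from a try block as Documenter does
colDoc.process();
// For each documented field this writes
//   <zinggDocDir>/stopWords/<fieldName>.csv   (via stopWordsCSV.ftlh)
//   <zinggDocDir>/columns/<fieldName>.html    (via stopWordsHTML.ftlh)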
core/src/main/java/zingg/documenter/DocumenterBase.java (48 changes: 48 additions, 0 deletions)
@@ -0,0 +1,48 @@
package zingg.documenter;

import org.apache.spark.sql.SparkSession;

import freemarker.template.Configuration;
import freemarker.template.TemplateExceptionHandler;
import zingg.client.Arguments;
import zingg.util.RowWrapper;

class DocumenterBase {
protected static Configuration config;
protected SparkSession spark;
protected Arguments args;

public DocumenterBase(SparkSession spark, Arguments args) {
this.spark = spark;
this.args = args;
config = createConfigurationObject();
}

public Configuration getTemplateConfig() {
if (config == null) {
config = createConfigurationObject();
}
return config;
}

private Configuration createConfigurationObject() {
/* ------------------------------------------------------------------------ */
/* You should do this ONLY ONCE in the whole application life-cycle: */

/* Create and adjust the configuration singleton */
Configuration cfg = new Configuration(Configuration.VERSION_2_3_29);
cfg.setClassForTemplateLoading(this.getClass(), "/");
// cfg.setDirectoryForTemplateLoading(new File("/where/you/store/templates"));
// Recommended settings for new projects:
cfg.setDefaultEncoding("UTF-8");
cfg.setTemplateExceptionHandler(TemplateExceptionHandler.RETHROW_HANDLER);
cfg.setLogTemplateExceptions(false);
cfg.setWrapUncheckedExceptions(true);
cfg.setFallbackOnNullLoopVariable(false);
cfg.setObjectWrapper(new RowWrapper(cfg.getIncompatibleImprovements()));

/* ------------------------------------------------------------------------ */
/* You usually do these MULTIPLE TIMES in the application life-cycle: */
return cfg;
}
}
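DocumenterBase now owns the FreeMarker Configuration (UTF-8 encoding, RETHROW exception handler, RowWrapper object wrapper) that the old Documenter set up inline, so a new documenter only needs to pass the SparkSession and Arguments to the super constructor and fetch templates through getTemplateConfig(). A purely illustrative subclass follows; SummaryDocumenter, summary.ftlh, and the output file name are hypothetical and not part of this commit.

package zingg.documenter;

import java.io.FileWriter;
import java.io.Writer;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.SparkSession;

import freemarker.template.Template;
import zingg.client.Arguments;
import zingg.client.ZinggClientException;

public class SummaryDocumenter extends DocumenterBase {   // hypothetical example, not in the commit

    public SummaryDocumenter(SparkSession spark, Arguments args) {
        super(spark, args);
    }

    public void process() throws ZinggClientException {
        try {
            // Build the data model handed to the template
            Map<String, Object> root = new HashMap<String, Object>();
            root.put("modelId", args.getModelId());
            // "summary.ftlh" is an illustrative template name, not one shipped with Zingg
            Template temp = getTemplateConfig().getTemplate("summary.ftlh");
            Writer file = new FileWriter(args.getZinggDocDir() + "/summary.html");
            temp.process(root, file);
            file.close();
        } catch (Exception e) {
            throw new ZinggClientException(e.getMessage());
        }
    }
}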
