Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

hash value

  • Loading branch information...
commit 6408fb9f134d2397aa236694a246a0d24b5d0ed8 1 parent f4c85fc
Hsiao Su authored
View
17 src/java/com/marklogic/ps/xqsync/Configuration.java
@@ -202,6 +202,8 @@
public static final String USE_IN_FOREST_EVAL_DEFAULT = "false";
+ public static final String HASH_MODULE_KEY = "HASH_MODULE";
+
/* internal constants */
protected static final String CSV_SCSV_SSV_REGEX = "[,;\\s]+";
@@ -1002,6 +1004,21 @@ public boolean useInForestEval() {
}
/**
+ * @return whether hash module should be used or not
+ */
+ public boolean useHashModule() {
+ String m = getHashModule();
+ return (m != null && !m.isEmpty());
+ }
+
+ /**
+ * @return
+ */
+ public String getHashModule() {
+ return properties.getProperty(HASH_MODULE_KEY);
+ }
+
+ /**
*
*/
public void close() {
View
14 src/java/com/marklogic/ps/xqsync/MetadataInterface.java
@@ -1,4 +1,5 @@
-/**
+/** -*- mode: java; indent-tabs-mode: nil; c-basic-offset: 4; -*-
+ *
* Copyright (c) 2008-2010 Mark Logic Corporation. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -72,4 +73,15 @@
*/
void clearProperties();
+ /**
+ * Set the hash value for this document
+ */
+ void setHashValue(String hashValue);
+
+ /**
+ * @return the hash value for this document
+ */
+ String getHashValue();
+
+
}
View
26 src/java/com/marklogic/ps/xqsync/SessionReader.java
@@ -89,6 +89,7 @@ public SessionReader(Configuration _configuration)
try {
if (null == query) {
initQuery();
+ logger.fine("reader query = \n" + query);
}
} catch (RequestException e) {
throw new SyncException(e);
@@ -142,6 +143,8 @@ public void read(String[] _uris, DocumentInterface _document)
}
req.setNewStringVariable("MODULE-URI",
(null == inputModule) ? "" : inputModule);
+ if (configuration.useHashModule())
+ req.setNewStringVariable("HASH-MODULE", configuration.getHashModule());
rs = session.submitRequest(req);
// success!
@@ -189,8 +192,8 @@ public void read(String[] _uris, DocumentInterface _document)
logger.fine("uri at " + urisIndex + " is null");
break;
}
- resultIndex = readDocument(_document, items, urisIndex,
- resultIndex);
+ logger.fine("reading uri: " + _uris[urisIndex]);
+ resultIndex = readDocument(_document, items, urisIndex, resultIndex);
urisIndex++;
}
@@ -263,6 +266,14 @@ private int readDocument(DocumentInterface _document,
_resultIndex++;
}
+ // handle hash value, optional
+ if (configuration.useHashModule()) {
+ String hashValue = _items[_resultIndex].asString();
+ metadata.setHashValue(hashValue);
+ logger.fine("hashValue = " + hashValue);
+ _resultIndex++;
+ }
+
// verify end-of-record marker, which should be 0
// this is a must. If this verification fails, we have a parsing problem
if (ValueType.XS_INTEGER != _items[_resultIndex].getItemType()) {
@@ -382,6 +393,9 @@ private synchronized void initQuery() throws RequestException {
local_q += "declare option xdmp:output \"indent=no\";\n";
}
+ if (configuration.useHashModule())
+ local_q += "declare variable $HASH-MODULE as xs:string external;\n";
+
// prolog - some variables are per-input
local_q += "declare variable $MODULE-URI as xs:string external;\n";
@@ -478,6 +492,14 @@ private synchronized void initQuery() throws RequestException {
local_q += "(),\n";
}
+ if (configuration.useHashModule()) {
+ local_q += "if ($URI-" + i + " eq '') then ()\n"
+ + "else xdmp:invoke($HASH-MODULE, (xs:QName('URI'), $URI-" + i
+ + ")),\n";
+ } else {
+ local_q += "(),\n";
+ }
+
// end-of-record marker
local_q += "0\n";
}
View
60 src/java/com/marklogic/ps/xqsync/SessionWriter.java
@@ -27,6 +27,9 @@
import com.marklogic.ps.Session;
import com.marklogic.ps.Utilities;
+import com.marklogic.xcc.Request;
+import com.marklogic.xcc.ResultItem;
+import com.marklogic.xcc.ResultSequence;
import com.marklogic.xcc.Content;
import com.marklogic.xcc.ContentCreateOptions;
import com.marklogic.xcc.ContentFactory;
@@ -51,7 +54,10 @@
protected String forestNameArray[] = null;
- /**
+ protected int last_batch_size = -1;
+ protected String query = null;
+
+ /**
* @param _configuration
* @throws SyncException
*/
@@ -296,6 +302,36 @@ public int write(String[] _outputUri, byte[][] _contentBytes,
}
}
+ // verify hash value
+ if (configuration.useHashModule()) {
+ try {
+ String q = getQuery(_outputUri.length);
+ logger.fine("writer hash query = \n" + q);
+ Request req = session.newAdhocQuery(query);
+ for (int i = 0; i < _outputUri.length; i++)
+ req.setNewStringVariable("URI-" + i, _outputUri[i] == null ? "" : _outputUri[i]);
+ ResultSequence rs = session.submitRequest(req);
+ ResultItem items[] = rs.toResultItemArray();
+
+ for (int i = 0; i < _outputUri.length; i++) {
+ if (ignoreList[i])
+ continue;
+
+ String srcHash = _metadata[i].getHashValue();
+ String dstHash = items[i].asString();
+ if ((srcHash == null && dstHash != null) ||
+ !srcHash.equals(dstHash))
+ logger.warning("hash value mismatch, uri = " + _outputUri[i] +
+ ",src hash = " + srcHash +
+ ",dst hash = " + dstHash);
+ }
+ } catch (Exception e) {
+ logger.logException("hash comparison failed", e);
+ for (int i = 0; i < _outputUri.length; i++)
+ logger.warning("no hash comparison for uri=" + _outputUri[i]);
+ }
+ }
+
// compute total ingested bytes
if (retries >= 0) {
for (int i = 0; i < _outputUri.length; i++) {
@@ -346,4 +382,26 @@ private boolean matchesFilters(String _outputUri,
return false;
}
+ protected String getQuery(int uriCount) {
+ if (query == null || uriCount != last_batch_size) {
+ String local_q = "";
+ String m = configuration.getHashModule();
+
+ for (int i = 0; i < uriCount; i++)
+ local_q += "declare variable $URI-" + i + " external;\n";
+
+ local_q += "\n";
+
+ for (int i = 0; i < uriCount; i++) {
+ local_q += "xdmp:invoke(\"" + m + "\", (xs:QName(\"URI\"), $URI-" + i + "))\n";
+ if (i < uriCount - 1)
+ local_q += ",\n";
+ }
+
+ query = local_q;
+ last_batch_size = uriCount;
+ }
+ return query;
+ }
+
}
View
1  src/java/com/marklogic/ps/xqsync/TaskFactory.java
@@ -93,7 +93,6 @@ public TaskFactory(Configuration _config, Monitor _monitor)
*/
public Callable<TimedEvent[]> newTask(String[] _uris)
throws SyncException {
- logger.fine(_uris.length + ", " + _uris[0]);
return new CallableSync(this, _uris);
}
View
8 src/java/com/marklogic/ps/xqsync/UriQueue.java
@@ -152,15 +152,15 @@ public void run() {
}
if (0 == count) {
- logger.fine("took first uri: " + uri);
+ logger.finest("took first uri: " + uri);
}
- logger.fine(count + ": uri = " + uri);
+ logger.finest(count + ": uri = " + uri);
buffer[bufferIndex] = uri;
bufferIndex++;
if (buffer.length == bufferIndex) {
- logger.fine("submitting " + buffer.length);
+ logger.finest("submitting " + buffer.length);
completionService.submit(factory.newTask(buffer));
buffer = new String[buffer.length];
bufferIndex = 0;
@@ -187,7 +187,7 @@ public void run() {
System.exit(1);
}
- logger.fine("finished queuing " + count + " uris");
+ logger.finest("finished queuing " + count + " uris");
}
public synchronized void shutdown() {
View
2  src/java/com/marklogic/ps/xqsync/XQSync.java
@@ -33,7 +33,7 @@
*/
public class XQSync {
- public static String VERSION = "2012-02-20.0";
+ public static String VERSION = "2012-02-23.0";
private static String versionMessage = "version " + VERSION + " on "
+ System.getProperty("java.version") + " ("
View
19 src/java/com/marklogic/ps/xqsync/XQSyncDocumentMetadata.java
@@ -1,4 +1,5 @@
-/*
+/** -*- mode: java; indent-tabs-mode: nil; c-basic-offset: 4; -*-
+ *
* Copyright (c)2004-2009 Mark Logic Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -48,6 +49,8 @@
String properties = null;
+ protected String hashValue = null;
+
/**
* @param reader
*/
@@ -250,4 +253,18 @@ public void addCollections(String[] _collections) {
addCollection(_collections[i]);
}
}
+
+ /**
+ * Set the hash value for this document
+ */
+ public void setHashValue(String hashValue) {
+ this.hashValue = hashValue;
+ }
+
+ /**
+ * @return the hash value for this document
+ */
+ public String getHashValue() {
+ return hashValue;
+ }
}
View
6 src/java/com/marklogic/ps/xqsync/XQSyncManager.java
@@ -342,7 +342,7 @@ private long queueFromInputPackageFile(File _path)
InputPackage inputPackage = new InputPackage(_path
.getCanonicalPath(), configuration);
- // ensure that the package won't close while queuing
+ // ensure that the package won't close while euing
inputPackage.addReference();
logger.fine("listing package " + _path + " ("
+ inputPackage.size() + ")");
@@ -362,7 +362,7 @@ private long queueFromInputPackageFile(File _path)
while (iter.hasNext()) {
path = iter.next();
- logger.fine("queuing " + count + ": " + path);
+ logger.finest("queuing " + count + ": " + path);
inputPackage.addReference();
uriQueue.add(path);
count++;
@@ -494,7 +494,7 @@ private long queueFromInputConnection(boolean _useLexicon)
if (0 == count) {
logger.info("queuing first task: " + uri);
}
- logger.fine("queuing " + count + ": " + uri);
+ logger.finest("queuing " + count + ": " + uri);
uriQueue.add(uri);
count++;
}
View
14 src/xqy/computehash.xqy
@@ -0,0 +1,14 @@
+
+declare variable $URI external;
+
+(: must return 0 for empty or non-existence URIs :)
+
+if ($URI and fn:doc-available($URI)) then
+xdmp:md5(fn:concat(
+ xdmp:quote(fn:doc($URI)),
+ xdmp:quote(xdmp:document-properties($URI)),
+ for $i in xdmp:quote(xdmp:document-get-collections($URI))
+ order by $i
+ return $i
+))
+else 0
Please sign in to comment.
Something went wrong with that request. Please try again.