diff --git a/examples/single-step-ingest/README.md b/examples/single-step-ingest/README.md new file mode 100644 index 0000000000..cf0b8850c9 --- /dev/null +++ b/examples/single-step-ingest/README.md @@ -0,0 +1,74 @@ +# Example Single Step Ingest +This example shows how to insert a document into the STAGING database and harmonize it in the same request by calling a custom REST endpoint named **run-ingest-harmonize**. + +This example uses some of the same data as the online-store example. + +The sample data is located in the input/ folder: +``` +|-- input + |-- products + |-- xxx.xml +``` + +# TLDR; How do I run it? +1. Download the [latest quick-start war](https://github.com/marklogic-community/marklogic-data-hub/releases/download/v2.0.3/quick-start-2.0.3.war) into this folder. + +1. Run the quick-start war: `java -jar quick-start-2.0.3.war` + +1. Open your web browser to [http://localhost:8080](http://localhost:8080). + +1. Browse to this folder from the login screen. + +1. Initialize the project (if necessary). + +1. Log in with your MarkLogic credentials. + +1. Install the Hub into MarkLogic (if necessary). + +# Loading and Ingesting the Products data via the REST API +To load the shirt.xml file from the input/products directory, call the custom REST extension (run the command from that directory): + +``` +curl --anyauth --user user:password -X PUT \ + -T shirt.xml -i -H "Content-type: application/xml" \ + 'http://localhost:8010/v1/resources/run-ingest-harmonize?rs:uri=shirt.xml&rs:job-id=1234&rs:entity-name=Products&rs:ingest-flow-name=Load%20Products&rs:harmonize-flow-name=Harmonize%20Products' +``` + +The parameters are: +- **rs:uri** - the URI that the document in the STAGING database should be saved to +- **rs:job-id** - a job id. 
Any string is valid. +- **rs:entity-name** - the name of the entity the flow belongs to +- **rs:ingest-flow-name** - the name of the ingestion flow +- **rs:harmonize-flow-name** - the name of the harmonization flow + +A successful curl request returns a response like the following: +``` + + + + + + + + + 10 + 380140431212 + Shirt + promising title + A shirt for promising title + 10.0 + 1000174 + 182232002232 + + + + + + + true + false + + +``` + +The envelope that is generated by the ingestion flow is returned in the `envelope` element inside of the `ingestion` element. The `harmonization` element contains two boolean values: `harmonizationSuccessful` tells whether the harmonization succeeded, and `errorFound` tells whether an error was found during the harmonization process. If an error is thrown during the harmonization process, the error string is also returned in an `errorMessage` element. diff --git a/examples/single-step-ingest/input/products/shirt.xml b/examples/single-step-ingest/input/products/shirt.xml new file mode 100644 index 0000000000..b20fc7324a --- /dev/null +++ b/examples/single-step-ingest/input/products/shirt.xml @@ -0,0 +1,10 @@ + + 10 + 380140431212 + Shirt + promising title + A shirt for promising title + 10.0 + 1000174 + 182232002232 + \ No newline at end of file diff --git a/examples/single-step-ingest/input/products/sunglasses.xml b/examples/single-step-ingest/input/products/sunglasses.xml new file mode 100644 index 0000000000..950dc39802 --- /dev/null +++ b/examples/single-step-ingest/input/products/sunglasses.xml @@ -0,0 +1,10 @@ + + 9 + 372801441675 + Sunglasses + liable greenhouse + sunglasses for liable greenhouse + 15.0 + 1000194 + 105698185919 + \ No newline at end of file diff --git a/examples/single-step-ingest/plugins/entities/Products/Products.entity.json b/examples/single-step-ingest/plugins/entities/Products/Products.entity.json new file mode 100644 index 0000000000..db18ffc514 --- /dev/null +++ b/examples/single-step-ingest/plugins/entities/Products/Products.entity.json @@ -0,0 +1,14 @@ 
+{ + "info" : { + "title" : "Products", + "version" : "0.0.1" + }, + "definitions" : { + "Products" : { + "required" : [ ], + "rangeIndex" : [ ], + "wordLexicon" : [ ], + "properties" : { } + } + } +} \ No newline at end of file diff --git a/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/Harmonize Products.properties b/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/Harmonize Products.properties new file mode 100644 index 0000000000..d2480a9cba --- /dev/null +++ b/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/Harmonize Products.properties @@ -0,0 +1,6 @@ +mainModule=main.xqy +collectorCodeFormat=xqy +mainCodeFormat=xqy +codeFormat=xqy +collectorModule=collector.xqy +dataFormat=xml diff --git a/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/collector.xqy b/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/collector.xqy new file mode 100644 index 0000000000..bfe4fc2192 --- /dev/null +++ b/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/collector.xqy @@ -0,0 +1,20 @@ +xquery version "1.0-ml"; + +module namespace plugin = "http://marklogic.com/data-hub/plugins"; + +declare option xdmp:mapping "false"; + +(:~ + : Collect IDs plugin + : + : @param $options - a map containing options. 
Options are sent from Java + : + : @return - the URIs of every document in the collection named by the "entity" option + :) +declare function plugin:collect( + $options as map:map) as xs:string* +{ + (: by default we return the URIs in the same collection as the Entity name :) + cts:uris((), (), cts:collection-query(map:get($options, "entity"))) +}; + diff --git a/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/content.xqy b/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/content.xqy new file mode 100644 index 0000000000..41b1bf21f2 --- /dev/null +++ b/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/content.xqy @@ -0,0 +1,48 @@ +xquery version "1.0-ml"; + +module namespace plugin = "http://marklogic.com/data-hub/plugins"; + +declare namespace es = "http://marklogic.com/entity-services"; + +declare option xdmp:mapping "false"; + +(:~ + : Create Content Plugin: reads the document stored at $id and turns its instance data into a product object + : + : @param $id - the identifier returned by the collector + : @param $options - a map containing options. Options are sent from Java + : + : @return - a json:object holding $attachments, $type, sku, title and price + :) +declare function plugin:create-content( + $id as xs:string, + $options as map:map) as item()? +{ + let $doc := fn:doc($id) + let $source := + if ($doc/es:envelope) then (: envelope in the es namespace: use the children of its instance :) + $doc/es:envelope/es:instance/node() + else if ($doc/envelope/instance) then (: envelope without a namespace: use the instance itself :) + $doc/envelope/instance + else (: no envelope: use the document as loaded :) + $doc + return plugin:extractInstanceProduct($source) +}; +(:~ Builds the product object from $source: keeps $source as "$attachments", sets "$type" to "Product" and copies sku, title and price. :) +declare private function plugin:extractInstanceProduct( + $source as node()?) as item()? 
+{ + let $attachments := $source + + let $sku := xs:string($source/sku || $source/SKU) (: string-concatenates sku and SKU, so either spelling is picked up :) + let $title := xs:string($source/title) + let $price := xs:decimal($source/price) + + let $object := json:object() + let $_ := map:put($object, "$attachments", $attachments) + let $_ := map:put($object, "$type", "Product") + let $_ := map:put($object, "sku", $sku) + let $_ := map:put($object, "title", $title) + let $_ := map:put($object, "price", $price) + return $object +}; diff --git a/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/headers.xqy b/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/headers.xqy new file mode 100644 index 0000000000..a305d20f86 --- /dev/null +++ b/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/headers.xqy @@ -0,0 +1,24 @@ +xquery version "1.0-ml"; + +module namespace plugin = "http://marklogic.com/data-hub/plugins"; + +declare namespace es = "http://marklogic.com/entity-services"; + +declare option xdmp:mapping "false"; + +(:~ + : Create Headers Plugin + : + : @param $id - the identifier returned by the collector + : @param $content - the output of your content plugin + : @param $options - a map containing options. 
Options are sent from Java + : + : @return - zero or more header nodes; this example adds no headers and returns the empty sequence + :) +declare function plugin:create-headers( + $id as xs:string, + $content as item()?, + $options as map:map) as node()* +{ + () (: no headers are produced for this example :) +}; diff --git a/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/main.xqy b/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/main.xqy new file mode 100644 index 0000000000..32fb9b77df --- /dev/null +++ b/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/main.xqy @@ -0,0 +1,55 @@ +xquery version "1.0-ml"; + +(: Your plugin must be in this namespace for the DHF to recognize it:) +module namespace plugin = "http://marklogic.com/data-hub/plugins"; + +(: + : This module exposes helper functions to make your life easier + : See documentation at: + : https://github.com/marklogic/marklogic-data-hub/wiki/dhf-lib + :) +import module namespace dhf = "http://marklogic.com/dhf" + at "/com.marklogic.hub/dhf.xqy"; + +(: include modules to construct various parts of the envelope :) +import module namespace content = "http://marklogic.com/data-hub/plugins" at "content.xqy"; +import module namespace headers = "http://marklogic.com/data-hub/plugins" at "headers.xqy"; +import module namespace triples = "http://marklogic.com/data-hub/plugins" at "triples.xqy"; + +(: include the writer module which persists your envelope into MarkLogic :) +import module namespace writer = "http://marklogic.com/data-hub/plugins" at "writer.xqy"; + +declare option xdmp:mapping "false"; + +(:~ + : Plugin Entry point + : + : @param $id - the identifier returned by the collector + : @param $options - a map containing options. 
Options are sent from Java + : @return - the result of dhf:run-writer, which is handed writer:write, $id, the envelope and $options + :) +declare function plugin:main( + $id as xs:string, + $options as map:map) +{ + let $content-context := dhf:content-context() + let $content := dhf:run($content-context, function() { + content:create-content($id, $options) + }) + (: the headers plugin receives the content built above :) + let $header-context := dhf:headers-context($content) + let $headers := dhf:run($header-context, function() { + headers:create-headers($id, $content, $options) + }) + (: the triples plugin receives both the content and the headers :) + let $triple-context := dhf:triples-context($content, $headers) + let $triples := dhf:run($triple-context, function() { + triples:create-triples($id, $content, $headers, $options) + }) + (: assemble the envelope in the format named by the "dataFormat" option :) + let $envelope := dhf:make-envelope($content, $headers, $triples, map:get($options, "dataFormat")) + return + (: writers must be invoked this way. + https://marklogic-community.github.io/marklogic-data-hub/docs/server-side/#run-writer :) + dhf:run-writer(xdmp:function(xs:QName("writer:write")), $id, $envelope, $options) +}; diff --git a/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/triples.xqy b/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/triples.xqy new file mode 100644 index 0000000000..7de8ded3c6 --- /dev/null +++ b/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/triples.xqy @@ -0,0 +1,26 @@ +xquery version "1.0-ml"; + +module namespace plugin = "http://marklogic.com/data-hub/plugins"; + +declare namespace es = "http://marklogic.com/entity-services"; + +declare option xdmp:mapping "false"; + +(:~ + : Create Triples Plugin + : + : @param $id - the identifier returned by the collector + : @param $content - the output of your content plugin + : @param $headers - the output of your headers plugin + : @param $options - a map containing options. 
Options are sent from Java + : + : @return - zero or more triples; this example emits none and returns the empty sequence + :) +declare function plugin:create-triples( + $id as xs:string, + $content as item()?, + $headers as item()*, + $options as map:map) as sem:triple* +{ + () (: no triples are produced for this example :) +}; diff --git a/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/writer.xqy b/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/writer.xqy new file mode 100644 index 0000000000..a6d42d9f3d --- /dev/null +++ b/examples/single-step-ingest/plugins/entities/Products/harmonize/Harmonize Products/writer.xqy @@ -0,0 +1,22 @@ +xquery version "1.0-ml"; + +module namespace plugin = "http://marklogic.com/data-hub/plugins"; + +declare option xdmp:mapping "false"; + +(:~ + : Writer Plugin: inserts the envelope at $id with the default permissions, in the collection named by the "entity" option + : + : @param $id - the identifier returned by the collector + : @param $envelope - the final envelope + : @param $options - a map containing options. Options are sent from Java + : + : @return - nothing + :) +declare function plugin:write( + $id as xs:string, + $envelope as node(), + $options as map:map) as empty-sequence() +{ + xdmp:document-insert($id, $envelope, xdmp:default-permissions(), map:get($options, "entity")) +}; diff --git a/examples/single-step-ingest/plugins/entities/Products/input/Load Products/Load Products.properties b/examples/single-step-ingest/plugins/entities/Products/input/Load Products/Load Products.properties new file mode 100644 index 0000000000..27d29e28da --- /dev/null +++ b/examples/single-step-ingest/plugins/entities/Products/input/Load Products/Load Products.properties @@ -0,0 +1,4 @@ +mainModule=main.xqy +mainCodeFormat=xqy +codeFormat=xqy +dataFormat=xml diff --git a/examples/single-step-ingest/plugins/entities/Products/input/Load Products/content.xqy b/examples/single-step-ingest/plugins/entities/Products/input/Load 
Products/content.xqy @@ -0,0 +1,22 @@ +xquery version "1.0-ml"; + +module namespace plugin = "http://marklogic.com/data-hub/plugins"; + +declare option xdmp:mapping "false"; + +(:~ + : Create Content Plugin + : + : @param $id - the identifier returned by the collector + : @param $raw-content - the raw content being loaded. + : @param $options - a map containing options. Options are sent from Java + : + : @return - the raw content, passed through unchanged + :) +declare function plugin:create-content( + $id as xs:string, + $raw-content as node()?, + $options as map:map) as node()? +{ + $raw-content (: no transformation at ingest time :) +}; diff --git a/examples/single-step-ingest/plugins/entities/Products/input/Load Products/headers.xqy b/examples/single-step-ingest/plugins/entities/Products/input/Load Products/headers.xqy new file mode 100644 index 0000000000..63ce4ff4da --- /dev/null +++ b/examples/single-step-ingest/plugins/entities/Products/input/Load Products/headers.xqy @@ -0,0 +1,24 @@ +xquery version "1.0-ml"; + +module namespace plugin = "http://marklogic.com/data-hub/plugins"; + +declare namespace envelope = "http://marklogic.com/data-hub/envelope"; + +declare option xdmp:mapping "false"; + +(:~ + : Create Headers Plugin + : + : @param $id - the identifier returned by the collector + : @param $content - the output of your content plugin + : @param $options - a map containing options. 
Options are sent from Java + : + : @return - zero or more header nodes; this example adds no headers and returns the empty sequence + :) +declare function plugin:create-headers( + $id as xs:string, + $content as node()?, + $options as map:map) as node()* +{ + () (: no headers are produced for this example :) +}; diff --git a/examples/single-step-ingest/plugins/entities/Products/input/Load Products/main.xqy b/examples/single-step-ingest/plugins/entities/Products/input/Load Products/main.xqy new file mode 100644 index 0000000000..b0afee017a --- /dev/null +++ b/examples/single-step-ingest/plugins/entities/Products/input/Load Products/main.xqy @@ -0,0 +1,56 @@ +xquery version "1.0-ml"; + +(: Your plugin must be in this namespace for the DHF to recognize it:) +module namespace plugin = "http://marklogic.com/data-hub/plugins"; + +(: + : This module exposes helper functions to make your life easier + : See documentation at: + : https://github.com/marklogic/marklogic-data-hub/wiki/dhf-lib + :) +import module namespace dhf = "http://marklogic.com/dhf" + at "/com.marklogic.hub/dhf.xqy"; + +(: include modules to construct various parts of the envelope :) +import module namespace content = "http://marklogic.com/data-hub/plugins" at "content.xqy"; +import module namespace headers = "http://marklogic.com/data-hub/plugins" at "headers.xqy"; +import module namespace triples = "http://marklogic.com/data-hub/plugins" at "triples.xqy"; + +declare option xdmp:mapping "false"; + +(:~ + : Plugin Entry point + : + : @param $id - the identifier returned by the collector + : @param $options - a map containing options. 
Options are sent from Java + : @return - the envelope assembled from the content, headers and triples plugin results; nothing is persisted here + :) +declare function plugin:main( + $id as xs:string, + $raw-content as node()?, (: the raw content being loaded :) + $options as map:map) +{ + let $content-context := dhf:content-context($raw-content) + let $content := dhf:run($content-context, function() { + content:create-content($id, $raw-content, $options) + }) + (: the headers plugin receives the content built above :) + let $header-context := dhf:headers-context($content) + let $headers := dhf:run($header-context, function() { + headers:create-headers($id, $content, $options) + }) + (: the triples plugin receives both the content and the headers :) + let $triple-context := dhf:triples-context($content, $headers) + let $triples := dhf:run($triple-context, function() { + triples:create-triples($id, $content, $headers, $options) (: call through the imported triples prefix, like content: and headers: :) + }) + + let $envelope := dhf:make-envelope($content, $headers, $triples, map:get($options, "dataFormat")) + (: + : log the final envelope as a trace + : only fires if tracing is enabled + :) + let $_ := dhf:log-trace(dhf:writer-context($envelope)) + return + $envelope +}; diff --git a/examples/single-step-ingest/plugins/entities/Products/input/Load Products/triples.xqy b/examples/single-step-ingest/plugins/entities/Products/input/Load Products/triples.xqy new file mode 100644 index 0000000000..6d3bc542b0 --- /dev/null +++ b/examples/single-step-ingest/plugins/entities/Products/input/Load Products/triples.xqy @@ -0,0 +1,26 @@ +xquery version "1.0-ml"; + +module namespace plugin = "http://marklogic.com/data-hub/plugins"; + +declare namespace envelope = "http://marklogic.com/data-hub/envelope"; + +declare option xdmp:mapping "false"; + +(:~ + : Create Triples Plugin + : + : @param $id - the identifier returned by the collector + : @param $content - the output of your content plugin + : @param $headers - the output of your headers plugin + : @param $options - a map containing options. 
Options are sent from Java + : + : @return - zero or more triples; this example emits none and returns the empty sequence + :) +declare function plugin:create-triples( + $id as xs:string, + $content as node()?, + $headers as node()*, + $options as map:map) as sem:triple* +{ + () +}; diff --git a/examples/single-step-ingest/plugins/entities/Products/input/REST/services/run-ingest-harmonize.xqy b/examples/single-step-ingest/plugins/entities/Products/input/REST/services/run-ingest-harmonize.xqy new file mode 100644 index 0000000000..02200b60d4 --- /dev/null +++ b/examples/single-step-ingest/plugins/entities/Products/input/REST/services/run-ingest-harmonize.xqy @@ -0,0 +1,154 @@ +xquery version "1.0-ml"; + +module namespace runIngestHarmonize = "http://marklogic.com/rest-api/resource/run-ingest-harmonize"; + +import module namespace consts = "http://marklogic.com/data-hub/consts" + at "/com.marklogic.hub/lib/consts.xqy"; + +import module namespace flow = "http://marklogic.com/data-hub/flow-lib" + at "/com.marklogic.hub/lib/flow-lib.xqy"; + +import module namespace config = "http://marklogic.com/data-hub/config" + at "/com.marklogic.hub/lib/config.xqy"; + +declare namespace hub = "http://marklogic.com/data-hub"; +declare namespace rapi = "http://marklogic.com/rest-api"; +declare namespace error = "http://marklogic.com/xdmp/error"; +(:~ PUT handler: runs the ingest flow on the request body, inserts the resulting envelope at the "uri" parameter, then runs the harmonize flow for that URI and returns a response document with the envelope and the harmonization outcome. Raises MISSING_FLOW when either named flow cannot be found. :) +declare %rapi:transaction-mode("update") function runIngestHarmonize:put( + $context as map:map, + $params as map:map, + $content as document-node() + ) as document-node() +{ + let $job-id := map:get($params, "job-id") + let $entity-name := map:get($params, 'entity-name') + let $ingest-flow-name := map:get($params, 'ingest-flow-name') + let $uri := map:get($params, 'uri') + let $flow := flow:get-flow($entity-name, $ingest-flow-name, $consts:INPUT_FLOW) + let $_ := + if ($flow) then () + else + fn:error(xs:QName("MISSING_FLOW"), "The specified flow " || $entity-name || ":" || $ingest-flow-name || " is missing.") + + (: configure the options :) + let $options as map:map := ( + map:get($params, 
"options") ! xdmp:unquote(.)/object-node(), + map:map() + )[1] + let $_ := flow:set-default-options($options, $flow) + + (: this can throw, but we want the REST API to know about the problems, so let it :) + let $envelope := runIngestHarmonize:run-flow($job-id, $flow, $uri, $content, $options) + + (: Insert the document into the STAGING database :) + let $permissions := (xdmp:default-permissions(), xdmp:permission("rest-reader", "read"), xdmp:permission("rest-writer", "update")) + let $collections := (xdmp:default-collections(), $entity-name, $ingest-flow-name, $consts:INPUT_FLOW) + let $insertOptions := + <options xmlns="xdmp:document-insert"> + <permissions>{$permissions}</permissions> + <collections>{ + for $collection in $collections + return <collection>{$collection}</collection> + }</collections> + </options> + let $_ := runIngestHarmonize:insert-document($uri, $envelope, $insertOptions) + + (: build all of the harmonization information :) + let $harmonize-flow-name := map:get($params, 'harmonize-flow-name') + let $flow := flow:get-flow($entity-name, $harmonize-flow-name, $consts:HARMONIZE_FLOW) + let $_ := + if ($flow) then () + else + fn:error(xs:QName("MISSING_FLOW"), "The specified flow " || $entity-name || ":" || $harmonize-flow-name || " is missing.") + + let $target-database := + if (fn:exists(map:get($params, "target-database"))) then + xdmp:database(map:get($params, "target-database")) + else + xdmp:database($config:FINAL-DATABASE) + (: add the target database to the harmonization options :) + let $_ := ( + flow:set-default-options($options, $flow), + map:put($options, "target-database", $target-database) + ) + let $error := (: any non-empty value is reported as a failure below - NOTE(review): assumes the harmonize flow returns nothing on success; confirm against flow:run-flow :) + try { + runIngestHarmonize:run-flow($job-id, $flow, $uri, (), $options) + } + catch($ex) { + xdmp:log(("caught error in run-ingest-harmonize.xqy")), + xdmp:log($ex/error:format-string), + $ex/error:format-string + } + + return + document { + element response { + element ingestion { + $envelope + }, + element harmonization { + if (fn:empty($error)) then ( + element harmonizationSuccessful {fn:true()}, + element errorFound {fn:false()} + ) + else ( 
+ element harmonizationSuccessful {fn:false()}, + element errorFound {fn:true()}, + element errorMessage {$error} + ) + } + } + } +}; +(:~ Calls flow:run-flow through xdmp:eval, forwarding the five arguments as external variables, and returns whatever the eval returns. :) +declare private function runIngestHarmonize:run-flow( + $job-id as xs:string, + $flow as element(hub:flow), + $uri as xs:string, + $content as node()?, + $options as map:map) +{ + (: The PUT command runs in update mode, so we need to eval in query mode :) + xdmp:eval(' + import module namespace flow = "http://marklogic.com/data-hub/flow-lib" + at "/com.marklogic.hub/lib/flow-lib.xqy"; + + declare variable $job-id external; + declare variable $flow external; + declare variable $uri external; + declare variable $content external; + declare variable $options external; + + flow:run-flow($job-id, $flow, $uri, $content, $options) + ', + map:new(( + map:entry("job-id", $job-id), + map:entry("flow", $flow), + map:entry("uri", $uri), + map:entry("content", $content), + map:entry("options", $options) + ))) +}; +(:~ Inserts $root at $uri through xdmp:eval, passing $options (an options element in the xdmp:document-insert namespace) as the third argument of xdmp:document-insert. :) +declare private function runIngestHarmonize:insert-document( + $uri as xs:string, + $root as node(), + $options as element()?) as empty-sequence() +{ + (: We need to evaluate the insert in a separate transaction so that it is visible to the harmonization flow :) + xdmp:eval(' + declare variable $uri external; + declare variable $root external; + declare variable $options external; + (: collections and permissions travel inside $options, so no separate external variable is declared for them :) + + xdmp:document-insert($uri, $root, $options) + ', + map:new(( + map:entry("uri", $uri), + map:entry("root", $root), + map:entry("options", $options) + ))) +};