# Data Dispatcher Demo

* Create project
* Project parametrization
* Copying metadata
* Run worker
* Run multiple workers
* Restart project

# Demo Use Case

- Verify checksums and sizes for a set of files selected by an MQL query
- Option to choose adler32 or crc32 checksum
  - to demo project parametrization

# Tools

* Checksum calculator
* JSON field extractor


# Tools: checksum calculator

Usage:
```shell
$ python checksum.py (crc32|adler32) <file>
```

Python script:
```python
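import sys
import zlib

# command line: <checksum type> <file path>
typ, path = sys.argv[1:]
h = zlib.adler32 if typ == "adler32" else zlib.crc32

with open(path, "rb") as f:
    c = h(b"")                  # initial value: 1 for adler32, 0 for crc32
    data = f.read(8*1024)
    while data:
        c = h(data, c)          # fold the next chunk into the running checksum
        data = f.read(8*1024)
# print as unsigned 32-bit hex, zero-padded to 8 digits so that values
# with leading zeros compare equal to 8-digit metadata strings
print("%08x" % (c & 0xffffffff,))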
```

# Tools: JSON field extractor

Usage:

```shell
$ python json_extract.py <file.json> <path/to/field>
```



# Example

info.json:
```json
{
    "run_type": "demo",
    "events": [11,13,15],
    "params" : {
        "debug": true,
        "size": 1273
    }
}
```
```shell
$ python json_extract.py info.json run_type
demo
$ python json_extract.py info.json events/0
11
$ python json_extract.py info.json params
{
  "debug": true,
  "size": 1273
}
$ python json_extract.py info.json params/size
1273
```
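
The source of `json_extract.py` is not shown here. The following is a minimal sketch of how such an extractor could work, assuming a slash-separated path where numeric steps index into lists. The helper names `extract`, `render` and `main` are hypothetical and chosen for illustration only.

```python
import json
import sys

def extract(doc, path):
    """Walk `doc` along a slash-separated path; numeric steps index lists."""
    node = doc
    for step in path.split("/"):
        if step == "":
            continue                    # tolerate leading or doubled slashes
        if isinstance(node, list):
            node = node[int(step)]      # e.g. "events/0"
        else:
            node = node[step]           # e.g. "params/size"
    return node

def render(value):
    """Print strings bare (handy in shell), everything else as JSON."""
    if isinstance(value, str):
        return value
    return json.dumps(value, indent=2)

def main(file_path, field_path):
    with open(file_path) as f:
        return render(extract(json.load(f), field_path))

# run only when invoked as: python json_extract.py <file.json> <path/to/field>
if __name__ == "__main__" and len(sys.argv) == 3:
    print(main(sys.argv[1], sys.argv[2]))
```

Printing strings without quotes keeps the output directly usable in shell command substitution, which is how the worker script consumes it.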


# Creating Project

```shell
# -A: use project attributes to pass job parameters
# -c: copy input file size and checksums from MetaCat into file attributes
$ dd project create \
    -A "checksum_type=<the type>" \
    -c size,checksums \
    <MQL query>
```

Project attributes and file attributes become available to the worker each time it gets the next file.

# create_project.sh

```shell
#!/bin/bash

# Usage: create_project.sh <checksum type> <MQL query>

checksum_type=$1
shift

project_id=$(dd project create -A checksum_type="$checksum_type" -c size,checksums "$@")
echo "Project created: $project_id"
```

# What the worker receives

```shell
$ dd worker next -j <project id>
```
```json
{
  "project_attributes": { "checksum_type": "adler32" },  
  "attributes": {             
    "checksums": { "adler32": "270725c4" },
    "size": 1332462751
  },
  "name": "file.root",
  "namespace": "dc4-hd-protodune",
  "replicas": [
    {
      "rse": "DUNE_CERN_EOS",
      "url": "root://eospublic.cern.ch//eos/experiment/neutplatform/protodune/dune/dc4-hd-protodune/dc/86/file.root",
      "path": "/eos/experiment/neutplatform/protodune/dune/dc4-hd-protodune/dc/86/file.root",
      "preference": 1,
      ...
    }
  ],
  ...
}
```

# Worker algorithm


* while the project is active (not all files are *done* or *failed permanently*)
    1. get next file from DD
       * wait for a file to become available
           * copied/staged into a known RSE
           * another worker failed non-permanently
    1. download the file using the URL received from DD
    1. calculate the checksum of the requested type -- *use checksum.py*
    1. compare calculated checksum and file size to the file metadata received from DD
    1. print results
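
The compare step can be sketched in Python as follows. This is an illustration only, not the demo's actual worker: it assumes the file has already been downloaded to a local path and that `file_info` is the parsed JSON reply of `dd worker next`. The function names `file_checksum` and `verify` are hypothetical.

```python
import os
import zlib

def file_checksum(path, checksum_type, chunk_size=8 * 1024):
    """Compute adler32 or crc32 of a file, reading it in chunks."""
    h = zlib.adler32 if checksum_type == "adler32" else zlib.crc32
    c = h(b"")
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            c = h(chunk, c)
    return "%08x" % (c & 0xffffffff)

def verify(path, file_info):
    """Return a list of mismatch messages; an empty list means the file is OK."""
    checksum_type = file_info["project_attributes"]["checksum_type"]
    meta = file_info["attributes"]
    problems = []

    size = os.path.getsize(path)
    if size != meta["size"]:
        problems.append("size mismatch: metadata: %s, downloaded: %s"
                        % (meta["size"], size))

    checksum = file_checksum(path, checksum_type)
    expected = meta["checksums"][checksum_type]
    # compare numerically so letter case and zero padding do not matter
    if int(checksum, 16) != int(expected, 16):
        problems.append("%s mismatch: metadata: %s, downloaded: %s"
                        % (checksum_type, expected, checksum))
    return problems
```

Comparing the checksums as integers rather than as strings avoids false mismatches caused by differences in case or leading zeros.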





# Worker script

```shell
#!/bin/bash

# Usage: run_project.sh <project_id>

if [ -z "$1" ]; then
        echo Usage: run_project.sh \<project_id\>
        exit 2
fi

project_id=$1

cert=${HOME}/certs/ivm@fnal.gov_cert.pem
key=${HOME}/certs/ivm@fnal.gov_key.pem

my_id=`dd worker id checksums_$$`
echo My worker id: $my_id
info_file=/tmp/${my_id}.json
tmpfile=/tmp/${my_id}.data

done="false"
while [ $done == "false" ]; do
        dd worker next -w $my_id -j $project_id > $info_file
        if [ "$?" != "0" ]; then
            # likely the project is done
            done="true"
            cat $info_file
            rm -f $info_file
        else
            url=`python json_extract.py $info_file replicas/0/url`
            namespace=`python json_extract.py $info_file namespace`
            name=`python json_extract.py $info_file name`
            did=${namespace}:${name}
            
            # checksum type from project attributes
            checksum_type=`python json_extract.py $info_file project_attributes/checksum_type`

            # size and checksum from MetaCat via file attributes
            meta_checksum=`python json_extract.py $info_file attributes/checksums/$checksum_type`
            meta_size=`python json_extract.py $info_file attributes/size`

            echo
            echo ------ $did ...

            # download the replica using the URL from the DD
            case $url in
                root\:*|xroot:*)
                    xrdcp --force $url $tmpfile
                    ;;
                http\:*)
                    curl -L -o $tmpfile "$url"
                    ;;
                https\:*)
                    curl -L -k --cert $cert --key $key -o $tmpfile "$url"
                    ;;
                *)
                    echo Unknown URL scheme: $url
                    exit 1
                    ;;
            esac
            
            # calculate the checksum and stat the file size
            checksum=`python checksum.py $checksum_type $tmpfile`
            size=`stat -c %s $tmpfile`
            
            # compare and print results
            ok=ok
            if [ "$size" != "$meta_size" ]; then
                echo File size mismatch for $did: metadata: $meta_size, downloaded: $size
                ok=""
            fi
            if [ "$checksum" != "$meta_checksum" ]; then
                echo Checksum mismatch for $did: metadata: $meta_checksum, downloaded: $checksum
                ok=""
            fi
            if [ "$ok" == "ok" ]; then
                echo $did:  OK: size=$size $checksum_type=$checksum
            fi
            rm -f $tmpfile
            dd worker done $project_id $did
            echo
        fi
done
```





# DEMO