# EarthCube GeoCODES FAIR Digital Object Server Playground

## About

Ted Habermann presented to the CDF on ISO metadata analysis.  I have been exploring some approaches to accessing the FAIR Digital Object server that is part of the standard setup supporting GeoCODES and other groups.  With Python Dask and an S3 client (Boto, s3fs) it is easy to read these object stores in parallel with good performance.

In addition to Dask, there are S3 Select and SPARQL calls on objects and triplestores.


## Why Dask
 
![Example of Dask](../../docs/images/mydask10.png)


## Notes

Curious about local context file reading when doing lots of calls.  PyLD seems to cache contexts, but I am not sure how it could do so in a Dask-based pattern.

May need to explore an approach that reads local files for the context, like:

```
from pathlib import Path
ctx = Path('jsonldcontext.json').read_text()
```
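
One possible way to keep context reads local and cached is sketched below. It assumes PyLD's custom document loader hook (`jsonld.set_document_loader`), and the helper names `load_local_context` and `make_local_loader` are hypothetical. The cache lives in each Python process, so with process-based Dask workers the file would be read at most once per worker rather than once per call.

```python
import json
import tempfile
from functools import lru_cache
from pathlib import Path


@lru_cache(maxsize=None)
def load_local_context(path):
    """Parse a JSON-LD context file; repeat calls in one process hit the cache."""
    return json.loads(Path(path).read_text())


def make_local_loader(context_path, context_urls):
    """Return a loader that answers the given context URLs from a local file."""
    def loader(url, options=None):
        if url not in context_urls:
            raise ValueError(f"no local copy registered for {url}")
        # Dict shape that PyLD expects back from a document loader
        return {
            "contentType": "application/ld+json",
            "contextUrl": None,
            "documentUrl": url,
            "document": load_local_context(context_path),
        }
    return loader


# Demo with a throwaway context file (hypothetical content)
tmp_dir = tempfile.mkdtemp()
ctx_path = str(Path(tmp_dir) / "jsonldcontext.json")
Path(ctx_path).write_text(json.dumps({"@context": {"@vocab": "https://schema.org/"}}))

loader = make_local_loader(ctx_path, {"https://schema.org/"})
first = loader("https://schema.org/")
second = loader("https://schema.org/")
print(first["document"] is second["document"])  # True: second call came from the cache
print(load_local_context.cache_info().hits)     # 1

# To try it with PyLD (not run here): jsonld.set_document_loader(loader)
```

Because the cached dictionary is shared between calls, it should be treated as read-only.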

## References

* https://www.project-freya.eu/en/about/mission
* https://docs.dask.org/en/latest/
* https://examples.dask.org/bag.html
* https://s3fs.readthedocs.io/en/latest/
* https://docs.dask.org/en/latest/remote-data-services.html
* https://tutorial.dask.org/01_dask.delayed.html
* https://examples.dask.org/applications/embarrassingly-parallel.html

## Imports


In [1]:
import json

import boto3
import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
import s3fs
from dask import delayed
from fastparquet import ParquetFile
from pyld import jsonld

import kglab
from rdflib import Graph, plugin
from rdflib.serializer import Serializer



### Set PD frame options

In [2]:
pd.set_option('display.max_rows', None)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', None)
pd.set_option('display.max_colwidth', None)

### Set S3 File System

In [3]:
oss = s3fs.S3FileSystem(
      anon=True,
      client_kwargs = {"endpoint_url":"https://oss.geodex.org"}
   )

In [4]:
# [optional] List the directories we can work with later
# oss.ls('gleaner/summoned')

### Set Shape graph

In [5]:
sg = './shapes/eco_general1.ttl'

### Definitions

Define some functions to use

In [6]:
# some color coding for pandas for later
def change_color_group(x):
    df = x.copy()
    df.loc[df['severity'] == "shacl:Violation", :] = 'background-color: #F89782'
    df.loc[df['severity'] == "shacl:Warning", :] = 'background-color: #F0F480'
    df.loc[df['severity'] == "shacl:Info", :] = 'background-color: #CBFBD2'
    return df

In [7]:
# strip all elements of a list, used in processing keyword lists
def striplist(l):
    return([x.strip() for x in l])

In [8]:
# Simple read and return entire JSON-LD object task
# Not as efficient as incorporating the processing into the Dask pattern
@dask.delayed()
def read_a_file(fn):
    # or preferably open in text mode and json.load from the file
    with oss.open(fn, 'rb') as f:
        #return json.loads(f.read().replace('\n',' '))
        return json.loads(f.read().decode("utf-8", "ignore").replace('\n',' '))
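
The comment in `read_a_file` mentions opening in text mode and calling `json.load` on the file. A minimal sketch of both variants follows, using an in-memory `io.BytesIO` as a stand-in for the binary file object that `oss.open(fn, 'rb')` returns. The sample bytes and the two helper names are made up for illustration.

```python
import io
import json


def load_json_bytes(raw):
    """Decode bytes, dropping undecodable bytes, then parse (as read_a_file does)."""
    return json.loads(raw.decode("utf-8", "ignore").replace("\n", " "))


def load_json_stream(binary_file):
    """Wrap a binary file in a text layer and let json.load read from it."""
    with io.TextIOWrapper(binary_file, encoding="utf-8", errors="ignore") as text:
        return json.load(text)


# Hypothetical document containing one invalid byte (\xff) and a line break
raw = b'{"name": "caf\xc3\xa9",\n "keywords": "lidar, \xfftopography"}'

a = load_json_bytes(raw)
b = load_json_stream(io.BytesIO(raw))

print(a == b)          # True
print(a["keywords"])   # lidar, topography
```

The two variants differ in one respect: the bytes version replaces every newline with a space before parsing, which also rescues documents that contain raw line breaks inside string values, while the stream version leaves the text untouched.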

In [9]:
# Simple JSON-LD framing inside Dask function
@dask.delayed()
def read_and_frame(fn):
    # or preferably open in text mode and json.load from the file
    citationframe = {"@context":{"@vocab": "https://schema.org/"}, "@type": "Dataset", "@explicit": "true", "citation": {}};
#   citationframe = {"@context":{"@vocab": ctx}, "@type": "Dataset", "@explicit": "true", "citation": {}};
    with oss.open(fn, 'rb') as f:
        #return json.loads(f.read().replace('\n',' '))
        jld = json.loads(f.read().decode("utf-8", "ignore").replace('\n',' '))
        framed = jsonld.frame(jld, citationframe)
        return framed['citation']


In [10]:
# Read parquet
@dask.delayed
def load_chunk(fn):
    with oss.open(fn, 'rb') as f:
        return ParquetFile(f).to_pandas()

In [11]:
# SHACL function
@dask.delayed()
def read_and_validate(fn):
    # make a kg to hold our data graph
    namespaces = {
        "schema":  "https://schema.org/",
        "shacl":   "http://www.w3.org/ns/shacl#" ,
    }

    kg = kglab.KnowledgeGraph(
        name = "Schema.org based datagraph",
        base_uri = "https://example.org/id/",
        namespaces = namespaces,
    )

    with oss.open(fn, 'rb') as f:
        jld = json.loads(f.read().decode("utf-8", "ignore").replace('\n',' '))
        jlds = json.dumps(jld)

        g = Graph().parse(data=jlds, format='json-ld')
        context = {
            "@vocab": "https://schema.org/",
        }

        ttl = g.serialize(format='ttl', context=context, indent=4)
        kg.load_rdf_text(ttl, format="ttl", base=None)

        conforms, report_graph, report_text = kg.validate(
            shacl_graph=sg,
            shacl_graph_format="ttl"
        )

        return report_graph.save_rdf_text()  # return graph object or string of graph

## Get files


In [12]:
filenames = oss.ls('gleaner/summoned/opentopo')
output = [read_a_file(f) for f in filenames]
# output = [read_and_frame(f) for f in filenames]

## Parquet testing


In [13]:
# ddf = dd.from_delayed([load_chunk(f) for f in filenames])
# ddf.groupby(['col_A', 'col_B']).value.sum().compute()

## Main set

#### Test simple single validation

In [15]:
%%capture cap --no-stderr  --no-stdout

fns = oss.ls('gleaner/summoned/opentopo')
o = [read_and_validate(f) for f in fns]
j = o[1].compute()
print(j)

Usage of abort_on_error is deprecated. Use abort_on_first instead.


@prefix oihval: <https://oceans.collaborium.io/voc/validation/1.0.1/shacl#> .
@prefix schema: <https://schema.org/> .
@prefix shacl: <http://www.w3.org/ns/shacl#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .

[] a shacl:ValidationReport ;
    shacl:conforms false ;
    shacl:result [ a shacl:ValidationResult ;
            shacl:focusNode <https://portal.opentopography.org/raster?opentopoID=OTSDEM.052020.6341.1> ;
            shacl:resultMessage "A provider must be noted"@en ;
            shacl:resultPath schema:provider ;
            shacl:resultSeverity shacl:Violation ;
            shacl:sourceConstraintComponent shacl:MinCountConstraintComponent ;
            shacl:sourceShape oihval:identifierProviderProperty ] .




#### Test simple single frame

In [61]:
# Example of setting up and processing a single object through a function for testing.
fns = oss.ls('gleaner/summoned/opentopo')
o = [read_and_frame(f) for f in fns]
j = o[1].compute()
print(j)

Speed, C. (2020). Interpreting Fluvial Processes from Channel-Belt Deposits, Utah 2018. National Center for Airborne Laser Mapping (NCALM). Distributed by OpenTopography. https://doi.org/10.5069/G9J964J3. Accessed: 2021-05-20


In [85]:
%%time 
# An example of a known poor way to do this: pull each file with the read function,
# then process each file object in a loop.  Done to get something like a linear baseline.

ukw = []  # unique keyword list

for doc in range(len(output)):  # range([10 | output]):
    try:
        jld = output[doc].compute()
    except Exception:
        print("Doc has bad encoding")
        continue  # skip this doc rather than reuse the previous jld

    kws = striplist(jld["keywords"].split(sep=","))
    for i in kws:
        if i not in ukw:  # if unique
            ukw.append(i)

CPU times: user 32.8 s, sys: 2.95 s, total: 35.7 s
Wall time: 1min 49s
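
Separately from the network cost, the `i not in ukw` test scans the whole list on every keyword. A small sketch of the same de-duplication with a set for membership checks is shown here; `unique_keywords` and the sample records are hypothetical, and unlike the loop above it skips documents without a string `keywords` field and drops empty keywords.

```python
def unique_keywords(docs):
    """Collect distinct, stripped keywords in first-seen order."""
    seen = set()     # constant-time membership checks
    ordered = []     # preserves the order keywords were first seen
    for doc in docs:
        raw = doc.get("keywords")
        if not isinstance(raw, str):
            continue  # no usable keywords field in this document
        for kw in (part.strip() for part in raw.split(",")):
            if kw and kw not in seen:
                seen.add(kw)
                ordered.append(kw)
    return ordered


# Made-up records standing in for the computed JSON-LD documents
sample_docs = [
    {"keywords": "lidar, topography , DEM"},
    {"keywords": "DEM,point cloud"},
    {"name": "no keywords here"},
]

print(unique_keywords(sample_docs))
# ['lidar', 'topography', 'DEM', 'point cloud']
```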


In [12]:
# Human needs to see it to believe it
print(len(ukw))
# print(ukw)  # this is kinda big..  I don't usually print it

1004


In [16]:
from dask.distributed import Client, progress
client = Client(threads_per_worker=5, n_workers=4)
client

Client summary:

* Connection method: Cluster object
* Cluster type: LocalCluster
* Dashboard: http://127.0.0.1:8787/status
* Status: running (using processes: True)
* Scheduler comm: tcp://127.0.0.1:42447 (started: just now)
* Workers: 4, total threads: 20, total memory: 31.17 GiB

| Worker comm | Total threads | Memory | Dashboard | Nanny | Local directory | GPU | GPU memory |
|---|---|---|---|---|---|---|---|
| tcp://127.0.0.1:39275 | 5 | 7.79 GiB | http://127.0.0.1:40855/status | tcp://127.0.0.1:40357 | /home/fils/Containers/dvols/jupyter/work/ECO/dask-worker-space/worker-bkxw_dm4 | NVIDIA GeForce GTX 1050 Ti | 3.94 GiB |
| tcp://127.0.0.1:43823 | 5 | 7.79 GiB | http://127.0.0.1:46655/status | tcp://127.0.0.1:36597 | /home/fils/Containers/dvols/jupyter/work/ECO/dask-worker-space/worker-eypge45x | NVIDIA GeForce GTX 1050 Ti | 3.94 GiB |
| tcp://127.0.0.1:46331 | 5 | 7.79 GiB | http://127.0.0.1:38687/status | tcp://127.0.0.1:33335 | /home/fils/Containers/dvols/jupyter/work/ECO/dask-worker-space/worker-_h659rqc | NVIDIA GeForce GTX 1050 Ti | 3.94 GiB |
| tcp://127.0.0.1:33177 | 5 | 7.79 GiB | http://127.0.0.1:42299/status | tcp://127.0.0.1:33291 | /home/fils/Containers/dvols/jupyter/work/ECO/dask-worker-space/worker-tifd01f6 | NVIDIA GeForce GTX 1050 Ti | 3.94 GiB |


In [17]:
%%time
%%capture cap --no-stderr  --no-stdout

fns = oss.ls('gleaner/summoned/opentopo')
o = [read_and_validate(f) for f in fns]

# o is already a list of delayed objects, so it can be unpacked directly;
# dask.compute(*o) returns a tuple with one result per file
results = dask.compute(*o)



CPU times: user 709 ms, sys: 131 ms, total: 840 ms
Wall time: 7.99 s
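
On the question of unpacking versus passing the list itself: `dask.compute` accepts either, but the shape of what comes back differs. A toy sketch, with a hypothetical `square` task standing in for `read_and_validate` and the synchronous scheduler so it runs without a cluster:

```python
import dask
from dask import delayed


@delayed
def square(n):
    # Stand-in for read_and_validate: any function of one input
    return n * n


tasks = [square(n) for n in range(4)]

# Unpacked: one result per delayed object, returned as a flat tuple
flat = dask.compute(*tasks, scheduler="synchronous")

# Passed as a single list: a 1-tuple whose only element is the list of results
(nested,) = dask.compute(tasks, scheduler="synchronous")

print(flat)    # (0, 1, 4, 9)
print(nested)  # [0, 1, 4, 9]
```

Either form computes all tasks in one pass; the unpacked form is convenient when the results are iterated directly, as in the loop that loads each report into a graph.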




### Loop on results and load to graph

In [18]:
rnamespaces = {
    "schema":  "https://schema.org/",
    "shacl":   "http://www.w3.org/ns/shacl#" ,
}

rkg = kglab.KnowledgeGraph(
    name = "Schema.org shacl eval datagraph",
    base_uri = "https://example.org/id/",
    namespaces = rnamespaces,
)

for r in results:
    rkg.load_rdf_text(data=r, format="ttl")

In [19]:
sparql = """
SELECT ?path ?value ?constraint ?severity ?message ?id ?focus
  WHERE {
    ?id rdf:type shacl:ValidationResult .
    ?id shacl:focusNode ?focus .
    ?id shacl:resultMessage ?message .
    ?id shacl:resultSeverity ?severity .
    ?id shacl:sourceConstraintComponent ?constraint .
    OPTIONAL {
        ?id shacl:resultPath ?path .
    }
    OPTIONAL {
        ?id shacl:value ?value .
    }
  }
"""

df = rkg.query_as_df(sparql)

In [28]:
pdf = df.to_pandas()
# pdf.style.apply(change_color_group, axis=None)

In [29]:
pdf.info(10)


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 666 entries, 0 to 665
Data columns (total 6 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   path        666 non-null    object
 1   constraint  666 non-null    object
 2   severity    666 non-null    object
 3   message     666 non-null    object
 4   id          666 non-null    object
 5   focus       666 non-null    object
dtypes: object(6)
memory usage: 31.3+ KB


In [30]:
pdf.head(10)

| | path | constraint | severity | message | id | focus |
|---|---|---|---|---|---|---|
| 0 | schema1:provider | shacl:MinCountConstraintComponent | shacl:Violation | A provider must be noted | _:n8ddfb9d60a084cfbb685b592e245962cb2 | <https://portal.opentopography.org/dataspace/dataset?opentopoID=OTDS.122019.4326.2> |
| 1 | schema1:provider | shacl:MinCountConstraintComponent | shacl:Violation | A provider must be noted | _:n9ed47cbc53ba46418375658ef243a6d9b2 | <https://portal.opentopography.org/raster?opentopoID=OTSDEM.052020.6341.1> |
| 2 | schema1:provider | shacl:MinCountConstraintComponent | shacl:Violation | A provider must be noted | _:nd869583ab9dc48dcbcac2c5cf9256097b2 | <https://portal.opentopography.org/dataspace/dataset?opentopoID=OTDS.062019.32736.1> |
| 3 | schema1:provider | shacl:MinCountConstraintComponent | shacl:Violation | A provider must be noted | _:na4f539848c384a9fac470ec08b7a3a2eb2 | <https://portal.opentopography.org/lidarDataset?opentopoID=OTLAS.032012.26916.1> |
| 4 | schema1:provider | shacl:MinCountConstraintComponent | shacl:Violation | A provider must be noted | _:n68cddfc79e734d7d8ff938dc61edadfdb2 | <https://portal.opentopography.org/dataspace/dataset?opentopoID=OTDS.062020.32611.1> |
| 5 | schema1:provider | shacl:MinCountConstraintComponent | shacl:Violation | A provider must be noted | _:n3f74849dbf2f4d69b40f77e25c197472b2 | <https://portal.opentopography.org/raster?opentopoID=OTSDEM.032011.26910.1> |
| 6 | schema1:provider | shacl:MinCountConstraintComponent | shacl:Violation | A provider must be noted | _:n5da8a3410ab84c7ab1aa37d1555e1209b2 | <https://portal.opentopography.org/lidarDataset?opentopoID=OTLAS.072018.6635.1> |
| 7 | schema1:provider | shacl:MinCountConstraintComponent | shacl:Violation | A provider must be noted | _:n6062192201864e5aa871d7ea39728fffb2 | <https://portal.opentopography.org/raster?opentopoID=OTSDEM.082016.26913.3> |
| 8 | schema1:provider | shacl:MinCountConstraintComponent | shacl:Violation | A provider must be noted | _:n1d955306361347beb19ae8d9b2e52ed5b2 | <https://portal.opentopography.org/raster?opentopoID=OTSDEM.122019.2229.1> |
| 9 | schema1:provider | shacl:MinCountConstraintComponent | shacl:Violation | A provider must be noted | _:nc8e709a363e34ecf9e89529792dfa84fb2 | <https://portal.opentopography.org/lidarDataset?opentopoID=OTLAS.032017.26911.1> |


In [31]:
pdf["severity"].value_counts()

shacl:Violation    666
Name: severity, dtype: int64

In [32]:
pdf["message"].value_counts()

A provider must be noted    666
Name: message, dtype: int64

In [33]:
pdf["focus"].value_counts()

<https://portal.opentopography.org/lidarDataset?opentopoID=OTLAS.052012.26917.1>        1
<https://portal.opentopography.org/lidarDataset?opentopoID=OTLAS.082012.26911.2>        1
<https://portal.opentopography.org/lidarDataset?opentopoID=OTLAS.112011.26911.3>        1
<https://portal.opentopography.org/raster?opentopoID=OTSDEM.042019.6342.1>              1
<https://portal.opentopography.org/lidarDataset?opentopoID=OTLAS.082012.26911.1>        1
<https://portal.opentopography.org/raster?opentopoID=OTSDEM.092020.26912.1>             1
<https://portal.opentopography.org/raster?opentopoID=OTSDEM.072016.32759.1>             1
<https://portal.opentopography.org/raster?opentopoID=OTSDEM.062012.26911.1>             1
<https://portal.opentopography.org/lidarDataset?opentopoID=OTLAS.022020.2193.1>         1
<https://portal.opentopography.org/raster?opentopoID=OTSDEM.072016.26917.1>             1
<https://portal.opentopography.org/lidarDataset?opentopoID=OTLAS.102016.26916.1>        1
<https://p