# Preparing and uploading data to FAIR Data Point with SeMPyRO

**Prerequisites:** To run this notebook in full, you need a running FAIR Data Point (FDP) instance and an account with write access.

Let us consider uploading datasets from `example_data_fdp.csv` to FDP.
FDP requires each dataset to be part of a catalogue, so we first need to create a catalogue. Let's see what we need to provide for that:

In [None]:
from sempyro.dcat import DCATCatalog

catalog_fields = DCATCatalog.annotate_model()
print(catalog_fields.mandatory_fields())

Let's create a minimal catalogue with an example title and description. We also need a URI to use as the graph subject at serialization. For now, let's use the `example.com` domain for this purpose:

In [None]:
from sempyro import LiteralField
from rdflib import URIRef

catalog_subject = URIRef("http://example.com/test_catalog_1")

catalog = DCATCatalog(title=[LiteralField(value="Test catalog", language="en")],
                      description=[LiteralField(value="Catalog for test example datasets", language="en")])
catalog_record = catalog.to_graph(catalog_subject)
print(catalog_record.serialize())

Now we need to define an FDP API client which can log into an FDP, get a token and perform basic calls.


In [None]:
import logging
import requests
from typing import Dict, Union
from requests import Response
from urllib.parse import urljoin
import urllib3
import sys

logger = logging.getLogger(__name__)

class FDPClient:

    def __init__(self, base_url: str, username: str, password: str):
        self.base_url = base_url
        self.username = username
        self.password = password
        self.token = self.login_fdp()
        self.headers = self.get_headers()
        self.session = requests.session()
        self.session.headers.update(self.headers)
        self.ssl_verification = False
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        

    def login_fdp(self) -> str:
        token_response = requests.post(
            f"{self.base_url}/tokens",
            json={"email": self.username, "password": self.password},
        )
        token_response.raise_for_status()
        response = token_response.json()
        return response["token"]

    def get_headers(self):
        return {"Authorization": f"Bearer {self.token}", "Content-Type": "text/turtle"}


    def _call_method(self, method, path, params: Dict = None, data=None):
        if method.upper() not in ["GET", "POST", "PUT", "DELETE"]:
            raise ValueError(f"Unsupported method {method}")
        url = urljoin(self.base_url, path)
        response = None
        try:
            response = self.session.request(
                method, url, params=params, data=data, verify=self.ssl_verification
            )
            response.raise_for_status()
            return response
        except requests.exceptions.HTTPError as e:
            logger.error(e)
            if response is not None:
                logger.error(response.text)
            sys.exit(1)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException) as e:
            logger.error(e)
            sys.exit(1)

    def get(self, path: str, params: Dict = None) -> Response:
        return self._call_method("GET", path, params=params)

    def post(self, path: str, params: Dict = None, data=None) -> Response:
        return self._call_method("POST", path, params=params, data=data)

    def update(self, path: str, params: Dict = None, data=None) -> Response:
        return self._call_method("PUT", path, params=params, data=data)

    def delete(self, path: str, params: Dict = None, data=None) -> Response:
        return self._call_method("DELETE", path, params=params, data=data)
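
Because `_call_method` builds the request URL with `urljoin`, the shape of `base_url` matters. The sketch below uses only the standard library and a hypothetical host, `https://fdp.example.org`, to show the cases worth knowing: a relative path on a bare host, a base URL with a sub-path with and without a trailing slash, and an absolute URL passed as `path`.

```python
from urllib.parse import urljoin

# Hypothetical FDP addresses, used only for illustration.
root_base = "https://fdp.example.org"
subpath_base = "https://fdp.example.org/fdp"

# A relative path is appended to a bare host.
joined_root = urljoin(root_base, "catalog")

# Without a trailing slash the last path segment ("fdp") is replaced ...
joined_no_slash = urljoin(subpath_base, "catalog")

# ... while a trailing slash keeps it.
joined_with_slash = urljoin(subpath_base + "/", "catalog")

# An absolute URL passed as the path replaces the base entirely.
joined_absolute = urljoin(root_base, "https://fdp.example.org/catalog/1234/meta/state")

for url in (joined_root, joined_no_slash, joined_with_slash, joined_absolute):
    print(url)
```

If your FDP is served under a sub-path, it may therefore be safer to enter the base link with a trailing slash or to pass full URLs to the client methods.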

Publishing a record in FDP consists of two steps: creating a record and publishing. These two actions are performed as API calls with different content types, so we need to implement methods for changing content type, creating a record and publishing the record. After that, the client looks like this:

In [None]:
from rdflib import Graph


class FDPEndPoints:
    meta = "meta"
    state = f"{meta}/state"
    members = "members"
    expanded = "expanded"


class FDPClient:

    def __init__(self, base_url: str, username: str, password: str):
        self.base_url = base_url
        self.username = username
        self.password = password
        self.token = self.login_fdp()
        self.headers = self.get_headers()
        self.session = requests.session()
        self.session.headers.update(self.headers)
        self.ssl_verification = False
        urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
        

    def login_fdp(self) -> str:
        token_response = requests.post(
            f"{self.base_url}/tokens",
            json={"email": self.username, "password": self.password},
        )
        token_response.raise_for_status()
        response = token_response.json()
        return response["token"]

    def get_headers(self):
        return {"Authorization": f"Bearer {self.token}", "Content-Type": "text/turtle"}


    def _call_method(self, method, path, params: Dict = None, data=None):
        if method.upper() not in ["GET", "POST", "PUT", "DELETE"]:
            raise ValueError(f"Unsupported method {method}")
        url = urljoin(self.base_url, path)
        response = None
        try:
            response = self.session.request(
                method, url, params=params, data=data, verify=self.ssl_verification
            )
            response.raise_for_status()
            return response
        except requests.exceptions.HTTPError as e:
            logger.error(e)
            if response is not None:
                logger.error(response.text)
            sys.exit(1)
        except (requests.exceptions.ConnectionError, requests.exceptions.Timeout, requests.exceptions.RequestException) as e:
            logger.error(e)
            sys.exit(1)

    def get(self, path: str, params: Dict = None) -> Response:
        return self._call_method("GET", path, params=params)

    def post(self, path: str, params: Dict = None, data=None) -> Response:
        return self._call_method("POST", path, params=params, data=data)

    def update(self, path: str, params: Dict = None, data=None) -> Response:
        return self._call_method("PUT", path, params=params, data=data)

    def delete(self, path: str, params: Dict = None, data=None) -> Response:
        return self._call_method("DELETE", path, params=params, data=data)

    def _update_session_headers(self):
        self.session.headers.update(self.headers)

    def _change_content_type(self, content_type):
        self.headers["Content-Type"] = content_type
        self._update_session_headers()

    def post_serialised(self, resource_type: str, metadata: Graph) -> Union[requests.Response, None]:
        self._change_content_type("text/turtle")
        path = f"{self.base_url}/{resource_type}"
        response = self.post(path=path, data=metadata.serialize())
        return response

    def publish_record(self, record_url):
        self._change_content_type("application/json")
        path = f"{record_url}/{FDPEndPoints.state}"
        data = '{"current": "PUBLISHED"}'
        self.update(path=path, data=data)

    def create_and_publish(self, resource_type: str, metadata: Graph) -> URIRef:
        post_response = self.post_serialised(resource_type=resource_type, metadata=metadata)
        fdp_subject = URIRef(post_response.headers["Location"])
        self.publish_record(fdp_subject)
        return fdp_subject
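
The create-then-publish sequence in `create_and_publish` can be summarised without touching the network. The sketch below only describes the two requests as plain dictionaries. `describe_publish_flow` and the URLs are hypothetical names chosen for illustration, while the HTTP methods, content types and JSON body mirror the client above.

```python
import json
from typing import Dict, List


def describe_publish_flow(base_url: str, resource_type: str, record_url: str) -> List[Dict[str, str]]:
    """Describe the two HTTP requests for creating and publishing a record.

    Hypothetical helper: nothing is sent, the requests are only described.
    """
    return [
        {   # Step 1: create the record from its Turtle serialisation.
            "method": "POST",
            "url": f"{base_url}/{resource_type}",
            "content_type": "text/turtle",
        },
        {   # Step 2: switch the record state to PUBLISHED.
            "method": "PUT",
            "url": f"{record_url}/meta/state",
            "content_type": "application/json",
            "body": json.dumps({"current": "PUBLISHED"}),
        },
    ]


# record_url stands in for the "Location" header returned by step 1.
steps = describe_publish_flow(
    base_url="https://fdp.example.org",
    resource_type="catalog",
    record_url="https://fdp.example.org/catalog/1234",
)
for step in steps:
    print(step["method"], step["url"], step["content_type"])
```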

In [None]:
fdp_base=input("Enter base link to FDP: ")

In [None]:
username=input("Enter username: ")

In [None]:
from getpass import getpass
password = getpass()

So far the catalogue record has been compliant with DCAT-AP, but FDP additionally requires a `publisher` in the form of an IRI:

In [None]:
catalog.publisher = ["https://harrypotter.fandom.com/wiki/Hogwarts_School_of_Witchcraft_and_Wizardry"]
catalog_record = catalog.to_graph(catalog_subject)
print(catalog_record.serialize())

Another FDP requirement is a link pointing to the parent object. For a catalogue the parent is the FDP itself, and the link is expressed with the `DCTERMS.isPartOf` property (`is_part_of` in the model). This property is outside of the DCAT-AP specification. There are two ways to add it. The first is to add it directly to the graph (remembering to convert the base FDP link to a `URIRef`):

In [None]:
from rdflib import DCTERMS

catalog_record.add((catalog_subject, DCTERMS.isPartOf, URIRef(fdp_base)))
print(catalog_record.serialize())

The record above can be published to FDP. However, if you want reusable code, it is better to create a child catalog class specifically for FDP that reflects the logic FDP requires.

In `DCATCatalog`, the `publisher` field is inherited from `DCATResource`, is optional, and takes either `AnyHttpUrl` or `Agent`:
```
publisher: List[Union[AnyHttpUrl, Agent]] = Field(
        default=None,
        description="The entity responsible for making the resource available.",
        rdf_term=DCTERMS.publisher,
        rdf_type="uri"
    )
```

❗Note that the particular configuration of mandatory fields and field types may be defined differently in the Shapes Constraint Language (SHACL) forms of an FDP instance. In this case you may need to adapt the example code below to prevent validation errors when uploading data. To review your instance's SHACL forms, go to `<your FDP host>/schemas` and select the resource type of interest.

Let us create a child FDP-compatible class for a catalogue with a mandatory `publisher` field taking links only:

In [None]:
from pydantic import AnyHttpUrl, Field
from typing import List

class FDPCatalog(DCATCatalog):
    publisher: List[AnyHttpUrl] = Field(description="The entity responsible for making the resource available.",
                                        rdf_term=DCTERMS.publisher,
                                        rdf_type="uri")
    # The annotation must be a type (List[...]), not a list literal
    is_part_of: List[AnyHttpUrl] = Field(description="Link to parent object",
                                         rdf_term=DCTERMS.isPartOf,
                                         rdf_type="uri")

# Create a class instance with the same data
fdp_catalog = FDPCatalog(
    title=[LiteralField(value="Test catalog", language="en")],
    description=[LiteralField(value="Catalog for test example datasets", language="en")],
    publisher=["https://harrypotter.fandom.com/wiki/Hogwarts_School_of_Witchcraft_and_Wizardry"],
    is_part_of=[fdp_base]
                        )

fdp_catalog_record = fdp_catalog.to_graph(catalog_subject)
print(fdp_catalog_record.serialize())

In [None]:
fdp_client = FDPClient(base_url=fdp_base, username=username, password=password)

catalog_fdp_id = fdp_client.create_and_publish(resource_type="catalog", metadata=fdp_catalog_record)
print(catalog_fdp_id)

If everything goes well, you should be able to see a new catalog entry in your FDP instance: ![newly created catalog](./imgs/fdp_catalog.png)

Now let's add datasets to the catalog.
Data for the example datasets will be read from the `./example_data_fdp.csv` file. Let's look at the data:

In [None]:
from tabulate import tabulate
import pandas as pd

df = pd.read_csv("./example_data_fdp.csv", sep=";")
print(tabulate(df, headers='keys', tablefmt='psql', showindex=False))

Let's prepare source data: 

In [None]:
from sempyro.vcard import VCard

df["keywords"] = df["keywords"].apply(lambda x: x.split(","))
df["theme"] = df["theme"].apply(lambda x: x.split(","))
df["id"] = df["id"].apply(lambda x: [str(x)])
df["contact_point"] = df.apply(lambda x: VCard(hasEmail=x["contact_point"], full_name=[x["author_name"]], 
                                               hasUID=x["author_id"]), axis=1)
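
Note that `str.split(",")` keeps any whitespace around the separated values, so a cell such as `"health, genomics"` yields `" genomics"` with a leading space. If the source file is not guaranteed to be tidy, a small helper can normalise the values first. This is a sketch with a hypothetical helper name, `split_multi_value`, and made-up sample values; it is not part of SeMPyRO.

```python
from typing import List


def split_multi_value(cell: str, sep: str = ",") -> List[str]:
    """Split a delimited cell into values, trimming whitespace and dropping empties."""
    return [part.strip() for part in cell.split(sep) if part.strip()]


raw = "health, genomics ,,cohort"   # made-up sample cell
print(raw.split(","))               # plain split keeps spaces and the empty item
print(split_multi_value(raw))       # cleaned values
```

Such a helper could be passed to `apply` in place of the lambdas above, for example `df["keywords"].apply(split_multi_value)`.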


This time let's prepare a class for an FDP-compatible dataset inheriting from SeMPyRO's `DCATDataset`.
We need to extend the base class with the `is_part_of` property, as we did for the catalogue.

Another property to add is an identifier. FDP does not require this property, but it is useful if you need to update a record in FDP. Each time a record is created in FDP, a unique id is assigned to it. (For the catalogue record above, we extracted it from the response header.) Because this id does not exist before the record is created in FDP, it is hard to track. Hence, having an identifier at the data level is highly recommended for implementing incremental updates.



In [None]:
from sempyro.dcat import DCATDataset

class FDPDataset(DCATDataset):
    # The annotation must be a type (List[...]), not a list literal
    is_part_of: List[AnyHttpUrl] = Field(description="Link to parent object",
                                         rdf_term=DCTERMS.isPartOf,
                                         rdf_type="uri")
    identifier: List[Union[str, LiteralField]] = Field(
        description="A unique identifier of the resource being described or catalogued.",
        rdf_term=DCTERMS.identifier,
        rdf_type="rdfs_literal")
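
To illustrate why a stable identifier helps with incremental updates, here is a minimal sketch that decides whether a source row should be created or updated. It assumes you keep your own mapping from source identifiers to the record URLs returned by FDP. The mapping, the `plan_action` helper and the URLs are hypothetical and not part of FDP or SeMPyRO.

```python
from typing import Dict, Tuple

# Hypothetical bookkeeping: source identifier -> record URL assigned by FDP.
known_records: Dict[str, str] = {
    "1": "https://fdp.example.org/dataset/aaaa-1111",
}


def plan_action(identifier: str, known: Dict[str, str]) -> Tuple[str, str]:
    """Return ("update", record_url) for a known identifier, else ("create", "")."""
    if identifier in known:
        return "update", known[identifier]
    return "create", ""


for source_id in ("1", "2"):
    print(source_id, plan_action(source_id, known_records))
```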

Now let's create the datasets, filling in the mandatory fields and some optional ones that are present in the data, and publish them to FDP:

In [None]:
datasets = df.to_dict('records')
for record in datasets:
    dataset = FDPDataset(
        title=[LiteralField(value=record["name"])],
        description=[LiteralField(value=record["description"])],
        identifier=record["id"],
        is_part_of = [catalog_fdp_id],
        creator=[record["author_id"]],
        release_date=record["issued"],
        publisher=[record["publisher_id"]],
        theme=record["theme"],
        keyword=[LiteralField(value=x) for x in record["keywords"]]
    )
    dataset_subject = URIRef(f"http://example.com/dataset_{record['id'][0]}")
    dataset_graph = dataset.to_graph(dataset_subject)
    print(dataset_graph.serialize())
    fdp_client.create_and_publish(resource_type="dataset", metadata=dataset_graph)


The catalogue we created earlier now contains 4 datasets ![catalog](./imgs/ds_in_catalog.png)

and datasets themselves are available: ![datasets](./imgs/datasets_fdp.png)