In [None]:
import os
import json
import requests

## Specifications for accessing the KLMS Data API and Airflow

In [None]:
# Base URL of the KLMS Data API.
# Taken from the KLMS_DATA_API_URL environment variable when it is set;
# otherwise replace the placeholder below.
# e.g., URL_KLMS_DATA_API='http://127.0.0.1:9055/api/v1/'
URL_KLMS_DATA_API = os.environ.get('KLMS_DATA_API_URL', 'XXXXXXXXXXXXXXXXXXXXXXX')
# Endpoint paths are appended to this URL by plain string concatenation
# (e.g. URL_KLMS_DATA_API+'catalog/publish'), so guarantee exactly one trailing slash.
URL_KLMS_DATA_API = URL_KLMS_DATA_API.rstrip('/') + '/'

# API token generated for user authentication in the Data API.
# Prefer supplying it through the KLMS_API_TOKEN environment variable so the
# secret is not saved inside the notebook; the placeholder is only a fallback.
USER_API_KEY = os.environ.get('KLMS_API_TOKEN', 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX')

# Headers sent with requests to the Data API: JSON payloads plus the API token
user_headers = { 'Content-Type' : 'application/json', 'Api-Token' : USER_API_KEY }

In [None]:
# The URL of the Airflow endpoint used to trigger runs of a workflow (e.g., UC_A3).
# Taken from the AIRFLOW_API_URL environment variable when set; otherwise replace the placeholder.
# e.g., URL_AIRFLOW_API = "http://127.0.0.1:8090/api/v1/dags/UC_A3/dagRuns"
URL_AIRFLOW_API = os.environ.get("AIRFLOW_API_URL", "XXXXXXXXXXXXXXXXXXXXXXXX")

# Necessary credentials for connecting to Airflow.
# Prefer the AIRFLOW_USERNAME / AIRFLOW_PASSWORD environment variables so the
# credentials are not saved inside the notebook; the placeholders are only fallbacks.
username = os.environ.get("AIRFLOW_USERNAME", "XXXXXXXXXXXXX")
password = os.environ.get("AIRFLOW_PASSWORD", "XXXXXXXXXXXXX")

# Publish data sources as PRIVATE datasets in the Data Catalog

### A) Announcements of food product recalls

#### Publish metadata about this dataset in the Data Catalog

In [None]:
# Location (s3:// URI) of the incidents CSV file inside the MinIO bucket, e.g.:
path_dataset1 = 's3://agroknow-bucket/incidents.csv'

In [None]:
# Location of the JSON profile that the STELAR Profiler extracted from the dataset, e.g.:
path_profile1 = '<PATH-TO-FILE>/incidents_profile.json'

In [None]:
# Payload for publishing the incidents dataset: metadata supplied by the publisher (Agroknow)
# together with a reference to the profile produced by the Data Profiler
metadata_dataset1 = dict(
    # Basic metadata
    basic_metadata=dict(
        title="Public Announcements for Food Incidents",
        notes="This dataset contains public announcements of food incidents around the world. Each row in the CSV file represents an announcement. Information about food product recalls includes their titles, dates, the food products being recalled as well as the broader food category they fall into (e.g. herbs, confectionery), the hazard present in the products as well as the broader hazard category they fall into (e.g. biological, allergens), the recalled products suppliers and the URLs of the announcements.",
        private="True",
        tags=["AGROKNOW", "food safety", "incidents", "hazards", "recalls", "products", "public announcements"],
    ),
    # Extra metadata
    extra_metadata=dict(
        theme=["Food Safety"],
        format="csv",
        temporal_start="1994-01-07",
        temporal_end="2022-07-30",
        license="https://creativecommons.org/licenses/by-sa/3.0/igo/",
    ),
    # Description of the accompanying profile file
    profile_metadata=dict(
        file=path_profile1,
        name="Profile on Incidents of food product recalls",
        resource_type="Tabular",
        description="Tabular profile in JSON format for Public Announcements for Incidents of food product recalls from AGROKNOW",
        format="JSON",
    ),
)

In [None]:
# Publish this dataset (with its profile) by POSTing its metadata to the KLMS Data API
pub_response1 = requests.post(
    url=URL_KLMS_DATA_API + 'catalog/publish',
    headers=user_headers,
    json=metadata_dataset1,
)

In [None]:
# Check response from KLMS Data API
try:
    response_dict1 = pub_response1.json()
except ValueError:
    # The body is not valid JSON (e.g. an HTML error page); treat it as a failure
    response_dict1 = {}

if response_dict1.get('success') is True:
    # Extract the ID of the newly created package
    pid1 = response_dict1['result'][0]['result']['id']
    print('Status Code', pub_response1.status_code, '. Published new data source in the Data Catalog with ID:' + pid1)
else:
    # Stop here: the following cells need pid1. Continuing would either raise a
    # NameError or silently reuse a stale pid1 left over from an earlier run.
    raise RuntimeError(
        'Status Code {}. Data source not published in Data Catalog. Response: {}'.format(
            pub_response1.status_code, pub_response1.text
        )
    )

#### Also publish the actual data stored in MinIO as a resource available to KLMS workflows

In [None]:
# Payload describing the CSV resource; package_id ties it to the dataset published above (pid1)
metadata_res1 = dict(
    resource_metadata=dict(
        description="This CSV file provides information regarding incidents of food product recalls. It includes their titles, dates, the food products being recalled as well as the broader food category they fall into (e.g. herbs, confectionery), the hazard present in the products as well as the broader hazard category they fall into (e.g. biological, allergens), the recalled products suppliers and the URLs of the announcements.",
        format="CSV",
        name="Incidents of food product recalls",
        package_id=pid1,
        resource_tags=["food safety", "incidents", "hazards", "recalls"],
        resource_type="Tabular",
        url=path_dataset1,
    )
)

In [None]:
# Associate this resource with the dataset by POSTing its metadata to the KLMS Data API
res_response1 = requests.post(
    url=URL_KLMS_DATA_API + 'resource/link',
    headers=user_headers,
    json=metadata_res1,
)

In [None]:
# Check response from KLMS Data API
try:
    response_dict1 = res_response1.json()
except ValueError:
    # The body is not valid JSON (e.g. an HTML error page); treat it as a failure
    response_dict1 = {}

if response_dict1.get('success') is True:
    # Extract the ID of the newly created resource
    rid1 = response_dict1['result']['id']
    print('Status Code', res_response1.status_code, '. Associated new resource with ID:' + rid1 + ' to already published dataset with ID: ' + pid1 + ' in the Data Catalog.')
else:
    # Stop here: the workflow configuration further down needs rid1. Continuing would
    # either raise a NameError or silently reuse a stale rid1 left over from an earlier run.
    raise RuntimeError(
        'Status Code {}. Resource not published in Data Catalog. Response: {}'.format(
            res_response1.status_code, res_response1.text
        )
    )

### B) Dictionary of recalled food products

#### Publish metadata about this dataset in the Data Catalog

In [None]:
# Location (s3:// URI) of the dictionary CSV file inside the MinIO bucket, e.g.:
path_dataset2 = 's3://agroknow-bucket/ak_dict.csv'

In [None]:
# Location of the JSON profile that the STELAR Profiler extracted from the dataset, e.g.:
path_profile2 = '<PATH-to-FILE>/ak_dict_profile.json'

In [None]:
# Payload for publishing the dictionary dataset: metadata supplied by the publisher (Agroknow)
# together with a reference to the profile produced by the Data Profiler
metadata_dataset2 = dict(
    # Basic metadata
    basic_metadata=dict(
        title="Recalled food products dictionary",
        notes="Dictionary of food products that have been recalled. This dataset is taken from the dataset [Public Announcements for Food Incidents](https://ckan.magellan2.imsi.athenarc.gr/dataset/public_announcements_for_food_incidents).",
        private="True",
        tags=["AGROKNOW", "food safety", "recalls", "products", "public announcements"],
    ),
    # Extra metadata
    extra_metadata=dict(
        theme=["Food Safety"],
        format="csv",
        license="https://creativecommons.org/licenses/by-sa/3.0/igo/",
    ),
    # Description of the accompanying profile file
    profile_metadata=dict(
        file=path_profile2,
        name="Profile on dictionary of Recalled food products",
        resource_type="Tabular",
        description="Tabular profile in JSON format for the dictionary of Recalled food products from AGROKNOW",
        format="JSON",
    ),
)

In [None]:
# Publish this dataset (with its profile) by POSTing its metadata to the KLMS Data API
pub_response2 = requests.post(
    url=URL_KLMS_DATA_API + 'catalog/publish',
    headers=user_headers,
    json=metadata_dataset2,
)

In [None]:
# Check response from KLMS Data API
try:
    response_dict2 = pub_response2.json()
except ValueError:
    # The body is not valid JSON (e.g. an HTML error page); treat it as a failure
    response_dict2 = {}

if response_dict2.get('success') is True:
    # Extract the ID of the newly created package
    pid2 = response_dict2['result'][0]['result']['id']
    print('Status Code', pub_response2.status_code, '. Published new data source in the Data Catalog with ID:' + pid2)
else:
    # Stop here: the following cells need pid2. Continuing would either raise a
    # NameError or silently reuse a stale pid2 left over from an earlier run.
    raise RuntimeError(
        'Status Code {}. Data source not published in Data Catalog. Response: {}'.format(
            pub_response2.status_code, pub_response2.text
        )
    )

#### Also publish the actual data stored in MinIO as a resource available to KLMS workflows

In [None]:
# Payload describing the CSV resource; package_id ties it to the dataset published above (pid2)
metadata_res2 = dict(
    resource_metadata=dict(
        description="This CSV file provides a Dictionary of food products that have been recalled.",
        format="CSV",
        name="Dictionary data of Recalled food products",
        package_id=pid2,
        resource_tags=["dictionary", "food safety", "recalls", "products"],
        resource_type="Tabular",
        url=path_dataset2,
    )
)

In [None]:
# Associate this resource with the dataset by POSTing its metadata to the KLMS Data API
res_response2 = requests.post(
    url=URL_KLMS_DATA_API + 'resource/link',
    headers=user_headers,
    json=metadata_res2,
)

In [None]:
# Check response from KLMS Data API
try:
    response_dict2 = res_response2.json()
except ValueError:
    # The body is not valid JSON (e.g. an HTML error page); treat it as a failure
    response_dict2 = {}

if response_dict2.get('success') is True:
    # Extract the ID of the newly created resource
    rid2 = response_dict2['result']['id']
    print('Status Code', res_response2.status_code, '. Associated new resource with ID:' + rid2 + ' to already published dataset with ID: ' + pid2 + ' in the Data Catalog.')
else:
    # Stop here: the workflow configuration further down needs rid2. Continuing would
    # either raise a NameError or silently reuse a stale rid2 left over from an earlier run.
    raise RuntimeError(
        'Status Code {}. Resource not published in Data Catalog. Response: {}'.format(
            res_response2.status_code, res_response2.text
        )
    )

# Triggering Airflow via REST API

In [None]:
# Configuration passed to the DAG run.
# The input lists the two data resources (rid1, rid2) published in the Data Catalog above.
config = dict(
    username=username,
    password=password,
    input=[rid1, rid2],
    # Per-tool parameters, keyed by tool name
    tools=dict(
        entity_extraction=dict(
            output_file="out.csv",
            text_column="description",
            csv_delimiter=",",
            N=100,
            extraction_type="food",
            model="instafoodroberta",
            syntactic_analysis_tool="stanza",
        ),
        entity_linking=dict(
            col_id_left=1,
            col_text_left=2,
            separator_left=" ",
            col_id_right=0,
            col_text_right=1,
            separator_right=" ",
            k=1,
            delta_alg="1",
            output_file="out.csv",
        ),
    ),
    package_metadata=dict(
        # The run fails if this cell is re-run with the same title: pick a different
        # title (e.g. append _2) or remove package_metadata altogether.
        title="Entity Extraction on Food Incidents (UC_A3)",
        notes="Entity Extraction on food recall incidents, accompanied by Entity Linking to a known entity dictionary.",
        # Each tag is sent as a {"name": ...} object
        tags=[
            {"name": tag_name}
            for tag_name in ("AgroKnow", "Food Incidents", "Entity Extraction", "Entity Linking")
        ],
    ),
)

In [None]:
# Trigger the DAG run: POST the configuration to Airflow, authenticating with username/password
response = requests.post(
    url=URL_AIRFLOW_API,
    auth=(username, password),
    json={"conf": config},
)

# Report the outcome; only HTTP 200 is treated as success
if response.status_code != 200:
    print("Failed to trigger DAG run. Status code:", response.status_code)
    print("Error message:", response.text)
else:
    print("DAG run triggered successfully.")

# Inspecting workflow executions

### I) Specify GET requests to the Data API

#### List workflows where a dataset was given as input

In [None]:
# List all workflows where a dataset (specified by its UUID) has been given as input: 
# e.g., pid = '82aaa2df-be92-46ee-a36b-cc59122a5d5b'
pid = 'XXXXXXXXXXXXXXXXXXXXXX'
# The endpoint has no leading slash because URL_KLMS_DATA_API already ends with '/'
# (a leading slash would produce '.../api/v1//workflow/...').
# The identifier is passed via params so that requests URL-encodes it.
input_response = requests.get(URL_KLMS_DATA_API + 'workflow/input/dataset', params={'id': pid})
print(input_response.json())

#### List of tasks defined in a workflow

In [None]:
# List the tasks involved in one of these workflows (specified by a workflow identifier returned by the previous request): 
# e.g., workflow_id = 'UC_A3'
workflow_id = 'XXXXXXXXXXXXXX'
# The endpoint has no leading slash because URL_KLMS_DATA_API already ends with '/'
# (a leading slash would produce '.../api/v1//workflow/...').
# The identifier is passed via params so that requests URL-encodes it.
tasks_response = requests.get(URL_KLMS_DATA_API + 'workflow/tasks', params={'id': workflow_id})
print(tasks_response.json())

#### List all executions of a given task 

In [None]:
# List details of all executions of one of the previously returned tasks: 
# e.g., task_id = 'entity_extraction'
task_id = 'XXXXXXXXXXXXXXXXXX'
# The endpoint has no leading slash because URL_KLMS_DATA_API already ends with '/'
# (a leading slash would produce '.../api/v1//task/...').
# The identifier is passed via params so that requests URL-encodes it.
task_executions_response = requests.get(URL_KLMS_DATA_API + 'task/executions', params={'id': task_id})
print(task_executions_response.json())

#### Parameter values specified in a task execution

In [None]:
# Fetch all user-specified parameter values for a given task execution (using its MLFlow identifier obtained from the previous request): 
# e.g., task_exec_id = '43ce98256c104f72b1a1c84c34c04a04'
task_exec_id = 'XXXXXXXXXXXXXXXX'
# The endpoint has no leading slash because URL_KLMS_DATA_API already ends with '/'
# (a leading slash would produce '.../api/v1//task/...').
# The identifier is passed via params so that requests URL-encodes it.
parameters_response = requests.get(URL_KLMS_DATA_API + 'task/execution/parameters', params={'id': task_exec_id})
print(parameters_response.json())

#### Performance metrics collected for a task execution

In [None]:
# Fetch all performance metrics reported in MLFlow for the given task execution: 
# e.g., task_exec_id = '43ce98256c104f72b1a1c84c34c04a04'
task_exec_id = 'XXXXXXXXXXXXXXX'
# The endpoint has no leading slash because URL_KLMS_DATA_API already ends with '/'
# (a leading slash would produce '.../api/v1//task/...').
# The identifier is passed via params so that requests URL-encodes it.
metrics_response = requests.get(URL_KLMS_DATA_API + 'task/execution/metrics', params={'id': task_exec_id})
print(metrics_response.json())

### II) SPARQL queries against the Knowledge Graph

#### Q1: List the tasks specified in a given workflow ("UC_A3") and the user who defined them

In [None]:
# SPARQL query Q1: description, task names and creator's user name for the workflow titled "UC_A3".
# The pieces below are joined by implicit string concatenation; spacing inside the literals is significant.
sparql1 = {
    "q": (
        'PREFIX dct: <http://purl.org/dc/terms/> '
        'PREFIX foaf: <http://xmlns.com/foaf/0.1/> '
        'PREFIX klms: <http://stelar-project.eu/klms#> '
        'SELECT ?workflow_desc ?task_name ?username '
        'WHERE { '
        '?workflow a klms:Workflow . '
        '?workflow dct:creator ?user . '
        '?user foaf:name ?username .  '
        '?workflow dct:title "UC_A3" .   '
        '?workflow dct:description ?workflow_desc . '
        '?task a klms:Task . '
        '?task dct:isPartOf ?workflow . '
        '?task dct:title ?task_name }'
    )
}

In [None]:
# Send SPARQL query Q1 to the graph search endpoint of the KLMS API
graph_response1 = requests.post(
    url=URL_KLMS_DATA_API + 'graph/search',
    headers=user_headers,
    json=sparql1,
)

In [None]:
# Show the raw response body returned for query Q1
print("Q1 results: ", graph_response1.text, sep=" ")

#### Q2: Metadata about output files issued after all executions of a workflow

In [None]:
# SPARQL query Q2: metadata (identifier, title, format, issue date, access URL) of the output
# resources of all executions of the workflow titled "UC_A3".
# The pieces below are joined by implicit string concatenation; spacing inside the literals is significant.
sparql2 = {
    "q": (
        'PREFIX dcat: <http://www.w3.org/ns/dcat#> '
        'PREFIX dct: <http://purl.org/dc/terms/> '
        'PREFIX klms: <http://stelar-project.eu/klms#> '
        'SELECT ?id ?title ?format ?date ?file_path '
        'WHERE { '
        '?workflow dct:title "UC_A3" . '
        '?workflowExec klms:instantiates ?workflow . '
        '?taskExec dct:isPartOf ?workflowExec . '
        '?taskExec klms:hasOutput ?resource . '
        '?resource dct:identifier ?id . '
        '?resource dct:title ?title . '
        '?resource dct:format ?format . '
        '?resource dct:issued ?date . '
        '?resource dcat:accessURL ?file_path }'
    )
}

In [None]:
# Send SPARQL query Q2 to the graph search endpoint of the KLMS API
graph_response2 = requests.post(
    url=URL_KLMS_DATA_API + 'graph/search',
    headers=user_headers,
    json=sparql2,
)

In [None]:
# Show the raw response body returned for query Q2
print("Q2 results: ", graph_response2.text, sep=" ")

#### Q3: Compare execution time among all executions of a given task

In [None]:
# SPARQL query Q3: value and timestamp of the "total_time" metric for every execution
# of the task titled "entity_linking".
# The pieces below are joined by implicit string concatenation; spacing inside the literals is significant.
sparql3 = {
    "q": (
        'PREFIX dct: <http://purl.org/dc/terms/> '
        'PREFIX klms: <http://stelar-project.eu/klms#> '
        'SELECT ?value ?timestamp '
        'WHERE {'
        '?task a klms:Task . '
        '?task dct:title "entity_linking" . '
        '?taskExec klms:instantiates ?task . '
        '?taskExec klms:hasMetrics ?kvpair . '
        '?kvpair klms:key "total_time" . '
        '?kvpair klms:value ?value . '
        '?kvpair dct:issued ?timestamp }'
    )
}

In [None]:
# Send SPARQL query Q3 to the graph search endpoint of the KLMS API
graph_response3 = requests.post(
    url=URL_KLMS_DATA_API + 'graph/search',
    headers=user_headers,
    json=sparql3,
)

In [None]:
# Show the raw response body returned for query Q3
print("Q3 results: ", graph_response3.text, sep=" ")