# Harmonising Data with the Ocean Information Model

In this example notebook, we demonstrate how to ingest data into the Ocean Data Platform using the Ocean Information Model harmonisation layer.

In [8]:
import datetime
import json
import os
import threading
import time
import uuid
import zipfile
from io import StringIO
from os.path import basename
from zipfile import ZipFile

import geopandas as gpd
import numpy as np
import pandas as pd
import requests
from odp.client import OdpClient
from odp.client.dto.file_dto import FileMetadataDto
from odp.dto.catalog import DataCollectionDto, DatasetDto

## Prepare data for OIM

In [3]:
# Source workbook and the CSV file the harmonisation step reads later on
input_file = '16.10.2024-OIM-updated-sites.xlsx'
output_file = '16.10.2024-OIM-updated-sites.csv'

# sheet_name=0 selects the workbook's first tab
df = pd.read_excel(io=input_file, sheet_name=0)

# index=False keeps the pandas row index out of the exported CSV
df.to_csv(path_or_buf=output_file, index=False)

print(f"Saved {output_file} successfully.")

Saved 16.10.2024-OIM-updated-sites.csv successfully.


  for idx, row in parser.parse():


## OIM Harmonisation Process

In [4]:
def dpi_get_token():
    """Request an access token for the DPI enabler from its Keycloak realm.

    Uses the OAuth2 password grant. Credentials are read from environment
    variables so that no secret is stored in the notebook:

    - ``DPI_CLIENT_ID``      (optional, defaults to ``dpi-enabler-cli``)
    - ``DPI_CLIENT_SECRET``  (required)
    - ``DPI_USERNAME``       (required)
    - ``DPI_PASSWORD``       (required)

    Returns:
        str: the ``access_token`` field of the token response.

    Raises:
        RuntimeError: if a required environment variable is unset or empty.
        requests.HTTPError: if the token endpoint answers with an error status.
    """
    # NOTE(review): credentials that were previously committed in this cell
    # should be treated as exposed and rotated.
    required = ('DPI_CLIENT_SECRET', 'DPI_USERNAME', 'DPI_PASSWORD')
    missing = [name for name in required if not os.environ.get(name)]
    if missing:
        raise RuntimeError(
            "Missing DPI credentials; set environment variable(s): " + ", ".join(missing)
        )

    data = {
        'client_id': os.environ.get('DPI_CLIENT_ID', 'dpi-enabler-cli'),
        'client_secret': os.environ['DPI_CLIENT_SECRET'],
        'username': os.environ['DPI_USERNAME'],
        'password': os.environ['DPI_PASSWORD'],
        'grant_type': 'password'
    }
    response = requests.post(
        'https://keycloak-dev.apps.paas-dev.psnc.pl/auth/realms/demeter/protocol/openid-connect/token',
        data=data,
        timeout=30,  # do not let a stalled auth server hang the notebook forever
    )
    # Fail with the HTTP status instead of a KeyError on a missing 'access_token'
    response.raise_for_status()
    return response.json()['access_token']

def upload_to_dpi(fullfilename):
    """Upload a zip archive to the DPI file store and return its shareable link.

    Args:
        fullfilename: path (relative or absolute) of the zip file to upload.

    Returns:
        The ``output_url`` reported by the service on success, or ``None``
        when the upload request is rejected (the response is printed).

    Side effects:
        On success the local zip file is deleted.
    """
    # Upload the file the caller actually passed. Previously the name was
    # ignored and "input.zip" in the same directory was always used.
    zip_path = os.path.abspath(fullfilename)

    ## get token
    dpitoken = dpi_get_token()

    ## upload file to DPI and get shareable link
    headers = {
        'Authorization': 'Bearer ' + dpitoken,
        'accept': 'application/json',
    }
    data = {"description": os.path.basename(zip_path)}

    # The context manager closes the handle before the file is removed below;
    # the original left it open, which also blocks deletion on Windows.
    with open(zip_path, 'rb') as zip_handle:
        files = {
            'file': (zip_path, zip_handle),
        }
        response = requests.post(
            'https://dpi-enabler-demeter.apps.paas-dev.psnc.pl/api/access/files',
            headers=headers,
            files=files,
            data=data,
        )

    if not response.ok:
        print(response)
        return None

    sharelink = response.json()['output_url']
    # delete zip file
    if os.path.exists(zip_path):
        os.remove(zip_path)
    return sharelink

def execute_pipeline(pipelineid,graph,sharelink):
    """Configure a DPI pipeline for a CSV input and start it.

    The pipeline identified by ``pipelineid`` is first updated (PATCH) with
    the stage switches, the target ``graph`` URI and the ``sharelink`` of the
    uploaded input, then triggered through its ``/execute`` endpoint.

    Returns:
        The ``identifier`` field of the execute response (the job id), or
        ``None`` if either request fails (the failing response is printed).
    """
    token = dpi_get_token()
    auth_headers = {
        'Authorization': 'Bearer ' + token,
        'accept': 'application/json',
    }
    pipeline_url = 'https://dpi-enabler-demeter.apps.paas-dev.psnc.pl/api/generic/' + pipelineid

    # Stage switches and input description, passed to requests via `data=`
    settings = dict(
        preprocess=True,
        mapping=True,
        transform=True,
        postprocess=True,
        load=True,
        link=False,
        input_type="CSV",
        preprocess_activities=["normalize_delimiter"],
        graph_uri=graph,
        url_input=sharelink,
    )

    # Step 1: update the pipeline definition
    update_reply = requests.patch(pipeline_url, headers=auth_headers, data=settings)
    if not update_reply.ok:
        print(update_reply)
        return None

    # Step 2: launch it and hand back the job identifier
    run_reply = requests.post(pipeline_url + '/execute', headers=auth_headers)
    if not run_reply.ok:
        print(run_reply)
        return None

    return run_reply.json()['identifier']

def get_results_url(jobid, poll_interval=7.0):
    """Poll a DPI job until it leaves the PENDING state and return its results.

    Args:
        jobid: identifier returned when the pipeline was executed.
        poll_interval: seconds to sleep before each status request
            (defaults to 7.0).

    Returns:
        The ``results`` field of the final job response, or ``None`` when the
        last status request failed (the response is printed).
    """
    stamp_format = "%Y-%m-%d %H:%M:%S ==> "
    job_url = 'https://dpi-enabler-demeter.apps.paas-dev.psnc.pl/api/jobs/'+str(jobid)

    response = None
    status = 'PENDING'
    # NOTE(review): only 'PENDING' is treated as "still running"; any other
    # status ends the loop. Confirm against the job API's status values.
    while status == 'PENDING':
        print(datetime.datetime.now().strftime(stamp_format)+"sleeping: "+status)
        time.sleep(poll_interval)
        # A fresh token is requested for every poll
        dpitoken = dpi_get_token()
        headers = {
            'Authorization': 'Bearer '+dpitoken,
            'accept': 'application/json',
        }
        response = requests.get(job_url, headers=headers)
        if response.ok:
            status = response.json()['status']
        else:
            status = "ERROR"

    # Take the timestamp now: the original reused the one captured before the
    # last sleep, so the "finished" line was logged several seconds in the past.
    print(datetime.datetime.now().strftime(stamp_format)+"job finished with status: "+status)
    if response.ok:
        return response.json()['results']

    print(response)
    return None

def download_harmonized_data_file (graph_uri, harmonized_data_file):
    """Export every triple of a named graph as JSON-LD and save it to a file.

    Sends a SPARQL CONSTRUCT query for all triples in ``graph_uri`` to the
    foodie-cloud endpoint, requesting ``application/ld+json``, and writes the
    response body to ``harmonized_data_file``.

    Args:
        graph_uri: URI of the named graph to export.
        harmonized_data_file: path of the output file (overwritten if present).

    Raises:
        requests.HTTPError: if the endpoint answers with an error status; in
            that case no file is written.
    """
    headers = {
        'Accept': 'application/ld+json',
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    data = {
        'query': 'CONSTRUCT { ?s ?p ?o } WHERE {GRAPH <'+graph_uri+'> { ?s ?p ?o } }',
    }
    response = requests.post('https://www.foodie-cloud.org/sparql', headers=headers, data=data)
    # Check the status before opening the file, so an error page is never
    # saved as if it were harmonised data.
    response.raise_for_status()
    with open(harmonized_data_file, 'wb') as f:
        f.write(response.content)

######## start execution #######

def _log(message):
    """Print ``message`` prefixed with the current local time."""
    print(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S ==> ") + message)

### input variables
# NOTE(review): `input` shadows the Python builtin of the same name. The name
# is kept because other cells may read it; consider renaming it notebook-wide.
input = '16.10.2024-OIM-updated-sites.csv' ### this is the CSV file you get in ODP from the input excel for the tab sites
pipeline_id = "3d377d87-960c-4cac-84e1-54f2e31d944d"
graph_uri = "https://w3id.org/iliad/israel/culturalHeritage"
zfile='input.zip'

### zip input CSV
with ZipFile(zfile, 'w') as zipf:
    zipf.write(input, arcname=os.path.basename(input))
_log("zipfile: "+zfile)

### upload zip input CSV file (and remove it)
finallink = upload_to_dpi (zfile)
if finallink is None:
    # Stop here with a clear message instead of a TypeError on string concatenation
    raise RuntimeError("Upload of "+zfile+" to DPI failed; see the response printed above.")
_log("finallink: "+finallink)

### execute pipeline
jobid = execute_pipeline(pipeline_id,graph_uri,finallink)
if jobid is None:
    raise RuntimeError("Pipeline "+pipeline_id+" could not be started; see the response printed above.")
_log("jobid: "+jobid)

### wait for job to finish
results_url=get_results_url(jobid)
if results_url is not None:
    _log("results_url: "+results_url)
    ### get harmonized data
    # The harmonized data is named after the input file, with the extension
    # changed from csv to jsonld
    harmonized_data_file = input.replace('.csv','.jsonld')
    download_harmonized_data_file (graph_uri, harmonized_data_file)
    # Logged after the download, so the timestamp reflects the real finish time
    _log("Finished. Harmonized data available in file: "+harmonized_data_file)

2024-11-15 15:00:26 ==> zipfile: input.zip
2024-11-15 15:00:27 ==> finallink: https://dpi-enabler-demeter.apps.paas-dev.psnc.pl/api/access/files/003d9dda-ea2b-4d2a-8fef-e288117b3cb2/download
2024-11-15 15:00:29 ==> jobid: cdade8ac-1f81-4c3c-94d2-2dfcdbab71b1
2024-11-15 15:00:29 ==> sleeping: PENDING
2024-11-15 15:00:36 ==> sleeping: PENDING
2024-11-15 15:00:44 ==> sleeping: PENDING
2024-11-15 15:00:52 ==> sleeping: PENDING
2024-11-15 15:00:59 ==> sleeping: PENDING
2024-11-15 15:01:07 ==> sleeping: PENDING
2024-11-15 15:01:15 ==> sleeping: PENDING
2024-11-15 15:01:15 ==> job finished with status: SUCCESS
2024-11-15 15:01:22 ==> results_url: https://dpi-enabler-demeter.apps.paas-dev.psnc.pl/api/generic/3d377d87-960c-4cac-84e1-54f2e31d944d/download
2024-11-15 15:01:22 ==> Finished. Harmonized data available in file: 16.10.2024-OIM-updated-sites.jsonld


## Ingest to ODP

### Create resource in ODP Catalogue

In [5]:
# ODP SDK client used for the catalogue and raw-storage calls below
client = OdpClient()

# Random suffix appended to the collection name so each run creates a distinct resource
uuid_str = f"{uuid.uuid4()}"

In [27]:
# Produce a copy of the sites CSV without its first data row.

# Read the CSV and keep everything from positional row 1 onwards
# (positional row 0, the first record after the header, is dropped)
df = pd.read_csv('16.10.2024-OIM-updated-sites.csv').iloc[1:]

# Write the trimmed frame to a separate "_odp" file, without the pandas index column
df.to_csv(path_or_buf='16.10.2024-OIM-updated-sites_odp.csv', index=False)

In [6]:
# create collection
# Read the collection manifest from disk. JSON is opened as UTF-8 explicitly
# so the result does not depend on the platform's default encoding.
with open("collection.json", encoding="utf-8") as file:
    collection_config = json.load(file)

# randomize collection name:
# the manifest is reloaded above on every run, so re-running this cell does
# not pile up suffixes
collection_config["metadata"]["name"] += "-" + uuid_str

# DataCollectionDto must be imported in the notebook's import cell
# (from odp.dto.catalog); without it this line raises NameError on a fresh kernel.
collection_manifest = DataCollectionDto(**collection_config)
collection_dto = client.catalog.create(collection_manifest)

### Load data into ODP

In [13]:
# Dataset manifest. The dataset is attached to the collection created in the
# previous step: its name carries a per-run random suffix, so the reference is
# built from `collection_config` instead of a value pasted from an old run.
dataset_config ={
        "dataset_config": {
            "kind": "catalog.hubocean.io/dataset",
            "version": "v1alpha3",
            "metadata": {
                "name": "iliad-cultural-heritage-oim",
                "display_name": "Iliad Underwater Cultural Heritage Sites- OIM",
                "description": "A collection of underwater cultural heritage sites in Israel, harmonised through OIM.",
                "labels": {
                    "catalog.hubocean.io/internal": True
                }
            },
            "spec": {
                "storage_class": "registry.hubocean.io/storageClass/raw",
                "storage_controller": "registry.hubocean.io/storageController/storage-raw-cdffs",
                "data_collection": "catalog.hubocean.io/dataCollection/" + collection_config["metadata"]["name"],
                "maintainer": {
                    "contact": "Ehud Galili <galiliudi@gmail.com>",
                    "organisation": "University of Haifa"
                },
                "citation": None,
                "documentation": [],
                "facets": None,
                "tags": [
                ]
            }
        }
    }


# Description of the file uploaded into the dataset in the next cell
file_config = {
    "local_file_path": "16.10.2024-OIM-updated-sites.jsonld", # output of OIM
    "mime_type": "application/ld+json",
    "metadata": {}
}

client = OdpClient()
# Register the dataset in the catalogue; the returned DTO is the upload target
dataset_manifest = DatasetDto(**dataset_config["dataset_config"])
dataset_dto = client.catalog.create(dataset_manifest)




In [14]:
# Unpack the file description prepared alongside the dataset manifest
local_file_path = file_config["local_file_path"]
mime_type = file_config["mime_type"]
metadata = file_config["metadata"]
geo_location = file_config.get("geo_location")  # optional key; None when absent

# os.path.basename handles the platform's path separators, unlike split("/")
file_name = os.path.basename(local_file_path)

# Read the whole file into memory as bytes for the upload
with open(local_file_path, "rb") as file:
    byte_content = file.read()

# Upload the bytes as a raw file of the dataset; the returned metadata is the cell output
client.raw.create_file(
    resource_dto=dataset_dto,
    file_metadata_dto=FileMetadataDto(
        name=file_name,
        mime_type=mime_type,
        metadata=metadata,
        geo_location=geo_location,
    ),
    contents=byte_content,
)

FileMetadataDto(name='16.10.2024-OIM-updated-sites.jsonld', mime_type='application/ld+json', dataset=UUID('024a2bd2-e050-4d26-ade0-c6049f998c55'), metadata={'hubocean.io/app': 'odcat', 'hubocean.io/dataset': '024a2bd2-e050-4d26-ade0-c6049f998c55'}, geo_location=None, size_bytes=2008082, checksum='9b6455823df87d2867f81395256068e9', created_time=datetime.datetime(2024, 11, 15, 15, 16, 34, 248000), modified_time=datetime.datetime(2024, 11, 15, 15, 16, 34, 856000), deleted_time=None)

## Add tabular upload