In [1]:
import os
import sys

# Location of the sem-covid checkout; it is put on the import path and used as the
# working directory for the rest of the notebook.
PROJECT_ROOT = "/home/jovyan/work/sem-covid/"

# Append only when missing so that re-running this cell does not keep growing sys.path.
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

# Drop duplicate entries while keeping first-seen order: dict.fromkeys preserves insertion
# order, whereas going through a set would reorder sys.path and thereby change which
# location wins when a module name is importable from several entries.
sys.path = list(dict.fromkeys(sys.path))

os.chdir(PROJECT_ROOT)
from sem_covid import config
from sem_covid.services.store_registry import store_registry
import pandas as pd
import requests as req
import json
from typing import List

In [2]:
# --- RML mapper service -------------------------------------------------------------
# HTTP endpoint handed to RMLMapper, which POSTs the rules and sources to it.
RML_MAPPER_URL = "http://srv.meaningfy.ws:4000/execute"

# --- Object store layout ------------------------------------------------------------
# Bucket passed to store_registry.minio_object_store().
MINIO_RML_BUCKET = 'rdf-transformer'
# Object-name prefixes for the rules file, the auxiliary source files and the RDF result.
MINIO_RML_RULES_DIR = 'rml_rules'
MINIO_RML_SOURCES_DIR = 'fields'
MINIO_RML_RESULTS_DIR = 'results'

# --- Dataset ------------------------------------------------------------------------
# Index the dataset is read from; the same value is reused as the triple-store dataset id.
DATASET_INDEX_NAME = 'ds_unified_dataset'
# Row budget used when the dataset is split into parts for the mapper.
DATASET_PART_SIZE = 100

# --- File names ---------------------------------------------------------------------
RML_RULES_FILE_NAME = 'ds_unified_dataset.ttl'
RDF_RESULT_FILE_NAME = 'ds_unified_dataset_result.ttl'
# Auxiliary source files fetched from MINIO_RML_SOURCES_DIR and sent with every request.
RML_MAPPING_SOURCES = [
    'country.json',
    'datasets.json',
    'eu_cellar_author_labels.json',
    'eu_cellar_directory_code_labels.json',
    'eu_cellar_resource_type_labels.json',
    'eu_cellar_subject_matter_labels.json',
    'eu_timeline_topic.json',
    'ireland_keyword.json',
    'ireland_page_type.json',
    'pwdb_actors.json',
    'pwdb_category.json',
    'pwdb_funding.json',
    'pwdb_target_group_l1.json',
    'pwdb_target_group_l2.json',
    'pwdb_type_of_measure.json',
]

# --- Output -------------------------------------------------------------------------
# Format identifier passed as rdf_fmt when the triples are uploaded.
RDF_RESULT_FORMAT = 'nt11'


In [3]:
from abc import ABC, abstractmethod
from sem_covid.adapters.abstract_store import ObjectStoreABC, IndexStoreABC, TripleStoreABC
import numpy as np

class RMLMapperABC(ABC):
    """Abstract interface for an RML mapper.

    Subclasses must implement ``transform``; the class cannot be instantiated until
    they do.
    """

    @abstractmethod
    def transform(self, rml_rule: str, sources: dict) -> str:
        """Apply the RML rules to the given sources.

        :param rml_rule: text of the RML mapping rules
        :param sources: source documents available to the mapping, as a dict
        :return: the mapping result as a string (the abstract body itself does
                 nothing and yields ``None``)
        """

class RMLMapper(RMLMapperABC):
    """RML mapper that delegates the transformation to an HTTP service.

    The rules and sources are POSTed as JSON to ``rml_mapper_url`` and the ``output``
    field of the JSON response is returned.
    """

    def __init__(self,
                 rml_mapper_url: str,
                 timeout: float = None
                 ):
        """
        :param rml_mapper_url: URL the mapping request is POSTed to
        :param timeout: seconds to wait for the service before ``requests`` gives up;
                        ``None`` (the default) waits indefinitely
        """
        self.rml_mapper_url = rml_mapper_url
        self.timeout = timeout

    def transform(self, rml_rule: str, sources: dict) -> str:
        """Send the rules and sources to the mapper service and return its output.

        :param rml_rule: text of the RML mapping rules
        :param sources: source documents, sent under the ``sources`` key of the request
        :return: the ``output`` field of the service response, or ``None`` when the
                 service answers with a non-success status (a diagnostic is printed)
        """
        rml_mapper_query = {"rml": rml_rule, "sources": sources}
        rml_mapper_result = req.post(self.rml_mapper_url,
                                     json=rml_mapper_query,
                                     timeout=self.timeout)
        if rml_mapper_result.ok:
            # Response.json() works out the JSON encoding from the raw bytes instead of
            # relying on the charset guessed for ``.text``.
            return rml_mapper_result.json()['output']
        # Include the status code and the start of the body: the bare response repr hides
        # the error message the service sends back.
        print(f"RML mapper request to {self.rml_mapper_url} failed with HTTP "
              f"{rml_mapper_result.status_code}: {rml_mapper_result.text[:500]}")
        return None

class RMLTransformPipeline:
    """Extract-transform-load pipeline that converts an indexed dataset to RDF.

    ``extract`` reads the RML rules and the auxiliary source files from the object store
    and the dataset from the index store. ``transform`` sends the rules, the sources and
    one part of the dataset at a time to the RML mapper. ``load`` writes the concatenated
    result to the object store and uploads it to the triple store.
    """

    def __init__(self,
                 rml_rules_file_name: str,
                 source_file_names: List[str],
                 rdf_result_file_name: str,
                 rml_mapper: RMLMapperABC,
                 object_storage: ObjectStoreABC,
                 index_storage: IndexStoreABC,
                 triple_storage: TripleStoreABC
                 ):
        """
        :param rml_rules_file_name: name of the rules file under MINIO_RML_RULES_DIR
        :param source_file_names: names of the source files under MINIO_RML_SOURCES_DIR
        :param rdf_result_file_name: name of the result file under MINIO_RML_RESULTS_DIR
        :param rml_mapper: mapper used to turn rules and sources into RDF
        :param object_storage: store holding the rules, the sources and the result
        :param index_storage: store the dataset is read from
        :param triple_storage: store the resulting triples are uploaded to
        """
        self.rml_rules_file_name = rml_rules_file_name
        self.source_file_names = source_file_names
        self.rdf_result_file_name = rdf_result_file_name
        self.rml_mapper = rml_mapper
        self.object_storage = object_storage
        self.index_storage = index_storage
        self.triple_storage = triple_storage
        # Populated by extract() / transform(); None means "step not run yet".
        self.rml_rule = None
        self.sources = None
        self.dataset = None
        self.dataset_parts = None
        self.rdf_results = None

    @staticmethod
    def _split_dataset(dataset, part_size: int) -> list:
        """Slice ``dataset`` into consecutive row chunks of at most ``part_size`` rows."""
        if part_size < 1:
            raise ValueError(f"part_size must be a positive integer, got {part_size}")
        # max(..., 1) keeps a single (empty) part for an empty dataset, so the mapper is
        # still invoked once in that case.
        return [dataset.iloc[start:start + part_size]
                for start in range(0, max(len(dataset), 1), part_size)]

    def extract(self):
        """Fetch the rules, the source files and the dataset, and split the dataset into parts."""
        self.rml_rule = self.object_storage.get_object(
            object_name=f'{MINIO_RML_RULES_DIR}/{self.rml_rules_file_name}').decode('utf8')
        self.sources = {
            file_name: self.object_storage.get_object(
                object_name=f'{MINIO_RML_SOURCES_DIR}/{file_name}').decode('utf8')
            for file_name in self.source_file_names
        }
        # NOTE(review): only the first 100 rows are converted - this looks like a
        # development limit; confirm before relying on the result as the full dataset.
        # copy() detaches the slice so that adding a column below cannot raise
        # SettingWithCopyWarning.
        dataset = self.index_storage.get_dataframe(index_name=DATASET_INDEX_NAME).head(100).copy()
        # Expose the row labels as a regular column so they are part of each row's JSON value.
        dataset['index'] = dataset.index
        self.dataset = dataset
        # Plain iloc chunking yields exactly ceil(rows / part_size) parts and does not go
        # through np.array_split, which depends on the deprecated DataFrame.swapaxes.
        self.dataset_parts = self._split_dataset(dataset, DATASET_PART_SIZE)

    def transform(self):
        """Run the mapper on every dataset part and join the results with newlines.

        :raises RuntimeError: when the mapper returns ``None`` for a part
        """
        assert self.rml_rule is not None, "extract() must be called before transform()"
        assert self.sources is not None, "extract() must be called before transform()"
        assert self.dataset_parts is not None, "extract() must be called before transform()"
        rdf_parts = []
        for part_number, dataset_part in enumerate(self.dataset_parts, start=1):
            sources = self.sources.copy()
            sources['data.json'] = dataset_part.to_json(orient='index')
            rdf_part = self.rml_mapper.transform(rml_rule=self.rml_rule, sources=sources)
            if rdf_part is None:
                # Fail at the part that broke instead of letting '\n'.join() raise an
                # opaque TypeError after all remaining parts have been sent.
                raise RuntimeError(
                    f"RML mapper returned no result for dataset part {part_number} "
                    f"of {len(self.dataset_parts)}")
            rdf_parts.append(rdf_part)
        # Assigned only on success, so a failed run never leaves a partial list behind.
        # The whole result is held in memory as one string, which may become a resource
        # issue for large datasets.
        self.rdf_results = '\n'.join(rdf_parts)

    def load(self):
        """Store the RDF result in the object store and upload it to the triple store."""
        assert self.rdf_results is not None, "transform() must be called before load()"
        self.object_storage.put_object(
            object_name=f'{MINIO_RML_RESULTS_DIR}/{self.rdf_result_file_name}',
            content=self.rdf_results.encode('utf8'))
        self.triple_storage.create_dataset(dataset_id=DATASET_INDEX_NAME)
        self.triple_storage.upload_triples(dataset_id=DATASET_INDEX_NAME,
                                           quoted_triples=self.rdf_results,
                                           rdf_fmt=RDF_RESULT_FORMAT)

    def execute(self):
        """Run extract, transform and load in that order."""
        self.extract()
        self.transform()
        self.load()

In [4]:
# Client for the remote RML mapper service.
rml_mapper = RMLMapper(rml_mapper_url=RML_MAPPER_URL)

# Wire the mapper and the three stores from the registry into the pipeline.
rml_transform_pipeline = RMLTransformPipeline(
    rml_rules_file_name=RML_RULES_FILE_NAME,
    source_file_names=RML_MAPPING_SOURCES,
    rdf_result_file_name=RDF_RESULT_FILE_NAME,
    rml_mapper=rml_mapper,
    object_storage=store_registry.minio_object_store(minio_bucket=MINIO_RML_BUCKET),
    index_storage=store_registry.es_index_store(),
    triple_storage=store_registry.fuseki_triple_store(),
)

# Runs the extract, transform and load steps in sequence.
rml_transform_pipeline.execute()

100% (6360 of 6360) |####################| Elapsed Time: 0:00:16 Time:  0:00:16
