In [9]:
import yaml
import pandas as pd

In [6]:


# Location of the YAML configuration consumed by this notebook
# (point this at your own file when not running the demo).
config_file_path = 'demo_config.yml'  # Replace with actual path


def read_yaml_config(file_path):
    """Parse the YAML file at ``file_path`` and return the loaded object.

    The file is opened in text mode and handed to ``yaml.safe_load``; whatever
    that call produces is returned unchanged (a dict for the mapping-style
    configuration shown in this notebook).
    """
    with open(file_path, 'r') as stream:
        return yaml.safe_load(stream)


# Load the configuration once; the bare name below makes it the cell's output.
config_data = read_yaml_config(config_file_path)
config_data

{'evaluation_window': {'start': '2019-01-01T00:00:00Z',
  'end': '2019-01-02T00:00:00Z'},
 'peak_timestamps': {'manual_input': True,
  'values': ['2019-01-01T12:00:00Z', '2019-01-01T13:00:00Z']},
 'data_source_type': 'csv',
 'path_to_dataset': 'demo_validation_data.csv',
 'data_reading_method': 'pandas',
 'pandas_parsing_specs': {'timestamps': {'column_identifier': 'Datetime',
   'quantity': 'time',
   'dtype': 'datetime',
   'unit': None},
  'baseline_power_profile': {'column_identifier': 'Baseline (kW)',
   'quantity': 'power',
   'dtype': 'float',
   'unit': 'kW'},
  'flexible_power_profile': {'column_identifier': 'Flexible (kW)',
   'quantity': 'power',
   'dtype': 'float',
   'unit': 'kW'},
  'baseline_energy_profile': {'column_identifier': 'Baseline (kWh)',
   'quantity': 'energy',
   'dtype': 'float',
   'unit': 'kWh'},
  'flexible_energy_profile': {'column_identifier': 'Flexible (kWh)',
   'quantity': 'energy',
   'dtype': 'float',
   'unit': 'kWh'},
  'generic_quantity_profile

In [11]:
# Read the CSV referenced by the configuration's 'path_to_dataset' entry
# using pandas' default parsing options.
df = pd.read_csv(config_data['path_to_dataset'])

In [13]:
# Bare expression: renders the loaded DataFrame as this cell's output.
df

Unnamed: 0,Datetime,Baseline (kWh),Flexible (kWh),Peak
0,12/17/2021 0:00,0.030926,-3.620000e-14,False
1,12/17/2021 0:01,0.031567,2.610000e-11,False
2,12/17/2021 0:02,0.031963,2.650000e-11,False
3,12/17/2021 0:03,0.032306,2.690000e-11,False
4,12/17/2021 0:04,0.032656,2.720000e-11,False
...,...,...,...,...
7195,12/21/2021 23:55,0.033474,1.021433e-02,False
7196,12/21/2021 23:56,0.033610,1.021549e-02,False
7197,12/21/2021 23:57,0.033553,1.021675e-02,False
7198,12/21/2021 23:58,0.033582,1.021811e-02,False


In [15]:
import rdflib
from pyshacl import validate

# Build the RDF data graph described by the configuration file.
# NOTE(review): create_data_graph_from_config is defined in a later cell of this
# notebook, so that cell has to be executed before this one.
data_graph = create_data_graph_from_config('demo_config.yml')

# Parse the SHACL shapes (Turtle syntax) into a graph of their own.
# NOTE(review): 'path/to/shapes_file.ttl' looks like a placeholder path - confirm.
shapes_graph = rdflib.Graph()
shapes_graph.parse('path/to/shapes_file.ttl', format='ttl')

# validate() is unpacked into: conformance flag, results graph, textual report.
conforms, results_graph, results_text = validate(data_graph, shacl_graph=shapes_graph)

# Report the outcome; the textual report is only printed when validation fails.
if not conforms:
    print("The dataset does not meet the requirements for the Flexibility Factor KPI:")
    print(results_text)
else:
    print("The dataset is sufficient for the Flexibility Factor KPI calculation.")


### Build an RDF graph from the config file

In [29]:
import rdflib
import yaml

def create_data_graph_from_config(config_file_path):
    """Build an rdflib graph describing the contents of a YAML config file.

    The following parts of the configuration are translated into triples in
    the ``http://example.org/efkpis#`` namespace:

    * ``evaluation_window`` -> ``startTime`` / ``endTime`` on
      ``efkpis:EvaluationWindow`` (typed as ``xsd:dateTime``).
    * ``data_source_type``, ``path_to_dataset``, ``data_reading_method`` ->
      plain literals attached to a single blank node.
    * ``peak_timestamps.values`` -> the first entry becomes the value of
      ``efkpis:HighLoadStartTimestamp``; every later entry is attached to
      ``efkpis:HighLoadEndTimestamp``.
    * ``pandas_parsing_specs`` -> the ``generic_quantity_profile``,
      ``baseline_energy_profile`` and ``flexible_energy_profile`` mappings,
      one predicate per key (the key itself is used as the predicate name).

    Args:
        config_file_path: Path of the YAML configuration file to read.

    Returns:
        rdflib.Graph: The populated graph.

    Raises:
        OSError: If the configuration file cannot be opened.
        KeyError: If one of the configuration keys listed above is missing.
    """
    # Load the config file
    with open(config_file_path, 'r') as file:
        config = yaml.safe_load(file)

    # Initialize an RDF graph
    g = rdflib.Graph()

    # Namespaces: the KPI ontology and XML Schema datatypes
    efkpis = rdflib.Namespace("http://example.org/efkpis#")
    xsd = rdflib.Namespace("http://www.w3.org/2001/XMLSchema#")

    # ------------------- Evaluation Window -------------------
    evaluation_window = efkpis.EvaluationWindow
    start_time = rdflib.Literal(config['evaluation_window']['start'], datatype=xsd.dateTime)
    end_time = rdflib.Literal(config['evaluation_window']['end'], datatype=xsd.dateTime)
    g.add((evaluation_window, efkpis.startTime, start_time))
    g.add((evaluation_window, efkpis.endTime, end_time))

    # ------------------- Data Source Information -------------------
    data_source_type = rdflib.Literal(config['data_source_type'])
    path_to_dataset = rdflib.Literal(config['path_to_dataset'])
    data_reading_method = rdflib.Literal(config['data_reading_method'])

    # A blank node groups the data source facts without minting an IRI for them
    data_source_info = rdflib.BNode()
    g.add((data_source_info, efkpis.dataSourceType, data_source_type))
    g.add((data_source_info, efkpis.pathToDataset, path_to_dataset))
    g.add((data_source_info, efkpis.dataReadingMethod, data_reading_method))

    # ------------------- Peak Timestamps Specs -------------------
    # Decide start vs. end by POSITION, not by comparing values: with a value
    # comparison, an end timestamp equal to the start timestamp would also be
    # recorded as a start and the graph would end up with no end timestamp.
    for position, ts in enumerate(config['peak_timestamps']['values']):
        timestamp = rdflib.Literal(ts, datatype=xsd.dateTime)
        if position == 0:
            g.add((efkpis.HighLoadStartTimestamp, efkpis.hasValue, timestamp))
        else:
            g.add((efkpis.HighLoadEndTimestamp, efkpis.hasValue, timestamp))

    # ------------------- Load Profiles Specs -------------------
    def add_profile_to_graph(profile_name, profile_data):
        """Attach every key/value pair of ``profile_data`` to ``efkpis:<profile_name>``."""
        profile_node = efkpis[profile_name]
        for key, value in profile_data.items():
            # Only a missing value (None or empty string) is replaced by the
            # "unknown" placeholder; legitimate falsy values such as 0 or
            # False are kept as they are.
            is_missing = value is None or value == ""
            literal_value = rdflib.Literal("unknown" if is_missing else value)
            g.add((profile_node, efkpis[key], literal_value))

    # Add profiles to the graph
    parsing_specs = config['pandas_parsing_specs']
    add_profile_to_graph('GenericLoadProfile', parsing_specs['generic_quantity_profile'])
    add_profile_to_graph('BaselineEnergyProfile', parsing_specs['baseline_energy_profile'])
    add_profile_to_graph('FlexibleEnergyProfile', parsing_specs['flexible_energy_profile'])

    # Return the RDF graph
    return g

# Example usage: build the graph from the demo configuration file.
data_graph = create_data_graph_from_config('demo_config.yml')

# Last expression of the cell: the graph serialized in Turtle syntax is shown
# as the cell output.
data_graph.serialize(format='turtle')


'@prefix ns1: <http://example.org/efkpis#> .\n@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .\n\nns1:BaselineEnergyProfile ns1:column_identifier "Baseline (kWh)" ;\n    ns1:dtype "float" ;\n    ns1:quantity "energy" ;\n    ns1:unit "kWh" .\n\nns1:EvaluationWindow ns1:endTime "2019-01-02T00:00:00+00:00"^^xsd:dateTime ;\n    ns1:startTime "2019-01-01T00:00:00+00:00"^^xsd:dateTime .\n\nns1:FlexibleEnergyProfile ns1:column_identifier "Flexible (kWh)" ;\n    ns1:dtype "float" ;\n    ns1:quantity "energy" ;\n    ns1:unit "kWh" .\n\nns1:GenericLoadProfile ns1:column_identifier "Cost ($)" ;\n    ns1:dtype "float" ;\n    ns1:quantity "unknown" ;\n    ns1:unit "unknown" .\n\nns1:HighLoadEndTimestamp ns1:hasValue "2019-01-01T13:00:00+00:00"^^xsd:dateTime .\n\nns1:HighLoadStartTimestamp ns1:hasValue "2019-01-01T12:00:00+00:00"^^xsd:dateTime .\n\n[] ns1:dataReadingMethod "pandas" ;\n    ns1:dataSourceType "csv" ;\n    ns1:pathToDataset "demo_validation_data.csv" .\n\n'

### Query evaluation window parameters from the graph

In [20]:
# Look up the start/end literals attached to efkpis:EvaluationWindow.
query = """
PREFIX efkpis: <http://example.org/efkpis#>

SELECT ?startTime ?endTime
WHERE {
    efkpis:EvaluationWindow efkpis:startTime ?startTime ;
                           efkpis:endTime ?endTime .
}
"""

# Run the query against the in-memory graph.
qres = data_graph.query(query)

# Each result row unpacks in SELECT order: (?startTime, ?endTime).
for start_time, end_time in qres:
    print(f"Start Time: {start_time}, End Time: {end_time}")

Start Time: 2019-01-01T00:00:00+00:00, End Time: 2019-01-02T00:00:00+00:00


### Query the high-load start and end timestamps from the graph

In [21]:
# Fetch the values attached to the high-load start and end timestamp nodes.
query = """
PREFIX efkpis: <http://example.org/efkpis#>

SELECT ?startTimestamp ?endTimestamp
WHERE {
    efkpis:HighLoadStartTimestamp efkpis:hasValue ?startTimestamp .
    efkpis:HighLoadEndTimestamp efkpis:hasValue ?endTimestamp .
}
"""

# Run the query against the in-memory graph.
qres = data_graph.query(query)

# Each result row unpacks in SELECT order: (?startTimestamp, ?endTimestamp).
for high_load_start, high_load_end in qres:
    print(f"High Load Start Timestamp: {high_load_start}, High Load End Timestamp: {high_load_end}")


High Load Start Timestamp: 2019-01-01T12:00:00+00:00, High Load End Timestamp: 2019-01-01T13:00:00+00:00


### Query load profile data specs

In [30]:
# Example SPARQL query to get GenericLoadProfile specs.
# The predicate names have to match the configuration keys verbatim
# ('column_identifier', 'quantity', 'dtype', 'unit'): the graph builder uses
# each key as the predicate name, so camelCase variants such as
# efkpis:columnIdentifier or efkpis:dataType match nothing.
query = """
PREFIX efkpis: <http://example.org/efkpis#>

SELECT ?columnIdentifier ?quantity ?dtype ?unit
WHERE {
    efkpis:GenericLoadProfile efkpis:column_identifier ?columnIdentifier ;
                              efkpis:quantity ?quantity ;
                              efkpis:dtype ?dtype ;
                              efkpis:unit ?unit .
}
"""

# Execute the query on your RDF graph
qres = data_graph.query(query)

# Print results (the attribute names on each row are the SELECT variable names)
for row in qres:
    print(f"Column Identifier: {row.columnIdentifier}, Quantity: {row.quantity}, Data Type: {row.dtype}, Unit: {row.unit}")
