# Testing Pluggable Rules
This template shows how to test pluggable rules, whether they are created by the user or provided by the platform.

# Note
Some rules use specific libraries. In principle, Energyworx supports different libraries; however, the Jupyter Notebook environment sometimes does not have all the necessary libraries installed. Since each user has their own sandboxed Jupyter Notebook client environment, you can install libraries yourself. The caveat is that after a restart you have to reinstall those libraries. On request, we can add supported libraries to the sandboxed runtime environment so that they remain available after a restart.

In [11]:
# Import the necessary Python libraries
import os
import time
import logging
from energyworx_client.storage import read_file
from energyworx_client.client import EWX
import unittest
from energyworx_public.domain import KeyValueDomain, TagDomain, ChannelDomain, DatasourceDomain
from energyworx_public import enums
from energyworx_public.rule import AbstractRule, RuleResult, Detector


## Inspect the base classes
To inspect the documentation of the base classes you can execute the following code.

### Look at the available in-code documentation

In [7]:
# Print the constructor docstrings of the base classes to see which arguments
# (and argument types) each of them expects.
# The parenthesised single-argument form prints the same text on Python 2 and
# is also valid syntax on Python 3.
print(Detector.__init__.__doc__)
print(AbstractRule.__init__.__doc__)
print(RuleResult.__init__.__doc__)


        Detector class containing detected feature
        Args:
            detector (str): The name of the detector
            function_name (str): the name of the function that holds the detector rule
            start_timestamp (datetime): The start timestamp of the detected feature
            end_timestamp (datetime): The end timestamp of the detected feature
            value (str): String value of the feature
            properties (List[dict]): Optional list of properties (key/values)
        

        Abstract implementation of the pluggable rule
        Args:
            datasource (Datasource):
            dataframe (DataFrame):
            detectors (dict):
            source_column (str):
            destination_column (str):
            sequence_index (int):
            data_filter (Series):
            namespace (Namespace):
        

        RuleResult that contains the information that the apply function of the AbstractRule returns
        Args:
            result (

### Check the available unit types

In [8]:
# Check enums for UnitType:
def check_unit_enum(enum_class, unit):
    """
    Resolve a unit name to the matching member of an enum class.

    The lookup is delegated to ``enums.str_to_enum`` with ``ignore_case=False``.

    Args:
        enum_class (enum class): enums.UnitType or enums.DatapointType
        unit (str): name of the unit to look up, e.g. 'kWh' or 'register'

    Returns:
        Whatever ``enums.str_to_enum`` returns for the given name
        (for a known name: the enum member, e.g. ``UnitType.kWh``).
    """
    enum_member = enums.str_to_enum(enum_class, unit, ignore_case=False)
    return enum_member

# Example: resolve the 'kWh' unit type and the 'register' datapoint type to their enum members.
# NOTE: with the Python 2 kernel used for the recorded output, print(a, b) prints a tuple,
# which is why the output below is shown in parentheses.
enum_for_kWh = check_unit_enum(enums.UnitType, 'kWh')
print('enum_for_kWh: ', enum_for_kWh)
enum_for_register = check_unit_enum(enums.DatapointType, 'register')
print('enum_for_register', enum_for_register)

('enum_for_kWh: ', <UnitType.kWh: 0>)
('enum_for_register', <DatapointType.register: 1>)


# Create the rule class
Below is a sample of a Rule class that is supported by our Pluggable Rules framework.

In [12]:
from energyworx_public.rule import AbstractRule, RuleResult
from energyworx_public.domain import ChannelDomain, DatasourceDomain
from energyworx_public import enums


class ZeroReads(AbstractRule):
    # Pluggable rule that flags readings which are (close to) zero.

    def apply(self, margin=0.01, **kwargs):
        """
        Flag every row of the data frame whose source-column value is less
        than or equal to ``margin``.

        Args:
            margin (float): Inclusive upper bound below which a value counts as a zero read
            **kwargs: Accepted for interface compatibility; not used by this rule

        Returns:
            RuleResult : wraps a Series that is indexed by the flagged rows only
                        (every value in it is True),
                        or an empty Series if no row is flagged

        Raises:
            TypeError: If margin is None or a negative number
        """
        import pandas as pd

        margin_is_invalid = margin is None or margin < 0
        if margin_is_invalid:
            raise TypeError('margin: [{}] is not a valid margin'.format(margin))

        # NOTE(review): values below zero also satisfy `<= margin`, so negative
        # readings are flagged as well - confirm that this is intended.
        readings = self.dataframe[self.source_column]
        is_zero_read = readings <= margin

        # Keep only the rows that matched, so the result holds flags exclusively.
        flagged_index = self.dataframe[is_zero_read].index
        flags = pd.Series(data=is_zero_read, index=flagged_index)
        return RuleResult(result=flags)


# Create a unittest for the rule
To make sure the rule keeps being tested even if changes are made, you can create a unit-test. The next example shows how this can be done.

In [10]:
from energyworx_public.domain import ChannelDomain, DatasourceDomain
from energyworx_public import enums
from parameterized import parameterized
import pandas as pd
import unittest

class ZeroReadsTest(unittest.TestCase):
    """Unit test for the ZeroReads rule, run against a single-channel test datasource."""

    def runTest(self):
        """Run every ``test*`` method of this class on this instance.

        This method exists so that ``ZeroReadsTest()`` can be instantiated
        without naming a test method (``runTest`` is the default method name of
        ``unittest.TestCase``). The test names are discovered dynamically
        because ``parameterized.expand`` generates the actual method names.
        """
        for test_name in unittest.TestLoader().getTestCaseNames(type(self)):
            getattr(self, test_name)()

    def setUp(self):
        """Create a fresh datasource with one source channel before each test."""
        self.datasource = DatasourceDomain(
            id='id',
            name='name',
            description='test datasource',
            timezone='Europe/Amsterdam',
            channels=[
                ChannelDomain(
                    id='1234EKN',
                    name='ENERGY_KWH_NORMAL',
                    description='Test channel normal',
                    classifier='ENERGY_KWH',
                    unit_type=enums.UnitType(0),
                    datapoint_type=enums.DatapointType(1),
                    is_source=True
                )])

    # One parameter set: 100 hourly readings repeating 1..9, 0 - i.e. exactly 10 zero reads.
    @parameterized.expand([
        (
            pd.DataFrame(
                {'ENERGY_KWH_NORMAL': [1, 2, 3, 4, 5, 6, 7, 8, 9, 0] * 10},
                index=pd.date_range(start=pd.Timestamp(2017, 1, 1),
                                    periods=100, freq='H', tz='UTC')),
            10,
        ),
    ])
    def test_annotations_static(self, dataframe, expected_flag_count):
        """Assert that the rule flags exactly the expected number of rows.

        Args:
            dataframe (DataFrame): input data holding the ENERGY_KWH_NORMAL column
            expected_flag_count (int): number of rows the rule is expected to flag
        """
        zero_read_rule = ZeroReads(self.datasource, dataframe=dataframe, source_column="ENERGY_KWH_NORMAL", destination_column="zero_reads")
        zero_read_result = zero_read_rule.apply()
        failed_count = zero_read_result.result.dropna().count()
        self.assertEqual(expected_flag_count, failed_count, "Count of " + str(failed_count) + " was not expected")
        
if __name__ == "__main__":
    # Load the tests from the TestCase class itself. loadTestsFromModule expects a
    # module, and passing a TestCase instance only finds tests by accident.
    suite = unittest.TestLoader().loadTestsFromTestCase(ZeroReadsTest)
    unittest.TextTestRunner().run(suite)

.
----------------------------------------------------------------------
Ran 1 test in 0.004s

OK


<unittest.runner.TextTestResult run=1 errors=0 failures=0>

# End


In [13]:
# Define the namespace that contains the data that you have access to
namespace_name = 'demo.energyworx.org'
# Initialize the Energyworx client for that namespace
ewx_client = EWX(namespace_id=namespace_name)

In [14]:
# Getting FLOW data
# Query template against the `flows` table. The {flow_id}, {start_timestamp} and
# {end_timestamp} placeholders are filled in with str.format() below; rows are limited to
# start_timestamp < timestamp <= end_timestamp and ordered by timestamp, flow_id.
# NOTE: the backslash continuations sit inside the string literal, so the leading spaces of
# each continued line become part of the query text.
flow_query = "SELECT STRUCT(timestamp, STRUCT(flow_id, ARRAY[STRUCT(channel_classifier_id, value, \
              ARRAY(SELECT AS STRUCT annotation, sequence_id, ARRAY_AGG(STRUCT(key, value))))] AS channel) AS flow) AS row \
              FROM flows WHERE flow_id IN ('{flow_id}') \
              AND timestamp > '{start_timestamp}' AND timestamp <= '{end_timestamp}' \
              GROUP BY timestamp, flow_id, channel_classifier_id, value ORDER BY timestamp, flow_id "
        
# Run the query for one hard-coded flow and time window, passing limit=9999 to execute_query.
# NOTE(review): in the recorded run this call was rejected with HTTP 403 "Permission denied",
# so the account used needs access to the query resource of this namespace.
timeseries_df = ewx_client.execute_query(flow_query.format( flow_id='827f412225ed40ceaac13ca2bfa332d5', start_timestamp='2017-01-01T00:00:00', end_timestamp='2019-08-17T00:00:00')
, limit=9999)
# Preview the first 10 rows (presumably a pandas DataFrame - confirm against the client API).
timeseries_df.head(10)



HttpError: <HttpError 403 when requesting https://ewx-live.appspot.com/_ah/api/ewx/v1/query/execute?query=SELECT+STRUCT%28timestamp%2C+STRUCT%28flow_id%2C+ARRAY%5BSTRUCT%28channel_classifier_id%2C+value%2C+++++++++++++++ARRAY%28SELECT+AS+STRUCT+annotation%2C+sequence_id%2C+ARRAY_AGG%28STRUCT%28key%2C+value%29%29%29%29%5D+AS+channel%29+AS+flow%29+AS+row+++++++++++++++FROM+flows+WHERE+flow_id+IN+%28%27827f412225ed40ceaac13ca2bfa332d5%27%29+++++++++++++++AND+timestamp+%3E+%272017-01-01T00%3A00%3A00%27+AND+timestamp+%3C%3D+%272019-08-17T00%3A00%3A00%27+++++++++++++++GROUP+BY+timestamp%2C+flow_id%2C+channel_classifier_id%2C+value+ORDER+BY+timestamp%2C+flow_id+&alt=json&limit=9999 returned "Permission denied to resource query and action GET (admin required? False)">

In [24]:
# NOTE: constructing the TestCase does not run any test; it only creates an instance bound to
# the default `runTest` method (see the repr shown as the cell output).
# To run the tests inline from a notebook cell, a call such as
# unittest.main(argv=['first-arg-is-ignored'], exit=False) can be used instead.
ZeroReadsTest()


<__main__.ZeroReadsTest testMethod=runTest>