# Demo 2: Constraint-based Aggregation and Storing of Results and Metadata


This second demonstration shows how constraint-based aggregations can be performed and including the storing of their results in DaQSS.
As example in this demonstration, the database containing product information, as described in the [introduction of the demonstrations](../demo), is used.

1. Import required Python packages:


In [10]:
import os

os.environ["TQDM_DISABLE"] = "1"  # Disable all progress bars, to retain a clear output

from cobadq import *  # import the Python package for the constraint-based aggregation
from daqss import *  # for accessing the database part of DaQSS

import pandas
# for the representation of data and the computation of DQ measurement results 

2. Connect to the database tables and create their representations in DaQSS:

In [11]:
db_connection: str = "sqlite:///../demo_data/product_information.sqlite"

canada = DataSource.from_dataframe(pandas.read_sql_table("ca_products", db_connection, index_col="asin").head(100),
                                   "ca_products")
india = DataSource.from_dataframe(pandas.read_sql_table("in_products", db_connection, index_col="asin").head(100),
                                  "in_products")
usa = DataSource.from_dataframe(pandas.read_sql_table("us_products", db_connection, index_col="asin").head(100),
                                "us_products")


In [12]:
d = DaQSS()
db = Database()
db.identifier = db_connection
db.values = [canada, india, usa]

d.store_data_element(db.identifier, db.identifier, DATABASE)
for table in db.values:
    d.store_data_element(db.identifier + "/" + table.identifier, table.identifier, TABLE, db.identifier)


 - an data element with the same global_identifier already exists, or
 - the provided parent data element global_identifier "sqlite:///../demo_data/product_information.sqlite does not exist.
 - an data element with the same global_identifier already exists, or
 - the provided parent data element global_identifier "sqlite:///../demo_data/product_information.sqlite does not exist.
 - an data element with the same global_identifier already exists, or
 - the provided parent data element global_identifier "sqlite:///../demo_data/product_information.sqlite does not exist.


3. Define and perform the constraint-based aggregation:

In [14]:
# Define constraints
price_is_greater_than_listprice_name = "price is greater than list price"
price_is_greater_than_listprice_con = "price > listPrice"
d.store_aggregation_constraint(price_is_greater_than_listprice_name,
                               "Is fulfilled if the value in a column \"price\" " +
                               "is greater than the value in a column \"listPrice\".",
                               price_is_greater_than_listprice_con, ROW)
else_con = "else"
d.store_aggregation_constraint(else_con, "Is fulfilled is no other constraint is fulfilled.", else_con, ROW)

# Define aggregation functions
value_zero = "0"
value_zero_name = "Row is zero"
d.store_aggregation_function("Row is zero", "The aggregated value for a row will be zero", value_zero, VALUE, ROW)
value_one = "1"
value_one_name = "Row is one"
d.store_aggregation_function("Row is one", "The aggregated value for a row will be one", value_one, VALUE, ROW)

d.store_dq_dimension("Correctness")

agg_process_name = "Product price correctness"
d.store_aggregation_process(agg_process_name,
                            "Check correctness of product prices in per row in the whole database.",
                            f"SELECT value_level.* FROM data_element as value_level, data_element as row_level, " +
                            "data_element as table_level " +
                            "WHERE value_level.parent_data_element_global_identifier = row_level.data_element_global_identifier " +
                            "AND row_level.parent_data_element_global_identifier = table_level.data_element_global_identifier " +
                            "AND table_level.parent_data_element_global_identifier = '{db_connection}'",
                            [(price_is_greater_than_listprice_name, value_one_name), (else_con, value_zero_name)],
                            ["Correctness"])


In [15]:
result_per_row: DataSource = aggregate(
    db,
    d.retrieve_aggregation_process_by_name_as_aggregation_specification(agg_process_name),
    DataGranularity.ROW
)

4. Store the aggregation results in DaQSS:

In [16]:
for table in result_per_row.values:
    series = table.to_dataframe().iloc[:, 0]
    d.store_dq_aggregation_results_from_series("Product price correctness", ROW, db.identifier + "/" + table.identifier,
                                               series)