# ML Insights Data Reader & Data Source Example

# Use Case

This notebook shows how to use different Data Reader and Data Source components to read data in a specific format (CSV, JSON, JSONL).

### About Dataset
The Iris flower data set, also known as Fisher's Iris data set, is a multivariate data set. It consists of 50 samples from each of three species of Iris (Iris setosa, Iris virginica and Iris versicolor). Four features were measured from each sample: the length and the width of the sepals and petals, in centimeters.

Dataset source: https://archive.ics.uci.edu/dataset/53/iris

# Install ML Observability Insights Library SDK

- Prerequisites
    - Linux/Mac (Intel CPU)
    - Python 3.8 and 3.9 only


- Installation
    - ML Insights is available as a Python package (via Artifactory) that can be installed with pip, as shown below. Depending on the execution engine used for the run, you can install a scoped package. For example, use mlm-insights[dask] to run on Dask, mlm-insights[spark] to run on Spark, and mlm-insights for the native engine. To install all dependencies, use mlm-insights[all].

      !pip install oracle-ml-insights

Refer : [Installation and Setup](https://docs.oracle.com/iaas/tools/ml-insights-docs/latest/ml-insights-documentation/html/user_guide/tutorials/install.html)
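
Because the prerequisites restrict the library to Python 3.8 and 3.9, it can be useful to check the interpreter before installing. The following is a minimal sketch using only the standard library; the helper name `is_supported_python` is hypothetical and is not part of ML Insights.

```python
import sys

# Supported (major, minor) pairs, taken from the prerequisites listed above.
SUPPORTED_VERSIONS = {(3, 8), (3, 9)}


def is_supported_python(version_info=None):
    """Return True when the major.minor version is 3.8 or 3.9."""
    if version_info is None:
        version_info = sys.version_info
    return tuple(version_info[:2]) in SUPPORTED_VERSIONS


print("Supported interpreter:", is_supported_python())
```

Running this in the notebook kernel before the `pip install` cell shows whether the kernel matches the stated prerequisites.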
 

In [None]:
!python3 -m pip install oracle-ml-insights

# 1 ML Insights Imports 

In [39]:
# imports

import os
from typing import Any
import pyarrow as pa
import pandas as pd

# Import metrics
from mlm_insights.core.features.feature import FeatureMetadata
from mlm_insights.core.metrics.max import Max
from mlm_insights.core.metrics.min import Min
from mlm_insights.core.metrics.rows_count import RowCount
from mlm_insights.core.metrics.quartiles import Quartiles
from mlm_insights.core.metrics.sum import Sum

from mlm_insights.builder.builder_component import MetricDetail, EngineDetail
from mlm_insights.constants.types import FeatureType, DataType, VariableType
from mlm_insights.core.metrics.metric_metadata import MetricMetadata

# import data reader
from mlm_insights.core.data_sources import LocalDatePrefixDataSource
from mlm_insights.core.data_sources import OCIDatePrefixDataSource
from mlm_insights.mlm_native.readers import CSVNativeDataReader, NestedJsonNativeDataReader

from mlm_insights.builder.insights_builder import InsightsBuilder


# 2 Configure Feature schema

The feature schema defines the structure and metadata of the input data, including the data type, column type, and column mapping. The framework uses this information as the ground truth: any deviation in the actual data is treated as an anomaly, and the framework usually ignores such anomalies.

In [40]:
def get_input_schema():
    return {
        "sepal length (cm)": FeatureType(data_type=DataType.FLOAT, variable_type=VariableType.CONTINUOUS),
        "sepal width (cm)": FeatureType(data_type=DataType.FLOAT, variable_type=VariableType.CONTINUOUS),
        "petal length (cm)": FeatureType(data_type=DataType.FLOAT, variable_type=VariableType.CONTINUOUS),
        "petal width (cm)": FeatureType(data_type=DataType.FLOAT, variable_type=VariableType.CONTINUOUS)
    }
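
To illustrate the idea of a schema acting as ground truth, here is a minimal plain-Python sketch in which values that deviate from the declared type are skipped. It is only an analogy: `SKETCH_SCHEMA` and `conforming_values` are hypothetical names, and this is not how ML Insights handles anomalies internally.

```python
# Hypothetical, simplified schema: column name -> expected Python type.
# (The notebook's real schema uses FeatureType/DataType; this is an analogy.)
SKETCH_SCHEMA = {
    "sepal length (cm)": float,
    "petal length (cm)": float,
}


def conforming_values(rows, schema):
    """Collect, per schema column, only the values matching the declared type."""
    kept = {column: [] for column in schema}
    for row in rows:
        for column, expected_type in schema.items():
            value = row.get(column)
            # Missing or wrongly typed values are skipped, not repaired.
            if isinstance(value, expected_type):
                kept[column].append(value)
    return kept


rows = [
    {"sepal length (cm)": 5.1, "petal length (cm)": 1.4},
    {"sepal length (cm)": "n/a", "petal length (cm)": 1.3},  # wrong type
    {"sepal length (cm)": 4.7},                               # missing column
]
print(conforming_values(rows, SKETCH_SCHEMA))
```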


# 3 Configure Metrics

Metrics are the core construct of the framework. This component is responsible for calculating all statistical metrics and algorithms. Metric components work based on the type of features available (e.g. input feature, output feature), their data type (e.g. int, float, string), and additional context (e.g. whether a previous computation is available to compare against). ML Insights provides commonly used metrics out of the box for different ML observability use cases.

Refer : [Metrics Component Documentation](https://objectstorage.us-ashburn-1.oraclecloud.com/p/hmjkj956kj4ZVD0RrN7mEMf5O3l2hP2TX0Y0RXZPAbQqYltPcrsJrm7olfNpFf_a/n/idqzqf6isito/b/ml-insight-doc/o/user_guide/getting_started/metrics_component.html)

In [41]:
def get_metrics():
    metrics = [
               MetricMetadata(klass=Sum),
               MetricMetadata(klass=Quartiles),
               MetricMetadata(klass=Max),
               MetricMetadata(klass=Min)
              ]
    uni_variate_metrics = {
        "sepal length (cm)": metrics,
        "sepal width (cm)": metrics,
        "petal length (cm)": metrics,
        "petal width (cm)": metrics
    }
    
    dataset_metrics = [MetricMetadata(klass=RowCount)]
    metric_details = MetricDetail(univariate_metric=uni_variate_metrics,
                                  dataset_metrics=dataset_metrics)
    return metric_details
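
As a rough intuition for what the configured metrics report, the sketch below computes plain-Python counterparts of Sum, Min, Max, Quartiles and RowCount on a handful of values. It uses only the standard library and is not the library's implementation; in particular, the quartile method chosen here (`"inclusive"`) is an assumption, and ML Insights may compute quartiles differently.

```python
import statistics


def describe(values):
    """Compute plain-Python counterparts of Sum, Min, Max and Quartiles."""
    return {
        "Sum": sum(values),
        "Min": min(values),
        "Max": max(values),
        # Three cut points (Q1, median, Q3); "inclusive" treats the data as
        # the whole population rather than a sample.
        "Quartiles": statistics.quantiles(values, n=4, method="inclusive"),
    }


sepal_lengths = [5.1, 4.9, 4.7, 4.6, 5.0]
summary = describe(sepal_lengths)
row_count = len(sepal_lengths)  # dataset-level counterpart of RowCount
print(summary, row_count)
```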

# 4 Configure Data Reader

The Data Reader ingests raw data into the framework. This component is primarily responsible for understanding different data formats (e.g. JSONL, CSV) and how to read them properly. In essence, given a set of valid file locations that represent files of a specific type, the reader decodes their content and loads it into memory.

The Data Source is an optional subcomponent that is usually used alongside the reader. Its primary responsibility is to embed the logic for filtering and partitioning the files to be read by the framework.

Refer : [Data Reader Documentation](https://objectstorage.us-ashburn-1.oraclecloud.com/p/52qrFSNgCH85OWPBGIfTgNm-KeibRU8oPSSBdDg_t90gZ89r5qXrQFpTfdvQ9ear/n/bigdatadatasciencelarge/b/ml-insight-doc/o/user_guide/getting_started/data_reader_component.html)
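
The reader's job of decoding several sources of one format and loading them into memory can be sketched with the standard `csv` module. This is a simplified illustration, not `CSVNativeDataReader`: the function name `read_csv_sources` is hypothetical, in-memory strings stand in for files, and every field is assumed to be numeric.

```python
import csv
import io


def read_csv_sources(sources):
    """Decode several CSV text sources and return their rows as one list."""
    rows = []
    for source in sources:
        reader = csv.DictReader(io.StringIO(source))
        for record in reader:
            # csv yields strings; convert every field to float for this sketch.
            rows.append({name: float(value) for name, value in record.items()})
    return rows


day_one = "sepal length (cm),sepal width (cm)\n5.1,3.5\n4.9,3.0\n"
day_two = "sepal length (cm),sepal width (cm)\n4.7,3.2\n"
rows = read_csv_sources([day_one, day_two])
print(len(rows), rows[0])
```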

### 4.1.1 Local File Data Source and CSV Data Reader

The example below shows how to list and load CSV files from the local file system.


In [42]:
def get_reader():
    data = {
        "file_type": "csv",
        "date_range": {"start": "2023-06-24", "end": "2023-06-27"}
    }
    base_location = "input_data/iris_dataset"
    ds = LocalDatePrefixDataSource(base_location, **data)
    print(ds.get_data_location())
    csv_reader = CSVNativeDataReader(data_source=ds)
    return csv_reader
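
The filtering a date-prefixed data source performs can be pictured as follows. This is a minimal sketch, assuming the layout `<base_location>/<YYYY-MM-DD>/<file>` that appears in the output printed by this notebook, and assuming both ends of the date range are inclusive; `filter_by_date_prefix` is a hypothetical helper, not part of `LocalDatePrefixDataSource`.

```python
from datetime import date
from pathlib import PurePosixPath


def filter_by_date_prefix(paths, start, end):
    """Keep paths whose parent folder is an ISO date within [start, end]."""
    first, last = date.fromisoformat(start), date.fromisoformat(end)
    selected = []
    for path in paths:
        folder = PurePosixPath(path).parent.name
        try:
            folder_date = date.fromisoformat(folder)
        except ValueError:
            continue  # parent folder is not a date, so the file is ignored
        if first <= folder_date <= last:
            selected.append(path)
    return selected


candidates = [
    "input_data/iris_dataset/2023-06-23/iris.csv",
    "input_data/iris_dataset/2023-06-26/iris.csv",
    "input_data/iris_dataset/2023-06-27/iris.csv",
    "input_data/iris_dataset/readme/iris.csv",
]
print(filter_by_date_prefix(candidates, "2023-06-24", "2023-06-27"))
```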

### 4.1.2 Local File Data Source and Nested JSON Data Reader

The example below shows how to list and load data from nested JSON files from the local file system.

In [43]:
def get_nested_json_data_reader():
    data = {
        "file_type": "json",
        "date_range": {"start": "2023-06-24", "end": "2023-06-27"}
    }
    base_location = "input_data/nested_json"

    query = "iris_dataset[].[{column: 'sepal length (cm)', value: sepal_length}, {column: 'sepal width (cm)', value: sepal_width}, {column : 'petal length (cm)', value : petal_length}, {column : 'petal width (cm)', value : petal_width} ]"
    
    ds = LocalDatePrefixDataSource(base_location, **data)
    print(ds.get_data_location())
    nested_json_data_reader = NestedJsonNativeDataReader(data_source=ds, query=query, query_engine_name="JMESPATH")
    return nested_json_data_reader
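
The JMESPath query above projects every record under `iris_dataset` into a list of `{column, value}` pairs. The plain-Python sketch below mimics that projection so the resulting shape is easier to see. The input document shape is an assumption inferred from the query (the actual files are not shown here), and `project_records` is a hypothetical helper; with the `jmespath` package installed, `jmespath.search(query, document)` should produce the same structure.

```python
# Column name in the schema -> key assumed to exist in each JSON record.
COLUMN_TO_KEY = {
    "sepal length (cm)": "sepal_length",
    "sepal width (cm)": "sepal_width",
    "petal length (cm)": "petal_length",
    "petal width (cm)": "petal_width",
}


def project_records(document):
    """Mimic the query: one list of {column, value} pairs per record."""
    return [
        [{"column": column, "value": record.get(key)}
         for column, key in COLUMN_TO_KEY.items()]
        for record in document["iris_dataset"]
    ]


# Assumed input shape, for illustration only.
document = {
    "iris_dataset": [
        {"sepal_length": 5.1, "sepal_width": 3.5,
         "petal_length": 1.4, "petal_width": 0.2},
    ]
}
print(project_records(document)[0])
```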

## Compute the Profile 

Create the builder object, which provides the core set of APIs for configuring the behavior of the monitoring run. By selecting which components and variants to run, every aspect of the monitoring task can be customized.

The run() method runs the internal workflow. It also handles the life cycle of each component passed to it, which includes creating the component (if required), invoking its interface functions, and destroying it. Additionally, the runner handles more advanced operations such as thread pooling and compute engine abstraction.

Refer : [Builder Object Documentation](https://objectstorage.us-ashburn-1.oraclecloud.com/p/52qrFSNgCH85OWPBGIfTgNm-KeibRU8oPSSBdDg_t90gZ89r5qXrQFpTfdvQ9ear/n/bigdatadatasciencelarge/b/ml-insight-doc/o/user_guide/getting_started/builder_object.html)
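
The builder-then-run flow described above follows the general fluent builder pattern, which can be sketched in a few lines. `SketchBuilder` and `SketchRunner` are hypothetical stand-ins written for illustration; they are not `InsightsBuilder` and make no claim about how it works internally.

```python
class SketchRunner:
    """Holds the configured components and executes them when run() is called."""

    def __init__(self, schema, metrics, reader):
        self.schema, self.metrics, self.reader = schema, metrics, reader

    def run(self):
        rows = self.reader()  # the reader is only invoked at run time
        return {
            column: {name: metric([row[column] for row in rows])
                     for name, metric in self.metrics.items()}
            for column in self.schema
        }


class SketchBuilder:
    """Fluent builder: every with_* method stores a component and returns self."""

    def __init__(self):
        self._parts = {}

    def with_input_schema(self, schema):
        self._parts["schema"] = schema
        return self

    def with_metrics(self, metrics):
        self._parts["metrics"] = metrics
        return self

    def with_reader(self, reader):
        self._parts["reader"] = reader
        return self

    def build(self):
        # Fail early if a required component was never supplied.
        missing = {"schema", "metrics", "reader"} - set(self._parts)
        if missing:
            raise ValueError(f"missing components: {sorted(missing)}")
        return SketchRunner(**self._parts)


runner = (SketchBuilder()
          .with_input_schema(["x"])
          .with_metrics({"Sum": sum, "Max": max})
          .with_reader(lambda: [{"x": 1.0}, {"x": 2.5}])
          .build())
print(runner.run())
```

Returning `self` from each `with_*` method is what allows the chained call style, and deferring all work to `run()` keeps configuration separate from execution.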

In [44]:
def main():    
    # Set up the insights builder by passing the input schema, metrics and data reader
    runner = InsightsBuilder(). \
        with_input_schema(get_input_schema()). \
        with_metrics(metrics=get_metrics()). \
        with_reader(reader=get_reader()). \
        build()
    # Other Insights components that can be configured are:
    # - Custom Transformers (ability to transform incoming data frame to add/update/merge/delete/normalize etc features)
    # - Conditional Features (ability to create new features from existing features using python expressions)
    # - Tags (ability to provide custom metadata to be added as key-value pairs to a Profile)

    # Run the evaluation
    run_result = runner.run()
    return run_result.profile
    
profile = main()
profile.to_pandas()

profile_json = profile.to_json()
dataset_metrics = profile_json
pd.json_normalize(dataset_metrics).T.dropna()

['input_data/iris_dataset/2023-06-26/iris.csv', 'input_data/iris_dataset/2023-06-27/iris.csv']


| Flattened key | Value |
|---|---|
| dataset_metrics.RowCount.metric_name | RowCount |
| dataset_metrics.RowCount.metric_description | Dataset-level Metric to compute the total row ... |
| dataset_metrics.RowCount.variable_count | 1 |
| dataset_metrics.RowCount.variable_names | [rows_count] |
| dataset_metrics.RowCount.variable_types | [DISCRETE] |
| ... | ... |
| feature_metrics.petal width (cm).Sum.variable_types | [CONTINUOUS] |
| feature_metrics.petal width (cm).Sum.variable_dtypes | [FLOAT] |
| feature_metrics.petal width (cm).Sum.variable_dimensions | [0] |
| feature_metrics.petal width (cm).Sum.metric_data | [359.80000000000007] |
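
The dotted keys in this output come from `pd.json_normalize`, which flattens nested dictionaries and joins the key path with `.`. The sketch below reproduces that flattening in plain Python. The nested `profile_like` structure is an assumption inferred from the dotted keys shown above, and `flatten` is a hypothetical helper.

```python
def flatten(nested, parent_key="", sep="."):
    """Flatten nested dicts into a single dict with separator-joined keys."""
    flat = {}
    for key, value in nested.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else str(key)
        if isinstance(value, dict):
            flat.update(flatten(value, full_key, sep))
        else:
            flat[full_key] = value  # lists and scalars are kept as leaf values
    return flat


# Assumed shape, inferred from the dotted keys in the output above.
profile_like = {
    "dataset_metrics": {
        "RowCount": {"metric_name": "RowCount", "variable_names": ["rows_count"]}
    }
}
print(flatten(profile_like))
```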


### Compute Profile using Nested Json Data Reader

The builder is set up as in the previous section. Only the reader changes: here it is the nested JSON reader returned by get_nested_json_data_reader().

In [45]:
def main():    
    # Set up the insights builder by passing the input schema, metrics and data reader
    runner = InsightsBuilder(). \
        with_input_schema(get_input_schema()). \
        with_metrics(metrics=get_metrics()). \
        with_reader(reader=get_nested_json_data_reader()). \
        build()
    # Other Insights components that can be configured are:
    # - Custom Transformers (ability to transform incoming data frame to add/update/merge/delete/normalize etc features)
    # - Conditional Features (ability to create new features from existing features using python expressions)
    # - Tags (ability to provide custom metadata to be added as key-value pairs to a Profile)

    # Run the evaluation
    run_result = runner.run()
    return run_result.profile
    
profile = main()
profile.to_pandas()

profile_json = profile.to_json()
dataset_metrics = profile_json
pd.json_normalize(dataset_metrics).T.dropna()

['input_data/nested_json/2023-06-26/nested_json.json', 'input_data/nested_json/2023-06-27/nested_json.json']


| Flattened key | Value |
|---|---|
| dataset_metrics.RowCount.metric_name | RowCount |
| dataset_metrics.RowCount.metric_description | Dataset-level Metric to compute the total row ... |
| dataset_metrics.RowCount.variable_count | 1 |
| dataset_metrics.RowCount.variable_names | [rows_count] |
| dataset_metrics.RowCount.variable_types | [DISCRETE] |
| ... | ... |
| feature_metrics.petal width (cm).Sum.variable_types | [CONTINUOUS] |
| feature_metrics.petal width (cm).Sum.variable_dtypes | [FLOAT] |
| feature_metrics.petal width (cm).Sum.variable_dimensions | [0] |
| feature_metrics.petal width (cm).Sum.metric_data | [394.6000000000001] |


### 4.2.1 Object Storage Data Source and CSV Data Reader

The example below shows how to list and load CSV files from Object Storage.

OCI_RESOURCE_PRINCIPAL authentication must be enabled for the target Object Storage bucket to run the following example locally or in a customer tenancy.

In [46]:
def get_object_storage_reader():
    data = {
        "file_type": "csv",
        "date_range": {"start": "2023-06-24", "end": "2023-06-27"},
        "bucket_name": "mlm-insights",
        "namespace" : "bigdatadatasciencelarge",
        "object_prefix" : "input/iris_dataset"
    }
    #base_location ="oci://mlm-insights/input/iris_dataset"
    ds = OCIDatePrefixDataSource(**data)
    print(ds.get_data_location())
    csv_reader = CSVNativeDataReader(data_source=ds)
    #csv_reader = CSVNativeDataReader(file_path=base_location)
    return csv_reader
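
One way to picture how a date range maps onto Object Storage is to expand it into one object-name prefix per day. The sketch below assumes the bucket mirrors the local layout `<object_prefix>/<YYYY-MM-DD>/`, which is not confirmed by this notebook, and `date_prefixes` is a hypothetical helper rather than part of `OCIDatePrefixDataSource`.

```python
from datetime import date, timedelta


def date_prefixes(object_prefix, start, end):
    """List one '<object_prefix>/<YYYY-MM-DD>/' prefix per day in [start, end]."""
    first, last = date.fromisoformat(start), date.fromisoformat(end)
    days = (last - first).days
    base = object_prefix.rstrip("/")
    return [
        f"{base}/{(first + timedelta(days=offset)).isoformat()}/"
        for offset in range(days + 1)
    ]


print(date_prefixes("input/iris_dataset", "2023-06-24", "2023-06-27"))
```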

### 4.2.2 Object Storage Data Source and Nested JSON Data Reader

The example below shows how to list and load nested JSON files from Object Storage.

OCI_RESOURCE_PRINCIPAL authentication must be enabled for the target Object Storage bucket to run the following example locally or in a customer tenancy.

In [None]:
def get_object_storage_nested_json_data_reader():
    
    data = {
        "file_type": "json",
        "date_range": {"start": "2023-06-24", "end": "2023-06-27"},
        "bucket_name": "mlm-insights",
        "namespace" : "bigdatadatasciencelarge",
        "object_prefix" : "input/nested_json"
    }
   
    query = "iris_dataset[].[{column: 'sepal length (cm)', value: sepal_length}, {column: 'sepal width (cm)', value: sepal_width}, {column : 'petal length (cm)', value : petal_length}, {column : 'petal width (cm)', value : petal_width} ]"

    ds = OCIDatePrefixDataSource(**data)
    print(ds.get_data_location())
    nested_json_data_reader = NestedJsonNativeDataReader(data_source=ds, query=query, query_engine_name="JMESPATH")
    return nested_json_data_reader

## Compute the Profile 

The builder is set up as in the local examples. Only the reader changes: here it is the Object Storage CSV reader returned by get_object_storage_reader().

In [None]:
def main():    
    # Set up the insights builder by passing the input schema, metrics and data reader
    runner = InsightsBuilder(). \
        with_input_schema(get_input_schema()). \
        with_metrics(metrics=get_metrics()). \
        with_reader(reader=get_object_storage_reader()). \
        build()
    # Other Insights components that can be configured are:
    # - Custom Transformers (ability to transform incoming data frame to add/update/merge/delete/normalize etc features)
    # - Conditional Features (ability to create new features from existing features using python expressions)
    # - Tags (ability to provide custom metadata to be added as key-value pairs to a Profile)

    # Run the evaluation
    run_result = runner.run()
    return run_result.profile
    
profile = main()
profile.to_pandas()

profile_json = profile.to_json()
dataset_metrics = profile_json
pd.json_normalize(dataset_metrics).T.dropna()

### Compute the Profile for Nested Json Data Reader

The builder is set up as in the local examples. Only the reader changes: here it is the Object Storage nested JSON reader returned by get_object_storage_nested_json_data_reader().

In [None]:
def main():    
    # Set up the insights builder by passing the input schema, metrics and data reader
    runner = InsightsBuilder(). \
        with_input_schema(get_input_schema()). \
        with_metrics(metrics=get_metrics()). \
        with_reader(reader=get_object_storage_nested_json_data_reader()). \
        build()
    # Other Insights components that can be configured are:
    # - Custom Transformers (ability to transform incoming data frame to add/update/merge/delete/normalize etc features)
    # - Conditional Features (ability to create new features from existing features using python expressions)
    # - Tags (ability to provide custom metadata to be added as key-value pairs to a Profile)

    # Run the evaluation
    run_result = runner.run()
    return run_result.profile
    
profile = main()
profile.to_pandas()

profile_json = profile.to_json()
dataset_metrics = profile_json
pd.json_normalize(dataset_metrics).T.dropna()