# How to write multi-batch `BatchRequest` - `Sql` Example
* A `BatchRequest` facilitates the return of a `batch` of data from a configured `Datasource`. To find more about `Batches`, please refer to the [related documentation](https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/how_to_get_a_batch_of_data_from_a_configured_datasource#1-construct-a-batchrequest). 
* A `BatchRequest` can return 0 or more Batches of data depending on the underlying data, and how it is configured. This guide will help you configure `BatchRequests` to return multiple batches, which can be used by
   1. Self-Initializing Expectations to estimate parameters
   2. DataAssistants to profile your data and create and Expectation suite with self-intialized parameters.
   
* Note : Multi-batch BatchRequests are not supported in `RuntimeDataConnector`.

In [1]:
import great_expectations as ge
from ruamel import yaml
from great_expectations.core.batch import BatchRequest
import sqlite3
import pprint

* Load `DataContext`

In [2]:
data_context: ge.DataContext = ge.get_context()

## Sql Example

### Example Database

Imagine we have a database of 12 tables, each corresponding to 1 month of Taxi rider data. 


In [4]:
data_path: str = "../../../test_sets/taxi_yellow_tripdata_samples/sqlite/yellow_tripdata_2020.db"

In [5]:
# connect to sqlite DB, and print the existing tables
con = sqlite3.connect(data_path)
cur = con.cursor()
cur.execute('SELECT name from sqlite_master where type= "table"')
pprint.pprint(cur.fetchall())

[('yellow_tripdata_sample_2020_01',),
 ('yellow_tripdata_sample_2020_02',),
 ('yellow_tripdata_sample_2020_03',),
 ('yellow_tripdata_sample_2020_04',),
 ('yellow_tripdata_sample_2020_05',),
 ('yellow_tripdata_sample_2020_06',),
 ('yellow_tripdata_sample_2020_07',),
 ('yellow_tripdata_sample_2020_08',),
 ('yellow_tripdata_sample_2020_09',),
 ('yellow_tripdata_sample_2020_10',),
 ('yellow_tripdata_sample_2020_11',),
 ('yellow_tripdata_sample_2020_12',)]


### `SimpleSqlDatasource` Example

In our example, we add a `SimpleSqlalchemyDatasource` named `taxi_multi_batch_sql_datasource` with 2 `tables`, namely `yellow_tripdata_sample_2020_01`, `yellow_tripdata_sample_2020_02`. The configuration for `yellow_tripdata_sample_2020_02` is mostly used for our `Checkpoint` at the end, so the following doc will focus more on `yellow_tripdata_sample_2020_01`.

The configuration for `yellow_tripdata_sample_2020_01` also contains 2 `partitioners` which correspond to names of `ConfiguredAssetSqlDataConnectors`. 

**Note**: This example only uses `tables`, but `introspection` could also be used. For more information, please refer to the document [How to configure a DataConnector for splitting and sampling tables in SQL](https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/advanced/how_to_configure_a_dataconnector_for_splitting_and_sampling_tables_in_sql)

The partitioner `whole_table` is built-in to GE, and takes the whole table and returns it as a single Batch. 

It gives the following output, which corresponds to our two tables: 

```bash 
Data Connectors:
    whole_table : ConfiguredAssetSqlDataConnector
    Available data_asset_names (1 of 1):
        yellow_tripdata_sample_2020_01 (1 of 1): [{}]
        yellow_tripdata_sample_2020_02 (1 of 1): [{}]
```


The partitioner `by_vendor_id` is configured by us, and uses a `splitter_method` to split the table values into multiple batches. The splitter we use is `_split_on_divided_integer`, which creates Batches according to rows where value of column_name divided (using integral division) by the given divisor are same. The column name and divisor are given as part of the `splitter_kwargs` parameter.
    
Here is the output, which shows the data asset `yellow_tripdata_sample_2020_01` with 3 batches, each associated with a different `vendor_id`. These become our `batch_identifiers` that distinguish one `Batch` from another.

```bash
Data Connectors:
	by_vendor_id : ConfiguredAssetSqlDataConnector
	Available data_asset_names (1 of 1):
		yellow_tripdata_sample_2020_01 (3 of 3): [{'vendor_id': 0}, {'vendor_id': 1}, {'vendor_id': 2}]
```

In [6]:
data_path: str = "../../../test_sets/taxi_yellow_tripdata_samples/sqlite/yellow_tripdata_2020.db"

datasource_config = {
    "name": "taxi_multi_batch_sql_datasource",
    "module_name": "great_expectations.datasource",
    "class_name": "SimpleSqlalchemyDatasource",
    "connection_string": "sqlite:///" + data_path,
    "tables":{
        "yellow_tripdata_sample_2020_01": {
            "partitioners":{
                "whole_table": {},
                "by_vendor_id":{
                    "splitter_method": "_split_on_divided_integer",
                    "splitter_kwargs": {
                        "column_name": "vendor_id",
                        "divisor": 1
                        }
                    }
                },
            },
        "yellow_tripdata_sample_2020_02": {
            "partitioners":{
                "whole_table": {},
                },
            }
    },
}
data_context.test_yaml_config(yaml.dump(datasource_config))

Attempting to instantiate class from config...
	Instantiating as a Datasource, since class_name is SimpleSqlalchemyDatasource
	Successfully instantiated SimpleSqlalchemyDatasource


ExecutionEngine class name: SqlAlchemyExecutionEngine
Data Connectors:
	by_vendor_id : ConfiguredAssetSqlDataConnector

	Available data_asset_names (1 of 1):
		yellow_tripdata_sample_2020_01 (3 of 3): [{'vendor_id': 0}, {'vendor_id': 1}, {'vendor_id': 2}]

	Unmatched data_references (0 of 0):[]

	whole_table : ConfiguredAssetSqlDataConnector

	Available data_asset_names (2 of 2):
		yellow_tripdata_sample_2020_01 (1 of 1): [{}]
		yellow_tripdata_sample_2020_02 (1 of 1): [{}]

	Unmatched data_references (0 of 0):[]



<great_expectations.datasource.simple_sqlalchemy_datasource.SimpleSqlalchemyDatasource at 0x7f882ed410a0>

In [7]:
# add_datasource only if it doesn't already exist in our configuration
try:
    data_context.get_datasource(datasource_config["name"])
except ValueError:
    data_context.add_datasource(**datasource_config)

## BatchRequest

Depending on which `DataConnector` (ie. `Partitioner`) you send a `BatchRequest` to, you will retrieve a different number of `Batches`

Single Batch returned by `whole_table`

In [8]:
single_batch_batch_request: BatchRequest = BatchRequest(
    datasource_name="taxi_multi_batch_sql_datasource",
    data_connector_name="whole_table",
    data_asset_name="yellow_tripdata_sample_2020_01",
)

In [9]:
batch_list = data_context.get_batch_list(batch_request=single_batch_batch_request)

In [10]:
batch_list

[<great_expectations.core.batch.Batch at 0x7f8846647e50>]

Multi Batch returned by `by_vendor_id`

In [28]:
multi_batch_batch_request: BatchRequest = BatchRequest(
    datasource_name="taxi_multi_batch_sql_datasource",
    data_connector_name="by_vendor_id",
    data_asset_name="yellow_tripdata_sample_2020_01",
)

In [29]:
multi_batch_batch_list = data_context.get_batch_list(batch_request=multi_batch_batch_request)

In [30]:
multi_batch_batch_list

[<great_expectations.core.batch.Batch at 0x7f882ef8aa30>,
 <great_expectations.core.batch.Batch at 0x7f884664d910>,
 <great_expectations.core.batch.Batch at 0x7f882ed00340>]

You can also get a single Batch from a multi-batch DataConnector by passing in `data_connector_query`. 

**Note**: Currently only `batch_filter_parameters` are supported, where we are defining our `vendor_id` to be 2. These will correspond to our `batch_identifiers` when returned from `get_batch_list()`.

In [31]:
single_batch_batch_request_from_multi: BatchRequest = BatchRequest(
    datasource_name="taxi_multi_batch_sql_datasource",
    data_connector_name="by_vendor_id",
    data_asset_name="yellow_tripdata_sample_2020_01",
    data_connector_query={ 
        "batch_filter_parameters": {"vendor_id": 2}
    }
)

In [32]:
batch_list = data_context.get_batch_list(batch_request=single_batch_batch_request_from_multi)

In [33]:
batch_list[0].to_dict() # 'batch_identifiers': {'vendor_id': '2'}},

{'data': '<great_expectations.execution_engine.sqlalchemy_batch_data.SqlAlchemyBatchData object at 0x7f882ed3bac0>',
 'batch_request': {'datasource_name': 'taxi_multi_batch_sql_datasource',
  'data_connector_name': 'by_vendor_id',
  'data_asset_name': 'yellow_tripdata_sample_2020_01',
  'batch_spec_passthrough': None,
  'limit': None,
  'data_connector_query': {'batch_filter_parameters': {'vendor_id': 2}}},
 'batch_definition': {'datasource_name': 'taxi_multi_batch_sql_datasource',
  'data_connector_name': 'by_vendor_id',
  'data_asset_name': 'yellow_tripdata_sample_2020_01',
  'batch_identifiers': {'vendor_id': 2}},
 'batch_spec': {'data_asset_name': 'yellow_tripdata_sample_2020_01',
  'table_name': 'yellow_tripdata_sample_2020_01',
  'batch_identifiers': {'vendor_id': 2},
  'splitter_kwargs': {'column_name': 'vendor_id', 'divisor': 1},
  'splitter_method': '_split_on_divided_integer'},
 'batch_markers': {'ge_load_time': '20220722T235040.915401Z'}}

# Using auto-initializing `Expectations` to generate parameters

We will generate a `Validator` using our `multi_batch_batch_list`

In [34]:
multi_batch_batch_list = data_context.get_batch_list(batch_request=multi_batch_batch_request)

In [35]:
example_suite = data_context.create_expectation_suite(expectation_suite_name="example_sql_suite", overwrite_existing=True)

In [36]:
validator = data_context.get_validator_using_batch_list(batch_list=multi_batch_batch_list, expectation_suite=example_suite)

When you run methods on the validator, it will typically run on the most recent batch (index `-1`), even if the Validator has access to a longer Batch list. For example, notice that rows below are all associated with `vendor_id` : `2`.

In [37]:
validator.head()

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,2,2020-01-17 05:51:40,2020-01-17 06:10:33,1,9.21,1,N,162,138,1,27.0,0.5,0.5,7.38,6.12,0.3,44.3,2.5
1,2,2020-01-21 12:57:08,2020-01-21 13:02:57,1,0.8,1,N,161,237,1,5.5,0.0,0.5,1.76,0.0,0.3,10.56,2.5
2,2,2020-01-18 23:56:11,2020-01-19 00:06:59,5,3.34,1,N,233,75,1,11.5,0.5,0.5,3.06,0.0,0.3,18.36,2.5
3,2,2020-01-09 08:35:37,2020-01-09 09:09:13,3,6.91,1,N,142,69,2,29.0,0.0,0.5,0.0,0.0,0.3,32.3,2.5
4,2,2020-01-24 19:39:05,2020-01-24 19:46:47,1,1.02,1,N,264,264,2,7.0,1.0,0.5,0.0,0.0,0.3,11.3,2.5


### Typical Workflow
A `batch_list` becomes really useful when you are calculating parameters for auto-initializing Expectations, as they us a `RuleBasedProfiler` under-the-hood to calculate parameters.

Here is an example running `expect_column_median_to_be_between()` by "guessing" at the `min_value` and `max_value`. 

In [38]:
validator.expect_column_median_to_be_between(column="trip_distance", min_value=0, max_value=1)

Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

{
  "meta": {},
  "result": {
    "observed_value": 1.6
  },
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "success": false
}

The observed value for our `yellow_tripdata_sample_2020_01` table where `vendor_id` = `2`  is going to be `1.6`, which means the Expectation fails

Now we run the same expectation again, but this time with `auto=True`. This means the `median` values are going to calculated across the `batch_list` associated with the `Validator` (ie 3 Batches for `yellow_tripdata_sample_2020_01`), which gives the min value of `1.5` and the max value of `5.23`

In [40]:
validator.expect_column_median_to_be_between(column="trip_distance", auto=True)




Generating Expectations:   0%|          | 0/1 [00:00<?, ?it/s]

Profiling Dataset:         0%|          | 0/1 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

{
  "meta": {},
  "result": {
    "observed_value": 1.6
  },
  "expectation_config": {
    "meta": {
      "auto_generated_at": "20220722T235124.583834Z",
      "great_expectations_version": "0.15.15+13.gfd707510f.dirty"
    },
    "kwargs": {
      "strict_max": false,
      "max_value": 5.16,
      "strict_min": false,
      "column": "trip_distance",
      "min_value": 1.5
    },
    "expectation_type": "expect_column_median_to_be_between"
  },
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "success": true
}

The `auto=True` will also automatically run the Expectation against the most recent Batch (which has an observed value of `1.61`) and the Expectation will pass. 

You can now save the `ExpectationSuite`.

In [41]:
validator.save_expectation_suite()

### Running the `ExpectationSuite` against single `Batch`

Now the ExpectationSuite can be used to validate single batches using a Checkpoint. In our example, let's validate a different table, `yellow_tripdata_sample_2020_02`, using the `ExpectationSuite` we built from `yellow_tripdata_sample_2020_01`.

In [42]:
single_batch_batch_request_from_multi: BatchRequest = BatchRequest(
    datasource_name="taxi_multi_batch_sql_datasource",
    data_connector_name="whole_table",
    data_asset_name="yellow_tripdata_sample_2020_02",

)


In [43]:
checkpoint_config = {
    "name": "my_checkpoint",
    "config_version": 1,
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": single_batch_batch_request_from_multi,
            "expectation_suite_name": "example_sql_suite",            
        }
    ],
}
data_context.add_checkpoint(**checkpoint_config)

{
  "action_list": [
    {
      "name": "store_validation_result",
      "action": {
        "class_name": "StoreValidationResultAction"
      }
    },
    {
      "name": "store_evaluation_params",
      "action": {
        "class_name": "StoreEvaluationParametersAction"
      }
    },
    {
      "name": "update_data_docs",
      "action": {
        "class_name": "UpdateDataDocsAction",
        "site_names": []
      }
    }
  ],
  "batch_request": {},
  "class_name": "Checkpoint",
  "config_version": 1.0,
  "evaluation_parameters": {},
  "module_name": "great_expectations.checkpoint",
  "name": "my_checkpoint",
  "profilers": [],
  "runtime_configuration": {},
  "validations": [
    {
      "batch_request": {
        "datasource_name": "taxi_multi_batch_sql_datasource",
        "data_connector_name": "whole_table",
        "data_asset_name": "yellow_tripdata_sample_2020_02"
      },
      "expectation_suite_name": "example_sql_suite"
    }
  ]
}

In [44]:
results = data_context.run_checkpoint(
    checkpoint_name="my_checkpoint"
)

Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

In [45]:
results.success

True