# How to write a multi-batch `BatchRequest` - Inferred `Sql` Example
* A `BatchRequest` facilitates the return of one or more `Batches` of data from a configured `Datasource`. To find out more about `Batches`, please refer to the [related documentation](https://docs.greatexpectations.io/docs/guides/connecting_to_your_data/how_to_get_one_or_more_batches_of_data_from_a_configured_datasource#1-construct-a-batchrequest).
* A `BatchRequest` can return 0 or more Batches of data, depending on the underlying data and how it is configured. This guide will help you configure `BatchRequests` to return multiple Batches, which can be used by:
   1. Self-Initializing Expectations to estimate parameters.
   2. DataAssistants to profile your data and create an Expectation Suite with self-initialized parameters.

* Note: Multi-batch BatchRequests are not supported in `RuntimeDataConnector`.

In [16]:
import great_expectations as gx
from great_expectations.core.yaml_handler import YAMLHandler
from great_expectations.core.batch import BatchRequest
import os

yaml = YAMLHandler()

* Load `DataContext`

In [17]:
data_context: gx.DataContext = gx.get_context()

## Sql Example

### Example Database

Imagine we have a database with 1 table, `yellow_tripdata_sample_2020`, which contains all 12 months of `taxi_trip` data for 2020.


In [18]:
# connect to postgres DB, and print the existing tables
from sqlalchemy import create_engine, inspect

pg_hostname = os.getenv("GE_TEST_LOCAL_DB_HOSTNAME", "localhost")
CONNECTION_STRING = f"postgresql+psycopg2://postgres:@{pg_hostname}/test_ci"

engine = create_engine(CONNECTION_STRING)
insp = inspect(engine)
print(insp.get_table_names())

['yellow_tripdata_sample_2020']


## Example Configuration

In our example, we add a Datasource named `taxi_multi_batch_sql_datasource` with 1 table. It has two `InferredAssetSqlDataConnector`s, named `inferred_data_connector_single_batch_asset` and `inferred_data_connector_multi_batch_asset_split_on_date_time`.

The configuration of the second DataConnector also includes a `splitter_method` to split the table values into multiple Batches. The splitter we use is `split_on_year_and_month`, which creates Batches according to the `pickup_datetime` column, which is of type `timestamp` in the database schema.
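To make the splitting idea concrete, here is a minimal plain-Python sketch of grouping rows by the year and month of a timestamp column. It does not use Great Expectations: the sample rows and the `group_by_year_and_month` helper are hypothetical, and the real splitter works through SQL against the table rather than in memory.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical sample rows; only the column names mirror the taxi table.
rows = [
    {"pickup_datetime": datetime(2020, 1, 15, 12, 20), "trip_distance": 5.76},
    {"pickup_datetime": datetime(2020, 1, 28, 9, 5), "trip_distance": 1.22},
    {"pickup_datetime": datetime(2020, 2, 3, 18, 40), "trip_distance": 11.64},
]


def group_by_year_and_month(rows, column_name):
    """Group rows by the (year, month) of the given datetime column."""
    groups = defaultdict(list)
    for row in rows:
        timestamp = row[column_name]
        groups[(timestamp.year, timestamp.month)].append(row)
    return dict(groups)


batches = group_by_year_and_month(rows, "pickup_datetime")
for (year, month), batch_rows in batches.items():
    # Print identifiers in the same shape as the batch identifiers in this notebook.
    print({"pickup_datetime": {"year": year, "month": month}}, len(batch_rows))
```

Each distinct (year, month) pair becomes one group, which is why a table covering 12 months yields 12 Batches.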


Our configuration also includes `schema_name` that is defined as part of `introspection_directives`. For other options for the DataConnector configuration, including other `introspection_directives`, please refer to the **Appendix** below. 



In [19]:
datasource_config = {
    "name": "taxi_multi_batch_sql_datasource",
    "class_name": "Datasource",
    "module_name": "great_expectations.datasource",
    "execution_engine": {
        "module_name": "great_expectations.execution_engine",
        "class_name": "SqlAlchemyExecutionEngine",
        "connection_string": CONNECTION_STRING,
    },
    "data_connectors": {
        "inferred_data_connector_single_batch_asset": {
            "class_name": "InferredAssetSqlDataConnector",
            "introspection_directives": {"schema_name": "public"},
        },
        "inferred_data_connector_multi_batch_asset_split_on_date_time": {
            "class_name": "InferredAssetSqlDataConnector",
            "splitter_method": "split_on_year_and_month",
            "splitter_kwargs": {
                "column_name": "pickup_datetime",
            },
            "introspection_directives": {"schema_name": "public"},
        },
    },
}

data_context.test_yaml_config(yaml.dump(datasource_config))

Attempting to instantiate class from config...
	Instantiating as a Datasource, since class_name is Datasource
	Successfully instantiated Datasource


ExecutionEngine class name: SqlAlchemyExecutionEngine
Data Connectors:
	inferred_data_connector_multi_batch_asset_split_on_date_time : InferredAssetSqlDataConnector

	Available data_asset_names (1 of 1):
		yellow_tripdata_sample_2020 (3 of 12): [{'pickup_datetime': {'year': 2020, 'month': 1}}, {'pickup_datetime': {'year': 2020, 'month': 10}}, {'pickup_datetime': {'year': 2020, 'month': 11}}]

	Unmatched data_references (0 of 0):[]

	inferred_data_connector_single_batch_asset : InferredAssetSqlDataConnector

	Available data_asset_names (1 of 1):
		yellow_tripdata_sample_2020 (1 of 1): [{}]

	Unmatched data_references (0 of 0):[]



<great_expectations.datasource.new_datasource.Datasource at 0x7f87b72a8610>

We can see that the configuration succeeded because the output shows 1 data asset, `yellow_tripdata_sample_2020`, with 12 Batches (3 of which are listed), each associated with a different month in our `pickup_datetime` column.

In [5]:
# add_datasource only if it doesn't already exist in our configuration
try:
    data_context.get_datasource(datasource_config["name"])
except ValueError:
    data_context.add_datasource(**datasource_config)

## BatchRequest

Single Batch returned by `inferred_data_connector_single_batch_asset`

In [20]:
single_batch_batch_request: BatchRequest = BatchRequest(
    datasource_name="taxi_multi_batch_sql_datasource",
    data_connector_name="inferred_data_connector_single_batch_asset",
    data_asset_name="yellow_tripdata_sample_2020",
)

In [21]:
batch_list = data_context.get_batch_list(batch_request=single_batch_batch_request)

In [22]:
batch_list

[<great_expectations.core.batch.Batch at 0x7f87b72a8130>]

Multi Batch returned by `inferred_data_connector_multi_batch_asset_split_on_date_time`

In [25]:
multi_batch_batch_request: BatchRequest = BatchRequest(
    datasource_name="taxi_multi_batch_sql_datasource",
    data_connector_name="inferred_data_connector_multi_batch_asset_split_on_date_time",
    data_asset_name="yellow_tripdata_sample_2020",
)

In [26]:
multi_batch_batch_list = data_context.get_batch_list(
    batch_request=multi_batch_batch_request
)

In [27]:
multi_batch_batch_list  # 12 batches

[<great_expectations.core.batch.Batch at 0x7f87b74c4d30>,
 <great_expectations.core.batch.Batch at 0x7f87b70dcf10>,
 <great_expectations.core.batch.Batch at 0x7f87b704fdf0>,
 <great_expectations.core.batch.Batch at 0x7f87b74d5b50>,
 <great_expectations.core.batch.Batch at 0x7f87b74c49a0>,
 <great_expectations.core.batch.Batch at 0x7f87b74bf6d0>,
 <great_expectations.core.batch.Batch at 0x7f87b74bf850>,
 <great_expectations.core.batch.Batch at 0x7f87b704ff10>,
 <great_expectations.core.batch.Batch at 0x7f87b74d55b0>,
 <great_expectations.core.batch.Batch at 0x7f87b737e3a0>,
 <great_expectations.core.batch.Batch at 0x7f87b4707730>,
 <great_expectations.core.batch.Batch at 0x7f87b71991c0>]

You can also get a single Batch from a multi-batch DataConnector by passing in a `data_connector_query`. 
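As a rough mental model, `batch_filter_parameters` can be pictured as an exact-match filter over the batch identifiers. The sketch below is plain Python, not the Great Expectations implementation; the `filter_batches` helper and its exact-match semantics are assumptions made for illustration.

```python
# Batch identifiers shaped like the ones printed by test_yaml_config above.
batch_identifiers_list = [
    {"pickup_datetime": {"year": 2020, "month": month}} for month in range(1, 13)
]


def filter_batches(identifiers_list, batch_filter_parameters):
    """Keep identifiers whose entries equal every requested filter parameter."""
    return [
        identifiers
        for identifiers in identifiers_list
        if all(
            identifiers.get(key) == value
            for key, value in batch_filter_parameters.items()
        )
    ]


selected = filter_batches(
    batch_identifiers_list,
    {"pickup_datetime": {"year": 2020, "month": 1}},
)
print(selected)
```

With an empty filter, all 12 identifiers are kept, which corresponds to the multi-batch request without a `data_connector_query`.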

In [53]:
single_batch_batch_request_from_multi: BatchRequest = BatchRequest(
    datasource_name="taxi_multi_batch_sql_datasource",
    data_connector_name="inferred_data_connector_multi_batch_asset_split_on_date_time",
    data_asset_name="yellow_tripdata_sample_2020",
    data_connector_query={
        "batch_filter_parameters": {"pickup_datetime": {"year": 2020, "month": 1}}
    },
)

In [54]:
batch_list = data_context.get_batch_list(
    batch_request=single_batch_batch_request_from_multi
)

In [60]:
batch_list  # has a length of 1, as expected

[<great_expectations.core.batch.Batch at 0x7f87b7083340>]

Let's review our batch:

In [58]:
# our single filtered batch with 'batch_identifiers': {'pickup_datetime': {'year': 2020, 'month': 1}}
batch = batch_list[0]

In [59]:
batch.to_dict()

{'data': '<great_expectations.execution_engine.sqlalchemy_batch_data.SqlAlchemyBatchData object at 0x7f87b7905190>',
 'batch_request': {'datasource_name': 'taxi_multi_batch_sql_datasource',
  'data_connector_name': 'inferred_data_connector_multi_batch_asset_split_on_date_time',
  'data_asset_name': 'yellow_tripdata_sample_2020',
  'data_connector_query': {'batch_filter_parameters': {'pickup_datetime': {'year': 2020,
     'month': 1}}},
  'limit': None,
  'batch_spec_passthrough': None},
 'batch_definition': {'datasource_name': 'taxi_multi_batch_sql_datasource',
  'data_connector_name': 'inferred_data_connector_multi_batch_asset_split_on_date_time',
  'data_asset_name': 'yellow_tripdata_sample_2020',
  'batch_identifiers': {'pickup_datetime': {'year': 2020, 'month': 1}}},
 'batch_spec': {'data_asset_name': 'yellow_tripdata_sample_2020',
  'table_name': 'yellow_tripdata_sample_2020',
  'batch_identifiers': {'pickup_datetime': {'year': 2020, 'month': 1}},
  'type': 'table',
  'data_asset_

# Using auto-initializing `Expectations` to generate parameters

We will generate a `Validator` using our `multi_batch_batch_list`

In [34]:
multi_batch_batch_list = data_context.get_batch_list(
    batch_request=multi_batch_batch_request
)

In [35]:
example_suite = data_context.create_expectation_suite(
    expectation_suite_name="example_sql_suite", overwrite_existing=True
)

In [36]:
validator = data_context.get_validator_using_batch_list(
    batch_list=multi_batch_batch_list, expectation_suite=example_suite
)

When you run methods on the Validator, it will typically run on the most recent Batch (index `-1`), even if the Validator has access to a longer Batch list. In this example, the Batch at index `-1` is associated with `pickup_datetime` month `9` (September 2020), not December. This is because the month values are ordered lexicographically, meaning `1`, `10`, `11`, and `12` appear **before** `2` and `3`.

For simplicity, let's get a `validator` with the December `Batch`, which is at index `3` (after `1`, `10`, `11`). Notice that we also wrap the Batch in a `list` using the square brackets.
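The ordering can be reproduced with a short plain-Python sketch. It assumes, as described above, that the Batches are ordered the way month numbers sort as strings; it only illustrates the ordering and does not touch the database.

```python
# Month numbers 1..12 as strings, sorted lexicographically.
months = [str(month) for month in range(1, 13)]
ordered = sorted(months)

print(ordered)
# ['1', '10', '11', '12', '2', '3', '4', '5', '6', '7', '8', '9']

print(ordered[3])   # '12' -> December sits at index 3
print(ordered[-1])  # '9'  -> September is the last ("most recent") entry
```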

In [37]:
validator = data_context.get_validator_using_batch_list(
    batch_list=[multi_batch_batch_list[3]], expectation_suite=example_suite
)

In [38]:
validator.head()

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,rate_code_id,store_and_fwd_flag,pickup_location_id,dropoff_location_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,2.0,2020-12-15 12:20:27,2020-12-15 12:40:49,4.0,5.76,1.0,N,209,237,1.0,21.0,0.0,0.5,2.5,0.0,0.3,26.8,2.5
1,2.0,2020-12-28 12:51:25,2020-12-28 13:15:12,1.0,11.64,1.0,N,161,220,1.0,33.5,0.0,0.5,5.0,2.8,0.3,44.6,2.5
2,2.0,2020-12-27 10:43:42,2020-12-27 10:51:05,1.0,1.22,1.0,N,163,48,1.0,7.0,0.0,0.5,2.06,0.0,0.3,12.36,2.5
3,2.0,2020-12-08 13:42:52,2020-12-08 13:54:45,1.0,1.84,1.0,N,137,229,2.0,9.0,0.0,0.5,0.0,0.0,0.3,12.3,2.5
4,2.0,2020-12-19 11:56:43,2020-12-19 12:08:43,1.0,1.55,1.0,N,24,74,1.0,9.5,0.0,0.5,2.58,0.0,0.3,12.88,0.0


### Typical Workflow
A `batch_list` becomes really useful when you are calculating parameters for auto-initializing Expectations, as they use a `RuleBasedProfiler` under-the-hood to calculate parameters.

Let's say we don't know the `min_value` and `max_value` for `expect_column_median_to_be_between()`, so we "guess" at them.

In [70]:
validator = data_context.get_validator_using_batch_list(
    batch_list=multi_batch_batch_list, expectation_suite=example_suite
)

In [71]:
validator.expect_column_median_to_be_between(
    column="trip_distance", min_value=0, max_value=1
)

Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

{
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "result": {
    "observed_value": 1.75
  },
  "success": false,
  "meta": {}
}

The observed value of the `trip_distance` median for our `yellow_tripdata_sample_2020` table is `1.75`, which falls outside our guessed range, so the Expectation fails. We guessed wrong, but we can do better!

Now we run the same Expectation again, but this time with `auto=True`. This means the `median` values are going to be calculated across the `batch_list` associated with the `Validator` (i.e. 12 Batches for `yellow_tripdata_sample_2020`), which gives a min value of `1.6` and a max value of `1.99`.
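One simplified way to picture this step is to compute the median of each Batch and take the smallest and largest as the range. The sketch below does exactly that in plain Python. The data is hypothetical (it is not the taxi data), and the actual `RuleBasedProfiler` may use a different estimator to derive the range, so treat this only as an illustration of the idea.

```python
from statistics import median

# Hypothetical trip distances for three monthly batches (not the taxi data).
trip_distance_by_batch = {
    "2020-01": [1.0, 1.6, 2.4],
    "2020-02": [1.2, 1.8, 3.0],
    "2020-03": [0.9, 1.99, 4.1],
}

# One median per batch, then the range spanned by those medians.
medians = {name: median(values) for name, values in trip_distance_by_batch.items()}
min_value = min(medians.values())
max_value = max(medians.values())

print(medians)
print(min_value, max_value)
```

The more Batches contribute a median, the better the resulting range reflects the month-to-month variation in the data.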

In [72]:
validator.expect_column_median_to_be_between(column="trip_distance", auto=True)




Generating Expectations:   0%|          | 0/1 [00:00<?, ?it/s]

Profiling Dataset:         0%|          | 0/1 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

{
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  },
  "expectation_config": {
    "expectation_type": "expect_column_median_to_be_between",
    "kwargs": {
      "strict_max": false,
      "min_value": 1.6,
      "column": "trip_distance",
      "strict_min": false,
      "max_value": 1.99
    },
    "meta": {
      "auto_generated_at": "20220909T044852.642862Z",
      "great_expectations_version": "0.15.22+3.g7ae9a4c03.dirty"
    }
  },
  "result": {
    "observed_value": 1.75
  },
  "success": true,
  "meta": {}
}

With `auto=True`, the Expectation is also automatically run against the most recent Batch (which has an observed value of `1.75`, as shown in the output above), and the Expectation passes.
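The pass/fail decision itself is a range check. Here is a minimal sketch using the values from the outputs above; `median_is_between` is a hypothetical helper, not a Great Expectations function, and it assumes that `strict_min` and `strict_max` turn the bounds into exclusive ones.

```python
def median_is_between(observed, min_value, max_value, strict_min=False, strict_max=False):
    """Return True if observed lies within the range (bounds inclusive unless strict)."""
    above_min = observed > min_value if strict_min else observed >= min_value
    below_max = observed < max_value if strict_max else observed <= max_value
    return above_min and below_max


# Range estimated with auto=True: the observed value 1.75 is inside it.
print(median_is_between(1.75, min_value=1.6, max_value=1.99))  # True

# Our earlier guess of 0 to 1: the observed value 1.75 is outside it.
print(median_is_between(1.75, min_value=0, max_value=1))  # False
```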

You can now save the `ExpectationSuite`.

In [73]:
validator.save_expectation_suite()

### Running the `ExpectationSuite` against a single `Batch`

Now the ExpectationSuite we built using all batches can be used to validate single batches using a Checkpoint. For example, we can run this checkpoint on new data when it comes in next month. In our example, let's validate a different batch from February 2020, using the `ExpectationSuite` we built from `yellow_tripdata_sample_2020`.

In [78]:
single_batch_batch_request_from_multi: BatchRequest = BatchRequest(
    datasource_name="taxi_multi_batch_sql_datasource",
    data_connector_name="inferred_data_connector_multi_batch_asset_split_on_date_time",
    data_asset_name="yellow_tripdata_sample_2020",
    data_connector_query={
        "batch_filter_parameters": {"pickup_datetime": {"year": 2020, "month": 2}}
    },
)

In [79]:
checkpoint_config = {
    "name": "my_checkpoint",
    "config_version": 1,
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": single_batch_batch_request_from_multi,
            "expectation_suite_name": "example_sql_suite",
        }
    ],
}
data_context.add_checkpoint(**checkpoint_config)

{
  "action_list": [
    {
      "name": "store_validation_result",
      "action": {
        "class_name": "StoreValidationResultAction"
      }
    },
    {
      "name": "store_evaluation_params",
      "action": {
        "class_name": "StoreEvaluationParametersAction"
      }
    },
    {
      "name": "update_data_docs",
      "action": {
        "class_name": "UpdateDataDocsAction",
        "site_names": []
      }
    }
  ],
  "batch_request": {},
  "class_name": "Checkpoint",
  "config_version": 1.0,
  "evaluation_parameters": {},
  "module_name": "great_expectations.checkpoint",
  "name": "my_checkpoint",
  "profilers": [],
  "runtime_configuration": {},
  "validations": [
    {
      "batch_request": {
        "datasource_name": "taxi_multi_batch_sql_datasource",
        "data_connector_name": "inferred_data_connector_multi_batch_asset_split_on_date_time",
        "data_asset_name": "yellow_tripdata_sample_2020",
        "data_connector_query": {
          "batch_filter_para

In [80]:
results = data_context.run_checkpoint(checkpoint_name="my_checkpoint")

Calculating Metrics:   0%|          | 0/9 [00:00<?, ?it/s]

In [81]:
results.success

True

# Appendix

## Other Parameters for `InferredAssetSqlDataConnector`

The signature of the `InferredAssetSqlDataConnector` also contains the following required parameters:
- `name`: string describing the name of this DataConnector.
- `datasource_name`: the name of the Datasource that contains it.
- `execution_engine`: the type of ExecutionEngine to use.

And the following optional parameters:
- `data_asset_name_prefix`: string describing an optional prefix to prepend to inferred data_asset_names.
- `data_asset_name_suffix`: string describing an optional suffix to append to inferred data_asset_names.
- `include_schema_name`: bool which answers the question: "Should the data_asset_name include the schema as a prefix?"
- `splitter_method`: string that names the method used to split the target table into multiple `Batches`.
- `splitter_kwargs`: dict containing keyword arguments to pass to `splitter_method`.
- `sampling_method`: string that names the method used to sample `Batches`.
- `sampling_kwargs`: dict containing keyword arguments to pass to `sampling_method`.
- `excluded_tables`: A list of tables to ignore when inferring data_asset_names.
- `included_tables`: If not `None`, only include tables in this list when inferring data_asset_names.
- `skip_inapplicable_tables`:  If `True`, tables that can't be successfully queried using sampling and splitter methods are excluded from inferred data_asset_names. If `False`, the class will throw an error during initialization if any such tables are encountered.
- `batch_spec_passthrough`: dictionary with keys that will be added directly to batch_spec.
- `introspection_directives`: Arguments passed to the introspection method to guide introspection.

Valid keys for `introspection_directives` include: 
- `schema_name`: string describing schema to introspect (default is `None`). We used this parameter in our example above. 
- `ignore_information_schemas_and_system_tables`: bool (`default=True`) which determines whether to ignore information schemas and system tables when introspecting the database.
- `system_tables`: optional list of strings that define `system_tables` for your db. 
- `information_schemas`: optional list of strings that define `information_schemas` for your db. 
- `include_views`: bool (`default=True`) which determines whether to include `views` when introspecting the database. 
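As an illustration of how these keys fit together, here is a sketch of a DataConnector configuration that sets several `introspection_directives`. The keys come from the lists above; the values other than `schema_name` are hypothetical choices, and the dictionary is only checked against the documented key names here, not instantiated by Great Expectations.

```python
# Keys documented above for `introspection_directives`.
VALID_INTROSPECTION_KEYS = {
    "schema_name",
    "ignore_information_schemas_and_system_tables",
    "system_tables",
    "information_schemas",
    "include_views",
}

data_connector_config = {
    "class_name": "InferredAssetSqlDataConnector",
    "splitter_method": "split_on_year_and_month",
    "splitter_kwargs": {"column_name": "pickup_datetime"},
    "introspection_directives": {
        "schema_name": "public",  # used in the example above
        "ignore_information_schemas_and_system_tables": True,  # documented default
        "include_views": False,  # hypothetical choice: introspect tables only
    },
}

# Report any directive that is not in the documented list.
unknown_keys = set(data_connector_config["introspection_directives"]) - VALID_INTROSPECTION_KEYS
print(sorted(unknown_keys))
```

A dictionary like this would take the place of one of the entries under `data_connectors` in the Datasource configuration shown earlier.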

## Loading Data into Postgresql Database

* The following code can be used to build the postgres database used in this notebook. It is included (and commented out) for reference.
* In order to load the data into a local `postgresql` database, please feel free to use the `docker-compose.yml` file available at `great_expectations/assets/docker/postgresql/`. 

### To spin up the `postgresql` database
* Have [Docker Desktop](https://www.docker.com/products/docker-desktop/) running locally.
* Navigate to `great_expectations/assets/docker/postgresql/`
* Type `docker-compose up`
* Then uncomment and run the following snippet

In [50]:
# from tests.test_utils import load_data_into_test_database
# from typing import List
# import sqlalchemy as sa
# import pandas as pd
# pg_hostname = os.getenv("GE_TEST_LOCAL_DB_HOSTNAME", "localhost")
# CONNECTION_STRING = f"postgresql+psycopg2://postgres:@{pg_hostname}/test_ci"

# data_paths: List[str] = [
#      "../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-01.csv",
#      "../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-02.csv",
#      "../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-03.csv",
#      "../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-04.csv",
#      "../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-05.csv",
#      "../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-06.csv",
#      "../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-07.csv",
#      "../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-08.csv",
#      "../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-09.csv",
#      "../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-10.csv",
#      "../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-11.csv",
#      "../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-12.csv",
# ]


# engine = sa.create_engine(CONNECTION_STRING)
# connection = engine.connect()
# table_name = "yellow_tripdata_sample_2020"
# res = connection.execute(f"DROP TABLE IF EXISTS {table_name}")

# for data_path in data_paths:
#     # This utility is not for general use. It is only to support testing.
#     load_data_into_test_database(
#         table_name="yellow_tripdata_sample_2020",
#         csv_path=data_path,
#         connection_string=CONNECTION_STRING,
#         load_full_dataset=True,
#         drop_existing_table=False,
#         convert_colnames_to_datetime=["pickup_datetime", "dropoff_datetime"]
#     )

Adding to existing table yellow_tripdata_sample_2020 and adding data from ['../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-01.csv']
Adding to existing table yellow_tripdata_sample_2020 and adding data from ['../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-02.csv']
Adding to existing table yellow_tripdata_sample_2020 and adding data from ['../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-03.csv']
Adding to existing table yellow_tripdata_sample_2020 and adding data from ['../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-04.csv']
Adding to existing table yellow_tripdata_sample_2020 and adding data from ['../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-05.csv']
Adding to existing table yellow_tripdata_sample_2020 and adding data from ['../../../test_sets/taxi_yellow_tripdata_samples/yellow_tripdata_sample_2020-06.csv']
Adding to existing table yellow_tr