# Integrate Data Validation Into Your Pipeline



In [None]:
import json
import great_expectations as ge
import great_expectations.jupyter_ux
from great_expectations.datasource.types import BatchKwargs
from datetime import datetime

## Integrate data validation into your pipeline

[**Watch a short tutorial video**](https://greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#video)


[**Read more in the tutorial**](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation)

**Reach out for help on** [**Great Expectations Slack**](https://greatexpectations.io/slack)




### Get a DataContext object


In [None]:
context = ge.data_context.DataContext()

### Get a pipeline run id

[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#set-a-run-id)


In [None]:
# Generate a run id - a pipeline run id, a timestamp or any other string that is meaningful to you 
# and will help you refer to validation results. We recommend they be chronologically sortable.
run_id = datetime.utcnow().isoformat().replace(":", "") + "Z"
run_id
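
The recommendation that run ids be chronologically sortable can be checked with a small sketch. The helper name `make_run_id` and the sample timestamps are illustrative assumptions, not part of Great Expectations. The helper uses `timespec="seconds"` so that every id has the same width, which keeps string order aligned with time order.

```python
from datetime import datetime, timedelta, timezone


def make_run_id(moment):
    """Format a datetime as a colon-free ISO 8601 string ending in 'Z' (hypothetical helper)."""
    # Drop tzinfo so isoformat() does not append "+00:00"; the "Z" suffix marks UTC instead.
    naive = moment.replace(tzinfo=None)
    return naive.isoformat(timespec="seconds").replace(":", "") + "Z"


# Three made-up pipeline start times, deliberately out of order.
start = datetime(2020, 1, 1, 9, 30, 0, tzinfo=timezone.utc)
run_ids = [make_run_id(start + timedelta(hours=h)) for h in (5, 0, 27)]

# Sorting the strings gives the same order as sorting the timestamps.
sorted_run_ids = sorted(run_ids)
print(sorted_run_ids)
```

Because the year, month, day, and time fields are zero-padded and ordered from most to least significant, a plain string sort is enough to list validation results in the order the runs happened.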

### Choose data asset name and expectation suite name

[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#choose-data-asset-and-expectation-suite)


In [None]:
great_expectations.jupyter_ux.list_available_data_asset_names(context)

In [None]:
data_asset_name = "REPLACE ME!" # TODO: replace with your value!
expectation_suite_name = "warning" # TODO: replace with your value!

### Obtain the batch to validate

Learn about `get_batch` in [this tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#obtain-a-batch-to-validate)



##### If your pipeline processes Pandas DataFrames:

```
import pandas as pd
df = pd.read_csv(file_path_to_validate)
batch = context.get_batch(data_asset_name, expectation_suite_name, BatchKwargs(df=df))
batch.head()
```

##### If your pipeline processes Spark DataFrames:
```
from pyspark.sql import SparkSession
from great_expectations.dataset import SparkDFDataset
spark = SparkSession.builder.getOrCreate()
df = SparkDFDataset(spark.read.csv(file_path_to_validate))
batch = context.get_batch(data_asset_name, expectation_suite_name, BatchKwargs(df=df))
batch.head()
```

##### If your pipeline processes SQL queries:

* A. To validate an existing table:

```
data_asset_name = 'USE THE TABLE NAME'
# All three arguments are positional: a positional argument cannot follow a keyword argument.
batch = context.get_batch(data_asset_name,
                        expectation_suite_name,
                        BatchKwargs(table=data_asset_name))
batch.head()
```

* B. To validate a query result set:

```
data_asset_name = 'USE THE NAME YOU SPECIFIED WHEN YOU CREATED THE EXPECTATION SUITE FOR THIS QUERY'
# All three arguments are positional: a positional argument cannot follow a keyword argument.
batch = context.get_batch(data_asset_name,
                        expectation_suite_name,
                        BatchKwargs(query='SQL FOR YOUR QUERY'))
batch.head()
```


In [None]:
batch = context.get_batch(COPY THE APPROPRIATE CODE SNIPPET FROM THE CELL ABOVE)
batch.head()


### Validate the batch

`validate` is the "workhorse" method of Great Expectations. Call it in your pipeline code after loading the data and just before passing it to your computation.

[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#validate)



In [None]:
validation_result = batch.validate(run_id=run_id)

if validation_result["success"]:
    print("This file meets all expectations of a valid batch of {0:s}".format(data_asset_name))
else:
    print("This file is not a valid batch of {0:s}".format(data_asset_name))
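
Printing a message does not stop the pipeline. If a failed validation should block the downstream computation, one option is to raise an exception. The sketch below assumes only that the validation result exposes a boolean `"success"` key, as the cell above does. The `ValidationFailed` exception, the `require_valid` helper, and the stand-in dictionaries are hypothetical names used for illustration.

```python
class ValidationFailed(Exception):
    """Raised when a batch does not meet its expectation suite (hypothetical)."""


def require_valid(validation_result, data_asset_name):
    """Return the result unchanged if validation succeeded, otherwise raise."""
    if not validation_result["success"]:
        raise ValidationFailed("{0:s} failed validation".format(data_asset_name))
    return validation_result


# Stand-ins for what batch.validate(...) returns; only the "success" key is used here.
passing = {"success": True}
failing = {"success": False}

require_valid(passing, "my_asset")  # passes silently

try:
    require_valid(failing, "my_asset")
except ValidationFailed as error:
    message = str(error)
    print("Stopping the pipeline:", message)
```

In a real pipeline, `require_valid(batch.validate(run_id=run_id), data_asset_name)` would sit between the loading step and the computation step.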


### Review the validation results

[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#review-validation-results)


In [None]:
print(json.dumps(validation_result, indent=4))
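
The full JSON dump can be long. A shorter view lists only the expectations that failed. This sketch assumes the result dictionary contains a `"results"` list whose items carry a `"success"` flag and an `"expectation_config"` with `"expectation_type"` and `"kwargs"`; check the output of the cell above to confirm that your version uses this layout. The `failed_expectations` helper and the sample data are illustrative.

```python
def failed_expectations(validation_result):
    """Return (expectation_type, kwargs) for every expectation that failed."""
    failures = []
    for item in validation_result.get("results", []):
        if not item.get("success", False):
            config = item.get("expectation_config", {})
            failures.append((config.get("expectation_type"), config.get("kwargs", {})))
    return failures


# Hand-written sample in the assumed layout, not real Great Expectations output.
sample_result = {
    "success": False,
    "results": [
        {
            "success": True,
            "expectation_config": {
                "expectation_type": "expect_column_to_exist",
                "kwargs": {"column": "id"},
            },
        },
        {
            "success": False,
            "expectation_config": {
                "expectation_type": "expect_column_values_to_not_be_null",
                "kwargs": {"column": "email"},
            },
        },
    ],
}

for expectation_type, kwargs in failed_expectations(sample_result):
    print(expectation_type, kwargs)
```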

### Validation Operators

The `validate` method evaluates one batch of data against one expectation suite and returns a dictionary of validation results. This is sufficient while you are exploring your data and getting to know Great Expectations.
When you deploy Great Expectations in a real data pipeline, you will typically discover additional needs:

* validating a group of batches that are logically related
* validating a batch against several expectation suites
* doing something with the validation results (e.g., saving them for a later review, sending notifications in case of failures, etc.).

Validation Operators provide a convenient abstraction for bundling both the validation of multiple expectation suites and the actions to take after validation.

[Read more about Validation Operators](https://docs.greatexpectations.io/en/latest/features/validation_operators_and_actions.html?utm_source=notebook&utm_medium=integrate_validation)
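
To make the idea concrete, here is a toy illustration in plain Python of the "validate, then run a list of actions" pattern. It is not the Great Expectations implementation and does not use its API: `run_toy_operator`, `store_action`, and `notify_on_failure` are hypothetical names, and each validation is a stand-in callable that returns a dictionary with a `"success"` key.

```python
def run_toy_operator(validate_fns, actions):
    """Run every validation callable, then pass the combined summary to each action."""
    results = [validate() for validate in validate_fns]
    summary = {
        "success": all(result["success"] for result in results),
        "details": results,
    }
    for action in actions:
        action(summary)
    return summary


stored = []         # stands in for a validation result store
notifications = []  # stands in for a notification channel


def store_action(summary):
    """Save the summary for later review."""
    stored.append(summary)


def notify_on_failure(summary):
    """Record a notification only when at least one validation failed."""
    if not summary["success"]:
        notifications.append("validation failed")


summary = run_toy_operator(
    validate_fns=[lambda: {"success": True}, lambda: {"success": False}],
    actions=[store_action, notify_on_failure],
)
print(summary["success"], notifications)
```

The point of the pattern is that the pipeline code makes a single call, while the set of validations and follow-up actions is configured separately.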




In [None]:
# This is an example of invoking a validation operator that is configured by default in the great_expectations.yml file

results = context.run_validation_operator(
    assets_to_validate=[batch],
    run_id=run_id,
    validation_operator_name="action_list_operator",
)

results