In [0]:
#importing statements
from great_expectations.data_context.types.base import DataContextConfig, DatasourceConfig, FilesystemStoreBackendDefaults
from great_expectations.data_context import BaseDataContext

In [0]:
#Intializing the data source Configuration
my_spark_datasource_config = DatasourceConfig(
    class_name="SparkDFDatasource",
    batch_kwargs_generators={},
)

  and should_run_async(code)


In [0]:
data_context_config = DataContextConfig(
    datasources={"my_spark_datasource": my_spark_datasource_config},
    stores={
        "expectations_filesystem_store": {
            "class_name": "ExpectationsStore",
            "store_backend": {
                "class_name": "TupleFilesystemStoreBackend",
                "base_directory":"/FileStorage/great_expectations/es/",
            },
        },
        "validations_filesystem_store": {
           "class_name": "ValidationsStore",
            "store_backend": {
            "class_name": "TupleFilesystemStoreBackend",
            "base_directory":"/FileStorage/great_expectations/vs/",
            },
        },
       "evaluation_filesystem_store": {
            "class_name": "EvaluationParameterStore"
        },
    },
    expectations_store_name="expectations_filesystem_store",
    validations_store_name="validations_filesystem_store",
    evaluation_parameter_store_name="evaluation_filesystem_store",
    validation_operators={
        "action_list_operator":{
            "class_name":"ActionListValidationOperator",
            "action_list":[
                {
                 "name":"store_validation_result",
                  "action":{"class_name":"StoreValidationResultAction"},
                }
            ],
        }
    },
    store_backend_defaults=FilesystemStoreBackendDefaults(root_directory="/FileStorage/great_expectations/"),
    )
 

In [0]:
context=BaseDataContext(project_config=data_context_config)

######d1 contains a csv file which is having two columns i.e., name of the dataset and path of the dataset

In [0]:
d1 = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/shared_uploads/tulasinimmagadda09@gmail.com/MasterInputConfig.csv")
display(d1)

dataset,path
sales1,dbfs:/FileStore/shared_uploads/tulasinimmagadda09@gmail.com/1500000_Sales_Records.csv
sales2,dbfs:/FileStore/shared_uploads/tulasinimmagadda09@gmail.com/WALMART_SALES_DATA.csv


In [0]:
####################Iterating through d1 for each iteration it will consider each dataset########################################
for i in d1.collect():
    df = spark.read.format("csv") \
    .option("inferSchema","true") \
    .option("header","true") \
    .load(i["path"])
    #display(df)  #displays the contents of dataset
    
    context.create_expectation_suite("practice1",overwrite_existing=True) #creates expectation suite for each dataset seperately
    #Batch for each dataset seperately
    my_batch1 = context.get_batch({
        "dataset": df,
        "datasource": "my_spark_datasource",
    }, "practice1")
    #csvfile-4 is a csv file which is having 2 columns i.e., tablename and path of the particular configuration file for each table
    #Reading that file using spark
    df2=spark.read.format("csv") \
      .option("header","true") \
      .option("inferSchema","true") \
      .load("dbfs:/FileStore/shared_uploads/tulasinimmagadda09@gmail.com/ReferenceInputConfig.csv")
     
    display(df2) #Using display function to see the contents of the configuration file
    #Iterating through df2 
    for j in df2.collect():
        #In the below if condition it will check whether the name of dataset and tablename are equal or not, 
        #if it is equal it will run the expectations for that particular dataset
        #if not it will go back to the outer for loop
        if i["dataset"] == j["tablename"] :
            df3=spark.read.format("csv").option("header","true").option("inferSchema","true").load(j["path"])
            display(df3)
            for k in df3.collect():
                #Expectation for column values to be between
                my_batch1.expect_column_values_to_be_between(column=k["column"],min_value=k["val_min"], max_value=k["val_max"])
                #Expectation for min to be between
                my_batch1.expect_column_min_to_be_between(column=k["column"],min_value=k["min_min"], max_value=k["min_max"])
                #Expectation for max to be between
                my_batch1.expect_column_max_to_be_between(column=k["column"],min_value=k["max_min"], max_value=k["max_max"])
                #Expectation for mean to be between
                my_batch1.expect_column_mean_to_be_between(column=k["column"],min_value=k["mean_min"], max_value=k["mean_max"])
                #Expectation for median to be between
                my_batch1.expect_column_median_to_be_between(column=k["column"],
                                                             min_value=k["median_min"],max_value=k["median_max"])
                #Duplicate checks
                my_batch1.expect_column_values_to_be_unique(column=k["column"])
            #prints the validation result for each dataset
            print(my_batch1.validate())
        #context.run_validation_operator("action_list_operator",assets_to_validate=[my_batch1])#save and run validation


tablename,path
sales1,dbfs:/FileStore/shared_uploads/tulasinimmagadda09@gmail.com/salescsv.csv
sales2,dbfs:/FileStore/shared_uploads/tulasinimmagadda09@gmail.com/walcsv-3.csv


table,column,val_min,val_max,min_min,min_max,max_min,max_max,mean_min,mean_max,median_min,median_max
t1,Units Sold,6.0,10000.0,0.0,6.0,9000,10000,4999,5000,4995,4998
t1,Unit Price,9.33,668.77,9.33,10.0,50,76,260,266,150,155
t1,Unit Cost,6.92,524.96,5.0,6.92,500,524,185,188,95,98
t1,Total Revenue,9.33,6682700.0,7.0,9.33,6682698,6682700,1329528,1329529,784625,784631
t1,Total Cost,6.92,5249600.0,5.0,6.92,5249598,5249600,937196,937199,467030,467035
t1,Total Profit,2.41,1738700.0,0.0,2.41,1738600,1738700,392330,392333,281126,281129


{
  "evaluation_parameters": {},
  "meta": {
    "great_expectations_version": "0.14.13",
    "expectation_suite_name": "practice1",
    "run_id": {
      "run_time": "2022-04-05T09:13:25.522435+00:00",
      "run_name": null
    },
    "batch_kwargs": {
      "datasource": "my_spark_datasource",
      "SparkDFRef": true,
      "ge_batch_id": "43b2cb08-b4c0-11ec-99bb-00163e1a28e3"
    },
    "batch_markers": {
      "ge_load_time": "20220405T091040.415188Z"
    },
    "batch_parameters": null,
    "validation_time": "20220405T091325.522271Z",
    "expectation_suite_meta": {
      "great_expectations_version": "0.14.13"
    }
  },
  "results": [
    {
      "result": {
        "element_count": 1048575,
        "missing_count": 0,
        "missing_percent": 0.0,
        "unexpected_count": 521,
        "unexpected_percent": 0.04968647926948477,
        "unexpected_percent_total": 0.04968647926948477,
        "unexpected_percent_nonmissing": 0.04968647926948477,
        "partial_unexpecte

tablename,path
sales1,dbfs:/FileStore/shared_uploads/tulasinimmagadda09@gmail.com/salescsv.csv
sales2,dbfs:/FileStore/shared_uploads/tulasinimmagadda09@gmail.com/walcsv-3.csv


column,min_min,min_max,max_min,max_max,val_min,val_max,mean_min,mean_max,median_min,median_max
Weekly_Sales,209984.0,209987.0,3818686.0,3818687.0,209986.0,3818686.45,1046964.0,1046965.0,960746.0,960477.0
Temperature,-2.1,-2.06,100.0,100.14,-2.06,100.0,59.0,61.0,62.0,64.0
Fuel_Price,2.2,2.5,4.4,5.0,2.472,4.468,3.2,3.5,3.2,3.5
CPI,126.0,127.0,226.0,228.0,126.0,228.0,170.0,172.0,180.0,184.0


{
  "evaluation_parameters": {},
  "meta": {
    "great_expectations_version": "0.14.13",
    "expectation_suite_name": "practice1",
    "run_id": {
      "run_time": "2022-04-05T09:14:15.208462+00:00",
      "run_name": null
    },
    "batch_kwargs": {
      "datasource": "my_spark_datasource",
      "SparkDFRef": true,
      "ge_batch_id": "b985662e-b4c0-11ec-99bb-00163e1a28e3"
    },
    "batch_markers": {
      "ge_load_time": "20220405T091358.088874Z"
    },
    "batch_parameters": null,
    "validation_time": "20220405T091415.208328Z",
    "expectation_suite_meta": {
      "great_expectations_version": "0.14.13"
    }
  },
  "results": [
    {
      "result": {
        "element_count": 6435,
        "missing_count": 0,
        "missing_percent": 0.0,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "unexpected_percent_total": 0.0,
        "unexpected_percent_nonmissing": 0.0,
        "partial_unexpected_list": []
      },
      "exception_info": {
      

In [0]:
res=my_batch1.validate()

In [0]:
res.write.format("parquet").saveAsTable(tab_name)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAttributeError[0m                            Traceback (most recent call last)
[0;32m<command-3063403114751774>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mres[0m[0;34m.[0m[0mwrite[0m[0;34m.[0m[0mformat[0m[0;34m([0m[0;34m"parquet"[0m[0;34m)[0m[0;34m.[0m[0msaveAsTable[0m[0;34m([0m[0mtab_name[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mAttributeError[0m: 'ExpectationSuiteValidationResult' object has no attribute 'write'