# Imports

In [2]:
import great_expectations
from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfilerBase
from great_expectations.data_context import DataContext
from great_expectations.core import ExpectationSuite
from great_expectations.core import ExpectationConfiguration
from great_expectations.profile.base import (
    DatasetProfiler,
    ProfilerCardinality,
    ProfilerDataType,
    ProfilerTypeMapping,
)
import logging

# Code

## Metric Builders

In [2]:
class MyRowCountMetricBuilder():
    """
    Tracks the mean and spread of the table row count across batches.

    Every batch contributes exactly one data point: its row count.
    The running aggregate is maintained with Welford's algorithm, a 
    well known algorithm for calculating a running variance. For 
    numerical stability we don't keep track of the variance as we go, 
    rather keeping track of the sum of squared distances from the 
    mean (M2). The variance is only derived at the end, in finalize.
    """

    @classmethod
    def initialize(cls, batch):
        """
        Starts an aggregate from the first batch.

        batch must provide get_row_count().

        Returns a dictionary tracking:
            batch_count   the number of batches seen so far (1)
            mean          the running mean of the row counts
            M2            the sum of squared distances from the mean
        """
        return {'batch_count': 1, 'mean': batch.get_row_count(), 'M2': 0}
    
    
    @classmethod
    def update(cls, current_aggregate, batch):
        """
        Folds the row count of one more batch into an aggregate.

        current_aggregate is a dictionary as returned by initialize
        or update; it is not modified, a new dictionary with the same
        keys is returned.
        """
        row_count = batch.get_row_count()

        batch_count = current_aggregate['batch_count'] + 1
        mean = current_aggregate['mean']
        M2 = current_aggregate['M2']

        # Welford step: delta uses the old mean, delta2 the new one
        delta = row_count - mean
        mean += delta / batch_count
        delta2 = row_count - mean
        M2 += delta * delta2

        return {'batch_count': batch_count, 'mean': mean, 'M2': M2}
    
    
    @classmethod
    def finalize(cls, current_aggregate):
        """
        Turns an aggregate as built by initialize and update into
        summary statistics.

        Returns a dictionary that always has the keys:
            batch_count          the number of batches seen
            mean                 the mean row count
            variance             population variance (M2 / n)
            sample_variance      sample variance (M2 / (n - 1))
            standard_deviation   square root of the population variance

        With fewer than two batches the spread is undefined, so the
        three spread values are NaN.
        """
        import math

        batch_count = current_aggregate['batch_count']
        mean = current_aggregate['mean']
        M2 = current_aggregate['M2']

        if batch_count < 2:
            variance = sample_variance = standard_deviation = float('nan')
        else:
            variance = M2 / batch_count
            sample_variance = M2 / (batch_count - 1)
            standard_deviation = math.sqrt(variance)

        # same keys in both cases so callers never need to branch
        return {
            'batch_count': batch_count,
            'mean': mean,
            'variance': variance,
            'sample_variance': sample_variance,
            'standard_deviation': standard_deviation
        }

In [3]:
class MyColumnMeanMetricBuilder():
    """
    Tracks the mean and spread of a column's per-batch mean across 
    batches.

    Every batch contributes exactly one data point per column: the 
    mean of that column in the batch. The running aggregate is 
    maintained with Welford's algorithm, a well known algorithm for 
    calculating a running variance. For numerical stability we don't 
    keep track of the variance as we go, rather keeping track of the 
    sum of squared distances from the mean (M2). The variance is only 
    derived at the end, in finalize.
    """

    @classmethod
    def initialize(cls, batch, column):
        """
        Starts an aggregate for column from the first batch.

        batch must provide get_column_mean(column).

        Returns a dictionary tracking:
            batch_count   the number of batches seen so far (1)
            mean          the running mean of the batch means
            M2            the sum of squared distances from the mean
        """
        return {'batch_count': 1, 'mean': batch.get_column_mean(column), 'M2': 0}
    
    
    @classmethod
    def update(cls, current_aggregate, batch, column):
        """
        Folds the mean of column in one more batch into an aggregate.

        current_aggregate is a dictionary as returned by initialize
        or update; it is not modified, a new dictionary with the same
        keys is returned.
        """
        column_mean = batch.get_column_mean(column)

        batch_count = current_aggregate['batch_count'] + 1
        mean = current_aggregate['mean']
        M2 = current_aggregate['M2']

        # Welford step: delta uses the old mean, delta2 the new one
        delta = column_mean - mean
        mean += delta / batch_count
        delta2 = column_mean - mean
        M2 += delta * delta2

        return {'batch_count': batch_count, 'mean': mean, 'M2': M2}
    
    
    @classmethod
    def finalize(cls, current_aggregate):
        """
        Turns an aggregate as built by initialize and update into
        summary statistics.

        Returns a dictionary that always has the keys:
            batch_count          the number of batches seen
            mean                 the mean of the batch means
            variance             population variance (M2 / n)
            sample_variance      sample variance (M2 / (n - 1))
            standard_deviation   square root of the population variance

        With fewer than two batches the spread is undefined, so the
        three spread values are NaN.
        """
        import math

        batch_count = current_aggregate['batch_count']
        mean = current_aggregate['mean']
        M2 = current_aggregate['M2']

        if batch_count < 2:
            variance = sample_variance = standard_deviation = float('nan')
        else:
            variance = M2 / batch_count
            sample_variance = M2 / (batch_count - 1)
            standard_deviation = math.sqrt(variance)

        # same keys in both cases so callers never need to branch
        return {
            'batch_count': batch_count,
            'mean': mean,
            'variance': variance,
            'sample_variance': sample_variance,
            'standard_deviation': standard_deviation
        }

In [4]:
class MyColumnValueSetMetricBuilder():
    """
    Tracks, per column, which values occur and how often, across 
    batches.

    The aggregate we are building for each column
    looks as follows:
    
    {
        'batch_count': number_of_batches,
        'total_row_count': aggregate_number_of_rows,
        'values':{
            'value_1': {
                'total_instances': total_across_all_batches,
                'batches_found_in': total_batches_found_in
            },
            'value_2': {
                'total_instances': total_across_all_batches,
                'batches_found_in': total_batches_found_in
            }
        }
    }

    finalize converts the per-value counters into frequencies.
    """

    @classmethod
    def initialize(cls, batch, column):
        """
        Starts an aggregate for column from the first batch.

        batch must provide get_column_value_counts(column), whose
        result is read through .index and item access, and 
        get_row_count().
        """
        value_counts = batch.get_column_value_counts(column)
        row_count = batch.get_row_count()

        values = {
            value: {
                'total_instances': value_counts[value],
                'batches_found_in': 1
            }
            for value in value_counts.index
        }

        return {
            'values': values,
            'batch_count': 1,
            'total_row_count': row_count
        }
    
    
    @classmethod
    def update(cls, current_aggregate, batch, column):
        """
        Folds the value counts of one more batch into an aggregate.

        current_aggregate is updated in place and also returned.
        """
        value_counts = batch.get_column_value_counts(column)
        row_count = batch.get_row_count()

        current_aggregate['batch_count'] += 1
        current_aggregate['total_row_count'] += row_count

        known_values = current_aggregate['values']
        for value in value_counts.index:
            if value in known_values:
                known_values[value]['total_instances'] += value_counts[value]
                known_values[value]['batches_found_in'] += 1
            else:
                known_values[value] = {
                    'total_instances': value_counts[value],
                    'batches_found_in': 1
                }

        return current_aggregate
    
    
    @classmethod
    def finalize(cls, current_aggregate):
        """
        Converts the counters of an aggregate into frequencies.

        Returns a new dictionary with the same top level keys as 
        current_aggregate, where each entry of 'values' is
            batch_frequency   batches_found_in / batch_count
            row_frequency     total_instances / total_row_count

        current_aggregate is left untouched, so it can still be 
        updated or finalized again afterwards.
        """
        batch_count = current_aggregate['batch_count']
        total_row_count = current_aggregate['total_row_count']

        # build a fresh 'values' dict: a shallow copy would share the
        # nested dict and overwrite the caller's counters
        final_aggregate = dict(current_aggregate)
        final_aggregate['values'] = {
            value: {
                'batch_frequency': counters['batches_found_in'] / batch_count,
                'row_frequency': counters['total_instances'] / total_row_count
            }
            for value, counters in current_aggregate['values'].items()
        }

        return final_aggregate

## Expectation Builder

In [5]:
# TODO: catch BatchKwargsError

class BatchLoopingExpectationBuilder():
    """
    Accepts expectation configurations, leverages MetricBuilders 
    while looping over batches, and returns an expectation
    suite containing the specified expectations.
    """

    @classmethod
    def build_expectations(
        cls, 
        context, 
        suite_name, 
        datasource_name, 
        generator_name, 
        data_asset_name,
        row_count_config=None,
        column_mean_configs=None,
        column_value_set_configs=None
    ):
        """
        Loops over every available partition of a data asset, 
        aggregates metrics per batch and turns them into expectations.

        context, datasource_name, generator_name and data_asset_name
        are used to list the partition ids and to load one batch per 
        partition (in sorted partition id order).

        row_count_config is a dictionary with optional keys
            number_of_stds   width of the range in standard 
                             deviations (defaults to 2)
            min_value        floor for the computed minimum
            max_value        ceiling for the computed maximum
        and produces expect_table_row_count_to_be_between.

        column_mean_configs is a list of dictionaries with a 
        'column' key plus the same optional keys as above, each 
        producing expect_column_mean_to_be_between. Note that a 
        column seen in only one batch has a NaN standard deviation 
        and therefore NaN bounds.

        column_value_set_configs is a list of dictionaries with 
        'column', 'batch_frequency_threshold' and 
        'row_frequency_threshold', each producing 
        expect_column_values_to_be_in_set with every value that 
        reaches at least one of the two thresholds.

        Columns that are missing from a batch are skipped for that 
        batch.

        Returns the ExpectationSuite named suite_name.

        Raises ValueError if a row count expectation is requested 
        but fewer than two batches are available, if a configured 
        row count bound is negative, or if a value set config lacks 
        a threshold. Raises KeyError if a configured column was not 
        found in any batch.
        """
        import math

        # initialize suite
        
        suite = ExpectationSuite(suite_name)
        
        
        # get partition ids
        
        datasource = context.get_datasource(datasource_name=datasource_name)
        generator = datasource.get_batch_kwargs_generator(name=generator_name)
        partition_ids = sorted(
            generator.get_available_partition_ids(data_asset_name=data_asset_name)
        )
                
        
        # loop batches
        
        # aggregates start out empty so that "not seen yet" can be 
        # told apart from "seen" without tracking the batch index
        row_count_aggregate = None
        column_mean_aggregates = {}
        column_value_set_aggregates = {}
        
        for partition_id in partition_ids:
            batch_kwargs = context.build_batch_kwargs(
                datasource=datasource_name, 
                batch_kwargs_generator=generator_name, 
                data_asset_name=data_asset_name,
                partition_id=partition_id
            )
            batch = context.get_batch(batch_kwargs, suite)
            columns = batch.get_table_columns()
            
            
            if row_count_config:
                if row_count_aggregate is None:
                    row_count_aggregate = MyRowCountMetricBuilder.initialize(batch)
                else:
                    row_count_aggregate = MyRowCountMetricBuilder.update(row_count_aggregate, batch)
                   
                
            for config in column_mean_configs or []:
                column = config['column']
                if column not in columns:
                    continue
                
                if column in column_mean_aggregates:
                    column_mean_aggregates[column] = MyColumnMeanMetricBuilder.update(
                        column_mean_aggregates[column],
                        batch,
                        column
                    )
                else:
                    column_mean_aggregates[column] = MyColumnMeanMetricBuilder.initialize(
                        batch,
                        column
                    )
                
                
            for config in column_value_set_configs or []:
                column = config['column']
                if column not in columns:
                    continue
                
                if column in column_value_set_aggregates:
                    column_value_set_aggregates[column] = MyColumnValueSetMetricBuilder.update(
                        column_value_set_aggregates[column],
                        batch,
                        column
                    )
                else:
                    column_value_set_aggregates[column] = MyColumnValueSetMetricBuilder.initialize(
                        batch, 
                        column
                    )
        
        
        # finalize and add expectations
        
        if row_count_config:
            if row_count_aggregate is None:
                raise ValueError(
                    'Cannot build a row count expectation: no batches were found for the data asset'
                )
            
            # same default as the column mean expectation below
            number_of_stds = row_count_config.get('number_of_stds', 2)
            
            final_aggregate = MyRowCountMetricBuilder.finalize(row_count_aggregate)
        
            rc_mean = final_aggregate['mean']
            rc_std = final_aggregate['standard_deviation']
            
            # round() cannot convert NaN, so fail with a message that 
            # names the actual cause
            if math.isnan(rc_std):
                raise ValueError(
                    'Cannot build a row count expectation: at least two batches are required'
                )
            
        
            rc_min_value = round(rc_mean - (number_of_stds * rc_std))   # min value should be an integer
            if 'min_value' in row_count_config:                         # check to see if a minimum min value has been specified
                if row_count_config['min_value'] < 0:
                    raise ValueError('Minimum value for row count should be at least 0')
                rc_min_value = max(rc_min_value, row_count_config['min_value'])
            else:
                rc_min_value = max(rc_min_value, 0)         # row count should always be >= 0
        
            rc_max_value = round(rc_mean + (number_of_stds * rc_std))   # max value should be an integer
            if 'max_value' in row_count_config:                         # check to see if a maximum max value has been specified
                if row_count_config['max_value'] < 0:
                    raise ValueError('Maximum value for row count should be at least 0')
                rc_max_value = min(rc_max_value, row_count_config['max_value'])
        
            suite.add_expectation(
                ExpectationConfiguration(
                    expectation_type='expect_table_row_count_to_be_between',
                    kwargs={
                        'min_value':rc_min_value,
                        'max_value':rc_max_value
                    },
                    meta={
                        'BatchLoopingProfiler': final_aggregate
                    }
                )
            )

            
        if column_mean_configs:
            for config in column_mean_configs:
                column = config['column']
                if column not in column_mean_aggregates:
                    raise KeyError(
                        'Column {!r} was not found in any batch'.format(column)
                    )
                
                number_of_stds = config.get('number_of_stds', 2)

                final_aggregate = MyColumnMeanMetricBuilder.finalize(column_mean_aggregates[column])

                cm_mean = final_aggregate['mean']
                cm_std = final_aggregate['standard_deviation']

                cm_min_value = cm_mean - (number_of_stds * cm_std)
                if 'min_value' in config:                           # check to see if a minimum min value has been specified
                    if cm_min_value < config['min_value']:
                        cm_min_value = config['min_value']

                cm_max_value = cm_mean + (number_of_stds * cm_std)
                if 'max_value' in config:                           # check to see if a maximum max value has been specified
                    if cm_max_value > config['max_value']:
                        cm_max_value = config['max_value']

                suite.add_expectation(
                    ExpectationConfiguration(
                        expectation_type='expect_column_mean_to_be_between',
                        kwargs={
                            'column': column,
                            'min_value': cm_min_value,
                            'max_value': cm_max_value
                        },
                        meta={
                            'BatchLoopingProfiler': final_aggregate
                        }
                    )
                )


        if column_value_set_configs:
            for config in column_value_set_configs:
                column = config['column']
                if 'batch_frequency_threshold' not in config or 'row_frequency_threshold' not in config:
                    raise ValueError('Please specify a batch_frequency_threshold and a row_frequency_threshold')
                if column not in column_value_set_aggregates:
                    raise KeyError(
                        'Column {!r} was not found in any batch'.format(column)
                    )
                
                batch_frequency_threshold = config['batch_frequency_threshold']
                row_frequency_threshold = config['row_frequency_threshold']
                
                final_aggregate = MyColumnValueSetMetricBuilder.finalize(
                    column_value_set_aggregates[column]
                )
                
                value_set = []
                meta = {
                    'batch_count': final_aggregate['batch_count'],
                    'total_row_count': final_aggregate['total_row_count'],
                    'values': {}
                }
                
                for value, frequencies in final_aggregate['values'].items():
                    batch_frequency = frequencies['batch_frequency']
                    row_frequency = frequencies['row_frequency']
                    
                    # a value qualifies by showing up in enough batches 
                    # or by making up enough of the rows
                    if (
                        batch_frequency >= batch_frequency_threshold
                        or row_frequency >= row_frequency_threshold
                    ):
                        value_set.append(value)
                        meta['values'][value] = {
                            'batch_frequency': batch_frequency,
                            'row_frequency': row_frequency
                        }
                
                suite.add_expectation(
                    ExpectationConfiguration(
                        expectation_type='expect_column_values_to_be_in_set',
                        kwargs={
                            'column': column,
                            'value_set': value_set
                        },
                        meta={
                            'BatchLoopingProfiler': meta
                        }
                    )
                )
        
        
        return suite

## Profiler

In [6]:
# TODO: catch BatchKwargsError

class BatchLoopingProfiler(BasicDatasetProfilerBase):
    """
    Accepts profiler configurations
    Learns column types and cardinalities
    
    Uses configurations, column types, and cardinalities together
    to determine which expectations should be developed for each 
    column
    
    Leverages an ExpectationBuilder to build an expectation suite
    """
    @classmethod
    def _get_default_config(cls, expectation, column=None):
        """
        Returns the default config dictionary for an expectation.

        expectation is the expectation type name. column must be 
        given for column level expectations and is copied into the 
        config.

        Raises NotImplementedError for expectation types that have 
        no default config.
        """
        if expectation == 'expect_table_row_count_to_be_between':
            config = {
                'number_of_stds': 2,
            }
        
        elif column:
            if expectation == 'expect_column_mean_to_be_between':
                config = {
                    'column': column,
                    'number_of_stds': 2,
                }
            elif expectation == 'expect_column_values_to_be_in_set':
                config = {
                    'column': column,
                    'batch_frequency_threshold': 0.5,
                    'row_frequency_threshold': 0.001
                }
            else:
                raise NotImplementedError('The specified column level expectation has not been implemented')
                
        else:
            raise NotImplementedError('The specified table level expectation has not been implemented')
        
        return config
    
    
    @classmethod
    def profile(
        cls, 
        context, 
        suite_name, 
        datasource_name, 
        generator_name, 
        data_asset_name,
        all_columns_config=None,
        table_level_config=None,
        column_subset_config=None,
        individual_columns_config=None
    ):
        """
        Builds an expectation suite for a data asset from default 
        configs chosen per column type.

        The columns and their types are read from the batch with the 
        last partition id (in sorted order). The expectations 
        themselves are built by BatchLoopingExpectationBuilder, which 
        loops over all batches.

        all_columns_config may only contain 'excluded_columns', a 
        list of columns to leave out. individual_columns_config maps 
        a column name to a dictionary whose 'semantic_type' overrides 
        the detected column type. Every other option is sketched out 
        but raises NotImplementedError.

        Returns the ExpectationSuite named suite_name.

        Raises ValueError if the data asset has no partitions or an 
        excluded column does not exist in the final batch.
        """
        # build default configs
        default_table_level_config = {
            'expect_table_row_count_to_be_between': cls._get_default_config('expect_table_row_count_to_be_between')
        }
        
        default_column_subset_config = {
            ProfilerDataType.INT: [
                'expect_column_mean_to_be_between'
            ],
            ProfilerDataType.FLOAT: [
                'expect_column_mean_to_be_between'
            ],
            ProfilerDataType.STRING: [
                'expect_column_values_to_be_in_set'
            ],
            ProfilerDataType.BOOLEAN: [
                'expect_column_values_to_be_in_set'
            ],
            ProfilerDataType.DATETIME: [],
            ProfilerDataType.UNKNOWN: []
        }
        
        
        # all_columns_config will be a way for the user to specify expectations
        # to be included or excluded for all columns, as well as columns to be
        # excluded by the profiler
        excluded_columns = None
        if all_columns_config:
            for item in all_columns_config:
                if item == 'excluded_columns':
                    excluded_columns = all_columns_config['excluded_columns']
                else:
                    raise NotImplementedError
        
        # table_level_config will be a way for the user to specify included
        # or excluded table level expectations
        if table_level_config:
            raise NotImplementedError
        
        
        # column_subset_config will be a way for users to specify
        # here's what I want for all columns of type INT, STRING, etc
        if column_subset_config:
            if 'semantic_types' in column_subset_config:
                semantic_types = column_subset_config['semantic_types']
                for semantic_type in semantic_types:
                    if 'additional_expectations' in semantic_types[semantic_type]:
                        raise NotImplementedError
                    if 'excluded_expectations' in semantic_types[semantic_type]:
                        raise NotImplementedError
                    if 'included_expectations' in semantic_types[semantic_type]:
                        raise NotImplementedError
            if 'regex' in column_subset_config:
                raise NotImplementedError
                
                
        # we are going to need to do some sort of join between
        # the user specified configs and the default configs
        table_level_config = default_table_level_config
        column_subset_config = default_column_subset_config
        
        
        # initialize suite
        suite = ExpectationSuite(suite_name)
        
        
        # get partition ids
        datasource = context.get_datasource(datasource_name=datasource_name)
        generator = datasource.get_batch_kwargs_generator(name=generator_name)
        partition_ids = generator.get_available_partition_ids(data_asset_name=data_asset_name)
        partition_ids.sort()
        if not partition_ids:
            raise ValueError('No partitions are available for the specified data asset')
        
        
        # get final batch
        batch_kwargs = context.build_batch_kwargs(
            datasource=datasource_name, 
            batch_kwargs_generator=generator_name, 
            data_asset_name=data_asset_name,
            partition_id=partition_ids[-1]
        )
        final_batch = context.get_batch(batch_kwargs, suite)
        
        
        # gather column information
        # work on a copy so excluding columns never alters a list 
        # that is owned by the batch
        columns = list(final_batch.get_table_columns())
        if excluded_columns:
            for column in excluded_columns:
                if column in columns:
                    columns.remove(column)
                else:
                    raise ValueError('Specified column does not exist in the final batch')
        # _get_column_type is not defined here; presumably inherited 
        # from BasicDatasetProfilerBase
        column_types = {
            column: cls._get_column_type(final_batch, column) for column in columns
        }
        
        
        # instantiate configs
        row_count_config = None
        for expectation in table_level_config:
            if expectation == 'expect_table_row_count_to_be_between':
                row_count_config = table_level_config[expectation]
            else:
                raise NotImplementedError
                
        column_mean_configs = []
        column_value_set_configs = []
        for column in columns:

            if individual_columns_config and (column in individual_columns_config):
                column_config = individual_columns_config[column]
                if 'semantic_type' in column_config:
                    column_types[column] = column_config['semantic_type']
                if 'additional_expectations' in column_config:
                    raise NotImplementedError
                if 'included_expectations' in column_config:
                    raise NotImplementedError
                if 'excluded_expectations' in column_config:
                    raise NotImplementedError

            column_type = column_types[column]
            expectations = column_subset_config[column_type]
            for expectation in expectations:
                if expectation == 'expect_column_mean_to_be_between':
                    column_mean_configs.append(cls._get_default_config(expectation, column))
                elif expectation == 'expect_column_values_to_be_in_set':
                    column_value_set_configs.append(cls._get_default_config(expectation, column))
                else:
                    raise NotImplementedError
        
        # pass the suite_name parameter; the notebook-level global 
        # expectation_suite_name is not defined inside this method
        suite = BatchLoopingExpectationBuilder().build_expectations(
            context, 
            suite_name, 
            datasource_name, 
            generator_name, 
            data_asset_name,
            row_count_config=row_count_config,
            column_mean_configs=column_mean_configs,
            column_value_set_configs=column_value_set_configs
        )
        
        return suite

# Using BatchLoopingExpectationBuilder

In [7]:
# Build a suite straight from hand-written expectation configs
context = DataContext()

expectation_suite_name = 'test_suite'
datasource_name = 'covid_data'
generator_name = 'covid_generator'
data_asset_name = 'covid'

# table level: row count within two standard deviations of the mean
row_count_config = {'number_of_stds': 2}

# the three count columns share the same settings
column_mean_configs = [
    {'column': column_name, 'number_of_stds': 2, 'min_value': 0}
    for column_name in ('Confirmed', 'Deaths', 'Recovered')
]

column_value_set_configs = [
    {
        'column': 'Country_Region',
        'batch_frequency_threshold': 0.5,
        'row_frequency_threshold': 0.001,
    },
]

suite = BatchLoopingExpectationBuilder().build_expectations(
    context,
    expectation_suite_name,
    datasource_name,
    generator_name,
    data_asset_name,
    row_count_config=row_count_config,
    column_mean_configs=column_mean_configs,
    column_value_set_configs=column_value_set_configs,
)

context.save_expectation_suite(suite, expectation_suite_name)

# Using BatchLoopingProfiler

In [8]:
# Let the profiler pick the expectations from the column types
context = DataContext()

expectation_suite_name = 'test_suite_2'
datasource_name = 'covid_data'
generator_name = 'covid_generator'
data_asset_name = 'covid'

# columns the profiler should leave out entirely
all_columns_config = {
    'excluded_columns': ['Admin2', 'Last_Update', 'Lat', 'Long_', 'Combined_Key'],
}

# treat FIPS as a string column instead of the detected type
individual_columns_config = {
    'FIPS': {'semantic_type': ProfilerDataType.STRING},
}

suite = BatchLoopingProfiler().profile(
    context,
    expectation_suite_name,
    datasource_name,
    generator_name,
    data_asset_name,
    all_columns_config=all_columns_config,
    individual_columns_config=individual_columns_config,
)

context.save_expectation_suite(suite, expectation_suite_name)

In [9]:
# Build the data docs for the saved suites. As the last expression in the
# cell, the return value is shown as the cell output (here a mapping of
# site name to the site's index page).
context.build_data_docs()

{'local_site': 'file:///Users/rexboyce/profiler-project/great_expectations/uncommitted/data_docs/local_site/index.html'}

# Sketching out config ideas

In [None]:
# Design sketch only: this cell is not meant to be executed.
# Idea: name groups of columns ("domains") either explicitly or through a
# builder class, then attach rules to each domain.
domains: 
    domain1:
        columns:
            column1, column2, column3
    domain2:
        domain_builder: 
            MyDomainBuilder

# Rules per domain, given either as a plain list of names or through a
# rule builder class with its own parameters.
rulesets:
    domain1:
        rules:
            expect_column_mean_to_be_between
            rule2
            rule3
    domain2:
        rule_builder: 
            class_name: MyRuleBuilder
            expectation_type: expect_column_mean_to_be_between
            previous_value: urn:ge:expectation_suites::expect_column_mean_to_be_between.min_value:domain=domain1
        






# Design sketch only: column1, column7, config and value below are
# placeholders that are not defined anywhere, so this cell does not run.
# It outlines the shape the profiler's config arguments could take.
table_level_config=[               # this list is a standard format that is used for all of the configs
    'expectation_1',               # list can include just the name
    {
        'name': 'expectation_2',   # or an entire dict
        'parameter1': 'value',
        'parameter2': 'value'
    },
    'expectation_3'
]


# placeholder column names
included_columns = [column1, column7]


all_columns_config={
    'additional_expectations': [   # expectations to include in addition to the defaults
        'expectation_1',               # list can include just the name
        {
            'name': 'expectation_2',   # or an entire dict
            'parameter1': 'value',
            'parameter2': 'value'
        },
        'expectation_3'
    ],
    'included_expectations': None, # might need a better name, this would be an exclusive list
    'excluded_expectations': None  # no matter what, do not include this expectation on any columns
}


# rules for groups of columns, selected by semantic type or by a regex
# on the column name
column_subset_rules={
    'semantic_types': {
        'text': {
            'included_expectations': None,    # standard list format
            'excluded_expectations': None     # standard list format
        },
        'numeric': config,                    # config is a placeholder for a dict like the one above
        'datetime': config,
        'boolean': config,
    },
    'regex': {
        '.*(_ct)': {
            'included_expectations': None,    # standard list format
            'excluded_expectations': None     # standard list format
        }
    }
}
    
    
# per column overrides, keyed by column name
individual_columns_config={
    'column1': {
        'semantic_type': ProfilerDataType.STRING,              # user can override 'type' ex: this is an int but treat it as text
        'additional_expectations': None,      # standard list format
        'included_expectations': None,        # standard list format
        'excluded_expectations': None,        # standard list format
    },
    'column2': {
        'semantic_type': ProfilerDataType.FLOAT,
        'additional_expectations': [          # standard list format
            'expect_column_values_to_be_in_set', 
            {
                'name':'expect_column_mean_to_be_between',
                'parameter': value            # value is a placeholder
            }
        ],
        'included_expectations': None,
        'excluded_expectations': None,
    }
}

# Loading Data

## Getting an idea of the data

In [None]:
# Shared setup for exploring the data: context, an empty suite and the
# sorted list of partition ids of the data asset
context = DataContext()

suite_name = 'test_suite'
datasource_name = 'covid_data'
generator_name = 'covid_generator'
data_asset_name = 'covid'

suite = ExpectationSuite(suite_name)

datasource = context.get_datasource(datasource_name=datasource_name)
generator = datasource.get_batch_kwargs_generator(name=generator_name)
partition_ids = generator.get_available_partition_ids(
    data_asset_name=data_asset_name,
)
partition_ids.sort()

In [None]:
# Load the batch with the last partition id and preview it
batch_kwargs = context.build_batch_kwargs(
    datasource=datasource_name,
    batch_kwargs_generator=generator_name,
    data_asset_name=data_asset_name,
    partition_id=partition_ids[-1],
)
batch = context.get_batch(batch_kwargs, suite)

# last expression of the cell, so the preview is displayed
batch.head()

## Data Finagling

In [None]:
# Same setup as for exploring the data: context, an empty suite and the
# sorted list of partition ids of the data asset
context = DataContext()

suite_name = 'test_suite'
datasource_name = 'covid_data'
generator_name = 'covid_generator'
data_asset_name = 'covid'

suite = ExpectationSuite(suite_name)

datasource = context.get_datasource(datasource_name=datasource_name)
generator = datasource.get_batch_kwargs_generator(name=generator_name)
partition_ids = generator.get_available_partition_ids(
    data_asset_name=data_asset_name,
)
partition_ids.sort()

In [None]:
# Rewrite a range of daily reports with the newer column names
renamed_columns = {
    'Province/State':'Province_State',
    'Country/Region':'Country_Region',
    'Last Update':'Last_Update',
}

for partition_id in partition_ids[39:60]:
    batch_kwargs = context.build_batch_kwargs(
        datasource=datasource_name,
        batch_kwargs_generator=generator_name,
        data_asset_name=data_asset_name,
        partition_id=partition_id,
    )
    batch = context.get_batch(batch_kwargs, suite)

    # print the column names before and after renaming
    print(batch.columns)
    batch = batch.rename(columns=renamed_columns)
    print(batch.columns)

    # Province_State becomes the index, so it is written as the first column
    batch = batch.set_index('Province_State', drop=True)
    batch.to_csv(f'../COVID-19/csse_covid_19_data/csse_covid_19_daily_reports/{partition_id}.csv')