In [1]:
io_config = {
    'data_directory': 'data',
    'config_directory': 'configs',
    'model_directory': 'models',
    'log__directory': 'logs',
    "compression": "gzip",
    "data_format": "csv",
    "select_columns": ["a", "b", "c", "d", "e", "f", "g", "h", "i"],
    "data_type_dict": {"number": ["a", "b", "c", "d"], "string": ["e", "f", "g"], "date": ["h", "i"]}
}

parsing_config = {
    "date_format_configs":  [{"date_column": "h", "format": "%Y-%m-%d"},{"date_column": "i", "format": "%Y-%m-%d"}],
    "input_data_precision": 2,
    "output_data_precision": 2,
    "optimized_data_schema": "optimized_data_schema.json",
}

validator_config = {
    "column_check": ["a", "b", "c", "d", "e", "f", "g", "h", "i"],
    "range_check": {"a": [0, 100], "b": [0, 100], "c": [0, 100], "d": [0, 100]},
    "unique_check": {"a":[], "b":[], "c":[], "d":[]},
    "null_check": ["a", "b", "c", "d", "e", "f", "g", "h", "i"],
    "cardinality_check": ["a", "b"],
}


pre_processor_config = {
    'imputation': {'a': 'mean', 'b': 'mean', 'c': 'mean', 'd': 'mean'},
    'model_granularity': ['a','b'],
    'date_granularity': [{'date_col':'MS','date_col2':'D'}],
    'aggregate': {'a': ['sum', 'mean'], 'b': ['sum', 'mean'],'target': ['max']},
    'req_col_mapper': {'poc_id': 'a', 'order_id': 'b','quantity': 'c', 'date': 'd', 'target': 'e'},
    }





# 1. read sample data: use `select_cols` for reading data. example: sample_df = pd.read_csv('data.csv', usecols=select_cols, nrows=1000)
# 2. parse date columns according to `date_configs`
# 3. then optimized the data set so that we can get a `full_data_types_dict`.
# 4. read full data using `full_data_types_dict` and `select_cols`: optimized_data = pd.read_csv(path, data_types_dict=full_data_types_dict, usecols=select_cols).rename(columns=rename_cols)
# 5. rename_cols: should be for mandatory columns. We need to have assert statement to check if all mandatory columns are present in the data set. We should not allow other columns to be renamed.
# 6. 


In [None]:

pre_processor_config = {
    'req_col_mapper': [{'poc_id': 'a', 'order_id': 'b','quantity': 'c', 'date': 'd', 'target': 'e'}],
    'model_granularity': ['a','b'],
    'date_granularity': [{'date_col':'MS','date_col2':'D'}],
    'aggregate': [{'a': ['sum', 'mean'], 'b': ['sum', 'mean'],'target': ['max']}],
    }


# write a function to preprocss the data based on the config

    

In [55]:
# Write a function which can check a list of folders present in local or not. If not present then create them.
import os

def create_folder(folder_path):
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)

def create_folders(folder_list):
    for folder in folder_list:
        create_folder(folder)


In [20]:
def convert_date(df,date_col_day_mapping):
    for date_col,date_mapping in date_col_day_mapping.items():
        if date_mapping =='M':
            df[date_col].apply(lambda x: x.replace(day=1))
        elif date_mapping =='W':
            df[date_col] = df[date_col] - timedelta(days = df[date_col].weekday())
        elif date_mapping =='D':
            continue
        elif date_mapping =='Y':
            df[date_col]  = df[date_col].replace(day=1,month=1)
        else:
            raise ValueError('date mapping not supported')
    return df


def create_model_granularity(df,model_granularity):
    if len(model_granularity)>1:
        colname = '_'.join(model_granularity)
        df[colname] = df[model_granularity[0]].astype(str)
        for col in model_granularity[1:]:
            df[colname] = df[colname].astype(str) + "||" + df[col].astype(str)
    else:
        colname = model_granularity[0]
    return df,colname


def aggregate_data(df,model_granularity_col_name,aggregate_config,date_col):
    df = df.copy()
    if date_col:
        df = df.groupby([model_granularity_col_name,date_col]).agg(aggregate_config)
    else:
        df = df.groupby([model_granularity_col_name]).agg(aggregate_config)
    df.columns = ["_".join(x) for x in df.columns.tolist()]
    return df

In [21]:
pre_processor_config = {
    'req_col_mapper': {'poc_id': 'a', 'order_id': 'b','quantity': 'c', 'date': 'date1', 'target': 'e'},
    'imputation': {'a': 'mean', 'b': 'mean', 'c': 'mean', 'd': 'mean'},
    'model_granularity': ['e','f'],
    'date_granularity': [{'date1':'M'}],
    'aggregate': {'a': ['sum', 'mean'], 'b': ['sum', 'mean'],'target': ['max']},
    }

In [22]:
pre_processor_config

{'req_col_mapper': {'poc_id': 'a',
  'order_id': 'b',
  'quantity': 'c',
  'date': 'date1',
  'target': 'e'},
 'imputation': {'a': 'mean', 'b': 'mean', 'c': 'mean', 'd': 'mean'},
 'model_granularity': ['e', 'f'],
 'date_granularity': [{'date1': 'M'}],
 'aggregate': {'a': ['sum', 'mean'], 'b': ['sum', 'mean'], 'target': ['max']}}

In [23]:
def pre_processor_pipe(df,pre_processor_config):
    df = convert_date(df,pre_processor_config['date_granularity'][0])
    df,model_granularity_col_name = create_model_granularity(df,pre_processor_config['model_granularity'])
    df = aggregate_data(df,model_granularity_col_name,pre_processor_config['aggregate'],pre_processor_config['req_col_mapper']['date'])
    return df.reset_index()

In [24]:
import pyspark.pandas as pd

In [25]:
data = pd.read_csv('data.csv')

In [26]:
data.dtypes
data['date1'] = pd.to_datetime(data['date1'])
data['date2'] = pd.to_datetime(data['date2'])

In [27]:
pre_processor_pipe(data,pre_processor_config)

23/03/09 00:52:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/09 00:52:52 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
  series = series.astype(t, copy=False)
  fields = [
23/03/09 00:52:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/09 00:52:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/09 00:52:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/09 00:52:53 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this 

Unnamed: 0,e_f,date1,a_sum,a_mean,b_sum,b_mean,target_max
0,a||c,2018-07-09,45,45.0,1,1.0,1
1,c||c,2019-01-05,77,77.0,74,74.0,1
2,a||a,2019-04-10,94,94.0,54,54.0,1
3,c||c,2019-05-10,84,84.0,91,91.0,1
4,c||b,2019-10-26,44,44.0,88,88.0,1
5,b||b,2020-01-24,89,89.0,42,42.0,1
6,c||c,2020-05-31,32,32.0,35,35.0,0
7,b||a,2020-07-14,20,20.0,41,41.0,1
8,b||b,2020-08-09,78,78.0,49,49.0,0
9,b||c,2020-09-21,52,52.0,19,19.0,1
