In [1]:
import sys, os, inspect, re
sys.path.append("/home/vbhargava/feature_test0/msaction_backend/common/BU3.0_core/util/Py_utils/taxonomy_utils")
import time, logging
# Route INFO-and-above log records to stdout so they show up in the cell output.
numeric_level = logging.INFO
stdout_handler = logging.StreamHandler(stream=sys.stdout)
logging.basicConfig(
    handlers=[stdout_handler],
    format='%(asctime)s %(levelname)s %(name)s: %(message)s',
    level=numeric_level,
)

In [2]:
from libs.s3_ops import S3_OPs
from libs.s3_stream import S3Stream
from libs.configs import Config
from libs.nio_executor import NIO
from libs import utils
from collections import defaultdict

In [3]:
# Local path of the platform config XML; passed to Config.get_qubole_config in a later cell.
# NOTE(review): absolute, user-specific path — the notebook will not run elsewhere without editing it.
config = '/home/vbhargava/feature_test0/msaction_backend/customers/raj_ford_test/common/config/inputs/platform_config.xml'
# S3 prefix that extract_src_detail lists for incoming source CSV files.
lmt_src = 's3://qubole-ford/taxonomy_cs/test1/src/'
# S3 prefix whose sub-directories are listed as the current taxonomy groups (see extract_data_detail).
lmt_data = 's3://qubole-ford/taxonomy_cs/test1/data/'

In [6]:
# Load the platform config and take the S3 credentials from it (nothing is hard-coded here).
# NOTE(review): presumably get_qubole_config parses the file at `config` into a dict — confirm in libs.configs.
config_data = Config.get_qubole_config(config)
ACCESS_KEY=config_data['access_key']
SECRET_KEY=config_data['secret_key']

In [7]:
# All patterns are raw strings so that regex escapes such as `\-` and `\.` are not
# interpreted (and warned about) as Python string escapes.

# Captures the last component of a '/'-terminated prefix, e.g. '.../data/tg1/' -> 'tg1'.
TG_EXTRACT_REGEX = r'^.*?/([a-zA-Z]+\-?[0-9]*)/$'
# Captures the file name of a key ending in a literal '.csv' (the dot is escaped on purpose).
FILE_EXTRACT_REGEX = r'^.*/([a-zA-Z0-9.\-_]{0,255}\.csv)$'
# Captures the last 'target_...' column name found in a normalized schema string.
TARGET_EXTRACT_REGEX = r'^.*,?(target_[A-Za-z0-9_-]+).*$'
# Matches '<dir/>...<FileGrp>_<YYYY-MM-DD>_<ClientName>.csv'.
# Groups, in order: key dir path, parent dir, file name, file group, date, client name.
# The extension must be exactly '.csv' (previously any character followed by 'cs' or 'csv' was accepted).
VALID_FILE_KEY_REGEX = r'^(.*/([a-zA-Z]+\-?[0-9]*)?/)?(([a-zA-Z]+\-?[0-9]*?)_([0-9]{4}-[0-9]{2}-[0-9]{2}?)_([a-zA-Z0-9.\-_]+?)\.csv)$'

In [8]:
# Shared S3 helper instance; the functions in this notebook use it for listing and path building.
s3_ops = S3_OPs(ACCESS_KEY, SECRET_KEY)

def filename_by_key(key):
    """Return the file name that FILE_EXTRACT_REGEX captures from an object key.

    Raises Exception (from get_val_by_regex) when the key does not match.
    """
    failure_text = "Not vaild key for taxonomy data csv file"
    return get_val_by_regex(key, FILE_EXTRACT_REGEX, error_msg=failure_text)

def find_by_data_tg(key, regex):
    """Return the first value `regex` captures from `key`.

    Raises Exception (from get_val_by_regex) when nothing matches.
    """
    failure_text = "Not vaild taxonomy data dir"
    return get_val_by_regex(key, regex, error_msg=failure_text)

        
def get_val_by_regex(key, regex, error_msg="can't be extract a val."):
    """Return the first `re.findall(regex, key)` result.

    With several capture groups in `regex` the result is a tuple of groups.

    Raises:
        Exception: carrying `error_msg`, when `regex` finds nothing in `key`.
    """
    hits = re.findall(regex, key)
    if not hits:
        raise Exception(error_msg)
    return hits[0]
        
def get_data_n_schema(tg, data_files_loc):
    """List the files under `data_files_loc` and read the header of the first one.

    Returns:
        {} when the listing is empty; otherwise
        {'schema': {tg: <header of first listed file>}, 'files': {tg: <listing>}}.
    """
    location = s3_ops.get_bucket_name(data_files_loc)
    bucket = location['bucket']
    files = s3_ops.list_complete(bucket, location['key'])
    if len(files) == 0:
        return {}

    # Only the first listed file is inspected; the others are not read here.
    s3_stream = S3Stream(ACCESS_KEY, SECRET_KEY)
    first_file_path = s3_ops.get_full_s3_path(bucket, files[0]['Key'])
    schema = s3_stream.get_header(first_file_path)
    return {'schema': {tg: schema}, 'files': {tg: files}}

def extract_schema(schema):
    """Normalize a header line: remove every space character and lower-case the rest."""
    without_spaces = "".join(schema.split(" "))
    return without_spaces.lower()

def validate_schema(schema):
    """Validate a comma-separated header line as a taxonomy schema.

    A valid schema has at least two columns, exactly one column of the form
    ``target_<2-30 chars of [A-Za-z0-9_]>``, at least one of the form
    ``key_<2-30 chars of [A-Za-z0-9_]>`` (only the first letter of the prefix
    may be upper case), no other columns, and no column name that repeats when
    compared case-insensitively.

    Args:
        schema: the header line; ``None`` and ``''`` are both reported as empty.

    Returns:
        On success: ``{'IsValid': True, 'Schema': <header without spaces, lower-cased>,
        'TargetCol': <target column as written>, 'KeyColsSet': <set of key columns as written>}``.
        On failure: ``{'IsValid': False, 'schema': <input>, 'errors': <text>, 'message': <same text>}``.
        Every failure carries both 'errors' and 'message', so callers can read either key.
    """
    def _invalid(text):
        # One failure shape for every exit, so readers of 'errors' never hit a KeyError.
        return {'IsValid': False, 'schema': schema, 'errors': text, 'message': text}

    if schema is None or schema == '':
        return _invalid("Schema shouldn't be empty")
    tokens = schema.split(',')
    if len(tokens) < 2:
        return _invalid("Schema should have at least 2 columns")

    KEY_REGEX = '^[Kk]ey_[A-Za-z0-9_]{2,30}$'
    TARGET_REGEX = '^[Tt]arget_[A-Za-z0-9_]{2,30}$'
    key_cnt = 0
    target_cnt = 0
    invalid_headers = []
    target_col = None
    key_cols_set = set()
    # Occurrences per lower-cased column name, in first-seen order.
    occurrences = defaultdict(int)
    for t in tokens:
        t = t.strip()
        if re.match(TARGET_REGEX, t):
            target_cnt += 1
            target_col = t
        elif re.match(KEY_REGEX, t):
            key_cnt += 1
            key_cols_set.add(t)
        else:
            invalid_headers.append(t)
        occurrences[t.lower()] += 1

    error_msgs = []
    if target_cnt != 1:
        error_msgs.append("Exact one Target column is required!")
    if key_cnt < 1:
        error_msgs.append("At least one Key column is required!")
    if len(invalid_headers) > 0:
        error_msgs.append("All given columns should Key or Target!")
    for name, count in occurrences.items():
        if count > 1:
            error_msgs.append("Same name: {} should not represent more than one column in schema! cols names are case insensitive. ".format(name))

    if len(error_msgs) > 0:
        return _invalid(" \n".join(error_msgs))
    return {'IsValid': True, 'Schema': schema.replace(" ", "").lower(),
            'TargetCol': target_col, 'KeyColsSet': key_cols_set}

In [9]:
def extract_data_detail(lmt_src, lmt_data, access_key, secret_key):
    """Collect the schema and file listing of every taxonomy group under `lmt_data`.

    Args:
        lmt_src: unused here; kept so existing callers keep working.
        lmt_data: S3 location whose sub-directories are the taxonomy groups.
        access_key, secret_key: unused here (the module-level `s3_ops` is used instead).

    Returns:
        A 3-tuple of dicts:
        - tg -> normalized schema (see extract_schema),
        - tg -> {file name -> listing entry},
        - target column name -> tg (derived from each schema via TARGET_EXTRACT_REGEX).

    Raises:
        Exception: when a listed prefix or file key does not match its regex.
        IndexError: when a collected schema contains no 'target_...' column.
    """
    lmt_data_loc_detail = s3_ops.get_bucket_name(lmt_data)
    valid_tg_list_res = s3_ops.list_subdirs(lmt_data_loc_detail['bucket'], lmt_data_loc_detail['key'])

    # One [tg, location] argument pair per taxonomy-group sub-directory.
    # NOTE(review): the location has no trailing '/'; if list_complete is a plain prefix
    # listing, 'tg1' would also pick up 'tg10/...' — confirm against libs.s3_ops.
    valid_tgrp_loc_list = []
    for item in valid_tg_list_res:
        tg = find_by_data_tg(item['Prefix'], TG_EXTRACT_REGEX)
        valid_tgrp_loc_list.append([tg, '{}{}'.format(lmt_data, tg)])

    collected = NIO.decorated_run_io(task=get_data_n_schema, task_n_args_list=valid_tgrp_loc_list, max_workers=25)

    tg_data_schema_dict = {}
    tg_data_files_dict = {}
    for item in collected:
        result = item['result']
        # get_data_n_schema returns {} for a directory without files; such groups are skipped.
        for tg, schema in result.get('schema', {}).items():
            tg_data_schema_dict[tg] = extract_schema(schema)
        for tg, files in result.get('files', {}).items():
            tg_data_files_dict[tg] = {filename_by_key(entry['Key']): entry for entry in files}

    target_data_tg_dict = {re.findall(TARGET_EXTRACT_REGEX, schema)[0]: tg
                           for tg, schema in tg_data_schema_dict.items()}

    return tg_data_schema_dict, tg_data_files_dict, target_data_tg_dict

In [10]:
def is_valid_file(key:str='', regex = VALID_FILE_KEY_REGEX):
    """Return True when `regex` matches `key` from its start, False otherwise."""
    return re.match(regex, key) is not None

def extract_info(key:str='', regex = VALID_FILE_KEY_REGEX):
    """Split an object key into its named parts using the capture groups of `regex`.

    The first six groups of the first match are returned, in order, under the
    names listed below. Raises IndexError when `regex` does not match `key`.
    """
    first_match = re.findall(regex, key)[0]
    field_names = ('KeyDirPath', 'ParentDir', 'FileName', 'FileGrp', 'Date', 'ClientName')
    return {name: first_match[position] for position, name in enumerate(field_names)}
def extract_info_with_bucket(key:str='', bucket = ''):
    """Return extract_info(key) with an extra 'Bucket' entry set to `bucket`."""
    info = extract_info(key)
    info['Bucket'] = bucket
    return info

In [11]:
def grouped_tg(collected, tg_files_dict_type='new_tg_files_dict'):
    """Merge the per-task `{tg: {file name: file dict}}` results into one dict.

    Reads `item['result'][tg_files_dict_type]` from every item of `collected`;
    later entries overwrite earlier ones for the same tg and file name.
    """
    merged = defaultdict(dict)
    for item in collected:
        per_tg = item['result'][tg_files_dict_type]
        if len(per_tg) == 0:
            continue
        for group, files_by_name in per_tg.items():
            for filename, file_dict in files_by_name.items():
                merged[group][filename] = file_dict
    return dict(merged)

def grouped_flag_dict(collected, flag_dict_type='schema_tg_dict'):
    """Merge the per-task `{key: value}` results into `{key: set of values}`.

    Reads `item['result'][flag_dict_type]` from every item of `collected`.
    """
    merged = defaultdict(set)
    for item in collected:
        flags = item['result'][flag_dict_type]
        if len(flags) == 0:
            continue
        for flag_key, flag_value in flags.items():
            merged[flag_key].add(flag_value)
    return dict(merged)

def grouped_set_of_flags_dict(collected, flag_dict_type='schema_tg_dict'):
    """Merge the per-task `{key: iterable of values}` results into `{key: set of values}`.

    Reads `item['result'][flag_dict_type]` from every item of `collected`;
    the values under the same key are unioned.
    """
    merged = defaultdict(set)
    for item in collected:
        flags = item['result'][flag_dict_type]
        if len(flags) == 0:
            continue
        for flag_key, flag_values in flags.items():
            merged[flag_key].update(flag_values)
    return dict(merged)

def grouped_set_of_flags(collected, flag_dict_type='invalid_schema_files'):
    """Union the `item['result'][flag_dict_type]` collections of every item into one set."""
    union = set()
    for item in collected:
        flags = item['result'][flag_dict_type]
        if len(flags) > 0:
            union.update(flags)
    return union

In [12]:

def file_process_task(src_file_details):
    """Read one source file's header, validate it and classify the file.

    Args:
        src_file_details: dict describing the source file; reads 'Bucket', 'Key',
            'FileGrp' and 'FileName'. On a valid header a 'Schema' entry is added
            to this same dict (the caller's object is mutated).

    Returns:
        A dict with these entries (each holds at most one element per call):
        - 'invalid_schema_files': set of (file location, raw header, error text)
        - 'schema_tg_dict': {normalized schema: tg}
        - 'target_tg_dict': {target column: tg}
        - 'new_tg_schema_dict': {tg: normalized schema}, when the schema differs
          from (or is missing in) the global `tg_data_schema_dict`
        - 'new_tg_files_dict': {tg: {file name: src_file_details}}, same condition
        - 'existing_tg_files_dict': {tg: {file name: src_file_details}}, when the
          schema equals the one already recorded for the tg

    Note: depends on the module-level `tg_data_schema_dict`, which is assigned in
    a later cell and must exist before this task runs.
    """
    invalid_schema_files = set()
    # e.g. {'key_evt_advertiser_key,target_evt_advertiser_name': 'tg1'}
    schema_tg_dict = {}
    # e.g. {'target_evt_advertiser_name': 'tg1'}
    target_tg_dict = {}
    # e.g. {'tg': 'key_evt_advertiser_key,target_evt_advertiser_name'}
    new_tg_schema_dict = {}
    # e.g. {'tg': {'AdvertiserReporting_2020-06-01_ford.csv': {file detail dict}}}
    new_tg_files_dict = {}
    # e.g. {'tg': {'AdvertiserReporting_2020-06-01_ford.csv': {file detail dict}}}
    existing_tg_files_dict = {}

    src_file_loc = s3_ops.get_full_s3_path(src_file_details['Bucket'], src_file_details['Key'])

    s3_stream = S3Stream(ACCESS_KEY, SECRET_KEY)
    schema = s3_stream.get_header(src_file_loc)
    validate_res = validate_schema(schema)
    if validate_res['IsValid']:
        tg = src_file_details['FileGrp']
        file_name = src_file_details['FileName']
        normalized_schema = validate_res['Schema']

        # A missing entry (None) never equals the normalized schema string, so this
        # single comparison covers both "unknown tg" and "schema changed".
        # (A check for target columns already owned by another tg used to sit here
        # and is currently disabled.)
        if tg_data_schema_dict.get(tg) != normalized_schema:
            new_tg_schema_dict[tg] = normalized_schema
            new_tg_files_dict[tg] = {file_name: src_file_details}
        else:
            existing_tg_files_dict[tg] = {file_name: src_file_details}

        src_file_details['Schema'] = normalized_schema
        schema_tg_dict[normalized_schema] = tg
        target_tg_dict[validate_res['TargetCol']] = tg
    else:
        # validate_schema reports some failures under 'message' and others under
        # 'errors'; accept either so an empty or one-column header cannot raise KeyError.
        error_text = validate_res.get('errors', validate_res.get('message'))
        invalid_schema_files.add((src_file_loc, schema, error_text))

    return {'invalid_schema_files': invalid_schema_files,
            'schema_tg_dict': schema_tg_dict,
            'target_tg_dict': target_tg_dict,
            'new_tg_schema_dict': new_tg_schema_dict,
            'new_tg_files_dict': new_tg_files_dict,
            'existing_tg_files_dict': existing_tg_files_dict
            }



def src_list_page_process_task(list_page):
    """Classify one page of source listing entries and aggregate the per-file results.

    Entries whose 'Key' fails is_valid_file are reported by full S3 path under
    'invalid_files_set'; the rest are enriched with the parsed key parts and the
    bucket, then handed to file_process_task through NIO.decorated_run_io.
    """
    lmt_src_loc_detail = s3_ops.get_bucket_name(lmt_src)
    src_bucket = lmt_src_loc_detail['bucket']

    invalid_files_set = set()
    valid_file_set = []
    for entry in list_page:
        entry_key = entry['Key']
        if is_valid_file(key=entry_key):
            # Each task receives a one-element argument list.
            valid_file_set.append([utils.dict_append(extract_info_with_bucket(entry_key, src_bucket), entry)])
        else:
            invalid_files_set.add(s3_ops.get_full_s3_path(src_bucket, entry_key))

    collected = NIO.decorated_run_io(task=file_process_task, task_n_args_list=valid_file_set, max_workers=25)

    page_result = {'invalid_files_set': invalid_files_set}
    page_result['invalid_schema_files'] = grouped_set_of_flags(collected, flag_dict_type='invalid_schema_files')
    for flag_name in ('schema_tg_dict', 'target_tg_dict', 'new_tg_schema_dict'):
        page_result[flag_name] = grouped_flag_dict(collected, flag_dict_type=flag_name)
    for files_name in ('new_tg_files_dict', 'existing_tg_files_dict'):
        page_result[files_name] = grouped_tg(collected, files_name)
    return page_result

In [13]:
def extract_src_detail(maxKeysPerReq=3):
    """Process every listing page under `lmt_src` and merge the per-page results.

    Args:
        maxKeysPerReq: forwarded to s3_ops.list_gen.

    Returns:
        A dict with the same seven entries that src_list_page_process_task
        returns, merged across all pages.
    """
    src_location = s3_ops.get_bucket_name(lmt_src)
    pages = s3_ops.list_gen(src_location['bucket'], src_location['key'], maxKeysPerReq=maxKeysPerReq)
    # Each task receives a one-element argument list holding its page.
    page_args = ([page] for page in pages)
    collected = NIO.decorated_run_with_args_generator(
        task=src_list_page_process_task,
        args_generator=page_args,
        is_kernal_thread=True,
    )

    merged = {}
    for set_name in ('invalid_files_set', 'invalid_schema_files'):
        merged[set_name] = grouped_set_of_flags(collected, flag_dict_type=set_name)
    for flag_name in ('schema_tg_dict', 'target_tg_dict', 'new_tg_schema_dict'):
        merged[flag_name] = grouped_set_of_flags_dict(collected, flag_dict_type=flag_name)
    for files_name in ('new_tg_files_dict', 'existing_tg_files_dict'):
        merged[files_name] = grouped_tg(collected, files_name)
    return merged

In [22]:
def s3_copy_into_data_loc_task(tg, file_name, src_file, src_size, dry_run=True):
    """Report (dry run) the copy of one source file into its taxonomy-group data directory.

    With dry_run=True the source and destination are printed. With dry_run=False
    nothing happens yet: the real copy call is disabled. The same status string
    is returned either way.
    """
    src_bucket = s3_ops.get_bucket_name(lmt_src)['bucket']
    src_s3 = 's3://{}/{}'.format(src_bucket, src_file)
    dest_s3 = '{}{}/{}'.format(lmt_data,tg, file_name)
    if dry_run:
        print("[dry_run]: S3 copy from {} to {}".format(src_s3, dest_s3))
    # Disabled real copy: s3_ops.copy(src=src_s3, dest = dest_s3, src_size=src_size)
    return 'Copied Successfully! by task'


def s3_remove_at_data_loc_task(file,  dry_run=True):
    """Report (dry run) the deletion of one object key from the data bucket.

    With dry_run=True the full S3 path is printed. With dry_run=False nothing
    happens yet: the real delete call is disabled. The same status string is
    returned either way.
    """
    data_file_loc_detail = s3_ops.get_bucket_name(lmt_data)
    if not dry_run:
        # Disabled real delete: s3_ops.delete_file(data_file_loc_detail['bucket'], file)
        return 'Deleted Successfully! by task'
    file_loc = 's3://{}/{}'.format(data_file_loc_detail['bucket'], file)
    print("[dry_run]: S3 delete from {} ".format(file_loc))
    return 'Deleted Successfully! by task'

''' E.g. '''
# Example invocations of the two tasks above; both default to dry_run=True and only print.
# s3_copy_into_data_loc_task('tg5', 'tg5_2020-11-01_ford.csv', 'taxonomy_cs/test1/src/tg5_2020-11-01_ford.csv', 48 )
# s3_remove_at_data_loc_task('taxonomy_cs/test1/data/tg2/tg2_2020-11-01_ford.csv')

' E.g. '

### Delta

In [14]:
# Get the existing state of the system (schemas and files currently under lmt_data).
tg_data = extract_data_detail(lmt_src, lmt_data, ACCESS_KEY, SECRET_KEY)
tg_data_schema_dict, tg_data_files_dict, target_data_tg_dict = tg_data



2020-10-30 20:06:35,751:25496 MainThread run_blocking_tasks: starting

2020-10-30 20:06:35,752:25496 MainThread run_blocking_tasks: creating executor tasks

2020-10-30 20:06:35,752:25496 ThreadPoolExecutor-0_0 (task-0): passed args :['tg1', 's3://qubole-ford/taxonomy_cs/test1/data/tg1']

2020-10-30 20:06:35,753:25496 ThreadPoolExecutor-0_1 (task-1): passed args :['tg2', 's3://qubole-ford/taxonomy_cs/test1/data/tg2']

2020-10-30 20:06:35,753:25496 ThreadPoolExecutor-0_2 (task-2): passed args :['tg3', 's3://qubole-ford/taxonomy_cs/test1/data/tg3']

2020-10-30 20:06:35,754:25496 ThreadPoolExecutor-0_3 (task-3): passed args :['tg5', 's3://qubole-ford/taxonomy_cs/test1/data/tg5']

2020-10-30 20:06:35,755:25496 ThreadPoolExecutor-0_4 (task-4): passed args :['tg6', 's3://qubole-ford/taxonomy_cs/test1/data/tg6']

2020-10-30 20:06:35,755:25496 ThreadPoolExecutor-0_0 (task-0): running

2020-10-30 20:06:35,755:25496 ThreadPoolExecutor-0_5 (task-5): passed args :['tg7', 's3://qubole-ford/taxonomy_

In [16]:
# Scan the source location and classify every listed file (uses the default maxKeysPerReq=3).
src_delta = extract_src_detail()

2020-10-30 20:09:44,659   process-id:25496 run_blocking_tasks: starting

2020-10-30 20:09:44,660   process-id:25496 run_blocking_tasks: creating executor tasks

2020-10-30 20:09:44,754   process-id:61190   (task-0): passed args :[[{'Key': 'taxonomy_cs/test1/src/tg0_202-11-01_ford.csv', 'LastModified': datetime.datetime(2020, 10, 29, 19, 53, 45, tzinfo=tzlocal()), 'ETag': '"535b60451f6d20c2826b045438a50fb9"', 'Size': 48, 'StorageClass': 'STANDARD'}, {'Key': 'taxonomy_cs/test1/src/tg0_2020-11-02_ford.csv', 'LastModified': datetime.datetime(2020, 10, 29, 19, 53, 45, tzinfo=tzlocal()), 'ETag': '"7a5d08cbb4c718d16851d1f2b57ffc50"', 'Size': 27, 'StorageClass': 'STANDARD'}, {'Key': 'taxonomy_cs/test1/src/tg0_2020-11-03_ford.csv', 'LastModified': datetime.datetime(2020, 10, 29, 21, 21, 36, tzinfo=tzlocal()), 'ETag': '"b19c288a2ef5e2ec6739cac3674391a6"', 'Size': 28, 'StorageClass': 'STANDARD'}]]

2020-10-30 20:09:44,757   process-id:61190   (task-0): running

2020-10-30 20:09:44,760:61190 MainT

2020-10-30 20:09:44,806:61192 MainThread run_blocking_tasks: waiting for executor tasks

2020-10-30 20:09:44,806:61192 ThreadPoolExecutor-1_1 (task-1): running

2020-10-30 20:09:44,819:61193 MainThread run_blocking_tasks: starting

2020-10-30 20:09:44,821:61193 MainThread run_blocking_tasks: creating executor tasks

2020-10-30 20:09:44,822:61193 ThreadPoolExecutor-1_0 (task-0): passed args :[{'KeyDirPath': 'taxonomy_cs/test1/src/', 'ParentDir': 'src', 'FileName': 'tg2_2020-11-03_ford.csv', 'FileGrp': 'tg2', 'Date': '2020-11-03', 'ClientName': 'ford', 'Bucket': 'qubole-ford', 'Key': 'taxonomy_cs/test1/src/tg2_2020-11-03_ford.csv', 'LastModified': datetime.datetime(2020, 10, 29, 19, 53, 45, tzinfo=tzlocal()), 'ETag': '"dde15e0dffb34575c1aa95f81c8867c0"', 'Size': 50, 'StorageClass': 'STANDARD'}]

2020-10-30 20:09:44,823:61193 ThreadPoolExecutor-1_1 (task-1): passed args :[{'KeyDirPath': 'taxonomy_cs/test1/src/', 'ParentDir': 'src', 'FileName': 'tg4_2020-11-01_ford.csv', 'FileGrp': 'tg4', 

2020-10-30 20:09:44,861:61195 ThreadPoolExecutor-1_1 (task-1): running

2020-10-30 20:09:44,877:61192 ThreadPoolExecutor-1_2 (task-2): done

2020-10-30 20:09:44,879:61192 MainThread run_blocking_tasks: exiting

2020-10-30 20:09:44,880   process-id:61192   (task-2): done

2020-10-30 20:09:44,863:61195 ThreadPoolExecutor-1_2 (task-2): running

2020-10-30 20:09:44,881:61193 ThreadPoolExecutor-1_0 (task-0): done

2020-10-30 20:09:44,885:61193 ThreadPoolExecutor-1_2 (task-2): done

2020-10-30 20:09:44,887:61193 ThreadPoolExecutor-1_1 (task-1): done

2020-10-30 20:09:44,890:61193 MainThread run_blocking_tasks: exiting

2020-10-30 20:09:44,891   process-id:61193   (task-3): done

2020-10-30 20:09:44,904:61194 ThreadPoolExecutor-1_0 (task-0): done

2020-10-30 20:09:44,913:61194 ThreadPoolExecutor-1_2 (task-2): done

2020-10-30 20:09:44,915:61194 ThreadPoolExecutor-1_1 (task-1): done

2020-10-30 20:09:44,917:61194 MainThread run_blocking_tasks: exiting

2020-10-30 20:09:44,916   process-id:6119

In [25]:
# Extract the info needed to expose configs and to show in logs and reports.

invalid_files_set = src_delta['invalid_files_set']
invalid_schema_files = src_delta['invalid_schema_files']

# Taxonomy groups: present in the data location / unchanged in source / new or changed in source.
tg_data = set(tg_data_files_dict.keys())
tg_existing = set(src_delta['existing_tg_files_dict'].keys())
tg_new = set(src_delta['new_tg_files_dict'].keys())
tg_all = tg_new | tg_existing

# Schemas or target columns claimed by more than one taxonomy group.
many_tg4schema_check_gen = (v for v in src_delta['schema_tg_dict'].values() if len(v) > 1)
many_tg4target_check_gen = (v for v in src_delta['target_tg_dict'].values() if len(v) > 1)
# Taxonomy groups whose new files carry more than one schema.
newTg4schema = {k for k, v in src_delta['new_tg_schema_dict'].items() if len(v) > 1}

tg4schema = set()
for tg_group in many_tg4schema_check_gen:
    tg4schema.update(tg_group)
tg4target = set()
for tg_group in many_tg4target_check_gen:
    tg4target.update(tg_group)

# Groups that already exist unchanged are never treated as invalid.
invalid_tg_with_dup_schema = (tg4schema | newTg4schema) - tg_existing
invalid_tg_with_dup_target = tg4target - tg_existing
invalid_tg_all = invalid_tg_with_dup_schema | invalid_tg_with_dup_target

tg_delta = tg_new - invalid_tg_all
tg_delta_create = tg_delta - tg_data
tg_delta_drop_n_create = (tg_delta & tg_data) - tg_existing
tg_dropped = tg_data - tg_all

tg_dropped_all = tg_dropped | tg_delta_drop_n_create
tg_create_all = tg_delta_create | tg_delta_drop_n_create

# File sync: one argument list per task call.
files_to_be_dropped_args = [
    [f['Key']]
    for i in tg_dropped_all
    for f in tg_data_files_dict.get(i).values()
]

files_to_be_created = [
    [i, f['FileName'], f['Key'], f['Size']]
    for i in tg_create_all
    for f in src_delta['new_tg_files_dict'].get(i).values()
]
files_to_be_copied = [
    [k, f_dict['FileName'], f_dict['Key'], f_dict['Size']]
    for k, v in src_delta['existing_tg_files_dict'].items()
    for f_dict in v.values()
]
file_copy_args = files_to_be_created + files_to_be_copied

# Deletions are submitted before copies; both tasks default to dry_run=True.
collected = NIO.decorated_run_io(
    task=s3_remove_at_data_loc_task,
    task_n_args_list=files_to_be_dropped_args,
    is_kernal_thread=False,
)

collected = NIO.decorated_run_io(
    task=s3_copy_into_data_loc_task,
    task_n_args_list=file_copy_args,
    is_kernal_thread=False,
)



# Expose details to generate configs.
tg_create_all_n_schema = {
    tg: schema
    for tg in tg_create_all
    for schema in src_delta['new_tg_schema_dict'].get(tg)
}
tg_retain_all_n_schema = {tg: tg_data_schema_dict.get(tg) for tg in tg_existing}

# Report — taxonomy-group level.
tg_create_n_schema = [(i, src_delta['new_tg_schema_dict'].get(i)) for i in tg_delta_create]
tg_drop_create_n_schema = [
    (tg, tg_data_schema_dict.get(tg), schema_new)
    for tg in tg_delta_drop_n_create
    for schema_new in src_delta['new_tg_schema_dict'].get(tg)
]
tg_drop_n_schema = [(i, tg_data_schema_dict.get(i)) for i in tg_dropped]
tg_retain_n_schema = [(i, tg_data_schema_dict.get(i)) for i in tg_existing]

# Report — file level.
# Note: files_to_be_created is rebound here from the list of copy arguments to a {key: schema} dict.
files_to_be_dropped = [
    f['Key']
    for i in tg_dropped
    for f in tg_data_files_dict.get(i).values()
]
files_to_be_dropped_schema_change = [
    f['Key']
    for i in tg_delta_drop_n_create
    for f in tg_data_files_dict.get(i).values()
]
files_to_be_created = {
    f['Key']: f['Schema']
    for i in tg_delta_create
    for f in src_delta['new_tg_files_dict'].get(i).values()
}
files_to_be_created_schema_change = {
    f['Key']: f['Schema']
    for i in tg_delta_drop_n_create
    for f in src_delta['new_tg_files_dict'].get(i).values()
}



2020-10-30 22:40:41,747:25496 MainThread run_blocking_tasks: starting

2020-10-30 22:40:41,748:25496 MainThread run_blocking_tasks: creating executor tasks

2020-10-30 22:40:41,749:25496 ThreadPoolExecutor-3_0 (task-0): passed args :['taxonomy_cs/test1/data/tg3/tg3_2020-11-01_ford.csv']

2020-10-30 22:40:41,750:25496 ThreadPoolExecutor-3_0 (task-0): running

[dry_run]: S3 delete from s3://qubole-ford/taxonomy_cs/test1/data/tg3/tg3_2020-11-01_ford.csv 
2020-10-30 22:40:41,750:25496 ThreadPoolExecutor-3_1 (task-1): passed args :['taxonomy_cs/test1/data/tg3/tg3_2020-11-02_ford.csv']

2020-10-30 22:40:41,751:25496 ThreadPoolExecutor-3_2 (task-2): passed args :['taxonomy_cs/test1/data/tg6/tg6_2020-11-01_ford.csv']

2020-10-30 22:40:41,751:25496 ThreadPoolExecutor-3_0 (task-0): done

2020-10-30 22:40:41,751:25496 ThreadPoolExecutor-3_3 (task-3): passed args :['taxonomy_cs/test1/data/tg6/tg6_2020-11-02_ford.csv']

2020-10-30 22:40:41,752:25496 ThreadPoolExecutor-3_1 (task-1): running

[dry_ru

2020-10-30 22:40:41,794:25496 ThreadPoolExecutor-4_4 (task-14): passed args :['tg1', 'tg1_2020-11-01_ford.csv', 'taxonomy_cs/test1/src/tg1_2020-11-01_ford.csv', 48]

2020-10-30 22:40:41,794:25496 ThreadPoolExecutor-4_1 (task-11): running

[dry_run]: S3 copy from s3://qubole-ford/taxonomy_cs/test1/src/tg5_2020-11-03_ford.csv to s3://qubole-ford/taxonomy_cs/test1/data/tg5/tg5_2020-11-03_ford.csv
2020-10-30 22:40:41,795:25496 ThreadPoolExecutor-4_5 (task-15): passed args :['tg1', 'tg1_2020-11-02_ford.csv', 'taxonomy_cs/test1/src/tg1_2020-11-02_ford.csv', 48]

2020-10-30 22:40:41,797:25496 ThreadPoolExecutor-4_2 (task-12): running

[dry_run]: S3 copy from s3://qubole-ford/taxonomy_cs/test1/src/tg5_2020-11-01_ford.csv to s3://qubole-ford/taxonomy_cs/test1/data/tg5/tg5_2020-11-01_ford.csv
2020-10-30 22:40:41,800:25496 ThreadPoolExecutor-4_0 (task-10): done

2020-10-30 22:40:41,800:25496 ThreadPoolExecutor-4_3 (task-13): running

[dry_run]: S3 copy from s3://qubole-ford/taxonomy_cs/test1/src/

In [26]:
tg_create_all_n_schema

{'tg6': 'key_a6,target_a61',
 'tg0': 'key_a0,target_a0',
 'tg4': 'key_a4,target_a4',
 'tg2': 'key_a21,target_a21'}

In [27]:
tg_retain_all_n_schema

{'tg1': 'key_a1,target_a1',
 'tg7': 'key_a7,target_a7',
 'tg5': 'key_a5,target_a5'}