In [279]:
import os
import sys
import pandas as pd
src_path = os.path.abspath("../src")
sys.path.insert(0, src_path)
%load_ext autoreload
%autoreload 2
from utils import *
from dag_creation import *
from bucket import *

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Advanced: Manual DAG Preparation

Use this section only if you need to customize the pipeline.
For standard ingestions, use: `python main.py --env dev`

In [280]:
filename = "../ingestion.csv"
df = pd.read_csv(filename)
print(f"Loaded {len(df)} tables")
df.head()

Loaded 2 tables


Unnamed: 0,icdsTableName,BANNER_NAME,dataSensitivity,OP-Company code,dlSchemaName,dlTableName,tableLoadType,keyPreCombine,keyPrimaryKey,bucket_id
0,IKPF,"MDD,MAK,MSB",se,"SA-MDD,SA-MAK,SA-MSB",sa_mdse_dl_secure,PHYSL_INVT_DOC,INC,ds_load_ts,"clnt,application,cond_type,sales_org,distr_cha...",123
1,PF,"MDD,MAK,MSB",se,"SA-MDD,SA-MAK,SA-MSB",sa_mdse_dl_table,PHYSL_INVT_DOC,INC,ds_load_ts,"clnt,application,cond_type,sales_org,distr_cha...",345


In [281]:
def get_cluster_name(row):
    schema = row.dlSchemaName.lower().replace('_', '-')
    banner = row.BANNER_NAME.strip().lower()
    table = row.dlTableName.lower().replace('_', '-')
    return f"{schema}-{banner}-{table}"

In [282]:
def get_table_name(row):
    return f"{row.BANNER_NAME.strip().lower()}-{row.dlTableName.lower()}"


In [283]:

df['cluster_name'] = df.apply(get_cluster_name, axis=1)
df['table_name'] = df.apply(get_table_name, axis=1)

In [284]:
# OM
def column_mapping(row):
    return {1}
df['column_mapping'] = df.apply(column_mapping,axis=1)

In [285]:
input_file = 'FinalBucketInfo.csv'

def extaract_bucket_id(row): 
    return {123}
df['bucket_id'] = df.apply(extaract_bucket_id,axis=1)

In [286]:
df

Unnamed: 0,icdsTableName,BANNER_NAME,dataSensitivity,OP-Company code,dlSchemaName,dlTableName,tableLoadType,keyPreCombine,keyPrimaryKey,bucket_id,cluster_name,table_name,column_mapping
0,IKPF,"MDD,MAK,MSB",se,"SA-MDD,SA-MAK,SA-MSB",sa_mdse_dl_secure,PHYSL_INVT_DOC,INC,ds_load_ts,"clnt,application,cond_type,sales_org,distr_cha...",{123},"sa-mdse-dl-secure-mdd,mak,msb-physl-invt-doc","mdd,mak,msb-physl_invt_doc",{1}
1,PF,"MDD,MAK,MSB",se,"SA-MDD,SA-MAK,SA-MSB",sa_mdse_dl_table,PHYSL_INVT_DOC,INC,ds_load_ts,"clnt,application,cond_type,sales_org,distr_cha...",{123},"sa-mdse-dl-table-mdd,mak,msb-physl-invt-doc","mdd,mak,msb-physl_invt_doc",{1}


In [287]:
def generate_dags_for_row(row):
    tableLoadType = row['tableLoadType']  # INC
    dlSchemaName = row['dlSchemaName']  # example : sa_mdse_dl_secure
    dlTableName = row['dlTableName']  # example : PHYSL_INVT_DOC
    dataSensitivity = row['dataSensitivity']  # SE/NS/HS
    banner_list = row['BANNER_NAME'].split(',')  # [MDD,MAK,MSB]
    
    # Get lists of names for all banners in this row
    table_names = prepare_table_name(banner_list, dlTableName)
    cluster_names = prepare_cluster_name(dlSchemaName, banner_list, dlTableName)
    
    sample_dag_file = "../sample_dag.py"
    
    created_dags = []
    
    # for each banner, there will be dag_config
    for i, b in enumerate(banner_list):
        dag_config = {
            "sensitivity": dataSensitivity,
            "cluster_name": cluster_names[i],
            "banner_name": b,
            "table_name": table_names[i],
            "tags": ["Massmart-eComm","P2","Ephemeral","SA","SECURE","MDSE",f"{b}",f"{table_names[i]}","SLT"],
            "tableLoadType": tableLoadType,
            "output_dir": f"../output/{dlSchemaName}/{table_names[i]}",
            "dag_name": f"INTLDLDAT-SA{b}-{tableLoadType}-{dlSchemaName.upper()}-{b}_{dlTableName}",
        }
        # for each dag , generate input to create a bcuket and append in csv file
        create_bucket(dag_config, dlSchemaName, output_file="../buckets/bucket_input.csv", env='prod')
        
        prepare_dag_file(sample_dag_file, dag_config)
        created_dags.append(dag_config['dag_name'])
        
    return created_dags

# Apply the function to each row
df['created_dags'] = df.apply(generate_dags_for_row, axis=1)

Creating bulk bucket ... 
✓ Added bucket entry for mdd_physl_invt_doc to ../buckets/bucket_input.csv
Creating bulk bucket ... 
✓ Added bucket entry for mak_physl_invt_doc to ../buckets/bucket_input.csv
Creating bulk bucket ... 
✓ Added bucket entry for msb_physl_invt_doc to ../buckets/bucket_input.csv
Creating bulk bucket ... 
✓ Added bucket entry for mdd_physl_invt_doc to ../buckets/bucket_input.csv
Creating bulk bucket ... 
✓ Added bucket entry for mak_physl_invt_doc to ../buckets/bucket_input.csv
Creating bulk bucket ... 
✓ Added bucket entry for msb_physl_invt_doc to ../buckets/bucket_input.csv
✓ Added bucket entry for mak_physl_invt_doc to ../buckets/bucket_input.csv
Creating bulk bucket ... 
✓ Added bucket entry for msb_physl_invt_doc to ../buckets/bucket_input.csv
