In [9]:
import sys
import io
import csv
from pathlib import Path
import pandas as pd
from openpyxl import load_workbook 
import config as cfg
import eda_tools as tls

Load configuration

In [10]:
# Configuration load.
# The JSON config is looked up in ../config relative to the current working
# directory. NOTE: despite its name, `log_path` holds the path of that config
# file; it is the value handed to cfg.config_log().
log_path = Path.cwd().parent.joinpath("config", "config.json")
if log_path.exists():
    log = cfg.config_log(log_path)
    cfg.load_config()
    # Opt in to pandas' future behaviour of not silently downcasting dtypes.
    pd.set_option('future.no_silent_downcasting', True)
else:
    # Without the config file nothing below can run, so stop the cell here.
    print(f"Arquivo de configuração não encontrado !\n{log_path}")
    sys.exit(1)

Load sheets

In [11]:
# sheets dfs
# The EDA control workbook is opened twice: through pandas (DataFrames used by
# the analysis) and through openpyxl (so results can be written back to the
# same file later). Both reads share one error handler because they touch the
# same file and the same sheet names.
try:
    df_tables = pd.read_excel(cfg.eda_sheet_full_path, sheet_name='tables')
    # header=1: the column titles of the "fields" sheet are on its second row.
    df_fields = pd.read_excel(cfg.eda_sheet_full_path, sheet_name='fields', header=1)

    # open sheet
    wb = load_workbook(cfg.eda_sheet_full_path)
    tables_sheet = wb["tables"]
    fields_sheet = wb["fields"]
except (FileNotFoundError, KeyError, ValueError) as err:
    # FileNotFoundError: workbook missing.
    # ValueError (pandas) / KeyError (openpyxl): a required sheet is missing.
    print("Erro ao abrir planilhas")
    print(err)
    sys.exit(1)

# format headers
# Normalise the titles BEFORE building the index, so that headers typed with
# surrounding blanks or capital letters still resolve to "table" / "field".
df_tables.columns = df_tables.columns.str.strip().str.lower()
df_fields.columns = df_fields.columns.str.strip().str.lower()

# Object dtype lets every column later receive either numbers or text markers.
df_fields = df_fields.astype('object').set_index(["table", "field"])

Collect descriptive statistics

In [None]:
# Count statistics by field content type, plus describe-style and validation
# statistics for every field declared in the "fields" sheet. The result is
# merged into df_fields (indexed by table/field).


def _safe_stat(compute, fallback):
    """Return ``compute()``; if it raises an ordinary exception return ``fallback``.

    Used for statistics that are undefined for some columns (e.g. the mean of
    a text column), so one unsuitable column does not abort the whole run.
    """
    try:
        return compute()
    except Exception:
        return fallback


def _count_valid_format(field_series, fld):
    """Count the values of ``field_series`` matching the regex in ``fld['regex']``.

    Returns "err" when the metadata row has no regex. Raises when the column
    is missing from the metadata or the series has no string accessor.
    """
    regex_pattern = fld['regex']
    if pd.isna(regex_pattern):
        return "err"
    return field_series.str.match(regex_pattern, na=False).sum()


def _count_valid_on_list(field_series, fld):
    """Count the values of ``field_series`` present in the list in ``fld['list']``.

    Returns "err" when the metadata row has no list.
    """
    raw_list = fld['list']
    if pd.isna(raw_list):
        return "err"
    # isin() rejects a bare string, so the cell text is split into items.
    # NOTE(review): assumes items are ';'-separated, like the range bounds — confirm.
    allowed_values = [item.strip() for item in str(raw_list).split(';')]
    return field_series.astype(str).isin(allowed_values).sum()


def _count_valid_on_range(field_series, fld):
    """Count the values of ``field_series`` inside the "min;max" range of the field.

    Returns "err" when the metadata row has no range.
    """
    # NOTE(review): the original read the bounds from the 'list' column (same
    # key as the list check); a dedicated 'range' column is assumed here —
    # confirm the header name used in the "fields" sheet.
    raw_range = fld['range']
    if pd.isna(raw_range):
        return "err"
    min_limit, max_limit = str(raw_range).split(';')[:2]
    numeric_values = field_series.astype(float)
    return ((numeric_values >= float(min_limit)) & (numeric_values <= float(max_limit))).sum()


def _field_stats(field_series, fld, table_count):
    """Yield ``(stat, value)`` pairs for one field.

    field_series: the data column, already cast according to the field type.
    fld: the metadata row of the field (from the "fields" sheet).
    table_count: number of rows of the table the field belongs to.
    """
    # count is the table row count, not the number of non-null values.
    yield "count", table_count
    yield "min", _safe_stat(field_series.min, "no value")
    yield "max", _safe_stat(field_series.max, "no value")
    yield "mean", _safe_stat(field_series.mean, "no number")
    yield "std", _safe_stat(field_series.std, "no number")
    yield "unique", field_series.nunique()
    yield "top", _safe_stat(lambda: field_series.mode().iloc[0], "no value")
    yield "freq", _safe_stat(lambda: field_series.value_counts().iloc[0], "no value")

    # q1, q2, q3: either all three are available or none is.
    q_values = _safe_stat(lambda: field_series.quantile([0.25, 0.5, 0.75]), None)
    for position, label in enumerate(("Q1(25%)", "Q2(50%)", "Q3(75%)")):
        yield label, ("no value" if q_values is None else q_values.iloc[position])

    # Validation rules come from the metadata row (fld), not from the data.
    yield "valid_format", _safe_stat(lambda: _count_valid_format(field_series, fld), "no format")
    yield "valid_on_list", _safe_stat(lambda: _count_valid_on_list(field_series, fld), "no list")
    yield "valid_on_range", _safe_stat(lambda: _count_valid_on_range(field_series, fld), "no range")


stats_collected = []
# Flat copy of the field metadata; it does not change inside the loop.
df_fields_flat = df_fields.reset_index()

for index, table in df_tables.iterrows():
    # load_data
    table_name = table['table']
    print(f"-----{table_name}-----")
    data_path = Path(cfg.data_file_path / table['file'])
    # quoting=3 is csv.QUOTE_NONE: quote characters are kept as plain data.
    df_dados = pd.read_csv(
        data_path,
        encoding=tls.encode(data_path),
        quotechar=None,
        quoting=3,
        keep_default_na=True,
        sep=cfg.csv_sep,
        engine='python',
    )
    df_dados.columns = df_dados.columns.str.strip().str.lower()
    table_count = len(df_dados)

    # Counts by content type: classify every cell, then count labels per column.
    df_types = df_dados.map(tls.classify_content)
    for field in df_types.columns:
        counts = df_types[field].value_counts(dropna=False)
        for stat, value in counts.items():
            stats_collected.append({"table": table_name, "field": field, "stat": stat, "value": int(value)})

    # describe stats for the fields declared for this table
    df_fields_table = df_fields_flat[df_fields_flat['table'] == table_name]
    for idx, fld in df_fields_table.iterrows():
        field_name = fld['field']
        if fld['type'] == "str":
            field_series = df_dados[field_name].astype('object')
        else:
            # Anything that is not a number becomes NaN.
            field_series = pd.to_numeric(df_dados[field_name], errors='coerce')
        for stat, value in _field_stats(field_series, fld, table_count):
            stats_collected.append({"table": table_name, "field": field_name, "stat": stat, "value": value})

# stats consolidation: build the frame once, after all tables were processed,
# so rows of earlier tables are not appended again on every iteration.
df_stats = pd.DataFrame(stats_collected, columns=['table', 'field', 'stat', 'value']).astype('object')

# One row per (table, field), one column per stat.
df_stats_pivot = (df_stats.pivot_table(index=["table","field"], columns="stat", values="value", fill_value=0,aggfunc='first').reset_index())
df_stats_pivot_para_update = df_stats_pivot.set_index(["table", "field"])
# update() only fills columns that already exist in df_fields.
df_fields.update(df_stats_pivot_para_update)
# NOTE(review): these names must match the labels returned by
# tls.classify_content and the headers of the "fields" sheet — confirm.
col_list = ['nulls', 'blanks', 'int', 'float', 'str', 'date']
df_fields[col_list] = df_fields[col_list].fillna(0)

-----ses_cias-----
-----ses_ramos-----
-----ses_seguros-----


In [13]:
# Write the enriched field metadata back into the "fields" worksheet, cell by
# cell, and save the workbook.
df_fld = df_fields.reset_index()
col_series = df_fld.columns

# The "fields" sheet is read with header=1, i.e. titles on Excel row 2, so the
# first data row is Excel row 3.
# NOTE(review): assumes the worksheet columns are in the same order as the
# DataFrame columns (table, field, ...) and that no rows were skipped — confirm.
first_data_row = 3

for idx_row, fld_row in df_fld.iterrows():
    line = int(idx_row) + first_data_row
    # Positional iteration: Excel columns are 1-based.
    for col, cell_value in enumerate(fld_row, start=1):
        # Write missing values as empty cells (None) instead of a float NaN.
        if pd.api.types.is_scalar(cell_value) and pd.isna(cell_value):
            cell_value = None
        fields_sheet.cell(row=line, column=col).value = cell_value

wb.save(cfg.eda_sheet_full_path)

        #fields_sheet.cell(row=line, column=col).value = str(stat_value) 
        



#
    






# for idx, fld_row in df_fields.reset_index().iterrows(): 
#     field_name = fld_row['field']
#     line = int(idx) + 2
#     for col_name in df_fields.reset_index().columns.names:
#         col = df_fields.reset_index().columns.get_loc(col_name) + 1 
#         print(col_name,line, col)



    # col_series = 
    # fields_sheet.cell(row=line, column=col).value = str(stat_value) 
    

In [14]:
    # col_type = df_types.apply(lambda col: col.value_counts().idxmax())
    # for coluna, tipo in col_type.items():
        
    #     if tipo == "int":
    #         df_dados[coluna] = pd.to_numeric(df_dados[coluna], errors="coerce").astype("Int64")
    #     elif tipo == "float":
    #         df_dados[coluna] = pd.to_numeric(df_dados[coluna], errors="coerce").astype("Float64")
    #     elif tipo == "date":
    #         df_dados[coluna] = pd.to_datetime(df_dados[coluna], format="%d/%m/%Y", errors="coerce")
    #     elif tipo in ["null", "blank"]:
    #         df_dados[coluna] = pd.NA
    #     else:
    #         df_dados[coluna] = df_dados[coluna].astype("string")



    # print(repr(df_dados.iloc[1, 0]))
    # print(type(df_dados.iloc[1, 0]))

    # for coluna in df_dados.columns:
    #     print(f'------ {coluna} ------') 
    #     qtd_nulls, qtd_blank, qtd_int, qtd_float, qtd_str, qtd_date = 0, 0, 0, 0, 0, 0
    #     for valor in df_dados[coluna]:
    #         type_class = classify_content(valor)

 
    # with open(data_path, "r", newline='', encoding=encode(data_path)) as f:
    #     reader = csv.reader(f)
    #     # row_count = sum(1 for _ in reader)
    #     # print(row_count)
    #     for row_index, row in enumerate(reader):
    #         for field_index, field_value in row:
    #                 print(f"Campo {field_index}: {field_value}")
                    



    # with open(data_path, "r", newline='', encoding=encode(data_path)) as f:
    #     leitor = csv.DictReader(f, delimiter=";")
    #     leitor.fieldnames = [nome.lower() for nome in leitor.fieldnames]
    #     for coluna in df_dados.columns:
    #         for linha in leitor:
    #             print(linha[coluna])
    #             print(type(linha[coluna]))
    #             # print(linha)






#     # print(df_dados.columns)
#     # print(df_dados.describe(include='all')) 
#     # print(df_dados[['noenti', 'cogrupo']].dtypes)

#     # describe_collect     
#     df_describe = df_dados.describe(include='all'); 
#     df_describe.columns = df_describe.columns.str.strip().str.lower()

#     # print(df_describe.columns)

#     # for id, reg in df_describe.iterrows(): 
#     #     print(reg)

#     for idx, desc_row in df_describe.iterrows():
#         stat_name = desc_row.name 
#         type_name = desc_row.dtype
#         for col_name, value  in desc_row.items():
#             field_name = col_name
#             stat_value = value

#             stats_collected.append({'table': table_name , 'field': field_name , 'stat': stat_name, 'value': stat_value})
#         # dtype_field = df_dados[field_name].dtype
#         # stats_collected.append({'table': table_name , 'field': field_name , 'stat': 'type', 'value': dtype_field})

# df_stats = pd.concat([df_stats, pd.DataFrame(stats_collected)], ignore_index=True)
# df_stats.rename(columns={'count': 'not-null'}, inplace=True)


# for idx, reg in df_stats.iterrows(): 
#     print(reg['table'], reg['field'], reg['stat'], reg['value'])

Save stats

In [15]:
# # tables sheet 
# header_tables = headers = {str(cell.value).lower(): idx+1 for idx, cell in enumerate(tables_sheet[1]) if cell.value}
# # tables sheet loop
# for tb_row in tables_sheet.iter_rows(min_row=2):
#     print('===== Tables Loop =====')
#     table_name = tb_row[header_tables['table']-1].value
#     print(f"=== {table_name} ===")
#     # fields sheet loop
#     print('****** fields sheet loop ********')
#     df_table_fields = df_fields[df_fields['table'] == table_name]
#     for index, fld_row in df_table_fields.iterrows(): 
#         field_name = fld_row['field']
#         print(f'*** {field_name} ***')
#         line = int(index) + 2
#         # field stats loop 
#         df_fld_stats = df_stats[df_stats['field'] == field_name]
#         print('###### stats loop #####')
#         for idx, stat_row in df_fld_stats.iterrows(): 
#             stat_name = stat_row['stat'] 
#             print(f' ## {stat_name} ##')
#             col = df_fields.columns.get_loc(stat_name) + 1 
#             try:
#                 stat_value = str(stat_row['value']) 
#                 print("line/col: ",line, " / ", col)
#                 print(f'stat_value: {stat_value}')
#                 fields_sheet.cell(row=line, column=col).value = str(stat_value) 
#             except: 
#                 print('exception')
#                 continue
#     print("")
#     print("-*" * 50 )
#     print("")

# wb.save(cfg.eda_sheet_full_path)