## Importar librerías

In [1]:
import pandas as pd
import numpy as np
import glob
from datetime import datetime as dt
import datetime
from datetime import date
import string
import unicodedata
import os

## Utilidades

In [2]:
def rreplace(s, old, new, n_occurrence):
    # Replace the last n_occurrence (n_occurrence = 1 is the last occurrence) of an expression in a string s
    li = s.rsplit(old, n_occurrence)
    return new.join(li)

def str_format(stringg):
    
    #Elimina simbolos de puntuación, u otros simbolos de un string
    
    punctuation = list(string.punctuation)          
    symbols = ['^','°', 'ª', os.linesep, '\n']                     
    new_string = unicodedata.normalize('NFKD', stringg).encode('ascii', errors='ignore').decode('utf-8') # Elimina tildes 

    for i in range(len(punctuation)):
        new_string = new_string.replace(punctuation[i],'_')   # Elimina simbolos de puntuación
    
    for i in range(len(symbols)):
        new_string = new_string.replace(symbols[i],'')       # Elimina otros simbolos
    
    new_string = new_string.replace(' ','_').lower()         # Elimina espacios 
    
    while new_string[-1] == '_':
        new_string = rreplace(new_string,'_','',1)           # Elimina los "espacios" (_) al final del string
    
    return new_string


def read_txt(path, header_int, sep = '|', encoding = 'latin1'):
# Lee los archivos txt del path (correspondientes a la fecha de descarga), y crea diccionario con respectivos dataframes
    all_files = pd.Series(glob.glob(path + "/*.txt"))
    fecha_str = date.today().strftime("%Y%m")
    files = all_files[all_files.astype(str).str.contains(fecha_str)].tolist()
    d = {}
    
    for i in range(0,len(files)):
        file_path = files[i]
        index_from = file_path.rfind("\\") + 1
        file_name = str_format(file_path[index_from:-4])
        
        if file_name[:8].isdigit():
            file_name = file_name[9:]
        
        d[str(file_name)] = pd.read_csv(file_path,header=header_int, sep=sep, encoding=encoding, dtype=str)
    return d

def clean_columns(dict, del_from, col=0):
    for keys in dict:
        # Elimina la primera y última columna 
        dict[keys] = dict[keys].drop(columns=dict[keys].columns[0]).\
        drop(columns = dict[keys].columns[-1]).dropna(how = 'all').reset_index(drop = True)
        del_from_index = dict[keys].index[dict[keys][dict[keys].columns[col]].str.contains(del_from)].tolist()[0]
        
        #Elimina filas al final de txt que presentan el resumen del libro de compras/ventas
        dict[keys] = dict[keys].iloc[0:del_from_index,:]
    return dict

def save_as_csv(dict, path, suffixes='', sep=';', encoding = 'latin1'):
    
    for keys in dict:
        dict[keys].to_csv(path + '\\' + dt.now().strftime('%Y%m%d') + str(keys) + suffixes + '.csv', sep=sep)
        
def read_xlsx(path, header_int, sheet_name_str):
# Lee los archivos xlsx del path (correspondientes a la fecha de descarga), y crea diccionario con respectivos dataframes
    all_files = pd.Series(glob.glob(path + "/*.xlsx"))
    fecha_str = date.today().strftime("%Y%m")
    files = all_files[all_files.astype(str).str.contains(fecha_str)].tolist()
    d = {}
    
    for i in range(0,len(files)):
        file_path = files[i]
        index_from = file_path.rfind("\\") + 1
        file_name = str_format(file_path[index_from:-4])
        
        if file_name[:8].isdigit():
            file_name = file_name[9:]
        
        d[str(file_name)] = pd.read_excel(file_path, header=header_int, sheet_name=sheet_name_str, dtype=str)
    return d

def col_names_format(dict):
    
    for keys in dict:
        df_col = dict[keys].columns
        col_list = []
        for col in df_col:
            col_list.append(str_format(col))
        dict[keys].columns = col_list
    
    return dict

def schema_cols(dict, col_names, empty_col_value = np.nan):
    for keys in dict:
        for col in col_names:
            if col not in dict[keys].columns:
                dict[keys][col] = empty_col_value
        dict[keys] = dict[keys][col_names]
    return dict

## Ejecutar limpieza
### Leer archivos y guardar en diccionario dict

In [3]:
dict_plan = read_xlsx(r'C:/projects/data_lake/data/input/current/plan_trienal',[4,5], sheet_name_str='Plan trienal 19-21')

### Reemplazar columnas que contengan "Unnamed" por ""

In [4]:
columns = []

for keys in dict_plan:
    for col in dict_plan[keys].columns:
        col_list = list(col)
        for name in col_list:
            if 'Unnamed' in str(name):
                col_list[1] = ''
        columns.append(tuple(col_list))
    dict_plan[keys].columns = columns

### Unificar celdas combinadas en cada df del dict

In [5]:
for keys in dict_plan:
    # Convertir cada elemento de las columnas en string
    dict_plan[keys].columns = [tuple(map(str, col)) for col in dict_plan[keys].columns]
    
    # Unir celdas combinadas
    dict_plan[keys].columns = ['_'.join(col) for col in dict_plan[keys].columns]

### Cambiar formato de nombre de columnas

In [6]:
dict_plan = col_names_format(dict_plan)

### Guardar en archivos csv

In [7]:
save_as_csv(dict_plan, r'C:/projects/data_lake/data/output/current/plan_trienal')