In [None]:
# Import required libraries
import os
import glob
import zipfile
import pandas as pd
import datetime
from google.cloud import bigquery
from google.oauth2 import service_account
import re
from collections import defaultdict
import unicodedata
import json

In [None]:
# Pipeline folders: downloaded archives, their unpacked contents, consolidated CSVs
RAW_FILES_DIR = './raw_files'
EXTRACTED_DIR = './extracted_files'
CSV_OUTPUT_DIR = './csv_output'

# Create the two working folders up front (RAW_FILES_DIR is not created here)
for work_dir in (EXTRACTED_DIR, CSV_OUTPUT_DIR):
    os.makedirs(work_dir, exist_ok=True)

# Collect the ZIP archives sitting directly inside raw_files
zip_pattern = os.path.join(RAW_FILES_DIR, '*.zip')
zip_files = glob.glob(zip_pattern)
print(f"Found {len(zip_files)} ZIP files.")

# Run date as a 'YYYY-MM-DD' string.
# To backdate the run by one day, add datetime.timedelta(days=-1) before formatting.
today = datetime.datetime.today().strftime('%Y-%m-%d')

In [None]:
# Unpack every archive found in raw_files into the shared extraction folder
for archive_path in zip_files:
    with zipfile.ZipFile(archive_path) as archive:  # default mode is read-only
        archive.extractall(path=EXTRACTED_DIR)
print(f"All files extracted to {EXTRACTED_DIR}.")


In [None]:
# Consolidate files by prefix and save to unique CSVs in ./csv_output (full join on columns)


def normalize_col(col):
    """Return a column name reduced to ASCII with separators mapped to '_'.

    Accents are stripped through NFKD decomposition, '%' becomes 'percent',
    and spaces, '/', '-' and '.' become '_'. Non-string headers (possible with
    Excel sheets) are converted with str() first so they do not raise.
    """
    col = unicodedata.normalize('NFKD', str(col)).encode('ASCII', 'ignore').decode('ASCII')
    col = col.replace('%', 'percent')
    for separator in (' ', '/', '-', '.'):
        col = col.replace(separator, '_')
    return col


def _read_source_file(path):
    """Read one extracted file as an all-string DataFrame.

    Returns None for extensions other than .csv / .xlsx (matched
    case-insensitively) so the caller can skip the file.
    """
    extension = os.path.splitext(path)[1].lower()
    if extension == '.csv':
        return pd.read_csv(path, dtype=str, encoding='utf-8', sep=';')
    if extension == '.xlsx':
        return pd.read_excel(path, dtype=str)
    return None


def _to_dot_decimal(df):
    """Remove '.' and turn ',' into '.' in every text column of df; return df.

    Both object columns and pandas' dedicated string dtype are handled, so the
    conversion is not silently skipped when dtype=str yields a StringDtype.
    """
    # NOTE(review): this rewrites every text column, not only numeric ones, so
    # free-text values and dot-separated dates are altered too -- confirm that
    # this is acceptable for all source files.
    for col in df.columns:
        col_dtype = df[col].dtype
        if col_dtype == object or isinstance(col_dtype, pd.StringDtype):
            df[col] = (
                df[col]
                .str.replace('.', '', regex=False)
                .str.replace(',', '.', regex=False)
            )
    return df


def _convert_date_columns(combined, schema_file):
    """Rewrite the DATE columns listed in schema_file as 'YYYY-MM-DD' strings.

    The schema is a JSON list of objects with 'name' and 'type' keys. Values
    that cannot be parsed (day-first) are coerced to missing values.
    """
    with open(schema_file, 'r', encoding='utf-8') as f_schema:
        schema = json.load(f_schema)
    date_cols = [field['name'] for field in schema if field['type'] == 'DATE']
    for date_col in date_cols:
        if date_col in combined.columns:
            combined[date_col] = pd.to_datetime(
                combined[date_col], dayfirst=True, errors='coerce'
            ).dt.strftime('%Y-%m-%d')


# Find all files in the extracted_files directory (sorted so that the row order
# of the consolidated output does not depend on the filesystem listing order)
extracted_files = sorted(glob.glob(os.path.join(EXTRACTED_DIR, '*')))
prefix_groups = defaultdict(list)

# Group files by prefix (the part of the file name before the first '-')
for file_path in extracted_files:
    match = re.match(r'([^\-]+)-', os.path.basename(file_path))
    if match:
        prefix_groups[match.group(1)].append(file_path)

# Load the prefix -> table-name mapping once, outside the per-file error
# handling: a missing or invalid mapping is a configuration problem and must
# not be reported as "Error reading <file>" for every input file.
if '__file__' in globals():
    table_names_path = os.path.join(os.path.dirname(__file__), 'table_names.json')
else:
    table_names_path = './table_names.json'
table_names = {}
if prefix_groups:
    with open(table_names_path, 'r', encoding='utf-8') as f_table_names:
        table_names = json.load(f_table_names)

# For each prefix, concatenate all files and save as prefix.csv (full join on columns)
for prefix, files in prefix_groups.items():
    if prefix == 'capa':
        # No CSV is produced for this prefix, so do not bother reading its files
        print("Skipping CSV output for prefix 'capa'.")
        continue

    dfs = []
    for f in files:
        try:
            df = _read_source_file(f)
            if df is None:
                continue
            df.columns = [normalize_col(c) for c in df.columns]
            dfs.append(_to_dot_decimal(df))
        except Exception as e:
            print(f"Error reading {f}: {e}")

    if not dfs:
        print(f"No data for prefix {prefix}.")
        continue

    try:
        # Outer join on columns: every column seen in any file is kept
        combined = pd.concat(dfs, ignore_index=True, sort=True)
        if len(combined) == 0:
            print(f"No rows to save for prefix {prefix}, skipping CSV output.")
            continue

        # Convert DATE columns to date format if a schema is available
        prefix_lc = prefix.lower()
        if prefix_lc in table_names:
            schema_file = os.path.join(os.path.dirname(table_names_path), 'table_schemas', f"{table_names[prefix_lc]}.json")
            try:
                _convert_date_columns(combined, schema_file)
            except Exception as e:
                # Schema handling is best-effort: the CSV is still written
                print(f"Could not process schema for {prefix}: {e}")

        combined['updated_on'] = today  # Add updated_on column
        # Reorder columns so 'updated_on' is first
        cols = ['updated_on'] + [col for col in combined.columns if col != 'updated_on']
        combined = combined[cols]
        out_path = os.path.join(CSV_OUTPUT_DIR, f'{prefix}.csv')
        combined.to_csv(out_path, index=False, encoding='utf-8', sep=';')
        print(f"Saved {out_path} with {len(combined)} rows and {len(list(combined.columns))} columns: {list(combined.columns)}.")
    except Exception as e:
        print(f"Error saving {prefix}: {e}")


### Atenção: é preciso revisar esta célula e mudar o procedimento dela para buscar os dados das tabelas no BQ, comparar as mudanças e atualizar os dados novos. Lembrar de particionar as tabelas caso elas sejam criadas.

In [None]:
## This section is meant to upload the tables for the first time
## This section has not been implemented and tested yet

# Load every CSV from csv_output into its own BigQuery table in the Finance dataset

# Target GCP project / dataset and the service-account key file
GCP_PROJECT = 'api-ms-data'
BQ_DATASET = 'Finance'
CREDENTIALS_PATH = '../Credentials/your_service_account.json'

# Build a BigQuery client authenticated with the service-account credentials
credentials = service_account.Credentials.from_service_account_file(CREDENTIALS_PATH)
bq_client = bigquery.Client(credentials=credentials, project=GCP_PROJECT)

# Every CSV currently present in the output directory
csv_files = glob.glob(os.path.join(CSV_OUTPUT_DIR, '*.csv'))

for csv_path in csv_files:
    # The file stem (name without extension) becomes the table-name suffix
    name = os.path.splitext(os.path.basename(csv_path))[0]
    table_id = f"{GCP_PROJECT}.{BQ_DATASET}.Tbl_SubAdquirencia_Rede_{name}"
    print(f"Uploading {csv_path} to {table_id} ...")

    # ';'-delimited CSV with one header row; schema is auto-detected
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1
    job_config.autodetect = True
    job_config.field_delimiter = ';'
    # Overwrite the table if it already exists
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

    with open(csv_path, "rb") as source_file:
        # .result() blocks until the load job has completed
        bq_client.load_table_from_file(source_file, table_id, job_config=job_config).result()
    print(f"Loaded data to {table_id}")

#### Essa parte já está pacificada, não precisa de revisão

In [None]:
# Empty raw_files, extracted_files and csv_output, keeping any .gitignore entry

folders = ["./raw_files", "./extracted_files", "./csv_output"]

for folder in folders:
    for entry in glob.glob(os.path.join(folder, "*")):
        if entry.endswith(".gitignore"):
            continue
        try:
            os.remove(entry)
        except Exception as e:
            # Best-effort cleanup: report the failure and keep going
            print(f"Could not delete {entry}: {e}")