# Objective

This notebook updates CSV files in a Google Cloud Storage bucket by adding new columns to all CSV files missing this information.

This function performs the following steps:
1. Backs up all CSV files before making any changes
2. Updates each CSV file by adding a new column and reordering the columns, uploads the updated CSV file back to the bucket, replacing the old one.

In [1]:
import os
import datetime
import pandas as pd
from google.oauth2.service_account import Credentials
from google.cloud import storage
from tqdm.notebook import trange, tqdm

## 1. Backup Step

### 1.1. Set Parameters

In [2]:
# GOOGLE CLOUD
key_file_path = "/Users/.credentials/rj-sms.json"
bucket_name = "rj-sms"

# Specify the prefixes (folders) you want to scan
prefixes = [
    "staging/brutos_prontuario_vitacare/estoque_movimento/",
]

# LOCAL
backup_folder = "/Users/tmp"
tmp_folder = "/Users/projects/pipelines_rj_sms/data/raw"

### 1.2. Backup

In [4]:
credentials = Credentials.from_service_account_file(key_file_path)
client = storage.Client(credentials=credentials)
bucket = client.get_bucket(bucket_name)

# Calculate the total number of files
total_files = 0

for prefix in prefixes:
    blobs = bucket.list_blobs(prefix=prefix)
    for blob in blobs:
        if blob.name.endswith(".csv"):
            total_files += 1

print(f"Total files: {total_files}")


# Start backing up the files
for prefix in prefixes:
    blobs = bucket.list_blobs(prefix=prefix)
    for blob in tqdm(blobs, total=total_files):
        if blob.name.endswith(".csv"):
            file = blob.name.split("/")[-1]
            if file not in os.listdir(backup_folder):
                blob.download_to_filename(f"{backup_folder}/{file}")

Total files: 32183


  0%|          | 0/32183 [00:00<?, ?it/s]

## 2. Update & Upload Step

### 2.1. Set Parameters

In [5]:
# PAYLOAD
new_columns = ["cpfProfPrescritor", "codWms", "armazemOrigem", "armazemDestino"]
columns_order = [
    "ap",
    "cnesUnidade",
    "nomeUnidade",
    "desigMedicamento",
    "atc",
    "code",
    "lote",
    "dtaMovimento",
    "tipoMovimento",
    "motivoCorrecao",
    "justificativa",
    "cpfProfPrescritor",
    "cnsProfPrescritor",
    "cpfPatient",
    "cnsPatient",
    "qtd",
    "codWms",
    "armazemOrigem",
    "armazemDestino",
    "id",
    "_data_carga",
]

### 2.2. Update & Upload

In [20]:
tmp_path = f"{backup_folder}/_temp.csv"

for prefix in prefixes:

    blobs = bucket.list_blobs(prefix=prefix)
    for blob in tqdm(blobs, total=total_files):

        if blob.name.endswith(".csv"):

            if blob.updated < datetime.datetime(
                2024, 7, 19, 6, 30, 0, 0, tzinfo=datetime.timezone.utc
            ):

                file = blob.name.split("/")[-1]

                try:
                    df = pd.read_csv(
                        f"{backup_folder}/{file}",
                        sep=";",
                        dtype=str,
                        keep_default_na=False,
                        encoding="utf-8",
                    )

                    for new_column in new_columns:
                        if new_column not in df.columns:
                            df[new_column] = ""

                    df = df[columns_order]
                    df.to_csv(tmp_path, index=False, sep=";", encoding="utf-8")
                    blob.upload_from_filename(tmp_path)

                except Exception as e:
                    print(f"Error processing file {file}: {e}")

print("All CSV files have been updated.")

  0%|          | 0/32183 [00:00<?, ?it/s]

Error processing file estoque_movimento_ap33_cnes6761704_2024-03-23.csv: No columns to parse from file
Error processing file estoque_movimento_ap33_cnes2269759_2024-05-15.csv: No columns to parse from file
Error processing file estoque_movimento_ap32_cnes6919626_2024-06-26.csv: No columns to parse from file
All CSV files have been updated.
