## Intancia um objeto MinioSpark

In [5]:
from minio_spark import MinioSpark
datalake = MinioSpark()

## Lista todos os buckets do datalake

In [6]:
buckets = datalake.client.list_buckets()
print("Buckets in the root of the DataLake:")
for bucket in buckets:
    print(bucket.name)

Buckets in the root of the DataLake:
raw
stage


## Lista todos os arquivos do bucket "raw" que iniciam com "GSH_APAC"

In [9]:
raw_bucket = datalake.get_bucket('raw')
for obj in raw_bucket.list_objects(prefix="MVSISS/GSH_APAC_"):
    print(obj.name)

MVSISS/GSH_APAC_1.zip
MVSISS/GSH_APAC_2.zip
MVSISS/GSH_APAC_3.zip
MVSISS/GSH_APAC_4.zip
MVSISS/GSH_APAC_5.zip
MVSISS/GSH_APAC_MEDICAMENTO_1.zip
MVSISS/GSH_APAC_MEDICAMENTO_2.zip


In [13]:
from minio.error import S3Error
raw_bucket = datalake.get_bucket('raw')
prefix = "MVSISS/GSH_APAC_"

# Lista os objetos com o prefixo definido
objects = datalake.list_objects(bucket_name, prefix=prefix, recursive=True)
for obj in objects:
    print(obj.object_name)


raw_bucket = datalake.get_bucket('raw')
for obj in raw_bucket.list_objects(prefix="MVSISS/GSH_APAC_",  recursive=True):
    print(obj.name)

AttributeError: 'MinioSpark' object has no attribute 'list_objects'

## Carrega os arquivos .zip em um dataframe spark.

O método `read_csv_from_zip` tem como objetivo ler arquivos CSV compactados em um arquivo ZIP. O processo ocorre em três etapas:

1. **Criação da pasta:**

Um objeto Minio é utilizado para criar uma pasta temporária com o nome do prefixo fornecido, no exemplo "GSH_APAC". Essa pasta servirá para armazenar os arquivos descompactados.

2. **Descompactação dos arquivos:**

Todos os arquivos dentro do arquivo ZIP serão descompactados e extraídos para a pasta criada na etapa anterior. Isso garante que os arquivos CSV fiquem acessíveis para leitura.

3. **Leitura dos arquivos CSV:**

Finalmente, o método lê cada um dos arquivos CSV descompactados na pasta "GSH_APAC". O conteúdo desses arquivos pode ser processado conforme as necessidades da aplicação.

In [4]:
df = datalake.read_csv_from_zip('raw', 'GSH_APAC_', delimiter='|')
df.count()

AnalysisException: [PATH_NOT_FOUND] Path does not exist: s3a://raw/GSH_APAC_.

## Lista os arquivos descompactados na pasta "GSH_APAC"

In [9]:
raw_bucket = datalake.get_bucket('raw')
for obj in raw_bucket.list_objects(prefix="GSH_APAC/"):
    print(obj.name)

GSH_APAC/GSH_APAC_1.txt
GSH_APAC/GSH_APAC_2.txt
GSH_APAC/GSH_APAC_3.txt
GSH_APAC/GSH_APAC_4.txt
GSH_APAC/GSH_APAC_5.txt


## Carrega o arquivo (objeto MinIO) GSH_APAC/GSH_APAC_1.txt em um datafame

In [7]:
df = datalake.read_csv_to_dataframe('raw', 'GSH_APAC/GSH_APAC_1.txt', delimiter='|')
df.count()

956914

In [17]:
df.select('APAC_ID', 'APCIDPRINC_CID', 'APCIDPRINC_DESCR_CID').show(truncate=False)

+-------+--------------+--------------------------------------------+
|APAC_ID|APCIDPRINC_CID|APCIDPRINC_DESCR_CID                        |
+-------+--------------+--------------------------------------------+
|4      |M810          |OSTEOPOROSE POS-MENOPAUSICA                 |
|5      |L700          |ACNE VULGAR                                 |
|6      |L700          |ACNE VULGAR                                 |
|7      |N180          |DOENC RENAL EM ESTADIO FINAL                |
|8      |E78           |DISTURBIOS METAB LIPOPROTEINAS E OUT LIPIDEM|
|9      |G300          |DOENC DE ALZHEIMER DE INICIO PRECOCE        |
|10     |F900          |DISTURBIOS DA ATIVIDADE E DA ATENCAO        |
|11     |E228          |OUTR HIPERFUNCOES DA HIPOFISE               |
|12     |L708          |OUTR FORM DE ACNE                           |
|13     |F200          |ESQUIZOFRENIA PARANOIDE                     |
|14     |L700          |ACNE VULGAR                                 |
|15     |M068       

In [2]:
df = datalake.read_csv_from_zip('raw', 'GSH_PESSOAS.zip')
df.printSchema()

root
 |-- ID_PESSOA: integer (nullable = true)
 |-- NOME: string (nullable = true)
 |-- APELIDO: string (nullable = true)
 |-- DATA_NASCIMENTO: date (nullable = true)
 |-- DATA_FALECIMENTO: timestamp (nullable = true)
 |-- OBITO: integer (nullable = true)
 |-- EMAIL: string (nullable = true)
 |-- CPF: long (nullable = true)
 |-- NOME_PAI: string (nullable = true)
 |-- NOME_MAE: string (nullable = true)
 |-- RG: string (nullable = true)
 |-- RG_DATA_EMISSAO: date (nullable = true)
 |-- CARTAO_SUS: long (nullable = true)
 |-- CTPS_NUMERO: integer (nullable = true)
 |-- CTPS_SERIE: integer (nullable = true)
 |-- CTPS_DATA_EMISSAO: date (nullable = true)
 |-- CERTIDAO_NUM_FOLHA: string (nullable = true)
 |-- CERTIDAO_NUM_LIVRO: string (nullable = true)
 |-- CERTIDAO_NUM_TERMO: string (nullable = true)
 |-- CERTIDAO_CARTORIO: string (nullable = true)
 |-- CERTIDAO_DATA_EMISSAO: date (nullable = true)
 |-- ENDERECO_NUMERO: integer (nullable = true)
 |-- ENDERECO_COMPLEMENTO: string (nullable

In [11]:
df.printSchema()

root
 |-- APAC_ID: string (nullable = true)
 |-- APAC_ID_ANTERIOR: string (nullable = true)
 |-- APAC_PPI_MOVIMENTACAO: string (nullable = true)
 |-- APAC_USUARIO_AUTORIZADOR: string (nullable = true)
 |-- APAC_DATA_AUTORIZACAO: string (nullable = true)
 |-- APAC_DATA_FIM: string (nullable = true)
 |-- APAC_DATA_INICIO: date (nullable = true)
 |-- APAC_DATA_PREVISAO_AUTORIZACAO: integer (nullable = true)
 |-- APAC_DATA_REJEICAO: string (nullable = true)
 |-- APAC_DATA_SOLICITACAO: string (nullable = true)
 |-- APAC_DT_ENCERRAMENTO: string (nullable = true)
 |-- APAC_DT_FINALIZACAO_APAC: string (nullable = true)
 |-- APAC_DT_SENHA_TRIAGEM: string (nullable = true)
 |-- APAC_DV_APAC: string (nullable = true)
 |-- APAC_EXAMES_CLINICOS: string (nullable = true)
 |-- APAC_FLAG_ESTADO: integer (nullable = true)
 |-- APAC_FLAG_NUMERACAO: string (nullable = true)
 |-- APAC_FLAG_RECEITA_SUS: string (nullable = true)
 |-- APAC_ID_APAC_CANCELA_MOTIVO: string (nullable = true)
 |-- APAC_ID_APAC_MO