In [0]:
# Especificar o caminho do arquivo CSV onde deseja salvar a tabela
caminho_arquivo_csv = "/mnt/temp/ia/databricks_temp/tratados"

In [0]:
caminho_arquivo_csv

'/mnt/temp/ia/databricks_temp/tratados'

In [0]:
import pandas as pd

# Adjust the path to use the DBFS file system
df = pd.read_csv("file:/dbfs/mnt/temp/ia/databricks_temp/Brahma Sample Data - CVS Pharmacy sample.csv")

In [0]:
df.display()

transaction_id,pharmacy_id,product_id,customer_id,transaction_date,quantity,total_amount,payment_method
1001,1,101,5001,2024-08-01 9:00,2,40,Credit Card
1002,2,102,5002,2024-08-01 10:15,1,20,Cash
1003,1,103,5003,2024-08-01 11:30,3,90,Credit Card
1004,3,104,5004,2024-08-01 12:45,1,15,Debit Card
1005,2,105,5005,2024-08-01 13:00,2,50,Cash
1006,3,106,5006,2024-08-01 14:20,1,30,Credit Card
1007,1,101,5001,2024-08-01 15:30,1,20,Credit Card
1008,2,103,5003,2024-08-01 16:00,2,60,Debit Card
1009,3,102,5002,2024-08-01 17:10,1,20,Cash
1010,1,104,5004,2024-08-02 9:00,4,60,Credit Card


In [0]:
df.count()

transaction_id      12
pharmacy_id         12
product_id          12
customer_id         12
transaction_date    12
quantity            12
total_amount        12
payment_method      12
dtype: int64

In [0]:
# Quantidade de linhas e colunas do conjunto de dados.
num_rows = df.count()
num_cols = len(df.columns)

print("Numero de linhas na tabela", num_rows)
print("Numero de colunas na tabela", num_cols)

Numero de linhas na tabela transaction_id      12
pharmacy_id         12
product_id          12
customer_id         12
transaction_date    12
quantity            12
total_amount        12
payment_method      12
dtype: int64
Numero de colunas na tabela 8


## Verificando a **duplicidade**

In [0]:
from pyspark.sql.functions import count

# Use * to unpack the list of column names into separate arguments for groupBy
df_duplicates = spark.createDataFrame(df).groupBy(*df.columns).agg(count("*").alias("count")).filter("count > 1")
display(df_duplicates)

transaction_id,pharmacy_id,product_id,customer_id,transaction_date,quantity,total_amount,payment_method,count


In [0]:
df.count()

transaction_id      12
pharmacy_id         12
product_id          12
customer_id         12
transaction_date    12
quantity            12
total_amount        12
payment_method      12
dtype: int64

## Transformando em spark dataframe

In [0]:
# Transformando em spark dataframe
df_spark = spark.createDataFrame(df)

# Usando o printschema para verificar se o schema está correto
df_spark.printSchema()

root
 |-- transaction_id: long (nullable = true)
 |-- pharmacy_id: long (nullable = true)
 |-- product_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- transaction_date: string (nullable = true)
 |-- quantity: long (nullable = true)
 |-- total_amount: long (nullable = true)
 |-- payment_method: string (nullable = true)



## Limpar Diretório


In [0]:
#Limpar o Diretório antes de gravar arquivo
directory = "/mnt/temp/ia/databricks_temp/tratados"

# Listar todos os arquivos no diretório
files = dbutils.fs.ls(directory)

# Iterar sobre os arquivos e remover cada um deles
for file in files:
    dbutils.fs.rm(file.path, recurse=True)

# Verificar se o diretório está vazio
remaining_files = dbutils.fs.ls(directory)
if len(remaining_files) == 0:
    print(f'Todos os arquivos no diretório {directory} foram removidos.')
else:
    print(f'Ainda existem arquivos no diretório {directory}.')

Todos os arquivos no diretório /mnt/temp/ia/databricks_temp/tratados foram removidos.


## Salvar CSV

In [0]:
import os
from pyspark.sql import SparkSession


# Verificar se o diretório já existe e, se existir, deletar
if os.path.exists(caminho_arquivo_csv):
    # Remove the existing directory
    try:
        os.system(f'rm -r {caminho_arquivo_csv}')
        print("Existing CSV directory removed successfully.")
    except Exception as e:
        print("Error removing existing CSV directory:", str(e))

# Salvar o DataFrame no formato CSV em um único arquivo
try:
    df_spark.coalesce(1).write.csv(caminho_arquivo_csv, header=True, mode="overwrite")
    print("DataFrame saved successfully in CSV format.")
except Exception as e:
    print("Error saving DataFrame:", str(e))


DataFrame saved successfully in CSV format.



## Renomeando o Arquivo para nome apropriado para input dados 



In [0]:
import os

# Diretório onde os arquivos estão localizados
directory = caminho_arquivo_csv

# Listar arquivos no diretório
files = dbutils.fs.ls(directory)

# Iterar sobre os arquivos
for file in files:
    if file.name.endswith('.csv'):
        # Armazenar o nome do arquivo sem a extensão em uma variável
        file_name_with_extension = file.name
        file_name_without_extension = os.path.splitext(file_name_with_extension)[0]
        
        # Exibir o nome do arquivo com e sem extensão
        print(f'Nome do arquivo com extensão: {file_name_with_extension}')
        print(f'Nome do arquivo sem extensão: {file_name_without_extension}')
        
        # Caminho completo do arquivo original
        source_path = os.path.join(directory, file_name_with_extension)
        
        # Novo caminho com o novo nome
        destination_path = os.path.join(directory, "brahma.csv")
        
        # Renomear o arquivo
        dbutils.fs.mv(source_path, destination_path)
        print(f'Arquivo renomeado para: {destination_path}')

Nome do arquivo com extensão: part-00000-tid-1093700173197635299-268b5a7e-be92-4ab6-b14c-81b457a3738b-391-1-c000.csv
Nome do arquivo sem extensão: part-00000-tid-1093700173197635299-268b5a7e-be92-4ab6-b14c-81b457a3738b-391-1-c000
Arquivo renomeado para: /mnt/temp/ia/databricks_temp/tratados/brahma.csv
