<a href="https://colab.research.google.com/github/kassiaoliveiraa/palestracampuspartynordeste/blob/main/palestracampusparty.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!pip install pyspark
!pip install pyarrow
!pip install pandas



In [2]:
from pyspark.sql import SparkSession
import pandas as pd
import os
import shutil

spark = SparkSession.builder.appName("ETL").getOrCreate()

In [None]:
#Tamanho do Arquivo CSV

file_path = "/home/etl/processo_inicio_1990.csv"
file_size_bytes = os.path.getsize(file_path)
file_size_kb = file_size_bytes / 1024

print(f"O tamanho do arquivo é {file_size_kb:.2f} KB")

In [4]:
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("sep", ";") \
    .load("/home/etl/processo_inicio_1990.csv")

df.show()

+----------+--------------------+--------------------+----------+---------+--------------------+----------+--------------------+-------+--------------------+--------------------+
|DTABERTURA|             NUMPROC|             ASSUNTO|   DTMOVIM|CODORIGEM|             DESCORI|CODDESTINO|          DESCDESTIN|MOVSTAT|         INTERESSADO|         PROCEDENCIA|
+----------+--------------------+--------------------+----------+---------+--------------------+----------+--------------------+-------+--------------------+--------------------+
|1993-04-13|00000.001433/1983...|        -        ...|1993-08-03| 20002230|SECAO DE PROTOCOL...|  36000000|PROCURADORIA FEDE...|      S|                 ...|                 ...|
|1992-11-30|00000.000519/1985...|        -        ...|1993-05-28| 20002230|SECAO DE PROTOCOL...|  36000000|PROCURADORIA FEDE...|      S|                 ...|                 ...|
|1988-11-14|00001.005652/1988...|        -        ...|1998-10-23|  2000000|DIRETORIA DE ADMI...|  2000300

In [5]:
df.createOrReplaceTempView("processos")

queryCount = """
SELECT count(*) AS count
FROM processos
"""

queryprocAno = """
SELECT count(*) AS count, DTABERTURA
FROM processos
GROUP BY DTABERTURA
"""

result = spark.sql(queryCount)

result.show()

+-----+
|count|
+-----+
|51621|
+-----+



In [9]:
df_1994 = df.filter(df["DTABERTURA"].contains("1993"))

df_1994.show()

df_Aposentados2000 = df.filter(df["ASSUNTO"].contains("APOSENTADORIA") & df["DTABERTURA"].contains("1989"))

df_Aposentados2000.show()

+----------+--------------------+--------------------+----------+---------+--------------------+----------+--------------------+-------+--------------------+--------------------+
|DTABERTURA|             NUMPROC|             ASSUNTO|   DTMOVIM|CODORIGEM|             DESCORI|CODDESTINO|          DESCDESTIN|MOVSTAT|         INTERESSADO|         PROCEDENCIA|
+----------+--------------------+--------------------+----------+---------+--------------------+----------+--------------------+-------+--------------------+--------------------+
|1993-04-13|00000.001433/1983...|        -        ...|1993-08-03| 20002230|SECAO DE PROTOCOL...|  36000000|PROCURADORIA FEDE...|      S|                 ...|                 ...|
|1993-05-26|25001.010173/1970...|029.    -OUTROS A...|1997-06-23| 65000000|INST.NAC.CONTROLE...|  39000000|ORGAO GOVERNAMENT...|      S|MONICA BRAGA TEIX...|                 ...|
|1993-04-26|25380.002190/ 193...|013.5   -INQUERIT...|2016-07-29| 20001210|SECAO DE PROTOCOL...|  2000123

In [41]:
df.write.parquet("/home/etl/processos_temp/")

In [None]:
temp_folder = '/home/etl/processos_temp/'
destination_folder = '/home/etl/procesosparquet/'

os.makedirs(destination_folder, exist_ok=True)

file_counter = 1

for file_name in os.listdir(temp_folder):
    if file_name.endswith(".parquet"):
        source_file = os.path.join(temp_folder, file_name)

        destination_file = os.path.join(destination_folder, f"processos_{file_counter}.parquet")

        try:
            shutil.move(source_file, destination_file)
            file_counter += 1
        except FileNotFoundError as e:
            print(f"Erro ao mover o arquivo {source_file}: {e}")
        except Exception as e:
            print(f"Erro inesperado: {e}")

try:
    shutil.rmtree(temp_folder)
except FileNotFoundError as e:
    print(f"Erro ao remover a pasta temporária: {e}")

print("Todos os arquivos foram salvos com sucesso e renomeados.")

In [None]:
df = spark.read \
    .format("parquet") \
    .load("/home/etl/procesosparquet")

df.count()

In [None]:
#Tamanho do Arquivo Parquet
def get_size_of_folder_in_kb(folder_path):
    total_size = 0
    for dirpath, dirnames, filenames in os.walk(folder_path):
        for filename in filenames:
            if filename.endswith(".parquet"):
                file_path = os.path.join(dirpath, filename)
                total_size += os.path.getsize(file_path)
    return total_size / 1024

folder_path = '/home/etl/procesosparquet'
total_size_kb = get_size_of_folder_in_kb(folder_path)

print(f"Tamanho total dos arquivos Parquet: {total_size_kb:.2f} KB")