In [None]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

# inicia a sessão do spark
spark = SparkSession.builder\
        .appName('sniversariantes')\
        .getOrCreate()

25/08/25 07:49:15 WARN Utils: Your hostname, inet resolves to a loopback address: 127.0.1.1; using 10.11.94.116 instead (on interface enp0s3)
25/08/25 07:49:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/25 07:49:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/25 07:49:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/08/25 07:49:17 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [21]:
# exibe a sessão do spark
spark

In [22]:
# carrega o arquivo csv que vai servir de base de dados do projeto
df = spark.read.csv('./aniversariantes.csv', sep=';', header=True, encoding='utf-8')

In [23]:
# lista de colunas que terão seus nomes substituidos
listColumns = ["r.a.",
"aluno",
"periodoLetivo",
"codigoCurso",
"curso",
"codigoHabilitacao",
"habilitacao",
"turma",
"situacaoMatricula",
"codigoMatrizCurricular",
"tipoMatricula",
"matrizCurricular",
"turno",
"dataMatricula",
"coeficienteRendimento",
"codigoInstituicaoDestino",
"motivoTransferencia",
"anoReferencia",
"codigoUsuario",
"e-mail",
"dataSolicitacaoAlteracao",
"resultado",
"telefone",
"polo",
"codigoTipoCurso",
"sexo",
"nascimento",
"gestorPolo"
]

In [None]:
# troca os nomes das colunas do dataframe pelos nomes que tem na lista listColumns
df = df.toDF(*listColumns)

In [25]:
# lista de colunas que serão dropadas
dropColumns = ["r.a.",
"periodoLetivo",
"codigoCurso",
"codigoHabilitacao",
"habilitacao",
"situacaoMatricula",
"codigoMatrizCurricular",
"tipoMatricula",
"matrizCurricular",
"turno",
"dataMatricula",
"coeficienteRendimento",
"codigoInstituicaoDestino",
"motivoTransferencia",
"anoReferencia",
"codigoUsuario",
"e-mail",
"dataSolicitacaoAlteracao",
"resultado",
"telefone",
"polo",
"codigoTipoCurso",
"sexo",
"gestorPolo"
]

In [26]:
# dropa as colunas da lista dropColumns
for col in dropColumns:
    df = df.drop(col)

In [27]:
from pyspark.sql.functions import col

# remoção das linhas com valor pensionato da coluna curso
df = df.filter(~col('curso').like('%Pensionato%'))

In [28]:
from pyspark.sql.functions import trim

# remoção de espaços nos inicios e finais das linhas da coluna aluno
df = df.withColumn('aluno', trim(df['aluno']))
df = df.dropDuplicates(['aluno'])

In [29]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import unicodedata

# Função criada para remoção de acentos, pois quando eu usei dropDuplicates ele estava apagando células que não deveria
def remover_acentos(texto):
    if texto is not None:
        return unicodedata.normalize('NFKD', texto).encode('ASCII', 'ignore').decode('ASCII')
    return None

remover_acentos_udf = udf(remover_acentos, StringType())

In [30]:
# remoção de acentos e criação de uma coluna para os dados sem acentos
df = df.withColumn('alunos', remover_acentos_udf(df['aluno']))

In [31]:
# remoção dos duplicados com base na coluna criada sem os acentos
df = df.dropDuplicates(['alunos'])

In [32]:
# remoção da coluna usada para aplicação dos dropDuplicates
df = df.drop('alunos')

In [33]:
from pyspark.sql.functions import when, substring

# adição das trocas de nomes da coluna turma dos nomes originais para os nomes dos anos de curso
df = df.withColumn(
    "turma",
    when(col("turma").like("%01%") | col("turma").like("%02%"), "1º ano")\
    .when(col("turma").like("%03%") | col("turma").like("%04%"), "2º ano")\
    .when(col("turma").like("%05%") | col("turma").like("%06%"), "3º ano")\
    .when(col("turma").like("%07%") | col("turma").like("%08%"), "4º ano")\
    .when(col("turma").like("%09%") | col("turma").like("%10%"), "5º ano")\
    .otherwise(col("turma"))
)

In [34]:
# pega a string de data no formato "dd/MM/yyyy", retira os 2 dígitos do mês que tem na string e transforma em inteiro
df = df.withColumn('nascimento', substring('nascimento', 4, 2).cast('int'))

In [35]:
# ordena a tabela pelas colunas de nascimento, curso, turma e aluno
df = df.orderBy('nascimento', 'curso', 'turma', 'aluno')

In [36]:
# lista para substituir os numeros da coluna nascimento pelos nomes dos meses
lista_meses = [
    "Janeiro",
    "Fevereiro",
    "Março",
    "Abril",
    "Maio",
    "Junho",
    "Julho",
    "Agosto",
    "Setembro",
    "Outubro",
    "Novembro",
    "Dezembro"
]

In [37]:
# troca o numero do mês de nascimento pelo indice do mês na lista lista_meses
df = df.withColumn("nascimento",
    when(col("nascimento") == 1, f"{lista_meses[0]}")\
    .when(col("nascimento") == 2, f"{lista_meses[1]}")\
    .when(col("nascimento") == 3, f"{lista_meses[2]}")\
    .when(col("nascimento") == 4, f"{lista_meses[3]}")\
    .when(col("nascimento") == 5, f"{lista_meses[4]}")\
    .when(col("nascimento") == 6, f"{lista_meses[5]}")\
    .when(col("nascimento") == 7, f"{lista_meses[6]}")\
    .when(col("nascimento") == 8, f"{lista_meses[7]}")\
    .when(col("nascimento") == 9, f"{lista_meses[8]}")\
    .when(col("nascimento") == 10, f"{lista_meses[9]}")\
    .when(col("nascimento") == 11, f"{lista_meses[10]}")\
    .when(col("nascimento") == 12, f"{lista_meses[11]}")\
    .otherwise(col("nascimento"))
)

In [38]:
# gravação do csv com os arquivos processados
df.write.csv(\
                path = './aniversariantes',
                sep = ';',
                header = True,
                mode = 'overwrite'
)