In [25]:
!pip install pyspark



In [26]:
# Importações principais do PySpark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import regexp_replace, col

# Funções específicas do pyspark.sql.functions
from pyspark.sql.functions import (
    col, lit, to_date, concat_ws, concat, split, lpad,
    mean, stddev, when, sum as spark_sum, round
)

from pyspark.sql.functions import col, split, lpad, to_date, concat, lit, format_string

In [27]:
spark = SparkSession.builder.getOrCreate()

In [28]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [29]:
#dataframe
df = spark.read.csv('/content/drive/MyDrive/Projeto de Parceria  Semantix/ipeadata_selic.csv', header=True, inferSchema=True)

In [30]:
#Limpeza e pré-processamento dos dado

In [31]:
#visualizar dados
df.show()

+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+
|   Data|Taxa de juros - Selic - fixada pelo Copom - (% a.a.) - Banco Central do Brasil- Boletim- Seção mercado financeiro e de capitais (Bacen/Boletim/M. Finan.) - BM366_TJOVER366| _c2|
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+
|1996.07|                                                                                                                                                                       NULL|NULL|
|1996.08|                                                                                                                                                                    25.3401|NULL|
|1996.09|                                                        

In [32]:
#tipo de dados
df.printSchema()

root
 |-- Data: double (nullable = true)
 |-- Taxa de juros - Selic - fixada pelo Copom - (% a.a.) - Banco Central do Brasil- Boletim- Seção mercado financeiro e de capitais (Bacen/Boletim/M. Finan.) - BM366_TJOVER366: double (nullable = true)
 |-- _c2: string (nullable = true)



In [33]:
#trocando nome valor para selic
df = df.withColumnRenamed("Taxa de juros - Selic - fixada pelo Copom - (% a.a.) - Banco Central do Brasil- Boletim- Seção mercado financeiro e de capitais (Bacen/Boletim/M. Finan.) - BM366_TJOVER366", "selic_mes")
df.select( "selic_mes").show(5)


+---------+
|selic_mes|
+---------+
|     NULL|
|  25.3401|
|  25.0453|
|  24.1644|
|  23.5803|
+---------+
only showing top 5 rows



In [34]:
#apagar coluna _c2
df = df.drop("_c2")

In [35]:
df.show(20)

+-------+---------+
|   Data|selic_mes|
+-------+---------+
|1996.07|     NULL|
|1996.08|  25.3401|
|1996.09|  25.0453|
| 1996.1|  24.1644|
|1996.11|  23.5803|
|1996.12|  22.9988|
|1997.01|  22.4197|
|1997.02|  21.8432|
|1997.03|  21.2691|
|1997.04|  20.6976|
|1997.05|  20.6976|
|1997.06|  20.6976|
|1997.07|  20.6976|
|1997.08|  20.6976|
|1997.09|  20.6976|
| 1997.1|  43.4089|
|1997.11|  20.6976|
|1997.12|  40.9238|
|1998.01|     34.5|
|1998.02|     34.5|
+-------+---------+
only showing top 20 rows



In [36]:

# Garantir que a coluna Data é formatada como string com duas casas decimais SEM vírgulas
df = df.withColumn("data_str", format_string("%.2f", col("Data")))

# Extrair ano e mês
df = df.withColumn("ano", split(col("data_str"), "\.").getItem(0))
df = df.withColumn("mes", lpad(split(col("data_str"), "\.").getItem(1), 2, "0"))

# Criar coluna no formato yyyy-MM-dd
df = df.withColumn(
    "data_formatada",
    to_date(concat(col("ano"), lit("-"), col("mes"), lit("-01")), "yyyy-MM-dd")
)

# Visualizar resultado
df.show(20)



+-------+---------+--------+----+---+--------------+
|   Data|selic_mes|data_str| ano|mes|data_formatada|
+-------+---------+--------+----+---+--------------+
|1996.07|     NULL| 1996.07|1996| 07|    1996-07-01|
|1996.08|  25.3401| 1996.08|1996| 08|    1996-08-01|
|1996.09|  25.0453| 1996.09|1996| 09|    1996-09-01|
| 1996.1|  24.1644| 1996.10|1996| 10|    1996-10-01|
|1996.11|  23.5803| 1996.11|1996| 11|    1996-11-01|
|1996.12|  22.9988| 1996.12|1996| 12|    1996-12-01|
|1997.01|  22.4197| 1997.01|1997| 01|    1997-01-01|
|1997.02|  21.8432| 1997.02|1997| 02|    1997-02-01|
|1997.03|  21.2691| 1997.03|1997| 03|    1997-03-01|
|1997.04|  20.6976| 1997.04|1997| 04|    1997-04-01|
|1997.05|  20.6976| 1997.05|1997| 05|    1997-05-01|
|1997.06|  20.6976| 1997.06|1997| 06|    1997-06-01|
|1997.07|  20.6976| 1997.07|1997| 07|    1997-07-01|
|1997.08|  20.6976| 1997.08|1997| 08|    1997-08-01|
|1997.09|  20.6976| 1997.09|1997| 09|    1997-09-01|
| 1997.1|  43.4089| 1997.10|1997| 10|    1997-

In [37]:
df = df.drop("data_str",)
# Reordenar as colunas: Data, ano, mes, selic_mes
df = df.select("data_formatada","Data", "ano", "mes", "selic_mes")


df.show(5)

+--------------+-------+----+---+---------+
|data_formatada|   Data| ano|mes|selic_mes|
+--------------+-------+----+---+---------+
|    1996-07-01|1996.07|1996| 07|     NULL|
|    1996-08-01|1996.08|1996| 08|  25.3401|
|    1996-09-01|1996.09|1996| 09|  25.0453|
|    1996-10-01| 1996.1|1996| 10|  24.1644|
|    1996-11-01|1996.11|1996| 11|  23.5803|
+--------------+-------+----+---+---------+
only showing top 5 rows



In [38]:
#volume de dados
df.count()

346

In [39]:
# Converter mes para inteiro (se estiver como string)
df = df.withColumn("mes", col("mes").cast("int"))

In [40]:
#Filtrar os dados de dezembro (mes == 12)
selic_dezembro = df.filter(col("mes") == 12).select(
    col("ano"),
    col("selic_mes").alias("selic_ano")
)

#Fazer join do DataFrame original com o DataFrame de dezembro, pelo ano
df = df.join(selic_dezembro, on="ano", how="left")


In [41]:
#contar dados nulos
from pyspark.sql.functions import col, sum, when

df.select([
    sum(when(col(c).isNull(), 1).otherwise(0)).alias(c)
    for c in df.columns
]).show()


+---+--------------+----+---+---------+---------+
|ano|data_formatada|Data|mes|selic_mes|selic_ano|
+---+--------------+----+---+---------+---------+
|  0|             0|   0|  0|        2|        5|
+---+--------------+----+---+---------+---------+



In [42]:
#visualizar dados nulos
df.filter(col("selic_ano").isNull()).show()

+----+--------------+-------+---+---------+---------+
| ano|data_formatada|   Data|mes|selic_mes|selic_ano|
+----+--------------+-------+---+---------+---------+
|2025|    2025-01-01|2025.01|  1|    13.25|     NULL|
|2025|    2025-02-01|2025.02|  2|    13.25|     NULL|
|2025|    2025-03-01|2025.03|  3|    14.25|     NULL|
|2025|    2025-04-01|2025.04|  4|    14.25|     NULL|
|2025|    2025-05-01|2025.05|  5|     NULL|     NULL|
+----+--------------+-------+---+---------+---------+



In [43]:
# Verificar linhas duplicadas
linhas_duplicadas = df.groupBy(df.columns).count().filter(col("count") > 1)

# Mostrar as duplicadas, se existirem
linhas_duplicadas.show()

+---+--------------+----+---+---------+---------+-----+
|ano|data_formatada|Data|mes|selic_mes|selic_ano|count|
+---+--------------+----+---+---------+---------+-----+
+---+--------------+----+---+---------+---------+-----+



In [44]:
#tipo de dados
df.printSchema()

root
 |-- ano: string (nullable = true)
 |-- data_formatada: date (nullable = true)
 |-- Data: double (nullable = true)
 |-- mes: integer (nullable = true)
 |-- selic_mes: double (nullable = true)
 |-- selic_ano: double (nullable = true)



In [45]:
#periodo_economico: Categorização dos períodos econômicos brasileiros

df = df.withColumn(
    "periodo_economico",
    when(col("ano") <= 1986, "Pré-Plano Cruzado")
    .when((col("ano") >= 1987) & (col("ano") <= 1993), "Entre-Planos")
    .when((col("ano") >= 1994) & (col("ano") <= 1999), "Plano Real")
    .when((col("ano") >= 2000) & (col("ano") <= 2010), "Consolidação do Real")
    .when((col("ano") >= 2011) & (col("ano") <= 2014), "Nova Matriz Econômica")
    .when((col("ano") >= 2015) & (col("ano") <= 2016), "Crise Econômica")
    .when((col("ano") >= 2017) & (col("ano") <= 2019), "Recuperação e Reformas")
    .when((col("ano") >= 2020) & (col("ano") <= 2021), "Pandemia e Choques Globais")
    .when((col("ano") >= 2022) & (col("ano") <= 2024), "Inflação e Juros Altos")
    .otherwise("Fora da classificação")
)
df.select("ano", "periodo_economico").show()

+----+-----------------+
| ano|periodo_economico|
+----+-----------------+
|1996|       Plano Real|
|1996|       Plano Real|
|1996|       Plano Real|
|1996|       Plano Real|
|1996|       Plano Real|
|1996|       Plano Real|
|1997|       Plano Real|
|1997|       Plano Real|
|1997|       Plano Real|
|1997|       Plano Real|
|1997|       Plano Real|
|1997|       Plano Real|
|1997|       Plano Real|
|1997|       Plano Real|
|1997|       Plano Real|
|1997|       Plano Real|
|1997|       Plano Real|
|1997|       Plano Real|
|1998|       Plano Real|
|1998|       Plano Real|
+----+-----------------+
only showing top 20 rows



In [46]:
#ciclo_economico: Classificação do momento econômico (expansão, contração, etc.)


df = df.withColumn(
    "ciclo_economico",
    when(col("ano") <= 1986, "Contração")
    .when((col("ano") >= 1987) & (col("ano") <= 1993), "Contração")
    .when((col("ano") >= 1994) & (col("ano") <= 1999), "Expansão")
    .when((col("ano") >= 2000) & (col("ano") <= 2010), "Expansão")
    .when((col("ano") >= 2011) & (col("ano") <= 2014), "Contração")
    .when((col("ano") >= 2015) & (col("ano") <= 2016), "Contração")
    .when((col("ano") >= 2017) & (col("ano") <= 2019), "Expansão")
    .when((col("ano") >= 2020) & (col("ano") <= 2021), "Contração")
    .when((col("ano") >= 2022) & (col("ano") <= 2024), "Estagnação/Contração")
    .otherwise("Fora da classificação")
)

df.select("ano", "ciclo_economico").show()

+----+---------------+
| ano|ciclo_economico|
+----+---------------+
|1996|       Expansão|
|1996|       Expansão|
|1996|       Expansão|
|1996|       Expansão|
|1996|       Expansão|
|1996|       Expansão|
|1997|       Expansão|
|1997|       Expansão|
|1997|       Expansão|
|1997|       Expansão|
|1997|       Expansão|
|1997|       Expansão|
|1997|       Expansão|
|1997|       Expansão|
|1997|       Expansão|
|1997|       Expansão|
|1997|       Expansão|
|1997|       Expansão|
|1998|       Expansão|
|1998|       Expansão|
+----+---------------+
only showing top 20 rows



In [47]:
df.show(5)

+----+--------------+-------+---+---------+---------+-----------------+---------------+
| ano|data_formatada|   Data|mes|selic_mes|selic_ano|periodo_economico|ciclo_economico|
+----+--------------+-------+---+---------+---------+-----------------+---------------+
|1996|    1996-07-01|1996.07|  7|     NULL|  22.9988|       Plano Real|       Expansão|
|1996|    1996-08-01|1996.08|  8|  25.3401|  22.9988|       Plano Real|       Expansão|
|1996|    1996-09-01|1996.09|  9|  25.0453|  22.9988|       Plano Real|       Expansão|
|1996|    1996-10-01| 1996.1| 10|  24.1644|  22.9988|       Plano Real|       Expansão|
|1996|    1996-11-01|1996.11| 11|  23.5803|  22.9988|       Plano Real|       Expansão|
+----+--------------+-------+---+---------+---------+-----------------+---------------+
only showing top 5 rows



In [48]:
#salvar arquivo

In [49]:
#DataFrame em um único CSV
df.coalesce(1) \
  .write \
  .option("header", True) \
  .mode("overwrite") \
  .csv("/content/drive/MyDrive/Projeto de Parceria  Semantix/selic_csv_temp")


In [50]:
#Mover e renomear o arquivo .csv
import os
import shutil

temp_path = "/content/drive/MyDrive/Projeto de Parceria  Semantix/selic_csv_temp"
final_csv_path = "/content/drive/MyDrive/Projeto de Parceria  Semantix/selic_final.csv"

# Encontrar o part-xxxxx.csv e mover
for file in os.listdir(temp_path):
    if file.startswith("part-") and file.endswith(".csv"):
        shutil.move(f"{temp_path}/{file}", final_csv_path)
        break


In [51]:
#Copiar o arquivo para o Colab para gerar o link de download:
shutil.copy(final_csv_path, "/content/selic_final.csv")


'/content/selic_final.csv'

In [52]:
from google.colab import files
files.download("/content/selic_final.csv")


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [53]:
spark.stop()