In [73]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [74]:
spark = SparkSession.builder.appName('TransformationMonthSpark').getOrCreate()

In [75]:
df = spark.read.csv('storage/PRECOS SEMESTRAIS - AUTOMOTIVOS.csv', header=True, inferSchema=True, sep=';')

#Formata a data para recuperar e criar uma novas colunas no dataframe de ano e mês 
df = df.withColumn('Data Formatada', to_date(col('Data da Coleta'), 'MM/dd/yyyy'))
df = df.withColumn('Mes', month(col('Data Formatada')))
df = df.withColumn('Ano', year(col('Data Formatada')))

#Separa todos os meses e ano que existem no dataframe
distinct_month = df.select('Mes').distinct().collect()
distinct_year = df.select('Ano').distinct().collect()

#Realiza um laço para recuperar cada mês e segmentar os dados pelo mês
for row in distinct_month:
    month = row['Mes']
    if month is not None:
        df_month_filter = df.filter(col('Mes') == month)
        #Cria um nova coluna com a data de carga
        df_month_filter_full = df_month_filter.withColumn('Data da Carga ETL', current_date()) 
        #Tira a formatação PT-BR(,) e coloca (.) para realizar o cálculo de média
        df_month_filter_full = df_month_filter_full.withColumn('Valor de Venda', regexp_replace(col('Valor de Venda'), ',', '.'))

        #Realiza o cálculo da média
        df_month_filter_agg = df_month_filter_full.groupBy(['Data da Carga ETL', 'Mes','Ano']).agg(round(avg('Valor de Venda'), 2).alias('Média da Venda'))
    
        #Apresenta os dados que serão gravados nos arquivos
        df_month_filter_full.show()
        df_month_filter_agg.show()
        #Cria os arquivos, agregados e abertos
        df_month_filter_full.write.csv(f'storage/year-month/{year_path}/{str(month)}/full', header=True, mode="overwrite")
        df_month_filter_agg.write.csv(f'storage/year-month/{year_path}/{str(month)}/agg', header=True, mode="overwrite")

spark.stop()

Filtrando mês: 12
+--------------+--------------+-------------------+--------------------+-------------------+--------------------+----------+--------------------+---------------+---------+------------------+--------------+--------------+---------------+-----------------+-------------+--------------+---+----+-----------------+
|Regiao - Sigla|Estado - Sigla|          Municipio|             Revenda|    CNPJ da Revenda|         Nome da Rua|Numero Rua|         Complemento|         Bairro|      Cep|           Produto|Data da Coleta|Valor de Venda|Valor de Compra|Unidade de Medida|     Bandeira|Data Formatada|Mes| Ano|Data da Carga ETL|
+--------------+--------------+-------------------+--------------------+-------------------+--------------------+----------+--------------------+---------------+---------+------------------+--------------+--------------+---------------+-----------------+-------------+--------------+---+----+-----------------+
|             N|            AC|         RIO BRANC