In [12]:
import pyspark as ps
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, sum, explode, collect_list
import time


In [20]:
#Iniciar a sessão
spark = SparkSession.builder.appName("Exemplo Processamento Distribuido").getOrCreate()

#carregar dados
df = spark.read.csv("./br_rf_arrecadacao_uf.csv",header=True, inferSchema=True)

#Mostrar esquema dos dados
print("Apresentando o Schema")
df.printSchema()

#Mostrar dados 
print("Apresentando os dados")
df.show()

#operações básicas
filtro_df = df.filter(col("ipi_automoveis") > 10000)

print("Apresentando o filtro")
filtro_df.show()

#Agrupando dados
print("Dados Agrupados")
dadosAgrupados_df = df.groupBy("sigla_uf").agg(sum("irpf").alias("total_irpf"))
dadosAgrupados_df.show()

#operações complexar para comparação
def operacao_complexa(df):
    
    start_time = time.time()
    result = df.groupBy("ano","sigla_uf").agg(sum("irpf").alias("total_irpf"), avg("ipi_fumo").alias("media_ipi_fumo")).orderBy("ano","sigla_uf")
    result.count()
    end_time = time.time() 
    return end_time - start_time

spark_time = operacao_complexa(df)
print(f"Tempo de execução do Spark: {spark_time} segundos")


#comparação com pandas
pandas_df = df.toPandas()
def pandas_operacao_complexa(df):
    start_time = time.time()
    result = df.groupBy("ano","sigla_uf").agg(sum("irpf").alias("total_irpf"), avg("ipi_fumo").alias("media_ipi_fumo")).orderBy("ano","sigla_uf")
    result.count()
    end_time = time.time() 
    return end_time - start_time

pandas_time = pandas_operacao_complexa(df)
print(f"Tempo de execução do Pandas: {pandas_time} segundos")

#visualizar plano de execução
df.groupBy("ano").agg(sum("irpf").alias("totalr")).explain()

#encerrar sessão spark
spark.stop()






Apresentando o Schema
root
 |-- ano: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- sigla_uf: string (nullable = true)
 |-- imposto_importacao: double (nullable = true)
 |-- imposto_exportacao: double (nullable = true)
 |-- ipi_fumo: double (nullable = true)
 |-- ipi_bebidas: double (nullable = true)
 |-- ipi_automoveis: double (nullable = true)
 |-- ipi_importacoes: double (nullable = true)
 |-- ipi_outros: double (nullable = true)
 |-- irpf: double (nullable = true)
 |-- irpj_entidades_financeiras: double (nullable = true)
 |-- irpj_demais_empresas: double (nullable = true)
 |-- irrf_rendimentos_trabalho: double (nullable = true)
 |-- irrf_rendimentos_capital: double (nullable = true)
 |-- irrf_remessas_exterior: double (nullable = true)
 |-- irrf_outros_rendimentos: double (nullable = true)
 |-- iof: double (nullable = true)
 |-- itr: double (nullable = true)
 |-- ipmf: integer (nullable = true)
 |-- cpmf: double (nullable = true)
 |-- cofins: double (nullable = 