In [69]:
import findspark
import os

findspark.init(os.environ.get('SPARK_HOME'))

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as functions
from decimal import Decimal


In [70]:
spark = SparkSession.builder.appName("GastoMinisterio").getOrCreate()

In [71]:
df = spark.read.format("csv").option("header", True).option("delimiter", ";").option('encoding', 'windows-1252').csv('hdfs://localhost:9000/user/maycon/2019_Viagem.csv')   

In [72]:
to_value = lambda v : Decimal(v.replace(",", "."))

In [73]:
udf_to_value = functions.udf(to_value)

In [74]:
df2 = df.withColumn("value", udf_to_value(df["Valor passagens"]))

In [75]:
orgaos = df2.select("Nome do órgão superior").distinct()

In [76]:
df2.groupBy("Nome do órgão superior").agg(functions.max("value"), functions.sum("value"), functions.count("value"), functions.avg("value")).show(orgaos.count(), truncate = False)

+-----------------------------------------------------------+----------+--------------------+------------+------------------+
|Nome do órgão superior                                     |max(value)|sum(value)          |count(value)|avg(value)        |
+-----------------------------------------------------------+----------+--------------------+------------+------------------+
|Ministério do Meio Ambiente                                |989.92    |4754103.779999999   |11766       |404.0543753187149 |
|Ministério da Agricultura, Pecuária e Abastecimento        |998.83    |6569691.700000001   |18862       |348.30302725055674|
|Advocacia-Geral da União                                   |994.57    |978350.1899999995   |3696        |264.70513798701285|
|Ministério das Cidades                                     |0.00      |0.0                 |1           |0.0               |
|Ministério da Cultura                                      |3494.56   |5256.2              |2           |2628.1      

In [77]:
df2.write.mode('overwrite').csv("hdfs://localhost:9000/user/maycon/agrupamentoMinisterio2.csv")

In [78]:
df2.select('Nome do órgão superior', 'value').show(truncate = False)

+-----------------------------------------+-------+
|Nome do órgão superior                   |value  |
+-----------------------------------------+-------+
|Ministério da Educação                   |2875.92|
|Ministério da Educação                   |2420.48|
|Ministério da Educação                   |2694.58|
|Ministério da Defesa                     |1236.38|
|Ministério da Defesa                     |746.35 |
|Ministério da Defesa                     |1293.41|
|Ministério da Educação                   |986.15 |
|Ministério da Justiça e Segurança Pública|1492.03|
|Ministério da Justiça e Segurança Pública|1492.03|
|Ministério da Educação                   |672.60 |
|Ministério da Educação                   |2671.65|
|Ministério da Educação                   |0.00   |
|Ministério das Relações Exteriores       |3463.12|
|Ministério das Relações Exteriores       |5895.37|
|Ministério da Educação                   |5889.17|
|Ministério da Justiça e Segurança Pública|4304.31|
|Ministério 

In [79]:
tuplaOrgaosValor = df2.rdd.map(lambda x : (x["Nome do órgão superior"], x["value"]))

In [80]:
tuplaOrgaosValor.take(10)

[('Ministério da Educação', '2875.92'),
 ('Ministério da Educação', '2420.48'),
 ('Ministério da Educação', '2694.58'),
 ('Ministério da Defesa', '1236.38'),
 ('Ministério da Defesa', '746.35'),
 ('Ministério da Defesa', '1293.41'),
 ('Ministério da Educação', '986.15'),
 ('Ministério da Justiça e Segurança Pública', '1492.03'),
 ('Ministério da Justiça e Segurança Pública', '1492.03'),
 ('Ministério da Educação', '672.60')]