In [81]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [2]:
# criar spark session
spark = SparkSession \
    .builder \
    .config('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.0,org.postgresql:postgresql:42.2.20') \
    .config('spark.hadoop.fs.s3a.access.key', '') \
    .config('spark.hadoop.fs.s3a.secret.key', '') \
    .appName('spark etl - transferencia') \
    .getOrCreate()

# ler arquivos no s3
df = spark.read.csv('s3a://custos-stn-bucket/transferencia/transferencia_0.csv', header=True, inferSchema=True)

In [4]:
# schema
df.printSchema()

root
 |-- co_natureza_juridica: integer (nullable = true)
 |-- ds_natureza_juridica: string (nullable = true)
 |-- co_organizacao_n1: integer (nullable = true)
 |-- ds_organizacao_n1: string (nullable = true)
 |-- co_organizacao_n2: integer (nullable = true)
 |-- ds_organizacao_n2: string (nullable = true)
 |-- co_organizacao_n3: integer (nullable = true)
 |-- ds_organizacao_n3: string (nullable = true)
 |-- an_lanc: integer (nullable = true)
 |-- me_lanc: integer (nullable = true)
 |-- co_esfera_orcamentaria: integer (nullable = true)
 |-- ds_esfera_orcamentaria: string (nullable = true)
 |-- co_modalidade_aplicacao: integer (nullable = true)
 |-- ds_modalidade_aplicacao: string (nullable = true)
 |-- co_resultado_eof: integer (nullable = true)
 |-- ds_resultado_eof: string (nullable = true)
 |-- va_custo_transferencias: double (nullable = true)



In [5]:
# contagem de linhas
df.count()

250

In [6]:
# printar primeiras 5 linhas
df.show(5)

+--------------------+--------------------+-----------------+--------------------+-----------------+--------------------+-----------------+-----------------+-------+-------+----------------------+----------------------+-----------------------+-----------------------+----------------+--------------------+-----------------------+
|co_natureza_juridica|ds_natureza_juridica|co_organizacao_n1|   ds_organizacao_n1|co_organizacao_n2|   ds_organizacao_n2|co_organizacao_n3|ds_organizacao_n3|an_lanc|me_lanc|co_esfera_orcamentaria|ds_esfera_orcamentaria|co_modalidade_aplicacao|ds_modalidade_aplicacao|co_resultado_eof|    ds_resultado_eof|va_custo_transferencias|
+--------------------+--------------------+-----------------+--------------------+-----------------+--------------------+-----------------+-----------------+-------+-------+----------------------+----------------------+-----------------------+-----------------------+----------------+--------------------+-----------------------+
|         

In [7]:
# criar view para spark sql
df.createOrReplaceTempView("transferencias")

In [61]:

df_tf = spark.sql("""
                  select *
                  from transferencias
                  """)

df_tf.printSchema()
df_tf.show()

root
 |-- co_natureza_juridica: integer (nullable = true)
 |-- ds_natureza_juridica: string (nullable = true)
 |-- co_organizacao_n1: integer (nullable = true)
 |-- ds_organizacao_n1: string (nullable = true)
 |-- co_organizacao_n2: integer (nullable = true)
 |-- ds_organizacao_n2: string (nullable = true)
 |-- co_organizacao_n3: integer (nullable = true)
 |-- ds_organizacao_n3: string (nullable = true)
 |-- an_lanc: integer (nullable = true)
 |-- me_lanc: integer (nullable = true)
 |-- co_esfera_orcamentaria: integer (nullable = true)
 |-- ds_esfera_orcamentaria: string (nullable = true)
 |-- co_modalidade_aplicacao: integer (nullable = true)
 |-- ds_modalidade_aplicacao: string (nullable = true)
 |-- co_resultado_eof: integer (nullable = true)
 |-- ds_resultado_eof: string (nullable = true)
 |-- va_custo_transferencias: double (nullable = true)

+--------------------+--------------------+-----------------+--------------------+-----------------+--------------------+-----------------+-

In [62]:
df_tf.head(1) # Mostra o cabeçalho da primeira linha

[Row(co_natureza_juridica=4, ds_natureza_juridica='AUTARQUIA', co_organizacao_n1=244, ds_organizacao_n1='MINISTERIO DA EDUCACAO', co_organizacao_n2=100905, ds_organizacao_n2='INSTITUTO FEDERAL DE EDUCACAO, CIENCIA E TECNOLOGIA DA PARAIBA', co_organizacao_n3=-9, ds_organizacao_n3='NAO SE APLICA', an_lanc=2019, me_lanc=10, co_esfera_orcamentaria=1, ds_esfera_orcamentaria='ORCAMENTO FISCAL', co_modalidade_aplicacao=80, ds_modalidade_aplicacao='TRANSFERENCIAS AO EXTERIOR', co_resultado_eof=2, ds_resultado_eof='PRIMARIO DISCRICIONARIO', va_custo_transferencias=0.0)]

In [69]:
df_tf.describe(["co_natureza_juridica","co_organizacao_n1","co_organizacao_n2","co_organizacao_n3","an_lanc","me_lanc"]).show()

+-------+--------------------+-----------------+-----------------+-----------------+-------+------------------+
|summary|co_natureza_juridica|co_organizacao_n1|co_organizacao_n2|co_organizacao_n3|an_lanc|           me_lanc|
+-------+--------------------+-----------------+-----------------+-----------------+-------+------------------+
|  count|                 250|              250|              250|              250|    250|               250|
|   mean|                3.08|          544.416|        32686.664|         9768.148| 2019.0|             8.088|
| stddev|  1.0611806715497052|641.1477307098398|45837.42193743651|25053.05149355365|    0.0|3.1559620380388247|
|    min|                   1|              244|                1|               -9|   2019|                 2|
|    max|                   4|             1988|           100908|           115227|   2019|                12|
+-------+--------------------+-----------------+-----------------+-----------------+-------+------------

In [70]:
df_tf.describe(["co_esfera_orcamentaria","co_modalidade_aplicacao","co_resultado_eof","va_custo_transferencias"]).show()

+-------+----------------------+-----------------------+------------------+-----------------------+
|summary|co_esfera_orcamentaria|co_modalidade_aplicacao|  co_resultado_eof|va_custo_transferencias|
+-------+----------------------+-----------------------+------------------+-----------------------+
|  count|                   250|                    250|               250|                    250|
|   mean|                 1.256|                 51.776|             2.312|      5077482.831393072|
| stddev|   0.43729728522213634|     12.827157435359014|0.9687154579928082|    3.615109248360504E7|
|    min|                     1|                     30|                 2|        -1.6114851376E8|
|    max|                     2|                     80|                 6|         4.0122152418E8|
+-------+----------------------+-----------------------+------------------+-----------------------+



In [18]:
df_tf.select('ds_organizacao_n1').distinct().rdd.map(lambda r: r[0]).collect()
# Util para conhecer classes discretas de uma coluna categórica.
# Neste caso mostra as organizacoes presentes nestes dados

['MINISTERIO DA SAUDE',
 'MINISTERIO DA EDUCACAO',
 'MINISTERIO DA CIENCIA, TECNOLOGIA, INOVACOES E COMUNICACOES']

In [19]:
df_tf.select('ds_modalidade_aplicacao').distinct().rdd.map(lambda r: r[0]).collect()
# Util para conhecer classes discretas de uma coluna categórica.
# Neste caso mostra as modalidades de aplicacao presentes nestes dados

['TRANSFERENCIAS AO EXTERIOR',
 'TRANSF.CONSORC.PUB.MEDIANTE CONTRATO RATEIO',
 'TRANSFER.A INST. PRIVADAS COM FINS LUCRATIVOS',
 'TRANSF. A INSTITUICOES MULTIGOVERNAMENTAIS',
 'TRANSFERENCIAS A MUNICIPIOS - FUNDO A FUNDO',
 'TRANSFERENCIAS A MUNICIPIOS',
 'TRANSFER. A ESTADOS E AO DISTRITO FEDERAL',
 'TRANSF. A INST. PRIVADAS S/ FINS LUCRATIVOS',
 'EXECUCAO ORCAMENTARIA DELEGADA AOS ESTADOS/DF',
 'TRANSFER. A ESTADOS E DF - FUNDO A FUNDO']

In [79]:
df_tf = spark.sql("""
                  select distinct *
                  from transferencias
                  """)
df_tf.count() # Verificar se tem linhas duplicadas

# Retorna 250, isso quer dizer que as linhas sao todas diferentes

250

In [74]:
df_tf.head(1)

[Row(co_natureza_juridica=2, ds_natureza_juridica='FUNDACAO PUBLICA', co_organizacao_n1=244, ds_organizacao_n1='MINISTERIO DA EDUCACAO', co_organizacao_n2=94739, ds_organizacao_n2='FUNDACAO UNIVERSIDADE FEDERAL DO PAMPA', co_organizacao_n3=-9, ds_organizacao_n3='NAO SE APLICA', an_lanc=2019, me_lanc=3, co_esfera_orcamentaria=1, ds_esfera_orcamentaria='ORCAMENTO FISCAL', co_modalidade_aplicacao=50, ds_modalidade_aplicacao='TRANSF. A INST. PRIVADAS S/ FINS LUCRATIVOS', co_resultado_eof=2, ds_resultado_eof='PRIMARIO DISCRICIONARIO', va_custo_transferencias=0.0)]

In [75]:
df_tf_unique = spark.sql("""
                  select co_natureza_juridica || '-' || co_organizacao_n1 || '-' || co_organizacao_n2 || '-' || co_organizacao_n3 || '-' ||va_custo_transferencias || '-' || an_lanc ||'-' || me_lanc || '-' || co_esfera_orcamentaria || '-' || co_modalidade_aplicacao || '-'|| co_resultado_eof, 
                  count(co_natureza_juridica || '-' || co_organizacao_n1 || '-' || co_organizacao_n2 || '-' || co_organizacao_n3 || '-' ||va_custo_transferencias || '-' || an_lanc ||'-' || me_lanc || '-' || co_esfera_orcamentaria || '-' || co_modalidade_aplicacao || '-'|| co_resultado_eof)
                  from transferencias
                  group by 1
                  having count(1) > 1
                  """)
# Encontrando a chave primária através de:
# 1- co_natureza_juridica
# 2- co_organizacao_n1
# 3- co_organizacao_n2
# 4- co_organizacao_n3
# 5- va_custo_transferencias
# 6- an_lanc
# 7- me_lanc
# 8- co_esfera_orcamentaria
# 9- co_modalidade_aplicacao
# 10 - co_resultado_eof

df_tf_unique.count()

0

In [85]:
df_tf.groupBy(["co_natureza_juridica","ds_natureza_juridica"]).count().sort("count", ascending=True).show()
# Mostra a quantidade de descricao por natureza juridica

+--------------------+--------------------+-----+
|co_natureza_juridica|ds_natureza_juridica|count|
+--------------------+--------------------+-----+
|                   1|     EMPRESA PUBLICA|   20|
|                   3|ADMINISTRACAO DIRETA|   28|
|                   2|    FUNDACAO PUBLICA|   71|
|                   4|           AUTARQUIA|  131|
+--------------------+--------------------+-----+



In [None]:
df.write \
  .format("jdbc") \
  .option("url", "jdbc:postgresql://<database-ip>:<port>/custos_stn") \
  .option("dbtable", "custos_stn.transferencias") \
  .option("user", "postgres") \
  .option("driver", "org.postgresql.Driver") \
  .option("password", "postgres")\
  .save()