# Importações básicas

In [42]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F


from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# Iniciar a sessão Spark

In [43]:
spark = SparkSession.builder\
                    .config("spark.master","local[*]")\
                    .appName("Fontes_de_dados")\
                    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Módulo(s) de Obtenção de Dados

## Carga de Arquivos

### Criação do Esquema schemaCompras

In [44]:
# importe dos tipos de dados para criação do esquema manual
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, LongType, DateType, DoubleType, DecimalType

schemaCompra = StructType().add(StructField("nr_cont", IntegerType()))\
                           .add(StructField("obj", StringType()))\
                           .add(StructField("fund_leg", StringType()))\
                           .add(StructField("mod_comp", StringType()))\
                           .add(StructField("sit_cont", StringType()))\
                           .add(StructField("cod_org_sup", IntegerType()))\
                           .add(StructField("nm_org_sup", StringType()))\
                           .add(StructField("cod_org", IntegerType()))\
                           .add(StructField("nm_org", StringType()))\
                           .add(StructField("cod_ug", IntegerType()))\
                           .add(StructField("nm_ug", StringType()))\
                           .add(StructField("dt_ass_cont", DateType()))\
                           .add(StructField("dt_pub_dou", DateType()))\
                           .add(StructField("dt_ini_vig", DateType()))\
                           .add(StructField("dt_fim_vig", DateType()))\
                           .add(StructField("cnpj_contrtd", LongType()))\
                           .add(StructField("nm_contrtd", StringType()))\
                           .add(StructField("vl_ini_comp", StringType()))\
                           .add(StructField("vl_fim_comp", StringType()))\
                           .add(StructField("nr_lic", IntegerType()))\
                           .add(StructField("cod_ug_lic", IntegerType()))\
                           .add(StructField("nm_ug_lic", StringType()))\
                           .add(StructField("cod_mod_comp_lic", IntegerType()))\
                           .add(StructField("desc_mod_comp_lic", StringType()))
# embora a natureza dos campos seja decimal, o spark não consegue converter o padrão separado por virgula
# no momento da carga do dados, por isto os campos >>vl_ini_comp e vl_fim_comp<< foram configurados como string(texto)



### Carga do DataFrame

In [45]:
compraEsquemaManualDF = spark.read\
                             .format("csv")\
                             .option("sep",";")\
                             .option("header","true")\
                             .option("charset","iso-8859-1")\
                             .schema(schemaCompra)\
                             .option("dateFormat", "dd/MM/yyyy")\
                             .load("/home/aluno/_spark/dados/originais/compras_publicas_federal/compras/")

- exploração básica do dataframe carregado

In [46]:
compraEsquemaManualDF.show(5,100)
compraEsquemaManualDF.printSchema()

+-------+----------------------------------------------------------------------------------------------------+----------------------------------------------------------------+----------------------------+---------+-----------+----------------------------+-------+---------------------------------------------+------+-----------------------------------------+-----------+----------+----------+----------+--------------+------------------------------------------------------+-----------+-----------+--------+----------+-----------------------------------------+----------------+----------------------------+
|nr_cont|                                                                                                 obj|                                                        fund_leg|                    mod_comp| sit_cont|cod_org_sup|                  nm_org_sup|cod_org|                                       nm_org|cod_ug|                                    nm_ug|dt_ass_cont|dt_pub_dou|dt_ini_vig|dt

# Módulo(s) de Transformação de Dados

In [47]:
compraDF = compraEsquemaManualDF.withColumn("vl_ini_comp", regexp_replace(col("vl_ini_comp"), ",", ".").cast(DecimalType(20, 2)))\
                                .withColumn("vl_fim_comp", regexp_replace(col("vl_fim_comp"), ",", ".").cast(DecimalType(20, 2)))\
                                #.select(
                                #          "nr_cont",
                                #          "obj",
                                #          "mod_comp",
                                #          "cod_ug",
                                #          "nm_ug",
                                #          "dt_ini_vig",
                                #          "dt_fim_vig",
                                #          "cnpj_contrtd",
                                #          "nm_contrtd",
                                #          "vl_ini_comp",
                                #          "vl_fim_comp"
                                #       )

In [48]:
compraDF.show(3, False)
compraDF.printSchema()

+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------+--------------------------+---------+-----------+----------------------+-------+---------------------------------------------+------+-----------------------------------+-----------+----------+----------+----------+--------------+------------------------------------------------------+-----------+-----------+--------+----------+-----------------------------------+----------------+--------------------------+
|nr_cont|obj                                                                                                                                                                   |fund_leg                                                        |mod_comp                  |sit_cont |cod_org_sup|nm_org_sup            |cod_org|nm_org                       

## Criação de Dimensões

### Dimensão: Orgão Superior

In [49]:
dimOrgSupDF = compraDF.select(F.col("cod_org_sup").alias("cod_dim_org_sup"), "nm_org_sup")\
                      .distinct()\
                      .sort("cod_dim_org_sup")

dimOrgSupDF.show(truncate = False)

+---------------+---------------------------------------------+
|cod_dim_org_sup|nm_org_sup                                   |
+---------------+---------------------------------------------+
|15000          |Justiça do Trabalho                          |
|20000          |Presidência da República                     |
|22000          |Ministério da Agricultura, Pecuária e Abastec|
|24000          |Ministério da Ciência, Tecnologia, Inovações |
|25000          |Ministério da Economia                       |
|26000          |Ministério da Educação                       |
|30000          |Ministério da Justiça e Segurança Pública    |
|32000          |Ministério de Minas e Energia                |
|35000          |Ministério das Relações Exteriores           |
|36000          |Ministério da Saúde                          |
|37000          |Controladoria-Geral da União                 |
|39000          |Ministério da Infraestrutura                 |
|41000          |Ministério das Comunica

### Dimensão Modalidade Compra

In [50]:
dimModalidadeCompraDF = compraDF.select("mod_comp")\
                                .distinct()\
                                .sort("mod_comp")\
                                .withColumn("cod_dim_mod_comp", F.monotonically_increasing_id()+1)\
                                .select("cod_dim_mod_comp", F.col("mod_comp").alias("nm_mod_comp"))

dimModalidadeCompraDF.show(truncate=False)

+----------------+--------------------------------+
|cod_dim_mod_comp|nm_mod_comp                     |
+----------------+--------------------------------+
|1               |Concorrência                    |
|2               |Concorrência - Registro de Preço|
|3               |Concorrência Internacional      |
|4               |Convite                         |
|5               |Dispensa de Licitação           |
|6               |Inexigibilidade de Licitação    |
|7               |Pregão                          |
|8               |Pregão - Registro de Preço      |
|9               |Sem Informação                  |
|10              |Tomada de Preços                |
+----------------+--------------------------------+



### Dimensão Tempo

In [51]:
dimTempoDF = spark.range(7000)\
                  .withColumn("dataIni", F.to_date(F.lit("01/01/2005"), "dd/MM/yyyy"))\
                  .withColumn("dia", F.col("id").cast(IntegerType()))\
                  .selectExpr("date_add(dataIni, dia) data", "dataIni","dia")\
                  .where(F.dayofmonth(F.col("data")) == 1)\
                  .select(
                              F.col("data").alias("cod_dim_data"),
                              F.to_csv(F.struct(F.col("data")), {'dateFormat':'E', 'locale':'PT-BR'}).alias("dia_semana_reduz"),
                              F.to_csv(F.struct(F.col("data")), {'dateFormat':'EEEE', 'locale':'PT-BR'}).alias("dia_semana_compl"),
                              F.to_csv(F.struct(F.col("data")), {'dateFormat':'MMM', 'locale':'PT-BR'}).alias("nm_mes_reduz"),
                              F.to_csv(F.struct(F.col("data")), {'dateFormat':'MMMM', 'locale':'PT-BR'}).alias("nm_mes_compl"),
                              F.to_csv(F.struct(F.col("data")), {'dateFormat':'QQQ', 'locale':'PT-BR'}).alias("nm_tri_reduz"),
                              F.to_csv(F.struct(F.col("data")), {'dateFormat':'QQQQ', 'locale':'PT-BR'}).alias("nm_tri_compl"),
                              F.year(F.col("data")).alias("ano"),
                              F.concat_ws("/",F.to_csv(F.struct(F.col("data")), {'dateFormat':'MMM', 'locale':'PT-BR'}),F.year(F.col("data"))).alias("sig_mes_ano"),
                              F.concat_ws("/",F.date_format(F.col("data"), "MM"),F.year(col("data"))).alias("num_mes_ano"),
                              F.concat_ws("",F.year(F.col("data")), F.date_format(col("data"), "MM")).alias("ano_num_mes")
                         )



dimTempoDF.show(5)

+------------+----------------+----------------+------------+------------+------------+------------+----+-----------+-----------+-----------+
|cod_dim_data|dia_semana_reduz|dia_semana_compl|nm_mes_reduz|nm_mes_compl|nm_tri_reduz|nm_tri_compl| ano|sig_mes_ano|num_mes_ano|ano_num_mes|
+------------+----------------+----------------+------------+------------+------------+------------+----+-----------+-----------+-----------+
|  2005-01-01|             Sáb|          Sábado|         jan|     Janeiro|          T1|1º trimestre|2005|   jan/2005|    01/2005|     200501|
|  2005-02-01|             Ter|     Terça-feira|         fev|   Fevereiro|          T1|1º trimestre|2005|   fev/2005|    02/2005|     200502|
|  2005-03-01|             Ter|     Terça-feira|         mar|       Março|          T1|1º trimestre|2005|   mar/2005|    03/2005|     200503|
|  2005-04-01|             Sex|     Sexta-feira|         abr|       Abril|          T2|2º trimestre|2005|   abr/2005|    04/2005|     200504|
|  200

### Criação da Tabela Fato

Granularidade por mês

Tratamento de Dados

In [52]:
cpDF = compraDF.select("cod_org_sup",
                       "mod_comp",
                       "dt_ass_cont",
                       "vl_ini_comp",
                       "vl_fim_comp"
                      )\
               .withColumn("cod_dim_data", F.trunc(F.col("dt_ass_cont"),"Month"))
                                        



exprJoinFT = (dimModalidadeCompraDF.nm_mod_comp == cpDF["mod_comp"])



completoDF = cpDF.join(dimModalidadeCompraDF, exprJoinFT, "left")\
                 .withColumn("in_dif_vlr", (F.col("vl_ini_comp") != F.col("vl_fim_comp")).cast(IntegerType()))\
                 .select(
                        F.col("cod_org_sup").alias("cod_dim_org_sup"),
                        "cod_dim_mod_comp",
                        "cod_dim_data",
                        "in_dif_vlr",
                        "vl_fim_comp"
                     )



completoDF.show(5)

+---------------+----------------+------------+----------+-----------+
|cod_dim_org_sup|cod_dim_mod_comp|cod_dim_data|in_dif_vlr|vl_fim_comp|
+---------------+----------------+------------+----------+-----------+
|          26000|               5|  2019-12-01|         0|  230559.60|
|          52000|               8|  2020-01-01|         0|    6000.00|
|          26000|               7|  2020-01-01|         0|   17000.00|
|          52000|               6|  2019-12-01|         1|  400000.00|
|          39000|               7|  2020-01-01|         0|  416143.91|
+---------------+----------------+------------+----------+-----------+
only showing top 5 rows



### Aplicação dos Métodos groupBy e agg Para Criação da Tabela Fato

In [53]:
fatoCompraDF = completoDF.groupBy("cod_dim_org_sup", "cod_dim_mod_comp", "cod_dim_data")\
                         .agg(
                                F.count("*").alias("qtd_comp"),
                                F.sum("in_dif_vlr").alias("qtd_dif_vlrs"),
                                F.sum("vl_fim_comp").alias("vlr_ttl_comp"),
                                F.avg("vl_fim_comp").alias("med_ttl_comp")
                             )


fatoCompraDF.show(5)

+---------------+----------------+------------+--------+------------+------------+--------------+
|cod_dim_org_sup|cod_dim_mod_comp|cod_dim_data|qtd_comp|qtd_dif_vlrs|vlr_ttl_comp|  med_ttl_comp|
+---------------+----------------+------------+--------+------------+------------+--------------+
|          25000|               8|  2019-07-01|       1|           1|    35280.00|  35280.000000|
|          26000|               9|  2020-02-01|       9|           0| 14217685.12|1579742.791111|
|          25000|               6|  2019-04-01|       2|           0|   135148.13|  67574.065000|
|          22000|               6|  2020-06-01|       5|           0|  4336976.13| 867395.226000|
|          26000|               2|  2020-03-01|       1|           0|   523662.35| 523662.350000|
+---------------+----------------+------------+--------+------------+------------+--------------+
only showing top 5 rows



Fazer esse trabalho em outro notebook - Pois é muito extenso.

completoExrcDF.join(dimUGCompletoDF,["cod_dim_ug"],"left")\
              .join(dimTempoDF,["cod_dim_data"],"left")\
              .where("ano = 2018 and nm_tri_reduz='T3' ").show()

## Criação do Cubo OLAP

Cubo OLAP é interessante para fazer consultas rápidas, mas é feita uma análise combinatótia com todas as possibilidades. o que aumenta muito a necessidade de armazenamento.

In [55]:
cuboCompraDF = completoDF.join(dimTempoDF, ["cod_dim_data"], "left")\
                         .cube(
                                "cod_dim_org_sup",
                                "cod_dim_mod_comp",
                                "nm_mes_reduz",
                                "nm_tri_reduz",
                                "num_mes_ano",
                                "ano"
                              )\
                         .agg(
                                F.count("*").alias("qtd_comp"),
                                F.sum("in_dif_vlr").alias("qtd_dif_vlrs"),
                                F.sum("vl_fim_comp").alias("vlr_ttl_comp"),
                                F.avg("vl_fim_comp").alias("med_ttl_comp")
                             )\
                         .sort(
                                "cod_dim_org_sup",
                                "cod_dim_mod_comp",
                                "nm_mes_reduz",
                                "nm_tri_reduz",
                                "num_mes_ano",
                                "ano"
                              )\
                         .join(dimOrgSupDF, ["cod_dim_org_sup"], "left")\
                         .join(dimModalidadeCompraDF, ["cod_dim_mod_comp"], "left")

cuboCompraDF.where(F.expr(
                   """
                            cod_dim_org_sup is null
                        and cod_dim_mod_comp is not null
                        and nm_mes_reduz is null
                        and nm_tri_reduz is null
                        and num_mes_ano = '01/2020'
                        and ano is null
                   """)
                  )\
        .show(100)

cuboCompraDF.count()

+----------------+---------------+------------+------------+-----------+----+--------+------------+------------+--------------+----------+--------------------+
|cod_dim_mod_comp|cod_dim_org_sup|nm_mes_reduz|nm_tri_reduz|num_mes_ano| ano|qtd_comp|qtd_dif_vlrs|vlr_ttl_comp|  med_ttl_comp|nm_org_sup|         nm_mod_comp|
+----------------+---------------+------------+------------+-----------+----+--------+------------+------------+--------------+----------+--------------------+
|               1|           null|        null|        null|    01/2020|null|      23|           4| 88096617.27|3830287.707391|      null|        Concorrência|
|               2|           null|        null|        null|    01/2020|null|       1|           0|   475400.00| 475400.000000|      null|Concorrência - Re...|
|               3|           null|        null|        null|    01/2020|null|       1|           0|   290232.94| 290232.940000|      null|Concorrência Inte...|
|               4|           null|      

16607

In [56]:
cuboCompraDF.where("ano = 2020 and nm_mod_comp='Concorrência'").show()

[Stage 286:>                                                        (0 + 1) / 1]

+----------------+---------------+------------+------------+-----------+----+--------+------------+------------+---------------+--------------------+------------+
|cod_dim_mod_comp|cod_dim_org_sup|nm_mes_reduz|nm_tri_reduz|num_mes_ano| ano|qtd_comp|qtd_dif_vlrs|vlr_ttl_comp|   med_ttl_comp|          nm_org_sup| nm_mod_comp|
+----------------+---------------+------------+------------+-----------+----+--------+------------+------------+---------------+--------------------+------------+
|               1|          36000|         mar|          T1|    03/2020|2020|       1|           1|  2948887.93| 2948887.930000| Ministério da Saúde|Concorrência|
|               1|          54000|        null|          T1|    01/2020|2020|       3|           0|  8934265.79| 2978088.596667|Ministério do Tur...|Concorrência|
|               1|          39000|        null|          T2|       null|2020|       1|           1|  5617410.14| 5617410.140000|Ministério da Inf...|Concorrência|
|               1|    

                                                                                

In [63]:
cuboCompraDF.where("ano = 2020 AND qtd_comp >= 3  AND nm_tri_reduz='T1' AND qtd_dif_vlrs > 50 AND nm_mod_comp = 'Pregão'").show()

[Stage 427:>                                                        (0 + 1) / 1]

+----------------+---------------+------------+------------+-----------+----+--------+------------+-------------+--------------+--------------------+-----------+
|cod_dim_mod_comp|cod_dim_org_sup|nm_mes_reduz|nm_tri_reduz|num_mes_ano| ano|qtd_comp|qtd_dif_vlrs| vlr_ttl_comp|  med_ttl_comp|          nm_org_sup|nm_mod_comp|
+----------------+---------------+------------+------------+-----------+----+--------+------------+-------------+--------------+--------------------+-----------+
|               7|          26000|        null|          T1|       null|2020|     381|          70| 275418171.97| 722882.341129|Ministério da Edu...|     Pregão|
|               7|           null|         jan|          T1|       null|2020|     674|          98| 671602142.25| 996442.347552|                null|     Pregão|
|               7|           null|        null|          T1|       null|2020|    1460|         195|2050997534.52|1404792.831863|                null|     Pregão|
|               7|          

                                                                                

### Biblioteca Window

In [65]:
from pyspark.sql.window import Window

jan01 = Window.partitionBy("nr_cont").orderBy("nr_cont")


compraComWindow01DF = compraDF.withColumn("num_linha_compra", F.row_number().over(jan01))\
                              .withColumn("ttl_compra", F.count("*").over(jan01))\
                              .withColumn("med_vlr_comp", F.avg("vl_fim_comp").over(jan01))\
                              .where("nr_cont <> 112020")\
                              .where("med_vlr_comp < vl_fim_comp")
                              

compraComWindow01DF.select("nr_cont",
                           "dt_ass_cont",
                           "vl_fim_comp",
                           "num_linha_compra",
                           "ttl_compra",
                           "med_vlr_comp","vl_fim_comp").show(50)

+-------+-----------+-----------+----------------+----------+--------------+-----------+
|nr_cont|dt_ass_cont|vl_fim_comp|num_linha_compra|ttl_compra|  med_vlr_comp|vl_fim_comp|
+-------+-----------+-----------+----------------+----------+--------------+-----------+
|  12017| 2017-08-23|  181630.29|               1|         2|  93617.565000|  181630.29|
|  12018| 2018-12-31|  240000.00|               1|         5| 127432.400000|  240000.00|
|  12018| 2020-01-06|  300000.00|               3|         5| 127432.400000|  300000.00|
|  12019| 2019-12-30| 3973000.00|               3|        38| 588064.347368| 3973000.00|
|  12019| 2020-01-07|  728058.36|               5|        38| 588064.347368|  728058.36|
|  12019| 2019-12-27| 6371351.36|               6|        38| 588064.347368| 6371351.36|
|  12019| 2019-11-26| 1023718.08|              10|        38| 588064.347368| 1023718.08|
|  12019| 2019-12-26| 2000000.00|              11|        38| 588064.347368| 2000000.00|
|  12019| 2018-09-11|

In [66]:
jan02 = Window.partitionBy("nr_cont").orderBy("dt_ass_cont")



aux02DF = compraDF.withColumn("soma_vlr_comp", sum("vl_fim_comp").over(jan02))\
                  .withColumn("med_vlr_comp", F.avg("vl_fim_comp").over(jan02).cast(DecimalType(16,2)))\
                  .withColumn("desv_vlr_comp", F.stddev_pop("vl_fim_comp").over(jan02).cast(DecimalType(16,2)))



aux02DF.select("nr_cont",
               "dt_ass_cont",
               "vl_fim_comp",
               "soma_vlr_comp",
               "med_vlr_comp",
               "desv_vlr_comp").show(10)

+-------+-----------+-----------+-------------+------------+-------------+
|nr_cont|dt_ass_cont|vl_fim_comp|soma_vlr_comp|med_vlr_comp|desv_vlr_comp|
+-------+-----------+-----------+-------------+------------+-------------+
|  12016| 2020-06-03|   32812.80|     32812.80|    32812.80|         0.00|
|  12017| 2017-02-15|    5604.84|      5604.84|     5604.84|         0.00|
|  12017| 2017-08-23|  181630.29|    187235.13|    93617.57|     88012.73|
|  12018| 2018-06-27|   36576.00|     36576.00|    36576.00|         0.00|
|  12018| 2018-11-14|   34000.00|     70576.00|    35288.00|      1288.00|
|  12018| 2018-12-31|  240000.00|    310576.00|   103525.33|     96507.89|
|  12018| 2019-08-14|   26586.00|    337162.00|    84290.50|     89973.70|
|  12018| 2020-01-06|  300000.00|    637162.00|   127432.40|    117987.74|
|  12019| 2018-09-11| 2322242.40|   2322242.40|  2322242.40|         0.00|
|  12019| 2019-01-01|  900000.00|   3222242.40|  1611121.20|    711121.20|
+-------+-----------+----

In [67]:
aux02DF = compraDF.withColumn("soma_vlr_acumul_comp", sum("vl_fim_comp").over(jan02).cast(DecimalType(16,2)))\
                  .withColumn("soma_vlr_ttl_comp", sum("vl_fim_comp").over(jan01).cast(DecimalType(16,2)))\
                  .sort("nr_cont", "dt_ass_cont")\
                  .withColumn("perc_evolucao", (F.col("soma_vlr_acumul_comp") / F.col("soma_vlr_ttl_comp")).cast(DecimalType(16,2)))



aux02DF.select("nr_cont",
               "dt_ass_cont",
               "vl_fim_comp",
               "soma_vlr_acumul_comp",
               "soma_vlr_ttl_comp",
               "perc_evolucao").show(10)

+-------+-----------+-----------+--------------------+-----------------+-------------+
|nr_cont|dt_ass_cont|vl_fim_comp|soma_vlr_acumul_comp|soma_vlr_ttl_comp|perc_evolucao|
+-------+-----------+-----------+--------------------+-----------------+-------------+
|  12016| 2020-06-03|   32812.80|            32812.80|         32812.80|         1.00|
|  12017| 2017-02-15|    5604.84|             5604.84|        187235.13|         0.03|
|  12017| 2017-08-23|  181630.29|           187235.13|        187235.13|         1.00|
|  12018| 2018-06-27|   36576.00|            36576.00|        637162.00|         0.06|
|  12018| 2018-11-14|   34000.00|            70576.00|        637162.00|         0.11|
|  12018| 2018-12-31|  240000.00|           310576.00|        637162.00|         0.49|
|  12018| 2019-08-14|   26586.00|           337162.00|        637162.00|         0.53|
|  12018| 2020-01-06|  300000.00|           637162.00|        637162.00|         1.00|
|  12019| 2018-09-11| 2322242.40|          

Exercício 02: utilizar biblioteca Window

In [78]:
from pyspark.sql.window import Window

wind = Window.partitionBy("nm_org_sup").orderBy("nm_org_sup")


compraComWindowExercDF = compraDF.withColumn("reg_compras", F.row_number().over(wind))\
                              .withColumn("med_vlr_comp", F.avg("vl_fim_comp").over(wind))\
                              .where("med_vlr_comp < vl_fim_comp")
                            

In [86]:
compraComWindowExercDF.agg( "nm_org_sup",
                               "vl_fim_comp",
                               "med_vlr_comp",
                               "vl_fim_comp",
                               "reg_compras")\
                        .show(50, False)

AssertionError: all exprs should be Column