# Transformação de Dados em Big Data ETL com Apache Spark

Este tutorial apresenta uma visão geral do conceito de Transformação de dados em Big Data ETL utilizando o Apache Spark como ferramenta para apresentar alguns exemplos de transformação de dados. Nos exemplos deste tutorial, iremos utilizar as bases de dados de projetos aprovados para a Lei Rouanet entre 2003 e 2013 disponíveis no [site do governo federal](http://dados.gov.br/dataset/incentivo-fiscal-para-projetos-culturais).

## Transformações e Ações no Spark

Spark disponibiliza dois tipos distintos de operações para o usuário: **Transformações** e **Ações**.

![transformations and actions](http://curriculum-release.s3-website-us-west-2.amazonaws.com/wiki-book/general/spark_ta.png)


Transformações, como select() ou filter(), criam um novo DataFrame de um já existente, resultando em outro DataFrame imutável. Todas as Transformações são 'lazy', ou seja, elas não são executadas enquanto uma Ação não for invocada. Por isso, as Transformações não são completadas no momento em que você escreve e executa um código na célula do Notebook, mas sim quando a Ação é executada.

Ações, como collect() e count(), são comandos que são processados pelo Spark no exato momento da sua execução. As Ações consistem em executar todas as Transformações anteriores para obter o resultado efetivo. Uma Ação é composta de um ou mais jobs que consistem de tarefas que irão ser executadas pelos workers em paralelo onde for possível. 

Abaixo, podem ser vistos os exemplos mais utilizados de Transformações e Ações. O restante das funções disponíveis podem ser acessadas na documentação das [Transformações](https://spark.apache.org/docs/latest/rdd-programming-guide.html#transformations) e das [Ações](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions) do Apache Spark.

![transformations and actions](http://training.databricks.com/databricks_guide/gentle_introduction/trans_and_actions.png)

In [2]:
%sh 
# Baixa o arquivo que contém a quantidade de projetos aprovados pela Lei Rouanet para cada Estado
if [ ! -f /databricks/driver/rouanet_quantidade_projetos.json ]; then 
  wget https://raw.githubusercontent.com/savioteles/big_data/master/etl/datasets/rouanet_quantidade_projetos.json
fi

# Baixa o arquivo que contém o valor total em reais dos projetos aprovados pela Lei Rouanet para cada Estado
if [ ! -f /databricks/driver/rouanet_valor_projetos.json ]; then 
  wget https://raw.githubusercontent.com/savioteles/big_data/master/etl/datasets/rouanet_valor_projetos.json
fi

#lista os arquivos para ver se os dois arquivos da Lei Rouanet foram baixados com sucesso
ls /databricks/driver

In [3]:
# lê o arquivo com a quantidade de projetos aprovados pela Lei Rouanet
rouanet_qtd_df = spark.read.option("multiline", "true").json("file:///databricks/driver/rouanet_quantidade_projetos.json")
display(rouanet_qtd_df)

aditividade,base_territorial,data_atualizacao,descricao,disponibilizacao,estado,final,fonte_gestora,fonte_provedora,formatacao,grupo_informacao,id,inicio,multiplicador,nome,nome_estendido,orgao_primeiro_escalao,periodicidade,portal_dados_abertos,produto,publicacao,tempo_aditividade,unidade_medida,url,url_origem,valores
1,List(Estadual),2014-08-04 00:21:29.386939,Número de projetos aprovados pela Lei Rouanet - projetos culturais aprovados para receber incentivos fiscais.,"List(365, 365 dias)",List(Em uso),2013,"List(Gabinete do Ministro, 138, MinC/GABMIN, List(Ministério da Cultura, MinC), 1, http://api.pgi.gov.br/api/1/fonte/138.json)","List(Gabinete do Ministro, 138, MinC/GABMIN, List(Ministério da Cultura, MinC), 1, http://api.pgi.gov.br/api/1/fonte/138.json)",0,"List(387, Incentivo fiscal para projetos culturais, List(), http://api.pgi.gov.br/api/1/grupo/387.json)",1477,2003,"List(1, )",Projetos aprovados Lei Rouanet,Número de projetos aprovados pela Lei Rouanet,"List(Ministério da Cultura, MinC)",List(Anual),False,"List(, Quantidade)",List(Dados Publicados),1,"List(4, Unidade, Un, http://api.pgi.gov.br/api/1/unidade/4.json)",http://api.pgi.gov.br/api/1/serie/1477.json,http://www.cultura.gov.br/,"List(List(2003, 11, 4), List(2003, 12, 2), List(2003, 13, 6), List(2003, 14, 1), List(2003, 15, 23), List(2003, 16, 3), List(2003, 17, 5), List(2003, 21, 31), List(2003, 22, 13), List(2003, 23, 52), List(2003, 24, 16), List(2003, 25, 5), List(2003, 26, 109), List(2003, 27, 6), List(2003, 28, 13), List(2003, 29, 127), List(2003, 31, 333), List(2003, 32, 30), List(2003, 33, 1082), List(2003, 35, 1231), List(2003, 41, 347), List(2003, 42, 157), List(2003, 43, 179), List(2003, 50, 37), List(2003, 51, 13), List(2003, 52, 72), List(2003, 53, 172), List(2004, 11, 11), List(2004, 12, 3), List(2004, 13, 11), List(2004, 14, 1), List(2004, 15, 33), List(2004, 16, 0), List(2004, 17, 17), List(2004, 21, 33), List(2004, 22, 9), List(2004, 23, 76), List(2004, 24, 14), List(2004, 25, 16), List(2004, 26, 128), List(2004, 27, 10), List(2004, 28, 15), List(2004, 29, 163), List(2004, 31, 497), List(2004, 32, 24), List(2004, 33, 1181), List(2004, 35, 1450), List(2004, 41, 409), List(2004, 42, 197), List(2004, 43, 266), List(2004, 50, 39), List(2004, 51, 37), List(2004, 52, 85), List(2004, 53, 233), List(2005, 11, 12), List(2005, 12, 2), List(2005, 13, 19), List(2005, 14, 1), List(2005, 15, 42), List(2005, 16, 4), List(2005, 17, 16), List(2005, 21, 59), List(2005, 22, 14), List(2005, 23, 94), List(2005, 24, 10), List(2005, 25, 25), List(2005, 26, 134), List(2005, 27, 25), List(2005, 28, 19), List(2005, 29, 218), List(2005, 31, 743), List(2005, 32, 43), List(2005, 33, 1319), List(2005, 35, 1782), List(2005, 41, 353), List(2005, 42, 252), List(2005, 43, 384), List(2005, 50, 39), List(2005, 51, 30), List(2005, 52, 122), List(2005, 53, 229), List(2006, 11, 7), List(2006, 12, 8), List(2006, 13, 19), List(2006, 14, 1), List(2006, 15, 40), List(2006, 16, 2), List(2006, 17, 15), List(2006, 21, 40), List(2006, 22, 15), List(2006, 23, 101), List(2006, 24, 14), List(2006, 25, 28), List(2006, 26, 123), List(2006, 27, 21), List(2006, 28, 21), List(2006, 29, 241), List(2006, 31, 835), List(2006, 32, 34), List(2006, 33, 1485), List(2006, 35, 2013), List(2006, 41, 394), List(2006, 42, 262), List(2006, 43, 418), List(2006, 50, 30), List(2006, 51, 24), List(2006, 52, 106), List(2006, 53, 236), List(2007, 11, 19), List(2007, 12, 4), List(2007, 13, 15), List(2007, 14, 1), List(2007, 15, 38), List(2007, 16, 4), List(2007, 17, 16), List(2007, 21, 37), List(2007, 22, 13), List(2007, 23, 109), List(2007, 24, 16), List(2007, 25, 31), List(2007, 26, 159), List(2007, 27, 15), List(2007, 28, 18), List(2007, 29, 287), List(2007, 31, 861), List(2007, 32, 36), List(2007, 33, 1451), List(2007, 35, 1847), List(2007, 41, 298), List(2007, 42, 215), List(2007, 43, 471), List(2007, 50, 30), List(2007, 51, 37), List(2007, 52, 101), List(2007, 53, 229), List(2008, 11, 10), List(2008, 12, 7), List(2008, 13, 24), List(2008, 14, 3), List(2008, 15, 42), List(2008, 16, 5), List(2008, 17, 8), List(2008, 21, 42), List(2008, 22, 14), List(2008, 23, 101), List(2008, 24, 15), List(2008, 25, 36), List(2008, 26, 139), List(2008, 27, 16), List(2008, 28, 19), List(2008, 29, 242), List(2008, 31, 765), List(2008, 32, 32), List(2008, 33, 1546), List(2008, 35, 2055), List(2008, 41, 286), List(2008, 42, 250), List(2008, 43, 467), List(2008, 50, 22), List(2008, 51, 34), List(2008, 52, 123), List(2008, 53, 174), List(2009, 11, 5), List(2009, 12, 4), List(2009, 13, 14), List(2009, 14, 2), List(2009, 15, 33), List(2009, 16, 1), List(2009, 17, 2), List(2009, 21, 11), List(2009, 22, 7), List(2009, 23, 119), List(2009, 24, 10), List(2009, 25, 16), List(2009, 26, 105), List(2009, 27, 11), List(2009, 28, 8), List(2009, 29, 171), List(2009, 31, 690), List(2009, 32, 26), List(2009, 33, 1371), List(2009, 35, 1866), List(2009, 41, 260), List(2009, 42, 233), List(2009, 43, 417), List(2009, 50, 18), List(2009, 51, 21), List(2009, 52, 79), List(2009, 53, 151), List(2010, 12, 3), List(2010, 13, 20), List(2010, 16, 0), List(2010, 21, 22), List(2010, 23, 119), List(2010, 27, 8), List(2010, 29, 220), List(2010, 31, 729), List(2010, 32, 57), List(2010, 50, 19), List(2010, 52, 104), List(2010, 53, 179), List(2010, 51, 42), List(2010, 15, 50), List(2010, 25, 24), List(2010, 26, 135), List(2010, 22, 12), List(2010, 41, 295), List(2010, 33, 1885), List(2010, 24, 11), List(2010, 11, 7), List(2010, 14, 1), List(2010, 43, 537), List(2010, 42, 314), List(2010, 28, 9), List(2010, 35, 2385), List(2010, 17, 5), List(2011, 12, 2), List(2011, 27, 9), List(2011, 13, 18), List(2011, 16, 4), List(2011, 29, 333), List(2011, 23, 168), List(2011, 53, 210), List(2011, 32, 77), List(2011, 52, 129), List(2011, 21, 21), List(2011, 31, 825), List(2011, 50, 20), List(2011, 51, 57), List(2011, 15, 33), List(2011, 25, 46), List(2011, 26, 129), List(2011, 22, 15), List(2011, 41, 425), List(2011, 33, 1908), List(2011, 24, 19), List(2011, 11, 5), List(2011, 14, 2), List(2011, 43, 525), List(2011, 42, 322), List(2011, 28, 24), List(2011, 35, 2586), List(2011, 17, 8), List(2012, 12, 4), List(2012, 27, 7), List(2012, 13, 17), List(2012, 16, 5), List(2012, 29, 204), List(2012, 23, 125), List(2012, 53, 134), List(2012, 32, 46), List(2012, 52, 100), List(2012, 21, 18), List(2012, 31, 775), List(2012, 50, 25), List(2012, 51, 24), List(2012, 15, 39), List(2012, 25, 18), List(2012, 26, 122), List(2012, 22, 10), List(2012, 41, 374), List(2012, 33, 1396), List(2012, 24, 19), List(2012, 11, 8), List(2012, 14, 1), List(2012, 43, 462), List(2012, 42, 307), List(2012, 28, 13), List(2012, 35, 2089), List(2012, 17, 5), List(2013, 12, 4), List(2013, 27, 5), List(2013, 13, 15), List(2013, 16, 4), List(2013, 29, 190), List(2013, 23, 131), List(2013, 53, 146), List(2013, 32, 35), List(2013, 52, 109), List(2013, 21, 13), List(2013, 31, 824), List(2013, 50, 22), List(2013, 51, 32), List(2013, 15, 37), List(2013, 25, 21), List(2013, 26, 111), List(2013, 22, 14), List(2013, 41, 416), List(2013, 33, 1326), List(2013, 24, 33), List(2013, 11, 9), List(2013, 43, 567), List(2013, 42, 304), List(2013, 28, 7), List(2013, 35, 2062), List(2013, 17, 4))"


In [4]:
# lê o arquivo com o valor em Reais dos projetos aprovados pela Lei Rouanet para cada Estado
rouanet_valor_df = spark.read.option("multiline", "true").json("file:///databricks/driver/rouanet_valor_projetos.json")
display(rouanet_valor_df)

aditividade,base_territorial,data_atualizacao,descricao,disponibilizacao,estado,final,fonte_gestora,fonte_provedora,formatacao,grupo_informacao,id,inicio,multiplicador,nome,nome_estendido,orgao_primeiro_escalao,periodicidade,portal_dados_abertos,produto,publicacao,tempo_aditividade,unidade_medida,url,url_origem,valores
1,List(Estadual),2014-08-04 00:22:00.393468,Valores captados por incentivos fiscais à projetos culturais pela Lei Rouanet.,"List(365, 365 dias)",List(Em uso),2013,"List(Gabinete do Ministro, 138, MinC/GABMIN, List(Ministério da Cultura, MinC), 1, http://api.pgi.gov.br/api/1/fonte/138.json)","List(Gabinete do Ministro, 138, MinC/GABMIN, List(Ministério da Cultura, MinC), 1, http://api.pgi.gov.br/api/1/fonte/138.json)",2,"List(387, Incentivo fiscal para projetos culturais, List(), http://api.pgi.gov.br/api/1/grupo/387.json)",1479,2003,"List(1, )",Valores captados Lei Rouanet,Valores captados por incentivos fiscais à projetos culturais pela Lei Rouanet,"List(Ministério da Cultura, MinC)",List(Anual),False,"List(Valor, Valor)",List(Dados Publicados),1,"List(3, Real (R$), R$, http://api.pgi.gov.br/api/1/unidade/3.json)",http://api.pgi.gov.br/api/1/serie/1479.json,http://www.cultura.gov.br/,"List(List(2003, 11, 254300.0), List(2003, 12, 0.0), List(2003, 13, 300000.0), List(2003, 14, 30000.0), List(2003, 15, 5842618.0), List(2003, 16, 0.0), List(2003, 17, 81220.0), List(2003, 21, 893896.0), List(2003, 22, 1173000.0), List(2003, 24, 193091.87), List(2003, 25, 20000.0), List(2003, 26, 9616399.59), List(2003, 27, 162224.0), List(2003, 28, 1967113.31), List(2003, 29, 1.066232793E7), List(2003, 31, 4.413992777E7), List(2003, 32, 704472.83), List(2003, 41, 1.09807987E7), List(2003, 42, 7492236.35), List(2003, 50, 1418074.25), List(2003, 51, 255064.16), List(2003, 52, 4377431.97), List(2003, 53, 1.661297706E7), List(2004, 11, 394799.48), List(2004, 12, 112041.19), List(2004, 13, 3400000.0), List(2004, 14, 0.0), List(2004, 15, 5224465.71), List(2004, 16, 0.0), List(2004, 17, 696593.88), List(2004, 21, 825104.5), List(2004, 22, 1805000.0), List(2004, 23, 6446545.15), List(2004, 24, 897143.88), List(2004, 25, 184400.0), List(2004, 27, 849715.14), List(2004, 28, 4230387.47), List(2004, 29, 9241517.68), List(2004, 32, 2545503.23), List(2004, 50, 1148773.84), List(2004, 51, 1191249.9), List(2004, 52, 4573416.55), List(2004, 53, 9728327.11), List(2005, 11, 249039.11), List(2005, 12, 1113391.72), List(2005, 13, 738033.73), List(2005, 14, 0.0), List(2005, 15, 2137889.54), List(2005, 16, 0.0), List(2005, 17, 302000.0), List(2005, 21, 4775917.45), List(2005, 22, 1268950.0), List(2005, 23, 7526351.2), List(2005, 24, 1451363.86), List(2005, 25, 488625.0), List(2005, 26, 1.206553536E7), List(2005, 27, 682648.75), List(2005, 28, 4451548.69), List(2005, 29, 1.903982022E7), List(2005, 31, 7.9991234359E7), List(2005, 32, 5347721.52), List(2005, 33, 1.985987182E8), List(2005, 41, 1.707925092E7), List(2005, 43, 4.930523289E7), List(2005, 50, 2837122.96), List(2005, 51, 1335000.0), List(2005, 52, 3255424.82), List(2005, 53, 1.100772627E7), List(2006, 11, 177412.02), List(2006, 12, 1089745.25), List(2006, 13, 1592331.11), List(2006, 14, 25000.0), List(2006, 15, 3000033.33), List(2006, 16, 20000.0), List(2006, 17, 534425.0), List(2006, 22, 2437213.2), List(2006, 23, 1.088360718E7), List(2006, 24, 289226.4), List(2006, 25, 456131.62), List(2006, 27, 1859770.66), List(2006, 28, 1576900.0), List(2006, 31, 1.0387311701E8), List(2006, 32, 9796238.82), List(2006, 42, 1.128176408E7), List(2006, 50, 2689122.13), List(2006, 51, 2147255.43), List(2006, 52, 4780833.52), List(2007, 11, 446508.0), List(2007, 12, 593448.29), List(2007, 13, 1826968.88), List(2007, 14, 95000.0), List(2007, 15, 3713560.37), List(2007, 16, 160000.0), List(2007, 17, 1266001.74), List(2007, 21, 3754528.93), List(2007, 22, 480375.0), List(2007, 23, 8850204.55), List(2007, 24, 1051344.27), List(2007, 25, 671050.0), List(2007, 27, 2038906.36), List(2007, 28, 1065560.0), List(2007, 29, 2.620661326E7), List(2007, 32, 8886921.86), List(2007, 50, 2641577.24), List(2007, 51, 1949789.59), List(2007, 52, 4116784.0), List(2007, 53, 1.834957693E7), List(2008, 11, 1370330.0), List(2008, 12, 655904.5), List(2008, 13, 750193.86), List(2008, 15, 1410040.9), List(2008, 16, 611927.2), List(2008, 17, 355858.89), List(2008, 21, 2578426.51), List(2008, 23, 1.134142819E7), List(2008, 24, 1368408.2), List(2008, 25, 1382416.16), List(2008, 26, 1.689856609E7), List(2008, 27, 1477331.0), List(2008, 28, 2142526.59), List(2008, 29, 1.9483683924E7), List(2008, 31, 1.0865092505E8), List(2008, 32, 7650131.71), List(2008, 50, 741775.65), List(2008, 51, 1340022.46), List(2008, 52, 4007510.08), List(2008, 53, 2.038232937E7), List(2009, 11, 562722.0), List(2009, 12, 25000.0), List(2009, 13, 714888.46), List(2009, 14, 0.0), List(2009, 15, 4149645.33), List(2009, 16, 142410.8), List(2009, 17, 0.0), List(2009, 21, 2017713.8), List(2009, 22, 2754726.19), List(2009, 23, 1.376061876E7), List(2009, 24, 989317.8), List(2009, 25, 1258091.75), List(2009, 26, 2.465478995E7), List(2009, 27, 159000.0), List(2009, 28, 1401105.0), List(2009, 29, 1.82120905E7), List(2009, 31, 8.948327697E7), List(2009, 32, 3499900.89), List(2009, 33, 2.7857348963E8), List(2009, 35, 3.9941597179E8), List(2009, 41, 2.606736488E7), List(2009, 42, 1.987789714E7), List(2009, 43, 5.617259258E7), List(2009, 50, 898470.5), List(2009, 51, 1772717.06), List(2009, 52, 3530510.64), List(2009, 53, 2.99464933E7), List(2010, 11, 867498.0), List(2010, 12, 60000.0), List(2010, 13, 3411898.19), List(2010, 14, 0.0), List(2010, 15, 2.252687189E7), List(2010, 16, 61000.0), List(2010, 17, 0.0), List(2010, 21, 1.020601378E7), List(2010, 22, 1957574.28), List(2010, 23, 1.634077546E7), List(2010, 24, 1645106.0), List(2010, 25, 1484181.01), List(2010, 26, 1.799128642E7), List(2010, 27, 698555.08), List(2010, 28, 1029839.0), List(2010, 29, 1.973186657E7), List(2010, 31, 1.2593292255E8), List(2010, 32, 1.390117001E7), List(2010, 33, 2.70918973531E8), List(2010, 41, 3.763031542E7), List(2010, 42, 2.623765877E7), List(2010, 43, 6.914750512E7), List(2010, 50, 1524173.96), List(2010, 51, 1921992.84), List(2010, 52, 5350966.1), List(2010, 53, 2.378866502E7), List(2003, 23, 5510544.74), List(2003, 33, 1.145276951E8), List(2003, 43, 2.452144414E7), List(2003, 35, 1.6915708933E8), List(2004, 31, 5.1122452183E7), List(2004, 26, 8109262.763), List(2004, 41, 1.5132656073E7), List(2004, 33, 1.20210957909E8), List(2004, 43, 3.983456698E7), List(2004, 42, 8576782.32), List(2004, 35, 2.1526673906E8), List(2005, 42, 1.1376809481E7), List(2005, 35, 2.9028772434E8), List(2006, 29, 1.832054636E7), List(2006, 53, 1.8790543277E7), List(2006, 21, 1710100.2), List(2006, 26, 1.829841942E7), List(2006, 41, 2.117617291E7), List(2006, 33, 2.0781316638E8), List(2006, 43, 4.877592001E7), List(2006, 35, 3.6075387416E8), List(2007, 31, 1.1315127832E8), List(2007, 26, 1.856514954E7), List(2007, 41, 2.202202217E7), List(2007, 33, 2.26644756413E8), List(2007, 43, 5.972268704E7), List(2007, 42, 1.96087171E7), List(2007, 35, 4.42010567973E8), List(2008, 22, 3126776.609), List(2008, 41, 3.360060052E7), List(2008, 33, 2.3106003575E8), List(2008, 14, 92500.0), List(2008, 43, 5.978394357E7), List(2008, 42, 1.626302586E7), List(2008, 35, 4.1510453789E8), List(2010, 35, 4.9210613272E8), List(2011, 12, 90000.0), List(2011, 27, 1547016.99), List(2011, 13, 2133152.55), List(2011, 16, 20000.0), List(2011, 29, 1.649493176E7), List(2011, 23, 1.652377447E7), List(2011, 53, 2.339749838E7), List(2011, 32, 5899519.5), List(2011, 52, 8526974.29), List(2011, 21, 4257025.87), List(2011, 31, 1.2760143544E8), List(2011, 50, 2447713.6), List(2011, 51, 4034516.87), List(2011, 15, 5689654.67), List(2011, 25, 1406920.72), List(2011, 26, 2.358037902E7), List(2011, 22, 3008677.03), List(2011, 41, 4.701388788E7), List(2011, 33, 3.5146254621E8), List(2011, 24, 1734862.5), List(2011, 11, 500000.0), List(2011, 43, 7.251063059E7), List(2011, 42, 3.173332236E7), List(2011, 28, 663986.0), List(2011, 35, 5.7178698908E8), List(2011, 17, 74378.0), List(2012, 12, 304374.82), List(2012, 27, 1178000.0), List(2012, 13, 1451747.24), List(2012, 16, 90000.0), List(2012, 29, 1.277322585E7), List(2012, 23, 1.388027254E7), List(2012, 53, 1.587116488E7), List(2012, 32, 7066045.53), List(2012, 52, 7611642.71), List(2012, 21, 1647128.75), List(2012, 31, 1.2698392811E8), List(2012, 50, 2737417.78), List(2012, 51, 2119145.372), List(2012, 15, 5872158.68), List(2012, 25, 735570.9), List(2012, 26, 1.810459815E7), List(2012, 22, 2345162.5), List(2012, 41, 4.89646375E7), List(2012, 33, 3.3348517572E8), List(2012, 24, 3834778.07), List(2012, 11, 1000000.0), List(2012, 14, 110000.0), List(2012, 43, 6.429543356E7), List(2012, 42, 3.700857645E7), List(2012, 28, 1081276.0), List(2012, 35, 5.6326507369E8), List(2012, 17, 366972.5), List(2013, 12, 43931.46), List(2013, 13, 2720974.28), List(2013, 29, 1.129878225E7), List(2013, 23, 1.090205509E7), List(2013, 53, 1.164626829E7), List(2013, 32, 6853656.5), List(2013, 52, 3825082.75), List(2013, 21, 802488.3), List(2013, 31, 1.2052164255E8), List(2013, 50, 337819.86), List(2013, 51, 1986709.95), List(2013, 15, 2073342.82), List(2013, 25, 266641.33), List(2013, 26, 1.561013643E7), List(2013, 22, 1483823.03), List(2013, 41, 4.873617077E7), List(2013, 33, 2.8904066134E8), List(2013, 24, 8583190.31), List(2013, 11, 1280807.0), List(2013, 14, 4500.0), List(2013, 43, 7.105478468E7), List(2013, 42, 3.360528539E7), List(2013, 28, 526715.0), List(2013, 35, 5.0657076696E8), List(2013, 17, 271753.93))"


O exemplo abaixo transforma a estrutura complexa do JSON que contém os dados da Lei Rouanet em um formato tabular para facilitar a manipulação do DataFrame. O Spark possui várias funções internas de transformação de dados no módulo [pyspark.sql.functions](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions), por isso, é necessário importar esta função.

A coluna **valores** dos dois DataFrames contém os dados da Lei Rouanet e, por isso, será utilizada nos exemplos abaixo. Esta coluna é um array de JSONs, onde cada JSON contém o valor ou quantidade de projetos destinados àquele Estado em um ano específico. Por isso, primeiramente, uma linha é criada para cada item do array com a função [explode](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.explode). Depois, selecionamos todos os subcampos (*ano*, *estado_ibge* e *valor*) do JSON utilizando a função select("valores.\*"). Mais detalhes de manipulação de JSON utilizando o Spark pode ser visto no seguinte [tutorial da Databricks](https://docs.databricks.com/delta/data-transformation/complex-types.html).

In [6]:
from pyspark.sql.functions import *
#utiliza a função explode para transformar cada item do array do campo 'valores' em uma linha da tabela
rouanet_qtd_df_table_temp = rouanet_qtd_df.select(explode("valores").alias('valores'))
display(rouanet_qtd_df_table_temp)

valores
"List(2003, 11, 4)"
"List(2003, 12, 2)"
"List(2003, 13, 6)"
"List(2003, 14, 1)"
"List(2003, 15, 23)"
"List(2003, 16, 3)"
"List(2003, 17, 5)"
"List(2003, 21, 31)"
"List(2003, 22, 13)"
"List(2003, 23, 52)"


In [7]:
# transforma cada linha da tabela em três colunas a partir dos dados do objeto JSON de cada linha. Com isto forma-se uma grande tabela com três colunas: ano, estado_ibge e valor
rouanet_qtd_df_table = rouanet_qtd_df_table_temp.select("valores.*")
display(rouanet_qtd_df_table)

ano,estado_ibge,valor
2003,11,4
2003,12,2
2003,13,6
2003,14,1
2003,15,23
2003,16,3
2003,17,5
2003,21,31
2003,22,13
2003,23,52


In [8]:
#A transformação do campo 'valores' na tabela final pode ser feita em uma linha. O exemplo abaixo mostra esta transformação para o DataFrame rouanet_valor_df
rouanet_valor_df_table = rouanet_valor_df.select(explode("valores").alias('valores')).select("valores.*")
display(rouanet_valor_df_table)

ano,estado_ibge,valor
2003,11,254300.0
2003,12,0.0
2003,13,300000.0
2003,14,30000.0
2003,15,5842618.0
2003,16,0.0
2003,17,81220.0
2003,21,893896.0
2003,22,1173000.0
2003,24,193091.87


Depois de ler os dois Dataframes (um com a quantidade de projetos por ano por Estado e outro com o valor em Reais), é feito um merge dos dois DataFrames para um DataFrame com quatro colunas: ano, estado_ibge, valor em reais, quantidade. Desta forma, consegue-se realizar diversas análises sobre este DataFrame.

In [10]:
# cria um DataFrame temporário com a junção dos dois DataFraames com dados da Lei Rouanet.
rouanet_df_tmp = rouanet_qtd_df_table.join(rouanet_valor_df_table, ['ano','estado_ibge'],  'inner')
display(rouanet_df_tmp)


ano,estado_ibge,valor,valor.1
2003,11,4,254300.0
2003,12,2,0.0
2003,13,6,300000.0
2003,14,1,30000.0
2003,15,23,5842618.0
2003,16,3,0.0
2003,17,5,81220.0
2003,21,31,893896.0
2003,22,13,1173000.0
2003,24,16,193091.87


In [11]:
# Como os dois DataFrames contém o campo denominado 'valor', as colunas são selecionadas com um alias para as colunas quantidade e valor_em_reais. Esta linha poderia ter sido adicionada na célula acima, mas foi colocada separada para facilitar a compreensão.
rouanet_df = rouanet_df_tmp.select(col('ano'), col('estado_ibge'), rouanet_qtd_df_table.valor.alias("quantidade"), rouanet_valor_df_table.valor.alias("valor_em_reais"))
display(rouanet_df)

ano,estado_ibge,quantidade,valor_em_reais
2003,11,4,254300.0
2003,12,2,0.0
2003,13,6,300000.0
2003,14,1,30000.0
2003,15,23,5842618.0
2003,16,3,0.0
2003,17,5,81220.0
2003,21,31,893896.0
2003,22,13,1173000.0
2003,24,16,193091.87


Pelo código do IBGE não é possível identificar, de forma fácil, para qual Estado foi liberada a verba da Lei Rouanet. Por isso, será adicionado um arquivo do Censo do IBGE de cada Estado que irá facilitar esta identificação e, desta forma, permitir explorar algumas características dos Estados que digam o porquê de alguns receberem mais verbas que outros.

In [13]:
%sh 
# Baixa o arquivo com os dados do Censo do IBGE de cada Estado do Brasil
if [ ! -f /databricks/driver/censo_estado.csv ]; then 
  wget https://raw.githubusercontent.com/savioteles/big_data/master/etl/datasets/censo_estado.csv
fi
ls /databricks/driver

In [14]:
# lê os dados do Censo de cada estado
censo_df = spark.read.csv("file:///databricks/driver/censo_estado.csv", header='true', inferSchema='true')
display(censo_df)

estado,pib,codigo,area,população,idh,receita,despesa
Rondônia,39450587,11,237765233,1757589,690,9122310.72305,7085530.0168
Acre,13751126,12,164123738,869265,663,6632883.10836,6084416.8063
Amazonas,89017165,13,1559168117,4080611,674,17328459.43402,15324896.55705
Roraima,11011454,14,224273831,576568,707,4419450.41557,3384683.73914
Pará,138068008,15,1245759305,8513497,646,25849446.10454,22533470.04547
Amapá,14338838,16,142470762,829494,708,5396417.14471,4224464.08829
Tocantins,31575831,17,277720404,1555229,699,10305099.01288,8929456.43836
Maranhão,85286226,21,329642170,7035055,639,18503261.35491,17627170.75574
Piauí,41405815,22,251616823,3264531,646,12124215.61511,9676736.31835
Ceará,138378785,23,148894757,9075649,682,28420222.47191,24608352.18276


In [15]:
#mostra a lista de Estados do Brasil
display(censo_df.select('estado').distinct())

estado
Santa Catarina
Mato Grosso do Sul
Goiás
Mato Grosso
Ceará
Piauí
Espírito Santo
Paraná
Alagoas
Bahia


Será necessário padronizar os campos de receita e despesa para ficarem com duas casas decimais depois da vírgula. O código abaixo sozinho faz esta transformação, utilizando a função [format_number](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read%20csv#pyspark.sql.functions.format_number) para converter para duas casas depois da vírgula e depois removendo as vírgulas. Mas, para fins didáticos, esta transformação será feita em duas etapas nas próximas duas células.

~~~
censo_df = censo_df.withColumn("receita", regexp_replace(format_number("receita", 2), ",", "").cast('float'))
censo_df = censo_df.withColumn("despesa", regexp_replace(format_number("despesa", 2), ",", "").cast('float'))
~~~

In [17]:
# a função format_number transforma as colunas receita e despesa em String com vírgulas separando o número.
censo_df = censo_df.withColumn("receita", format_number("receita", 2))
censo_df = censo_df.withColumn("despesa", format_number("despesa", 2))
display(censo_df)

estado,pib,codigo,area,população,idh,receita,despesa
Rondônia,39450587,11,237765233,1757589,690,9122310.72,7085530.02
Acre,13751126,12,164123738,869265,663,6632883.11,6084416.81
Amazonas,89017165,13,1559168117,4080611,674,17328459.43,15324896.56
Roraima,11011454,14,224273831,576568,707,4419450.42,3384683.74
Pará,138068008,15,1245759305,8513497,646,25849446.1,22533470.05
Amapá,14338838,16,142470762,829494,708,5396417.14,4224464.09
Tocantins,31575831,17,277720404,1555229,699,10305099.01,8929456.44
Maranhão,85286226,21,329642170,7035055,639,18503261.35,17627170.76
Piauí,41405815,22,251616823,3264531,646,12124215.62,9676736.32
Ceará,138378785,23,148894757,9075649,682,28420222.47,24608352.18


In [18]:
# Como se quer a receita e despesa como Double, remove-se as vírgulas e transforma o campo para 'double'
censo_df = censo_df.withColumn("receita", regexp_replace("receita", ",", "").cast('double'))
censo_df = censo_df.withColumn("despesa", regexp_replace("despesa", ",", "").cast('double'))
display(censo_df)

estado,pib,codigo,area,população,idh,receita,despesa
Rondônia,39450587,11,237765233,1757589,690,9122310.72,7085530.02
Acre,13751126,12,164123738,869265,663,6632883.11,6084416.81
Amazonas,89017165,13,1559168117,4080611,674,17328459.43,15324896.56
Roraima,11011454,14,224273831,576568,707,4419450.42,3384683.74
Pará,138068008,15,1245759305,8513497,646,25849446.1,22533470.05
Amapá,14338838,16,142470762,829494,708,5396417.14,4224464.09
Tocantins,31575831,17,277720404,1555229,699,10305099.01,8929456.44
Maranhão,85286226,21,329642170,7035055,639,18503261.35,17627170.76
Piauí,41405815,22,251616823,3264531,646,12124215.62,9676736.32
Ceará,138378785,23,148894757,9075649,682,28420222.47,24608352.18


In [19]:
# faz a junção da base de dados rouanet com o Censo do IBGE. Remove a coluna 'codigo' do Censo porque ela já está presente como estado_ibge na rouanet
rouanet_censo_df = censo_df.join(rouanet_df, censo_df.codigo == rouanet_df.estado_ibge).drop('codigo')
display(rouanet_censo_df)

estado,pib,area,população,idh,receita,despesa,ano,estado_ibge,quantidade,valor_em_reais
Rondônia,39450587,237765233,1757589,690,9122310.72,7085530.02,2003,11,4,254300.0
Acre,13751126,164123738,869265,663,6632883.11,6084416.81,2003,12,2,0.0
Amazonas,89017165,1559168117,4080611,674,17328459.43,15324896.56,2003,13,6,300000.0
Roraima,11011454,224273831,576568,707,4419450.42,3384683.74,2003,14,1,30000.0
Pará,138068008,1245759305,8513497,646,25849446.1,22533470.05,2003,15,23,5842618.0
Amapá,14338838,142470762,829494,708,5396417.14,4224464.09,2003,16,3,0.0
Tocantins,31575831,277720404,1555229,699,10305099.01,8929456.44,2003,17,5,81220.0
Maranhão,85286226,329642170,7035055,639,18503261.35,17627170.76,2003,21,31,893896.0
Piauí,41405815,251616823,3264531,646,12124215.62,9676736.32,2003,22,13,1173000.0
Rio Grande do Norte,59660847,52809602,3479010,684,13527552.73,11330957.55,2003,24,16,193091.87


In [20]:
# Pra qual estado foi o maior aporte em reais na história da Lei Rouanet?
display(rouanet_censo_df.sort(desc('valor_em_reais')).limit(10))

estado,pib,area,população,idh,receita,despesa,ano,estado_ibge,quantidade,valor_em_reais
São Paulo,2038004931,248219481,45538936,783,232822496.57,231982243.69,2011,35,2586,571786989.08
São Paulo,2038004931,248219481,45538936,783,232822496.57,231982243.69,2012,35,2089,563265073.69
São Paulo,2038004931,248219481,45538936,783,232822496.57,231982243.69,2013,35,2062,506570766.96
São Paulo,2038004931,248219481,45538936,783,232822496.57,231982243.69,2010,35,2385,492106132.72
São Paulo,2038004931,248219481,45538936,783,232822496.57,231982243.69,2007,35,1847,442010567.973
São Paulo,2038004931,248219481,45538936,783,232822496.57,231982243.69,2008,35,2055,415104537.89
São Paulo,2038004931,248219481,45538936,783,232822496.57,231982243.69,2009,35,1866,399415971.79
São Paulo,2038004931,248219481,45538936,783,232822496.57,231982243.69,2006,35,2013,360753874.16
Rio de Janeiro,640185780,43750423,17159960,761,78488140.79,67965548.7,2011,33,1908,351462546.21
Rio de Janeiro,640185780,43750423,17159960,761,78488140.79,67965548.7,2012,33,1396,333485175.72


In [21]:
# Pra qual estado foi a maior quantidade de projetos aprovados na história da Lei Rouanet?
display(rouanet_censo_df.sort(desc('quantidade')).take(1))

estado,pib,area,população,idh,receita,despesa,ano,estado_ibge,quantidade,valor_em_reais
São Paulo,2038004931,248219481,45538936,783,232822496.57,231982243.69,2011,35,2586,571786989.08


In [22]:
# Qual o Top 10 dos estados na história da Lei Rouanet do valor arrecadado?
display(rouanet_censo_df.groupBy('estado').agg(sum('valor_em_reais').alias('valor_em_reais')).sort(desc('valor_em_reais')).limit(10))

estado,valor_em_reais
São Paulo,4425725466.993
Rio de Janeiro,2622336176.1830006
Minas Gerais,1091452140.312
Rio Grande do Sul,615124741.1600001
Paraná,328403877.74300003
Santa Catarina,223062075.301
Distrito Federal,199521569.887
Pernambuco,183494522.733
Bahia,181465406.304
Ceará,121966177.32999998


In [23]:
# Qual o Top 10 dos estados na história da Lei Rouanet pelo número de projetos aprovados?
display(rouanet_censo_df.groupBy('estado').agg(sum('quantidade').alias('quantidade')).orderBy(desc('quantidade')).limit(10))

estado,quantidade
São Paulo,21366
Rio de Janeiro,15950
Minas Gerais,7877
Rio Grande do Sul,4693
Paraná,3857
Santa Catarina,2813
Bahia,2396
Distrito Federal,2093
Pernambuco,1394
Ceará,1195


In [24]:
# Como foi a evolução dos gastos com a Lei Rouanet ao longo do tempo?
display(rouanet_censo_df.groupBy('ano').agg(sum('valor_em_reais').alias('valor_em_reais')).sort('ano'))

ano,valor_em_reais
2003,430893947.1
2004,511748401.998
2005,726713080.3899999
2006,854148869.477
2007,989889897.826
2008,963631156.533
2009,980040805.72
2010,1166472941.7210002
2011,1324139793.7800002
2012,1274183507.3020003


In [25]:
# Qual o Top 10 dos PIB dos Estados? Existe algum relação com o valor disponibilizado pela Lei Rouanet? Qual estado não está no Top 10 da Lei Rouanet?
display(rouanet_censo_df.groupBy('estado').agg(min('area').alias('area')).sort(desc('area')).limit(5))

estado,area
Amazonas,1559168117
Pará,1245759305
Mato Grosso,903206997
Minas Gerais,586521121
Bahia,564722611


In [26]:
# Qual o Top 10 da população dos Estados? Existe algum relação com o valor disponibilizado pela Lei Rouanet? Qual estado não estão no Top 10 da Lei Rouanet?
display(rouanet_censo_df.groupBy('estado').agg(min('area').alias('area')).sort(desc('area')).limit(10))

estado,area
Amazonas,1559168117
Pará,1245759305
Mato Grosso,903206997
Minas Gerais,586521121
Bahia,564722611
Mato Grosso do Sul,357145535
Goiás,340125715
Maranhão,329642170
Rio Grande do Sul,281707151
Tocantins,277720404


In [27]:
# Qual o Top 10 da área geográfica dos Estados? Existe algum relação com o valor disponibilizado pela Lei Rouanet? Quais estados não estão no Top 10 da Lei Rouanet?
display(rouanet_censo_df.groupBy('estado').agg(min('area').alias('area')).sort(desc('area')).limit(10))

estado,area
Amazonas,1559168117
Pará,1245759305
Mato Grosso,903206997
Minas Gerais,586521121
Bahia,564722611
Mato Grosso do Sul,357145535
Goiás,340125715
Maranhão,329642170
Rio Grande do Sul,281707151
Tocantins,277720404


In [28]:
# Qual o Top 10 dos estados da Lei Rouanet do valor arrecadado durante o governo do ex-presidente Lula?
display(rouanet_censo_df.filter(col('ano') < 2011).groupBy('estado').agg(sum('valor_em_reais').alias('valor_em_reais')).sort(desc('valor_em_reais')).limit(10))

estado,valor_em_reais
São Paulo,2784102637.2630005
Rio de Janeiro,1648347792.913
Minas Gerais,716345134.2119999
Rio Grande do Sul,407263892.33
Paraná,183689181.593
Distrito Federal,148606638.33699998
Bahia,140898466.444
Pernambuco,126199409.133
Santa Catarina,120714891.101
Ceará,80660075.22999999


## Considerações Finais

Neste tutorial foram alguns exemplos de Transformação de Dados no fluxo de ETL utilizando o Apache Spark para facilitar a compreensão de funções de transformação de dados. Algumas análises foram exploradas para facilitar o entendimento das funções disponíveis para Transformação de Dados, mas você pode utilizar estas funções no seu processo de ETL. Outras funções de transformação de dados com o Spark podem ser encontradas na [documentação](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions).

Você poderá encontrar vários outros exemplos de ETL em Big Data com outras ferramentas, como Apache Sqoop, Apache Flume, Apache Kafka no meu [repositório do github](https://github.com/savioteles/big_data). Neste repositório você vai encontrar os códigos e um tutorial de como executar cada estudo de caso.