In [552]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [470]:
# esta sessao vai servir para todos os exercicios resolvidos abaixo
spark=SparkSession.builder.appName("experian").getOrCreate()

### 1. Ler CSV Viagens e gerar output Parquet

In [598]:
df = spark.read.format('csv').options(header='true', inferSchema='true', delimiter = ";").load('/viagens_csv/2020_Viagem.csv')

In [599]:
df.printSchema()

root
 |-- Identificador do processo de viagem: integer (nullable = true)
 |-- Situa��o: string (nullable = true)
 |-- C�digo do �rg�o superior: integer (nullable = true)
 |-- Nome do �rg�o superior: string (nullable = true)
 |-- C�digo �rg�o solicitante: integer (nullable = true)
 |-- Nome �rg�o solicitante: string (nullable = true)
 |-- CPF viajante: string (nullable = true)
 |-- Nome: string (nullable = true)
 |-- Cargo: string (nullable = true)
 |-- Per�odo - Data de in�cio: string (nullable = true)
 |-- Per�odo - Data de fim: string (nullable = true)
 |-- Destinos: string (nullable = true)
 |-- Motivo: string (nullable = true)
 |-- Valor di�rias: string (nullable = true)
 |-- Valor passagens: string (nullable = true)
 |-- Valor outros gastos: string (nullable = true)



In [600]:
# Substituindo cada espaço em branco por underscore do nome de cada coluna 
# para salvar os dados no formato parquet

for col in df.columns:
    df = df.withColumnRenamed(col,col.replace(" ", "_"))

In [601]:
df.columns

['Identificador_do_processo_de_viagem',
 'Situa��o',
 'C�digo_do_�rg�o_superior',
 'Nome_do_�rg�o_superior',
 'C�digo_�rg�o_solicitante',
 'Nome_�rg�o_solicitante',
 'CPF_viajante',
 'Nome',
 'Cargo',
 'Per�odo_-_Data_de_in�cio',
 'Per�odo_-_Data_de_fim',
 'Destinos',
 'Motivo',
 'Valor_di�rias',
 'Valor_passagens',
 'Valor_outros_gastos']

In [None]:

df.write.parquet("/resultados/res_1/output.parquet")

### 2. Ler CSV Viagens e gerar output XML

In [568]:
# Este exercicio foi feito no pyspark shell pois o jupyter-notebbok 
# aprensntou erros na importação de pacotes do spark-xml
# Todo o processo se encontra na pastas /csv_to_xml deste repositorio

### 3. Ler CSV Viagens, ordenar por nome do órgão superior e nome do órgão solicitante em ordem decrescente. Gerar JSON do resultado.

In [602]:
# lendo arquivo .csv
df = spark.read.format('csv').options(header='true', inferSchema='true', delimiter = ";").\
load('/viagens_csv/2020_Viagem.csv')
df.printSchema()

root
 |-- Identificador do processo de viagem: integer (nullable = true)
 |-- Situa��o: string (nullable = true)
 |-- C�digo do �rg�o superior: integer (nullable = true)
 |-- Nome do �rg�o superior: string (nullable = true)
 |-- C�digo �rg�o solicitante: integer (nullable = true)
 |-- Nome �rg�o solicitante: string (nullable = true)
 |-- CPF viajante: string (nullable = true)
 |-- Nome: string (nullable = true)
 |-- Cargo: string (nullable = true)
 |-- Per�odo - Data de in�cio: string (nullable = true)
 |-- Per�odo - Data de fim: string (nullable = true)
 |-- Destinos: string (nullable = true)
 |-- Motivo: string (nullable = true)
 |-- Valor di�rias: string (nullable = true)
 |-- Valor passagens: string (nullable = true)
 |-- Valor outros gastos: string (nullable = true)



In [606]:
# ordenando decrescente as colunas em [cols]
cols = ['Nome do �rg�o superior', 'Nome �rg�o solicitante']
df3 = df.orderBy(cols, ascending=False)


In [131]:
# salvando em 3 arquivos coalesce(3)
df3.coalesce(3).write.json("/resultados/res_3/2020_Viagem.json")

### 4. Ler CSV Viagens e criar coluna "Valor Total" (Valor diária + Valor Passagens + Valor Outros)

In [610]:
# lendo arquivo .csv
df = spark.read.format('csv').options(header='true', inferSchema='true', delimiter = ";") \
.load('/viagens_csv/2020_Viagem.csv')

In [611]:
df.printSchema()

root
 |-- Identificador do processo de viagem: integer (nullable = true)
 |-- Situa��o: string (nullable = true)
 |-- C�digo do �rg�o superior: integer (nullable = true)
 |-- Nome do �rg�o superior: string (nullable = true)
 |-- C�digo �rg�o solicitante: integer (nullable = true)
 |-- Nome �rg�o solicitante: string (nullable = true)
 |-- CPF viajante: string (nullable = true)
 |-- Nome: string (nullable = true)
 |-- Cargo: string (nullable = true)
 |-- Per�odo - Data de in�cio: string (nullable = true)
 |-- Per�odo - Data de fim: string (nullable = true)
 |-- Destinos: string (nullable = true)
 |-- Motivo: string (nullable = true)
 |-- Valor di�rias: string (nullable = true)
 |-- Valor passagens: string (nullable = true)
 |-- Valor outros gastos: string (nullable = true)



In [612]:
# substituindo virgula por ponto nas colunas [cols] da tabela
cols = ['Valor di�rias','Valor passagens', 'Valor outros gastos' ]
for col_name in df.columns:    
    if col_name in cols:
        df = df.withColumn(col_name, regexp_replace(col_name, ',', '.').cast('float'))


In [613]:
# visulizando o esquema da tabela e as 3 primeiras linhas das 3 ultimas colunas da tabela
df.printSchema()
df.select(df[-3],df[-2],df[-1]).show(3)

root
 |-- Identificador do processo de viagem: integer (nullable = true)
 |-- Situa��o: string (nullable = true)
 |-- C�digo do �rg�o superior: integer (nullable = true)
 |-- Nome do �rg�o superior: string (nullable = true)
 |-- C�digo �rg�o solicitante: integer (nullable = true)
 |-- Nome �rg�o solicitante: string (nullable = true)
 |-- CPF viajante: string (nullable = true)
 |-- Nome: string (nullable = true)
 |-- Cargo: string (nullable = true)
 |-- Per�odo - Data de in�cio: string (nullable = true)
 |-- Per�odo - Data de fim: string (nullable = true)
 |-- Destinos: string (nullable = true)
 |-- Motivo: string (nullable = true)
 |-- Valor di�rias: float (nullable = true)
 |-- Valor passagens: float (nullable = true)
 |-- Valor outros gastos: float (nullable = true)

+-------------+---------------+-------------------+
|Valor di�rias|Valor passagens|Valor outros gastos|
+-------------+---------------+-------------------+
|     18549.02|        6903.68|             167.65|
|        630

In [614]:
#Coluna 'Valor Total' criada somamdo as tres ultimas colunas 
#da tabela (Valor diária + Valor Passagens + Valor Outros)
df4 = df.withColumn('Valor Total', round((df[-3]+df[-2]+df[-1]),2))

In [617]:
# visulizando as 10 primeiras linhas das 4 ultimas colunas da tabela

df4.select(df4[-4],df4[-3],df4[-2],df4[-1]).show(10)

+-------------+---------------+-------------------+-----------+
|Valor di�rias|Valor passagens|Valor outros gastos|Valor Total|
+-------------+---------------+-------------------+-----------+
|     18549.02|        6903.68|             167.65|   25620.35|
|        630.8|            0.0|                0.0|      630.8|
|          0.0|            0.0|                0.0|        0.0|
|          0.0|         221.85|                0.0|     221.85|
|          0.0|         964.91|                0.0|     964.91|
|          0.0|        5224.87|                0.0|    5224.87|
|          0.0|         442.23|                0.0|     442.23|
|          0.0|        3474.44|            1243.13|    4717.57|
|          0.0|         826.56|                0.0|     826.56|
|          0.0|         1230.4|                0.0|     1230.4|
+-------------+---------------+-------------------+-----------+
only showing top 10 rows



In [618]:
# padronizar eh legal ! :) substitui espaços em branco por underscore
for col in df4.columns:
    df4 = df4.withColumnRenamed(col,col.replace(" ", "_"))
    
# salvando com estilo particionado
df4.coalesce(1).write.csv('/resultados/res_4/inserindo_valor_total.csv',header=True)

# salvando um arquivo estilo pandas
df4.toPandas().to_csv('/resultados/res_4/inserindo_valor_total_pandas.csv',index=False)

### 5. Ler CSV e apresentar o total gasto por orgao solicitante onde a Situação for igual a Realizada (utilizar a coluna valor total que foi criada no item 4)

In [620]:
df = spark.read.format('csv').options(header='true', inferSchema='true', delimiter = ";") \
.load('/viagens_csv/2020_Viagem.csv')

df.printSchema()

root
 |-- Identificador do processo de viagem: integer (nullable = true)
 |-- Situa��o: string (nullable = true)
 |-- C�digo do �rg�o superior: integer (nullable = true)
 |-- Nome do �rg�o superior: string (nullable = true)
 |-- C�digo �rg�o solicitante: integer (nullable = true)
 |-- Nome �rg�o solicitante: string (nullable = true)
 |-- CPF viajante: string (nullable = true)
 |-- Nome: string (nullable = true)
 |-- Cargo: string (nullable = true)
 |-- Per�odo - Data de in�cio: string (nullable = true)
 |-- Per�odo - Data de fim: string (nullable = true)
 |-- Destinos: string (nullable = true)
 |-- Motivo: string (nullable = true)
 |-- Valor di�rias: string (nullable = true)
 |-- Valor passagens: string (nullable = true)
 |-- Valor outros gastos: string (nullable = true)



In [621]:
df = df.withColumnRenamed('Situa��o','Situacao')\
        .withColumnRenamed('C�digo do �rg�o superior','Codigo_do_orgao_superior')\
        .withColumnRenamed('Nome do �rg�o superior','Nome_do_orgao_superior')\
        .withColumnRenamed('C�digo �rg�o solicitante','Codigo_orgao_solicitante')\
        .withColumnRenamed('Nome �rg�o solicitante','Nome_orgao_solicitante')\
        .withColumnRenamed('Per�odo - Data de in�cio','Periodo_-_Data_de_inicio')\
        .withColumnRenamed('Per�odo - Data de fim','Periodo_-_Data_de_fim')\
        .withColumnRenamed('Valor di�rias','Valor_diarias')

In [622]:
cols = ['Valor_diarias','Valor passagens', 'Valor outros gastos' ]
for col_name in df.columns:    
    if col_name in cols:
        df = df.withColumn(col_name, regexp_replace(col_name, ',', '.').cast('float'))
df = df.withColumn('Valor Total', round((df[-3]+df[-2]+df[-1]),2))
# df.select(df[-4],df[-3],df[-2],df[-1]).show(3)


In [623]:
for col in df.columns:
    df = df.withColumnRenamed(col,col.replace(" ", "_"))
df.printSchema()

root
 |-- Identificador_do_processo_de_viagem: integer (nullable = true)
 |-- Situacao: string (nullable = true)
 |-- Codigo_do_orgao_superior: integer (nullable = true)
 |-- Nome_do_orgao_superior: string (nullable = true)
 |-- Codigo_orgao_solicitante: integer (nullable = true)
 |-- Nome_orgao_solicitante: string (nullable = true)
 |-- CPF_viajante: string (nullable = true)
 |-- Nome: string (nullable = true)
 |-- Cargo: string (nullable = true)
 |-- Periodo_-_Data_de_inicio: string (nullable = true)
 |-- Periodo_-_Data_de_fim: string (nullable = true)
 |-- Destinos: string (nullable = true)
 |-- Motivo: string (nullable = true)
 |-- Valor_diarias: float (nullable = true)
 |-- Valor_passagens: float (nullable = true)
 |-- Valor_outros_gastos: float (nullable = true)
 |-- Valor_Total: float (nullable = true)



In [625]:
# substituindo valores Nulls por 'fill value' quando a coluna for de string ou por zero qdo for de float
resultDF = df.na.fill('fill value').na.fill(0).filter(df.Situacao == "Realizada")\
  .select('Codigo_orgao_solicitante', 'Valor_Total')\
  .groupBy('Codigo_orgao_solicitante')\
  .agg({'Valor_Total' : 'sum'}).withColumnRenamed('sum(Valor_Total)','Valor_Total')\
  .orderBy('Codigo_orgao_solicitante', ascending=True)
# apresentando o total gasto por orgao solicitante onde a Situação for igual a Realizada
df5 = resultDF.withColumn('Valor_Total', round(resultDF['Valor_Total'],2))
df5.show(10)

+------------------------+-------------+
|Codigo_orgao_solicitante|  Valor_Total|
+------------------------+-------------+
|                      -1|1.093839526E7|
|                   20000|   2213453.27|
|                   20202|     69657.78|
|                   20203|     66003.76|
|                   20301|    134805.26|
|                   20402|    179919.72|
|                   20403|      6350.92|
|                   20404|      4185.87|
|                   20408|      7014.75|
|                   20411|     92416.98|
+------------------------+-------------+
only showing top 10 rows



In [596]:
# este bloco mostra a visualização dos dados como no bloco anterior 
# somente acrescentando o campo 'Nome do Orgao superior'
# resultDF = df.na.fill('fill value').na.fill(0).filter(df.Situacao == "Realizada")\
#   .select('Codigo_orgao_solicitante', 'Nome_orgao_solicitante', 'Valor_Total')\
#   .groupBy('Codigo_orgao_solicitante', 'Nome_orgao_solicitante')\
#   .agg({'Valor_Total' : 'sum'}).withColumnRenamed('sum(Valor_Total)','Valor_Total')\
#   .orderBy('Codigo_orgao_solicitante', ascending=True)
# # apresentando o total gasto por orgao solicitante onde a Situação for igual a Realizada
# result5DF = resultDF.withColumn('Valor_Total', round(resultDF['Valor_Total'],2))
# result5DF.show(200)

In [626]:
# salvando um arquivo estilo pandas
df5.toPandas().to_csv('/resultados/res_5/valor_total_org_solicitante.csv',index=False)

### 6. Ler CSV de viagens e de trechos. Selecionar os campos Id do processo de viagem, situação, nome do órgão superior, sequência trecho, destino - cidade. Salvar um JSON de outpout

In [538]:
df1 = spark.read.format('csv').options(header='true', inferSchema='true', delimiter = ";") \
.load('/viagens_csv/2020_Viagem.csv')
for col in df1.columns:
    df1 = df1.withColumnRenamed(col,col.replace(" ", "_"))


df2 = spark.read.format('csv').options(header='true', inferSchema='true', delimiter = ";") \
.load('/viagens_csv/2020_Trecho.csv')
for col in df2.columns:
    df2 = df2.withColumnRenamed(col,col.replace(" ", "_"))

In [539]:
df1.printSchema()

df2.printSchema()

root
 |-- Identificador_do_processo_de_viagem: integer (nullable = true)
 |-- Situa��o: string (nullable = true)
 |-- C�digo_do_�rg�o_superior: integer (nullable = true)
 |-- Nome_do_�rg�o_superior: string (nullable = true)
 |-- C�digo_�rg�o_solicitante: integer (nullable = true)
 |-- Nome_�rg�o_solicitante: string (nullable = true)
 |-- CPF_viajante: string (nullable = true)
 |-- Nome: string (nullable = true)
 |-- Cargo: string (nullable = true)
 |-- Per�odo_-_Data_de_in�cio: string (nullable = true)
 |-- Per�odo_-_Data_de_fim: string (nullable = true)
 |-- Destinos: string (nullable = true)
 |-- Motivo: string (nullable = true)
 |-- Valor_di�rias: string (nullable = true)
 |-- Valor_passagens: string (nullable = true)
 |-- Valor_outros_gastos: string (nullable = true)

root
 |-- Identificador_do_processo_de_viagem_: integer (nullable = true)
 |-- Sequ�ncia_Trecho: integer (nullable = true)
 |-- Origem_-_Data: string (nullable = true)
 |-- Origem_-_Pa�s: string (nullable = true)
 |--

In [627]:
df6 = df1.join(df2, df1.Identificador_do_processo_de_viagem  == df2.Identificador_do_processo_de_viagem_ )\
   .select(df1['Identificador_do_processo_de_viagem'].alias('ID_Do_Processo_De_Viagem'), \
           df1['Situa��o'].alias('Situacao'),\
           df1['Nome_do_�rg�o_superior'].alias('Nome_do_Orgao_superior'),\
           df2['Sequ�ncia_Trecho'].alias('Sequencia_Trecho'),\
           df2['Destino_-_Cidade'].alias('Destino_Cidade'))
df6.show(10)

+------------------------+-------------+----------------------+----------------+--------------+
|ID_Do_Processo_De_Viagem|     Situacao|Nome_do_Orgao_superior|Sequencia_Trecho|Destino_Cidade|
+------------------------+-------------+----------------------+----------------+--------------+
|                16415470|    Realizada|  Minist�rio da Edu...|               4|       Niter�i|
|                16415470|    Realizada|  Minist�rio da Edu...|               2|        Lisboa|
|                16415470|    Realizada|  Minist�rio da Edu...|               3|Rio de Janeiro|
|                16415470|    Realizada|  Minist�rio da Edu...|               1|Rio de Janeiro|
|                16441929|    Realizada|  Minist�rio da Defesa|               1|Rio de Janeiro|
|                16460934|    Realizada|  Minist�rio da Edu...|               2| Florian�polis|
|                16460934|    Realizada|  Minist�rio da Edu...|               1|        Faenza|
|                16465411|N�o realizada|

In [567]:
# salvando em 3 arquivos coalesce(3)
df6.coalesce(3).write.json("/resultados/res_6/Viagens_Trechos.json")