# AWS Glue Studio Notebook
##### You are now running a AWS Glue Studio notebook; To start using your notebook you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session #objeto spark dentro do contexto do glue

job = Job(glueContext)
job.init(args['JOB_NAME'], args)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.3 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::021090348946:role/fia-datasus-covid_dataset-role
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: 57a5091f-554a-445f-9cd8-1fa4ce07b07a
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.3
--enable-glue-datacatalog true
Waiting for session 

In [None]:
# read files from raw-data
df_covid = spark.read.parquet("s3://data-lake-fia/raw-data/datasus-imunizacao")

In [2]:
df_covid.printSchema()

root
 |-- _index: string (nullable = true)
 |-- _type: string (nullable = true)
 |-- _id: string (nullable = true)
 |-- _score: double (nullable = true)
 |-- _source: struct (nullable = true)
 |    |-- @timestamp: string (nullable = true)
 |    |-- @version: string (nullable = true)
 |    |-- co_condicao_maternal: long (nullable = true)
 |    |-- data_importacao_datalake: string (nullable = true)
 |    |-- data_importacao_rnds: string (nullable = true)
 |    |-- document_id: string (nullable = true)
 |    |-- ds_condicao_maternal: string (nullable = true)
 |    |-- dt_deleted: string (nullable = true)
 |    |-- estabelecimento_municipio_codigo: string (nullable = true)
 |    |-- estabelecimento_municipio_nome: string (nullable = true)
 |    |-- estabelecimento_razaoSocial: string (nullable = true)
 |    |-- estabelecimento_uf: string (nullable = true)
 |    |-- estabelecimento_valor: string (nullable = true)
 |    |-- estalecimento_noFantasia: string (nullable = true)
 |    |-- id_sist

In [73]:
# change struct field to separeted columns
df_covid_final = (
    df_covid
    .select('_index',
            '_type',
            '_id',
            '_score',
            "_source.@timestamp",
            "_source.@version",
            "_source.co_condicao_maternal",
            "_source.data_importacao_datalake",
            "_source.data_importacao_rnds",
            "_source.document_id",
            "_source.ds_condicao_maternal",
            "_source.dt_deleted",
            "_source.estabelecimento_municipio_codigo",
            "_source.estabelecimento_municipio_nome",
            "_source.estabelecimento_razaoSocial",
            "_source.estabelecimento_uf",
            "_source.estabelecimento_valor",
            "_source.estalecimento_noFantasia",
            "_source.id_sistema_origem",
            "_source.paciente_dataNascimento",
            "_source.paciente_endereco_cep",
            "_source.paciente_endereco_coIbgeMunicipio",
            "_source.paciente_endereco_coPais",
            "_source.paciente_endereco_nmMunicipio",
            "_source.paciente_endereco_nmPais",
            "_source.paciente_endereco_uf",
            "_source.paciente_enumSexoBiologico",
            "_source.paciente_id",
            "_source.paciente_idade",
            "_source.paciente_nacionalidade_enumNacionalidade",
            "_source.paciente_racaCor_codigo",
            "_source.paciente_racaCor_valor",
            "_source.sistema_origem",
            "_source.status",
            "_source.vacina_categoria_codigo",
            "_source.vacina_categoria_nome",
            "_source.vacina_codigo",
            "_source.vacina_dataAplicacao",
            "_source.vacina_descricao_dose",
            "_source.vacina_fabricante_nome",
            "_source.vacina_fabricante_referencia",
            "_source.vacina_grupoAtendimento_codigo",
            "_source.vacina_grupoAtendimento_nome",
            "_source.vacina_lote",
            "_source.vacina_nome",
            "_source.vacina_numDose",
           )
)




In [74]:
# write parquet final into context tier
(
    df_covid_final
    .write
    .mode("overwrite")
    .format("parquet")
    .save("s3://data-lake-fia/context/datasus_db/covid_dataset/")
)




In [75]:
job.commit()

NameError: name 'job' is not defined
