In [7]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from datetime import datetime
import boto3
import json

In [2]:
# Spark settings for this job: extra JVM packages (S3A connector and the XML reader),
# the AWS credentials provider used by S3A, and dynamic partition overwrite.
conf = SparkConf()
conf.setAll([
    ('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.2,com.databricks:spark-xml_2.12:0.14.0'),
    ('spark.hadoop.fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.InstanceProfileCredentialsProvider'),
    ('spark.sql.sources.partitionOverwriteMode', 'dynamic'),
])

<pyspark.conf.SparkConf at 0x7f6934fe1450>

In [8]:
# Location of the AWS Secrets Manager entry that holds the encryption key.
secret_name = "spark/trusted"
region_name = "us-east-1"

# Open a Secrets Manager client in that region.
session = boto3.session.Session()
client = session.client(service_name='secretsmanager', region_name=region_name)

# The secret is stored as a JSON document; keep only the TRUSTED_DATA_SECRET entry.
secret_payload = json.loads(client.get_secret_value(SecretId=secret_name)['SecretString'])
secret = secret_payload['TRUSTED_DATA_SECRET']

In [4]:
# Create (or reuse) the Spark session for this job, applying the settings held in `conf`.
spark = (
    SparkSession.builder
    .appName('autism-files')
    .config(conf=conf)
    .getOrCreate()
)

:: loading settings :: url = jar:file:/usr/local/lib/python3.7/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.databricks#spark-xml_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-27327850-f273-488b-984b-071440eeff32;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.2.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.563 in central
	found com.databricks#spark-xml_2.12;0.14.0 in central
	found commons-io#commons-io;2.8.0 in central
	found org.glassfish.jaxb#txw2;2.3.4 in central
	found org.apache.ws.xmlschema#xmlschema-core;2.2.5 in central
:: resolution report :: resolve 587ms :: artifacts dl 36ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.563 from central in [default]
	com.databricks#spark-xml_2.12;0.14.0 from central in [default]
	commons-io#commons-io;2.8.0 from central in [default]
	org.apache.hadoop#hadoop-aws;3.2.2 from central in [default]
	org.apache.ws.xml

22/10/15 23:32:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
# Date of the raw files to process, written as DD-MM-YYYY (it is embedded in the S3 file names).
execution_date = "27-05-2022"

In [6]:
# Load the raw XML file for the execution date through the spark-xml data source,
# using "row" as the row tag.
df_xml = (
    spark.read
    .format("com.databricks.spark.xml")
    .option("rootTag", "data")
    .option("rowTag", "row")
    .load(f"s3a://projeto-pi-datalake-raw/{execution_date}-autismo_xml.xml")
)

22/10/15 23:32:13 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


                                                                                

In [7]:
# Load the raw JSON file for the execution date; multiLine is enabled for the reader.
df_json = spark.read.json(
    path=f"s3a://projeto-pi-datalake-raw/{execution_date}-autismo_json.json",
    multiLine=True,
)

                                                                                

In [8]:
# Load the raw CSV file for the execution date, taking column names from the header row.
df_csv = spark.read.csv(
    path=f"s3a://projeto-pi-datalake-raw/{execution_date}-autismo_csv.csv",
    header=True,
)

In [9]:
# Preview the XML-derived frame: one field per line (vertical), values not truncated.
df_xml.show(truncate=False, vertical=True)

-RECORD 0--------------------------------------
 PIB_nacional                 | 8700000000000  
 PIB_percent_BR               | 13.9           
 PIB_regional                 | 15779110       
 area_regiao                  | 576736819      
 densidade_demografica_regiao | 36.06          
 gentilico_regiao             | nordestino     
 identificador                | 1              
 pais                         | Brasil         
 populacao_regiao             | 56560081       
 raca_predominante_regiao     | parda          
 regiao                       | Nordeste       
-RECORD 1--------------------------------------
 PIB_nacional                 | 8700000000000  
 PIB_percent_BR               | 5.3            
 PIB_regional                 | 1195000000000  
 area_regiao                  | 3853676948     
 densidade_demografica_regiao | 4.12           
 gentilico_regiao             | nortista       
 identificador                | 2              
 pais                         | Brasil  

In [10]:
# Inspect the first row of the JSON-derived frame.
df_json.head()

                                                                                

Row(classe='E', classe-descricao='Baixa classe media', empregado=False, id_paciente=1, per_capita=311.0, quantidade_familia=6, total_salario=1866)

In [11]:
# Inspect the first row of the CSV-derived frame.
# NOTE(review): this output shows an unencrypted patient record; clear it before sharing if the data is real.
df_csv.head()

Row(id_paciente='1', idade_atual='28', idade_descoberta='14', genero='M', grau='moderado', sensibilidade_sentidos='False', agressivo='False', hiperativo='False', movimentos_repetitivos='False', baixa_concentracao='False', hiperfoco='False', necessidade_rotina='True', dificuldade_imaginacao='False', introvertido='False', tipo_autismo='Sindrome de Asperger', tipo_autismo_descricao='TEA é agora o termo genérico para o grupo de transtornos de neurodesenvolvimento complexos que constituem o autismo. Os sintomas do autismo costumam estar presentes desde a primeira infância e podem afetar o funcionamento diário. Os sintomas do TEA geralmente aparecem nos primeiros 2 anos de vida.')

In [12]:
# Parse the execution date (DD-MM-YYYY) once, so the partition values come from a
# validated date rather than from hand-counted string slices.
partition_date = datetime.strptime(execution_date, '%d-%m-%Y')

# Join the three sources, tag every row with the partition columns and drop the
# XML key, which equals id_paciente after the join.
# NOTE(review): the XML side is matched on id_paciente == identificador; confirm
# that `identificador` is meant to line up with the patient id.
df_joined = (
    df_csv
        .join(df_json, 'id_paciente')
        .join(df_xml, df_csv.id_paciente == df_xml.identificador)
        .withColumn('year', f.lit(partition_date.year))    # 2022
        .withColumn('month', f.lit(partition_date.month))  # 5
        .withColumn('day', f.lit(partition_date.day))      # 27
        .drop('identificador')
)

# Replace each sensitive column with the hex-encoded AES ciphertext of its OWN value.
# Building the expression from the column name keeps source and target in sync;
# previously `regiao` was filled with the encryption of `genero`.
# NOTE(review): the key is interpolated into the SQL text, so it must not contain
# a single quote and it will appear in the query plan.
for sensitive_column in ('id_paciente', 'genero', 'regiao'):
    df_joined = df_joined.withColumn(
        sensitive_column,
        f.expr(f"hex(aes_encrypt({sensitive_column}, '{secret}'))"),
    )

In [13]:
# Spot-check the first id_paciente value of the joined frame (it should now be a hex string).
df_joined.select('id_paciente').head()

                                                                                

Row(id_paciente='D2641FE662F883B97AD3E65CC0575897A5F3B48843E2D67BB4623A58F0C9')

In [14]:
# Print the column names and types of the joined frame.
df_joined.printSchema()

root
 |-- id_paciente: string (nullable = true)
 |-- idade_atual: string (nullable = true)
 |-- idade_descoberta: string (nullable = true)
 |-- genero: string (nullable = true)
 |-- grau: string (nullable = true)
 |-- sensibilidade_sentidos: string (nullable = true)
 |-- agressivo: string (nullable = true)
 |-- hiperativo: string (nullable = true)
 |-- movimentos_repetitivos: string (nullable = true)
 |-- baixa_concentracao: string (nullable = true)
 |-- hiperfoco: string (nullable = true)
 |-- necessidade_rotina: string (nullable = true)
 |-- dificuldade_imaginacao: string (nullable = true)
 |-- introvertido: string (nullable = true)
 |-- tipo_autismo: string (nullable = true)
 |-- tipo_autismo_descricao: string (nullable = true)
 |-- classe: string (nullable = true)
 |-- classe-descricao: string (nullable = true)
 |-- empregado: boolean (nullable = true)
 |-- per_capita: double (nullable = true)
 |-- quantidade_familia: long (nullable = true)
 |-- total_salario: long (nullable = true

In [15]:
# Write the joined frame to the trusted bucket as Parquet, partitioned by
# year/month/day, in overwrite mode.
(
    df_joined.write
    .mode('overwrite')
    .partitionBy('year', 'month', 'day')
    .parquet('s3a://projeto-pi-datalake-trusted/autism-region')
)

22/10/15 23:33:08 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


                                                                                

In [16]:
# Stop the SparkContext that backs the session.
spark.sparkContext.stop()

In [17]:
# Stop the Spark session.
# NOTE(review): the SparkContext is already stopped in the previous cell; presumably
# one of the two stop calls is redundant. Confirm and keep only one.
spark.stop()