In [1]:
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Root of the data lake. In an s3a:// URI the authority is the bucket name; the
# MinIO host and port belong in fs.s3a.endpoint below, not in the URI. Every
# other path in this notebook already uses the `datalake` bucket.
WAREHOUSE_URI = "s3a://datalake/"

# Build (or reuse) the Hive-enabled session for the lab cluster.
spark = (SparkSession.builder
             .master("spark://spark-master:7077") # Spark standalone cluster master
             .appName('lab') # Application name shown by the cluster
             .config("hive.metastore.uris", "thrift://hive-metastore:9083") # External Hive Metastore
             .config("hive.metastore.warehouse.dir", WAREHOUSE_URI) # Default warehouse dir (legacy Hive key)
             .config("spark.sql.warehouse.dir", WAREHOUSE_URI) # Default warehouse dir
             .config("hive.metastore.schema.verification", "false") # Skip metastore schema version verification
             .config("fs.defaultFS", WAREHOUSE_URI) # Default file system: the data lake bucket
             .config("spark.jars", "/opt/bitnami/spark/jars_external/hadoop-aws-3.3.4.jar,/opt/bitnami/spark/jars_external/aws-java-sdk-bundle-1.12.588.jar")
             .config("spark.sql.repl.eagerEval.enabled", True) # Render DataFrames when they are a cell's last expression
             .config("spark.sql.debug.maxToStringFields", "100") # Spark SQL setting, so it belongs on the session, not in the Hadoop conf
             .enableHiveSupport()
             .getOrCreate())

sc = spark.sparkContext

# S3A (hadoop-aws) settings for the MinIO endpoint. Credentials are taken from
# the environment when present so they need not be committed with the notebook;
# the fallbacks are the previous lab defaults.
hdp_configs = {
    "fs.s3a.endpoint": "http://minio:9000",
    "fs.s3a.access.key": os.environ.get("MINIO_ACCESS_KEY", "minio"),
    "fs.s3a.secret.key": os.environ.get("MINIO_SECRET_KEY", "minioadmin"),
    "fs.s3a.connection.timeout": "600000",
    "fs.s3a.path.style.access": "true",
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    # The endpoint above is plain http, so TLS is switched off to match it.
    "fs.s3a.connection.ssl.enabled": "false"
}

# Fetch the driver's Hadoop configuration once and apply every setting to it.
hadoop_conf = sc._jsc.hadoopConfiguration()
for k, v in hdp_configs.items():
    hadoop_conf.set(k, v)


In [10]:
# Stop the active SparkSession.
# NOTE(review): the cells that follow in this file still call `spark`, so a
# top-to-bottom run would presumably fail after this point — consider moving
# this cell to the end of the notebook.
spark.stop()

In [31]:
# Drop the `silver` database together with everything it contains (CASCADE).
# NOTE(review): the recorded run of this cell failed with
# UnknownHostException: hdfs-namenode — presumably the metastore still holds an
# HDFS location for this database; confirm before re-running.
spark.sql("drop database silver cascade")

AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: java.net.UnknownHostException: hdfs-namenode)

In [2]:
# List the databases known to the session catalog.
spark.sql("show databases")

namespace
bronze
default
gold
silver
source


In [34]:
# Create one database per lake layer, each located under its own prefix of the
# `datalake` bucket. IF NOT EXISTS makes the cell safe to re-run. The layers are
# created in the same order the original one-statement-per-cell version used.
for layer in ("source", "gold", "silver", "bronze"):
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {layer} LOCATION 's3a://datalake/{layer}/'")

In [3]:
# List the tables registered in the `source` database.
spark.sql("show tables from source").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|   source|    final|      false|
+---------+---------+-----------+



In [5]:
# List the tables registered in the `silver` database.
spark.sql("show tables from silver").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



In [35]:
def _string_struct(field_names):
    """Return a struct whose fields are all nullable strings, in the given order."""
    return StructType([StructField(field_name, StringType(), True) for field_name in field_names])


# Field names shared by the two diagnosis-detail structs.
_diag_detail_names = [
    'sub_cat', 'classificacao', 'restr_sexo', 'causa_obito',
    'descricao', 'desc_abrev', 'refer', 'excluidos',
]

# Field names of each element of the `feriado_info` array.
_holiday_names = ['data', 'nome', 'tipo', 'descricao', 'uf', 'municipio', 'cod_municipio']

# (column name, data type) in table order; every column is nullable.
_column_types = [
    ('ano_cmpt', StringType()),
    ('mes_cmpt', StringType()),
    ('cgc_hosp', StringType()),
    ('munic_res', StringType()),
    ('nasc', DateType()),
    ('sexo', StringType()),
    ('uti_mes_to', StringType()),
    ('uti_int_to', StringType()),
    ('proc_rea', StringType()),
    ('qt_proc', StringType()),
    ('dt_atend', DateType()),
    ('dt_saida', DateType()),
    ('diag_princ', StringType()),
    ('diag_secun', StringType()),
    ('cobranca', StringType()),
    ('natureza', StringType()),
    ('gestao', StringType()),
    ('munic_mov', StringType()),
    ('cod_idade', StringType()),
    ('idade', StringType()),
    ('dias_perm', StringType()),
    ('morte', StringType()),
    ('cnes', StringType()),
    ('fonte', StringType()),
    ('modalidade', StringType()),
    ('nome_uf', StringType()),
    ('nome_municipio', StringType()),
    ('regiao', StringType()),
    ('idhm', FloatType()),
    ('populacao_residente', IntegerType()),
    ('area_unidade_territorial', FloatType()),
    ('diag_princ_desc', StringType()),
    ('diag_secun_desc', StringType()),
    ('diag_princ_detalhes', _string_struct(_diag_detail_names)),
    ('diag_secun_detalhes', _string_struct(_diag_detail_names)),
    ('feriado', StringType()),
    ('distancia_feriado', IntegerType()),
    ('feriado_info', ArrayType(_string_struct(_holiday_names), True)),
    ('sigla', StringType()),
]

schema = StructType([StructField(col_name, col_type, True) for col_name, col_type in _column_types])

# An empty DataFrame is used only to obtain each column's type as a DDL-ready string.
df_final = spark.createDataFrame([], schema=schema)

# `sigla` is left out of the column list because it is declared separately as
# the partition column in the statement below.
schema_str = ", ".join(
    f"{col_name} {col_type}" for col_name, col_type in df_final.drop("sigla").dtypes
)
spark.sql(f"CREATE EXTERNAL TABLE IF NOT EXISTS source.final ({schema_str}) USING PARQUET PARTITIONED BY (sigla string) LOCATION 's3a://datalake/source/final/'").show()

++
||
++
++



In [36]:
# Scan the table's storage location and register the partition directories
# found there in the metastore, so files already present become queryable.
spark.sql("msck repair table source.final").show()

++
||
++
++



In [10]:
# Row count per value of `sigla`, the table's partition column.
spark.table("source.final").groupBy("sigla").count()

sigla,count
SP,67515831
RS,9961350
MG,8143352
BA,5017618
SC,2861516
CE,906485
MS,659648
AL,522891
PA,567822
GO,503891


In [6]:
import pyspark.sql.types as T
import pyspark.sql.functions as F
import builtins as b

def StructNormalizer(df):
  """Flatten the struct columns of `df` into plain top-level columns.

  Every leaf field nested (at any depth) inside a struct column becomes its
  own column, named by joining the path to it with underscores, e.g. field
  `b` of struct column `a` becomes `a_b`. Columns that are not structs,
  including arrays of structs, are passed through unchanged. The original
  column order is kept, with each struct replaced in place by its leaves.

  Args:
    df: the DataFrame to flatten.

  Returns:
    A new DataFrame produced by `df.select(...)`; `df` itself is not modified.
  """
  def _leaf_paths(struct, prefix=()):
    # Collect the path (tuple of names) to every non-struct field that sits
    # inside a struct. `prefix` is empty only for the top-level schema, whose
    # direct non-struct columns are deliberately not reported.
    paths = []
    for field in struct:
      path = (*prefix, field.name)
      if field.dataType.typeName() == "struct":
        paths.extend(_leaf_paths(field.dataType, path))
      elif prefix:
        paths.append(path)
    return paths

  # Group the leaf paths by the top-level column they belong to, once, so the
  # loop over columns below is a dictionary lookup per column.
  paths_by_root = {}
  for path in _leaf_paths(df.schema):
    paths_by_root.setdefault(path[0], []).append(path)

  selected = []
  for name in df.columns:
    nested = paths_by_root.get(name)
    if nested is None:
      selected.append(F.col(name))
    else:
      # "a.b.c" addresses the nested field; "a_b_c" is its flattened name.
      selected.extend(F.col(".".join(p)).alias("_".join(p)) for p in nested)
  return df.select(*selected)

In [13]:
# Take only the rows of the GO partition from the source table and flatten
# their struct columns.
df = StructNormalizer(
    spark.table("source.final").where("sigla = 'GO'")
)

# Persist the flattened rows as the Parquet table gold.final, partitioned by
# `sigla`; overwrite mode replaces whatever the table held before.
(
    df.write
      .format("parquet")
      .mode("overwrite")
      .partitionBy("sigla")
      .saveAsTable("gold.final")
)

In [8]:
# Bind `df` to the gold.final table from the catalog.
df = spark.table("gold.final")

In [9]:
# Display `df` as the cell result.
# NOTE(review): the recorded output shows only the column header and no rows —
# confirm whether the table was populated when this cell was run.
df

ano_cmpt,mes_cmpt,cgc_hosp,munic_res,nasc,sexo,uti_mes_to,uti_int_to,proc_rea,qt_proc,dt_atend,dt_saida,diag_princ,diag_secun,cobranca,natureza,gestao,munic_mov,cod_idade,idade,dias_perm,morte,cnes,fonte,modalidade,nome_uf,nome_municipio,regiao,idhm,populacao_residente,area_unidade_territorial,diag_princ_desc,diag_secun_desc,diag_princ_detalhes_sub_cat,diag_princ_detalhes_classificacao,diag_princ_detalhes_restr_sexo,diag_princ_detalhes_causa_obito,diag_princ_detalhes_descricao,diag_princ_detalhes_desc_abrev,diag_princ_detalhes_refer,diag_princ_detalhes_excluidos,diag_secun_detalhes_sub_cat,diag_secun_detalhes_classificacao,diag_secun_detalhes_restr_sexo,diag_secun_detalhes_causa_obito,diag_secun_detalhes_descricao,diag_secun_detalhes_desc_abrev,diag_secun_detalhes_refer,diag_secun_detalhes_excluidos,feriado,distancia_feriado,feriado_info,sigla


In [14]:
# Total number of rows in source.final across all partitions.
spark.table("source.final").count()

99162294

In [2]:
# Read the Parquet files under source/ciha/ directly from the bucket, without
# going through a catalog table.
# NOTE(review): this rebinds `df`, which other cells in this file use for
# gold.final — a distinct name would avoid confusing the two datasets.
df = spark.read.parquet("s3a://datalake/source/ciha/")

In [3]:
# Display `df` as the cell result.
# NOTE(review): the recorded output contains record-level values (e.g. NASC,
# CGC_HOSP, DIAG_PRINC) — confirm these rows are safe to share before
# committing the notebook with its outputs.
df

ANO_CMPT,MES_CMPT,CGC_HOSP,MUNIC_RES,NASC,SEXO,UTI_MES_TO,UTI_INT_TO,PROC_REA,QT_PROC,DT_ATEND,DT_SAIDA,DIAG_PRINC,DIAG_SECUN,COBRANCA,NATUREZA,GESTAO,MUNIC_MOV,COD_IDADE,IDADE,DIAS_PERM,MORTE,CNES,FONTE,MODALIDADE
2019,2,60194990000763,354990,19820217,3,0,0,411010034,1,20190226,20190228,O821,,61,,M,354990,4,37,2,0,9539,6,2
2019,2,60194990000763,354990,19831215,3,0,0,411010034,1,20190225,20190227,O829,,62,,M,354990,4,35,2,0,9539,2,2
2019,2,60194990000763,354990,19880226,3,0,0,301040001,1,20190226,20190227,A09,,12,,M,354990,4,31,1,0,9539,2,2
2019,2,60194990000763,354990,19350825,1,0,2,301040001,1,20190226,20190228,I219,,31,,M,354990,4,83,2,0,9539,2,2
2019,2,60194990000763,354990,19950205,3,0,0,301040001,1,20190226,20190227,Z411,,12,,M,354990,4,24,1,0,9539,2,2
2019,2,60194990000763,354990,19611021,3,0,0,301040001,1,20190227,20190227,N649,,12,,M,354990,4,57,0,0,9539,2,2
2019,2,45184066000117,354990,19680603,3,0,0,301060070,1,20190222,20190222,G560,,11,,M,354990,4,50,0,0,3042529,1,2
2019,2,45184066000117,354990,19680814,1,0,0,301060070,1,20190222,20190223,N209,,11,,M,354990,4,50,1,0,3042529,1,2
2019,2,45184066000389,354990,19820502,3,0,0,411010034,1,20190222,20190224,O829,,61,,M,354990,4,36,2,0,5259789,1,2
2019,2,45184066000389,354990,19870423,3,0,0,411010034,1,20190222,20190224,O829,,61,,M,354990,4,31,2,0,5259789,1,2


In [4]:
# Total number of rows in `df`.
df.count()

127272146