In [None]:
display(dbutils.fs.ls("/FileStore/tables/"))

In [None]:
dbutils.fs.mkdirs("/FileStore/tables/arquivos_curso")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso"))

# JSON

## Lendo um arquivo JSON

In [None]:
df = spark.read.json("/FileStore/tables/arquivos_curso/PNSB.json")
display(df)

In [None]:
df = df.withColumnRenamed("D1C","cod_regiao") \
       .withColumnRenamed("D1N","regiao") \
       .withColumnRenamed("D2C","cod_variavel") \
       .withColumnRenamed("D2N","variavel") \
       .withColumnRenamed("D3C","cod_ano") \
       .withColumnRenamed("D3N","ano") \
       .withColumnRenamed("D4C","cod_doenca") \
       .withColumnRenamed("D4N","doenca") \
       .withColumnRenamed("MC","cod_medida") \
       .withColumnRenamed("MN","medida") \
       .withColumnRenamed("NC","cod_nivel_territorial") \
       .withColumnRenamed("NN","nivel_territorial") \
       .withColumnRenamed("V","valor") 


display(df)

In [None]:
df = df.filter(df.valor!='Valor')
display(df)

In [None]:
df.printSchema()

In [None]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

df_new = df.withColumn("cod_regiao", col("cod_regiao").cast(IntegerType())) \
           .withColumn("cod_variavel", col("cod_variavel").cast(IntegerType())) \
           .withColumn("cod_ano", col("cod_ano").cast(IntegerType())) \
           .withColumn("ano", col("ano").cast(IntegerType())) \
           .withColumn("cod_doenca", col("cod_doenca").cast(IntegerType())) \
           .withColumn("cod_medida", col("cod_medida").cast(IntegerType())) \
           .withColumn("cod_nivel_territorial", col("cod_nivel_territorial").cast(IntegerType())) \
           .withColumn("valor", col("valor").cast(IntegerType()))

In [None]:
df_new.printSchema()

## Escrevendo um arquivo JSON

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso"))

In [None]:
df_new.write\
    .option("compression", "gzip")\
    .mode("overwrite") \
    .format("json") \
    .save("/FileStore/tables/arquivos_curso/json_gzip/")

In [None]:
df = spark.read \
  .option("compression", "gzip") \
  .json("/FileStore/tables/arquivos_curso/json_gzip/")

display(df)

In [None]:
df.write \
  .option("sep", ",") \
  .format("csv") \
  .save("/FileStore/tables/arquivos_curso/pnsb_csv/")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/pnsb_csv"))

# CSV

## Lendo um arquivo CSV

In [None]:
df_csv = spark.read.csv('/FileStore/tables/arquivos_curso/pnsb_csv')
display(df_csv)

In [None]:
df.write \
  .option("sep", ",") \
  .option("header", True) \
  .mode("overwrite") \
  .format("csv") \
  .save("/FileStore/tables/arquivos_curso/pnsb_csv/")

In [None]:
df_csv = spark.read.csv('/FileStore/tables/arquivos_curso/pnsb_csv', header=True, inferSchema=True)
display(df_csv)

In [None]:
df_csv.printSchema()

## Escrevendo arquivo CSV com compressão


In [None]:
df_csv.write \
  .option("compression", "gzip") \
  .option("header","true") \
  .option("sep", ",") \
  .format("csv") \
  .save("/FileStore/tables/arquivos_curso/csv_gzip/")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/csv_gzip"))

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/pnsb_csv"))

In [None]:
df = spark.read \
  .option("compression", "gzip") \
  .option("header","true") \
  .option("inferSchema", "true") \
  .option("sep", ",") \
  .csv("/FileStore/tables/arquivos_curso/csv_gzip")

display(df)

# TXT

## Salvando o DataFrame em txt

In [None]:
df = df.na.fill(value=0, subset=["valor"])
display(df)

In [None]:
df.write.text("/FileStore/tables/arquivos_curso/txt/")

In [None]:
from pyspark.sql.functions import concat_ws

df_uma_coluna = df.select(concat_ws("|", *df.columns).alias('dados'))
display(df_uma_coluna)

In [None]:
df_uma_coluna.write \
    .format("text") \
    .mode("overwrite") \
    .save("/FileStore/tables/arquivos_curso/txt/")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/txt/"))

In [None]:
df_text = spark.read.format("text") \
  .load("/FileStore/tables/arquivos_curso/txt/")

display(df_text)

In [None]:
df_text = spark.read \
  .option("header", "false") \
  .option("delimiter", "|") \
  .option("inferSchema", "true") \
  .format("csv") \
  .load("/FileStore/tables/arquivos_curso/txt/")

display(df_text)

In [None]:
df_text_2 = df_text.withColumnRenamed("_c0","ano") \
       .withColumnRenamed("_c1","cod_ano") \
       .withColumnRenamed("_c2","cod_doenca") \
       .withColumnRenamed("_c3","cod_medida") \
       .withColumnRenamed("_c4","cod_nivel_territorial") \
       .withColumnRenamed("_c5","cod_regiao") \
       .withColumnRenamed("_c6","cod_variavel") \
       .withColumnRenamed("_c7","doenca") \
       .withColumnRenamed("_c8","medida") \
       .withColumnRenamed("_c9","nivel_territorial") \
       .withColumnRenamed("_c10","regiao") \
       .withColumnRenamed("_c11","valor") \
       .withColumnRenamed("_c12","variavel")

display(df_text_2)

## Salvando o arquivo txt comprimido

In [None]:
df_text_2.write \
  .mode("overwrite") \
  .option("compression", "gzip") \
  .format("text") \
  .save("dbfs:/FileStore/tables/arquivos_curso/txt_gzip")

In [None]:
df_uma_coluna.write \
  .mode("overwrite") \
  .option("compression", "gzip") \
  .format("text") \
  .save("dbfs:/FileStore/tables/arquivos_curso/txt_gzip")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/txt_gzip"))

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/txt/"))

In [None]:
df = spark.read \
  .option("compression", "gzip") \
  .option("inferSchema", "true") \
  .option("sep", "|") \
  .csv("/FileStore/tables/arquivos_curso/txt_gzip")

display(df)

In [None]:
df_renomeado = df.withColumnRenamed("_c0","ano") \
       .withColumnRenamed("_c1","cod_ano") \
       .withColumnRenamed("_c2","cod_doenca") \
       .withColumnRenamed("_c3","cod_medida") \
       .withColumnRenamed("_c4","cod_nivel_territorial") \
       .withColumnRenamed("_c5","cod_regiao") \
       .withColumnRenamed("_c6","cod_variavel") \
       .withColumnRenamed("_c7","doenca") \
       .withColumnRenamed("_c8","medida") \
       .withColumnRenamed("_c9","nivel_territorial") \
       .withColumnRenamed("_c10","regiao") \
       .withColumnRenamed("_c11","valor") \
       .withColumnRenamed("_c12","variavel")

display(df_renomeado)

In [None]:
df_renomeado.write \
  .mode("overwrite")\
  .format('avro') \
  .save("/FileStore/tables/arquivos_curso/avro/")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/avro/"))

# AVRO

## Lendo um arquivo no formato AVRO

In [None]:
df_avro = spark.read \
        .format("avro") \
        .load("/FileStore/tables/arquivos_curso/avro")

display(df_avro)

In [None]:
df_avro = spark.read \
        .format("avro") \
        .load("/FileStore/tables/arquivos_curso/avro/", pathGlobFilter="*.avro")

display(df_avro)

## Escrevendo o arquivo AVRO com compressão

In [None]:
df_avro.write \
  .mode("overwrite") \
  .option("compression", "deflate") \
  .format('avro') \
  .save("/FileStore/tables/arquivos_curso/avro_deflate")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/avro_deflate"))

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/avro"))

In [None]:
spark.conf.set("spark.sql.avro.compression.codec", "deflate")

In [None]:
df_avro.write \
  .mode("overwrite") \
  .format('avro') \
  .save("/FileStore/tables/arquivos_curso/avro_deflate2")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/avro_deflate2"))

In [None]:
spark.conf.set("spark.sql.avro.deflate.level", "8")

In [None]:
df_avro.write \
  .mode("overwrite") \
  .format('avro') \
  .save("/FileStore/tables/arquivos_curso/avro_deflate2")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/avro_deflate2"))

# PARQUET

## Lendo e escrevendo arquivos PARQUET

In [None]:
df_avro.write \
  .mode("overwrite") \
  .format('parquet') \
  .save("/FileStore/tables/arquivos_curso/parquet")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/parquet"))

In [None]:
df_avro.write \
  .mode("overwrite") \
  .option("compression", "gzip") \
  .format('parquet') \
  .save("/FileStore/tables/arquivos_curso/parquet_gzip")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/parquet_gzip"))

In [None]:
df_parquet = spark.read.format("parquet") \
  .load("/FileStore/tables/arquivos_curso/parquet_gzip", compression='gzip')

display(df_parquet)

# Particionamento

In [None]:
df_parquet.select("cod_doenca").distinct().show()

In [None]:
df_parquet.select("nivel_territorial").distinct().show()

In [None]:
df_parquet.write\
    .partitionBy("nivel_territorial")\
    .mode("overwrite")\
    .parquet("/FileStore/tables/arquivos_curso/parquet_particionado")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/parquet_particionado"))

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/parquet_particionado/nivel_territorial=Brasil"))

In [None]:
df_parquet.write\
    .partitionBy("nivel_territorial", "cod_doenca")\
    .mode("overwrite")\
    .parquet("/FileStore/tables/arquivos_curso/parquet_multi_particionado")

In [None]:
display(dbutils.fs.ls("FileStore/tables/arquivos_curso/parquet_multi_particionado"))

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/parquet_multi_particionado/nivel_territorial=Grande Região/"))

In [None]:
df_120943 = spark.read\
     .parquet('/FileStore/tables/arquivos_curso/parquet_multi_particionado/nivel_territorial=Grande Região/cod_doenca=120943/')


display(df_120943)

In [None]:
df = spark.read\
    .parquet('/FileStore/tables/arquivos_curso/parquet_particionado')

display(df)

# ORC

## Escrevendo e lendo arquivos ORC

In [None]:
df.write \
  .format('orc') \
  .save("/FileStore/tables/arquivos_curso/orc")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/orc"))

In [None]:
df.write.format("orc") \
    .mode("overwrite") \
    .option("compression", "zlib") \
    .save("/FileStore/tables/arquivos_curso/orc_zlib")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/orc_zlib"))

In [None]:
df_orc = spark.read\
  .option("compression", "zlib") \
  .format("orc") \
  .load("/FileStore/tables/arquivos_curso/orc_zlib")

display(df_orc)

## Agrupando as partições criadas

In [None]:
df_orc.coalesce(1)\
 .write \
 .format("orc") \
 .mode("overwrite") \
 .save("/FileStore/tables/arquivos_curso/orc_junto_snappy")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/orc_junto_snappy"))

In [None]:
df_orc.coalesce(1)\
 .write \
 .format("orc") \
 .mode("overwrite") \
 .option("compression", "zlib") \
 .save("/FileStore/tables/arquivos_curso/orc_junto_zlib")

In [None]:
display(dbutils.fs.ls("/FileStore/tables/arquivos_curso/orc_junto_zlib"))

In [None]:
df_orc_zlib = spark.read \
  .option("compression", "zlib") \
  .orc("/FileStore/tables/arquivos_curso/orc_junto_zlib")

display(df_orc_zlib)