#### Desmontando ambiente no DBFS com relação ao conteiner do Azure Data Lake Storage Gen2

In [0]:
dbutils.fs.unmount("/mnt")

#### Montando ambiente no DBFS com relação ao conteiner do Azure Data Lake Storage Gen2

In [0]:
%scala

val appID = "aab30e19-ec0f-4bcd-b9d8-aa0bfe28b126"
val password = dbutils.secrets.get(scope="active-directory-app-registration", key="active-directory-app-registration-secret-key")
val tenantID = "26f85b54-d82e-4d5d-8646-d206fc309c62"
val containerName = "input";
var storageAccountName = "datalakestorageversion01";

val configs =  Map("fs.azure.account.auth.type" -> "OAuth",
       "fs.azure.account.oauth.provider.type" -> "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
       "fs.azure.account.oauth2.client.id" -> appID,
       "fs.azure.account.oauth2.client.secret" -> password,
       "fs.azure.account.oauth2.client.endpoint" -> ("https://login.microsoftonline.com/" + tenantID + "/oauth2/token"),
       "fs.azure.createRemoteFileSystemDuringInitialization"-> "true")

dbutils.fs.mount(
source = "abfss://" + containerName + "@" + storageAccountName + ".dfs.core.windows.net/",
mountPoint = "/mnt",
extraConfigs = configs)

#### Removendo pastas no DBFS

In [0]:
%fs rm -r dbfs:/delta

#### Listando pastas no DBFS

In [0]:
%fs ls 'dbfs:/mnt/torra/adesao'

path,name,size
dbfs:/mnt/torra/adesao/tb_adesao_20201202.csv,tb_adesao_20201202.csv,72732
dbfs:/mnt/torra/adesao/tb_adesao_20201203.csv,tb_adesao_20201203.csv,84564
dbfs:/mnt/torra/adesao/tb_adesao_20201204.csv,tb_adesao_20201204.csv,92573
dbfs:/mnt/torra/adesao/tb_adesao_20201205.csv,tb_adesao_20201205.csv,122496


In [0]:
from datetime import *
from pyspark.sql.types import *

today = date.today()
yesterday = today - timedelta(2)
date = yesterday.strftime("%Y%m%d")

csvSchema = StructType(
[
  StructField('dataemissao',DateType(),True),
  StructField('codigoproduto',StringType(),True),
  StructField('datainiciovigencia',DateType(),True),
  StructField('datafimvigencia',DateType(),True),
  StructField('id_adesao',StringType(),True),
  StructField('premio_liquido',FloatType(),True),
  StructField('iof',StringType(),True),
  StructField('premio',StringType(),True),
  StructField('formapagamento',StringType(),True),
  StructField('quantidadeparcelas',StringType(),True),
  StructField('valorparcela',FloatType(),True),
  StructField('numerocontacartao',StringType(),True),
  StructField('tipocartao',StringType(),True),
  StructField('nomeproduto',StringType(),True),
  StructField('valorvenda',FloatType(),True),
  StructField('datavenda',DateType(),True),
  StructField('operacao',StringType(),True),
  StructField('dia_dado',DateType(),True),
  StructField('id_loja',StringType(),True),
  StructField('datavencimento',DateType(),True),
  StructField('id_segurado',StringType(),True),
  StructField('tipo',StringType(),True)
])

df = spark.read.csv("dbfs:/mnt/torra/adesao/tb_adesao_%s.csv" % (date), header=False, schema = csvSchema)

display(df)

dataemissao,codigoproduto,datainiciovigencia,datafimvigencia,id_adesao,premio_liquido,iof,premio,formapagamento,quantidadeparcelas,valorparcela,numerocontacartao,tipocartao,nomeproduto,valorvenda,datavenda,operacao,dia_dado,id_loja,datavencimento,id_segurado,tipo
2020-12-05,1,2020-12-05,2029-04-04,13720453,12.99,0,10000000,2,99,12.99,0,0,CARTAO SAUDE TORRA,12.99,2020-12-05,A,2020-12-05,104,2020-12-21,13720453,LOJA
2020-12-05,1,2020-12-05,2029-04-04,13720602,12.99,0,10000000,2,99,12.99,0,0,CARTAO SAUDE TORRA,12.99,2020-12-05,A,2020-12-05,61,2021-01-01,13720602,LOJA
2020-12-05,1,2020-12-05,2029-04-05,13720684,12.99,0,10000000,2,99,12.99,0,0,CARTAO SAUDE TORRA,12.99,2020-12-05,A,2020-12-06,103,2021-01-11,13720684,LOJA
2020-12-05,1,2020-12-05,2029-04-05,13720740,12.99,0,10000000,2,99,12.99,0,0,CARTAO SAUDE TORRA,12.99,2020-12-05,A,2020-12-06,103,2021-01-06,13720740,LOJA
2020-12-05,1,2020-12-05,2029-04-05,13720750,12.99,0,10000000,2,99,12.99,0,0,CARTAO SAUDE TORRA,12.99,2020-12-05,A,2020-12-06,66,2021-01-11,13720750,LOJA
2020-12-05,1,2020-12-05,2029-04-05,13720772,12.99,0,10000000,2,99,12.99,0,0,CARTAO SAUDE TORRA,12.99,2020-12-05,A,2020-12-06,19,2021-01-01,13720772,LOJA
2020-12-05,1,2020-12-05,2029-04-05,13720780,12.99,0,10000000,2,99,12.99,0,0,CARTAO SAUDE TORRA,12.99,2020-12-05,A,2020-12-06,35,2021-01-06,13720780,LOJA
2020-12-05,1,2020-12-05,2029-04-05,13720793,12.99,0,10000000,2,99,12.99,0,0,CARTAO SAUDE TORRA,12.99,2020-12-05,A,2020-12-06,17,2021-01-11,13720793,LOJA
2020-12-05,1,2020-12-05,2029-04-05,13720800,12.99,0,10000000,2,99,12.99,0,0,CARTAO SAUDE TORRA,12.99,2020-12-05,A,2020-12-06,66,2021-01-11,13720800,LOJA
2020-12-05,1,2020-12-05,2029-04-05,13720805,12.99,0,10000000,2,99,12.99,0,0,CARTAO SAUDE TORRA,12.99,2020-12-05,A,2020-12-06,106,2021-01-11,13720805,LOJA


In [0]:
df.printSchema()

# Inserting into Databricks Delta Table

In [0]:
df.write.format("delta").mode("append").save("/delta/torra/adesao/")

# Reading Data from Databricks Delta Table

In [0]:
df02 = spark.read.format("delta").load("/delta/torra/adesao/")

df02.printSchema()

# Creating Delta Table using Spark-SQL

In [0]:
%sql

DROP TABLE IF EXISTS tb_adesao;

CREATE TABLE tb_adesao
USING delta
OPTIONS
  ( 
  path = '/delta/torra/adesao/'
  )



In [0]:
%sql

SELECT * FROM tb_adesao LIMIT 5;

In [0]:
%sql

SELECT count(*) FROM tb_adesao;

# Show Table History

In [0]:
display(spark.sql("DESCRIBE HISTORY tb_adesao"))

# Show Table Details

In [0]:
display(spark.sql("DESCRIBE FORMATTED tb_adesao"))

#### Structured Streams - Schema Definition for Streaming Processing

In [0]:
# Ingest Data from Azure Data Lake Storage

inputPath = "dbfs:/mnt/torra/adesao/"

csvSchema = StructType(
[
  StructField('dataemissao',DateType(),True),
  StructField('codigoproduto',StringType(),True),
  StructField('datainiciovigencia',DateType(),True),
  StructField('datafimvigencia',DateType(),True),
  StructField('id_adesao',StringType(),True),
  StructField('premio_liquido',FloatType(),True),
  StructField('iof',StringType(),True),
  StructField('premio',StringType(),True),
  StructField('formapagamento',StringType(),True),
  StructField('quantidadeparcelas',StringType(),True),
  StructField('valorparcela',FloatType(),True),
  StructField('numerocontacartao',StringType(),True),
  StructField('tipocartao',StringType(),True),
  StructField('nomeproduto',StringType(),True),
  StructField('valorvenda',FloatType(),True),
  StructField('datavenda',DateType(),True),
  StructField('operacao',StringType(),True),
  StructField('dia_dado',DateType(),True),
  StructField('id_loja',StringType(),True),
  StructField('datavencimento',DateType(),True),
  StructField('id_segurado',StringType(),True),
  StructField('tipo',StringType(),True)
])

streamingDfReviews = (
  spark
  .readStream
  .schema(csvSchema)
  .option('maxFilePerTrigger',1)
  .csv(inputPath))

In [0]:
# Checking if it's streaming

streamingDfReviews.isStreaming

#### Manipulating streaming dataset with functions

In [0]:
display(streamingDfReviews.groupBy('id_loja').mean('premio_liquido').alias('premio_liquido02'))

id_loja,avg(premio_liquido)
100,12.989999771118164
19,12.989999771118164
50,12.989999771118164
57,12.989999771118164
59,12.989999771118164
40,12.989999771118164
106,12.989999771118164
94,12.989999771118164
102,12.989999771118164
101,12.989999771118164


In [0]:
aggDF = streamingDfReviews.groupBy('dataemissao','id_loja').count().withColumnRenamed('count','contagem')

display(aggDF)

dataemissao,id_loja,contagem
2020-12-05,28,2
2020-12-03,65,11
2020-12-03,97,6
2020-12-02,24,5
2020-12-05,42,10
2020-12-03,41,1
2020-12-05,98,12
2020-12-03,22,4
2020-12-02,90,4
2020-12-05,33,7


#### Configuring & Ingesting Streaming Processing into Databricks Delta Table

In [0]:
# Consuming in real time and writing in Delta Table

spark.conf.set('spark.sql.shuffle.partitions','2')

query = (
    streamingDfReviews.groupBy('dataemissao','id_loja').count().withColumnRenamed('count','contagem')
    .writeStream
    .format('delta')
    .outputMode('complete')
    .option('checkpointLocation','/delta/streaming/torra/adesao/_checkpoints/etl-from-json-mvconf2019')
    .start('/delta/streaming/torra/adesao/'))

In [0]:
%fs rm -r 'dbfs:/delta/'

In [0]:
%sql

CREATE TABLE streaming_adesao
USING delta
OPTIONS
  ( 
  path = '/delta/streaming/torra/adesao/'
  )

In [0]:
%sql

SELECT * FROM streaming_adesao;

dataemissao,id_loja,contagem
2020-12-05,43,3
2020-12-05,41,2
2020-12-04,99,5
2020-12-03,45,4
2020-12-02,50,3
2020-12-03,98,3
2020-12-03,58,10
2020-12-03,42,7
2020-12-02,40,6
2020-12-03,91,6
