# Libs

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col

# Enabling the Change Data Feed (CDF)

In [None]:
%sql
-- Enable the Change Data Feed by default for Delta tables created from now on
-- in this session. Tables that already exist are not affected by this default
-- and must be altered individually.
set spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;

# Read the bronze tables as streams

In [None]:
# Computer connected

source_table = 'spark_catalog.bronze.computer_connected'
df_readStream_computer_connected = (spark.readStream
                                            .format("delta")
                                            .option("ignoreDeletes", "true")
                                            .table(source_table)
                                            .withColumn("Classification",when(col("Velocity").between(0, 20), "Low")
                                                                        .when(col("Velocity").between(21, 100), "Medium")
                                                                        .otherwise("Hight"))
                                            )

In [None]:
# Computer Disconnected

source_table = 'spark_catalog.bronze.computer_disconnected'
df_readStream_computer_disconnected = (spark.readStream
                                            .format("delta")
                                            .option("ignoreDeletes", "true")
                                            .table(source_table)
                                            .withColumn("Classification",when(col("Velocity").between(0, 20), "Low")
                                                                        .when(col("Velocity").between(21, 100), "Medium")
                                                                        .otherwise("Hight"))
                                            )

In [None]:
# Mobile connected

source_table = 'spark_catalog.bronze.mobile_connected'
df_readStream_mobile_connected = (spark.readStream
                                            .format("delta")
                                            .option("ignoreDeletes", "true")
                                            .table(source_table)
                                            .withColumn("Classification",when(col("Velocity").between(0, 20), "Low")
                                                                        .when(col("Velocity").between(21, 100), "Medium")
                                                                        .otherwise("Hight"))
                                            )

In [None]:
# Mobile Disconnected

source_table = 'spark_catalog.bronze.mobile_disconnected'
df_readStream_mobile_disconnected = (spark.readStream
                                            .format("delta")
                                            .option("ignoreDeletes", "true")
                                            .table(source_table)
                                            .withColumn("Classification",when(col("Velocity").between(0, 20), "Low")
                                                                        .when(col("Velocity").between(21, 100), "Medium")
                                                                        .otherwise("Hight"))
                                            )

# Unifying the streaming DataFrames

In [None]:
# Streaming DataFrames to be unioned by column name.
dfs_individuais = [
    df_readStream_computer_connected,
    df_readStream_mobile_connected,
    df_readStream_computer_disconnected,
    df_readStream_mobile_disconnected,
]

def union_all(dfs):
    """Fold ``dfs`` into one de-duplicated DataFrame, matching columns by name.

    The fold runs from the last element towards the first: the last frame is
    de-duplicated on its own, then every earlier frame is unioned with the
    running result (columns missing on either side are allowed) and the
    union is de-duplicated again.

    Raises ``IndexError`` when ``dfs`` is empty.
    """
    merged = dfs[-1].distinct()
    for frame in reversed(dfs[:-1]):
        merged = frame.unionByName(merged, allowMissingColumns=True).distinct()
    return merged

# Single consolidated DataFrame built from every frame in dfs_individuais.
df_all = union_all(dfs_individuais)

# Writing to the silver layer with streaming

In [None]:
# Destination table, streaming checkpoint directory and storage path of the
# consolidated silver output.
# NOTE(review): the storage path ends in "conexao_unificada" while the table
# and the checkpoint use "consolidated_connection" -- confirm this is intended.
target_table = 'spark_catalog.silver.consolidated_connection'
chekpoint = 'dbfs:/FileStore/silver/consolidated_connection'
target_location = 'dbfs:/FileStore/silver/conexao_unificada'

# Append the consolidated stream to the silver Delta table. The availableNow
# trigger processes the data that is available when the query starts and then
# stops the query.
# toTable() is the documented DataStreamWriter method for starting a stream
# that writes into a table.
( df_all.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", chekpoint)
  .option("path", target_location)
  .trigger(availableNow=True)
  .toTable(target_table)
)