In [None]:
#globales a usar en el programa
RAW_PATH = "wasbs://raw@storagesergio9090.blob.core.windows.net"
MOUNT_POINT_RAW = "/mnt/raw"
RAW_CHECKPOINT_LOCATION = "/mnt/raw/check"
RAW_DATA_PATH = "/mnt/raw/test-data"
SILVER_PATH = "wasbs://silver@storagesergio9090.blob.core.windows.net"
MOUNT_POINT_SILVER = "/mnt/silver"
SILVER_CHECKPOINT_LOCATION = "/mnt/silver/check"
SILVER_DATA_PATH = "/mnt/silver/test-data"
DATABRICKS_SCOPE = "databricks-secret-scope1"
AZURE_SECRET_PATH = "fs.azure.account.key.storagesergio9090.blob.core.windows.net"
AZURE_SECRET_NAME = "data-lake-access-key"

In [None]:
#configurando y montando puntos de montaje raw DataBricks - Azure
dbutils.fs.mount(
    source = RAW_PATH,
    mount_point = MOUNT_POINT_RAW,
    extra_configs = {AZURE_SECRET_PATH:dbutils.secrets.get(scope=DATABRICKS_SCOPE, key=AZURE_SECRET_NAME)}
)

In [None]:
#configurando y montando puntos de montaje silver DataBricks - Azure
dbutils.fs.mount(
    source = SILVER_PATH,
    mount_point = MOUNT_POINT_SILVER,
    extra_configs = {AZURE_SECRET_PATH:dbutils.secrets.get(scope=DATABRICKS_SCOPE, key=AZURE_SECRET_NAME)}
)

In [4]:
from pyspark.sql.types import BooleanType, StructType, StringType, TimestampType
from pyspark.sql.functions import *

In [3]:
# Se crea el esquema para los datos que son publicado en el tópico de Kafka (se le da una estructura a los mensajes de entrada)
schema = StructType()\
         .add("timestamp", TimestampType())\
         .add("url", StringType())\
         .add("userURL", StringType())\
         .add("pageURL", StringType())\
         .add("isNewPage", BooleanType())\
         .add("geocoding", StructType()
            .add("countryCode2", StringType())
            .add("city", StringType())
            .add("latitude", StringType())
            .add("country", StringType())
            .add("longitude", StringType())
            .add("stateProvince", StringType())
            .add("countryCode3", StringType())
            .add("user", StringType())
            .add("namespace", StringType())
         )

In [None]:
# Comando para cargar datos de cualquier tipo de fuente. Se utiliza el objeto Spark que permite la conexión hacia el cluster de Spark. (transformación: leer o suscribirse a un canal de kafka)
kafkaDF = (spark   # kafkaDF es un DataFrame en un objeto de python.
           .readStream # Función para conexión a un cluster de Kafka
           .option("kafka.bootstrap.servers", "server1.databricks.training:9092") # Servidor público para pruebas
           .option("subscribe", "en") # Sucripción a un canal con el nombre de "en"
           .format("kafka")  # Se define el formato tipo kafka
           .load() 
           )

In [None]:
# Los DataFrame de Spark son inmutables, una vez creados no se pueden modificar (se debe sobreescribir o crear uno nuevo)
kafkaCleanDF = (kafkaDF
                .select(from_json(col("value").cast(StringType()),schema).alias("message")) # Ingresando dentro de value, transformar los datos (Type), se aplica el esquema desarrollado anteriormente y se le pone un alias con el nombre de "message"
                .select("message.*") # traer todas las columnas de la variable "mmessage"
               )

In [None]:
myStreamName = "prueba_streaming"
display(kafkaCleanDF, streamName = myStreamName) # Crea una tabla en donde se pueden observar los resultados

In [None]:
# Transformar el Stream e inicializandolo con la transformación realizada
# Se empezará a aplicar transformaciones a los datos en tiempo real. Se analizarán sólo los datos de los paises no nulos.

geocodingDF = (kafkaCleanDF
              .filter(col("geocoding.country").isNotNull()) # Se ingresa a la columna "geocoding_country" y se seleccionan las filas con este atributo no nulo
              .select("timestamp", "pageURL", "geocoding.countryCode2", "geocoding.city") # Me quedo con los atribudos que me importan para el análisis
              )
display(geocodingDF)

In [None]:
# Guardar en capa raw (bronze)
(spark.readStream
  .format("kafka")  
  .option("kafka.bootstrap.servers", "server1.databricks.training:9092")
  .option("subscribe", "en, ja")
  .load()
  .withColumn("json", from_json(col("value").cast("string"), schema))
  .select(col("timestamp").alias("KafkaTimestamp"), col("json.*"))
  .writeStream
  .format("delta")
  .option("checkpointLocation", RAW_CHECKPOINT_LOCATION)
  .outputMode("append")
  .queryName('bronze_stream')
  .start(RAW_DATA_PATH)
)

In [None]:
# Guardar en capa silver
(spark.readStream
  .format("delta")
  .load(RAW_DATA_PATH)
  .select(col('KafkaTimestamp'), 
          # expr('left(comment,100) as Comments'), 
          col("namespace"),
          col("user"),
          when(col("geocoding.countryCode2").isNotNull(), col("geocoding.countryCode2")).otherwise("Unknown").alias("CountryCode"),\
          col("flag"),
          col("pageURL"))
  .writeStream
  .format("delta")
  .option("checkpointLocation", SILVER_CHECKPOINT_LOCATION)
  .outputMode("append")
  .queryName('silver_stream')
  .start(SILVER_DATA_PATH)
)

In [None]:
filterDF = (kafkaCleanDF
              .filter(col("geocoding.countryCode2").contains("US")) # Se ingresa a la columna "geocoding_countryCode2" y se seleccionan las filas con el atributo "US"
              .filter(col("geocoding.city").isNotNull())
              .select("timestamp", "pageURL", "geocoding.countryCode2", "geocoding.city") # Me quedo con los atribudos que me importan para el análisis
              )
display(filterDF)

In [None]:
CountsDF = (
  kafkaCleanDF
    .filter(col("geocoding.country").isNotNull())
    .groupBy("geocoding.country")    
    .count()
)

display(CountsDF
        .sort("count", ascending=False)) # clasificaición

In [None]:
#para detener los procesos de streaming
for s in spark.streams.active:
      s.stop()