In [None]:
# Root location that every storage path in the Python cells of this notebook is built from.
# NOTE(review): "path" looks like a placeholder — set it to the real bucket/prefix before running.
BUCKET_NAME = "path"

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
import random


# Schema describing the generated events: nullable id, words and timestamp fields.
# NOTE(review): nothing visible in this notebook references `schema`, and the stream
# written below carries a "random_word" column rather than "words" — confirm whether
# this definition is still needed or should be aligned with the real columns.
schema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("words", StringType(), True)
    .add("timestamp", TimestampType(), True)
)

# Generator used to fill the "random_word" column of the synthetic stream
def generate_random_word():
    """Return one word picked at random from a fixed vocabulary.

    The vocabulary contains case variants of the same word (e.g. "BaNana"/"banana",
    "inova"/"INOVA"), which the lower-casing word count later in this notebook
    collapses into a single group.
    """
    vocabulary = (
        "apple", "BaNana", "banana", "cherry",
        "mango", "inova", "INOVA", "iFood",
    )
    return random.choice(vocabulary)

# Wrap the generator as a Spark UDF for use on the streaming DataFrame.
# The function returns a random value, so it is flagged as non-deterministic: Spark
# treats UDFs as deterministic by default and its optimizer may then eliminate
# duplicate invocations or invoke the function more often than it appears in the query.
generate_random_word_udf = udf(generate_random_word, StringType()).asNondeterministic()

# Expose the same UDF to SQL under the name "generate_random_word". Registering the UDF
# object (rather than the bare Python function) keeps the non-deterministic flag; the
# return type comes from the UDF itself, so it must not be passed again here.
spark.udf.register("generate_random_word", generate_random_word_udf)

# Build the source stream from Spark's "rate" format at one row per second,
# keeping only its `value` column, renamed to `id`.
streaming_df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .load()
    .select(col("value").alias("id"))
)

# Enrich every generated row with:
#   random_word - a word produced by the random-word UDF
#   timestamp   - the current timestamp at processing time
streaming_df = (
    streaming_df
    .withColumn("random_word", generate_random_word_udf())
    .withColumn("timestamp", current_timestamp())
)

# Continuously append the generated events to a Delta table.
# Both the table data and the streaming checkpoint live under the data-generator prefix.
query = (
    streaming_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("path", f"{BUCKET_NAME}/workshop-exemple-word-count/data-generator/data")
    .option("checkpointLocation", f"{BUCKET_NAME}/workshop-exemple-word-count/data-generator/checkpoint")
    .start()
)

# The query keeps running in the background; uncomment to block this cell until it stops.
# query.awaitTermination()


In [None]:
# Batch (non-streaming) read of the events written so far by the data generator.
df_events = (
    spark.read
    .format("delta")
    .load(f"{BUCKET_NAME}/workshop-exemple-word-count/data-generator/data")
)

In [None]:
# Render the batch snapshot held in df_events.
# NOTE(review): `display` is not defined in this notebook — presumably the notebook
# environment's built-in rendering helper; confirm it is available where this runs.
display(df_events)

In [None]:
# Stream the generated events back out of the Delta table written by the data generator.
streaming_df = (
    spark.readStream
    .format("delta")
    .load(f"{BUCKET_NAME}/workshop-exemple-word-count/data-generator/data")
)

# Running number of occurrences of each distinct "random_word" value.
# The grouping is on the raw value, so differently-cased spellings are counted separately.
word_counts = streaming_df.groupBy("random_word").count()

# Alternative sink: print the complete result table to the console instead of displaying it.
# query = (
#     word_counts.writeStream
#     .outputMode("complete")
#     .format("console")
#     .option("checkpointLocation", f"{BUCKET_NAME}/workshop-exemple-word-count/word-count-simples/checkpoint")
#     .start()
# )

display(word_counts)

In [None]:
import pyspark.sql.functions as F


# Stream the generated events back out of the Delta table written by the data generator.
streaming_df = (
    spark.readStream
    .format("delta")
    .load(f"{BUCKET_NAME}/workshop-exemple-word-count/data-generator/data")
)

# Normalise the word to lower case first, so spellings that differ only in case
# fall into the same group, then count occurrences per normalised word.
lowercased_events = streaming_df.withColumn(
    "random_word_lowercase", F.lower(F.col("random_word"))
)
word_counts = lowercased_events.groupBy("random_word_lowercase").count()

# Alternative sink: print the complete result table to the console instead of displaying it.
# query = (
#     word_counts.writeStream
#     .outputMode("complete")
#     .format("console")
#     .option("checkpointLocation", f"{BUCKET_NAME}/workshop-exemple-word-count/word-count-lower-case/checkpoint")
#     .start()
# )

display(word_counts)

In [None]:
import pyspark.sql.functions as F


# NOTE(review): this cell computes the same lower-cased word count as the previous cell —
# confirm whether the repetition is intentional.

# Streaming read of the events produced by the data generator.
events_path = f"{BUCKET_NAME}/workshop-exemple-word-count/data-generator/data"
streaming_df = spark.readStream.format("delta").load(events_path)

# Count occurrences per word after folding the word to lower case.
word_counts = (
    streaming_df
    .withColumn("random_word_lowercase", F.lower(streaming_df["random_word"]))
    .groupBy("random_word_lowercase")
    .count()
)

# Optional console sink (complete mode), kept for reference:
# query = (
#     word_counts.writeStream
#     .outputMode("complete")
#     .format("console")
#     .option("checkpointLocation", f"{BUCKET_NAME}/workshop-exemple-word-count/word-count-lower-case/checkpoint")
#     .start()
# )

display(word_counts)

In [None]:
%scala

// Root location that every storage path in the Scala cells of this notebook is built from.
// NOTE(review): "path" looks like a placeholder — it should match the value used by the Python cells.
val BUCKET_NAME = "path"

In [None]:
%scala


import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
import org.apache.spark.sql.{Dataset, Encoders, Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.GroupState
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

import org.apache.spark.sql.types._

// Schema of the rows emitted by the stateful word count: a (word, count) pair per row.
val OUTPUT_SCHEMA: StructType = StructType(
  Seq(
    StructField("word", StringType),
    StructField("count", LongType)
  )
)

    
/**
 * State-update function passed to `flatMapGroupsWithState`.
 *
 * Adds the number of rows received for `id` in this invocation to the count kept in
 * `state`, stores the updated counts back into `state`, and emits one `Row(word, count)`
 * per entry of the updated map.
 *
 * Every row in `iterator` is counted. The previous implementation added exactly 1 per
 * invocation, so a key with several rows in the same micro-batch was under-counted.
 *
 * @param id       grouping key (the word being counted)
 * @param iterator rows of the current invocation that belong to `id`; fully consumed here
 * @param state    counts carried between invocations for this key
 * @return the updated counts as rows laid out as (word, count)
 */
def flatMapGroupsWithStateFunction
(
  id: String,
  iterator: Iterator[Row],
  state: GroupState[Map[String, Long]]
): Iterator[Row] = {
  val previousCounts: Map[String, Long] = state.getOption.getOrElse(Map.empty[String, Long])

  // Count all rows of this key, not just the fact that the key appeared.
  val rowsInBatch: Long = iterator.size.toLong
  val updatedCount: Long = previousCounts.getOrElse(id, 0L) + rowsInBatch
  val updatedCounts: Map[String, Long] = previousCounts + (id -> updatedCount)

  state.update(updatedCounts)

  updatedCounts.iterator.map { case (word, count) => Row(word, count) }
}

// Obtain the active Spark session (creating one if none exists) and bring its
// implicit encoders/conversions into scope.
val spark: SparkSession = SparkSession
  .builder()
  .getOrCreate()
import spark.implicits._

// Streaming read of the Delta table written by the data generator.
val df: DataFrame = spark
  .readStream
  .format("delta")
  .load(s"${BUCKET_NAME}/workshop-exemple-word-count/data-generator/data")

// Schema of the incoming rows; used below to build the Row encoder.
val inputColumnSchema: StructType = df.schema

// Events keyed by their "random_word" value, ready for the stateful aggregation.
val groupedDataFrame = {
  // `as[Row]` needs an encoder for the input rows; build it from the source schema
  // and hand it over explicitly.
  val typedRows = df.as[Row](RowEncoder(inputColumnSchema))

  typedRows.groupByKey((row: Row) => row.getAs[String]("random_word"))
}

// One row per space-separated token of "random_word", exposed as column "word".
// NOTE(review): `words` is not referenced by any code visible in this notebook —
// confirm whether it is still needed.
val words = df.select(explode(split(col("random_word"), " ")).as("word"))


// Stateful word count: runs flatMapGroupsWithStateFunction over every key group in
// Update output mode and exposes the emitted (word, count) rows as a DataFrame.
val wordCounts = {
  // Encoder for the rows the state function emits, derived from OUTPUT_SCHEMA.
  implicit val wordCountRowEncoder = RowEncoder(OUTPUT_SCHEMA)

  val statefulCounts = groupedDataFrame.flatMapGroupsWithState(
    OutputMode.Update(),
    GroupStateTimeout.ProcessingTimeTimeout()
  )(flatMapGroupsWithStateFunction)

  statefulCounts.toDF()
}

/**
 * Batch writer handed to `foreachBatch`: appends the rows of one micro-batch to the
 * Delta table that holds the arbitrary-aggregation output.
 *
 * @param dataFrame rows produced for the micro-batch
 * @param batchId   micro-batch identifier supplied by `foreachBatch`; not used here
 */
def saving(dataFrame: DataFrame, batchId: Long): Unit = {
  val targetPath = s"${BUCKET_NAME}/workshop-exemple-word-count-arbirtary-agg/data"
  val appendWriter = dataFrame.write.format("delta").mode("append")

  appendWriter.save(targetPath)
}
// Start the streaming query that persists the stateful word counts.
wordCounts
  .writeStream
  .queryName("query_nrt")
  // `s` interpolator, consistent with the other paths in this cell (the `f` interpolator
  // would interpret any `%` in the literal part of the path as a format directive).
  .option("checkpointLocation", s"${BUCKET_NAME}/workshop-exemple-word-count-arbirtary-agg/checkpoint")
  .outputMode("update")
  /*
   * This is a required trick. Because the query uses flatMapGroupsWithState in update mode, we also need
   * to use update mode here. However, we cannot write with the Delta sink in update mode
   * ("... DeltaDataSource does not support Update output mode").
   * To overcome that, we use foreachBatch and, inside it, perform plain appends.
   * No `.format(...)` is set on this writer: foreachBatch installs its own sink, so a format
   * configured before it would be overridden anyway.
   *
   * Due to some changes in Scala 2.12, passing a lambda to DataStreamWriter.foreachBatch is ambiguous
   * between its overloads. Because of that we define a separate method and pass it to foreachBatch
   * instead of writing the code inline.
   * Source: https://issues.apache.org/jira/browse/SPARK-26132?focusedCommentId=17178019&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17178019
   */
  .foreachBatch(saving _)
  // NOTE(review): Trigger.ProcessingTime(Long) takes an interval in milliseconds, so this
  // fires every 5 ms — confirm whether 5 seconds was intended.
  .trigger(Trigger.ProcessingTime(5))
  .start()





In [None]:

# Batch read of the rows appended so far by the arbitrary stateful aggregation, then render them.
df = (
    spark.read
    .format("delta")
    .load(f"{BUCKET_NAME}/workshop-exemple-word-count-arbirtary-agg/data")
)
display(df)