# Filtering JetStream Events

In this notebook, we'll filter events from the Redis Stream that we created in the previous notebook. We'll use a combination of techniques to filter the events:

1. Deduplication using Redis Bloom Filter to avoid processing the same event multiple times
2. Content-based filtering using a machine learning model to identify software-related posts
3. Storing filtered events in Redis for further processing

Redis Bloom Filter is a probabilistic data structure that allows us to check if an element is in a set. It's very memory efficient and has a constant time complexity for both insertion and lookup operations. The trade-off is that it can return false positives (it may report an element as present when it was never added), but it never returns false negatives. The probability of false positives can be controlled by the size of the filter relative to the number of elements it holds.
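
To make the idea concrete, here is a minimal in-memory sketch of a Bloom filter. It is not how Redis implements it: the class name `TinyBloomFilter`, the hashing scheme, and the example URI are all assumptions chosen for illustration.

```kotlin
import java.util.BitSet

// Hypothetical in-memory Bloom filter, for illustration only.
class TinyBloomFilter(private val bitCount: Int, private val hashCount: Int) {
    private val bits = BitSet(bitCount)

    // Derive hashCount bit positions from two base hashes (double hashing).
    private fun positions(item: String): List<Int> {
        val h1 = item.hashCode()
        val h2 = item.reversed().hashCode() or 1 // force odd so the step is never zero
        return (0 until hashCount).map { i -> Math.floorMod(h1 + i * h2, bitCount) }
    }

    fun add(item: String) {
        positions(item).forEach { bits.set(it) }
    }

    // false means "definitely not added"; true means "probably added".
    fun mightContain(item: String): Boolean = positions(item).all { bits.get(it) }
}

val seenUris = TinyBloomFilter(bitCount = 1024, hashCount = 3).apply {
    add("at://example/post/1") // made-up URI
}
val knownUriFound = seenUris.mightContain("at://example/post/1")
```

An added element always sets all of its bit positions, so `knownUriFound` is `true`. A lookup for an element that was never added can still return `true` if other elements happened to set the same bits, which is where false positives come from.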

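Machine learning models can be used to classify text into different categories. In this notebook, we'll use a pre-trained embedding model together with Redis vector search to classify posts as software-related or not, by comparing each post with a set of reference examples instead of training a classifier of our own.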

## Consuming from Redis Streams

### Model Redis Streams Event
In this section, we'll define a data class to represent the events stored in the Redis Stream. This model will be used to deserialize the events from the stream.

In [75]:
@file:DependsOn("redis.clients:jedis:6.0.0")

In [76]:
import redis.clients.jedis.resps.StreamEntry

data class Event(
    val did: String,
    val rkey: String,
    val text: String,
    val timeUs: String,
    val operation: String,
    val uri: String,
    val parentUri: String,
    val rootUri: String,
    val langs: List<String>,
) {
    companion object {
        fun fromMap(entry: StreamEntry): Event {
            val fields = entry.fields
            return Event(
                did = fields["did"] ?: "",
                rkey = fields["rkey"] ?: "",
                text = fields["text"] ?: "",
                timeUs = fields["timeUs"] ?: "",
                operation = fields["operation"] ?: "",
                uri = fields["uri"] ?: "",
                parentUri = fields["parentUri"] ?: "",
                rootUri = fields["rootUri"] ?: "",
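                // Accepts both "[en, pt]" and "en,pt"; an empty value yields an empty list
                langs = fields["langs"]?.removeSurrounding("[", "]")?.split(",")
                    ?.map { it.trim() }?.filter { it.isNotEmpty() } ?: emptyList()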
            )
        }
    }
}
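
The `langs` field is stored in the stream as a single string, so it has to be split back into a list when the event is deserialized. The sketch below isolates that step in a hypothetical helper, `parseLangs`, assuming the producer wrote the list either in Kotlin's `List.toString()` form (`[en, pt]`) or as a plain comma-separated string.

```kotlin
// Hypothetical helper: turns "[en, pt]" into ["en", "pt"] and "[]" into an empty list.
fun parseLangs(raw: String?): List<String> =
    raw.orEmpty()
        .removeSurrounding("[", "]")
        .split(",")
        .map { it.trim() }
        .filter { it.isNotEmpty() }

val twoLangs = parseLangs("[en, pt]")   // two entries: "en" and "pt"
val noLangs = parseLangs("[]")          // empty list, not a list holding ""
val missingField = parseLangs(null)     // empty list
```

Filtering out empty strings matters because splitting an empty string still produces one (empty) element.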

### Creating a Redis Client
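Create a Jedis client to connect to Redis. `JedisPooled` manages a connection pool internally, so the same client can be reused for every Redis command in this notebook. Called without arguments, it connects to `localhost:6379`.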

In [77]:
import redis.clients.jedis.JedisPooled

val jedisPooled = JedisPooled()

### Creating a Consumer Group
Create a consumer group to read from the Redis Stream. A consumer group allows multiple consumers to read from the same stream without duplicating the work. Each consumer in the group will receive a different subset of the messages.

A consumer group can be created in Redis with the XGROUP CREATE command:

`XGROUP CREATE streamName groupName id [MKSTREAM]`

To create a consumer group in this notebook, we will encapsulate the command in a function. The function will take the stream name and the group name as parameters.

In [78]:
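import redis.clients.jedis.StreamEntryID
import redis.clients.jedis.exceptions.JedisDataException

fun createConsumerGroup(streamName: String, consumerGroupName: String) {
    try {
        // "0-0" starts the group at the beginning of the stream; `true` maps to MKSTREAM
        jedisPooled.xgroupCreate(streamName, consumerGroupName, StreamEntryID("0-0"), true)
    } catch (e: JedisDataException) {
        // Redis replies with a BUSYGROUP error when the group already exists
        if (e.message?.contains("BUSYGROUP") == true) println("Group already exists") else throw e
    }
}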

In [79]:
createConsumerGroup("jetstream", "printer-example")

### Reading from the Stream
Create a reusable function to read from the stream. This function will read from the stream and return a list of entries. It uses the XREADGROUP command to read from the stream as part of a consumer group:

`XREADGROUP GROUP groupName consumerName COUNT count BLOCK blockTime streamName id`

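The command will be encapsulated in a function that takes the stream name, consumer group name, consumer name, and count as parameters. The function will return a list of entries. It passes the special ID `>` (`StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY`), which asks only for entries that have not yet been delivered to any consumer in the group. It does not set BLOCK, so it returns immediately when there is nothing new.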

In [80]:
import redis.clients.jedis.params.XReadGroupParams

fun readFromStream(
    streamName: String,
    consumerGroup: String,
    consumer: String, count: Int
): List<Map.Entry<String, List<StreamEntry>>> {
    return jedisPooled.xreadGroup(
        consumerGroup,
        consumer,
        XReadGroupParams().count(count),
        mapOf(
            streamName to StreamEntryID.XREADGROUP_UNDELIVERED_ENTRY
        )
    ) ?: emptyList()
}

### Acknowledging Messages
Create a function to acknowledge a message. Acknowledging tells Redis that the message has been processed successfully, which removes it from the group's pending entries list so it won't be redelivered or claimed by another consumer.

This is done by using the XACK command:

`XACK streamName groupName id`

The command will be encapsulated in a lambda function that takes the stream name, consumer group name, and entry as parameters. The function will acknowledge the message by calling the XACK command.

In [81]:
fun ackFunction(): (String, String, StreamEntry) -> Unit {
    return { streamName, consumerGroup, entry ->
        jedisPooled.xack(
            streamName,
            consumerGroup,
            entry.id
        )
    }
}

### Consuming the Stream
Create a reusable function to consume the stream.

This function implements a pipeline pattern where each event is processed sequentially by a series of handlers. If any handler returns false, the processing stops for that event.

After processing the event, the function acknowledges the message using the ack function.
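
The short-circuiting behaviour described above can be sketched without Redis. The example below uses the same handler shape as this notebook, a function returning `Pair<Boolean, String>`, but the `runHandlers` function and the two string handlers are hypothetical names introduced only for illustration.

```kotlin
// Runs the handlers in order and stops at the first one that returns false.
// Returns the result of the handler that stopped the run, or (true, "OK") if all passed.
fun <T> runHandlers(item: T, handlers: List<(T) -> Pair<Boolean, String>>): Pair<Boolean, String> {
    for (handler in handlers) {
        val result = handler(item)
        if (!result.first) return result
    }
    return Pair(true, "OK")
}

// Hypothetical handlers that work on plain strings instead of stream events.
val rejectBlank: (String) -> Pair<Boolean, String> = { text ->
    if (text.isNotBlank()) Pair(true, "OK") else Pair(false, "blank text")
}
val rejectLong: (String) -> Pair<Boolean, String> = { text ->
    if (text.length <= 20) Pair(true, "OK") else Pair(false, "too long")
}

val accepted = runHandlers("hello", listOf(rejectBlank, rejectLong))   // all handlers pass
val stoppedEarly = runHandlers("   ", listOf(rejectBlank, rejectLong)) // rejectLong never runs
```

Here `accepted` is `(true, OK)` and `stoppedEarly` is `(false, blank text)`. Ordering the handlers from cheapest to most expensive means costly steps only run for events that survive the earlier checks.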

In [82]:
%use coroutines

In [83]:
import kotlinx.coroutines.*

fun consumeStream(
    streamName: String,
    consumerGroup: String,
    consumer: String,
    handlers: List<(Event) -> Pair<Boolean, String>>,
    ackFunction: ((String, String, StreamEntry) -> Unit),
    count: Int = 5,
    limit: Int = 5
) {
    var lastMessageTime = System.currentTimeMillis()
    var consumed = 0

    while (consumed < limit) {
        val entries = readFromStream(streamName, consumerGroup, consumer, count)
        val allEntries = entries.flatMap { it.value }

        if (allEntries.isEmpty()) {
            // Stop once the stream has been idle for 2 seconds
            if (System.currentTimeMillis() - lastMessageTime >= 2_000) {
                println("$consumer: No new messages for 2 seconds. Stopping.")
                break
            }
            continue
        }
        lastMessageTime = System.currentTimeMillis()

        allEntries.forEach { entry ->
            consumed++
            val event = Event.fromMap(entry)

            // Run the handlers in order; stop at the first one that returns false
            for (handler in handlers) {
                val (shouldContinue, message) = handler(event)
                if (!shouldContinue) {
                    println("$consumer: Handler stopped processing: $message")
                    break
                }
            }

            // Acknowledge once per entry, after the handlers have run
            ackFunction(streamName, consumerGroup, entry)
        }
    }
}

To test the consumeStream function, we'll create a simple handler that prints the event's URI.

In [84]:
val printUri: (Event) -> Pair<Boolean, String> = {
    println("Got event from ${it.uri}")
    Pair(true, "OK")
}

In [85]:
runBlocking {
    consumeStream(
        streamName = "jetstream",
        consumerGroup = "printer-example",
        consumer = "printer-1",
        handlers = listOf(printUri),
        ackFunction = ackFunction(),
        count = 100,
        limit = 200
    )
}

Got event from at://did:plc:yuer2kud23jb3c7eq7khnqqb/app.bsky.feed.post/3lpygox6xw22o
Got event from at://did:plc:tfkomiidtoc22higi34kxkdr/app.bsky.feed.post/3lpygpaq4ks23
Got event from at://did:plc:dhblt3knvhtxrxqijrjhlnoj/app.bsky.feed.post/3lpygpbwqvc2q
Got event from at://did:plc:ojrjl55nrxb7op5rdm3rjvue/app.bsky.feed.post/3lpygpaxjuk26
Got event from at://did:plc:vlba54vik27uhaoyysup6yqo/app.bsky.feed.post/3lpygpbm7y22c
Got event from at://did:plc:dd32huibi3s3xr3nbhmfrep2/app.bsky.feed.post/3lpygp3ue6s27
Got event from at://did:plc:jhlioavkbe3r2fc4a4vgd6du/app.bsky.feed.post/3lpygpbjabk2j
Got event from at://did:plc:7ia5gyyqv2will5wxhrdey5h/app.bsky.feed.post/3lpygpbpqdc2t
Got event from at://did:plc:27qixaya6k6pmv5ffvhc3f5c/app.bsky.feed.post/3lpygpb37zs2r
Got event from at://did:plc:ydt5hnwsqnqosnxc3abnh3tu/app.bsky.feed.post/3lpygnst4qs2h
Got event from at://did:plc:72ywygk2uj2uckylc7p42a3o/app.bsky.feed.post/3lpygpaf5y22c
Got event from at://did:plc:cpvrj4xi5gehroptmcihflog/a

## Deduplication with Bloom Filter
We'll use a Redis Bloom Filter to remember which event URIs have already been processed. Because a Bloom Filter can return false positives but not false negatives, a new event may occasionally be skipped as a duplicate, but an event that was already processed will never be treated as new.


### Creating a Bloom Filter
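This function creates a Bloom Filter with the given name. The filter is configured with an error rate of 0.01 and an initial capacity of 1,000,000 elements. With an expansion factor of 2, each sub-filter that Redis adds once the capacity is exceeded is twice the size of the previous one.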
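
To get a feel for what an error rate of 0.01 and a capacity of 1,000,000 cost in memory, the sketch below applies the textbook Bloom filter sizing formulas. This is an estimate under that assumption only; Redis may size its filters differently internally, and the function names are hypothetical.

```kotlin
import kotlin.math.ceil
import kotlin.math.ln
import kotlin.math.pow

// Textbook sizing: bits m = -n * ln(p) / (ln 2)^2
fun optimalBits(capacity: Long, errorRate: Double): Long =
    ceil(-capacity * ln(errorRate) / ln(2.0).pow(2)).toLong()

// Textbook sizing: hash functions k = (m / n) * ln 2, rounded up
fun optimalHashCount(bits: Long, capacity: Long): Int =
    ceil(bits.toDouble() / capacity * ln(2.0)).toInt()

val filterBits = optimalBits(capacity = 1_000_000L, errorRate = 0.01)
val filterHashCount = optimalHashCount(filterBits, 1_000_000L)
val filterMebibytes = filterBits / 8.0 / (1024 * 1024)
```

With these inputs the formulas give roughly 9.6 million bits (a little over 1 MiB) and 7 hash functions, which is far less than storing one million full URIs in a set.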

In [86]:
import redis.clients.jedis.bloom.BFReserveParams
import redis.clients.jedis.exceptions.JedisDataException
fun createBloomFilter(name: String) {
    try {
        val errorRate = 0.01
        val capacity = 1_000_000L
        val reserveParams = BFReserveParams().expansion(2)
        jedisPooled.bfReserve(name, errorRate, capacity, reserveParams)
    } catch (_: JedisDataException) {
        println("Bloom filter already exists")
    }
}

### Deduplication Handler
This function creates a handler that checks if an event has already been processed by checking if its URI is in the Bloom Filter. If the URI is in the filter, the handler returns false, which stops the processing of the event.


In [87]:
fun deduplicate(bloomFilter: String): (Event) -> Pair<Boolean, String> {
    return { event ->
        if (jedisPooled.bfExists(bloomFilter, event.uri)) {
            Pair(false, "${event.uri} already processed")
        } else {
            Pair(true, "OK")
        }
    }
}

### Atomic Acknowledgment and Bloom Filter Update
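This function returns an ack function that acknowledges the message and adds the URI to the Bloom Filter inside a single MULTI/EXEC transaction. Redis runs the queued commands back to back without interleaving commands from other clients, so no other consumer can observe an acknowledged message whose URI is not yet in the filter. Note that Redis transactions do not roll back: if one of the commands fails at runtime, the other is still applied.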


In [88]:
import redis.clients.jedis.Connection
import redis.clients.jedis.JedisPool
import redis.clients.jedis.Transaction

val jedisPool = JedisPool()

fun ackAndBfFn(bloomFilter: String):  (String, String, StreamEntry) -> Unit {
    return { streamName, consumerGroup, entry ->
        jedisPool.resource.use { jedis ->
            // Create a transaction
            val multi = jedis.multi()

            // Acknowledge the message
            multi.xack(
                streamName,
                consumerGroup,
                entry.id
            )

            // Add the URI to the bloom filter
            multi.bfAdd(bloomFilter, Event.fromMap(entry).uri)

            // Execute the transaction
            multi.exec()
        }
    }
}

In [89]:
createConsumerGroup("jetstream", "deduplicate-example")

In [90]:
val bloomFilterName = "processed-uris"
createBloomFilter(bloomFilterName)

In [91]:
runBlocking {
    consumeStream(
        streamName = "jetstream",
        consumerGroup = "deduplicate-example",
        consumer = "deduplicate-1",
        handlers = listOf(deduplicate(bloomFilterName), printUri),
        ackFunction = ackAndBfFn(bloomFilterName),
        count = 100,
        limit = 200
    )
}

Got event from at://did:plc:yuer2kud23jb3c7eq7khnqqb/app.bsky.feed.post/3lpygox6xw22o
Got event from at://did:plc:tfkomiidtoc22higi34kxkdr/app.bsky.feed.post/3lpygpaq4ks23
Got event from at://did:plc:dhblt3knvhtxrxqijrjhlnoj/app.bsky.feed.post/3lpygpbwqvc2q
Got event from at://did:plc:ojrjl55nrxb7op5rdm3rjvue/app.bsky.feed.post/3lpygpaxjuk26
Got event from at://did:plc:vlba54vik27uhaoyysup6yqo/app.bsky.feed.post/3lpygpbm7y22c
Got event from at://did:plc:dd32huibi3s3xr3nbhmfrep2/app.bsky.feed.post/3lpygp3ue6s27
Got event from at://did:plc:jhlioavkbe3r2fc4a4vgd6du/app.bsky.feed.post/3lpygpbjabk2j
Got event from at://did:plc:7ia5gyyqv2will5wxhrdey5h/app.bsky.feed.post/3lpygpbpqdc2t
Got event from at://did:plc:27qixaya6k6pmv5ffvhc3f5c/app.bsky.feed.post/3lpygpb37zs2r
Got event from at://did:plc:ydt5hnwsqnqosnxc3abnh3tu/app.bsky.feed.post/3lpygnst4qs2h
Got event from at://did:plc:72ywygk2uj2uckylc7p42a3o/app.bsky.feed.post/3lpygpaf5y22c
Got event from at://did:plc:cpvrj4xi5gehroptmcihflog/a

## Content-Based Filtering with Machine Learning
In this section, we'll use a machine learning model to filter posts based on their content. We'll use a pre-trained embedding model to compare each post against a set of reference examples and keep only the posts that are similar enough to be considered software-related.

### Setting Up the Machine Learning Model
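To compute embeddings, we'll use Spring AI's `TransformersEmbeddingModel`, which runs a pre-trained sentence-transformer model locally on the JVM (it relies on DJL, the Deep Java Library, and ONNX Runtime under the hood). We'll also use Spring AI's Redis vector store to index and search the embeddings.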

In [92]:
@file:DependsOn("org.springframework.ai:spring-ai-redis-store:1.0.0")
@file:DependsOn("org.springframework.ai:spring-ai-transformers:1.0.0")

### Creating the Embedding Model
Constructing `TransformersEmbeddingModel()` without arguments uses the library's default model. Calling `afterPropertiesSet()` initializes the model and tokenizer so it is ready to produce embeddings.


In [93]:
import org.springframework.ai.transformers.TransformersEmbeddingModel

val embeddingModel = TransformersEmbeddingModel()
embeddingModel.afterPropertiesSet()

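### Creating the Vector Store
Now we'll create a Redis vector store backed by the embedding model. It embeds each document we add, stores it under the `classifier:` key prefix, and indexes it in `classifierIdx` using a FLAT (brute-force) vector index. In the following cell, we load the reference examples from a JSON file and add them to the store.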


In [94]:
import org.springframework.ai.vectorstore.redis.RedisVectorStore
import org.springframework.ai.vectorstore.redis.RedisVectorStore.MetadataField
import redis.clients.jedis.search.Schema.FieldType

val redisVectorStore = RedisVectorStore.builder(jedisPooled, embeddingModel)
    .indexName("classifierIdx")
    .contentFieldName("text")
    .embeddingFieldName("textEmbedding")
    .prefix("classifier:")
    .initializeSchema(true)
    .vectorAlgorithm(RedisVectorStore.Algorithm.FLAT)
    .build()
redisVectorStore.afterPropertiesSet()

In [95]:
import kotlinx.serialization.json.Json
import org.springframework.ai.document.Document
import java.io.File
import java.util.UUID

val references = Json.decodeFromString<List<String>>(File("../resources/filtering-examples.json").readText())

fun createRouteDocument(text: String): Document {
    return Document(
        UUID.randomUUID().toString(),
        text,
        mapOf(
            "text" to text,
        )
    )
}

fun storeRouteDocumentsInRedis(references: List<String>) {
    val documents = references.map { text ->
        createRouteDocument(text)
    }

    redisVectorStore.add(documents)
}

storeRouteDocumentsInRedis(references)

### Creating a Classification Function
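Now we'll create a function to classify text using the model.

The function takes a post's text, splits it into clauses, and for each clause searches the vector store for the single closest reference example (`topK(1)`). It returns one similarity score per clause; a higher score means the clause is closer to a reference example. The cells below refine the function step by step, and each later definition of `classify` replaces the earlier one in the notebook session.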
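
The idea behind the similarity search can be illustrated with a small in-memory sketch. It assumes the score behaves like a cosine similarity between embedding vectors; the three-dimensional vectors are made up, and `cosine`, `referenceVectors`, and `nearestReference` are hypothetical names rather than part of Spring AI.

```kotlin
import kotlin.math.sqrt

// Cosine similarity between two vectors of equal length.
fun cosine(a: DoubleArray, b: DoubleArray): Double {
    require(a.size == b.size) { "vectors must have the same length" }
    var dot = 0.0
    var normA = 0.0
    var normB = 0.0
    for (i in a.indices) {
        dot += a[i] * b[i]
        normA += a[i] * a[i]
        normB += b[i] * b[i]
    }
    return dot / (sqrt(normA) * sqrt(normB))
}

// Made-up 3-dimensional "embeddings"; real models produce hundreds of dimensions.
val referenceVectors = mapOf(
    "how to build with AI" to doubleArrayOf(0.9, 0.1, 0.0),
    "LLMs in production" to doubleArrayOf(0.7, 0.7, 0.1),
)

// Returns the closest reference text and its similarity (a top-1 search).
fun nearestReference(query: DoubleArray): Pair<String, Double> =
    referenceVectors
        .map { (text, vector) -> text to cosine(query, vector) }
        .maxByOrNull { it.second }
        ?: error("no reference vectors")

val bestMatch = nearestReference(doubleArrayOf(1.0, 0.0, 0.0))
```

For this query the closest reference is "how to build with AI", with a similarity just above 0.99, while the other reference scores about 0.70. A threshold between those two values would accept the first match and reject the second.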


In [96]:
import org.springframework.ai.vectorstore.SearchRequest

fun breakSentenceIntoClauses(sentence: String): List<String> {
    return sentence.split(Regex("""[!?,.:;()"\[\]{}]+"""))
        .filter { it.isNotBlank() }.map { it.trim() }
}

fun classify(post: String): List<Double> {
    return breakSentenceIntoClauses(post).map { clause ->
        (redisVectorStore.similaritySearch(
            SearchRequest.builder()
                .topK(1)
                .query(clause)
                .build()
        )?.map { it.score ?: 0.0 } ?: emptyList())
    }.flatten()
}

In [97]:
fun classify(post: String): Double {
    return redisVectorStore.similaritySearch(
        SearchRequest.builder()
            .topK(1)
            .query(post)
            .build()
    )?.firstOrNull()?.score ?: 0.0 // firstOrNull avoids an exception when nothing matches
}

In [98]:
fun removeUrls(text: String): String {
    return text.replace(Regex("""(?:https?:\/\/)?(?:www\.)?[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}(\/\S*)?"""), "")
        .replace(Regex("""@\w+"""), "")
        .replace(Regex("""\s+"""), " ")
        .trim()
}

In [99]:
fun classify(post: String): List<Double> {
    val cleanedPost = removeUrls(post)
    return breakSentenceIntoClauses(cleanedPost).map { clause ->
        println(clause)
        (redisVectorStore.similaritySearch(
            SearchRequest.builder()
                .topK(1)
                .query(clause)
                .build()
        )?.map {
            println("Matched sentence: ${it.text}")
            it.score ?: 0.0
        } ?: emptyList())
    }.flatten()
}

In [100]:
classify("Redis is a great tool for building distributed systems")

Redis is a great tool for building distributed systems
Matched sentence: AI tools for devs


[0.7052901387214661]

In [101]:
classify("Redis is a great tool for building applied AI systems")

Redis is a great tool for building applied AI systems
Matched sentence: how to build with AI


[0.8531131744384766]

### Creating a Filter Handler

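Now we'll create a handler that filters events based on their content.

The handler uses the classification function to determine if a post is software-related, judged by its similarity to the reference examples: if any clause scores above 0.75, the event continues through the pipeline. If the post is not software-related, the handler returns false, which stops the processing of the event. Events with blank text and delete operations are rejected as well.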

In [102]:
val filter: (Event) -> Pair<Boolean, String> = { event ->
    if (event.text.isNotBlank() && event.operation != "delete") {
        val scores = classify(event.text)
        if (scores.any { it > 0.75 }) {
            Pair(true, "OK")
        } else {
            Pair(false, "Not a post related to artificial intelligence")
        }
    } else {
        Pair(false, "Text is blank or the event is a delete")
    }
}

## Storing Filtered Events
In this section, we'll store the filtered events in Redis for further processing.

### Converting Data Class to Hash

In [103]:
fun Event.toMap(): Map<String, String> {
    return mapOf(
        "did" to this.did,
        "timeUs" to this.timeUs,
        "text" to this.text,
        "langs" to this.langs.joinToString(","),
        "operation" to this.operation,
        "rkey" to this.rkey,
        "parentUri" to this.parentUri,
        "rootUri" to this.rootUri,
        "uri" to this.uri
    )
}

### Storing Events in Redis
Now we'll create a handler that stores events in Redis. The handler stores each event as a hash whose key is derived from the event's URI: the `at://did:plc:` prefix is replaced with `post:`.

In [104]:
val storeEvent: (Event) -> Pair<Boolean, String> = { event ->
    jedisPooled.hset("post:" + event.uri.replace("at://did:plc:", ""), event.toMap())
    Pair(true, "OK")
}
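
To make the key format concrete, the sketch below extracts the key derivation into a hypothetical helper, `postKey`, and applies it to one of the URIs printed earlier in this notebook.

```kotlin
// Hypothetical helper mirroring the key scheme used by the storeEvent handler.
fun postKey(uri: String): String = "post:" + uri.replace("at://did:plc:", "")

val exampleKey = postKey("at://did:plc:yuer2kud23jb3c7eq7khnqqb/app.bsky.feed.post/3lpygox6xw22o")
// exampleKey == "post:yuer2kud23jb3c7eq7khnqqb/app.bsky.feed.post/3lpygox6xw22o"
```

Sharing the `post:` prefix across all stored events makes it straightforward to scan or index them as a group later.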

### Adding Filtered Events to a New Stream

Finally, we'll create a handler that adds filtered events to a new stream. This allows other consumers to process only the filtered events, rather than having to filter the events themselves.

In [105]:
import redis.clients.jedis.params.XAddParams

val addFilteredEventToStream: (Event) -> Pair<Boolean, String> = { event ->
    jedisPooled.xadd(
        "filtered-events",
        XAddParams.xAddParams().id(StreamEntryID.NEW_ENTRY),
        event.toMap()
    )
    Pair(true, "OK")
}
createConsumerGroup("jetstream", "store-example")

## Putting It All Together
Now we'll put all the pieces together to create a complete pipeline for filtering events from the Redis Stream.

In this example we create two consumers that will process the same stream.

- By doing that, we can scale the processing of the events by adding more consumers to the group.
- Redis will make sure that each consumer will receive different messages.
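
The way a consumer group splits work can be imitated in memory. The sketch below is only an analogy, not Redis: a thread-safe queue stands in for the stream, two threads stand in for the consumers, and the function name `simulateConsumerGroup` is hypothetical.

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
import kotlin.concurrent.thread

// Starts one thread per consumer name and lets them compete for messages.
// Returns a map from each message to the consumer that handled it.
fun simulateConsumerGroup(messageCount: Int, consumerNames: List<String>): Map<String, String> {
    // poll() hands each message to exactly one caller, like delivery within a group
    val pending = ConcurrentLinkedQueue((1..messageCount).map { "message-$it" })
    val handledBy = ConcurrentHashMap<String, String>()

    val workers = consumerNames.map { name ->
        thread(name = name) {
            while (true) {
                val message = pending.poll() ?: break // queue drained: this consumer stops
                handledBy[message] = name
            }
        }
    }
    workers.forEach { it.join() }
    return handledBy
}
```

Calling `simulateConsumerGroup(100, listOf("store-1", "store-2"))` returns 100 entries, one per message, each assigned to a single consumer. How the messages are divided between the two consumers varies from run to run, just as it does with real consumers reading from a group.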

In [107]:
runBlocking {
    listOf(
        async(Dispatchers.IO) {
            consumeStream(
                streamName = "jetstream",
                consumerGroup = "store-example",
                consumer = "store-1",
                handlers = listOf(
                    filter,
                    printUri,
                    storeEvent,
                    addFilteredEventToStream
                ),
                ackFunction = ackFunction(),
                count = 1,
                limit = 1000
            )
        },
        async(Dispatchers.IO) {
            consumeStream(
                streamName = "jetstream",
                consumerGroup = "store-example",
                consumer = "store-2", // Different consumer
                handlers = listOf(
                    filter,
                    printUri,
                    storeEvent,
                    addFilteredEventToStream
                ),
                ackFunction = ackFunction(),
                count = 1,
                limit = 1000
            )
        }
    ).awaitAll()
}

anatomy of a beast or just organ harvest #oc #skyoc #art
A mí me pasa que no puedo ver las noticias porque precisamente me ahogo en todo eso como tú dices pero por suerte aquí no veo mucho porque si lo hubiera me tendría que ir jskdjd Anyway haces bien en tomarte un descanso así que dale
Matched sentence: Is there a beginner course for understanding how these models work?
store-2: Handler stopped processing: Not a post related to artificial intelligence
They know what's up ‼️
Matched sentence: LLMs in production
store-1: Handler stopped processing: Not a post related to artificial intelligence
Matched sentence: Still not sure how these AI models know so much. Feels like magic.
Jorge Gónzalez Delivers Latin Fire On New Pop Hit ‘Sambale’
store-2: Handler stopped processing: Not a post related to artificial intelligence
DEI = Diversité
Matched sentence: Midjourney v6
p=32893
Matched sentence: Claude AI
Équité
Matched sentence: Claude AI
Inclusion
Matched sentence: GPT-4
s

[kotlin.Unit, kotlin.Unit]

## Next Steps
In the next notebook, we'll enrich the filtered events with additional information, such as topic modeling and embeddings for semantic search.
