# Streaming Data Engineering
## Exam preparation

### Breadth questions

1) Which are the four approaches to tame velocity? List them all before comparing and contrasting two of them

**Answer:** 

We have 4 main approaches to tame the velocity of the input data and stream processing:
* Event-based systems (EBS): These systems focus on individual events as the unit of processing. Events are processed and responded to in near real-time, often by triggering specific actions based on predefined rules.

* Data Stream Management Systems (DSMS): Similar to traditional database management systems, DSMS focuses on continuous queries over unbounded streams of data. It provides capabilities for querying, filtering and aggregating data streams in real-time.

* Complex Event Processing (CEP): CEP goes beyond individual events by identifying patterns and correlations across multiple events. It allows for the detection of more sophisticated scenarios, such as trends or anomalies.
  
* Event-driven architectures (EDA): EDA is a design paradigm where the flow of data and control is determined by events. Components of the system communicate asynchronously through event messages, enabling scalability and flexibility in handling high-velocity data.

For example, let us compare EBS and CEP:
* Similarities:
  * Event-centric processing: Both approaches revolve around processing events as the fundamental unit of data. Events are the primary triggers for actions in both systems.

  * Real-time response: Both EBS and CEP aim to process and respond to data in real-time or near real-time, making them suitable for dynamic and time-sensitive applications.

  * Scalability: Both must handle high-velocity data streams efficiently, often requiring distributed architectures to scale horizontally.

  * Application in EDAs: Both can be integrated into Event-Driven Architectures, where components communicate asynchronously via events.

* Differences:
  * Focus of processing: EBS deals with individual events in isolation, focusing on simple and direct reactions to each event. On the other hand, CEP processes multiple events collectively, identifying patterns, sequences or correlations.

  * Complexity: EBS is relatively simple to design and implement, as it doesn't require advanced pattern recognition, while CEP is more complex, as it involves defining rules for detecting sequences, windows and relationships across events.

  * Use cases: EBS is more suitable for reactive tasks like sending notifications, logging or device monitoring. CEP is better for scenarios requiring advanced analytics, such as fraud detection, supply chain optimization or trend analysis.

  * State management: EBS is typically stateless, reacting to each event independently, while CEP is often stateful, requiring the system to maintain a memory of past events for pattern detection.



2) Which are the three time-models we introduced? List them all before comparing and contrasting two of them

**Answer:** 

We introduced the 3 following time models:
* Stream-only time model: in this model, time is defined by the order of events in the stream. We only care about the order of events and not the time at which they happened. This model has no notion of time, which limits the range of possible queries but also reduces latency.

* Absolute time model: in this model, time is defined by the real time at which events happened. Knowing the exact moment of occurrence of each event is as important as knowing their order. This increases the expressiveness of the model, but also its latency.

* Interval-based time model: in this model, time is defined in intervals. It is useful when we need to know the time at which events happened, but also to group them. This increases the expressiveness, but also the latency of the model.

Let us compare stream-only and absolute time models:

* Similarities:
  * Both models focus on processing events as they arrive.

  * Both are simpler than the interval-based, as they don't require reasoning about overlapping or continuous time ranges.

* Differences:
  * Time representation: the stream-only model does not use explicit timestamps and relies only on the order in which events arrive. The absolute time model includes explicit timestamps for each event, enabling analysis based on actual occurrence times.

  * Use cases: the stream-only model is suitable for scenarios where ordering is sufficient, such as reactive systems or message queues. On the other hand, absolute time model is ideal for scenarios requiring precise temporal analytics, such as financial transaction monitoring or sensor data analysis.
  
  * Expressiveness and latency: the stream-only model allows lower latency, but offers low expressiveness. With absolute time we can express more queries, at the cost of higher latency.
 

3) What are the basic types of windows? List them all and describe the behavior of at least two of them

**Answer:**

The basic types of windows are:
* Tumbling window: this window has a fixed size and does not overlap. In other words, it makes a partition of time, so that each event always belongs to only one window.

* Hopping window: this window moves forward in fixed intervals, usually overlapping with the previous window. As a result, an event may belong to two (or more, depending on the value of the stride) hopping windows.

* Session window: this window is determined dynamically based on periods of activity separated by periods of inactivity. In other words, an event belongs to a certain window if it is either the first one detected after a period of inactivity, or falls within a limited time span after the occurrence of another event.
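The three window types can be sketched in plain Python. This is a minimal illustration, not the implementation of any particular engine: the function names, the integer timestamps and the `[start, end)` window convention are assumptions made for the example.

```python
def tumbling_window(ts, size):
    """Return the single [start, end) window that contains ts."""
    start = (ts // size) * size
    return (start, start + size)


def hopping_windows(ts, size, stride):
    """Return every [start, end) window of the given size and stride that contains ts."""
    windows = []
    # Latest window start that is <= ts, aligned to the stride.
    start = (ts // stride) * stride
    # A window contains ts as long as start > ts - size.
    while start > ts - size:
        windows.append((start, start + size))
        start -= stride
    return sorted(windows)


def session_windows(timestamps, gap):
    """Group timestamps into sessions separated by more than `gap` of inactivity."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= gap:
            sessions[-1].append(ts)   # still within the current session
        else:
            sessions.append([ts])     # inactivity gap exceeded: open a new session
    return sessions


tumbling = tumbling_window(7, size=5)
hopping = hopping_windows(7, size=10, stride=5)
sessions = session_windows([1, 2, 3, 10, 11, 30], gap=5)
print(tumbling, hopping, sessions)
```

With a stride smaller than the size, the event at time 7 falls into two hopping windows but only one tumbling window, while the session boundaries depend entirely on the gaps in the data.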

4) What is a context and how does it relate to interval-based time model? Give an example that supports your explanation.

**Answer:**

A context refers to the temporal or logical scope within which events are considered relevant or processed. It defines the boundaries for evaluating events, aggregating data or detecting patterns.

The interval-based model associates events with specific time intervals, during which they are valid (i.e., are taken into account for aggregation, processing, etc.). Contexts in the model are defined by these intervals, enabling the system to process events only within their relevant timeframes. 

For example, imagine we monitor the transactions of a user during the 10 minutes that follow a transaction, for fraud detection. The context is opened by the first transaction received after a period of inactivity, and it is then maintained over an interval of 10 minutes, during which the new transactions that arrive are processed together.

5) Why do we say that Complex Event Processing can tame the torrent effect? Support your claim with an example

**Answer:**

The torrent effect occurs when a high volume of events floods a system, making it difficult to process or extract meaningful information in real-time. Complex Event Processing (CEP) can tame this effect by aggregating, filtering and correlating low-level events into high-level meaningful insights. This is done by:

* Pattern detection: CEP can detect specific sequences of events.
* Temporal constraints: CEP can generate temporal contexts in which only the events within are valid.
* Event filtering and aggregation, focusing only on specific data according to our needs.
* Output control: CEP can regulate the frequency and form in which the results are generated, to avoid overwhelming outputs.

For example, in fire alarm systems, sensors continuously generate data such as temperature, smoke and CO levels. This creates a flood of raw events that can overwhelm any system if not properly handled. CEP manages this by:
* Filtering irrelevant data (e.g., temperatures lower than 50°).

* Aggregating values over short time intervals (e.g., average temperature every 10 seconds).

* Detecting specific patterns on a given context (e.g., after avg temperature higher than 50°, high smoke and CO within a minute).

* Controlling the output (e.g., trigger the alarm once when the fire event is detected, not every single time after that).
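The four steps above can be sketched as a small pipeline in plain Python. This is only an illustration under simplifying assumptions: the function name `detect_fire`, the tuple layout of the readings, the boolean smoke flag and the 10-second buckets are all hypothetical, and a real CEP engine would evaluate the pattern incrementally rather than over a finished list.

```python
def detect_fire(readings, temp_threshold=50.0, bucket_seconds=10):
    """readings: (ts_seconds, temperature, smoke) tuples.
    Returns the bucket start times at which an alarm is emitted."""
    buckets = {}
    for ts, temp, smoke in readings:
        if temp < temp_threshold:
            continue                      # filtering: drop irrelevant (cool) readings
        start = (ts // bucket_seconds) * bucket_seconds
        total, n, any_smoke = buckets.get(start, (0.0, 0, False))
        # aggregation: keep sum, count and smoke flag per time bucket
        buckets[start] = (total + temp, n + 1, any_smoke or smoke)

    alarms = []
    alarm_active = False
    for start in sorted(buckets):
        total, n, any_smoke = buckets[start]
        # pattern detection: high average temperature together with smoke
        fire = (total / n) > temp_threshold and any_smoke
        if fire and not alarm_active:
            alarms.append(start)          # output control: report the fire only once
        alarm_active = fire
    return alarms


readings = [
    (1, 20, False), (3, 22, False),       # normal readings, filtered out
    (11, 55, False), (14, 65, True),      # fire starts here
    (21, 70, True), (25, 80, True),
    (31, 75, True),
]
alarms = detect_fire(readings)
print(len(readings), "raw events ->", len(alarms), "alarm(s)")
```

Seven low-level readings are reduced to a single high-level alarm, which is the sense in which CEP tames the torrent.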

6) What are latency / throughput? Give an example of both in the same domain

**Answer:**

We refer to latency as the time taken for a message to travel from the producer to the consumer. Throughput, on the other hand, is the number of messages that can be processed in a given time period.

We can illustrate it with a visual example. Imagine a road connecting city A to city B. Latency is the time it takes a car to travel from A to B, and throughput is the number of cars that arrive at B from A in a given period, for example an hour.

In the same way, latency in a distributed streaming system refers to the time it takes to process a message, while throughput is the number of messages (or amount of information) that can be processed within a fixed amount of time, e.g., a minute.
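As a small worked example, both metrics can be computed from the same set of messages. The sketch below assumes each message is described by a hypothetical `(sent_at, received_at)` pair in seconds; the function name is likewise only illustrative.

```python
def latency_and_throughput(messages):
    """messages: list of (sent_at, received_at) pairs, in seconds."""
    # Latency: time each message spends between producer and consumer.
    latencies = [received - sent for sent, received in messages]
    avg_latency = sum(latencies) / len(latencies)
    # Throughput: messages delivered per second over the observed time span.
    span = max(r for _, r in messages) - min(s for s, _ in messages)
    throughput = len(messages) / span
    return avg_latency, throughput


msgs = [(0.0, 0.5), (1.0, 1.5), (2.0, 2.5), (3.0, 4.0)]
avg_latency, throughput = latency_and_throughput(msgs)
print(avg_latency, throughput)
```

The two measures are independent: sending the same messages ten at a time would raise throughput without necessarily changing the latency of each message.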

7) Describe the Kafka components at a conceptual, system, and physical level. Illustrate them using an example.

**Answer:**

At a conceptual level:

Kafka is a distributed system for real-time data streaming, composed of:

* Producer: a system that sends data to Kafka topics. 

* Consumer: a system that reads data from certain Kafka topics.

* Topic: a category of messages in Kafka, a way to organize incoming messages based on their content.

* Message: the unit of data in Kafka. It consists of a key-value pair that is sent by the producer and stored in a topic.

* Cluster: a Kafka cluster manages a set of topics.

At a system level, we add the following:

* Partitions: each topic is subdivided into partitions, to allow for parallel processing and scalability. The partitions of a topic are typically spread across different brokers.

* Broker: it is a Kafka server that stores and serves data. It also handles replication of partitions to allow for fault-tolerance. Each broker can store multiple partitions

At a physical level:

* Each broker is typically hosted on a different machine in the network.

* When a message is received, it is assigned to a certain partition based on the key (round-robin if no key)

* The message is replicated to other brokers, for fault tolerance. The number of copies of each partition is set by the topic's replication factor.

* Each partition is stored on the disk of the broker, in a log-structured format, to allow for fast reads and writes (commit log). Also we can configure retention policies to keep messages for an amount of time.

* Consumers generally form consumer groups, that subscribe to the same topic. This allows parallel reading from many partitions.
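The key-based assignment of messages to partitions can be sketched as follows. This is illustrative only: the class name `SketchPartitioner` is hypothetical, and `zlib.crc32` is used as a stand-in for a stable hash, which is not the hash function Kafka's own partitioner uses.

```python
import zlib
from itertools import count


class SketchPartitioner:
    """Illustrative only: a stand-in for a key-based partitioner."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self._round_robin = count()

    def partition(self, key):
        if key is None:
            # No key: spread messages over the partitions in turn.
            return next(self._round_robin) % self.num_partitions
        # Same key -> same hash -> same partition, which preserves per-key order.
        return zlib.crc32(key.encode("utf-8")) % self.num_partitions


partitioner = SketchPartitioner(num_partitions=3)
keyed = [partitioner.partition("sensor-42") for _ in range(3)]
unkeyed = [partitioner.partition(None) for _ in range(4)]
print(keyed, unkeyed)
```

Because all messages with the same key land in the same partition, a consumer reading that partition sees them in the order they were written.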

Example: Log Processing System

Conceptual Level:

* A logging service (Producer) sends application logs to a Kafka topic named logs.
* Monitoring tools (Consumers) read logs from the topic to detect anomalies.

System Level:

* The logs topic is divided into 3 partitions for scalability.
* Producers write logs into partitions in a round-robin fashion.
* Consumers subscribe to the logs topic and are assigned specific partitions to process logs in parallel.
* The Kafka cluster consists of 5 brokers, each storing one or more partitions of the logs topic.

Physical Level:

* The cluster is deployed on 5 physical or virtual machines (brokers).
* Each broker has disk storage where partitions are stored (e.g., Partition 0 on Broker 1, Partition 1 on Broker 2, etc.).
* Messages are replicated across brokers (e.g., Partition 0 has replicas on Broker 1 and Broker 3).

8) Illustrate the programming model of Spark Structured Streaming at the logical and physical level. Illustrate them using an example.

**Answer:**

Logical Level:

At the logical level, Spark Structured Streaming treats incoming live data streams as unbounded tables that are continuously updated as new data arrives. Computations are expressed as a Directed Acyclic Graph (DAG), resembling batch queries applied to static tables. Key characteristics:

* Incremental Updates: The DAG processes only new data in each micro-batch.

* State Management: Maintains and updates state across batches to ensure continuity.

* Declarative API: Uses SQL-like syntax for operations, making streaming computation similar to batch processing.

Physical level:

At the physical level, Spark executes the streaming DAG incrementally on the latest available data:

* Micro-Batches: The incoming data is processed as a sequence of small batches, each started by a trigger (by default, as soon as the previous batch finishes).

* Execution Engine: Utilizes the Catalyst optimizer and Tungsten execution engine for efficient query optimization and execution.


* Fault Tolerance: Ensures exactly-once guarantees using checkpoints and write-ahead logs.

Each micro-batch reads new data from the source, applies transformations, updates intermediate states (if needed), and writes results to the sink (e.g., database, Kafka, file system).


#### Illustrated Example: Real-Time Traffic Monitoring

Logical Model:

* Use Case: Monitoring vehicle speeds on a highway.
* Query: Compute the average speed of vehicles passing through different sections of the highway every minute.

Logical representation:

$$\texttt{speed\_sdf.groupBy(window("timestamp", "1 minute"), "section").agg(avg("speed"))}$$

The window function groups data into one-minute intervals.

Physical Model:

* Data is read from traffic sensors as micro-batches.
* The engine updates state (e.g., total speed and count for each section).
* Results are written incrementally to a dashboard showing average speeds in real time.

This model allows seamless integration of batch and stream processing while ensuring fault tolerance and scalability.
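The incremental state update performed at the physical level can be imitated in plain Python. The sketch below is not Spark code: `process_micro_batch`, the row layout `(ts_seconds, section, speed)` and the dictionary used as state store are assumptions chosen to mirror the traffic example.

```python
from collections import defaultdict


def process_micro_batch(state, batch, window_seconds=60):
    """Fold one micro-batch of (ts_seconds, section, speed) rows into `state`
    and return the updated averages for the keys touched by this batch."""
    touched = set()
    for ts, section, speed in batch:
        # Key = (start of the one-minute window, highway section).
        key = ((ts // window_seconds) * window_seconds, section)
        total, n = state[key]
        state[key] = (total + speed, n + 1)   # only sum and count are kept
        touched.add(key)
    return {key: state[key][0] / state[key][1] for key in touched}


state = defaultdict(lambda: (0.0, 0))
first = process_micro_batch(state, [(5, "north", 100), (20, "north", 80)])
second = process_micro_batch(state, [(50, "north", 90), (70, "north", 60)])
print(first)
print(second)
```

The second batch only reads its own rows, yet the average for the first window reflects all three readings, because the running sum and count survive between batches.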

9) How does Spark Structured Streaming treat late arrivals in windowed aggregation? Illustrate it using an example.

**Answer:**

Late arrivals in Spark Structured Streaming are events that arrive after the expected time window for processing. To manage these effectively, Spark uses watermarking, which:

* Tracks the maximum event time seen in the stream.
* Specifies a watermark delay as a threshold, beyond which late events are considered too late and discarded.

In general, the system tracks the maximum event time $P$ seen so far. If the difference between $P$ and the event time of an incoming event is smaller than the watermark delay, the event is still included in the aggregation and the result table is updated accordingly, as if it had arrived on time.

#### Example Scenario: Real-Time Traffic Monitoring

Consider a traffic management system monitoring vehicle speeds:

* Events include vehicle ID, speed, and timestamp.
* Speed averages are calculated over 10-minute windows.
* Some sensor data may be delayed due to network latency.

Using watermarking:

* Late arrivals are allowed up to 5 minutes after the window's end.
* Delayed data exceeding this threshold is discarded.
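The scenario can be simulated in plain Python. This is a simplified sketch, not Spark's implementation: it assumes the watermark is recomputed after every event (Spark advances it between micro-batches), that an event is dropped when the end of its window is at or before the watermark, and that times are integer minutes. The function name and the sample data are hypothetical.

```python
def run_with_watermark(events, window_size, delay):
    """events: (event_time, speed) pairs in arrival order; times in minutes."""
    max_event_time = None
    state = {}       # window start -> (sum of speeds, count)
    dropped = []
    for event_time, speed in events:
        # Watermark = maximum event time seen so far minus the allowed delay.
        watermark = None if max_event_time is None else max_event_time - delay
        window_start = (event_time // window_size) * window_size
        window_end = window_start + window_size
        if watermark is not None and window_end <= watermark:
            dropped.append((event_time, speed))        # too late: discarded
        else:
            total, n = state.get(window_start, (0, 0))
            state[window_start] = (total + speed, n + 1)
        if max_event_time is None or event_time > max_event_time:
            max_event_time = event_time
    averages = {start: total / n for start, (total, n) in state.items()}
    return averages, dropped


# 10-minute windows, 5 minutes of allowed lateness.
events = [(1, 60), (8, 80), (12, 100), (9, 70), (16, 90), (7, 50)]
averages, dropped = run_with_watermark(events, window_size=10, delay=5)
print(averages, dropped)
```

The reading at minute 9 arrives after minute 12 has been seen but is still accepted, because the watermark is only at minute 7. The reading at minute 7 arrives after minute 16 has been seen, when the watermark (minute 11) has already passed the end of its window, so it is discarded.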



10) Is there an exact incremental algorithm for computing average, median and distinct? Why?

**Answer:**

* Average: Yes, an exact incremental algorithm exists. It tracks a running sum and count to compute the average efficiently.

* Median: No exact incremental algorithm exists without storing all data, as the median requires sorted values. Approximations like quantile summaries can estimate it.

* Distinct Count: No exact incremental algorithm exists without tracking all unique elements. Approximations like HyperLogLog are used for scalability.

Exact algorithms for median and distinct counts aren't feasible in streaming due to memory and real-time constraints.
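The difference in memory footprint can be made concrete with a short sketch. The class names below are illustrative; the point is only that the exact average needs two numbers of state, while the exact median and distinct count keep growing with the stream.

```python
import statistics


class RunningAverage:
    """Exact and incremental: constant state (sum and count)."""

    def __init__(self):
        self.total = 0.0
        self.count = 0

    def update(self, x):
        self.total += x
        self.count += 1
        return self.total / self.count


class ExactMedian:
    """Exact, but must remember every value seen so far."""

    def __init__(self):
        self.values = []

    def update(self, x):
        self.values.append(x)
        return statistics.median(self.values)


class ExactDistinct:
    """Exact, but must remember every distinct value seen so far."""

    def __init__(self):
        self.seen = set()

    def update(self, x):
        self.seen.add(x)
        return len(self.seen)


avg, med, dist = RunningAverage(), ExactMedian(), ExactDistinct()
for value in [4, 8, 4, 6]:
    last = (avg.update(value), med.update(value), dist.update(value))
print(last, len(med.values), len(dist.seen))
```

After four events the average still holds two numbers, whereas the median keeps four values and the distinct count keeps three; on an unbounded stream only the first stays bounded.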


### Depth questions

1) What is a hopping logical window? Give an example at the conceptual level and show that you know both EPL’s and Spark Structured Streaming’s syntax

**Answer:**

A hopping logical window is a fixed-size window that advances by a fixed increment (the hop). "Logical" means that the size and the hop are measured in time. Events that fall within the same window are grouped and processed together.

EPL:

In [None]:
select sensor
from TemperatureSensorEvent.win:time(1 minutes)
group by sensor
output snapshot every 30 seconds;

Spark:

In [None]:
temperature_df.groupBy(window("TS", "1 minutes", "30 seconds"), "SENSOR")

2) What is the role of the output clause in EPL? Give an example that supports your explanation.

**Answer:**

It defines how the output stream is generated. We use it, for example, to avoid being overwhelmed by output events after a certain trigger (event pattern) fires.

For example:

In [None]:
select sensor
from FireEvent.win:time(10 minutes)
output first every 5 minutes;

3) What’s the role of the followed by operator (->) in EPL? Is it possible to implement it in Spark Structured Streaming? Give an example that supports your explanation.

**Answer:**

The specific role of the followed by (->) operator in EPL is to detect specific sequences of events (for example, during a window or context). Combined with the every clause, it allows us to detect and report the occurrence of one event followed by another.

This operator is one of the tools of CEP that allows us to tame the torrent effect by filtering and processing high-level patterns instead of low-level events.

In Spark, this operator can only be implemented partially (in the form every A -> every B) by using a stream-to-stream join with temporal constraints on the event timestamps.

For example:

In [None]:
from pyspark.sql.functions import expr

# tsTemp > tsSmoke plays the role of EPL's -> operator;
# the upper bound on tsTemp plays the role of the timer:within clause.
join_sdf = smoke_events.join(
    high_temp_events, expr("""
        (sensorTemp == sensorSmoke) AND
        (tsTemp > tsSmoke) AND
        (tsTemp < tsSmoke + interval 2 minute)
    """)
)

4) Explain stream to stream joins. Give an example in EPL that illustrates the different results you obtain using an inner and a left outer join

**Answer:**

Stream-to-stream joins allow combining two event streams based on a common key or condition. A window (logical or physical) is used to limit the join to events within a specified time frame.

Inner:

In [None]:
SELECT * 
FROM View#time(9 sec) AS v 
INNER JOIN Click#time(9 sec) AS c 
ON v.id = c.id;

Left Outer:

In [None]:
SELECT * 
FROM View#time(9 sec) AS v 
LEFT OUTER JOIN Click#time(9 sec) AS c 
ON v.id = c.id;

Note that each arriving event triggers the query. With the inner join, an output is produced only when matching events are found in both streams within the window. With the left outer join, every View event appears in the output, paired with a null when no event of the Click stream matches.
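The difference between the two results can be reproduced with a small Python simulation. It is a simplification made for illustration: it ignores the incremental, per-arrival emission of EPL and simply pairs events with the same id whose timestamps are less than the window length apart. The function name and the sample events are hypothetical.

```python
def join_streams(views, clicks, window, how="inner"):
    """views/clicks: lists of (id, ts_seconds).
    Pairs events with equal id whose timestamps are less than `window` apart."""
    rows = []
    for v_id, v_ts in views:
        matches = [(c_id, c_ts) for c_id, c_ts in clicks
                   if c_id == v_id and abs(c_ts - v_ts) < window]
        if matches:
            rows.extend(((v_id, v_ts), match) for match in matches)
        elif how == "left":
            rows.append(((v_id, v_ts), None))   # unmatched View, null Click
    return rows


views = [("a", 0), ("b", 2), ("c", 4)]
clicks = [("a", 3), ("c", 20)]      # the click on "c" falls outside the window

inner = join_streams(views, clicks, window=9, how="inner")
left = join_streams(views, clicks, window=9, how="left")
print(inner)
print(left)
```

Only view `a` has a click inside the window, so the inner join yields one row, while the left outer join also reports views `b` and `c` with a null on the Click side.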

5) Explain stream to table joins. Give an example in EPL that illustrates the different results you obtain using an inner and a left outer join

**Answer:**

Stream-to-table joins allow us to combine the events of a stream with the rows of a table, based on a common key. The table is updated dynamically from a stream, using a data window that keeps the latest event per key (for example `Click#unique(id)`).

Note that these types of joins incorporate a keyword $\texttt{UNIDIRECTIONAL}$ to execute the query only on the arrivals of the stream events.

In [None]:
select *
from View as v
unidirectional inner join
Click#unique(id) as c
on v.id = c.id;

With a left outer join, every event of the View stream appears in the result, with null values on the Click side when there is no match.

6) What makes Kafka scale horizontally? Explain it using an example.

**Answer:**

On a Kafka cluster, adding a new machine improves the system's efficiency by increasing its capacity and fault tolerance. Kafka divides a cluster into brokers, each hosted on a different machine. Each broker manages a subset of partitions from various topics. Adding brokers allows:

* Parallel Reads and Writes: Enables concurrent operations across multiple partitions.
* Balanced Distribution: Kafka redistributes partitions across brokers, balancing the load.
* Enhanced Fault Tolerance: Replication of partitions ensures data availability, even if one broker fails.
* Consumer Scaling: Consumers in a group process partitions in parallel, benefiting from the distributed architecture.

This ensures that the Kafka cluster scales effectively with minimal bottlenecks as data volume grows.

7) What’s the role of a topic in Kafka? How does it relate to partitions and brokers? Explain it using an example.

**Answer:**

A topic is a category used to organize and store messages. Producers send messages to topics, and consumers read from topics, allowing a decoupled communication between these entities. 

Each topic is divided into subsets called partitions, so several partitions can belong to the same topic. Partitions are the unit of parallelism and fault tolerance. A broker can host one or more partitions from different topics, and it handles the reads, writes and replication of the data in those partitions.

For example, suppose we have multiple weather stations sending the temperature and humidity of sectors in a city. We can group these messages in the topic `WeatherReport`. This topic is divided into 2 partitions (e.g., zones A to D, and zones E to H), and each partition is stored on a broker. We also have 2 other brokers for fault tolerance, where the partitions are replicated. The consumer then just needs to subscribe to the topic and read from the partitions (allowing parallel reads, for example).


8) What’s the role of a broker in Kafka? How does it relate to topics and partitions? Explain it using an example.

**Answer:**

A broker in Kafka is responsible for storing one or more partitions from different topics in a Kafka cluster. It handles reads and writes, as well as distributing the data to other brokers for replication. A broker can host many partitions, from many different topics.

9) What’s the role of a consumer group in Kafka? How does it relate to topics and partitions? Explain it using an example.

**Answer:**

A consumer group allows distributed consumption of a topic. In other words, it allows messages from a topic to be read in parallel, by grouping consumers subscribed to the same topic.

A consumer group may have one consumer that receives messages from all partitions of the topic it is subscribed to (bottleneck), or it can have many parallel consumers, each receiving messages from one (or more, but not all) of the partitions of the subscribed topic, with the goal of parallelizing the read.

A consumer group can subscribe to one or more topics. Within the group, each partition of a subscribed topic is assigned to exactly one consumer.
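The way partitions are shared inside a consumer group can be sketched in a few lines of Python. This is a simple round-robin illustration with hypothetical names; Kafka's actual assignment strategies are configurable and may distribute partitions differently.

```python
def assign_partitions(partitions, consumers):
    """Round-robin sketch: each partition goes to exactly one consumer of the group."""
    assignment = {consumer: [] for consumer in consumers}
    for i, partition in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(partition)
    return assignment


partitions = [0, 1, 2]
single = assign_partitions(partitions, ["c1"])
pair = assign_partitions(partitions, ["c1", "c2"])
crowd = assign_partitions(partitions, ["c1", "c2", "c3", "c4"])
print(single, pair, crowd)
```

With one consumer all three partitions go to it (the bottleneck case), with two the load is split, and with four consumers one of them stays idle, since a partition is never shared between two consumers of the same group.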

### Exercises

Suppose you receive the following stream of events:

$$\texttt{A1@0,C1@1,B1@2,B2@3,A2@4,B3@5,A3@6,B4@10}$$

Note that $\texttt{A3@6}$ denotes an event of type A identified by the number 3 that
is received at time 6.

Given the pattern:

$$\texttt{every A -> (B and not C where timer:within(3 sec))}$$

1) Translate the pattern into an English sentence
2) Which are the events that trigger the matching? Why?
3) Which are the events that may trigger the matching but are excluded by the semantics of the every and the where timer:within clauses? Why?

**Answer:**

1) For every occurrence of an event A, look for an occurrence of an event B, provided that no event C occurs within 3 seconds of A.

2) (A2, B3), (A3, B4)

3) As we are not using an `every` after the `->`, the pair (A2, B4) is not captured. The `where timer:within` clause excludes (A1, B1), because C1 arrives within 3 seconds of A1. The remaining pairs (A1, B2), (A1, B3) and (A1, B4) are excluded by both semantics.
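The answer can be checked with a small Python simulation of the pattern. It is a sketch under the reading used in the answer above, namely that the timer applies to C (a C within 3 seconds of A cancels that A); the function name, the tuple layout and the treatment of a C arriving exactly at the 3-second boundary are assumptions.

```python
def match_pattern(events, within):
    """events: (type, id, ts) tuples in arrival order.
    Sketch of: every A -> (B and not C where timer:within(`within` sec))."""
    pending = []    # A events still waiting for their B
    matches = []
    for kind, ident, ts in events:
        name = f"{kind}{ident}"
        if kind == "A":
            pending.append((name, ts))     # `every`: each A starts a new attempt
        elif kind == "C":
            # A C inside the timer of an A cancels that attempt.
            pending = [(a, a_ts) for a, a_ts in pending if ts - a_ts > within]
        elif kind == "B":
            # No `every` after ->: each attempt completes with its first B.
            matches.extend((a, name) for a, _ in pending)
            pending = []
    return matches


stream = [("A", 1, 0), ("C", 1, 1), ("B", 1, 2), ("B", 2, 3),
          ("A", 2, 4), ("B", 3, 5), ("A", 3, 6), ("B", 4, 10)]
matches = match_pattern(stream, within=3)
print(matches)
```

A1 is cancelled by C1, A2 completes with B3 and therefore never pairs with B4, and A3 completes with B4.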

In the git repo of the course, you find a complete example about a
Robotic Arm solved in:

* EPL:
  * https://github.com/Streaming-Data-Analytics/Courseware/tree/main/Streaming%20Data%20Engineering/EPL/epl_robotic-arm

* Spark Structured Streaming:
  * https://github.com/Streaming-Data-Analytics/Courseware/tree/main/Streaming%20Data%20Engineering/Spark/sss_robotic-arm

There are two other examples completely solved in EPL:
* https://github.com/Streaming-Data-Analytics/Courseware/tree/main/Streaming%20Data%20Engineering/EPL/epl_tomatopick

* https://github.com/Streaming-Data-Analytics/Courseware/tree/main/Streaming%20Data%20Engineering/EPL/epl_bocce (VERY HARD)