## Notes for Data Streaming 

### Introduction to Stream Processing

In computing, a stream is typically thought of as a potentially unbounded sequence.

Stream Processing is the act of performing continual calculations on a potentially endless and constantly evolving source of data.

Stream Processing applications perform calculations on Data Streams. Data Streams consist of a potentially endless stream of immutable data.

Immutable data does not change -- once the data has been placed in the data stream it can never be updated. Another data entry can be placed in the stream that supersedes the previous data entry if necessary.

Data sent to data streams is typically small, less than 1MB in size.

The data throughput to data streams is highly variable. Some streams will receive thousands or tens of thousands of records per second, and some will receive one or two records per hour.

**Recap - Stream Processing**
* Stream Processing acts on potentially endless and constantly evolving immutable data contained in data streams.
* Once data have been placed in a data stream, they cannot be modified. We must place a new record in the stream to override the existing data.
* Finally, data in data streams is typically less than 1MB in size and the data volume may vary from a few records an hour to thousands of records per second.

An event is an immutable fact regarding something that occurred within our system.

Stream processing allows companies to process data as it's generated and not hours after the fact as is common with batch processing.

**Stream Processing Examples Recap**

Stream Processing is a critical component in a number of familiar technology applications:

* Finding patterns and meaningful data in disparate log messages in a microservices architecture
* Tracking user-engagement in real time with streaming website analytics
* Real-time pricing in ride-sharing applications based on demand and environmental conditions
* Stock buying/selling based on price, news, and social media sentiment


**Batch Processing**
* Runs on a scheduled basis
* May run for a longer period of time and write results to a SQL-like store
* May analyze all historical data at once
* Typically works with mutable data and data stores

**Stream Processing**
* Runs at whatever frequency events are generated
* Typically runs quickly, updating in-memory aggregates
* Stream Processing applications may simply emit events themselves, rather than write to an event store
* Typically analyzes trends over a limited period of time due to data volume
* Typically analyzes immutable data and data stores
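As a rough illustration of "updating in-memory aggregates", here is a minimal plain-Python sketch of a tumbling-window count: each event is bucketed into a fixed, non-overlapping time window and a per-key counter is updated as the event arrives. The `(timestamp_seconds, key)` event shape, the 60-second window, and the name `tumbling_window_counts` are assumptions for illustration; frameworks such as Kafka Streams or Faust provide their own windowing APIs.

```python
from collections import Counter, defaultdict
from typing import Dict, Iterable, Tuple


def tumbling_window_counts(
    events: Iterable[Tuple[float, str]], window_seconds: int = 60
) -> Dict[int, Counter]:
    """Count events per key in fixed, non-overlapping time windows.

    Each event is a hypothetical (timestamp_seconds, key) pair. The aggregate
    lives in memory and is updated once per event, as a stream processor would.
    """
    windows: Dict[int, Counter] = defaultdict(Counter)
    for timestamp, key in events:
        # Align the timestamp to the start of the window it falls into.
        window_start = int(timestamp // window_seconds) * window_seconds
        windows[window_start][key] += 1
    return dict(windows)


events = [(0, "page_view"), (12, "click"), (59, "page_view"), (61, "page_view")]
print(tumbling_window_counts(events))
```

A batch job would typically compute the same counts by scanning all stored events at once; the streaming version only needs the current window's counters in memory.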

Batch and Stream processing are not mutually exclusive. Batch systems can create events to feed into stream processing applications, and similarly, stream processing applications can be part of batch processing analyses.

**Streaming Data Store**
* May look like a message queue, as is the case with Apache Kafka
* May look like a SQL store, as is the case with Apache Cassandra
* Responsible for holding all of the immutable event data in the system
* Guarantees that data is stored in the order in which it was produced
* Guarantees that data is delivered to consumers in the order in which it was received
* Guarantees that the events it stores are immutable and unchangeable

**Stream Processing Application and Framework**
* Stream Processing applications sit downstream of the data store
* Stream Processing applications ingest real-time event data from one or more data streams
* Stream Processing applications aggregate, join, and find differences in data from these streams
* Common Stream Processing Application Frameworks in use today include:
    * Confluent KSQL
    * Kafka Streams
    * Apache Flink
    * Apache Samza
    * Apache Spark Structured Streaming
    * Faust Python Library
    
**Benefits of Stream Processing**
* Faster for scenarios where a limited set of recent data is needed
* More scalable due to distributed nature of storage
* Provides a useful abstraction that decouples applications from each other
* Allows one set of data to satisfy many use-cases which may not have been predictable when the dataset was originally created
* Built-in ability to replay events and observe exactly what occurred, and in what order, provides more opportunities to recover from error states or dig into how a particular result was arrived at

**Examples of Log-Structured Storage**

Cassandra & HBase:
* Both provide SQL-like interfaces
* Use append-only, log-structured streams
* Look and act like a traditional SQL database to the end user
* Clusters may consist of thousands of distributed nodes
* Popular for batch workloads

Apache Kafka:
* Provides a message-queue interface on top of log-structured, append-only storage
* Scales to thousands of distributed nodes
* Popular for stream processing

**Append-only logs**
* Append-only logs are files in which incoming events are written to the end of the log as they are received.
* This simple concept -- of only ever appending, or adding, data to the end of a log file -- is what allows stream processing applications to ensure that events are ordered correctly even at high throughput and scale.
* We can take this idea a step further and say that streams are, in fact, append-only logs.
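A toy model can make the append-only idea concrete. The sketch below assumes an in-memory list stands in for the file on disk: it only ever appends, and it hands back each record's position (its offset). To "change" something, you append a superseding record rather than editing the old one. `AppendOnlyLog` is a hypothetical class, not part of any library.

```python
from typing import List, Tuple


class AppendOnlyLog:
    """Toy in-memory append-only log: records are added at the end, never edited."""

    def __init__(self) -> None:
        self._records: List[bytes] = []

    def append(self, record: bytes) -> int:
        """Add a record to the end of the log and return its offset (position)."""
        self._records.append(record)
        return len(self._records) - 1

    def read_from(self, offset: int) -> List[Tuple[int, bytes]]:
        """Return (offset, record) pairs from `offset` onward, in write order."""
        return [(i, self._records[i]) for i in range(offset, len(self._records))]


log = AppendOnlyLog()
first = log.append(b'{"user": 1, "email": "old@example.com"}')
# Instead of updating the first record, append a newer one that supersedes it.
second = log.append(b'{"user": 1, "email": "new@example.com"}')
print(log.read_from(0))
```

Because writes only go to the end, the order of offsets is the order of arrival, which is what lets readers replay the log and see events in sequence.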

**Log-structured streaming**
* Log-structured streams build upon the concept of append-only logs. One of the hallmarks of log-structured storage systems is that at their core they utilize append-only logs.
* A common characteristic of all log-structured storage systems is that they simply append data to log files on disk.
* These log files may store data indefinitely, for a specific time period, or until a specific size is reached.
* There are typically many log files on disk, and these log files are merged and compacted occasionally.
* When a log file is merged it means that two or more log files are joined together into one log file.
* When a log file is compacted it means that data from one or more files is deleted. Deletion is typically determined by the age of a record. The oldest records are removed, while the newest stay.
* Examples of real world log-structured data stores: Apache HBase, Apache Cassandra, Apache Kafka
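Merging and the age-based deletion described above can be sketched in a few lines. This assumes each log file is a list of hypothetical `(timestamp_seconds, payload)` records already sorted by time. Real systems work on files and segments on disk, and Kafka's `cleanup.policy=compact` keeps the latest record per key rather than deleting purely by age, so treat this only as an illustration of the two operations.

```python
import heapq
from typing import List, Tuple

Record = Tuple[float, bytes]  # hypothetical (timestamp_seconds, payload) record


def merge_logs(*logs: List[Record]) -> List[Record]:
    """Merge several time-ordered log files into one time-ordered log."""
    return list(heapq.merge(*logs, key=lambda record: record[0]))


def drop_older_than(log: List[Record], cutoff: float) -> List[Record]:
    """Delete records older than `cutoff`; the newest records stay."""
    return [record for record in log if record[0] >= cutoff]


segment_a = [(1.0, b"a1"), (4.0, b"a2")]
segment_b = [(2.0, b"b1"), (3.0, b"b2")]
merged = merge_logs(segment_a, segment_b)
print(merged)
print(drop_older_than(merged, cutoff=3.0))
```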

**Apache Kafka as a Stream Processing Tool**
* Kafka is one of the most popular streaming data platforms in the industry today.
* Provides an easy-to-use message queue interface on top of its append-only log-structured storage medium
* Kafka is a log of events
* In Kafka, an event describes something that has occurred, as opposed to a request for an action to be performed
* Kafka is distributed by default
* Fault tolerant by design, meaning it is hard to lose data if a node is suddenly lost
* Kafka scales from 1 to thousands of nodes
* Kafka provides ordering guarantees for data stored within it: within a partition, the order in which data is received is the order in which it will be delivered to consumers
* Commonly used data store for popular streaming tools like Apache Spark, Flink, and Samza

**Kafka History**
* Created at LinkedIn to serve internal stream processing needs
* Kafka is one of the Apache Software Foundation's most popular projects
* Used widely in production. Some famous users include Uber, Apple, and Airbnb
* Creators of Kafka left LinkedIn to found Confluent, which remains one of the leading contributors to the Kafka project; the project itself is governed by the Apache Software Foundation
* Jay Kreps, one of the core authors of Apache Kafka, named the system after Czech author Franz Kafka. Kreps, who enjoys Kafka’s work, thought the name was a good fit because Kafka was built to be a “system optimized for writing.”

**Kafka in Use in Industry**
* The term source is sometimes used to refer to Kafka clients which are producing data into Kafka, typically in reference to another data store
* The term sink is sometimes used to refer to Kafka clients which are extracting data from Kafka, typically in reference to another data store

**Kafka Topics**
* Used to organize and segment datasets, similar to SQL database tables
* Unlike SQL database tables, Kafka Topics are not queryable.
* May be created programmatically, from a CLI (Command Line Interface), or automatically
* Consist of key-value data in binary format

**Kafka Producers**
* Send event data into Kafka topics
* Integrate with client libraries in languages like Java, Python, Go, as well as many other languages

**Kafka Consumers**
* Pull event data from one or more Kafka Topics
* Integrate with Kafka via a Client Library written in languages like Python, Java, Go, and more
* By default only consume data that was produced after the consumer first connected to the topic. Historical data will not be consumed by default.

**Kafka CLI Tools**

List topics that already exist within Kafka:
```bash
kafka-topics --list --zookeeper localhost:2181
```
The `--zookeeper localhost:2181` switch tells the kafka-topics CLI where to find the ZooKeeper ensemble that Kafka is using. In newer versions of Kafka, `--zookeeper` is deprecated in favor of `--bootstrap-server`, which points directly at a Kafka broker (for example `--bootstrap-server localhost:9092`), and it was removed from kafka-topics in Kafka 3.0. Use `--bootstrap-server` if you are running a recent version.

Create topics:
```bash
kafka-topics --create --topic "my-first-topic" --partitions 1 --replication-factor 1 --zookeeper localhost:2181
```
Check the newly created topics:
```bash
kafka-topics --list --zookeeper localhost:2181 --topic "my-first-topic"
```
Now that we have a topic, let's add some data.
```bash
kafka-console-producer --topic "my-first-topic" --broker-list PLAINTEXT://localhost:9092
```
Delete topics:
```bash
kafka-topics --delete --topic "my-first-topic" --zookeeper localhost:2181
```

```python
# PLEASE COMPLETE THE TODO ITEMS IN THIS PYTHON CODE

import asyncio

from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient, NewTopic


BROKER_URL = "PLAINTEXT://localhost:9092"
TOPIC_NAME = "my-first-python-topic"


async def produce(topic_name):
    """Produces data into the Kafka Topic"""
    # TODO: Configure the producer with `bootstrap.servers`
    #       See: https://docs.confluent.io/current/clients/confluent-kafka-python/#producer
    # Note: `group.id` is a consumer setting, so it is not set here.
    p = Producer({"bootstrap.servers": BROKER_URL})

    curr_iteration = 0
    while True:
        # TODO: Produce a message to the topic
        #       See: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.Producer.produce
        p.produce(topic_name, f"Message: {curr_iteration}".encode("utf-8"))
        # Serve delivery callbacks without blocking the event loop
        p.poll(0)

        curr_iteration += 1
        await asyncio.sleep(1)


async def consume(topic_name):
    """Consumes data from the Kafka Topic"""
    # TODO: Configure the consumer with `bootstrap.servers` and `group.id`
    #       See: https://docs.confluent.io/current/clients/confluent-kafka-python/#consumer
    c = Consumer({"bootstrap.servers": BROKER_URL, "group.id": "first-python-consumer-group"})

    # TODO: Subscribe to the topic
    #       See: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.Consumer.subscribe
    c.subscribe([topic_name])

    while True:
        # TODO: Poll for a message
        #       See: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.Consumer.poll
        # poll() blocks the event loop for up to 1 second; acceptable for this exercise
        message = c.poll(1.0)

        # TODO: Handle the message. Remember that you should:
        #   1. Check if the message is `None`
        #   2. Check if the message has an error: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.Message.error
        #   3. If 1 and 2 were false, print the message key and value
        #       Key: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.Message.key
        #       Value: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.Message.value
        #
        if message is None:
            print("No message received!")
        elif message.error() is not None:
            print(f"Message has an error: {message.error()}")
        else:
            print(f"Key: {message.key()}, Value: {message.value()}")
        await asyncio.sleep(1)


async def produce_consume():
    """Runs the Producer and Consumer tasks"""
    t1 = asyncio.create_task(produce(TOPIC_NAME))
    t2 = asyncio.create_task(consume(TOPIC_NAME))
    await t1
    await t2


def main():
    """Runs the exercise"""
    # TODO: Configure the AdminClient with `bootstrap.servers`
    #       See: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.admin.AdminClient
    client = AdminClient({"bootstrap.servers": BROKER_URL})
    # TODO: Create a NewTopic object. Don't forget to set partitions and replication factor to 1!
    #       See: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.admin.NewTopic
    topic = NewTopic(TOPIC_NAME, num_partitions=1, replication_factor=1)

    # TODO: Using `client`, create the topic
    #       See: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.admin.AdminClient.create_topics
    # create_topics() is asynchronous; wait on each future so the topic exists before use
    for future in client.create_topics([topic]).values():
        future.result()

    try:
        asyncio.run(produce_consume())
    except KeyboardInterrupt:
        print("shutting down")
    finally:
        # TODO: Using `client`, delete the topic
        #       See: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.admin.AdminClient.delete_topics
        # delete_topics() takes topic *names*, not NewTopic objects
        for future in client.delete_topics([TOPIC_NAME]).values():
            future.result()


if __name__ == "__main__":
    main()
```

**Glossary of Key Terms in this Lesson**
* Stream - An unbounded sequence of ordered, immutable data
* Stream Processing - Continual calculations performed on one or more Streams
* Immutable Data - Data that cannot be changed once it has been created
* Event - An immutable fact regarding something that has occurred in our system.
* Batch Processing - Scheduled, periodic analysis of one or more groups of related data.
* Data Store - A generic place that holds data of some kind, like a message queue or database
* Stream Processing Application - An application which is downstream of one or more data streams and performs some kind of calculation on incoming data, typically producing one or more output data streams
* Stream Processing Framework - A set of tools, typically bundled as a library, used to construct a Stream Processing Application
* Real-time - In relation to processing, this implies that a piece of data, or an event, is processed almost as soon as it is produced. Strict time-based definitions of real-time are controversial in the industry and vary widely between applications. For example, a Computer Vision application may consider real-time to be 1 millisecond or less, whereas a data engineering team may consider it to be 30 seconds or less. In this class when the term "real-time" is used, the time-frame we have in mind is seconds.
* Append-only Log - Files in which incoming events are written to the end of the file as they are received
* Change Data Capture (CDC) - The process of capturing change events, typically in SQL database systems, in order to accurately communicate and synchronize changes from primary to replica nodes in a clustered system.
* Log-Structured Storage - Systems built on Append-Only Logs, in which system data is stored in log format.
* Merge (Log Files) - When two or more log files are joined together into a single output log file
* Compact (Log Files) - When data from one or more files is deleted, typically based on the age of data
* Source (Kafka) - A term sometimes used to refer to Kafka clients which are producing data into Kafka, typically in reference to another data store
* Sink (Kafka) - A term sometimes used to refer to Kafka clients which are extracting data from Kafka, typically in reference to another data store
* Topic (Kafka) - A logical construct used to organize and segment datasets within Kafka, similar to how SQL databases use tables
* Producer (Kafka) - An application which is sending data to one or more Kafka Topics.
* Consumer (Kafka) - An application which is receiving data from one or more Kafka Topics.

### Apache Kafka

**Glossary of Key Terms:**
* Broker (Kafka) - A single member server of the Kafka cluster
* Cluster (Kafka) - A group of one or more Kafka Brokers working together to satisfy Kafka production and consumption
* Node - A single computing instance. May be physical, as in a server in a datacenter, or virtual, as an instance might be in AWS, GCP, or Azure.
* Zookeeper - Used by Kafka Brokers to determine which broker is the leader of a given partition and topic, as well as track cluster membership and configuration for Kafka
* Access Control List (ACL) - Permissions associated with an object. In Kafka, this typically refers to a user’s permissions with respect to production and consumption, and/or the topics themselves.
* JVM - The Java Virtual Machine - Responsible for allowing host computers to execute the byte-code compiled against the JVM.
* Data Partition (Kafka) - Kafka topics consist of one or more partitions. A partition is a log which provides ordering guarantees for all of the data contained within it. Partitions are chosen by hashing key values.
* Data Replication (Kafka) - A mechanism by which data is written to more than one broker to ensure that if a single broker is lost, a replicated copy of the data is available.
* In-Sync Replica (ISR) - A replica (hosted on a broker) that is fully caught up with the leader of a particular partition for all of the messages in that partition. The number of ISRs may be less than the replication factor for a topic.
* Rebalance - A process triggered when the set of consumers in a consumer group changes (a consumer is added or removed). When this occurs, partitions must be reassigned among the consumers in the group.
* Data Expiration - A process in which data is removed from a Topic log, determined by data retention policies.
* Data Retention - Policies that determine how long data should be kept. Configured by time or size.
* Batch Size - The number of messages (or amount of data) grouped together and sent to or received from Kafka in a single request
* acks - The number of broker acknowledgements that must be received from Kafka before a producer continues processing
* Synchronous Production - Producers which send a message and wait for a response before performing additional processing
* Asynchronous Production - Producers which send a message and do not wait for a response before performing additional processing
* Avro - A binary message serialization format
* Message Serialization - The process of transforming an application's internal data representation into a format suitable for interprocess communication over a protocol like TCP or HTTP.
* Message Deserialization - The process of transforming an incoming set of data from a form suitable for interprocess communication, into a data representation more suitable for the application receiving the data.
* Retries (Kafka Producer) - The number of times the underlying library will attempt to deliver data before moving on
* Consumer Offset - A value indicating the last seen and processed message of a given consumer, by ID.
* Consumer Group - A collection of one or more consumers, identified by `group.id`, which collaborate to consume data from Kafka and share a consumer offset.
* Consumer Group Coordinator - The broker in charge of working with the Consumer Group Leader to initiate a rebalance
* Consumer Group Leader - The consumer in charge of working with the Group Coordinator to manage the consumer group
* Topic Subscription - Kafka consumers indicate to the Kafka Cluster that they would like to consume from one or more topics by specifying one or more topics that they wish to subscribe to.
* Consumer Lag - The difference between the offset of a consumer group and the latest message offset in Kafka itself
* CCPA - California Consumer Privacy Act
* GDPR - General Data Protection Regulation

**Kafka Architecture**
* Kafka servers are referred to as brokers
* All of the brokers that work together are referred to as a cluster
* Clusters may consist of just one broker, or thousands of brokers
* Apache Zookeeper is used by Kafka brokers to determine which broker is the leader of a given partition and topic
* Zookeeper keeps track of which brokers are part of the Kafka cluster
* Zookeeper stores configuration for topics and permissions (Access Control Lists - ACLs)
* ACLs are Permissions associated with an object. In Kafka, this typically refers to a user’s permissions with respect to production and consumption, and/or the topics themselves.
* Kafka nodes may gracefully join and leave the cluster
* Kafka runs on the Java Virtual Machine (JVM)

**Kafka Clustering - Key Points**
* Kafka servers are referred to as brokers and organized into clusters.
* Kafka uses Apache Zookeeper to keep track of topic and ACL configuration, as well as determine leadership and cluster management.
* Usage of ZooKeeper means that Kafka brokers can typically seamlessly join and leave clusters, allowing Kafka to grow easily as its usage increases or decreases.
* Kafka stores all of its data in a directory on the broker disk.

**Data Partitions**
* All Kafka topics consist of one or more partitions
* A partition contains a strictly-ordered subset of all data in the topic.
* Partitions enable Kafka to achieve high throughput
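To illustrate how a key determines a partition, here is a sketch of a hash partitioner. It uses CRC32 from Python's `zlib`, which is similar in spirit to librdkafka's `consistent` partitioner, but real clients differ (the Java client uses murmur2), so the partition numbers here should not be expected to match a real producer. `choose_partition` is a hypothetical helper.

```python
import zlib
from typing import Optional


def choose_partition(key: Optional[bytes], num_partitions: int) -> int:
    """Map a message key to a partition with a stable hash (CRC32 modulo partitions)."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if not key:
        # Simplification: send null/empty keys to a single partition.
        return 0
    return zlib.crc32(key) % num_partitions


for key in [b"user-1", b"user-2", b"user-1"]:
    print(key, choose_partition(key, 3))
```

Every message with the same key lands in the same partition, which is what gives per-key ordering. Because the hash is taken modulo the partition count, adding partitions later changes where existing keys land.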

**Data Replication**
* With replication, data is written to multiple brokers, not just one.
* Replicas store a copy of the leader's data
* When a leader broker is lost, a new leader is elected by Zookeeper.
* Default number of replicas can be configured globally
* Cannot have more replicas than the number of brokers
* Data replication incurs some overhead
* Always enable replication in a Production cluster.

Create a topic:
```bash
kafka-topics --create --topic kafka-arch --partitions 1 --replication-factor 1 --zookeeper localhost:2181
```
See how Kafka stored it on disk:
```bash
ls -alh /var/lib/kafka/data | grep kafka-arch
```
Modify the number of partitions:
```bash
kafka-topics --alter --topic kafka-arch --partitions 3 --zookeeper localhost:2181
```
**How Kafka Works**
* A Kafka Broker is an individual Kafka server
* A Kafka Cluster is a group of Kafka Brokers
* Kafka uses Zookeeper to elect topic leaders and store its own configuration
* Kafka writes log files to disk on the Kafka brokers themselves
* Kafka achieves scale and parallelism with topic partitions
* Kafka provides resiliency and helps prevent data loss with data replication

**Topic Configuration**
* Data replication can be set on a per-topic basis
* A broker must be an In-Sync Replica (ISR) to become a leader
* The minimum number of ISRs can be set per topic with `min.insync.replicas`
* When a producer uses `acks=all`, at least that many ISRs must acknowledge a write for it to succeed
* Topic ordering guarantees only apply within partitions
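The check behind the ISR bullets can be modelled in a few lines. `min.insync.replicas` is the real topic/broker setting; `acks_all_write_accepted` is a hypothetical helper modelling the decision the partition leader makes for an `acks=all` write (the leader counts itself as an in-sync replica). Kafka rejects such writes with a "not enough replicas" error when the check fails.

```python
def acks_all_write_accepted(in_sync_replicas: int, min_insync_replicas: int) -> bool:
    """Rough model: an acks=all write needs at least min.insync.replicas ISRs."""
    return in_sync_replicas >= min_insync_replicas


# Replication factor 3 with min.insync.replicas=2 tolerates losing one replica.
for isr in (3, 2, 1):
    print(isr, acks_all_write_accepted(isr, min_insync_replicas=2))
```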

**Partitioning Topics Tips and Equation**
* The “right” number of partitions is highly dependent on the scenario.
* The most important number to understand is desired throughput. How many MB/s do you need to achieve to hit your goal?
* You can add partitions at a later date by altering a topic, but keyed messages may then map to different partitions than before.
* Partitions have performance consequences: more partitions can add network latency and make rebalances more disruptive, and consumers are unavailable during a rebalance.
* Determine the number of partitions you need by dividing the overall throughput you want by the throughput per single consumer partition or the throughput per single producer partition. Pick the larger of these two numbers to determine the needed number of partitions.
    * `Partitions = Max(Overall Throughput/Producer Throughput, Overall Throughput/Consumer Throughput)`
    * Example from video, with a 100 MB/s target and 3 Producers and 5 Consumers, each operating at 10 MB/s per single producer/consumer partition: `Max(100 MB/s / (3 * 10 MB/s), 100 MB/s / (5 * 10 MB/s)) = Max(3.33, 2) = 3.33`, rounded up to 4 partitions needed
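The partition equation translates directly into code. This sketch rounds up, since a fractional partition is not possible; the function and parameter names are assumptions for illustration.

```python
import math


def partitions_needed(target_mb_s: float, producer_mb_s: float, consumer_mb_s: float) -> int:
    """Max(target / producer throughput, target / consumer throughput), rounded up."""
    return math.ceil(max(target_mb_s / producer_mb_s, target_mb_s / consumer_mb_s))


# Video example: 100 MB/s target, 3 producers and 5 consumers at 10 MB/s each.
print(partitions_needed(100, 3 * 10, 5 * 10))
```

If instead you plug in per-partition throughput directly (10 MB/s for both producer and consumer), the same function gives 100 / 10 = 10 partitions.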
    
**Naming Conventions**
* No official or idiomatic pattern defined
* Kafka requires topic names of at most 249 characters, using only `[a-zA-Z0-9._-]`
* Name topics according to some consistent strategy
* Consistent naming leads to simpler consumption
* Recommended approach: `<domain>.<model>.<event type>`
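A sketch of checking names against Kafka's rules and building names in the `<domain>.<model>.<event type>` style. The 249-character limit, the legal character set, and the rejection of `.` and `..` follow Kafka's topic validation; `is_valid_topic_name` and `build_topic_name` are hypothetical helpers, and the lowercasing/underscore choices are one possible convention, not a Kafka requirement.

```python
import re

# Legal topic characters per Kafka: ASCII letters, digits, '.', '_' and '-'.
_LEGAL_TOPIC = re.compile(r"[a-zA-Z0-9._-]+")
MAX_TOPIC_LENGTH = 249


def is_valid_topic_name(name: str) -> bool:
    """Check a topic name against Kafka's length and character rules."""
    if name in (".", ".."):
        return False  # Kafka rejects these two names explicitly
    return len(name) <= MAX_TOPIC_LENGTH and _LEGAL_TOPIC.fullmatch(name) is not None


def build_topic_name(domain: str, model: str, event_type: str) -> str:
    """Hypothetical helper for the <domain>.<model>.<event type> convention."""
    parts = (domain, model, event_type)
    name = ".".join(part.strip().lower().replace(" ", "_") for part in parts)
    if not is_valid_topic_name(name):
        raise ValueError(f"invalid topic name: {name!r}")
    return name


print(build_topic_name("com.udacity", "lesson", "views"))
```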

**Data Management - Key Points**
* Data retention determines how long Kafka stores data in a topic.
    * The `retention.bytes`, `retention.ms` settings control retention policy
* When data expires it is deleted from the topic.
    * This is true if `cleanup.policy` is set to `delete`
* Retention policies may be time based. Once data reaches a certain age it is deleted.
    * The `retention.ms` setting controls retention policy on time
* Retention policies may be size based. Once a topic reaches a certain size the oldest data is deleted.
    * The `retention.bytes` setting controls retention policy on size (it applies per partition)
* Retention policies may be both time- and size-based. Once either condition is reached, the oldest data is deleted.
* Alternatively, topics can be compacted in which there is no size or time limit for data in the topic.
    * This is true if `cleanup.policy` is set to `compact`
* Compacted topics use the message key to identify messages uniquely. If a duplicate key is found, the latest value for that key is kept, and the old message is deleted.
* Kafka topics can use compression algorithms to store data. This can reduce network overhead and save space on brokers. Supported compression algorithms include: lz4, zstd, snappy, and gzip.
    * `compression.type` controls the type of message compression for a topic
* Kafka topics should store data for one type of event, not multiple types of events. Keeping multiple event types in one topic will cause your topic to be hard to use for downstream consumers.
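To contrast the two cleanup policies, the sketch below models a partition as an in-memory list. `compact` keeps only the newest value per key (what `cleanup.policy=compact` aims for), and `retain_by_size` drops the oldest messages once a byte budget is exceeded (the idea behind `retention.bytes`). Both helpers are hypothetical simplifications: Kafka deletes whole log segments and compacts in the background, so some older data can linger for a while.

```python
from typing import Dict, List, Tuple

Message = Tuple[str, bytes]  # hypothetical (key, value) pair


def compact(log: List[Message]) -> List[Message]:
    """Keep only the latest value for each key, preserving log order."""
    latest_index: Dict[str, int] = {}
    for index, (key, _value) in enumerate(log):
        latest_index[key] = index  # later occurrences overwrite earlier ones
    return [message for index, message in enumerate(log) if latest_index[message[0]] == index]


def retain_by_size(log: List[bytes], max_bytes: int) -> List[bytes]:
    """Drop the oldest messages until the total size fits within max_bytes."""
    kept: List[bytes] = []
    total = 0
    for message in reversed(log):  # walk from newest to oldest
        if total + len(message) > max_bytes:
            break
        kept.append(message)
        total += len(message)
    return list(reversed(kept))


print(compact([("user-1", b"a"), ("user-2", b"b"), ("user-1", b"c")]))
print(retain_by_size([b"aaaa", b"bb", b"cc"], max_bytes=4))
```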

```python
# Please complete the TODO items in this code

import asyncio

from confluent_kafka import Consumer, Producer
from confluent_kafka.admin import AdminClient, NewTopic


BROKER_URL = "PLAINTEXT://localhost:9092"


def topic_exists(client, topic_name):
    """Checks if the given topic exists"""
    # TODO: Check to see if the given topic exists
    #       See: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.Consumer.list_topics
    topic_metadata = client.list_topics(timeout=5)
    return topic_metadata.topics.get(topic_name) is not None


def create_topic(client, topic_name):
    """Creates the topic with the given topic name"""
    # TODO: Create the topic. Make sure to set the topic name, the number of partitions, the
    # replication factor. Additionally, set the config to have a cleanup policy of delete, a
    # compression type of lz4, delete retention milliseconds of 2 seconds, and a file delete delay
    # milliseconds of 2 seconds.
    #
    # See: https://docs.confluent.io/current/clients/confluent-kafka-python/#confluent_kafka.admin.NewTopic
    # See: https://docs.confluent.io/current/installation/configuration/topic-configs.html
    futures = client.create_topics(
        [
            NewTopic(
                topic=topic_name,
                num_partitions=5,
                replication_factor=1,
                # Topic config values are given as strings
                config={
                    "cleanup.policy": "delete",
                    "compression.type": "lz4",
                    "delete.retention.ms": "2000",
                    "file.delete.delay.ms": "2000",
                },
            )
        ]
    )

    # create_topics() is asynchronous: wait on each future to surface errors
    for topic, future in futures.items():
        try:
            future.result()
            print(f"topic {topic} created")
        except Exception as e:
            print(f"failed to create topic {topic}: {e}")
            raise


def main():
    """Checks for topic and creates the topic if it does not exist"""
    client = AdminClient({"bootstrap.servers": BROKER_URL})

    #
    # TODO: Decide on a topic name
    #
    topic_name = "sample2"
    exists = topic_exists(client, topic_name)
    print(f"Topic {topic_name} exists: {exists}")

    if not exists:
        create_topic(client, topic_name)

    try:
        asyncio.run(produce_consume(topic_name))
    except KeyboardInterrupt:
        print("shutting down")


async def produce_consume(topic_name):
    """Runs the Producer and Consumer tasks"""
    t1 = asyncio.create_task(produce(topic_name))
    t2 = asyncio.create_task(consume(topic_name))
    await t1
    await t2


async def produce(topic_name):
    """Produces data into the Kafka Topic"""
    p = Producer({"bootstrap.servers": BROKER_URL})

    curr_iteration = 0
    while True:
        p.produce(topic_name, f"iteration {curr_iteration}".encode("utf-8"))
        p.poll(0)  # serve delivery callbacks without blocking
        curr_iteration += 1
        await asyncio.sleep(0.5)


async def consume(topic_name):
    """Consumes data from the Kafka Topic"""
    c = Consumer({"bootstrap.servers": BROKER_URL, "group.id": "0"})
    c.subscribe([topic_name])
    while True:
        message = c.poll(1.0)
        if message is None:
            print("no message received by consumer")
        elif message.error() is not None:
            print(f"error from consumer {message.error()}")
        else:
            print(f"consumed message {message.key()}: {message.value()}")
        await asyncio.sleep(2.5)


if __name__ == "__main__":
    main()
```

**Synchronous and Asynchronous Production**
* Synchronous producers block program execution until the broker has confirmed receipt.
* Asynchronous producers send the data and immediately continue.
* Data sent to Kafka should be serialized into a standard format.
* Formats include binary, string, CSV, JSON, and Avro.
* Never change a topic's serialization format in place; create a new topic instead.
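Serializing to a standard format can be as simple as JSON. A minimal sketch, assuming a hypothetical `PageView` event type: the resulting bytes are what you would pass as a message value (for example `producer.produce(topic, value=payload)` with confluent-kafka), and consumers reverse the process.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class PageView:
    """Hypothetical event type used only for this example."""
    user_id: str
    url: str
    timestamp_ms: int


def serialize(event: PageView) -> bytes:
    """Encode the event as UTF-8 JSON bytes, suitable as a Kafka message value."""
    return json.dumps(asdict(event)).encode("utf-8")


def deserialize(payload: bytes) -> PageView:
    """Decode UTF-8 JSON bytes back into a PageView."""
    return PageView(**json.loads(payload.decode("utf-8")))


event = PageView(user_id="user-1", url="/lessons/1", timestamp_ms=1_600_000_000_000)
payload = serialize(event)
print(payload)
print(deserialize(payload))
```

Whatever format you choose, every producer and consumer of the topic must agree on it, which is why the format should not change within a topic.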

**Producer Configuration Options - Summary**
* All available settings for the confluent-kafka-python library can be found in the librdkafka configuration documentation. confluent-kafka-python uses librdkafka under the hood and accepts the same configuration options.
* It is a good idea to always set the client.id for improved logging, debugging, and resource limiting
* The retries setting determines how many times the producer will attempt to send a message before marking it as failed
* If ordering guarantees are important to your application and you’ve also enabled retries, make sure that you set enable.idempotence to true
* Producers may choose to compress messages with the compression.type setting
    * Options are none, gzip, lz4, snappy, and zstd
    * lz4 and snappy are the fastest algorithms, zstd and gzip provide a higher compression ratio
    * Compression is performed by the producer client if enabled
    * If the topic has its own compression setting, it should match the producer setting; otherwise the broker will decompress and recompress the message into the topic's configured format.
* The acks setting determines how many In-Sync Replica (ISR) brokers need to have successfully received the message from the client before the producer moves on
    * A setting of -1 or all means that all ISRs must have successfully received the message before the producer proceeds
    * Clients may opt to set this to 0 for performance reasons, in which case the producer does not wait for any acknowledgement and messages can be lost
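The settings above, collected into a configuration dictionary for confluent-kafka. The property names are real librdkafka options; the specific values, `build_producer_config`, and `check_idempotence` are assumptions for illustration. librdkafka itself enforces the idempotence constraints (acks=all, retries greater than 0, at most 5 in-flight requests per connection) and fails to create the producer if they conflict, so the check below only mirrors that behavior.

```python
from typing import Any, Dict


def build_producer_config(bootstrap_servers: str, client_id: str) -> Dict[str, Any]:
    """Producer settings discussed above, using librdkafka property names."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "client.id": client_id,        # appears in broker logs and metrics
        "retries": 5,                  # retry transient delivery failures
        "enable.idempotence": True,    # keeps ordering safe when retrying
        "acks": "all",                 # wait for all in-sync replicas
        "compression.type": "lz4",     # fast compression, done by the client
    }


def check_idempotence(config: Dict[str, Any]) -> None:
    """Hypothetical sanity check mirroring librdkafka's idempotence constraints."""
    if not config.get("enable.idempotence"):
        return
    if str(config.get("acks", "all")) not in ("all", "-1"):
        raise ValueError("enable.idempotence requires acks=all")
    if int(config.get("retries", 1)) <= 0:
        raise ValueError("enable.idempotence requires retries > 0")
    if int(config.get("max.in.flight.requests.per.connection", 5)) > 5:
        raise ValueError("enable.idempotence requires max.in.flight.requests.per.connection <= 5")


config = build_producer_config("PLAINTEXT://localhost:9092", "lesson-producer")
check_idempotence(config)
print(config)
```

With confluent-kafka installed, `Producer(config)` would create a producer from this dictionary.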

**Kafka Producers - Summary**
Kafka Producers are rich in options and configuration. In this section you've seen how to adapt your producer code to a wide variety of real-world situations through configuration.

Remember, no one set of settings works in all scenarios. If your producer application isn’t performing the way you expect, it’s worth revisiting your producer configuration to ensure that the settings make sense for the throughput level you are hoping to achieve.

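A consumer offset records a consumer's position in a partition: it tracks the last message the consumer successfully processed (strictly, Kafka commits the offset of the next message to read).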

**Consumer Offsets**
* Kafka keeps track of what data a consumer has seen with offsets
* Kafka stores offsets in a private internal topic
* Most client libraries automatically send offsets to Kafka for you on a periodic basis
* You may opt to commit offsets yourself, but it is not recommended unless there is a specific use-case.
* Offsets may be sent synchronously or asynchronously
* Committed offsets determine where the consumer will start up
    * If you want the consumer to start from the first known message, set `auto.offset.reset` to `earliest`
    * This will only work the first time you start your consumer. On subsequent restarts it will pick up wherever it left off
    * If you always want your consumer to start from the earliest known message, you must manually assign your consumer to the start of the topic on boot.
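One way to always start from the beginning is an `on_assign` callback that rewinds every assigned partition before consumption begins. This is a sketch assuming the confluent-kafka Python client, where `Consumer.subscribe(..., on_assign=...)`, `Consumer.assign()`, and `OFFSET_BEGINNING` exist; the fallback constant only lets the callback be exercised without the client installed.

```python
try:
    from confluent_kafka import OFFSET_BEGINNING
except ImportError:
    # Fallback for experimenting without the client: -2 is librdkafka's
    # logical "beginning of partition" offset
    OFFSET_BEGINNING = -2


def on_assign(consumer, partitions):
    """Rewind every newly assigned partition to its earliest available offset."""
    for partition in partitions:
        partition.offset = OFFSET_BEGINNING
    # Apply the rewound offsets to the consumer's assignment
    consumer.assign(partitions)
```

Register it when subscribing, e.g. `consumer.subscribe(["com.udacity.lesson.views"], on_assign=on_assign)`, and every rebalance will send the consumer back to the start of its partitions.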
    
**Consumer Groups**
* All Kafka Consumers belong to a Consumer group
    * The group.id parameter is required and identifies the globally unique consumer group
    * Consumer groups consist of one or more consumers
* Consumer groups increase throughput and processing speed by allowing many consumers of topic data. However, only one consumer in the consumer group receives any given message.
* If your application needs to inspect every message in a topic, create a consumer group with a single member
* Adding or removing consumers causes Kafka to rebalance
    * During a rebalance, a broker group coordinator identifies a consumer group leader
    * The consumer group leader reassigns partitions to the current consumer group members
    * During a rebalance, messages may not be processed or consumed
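The partition reassignment done during a rebalance can be pictured with a simplified range-style assignment for a single topic: members are sorted, each gets a contiguous block of partitions, and the first few absorb any remainder. This is a sketch modeled loosely on Kafka's range assignor, not the client's actual code; real clients support several strategies and multiple topics.

```python
def range_assign(members, num_partitions):
    """Map each member id to a contiguous list of partition numbers."""
    ordered = sorted(members)
    per_member, extra = divmod(num_partitions, len(ordered))
    assignment, start = {}, 0
    for i, member in enumerate(ordered):
        # The first `extra` members each take one additional partition
        count = per_member + (1 if i < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


print(range_assign(["consumer-b", "consumer-a"], 5))
# Adding a third member triggers a rebalance that shrinks each share
print(range_assign(["consumer-a", "consumer-b", "consumer-c"], 5))
```

With more members than partitions, the surplus members receive empty lists, which is why adding consumers beyond the partition count does not increase throughput.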
    
**Topic Subscriptions**
* You subscribe to a topic by specifying its name
    * If you wanted to subscribe to com.udacity.lesson.views, you would simply specify the full name as `"com.udacity.lesson.views"`
    * Make sure to set `allow.auto.create.topics` to `false` so that the topic isn't created by the consumer if it does not yet exist
* One consumer can subscribe to multiple topics by using a regular expression
    * The format for the regular expression is slightly different. If you wanted to subscribe to com.udacity.lesson.views.lesson1 and com.udacity.lesson.views.lesson2, you would specify the topic name as `"^com.udacity.lesson.views.*"`
    * The topic name must be prefixed with `"^"` for the client to recognize that it is a regular expression, and not a specific topic name
    * Write the pattern in regular expression (regexp) syntax; it is matched against topic names by the Kafka client library, not by Python's `re` module
    * See the confluent-kafka-python `Consumer.subscribe()` documentation for more information
    
* Remember to deserialize the data you are receiving from Kafka in an appropriate format
    * If the producer used JSON, you will need to deserialize the data using a JSON library
    * If the producer used bytes or string data, you may not have to do anything
* Consumer groups increase fault tolerance and resiliency by automatically redistributing partition assignments if one or more members of the consumer group fail.
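A minimal sketch of JSON deserialization for a consumed message value, assuming the producer wrote UTF-8 encoded JSON (as the `ClickEvent` exercise in these notes does). The helper name `decode_json_value` is hypothetical; in a poll loop it would be called as `decode_json_value(message.value())`.

```python
import json


def decode_json_value(raw):
    """Decode a Kafka message value (bytes or None) into a dict, or None if unusable."""
    if raw is None:
        return None  # e.g. a tombstone record that carries no value
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError) as e:
        print(f"could not deserialize message value: {e}")
        return None


print(decode_json_value(b'{"email": "a@example.com", "uri": "/home"}'))
```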

**Retrieving Data from Kafka**
* Most Kafka Consumers will have a “poll” loop which loops infinitely and ingests data from Kafka
* Here is a sample poll loop:
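```python
from confluent_kafka import Consumer

c = Consumer({"bootstrap.servers": "PLAINTEXT://localhost:9092", "group.id": "0"})
c.subscribe(["com.udacity.lesson.views"])
try:
    while True:
        message = c.poll(1.0)  # wait up to 1 second for a message
        if message is None:
            print("no message received by consumer")
        elif message.error() is not None:
            print(f"error from consumer {message.error()}")
        else:
            print(f"consumed message {message.key()}: {message.value()}")
finally:
    c.close()  # leave the consumer group cleanly
```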
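* It is possible to use either `poll()` or `consume()`: `poll()` returns a single message (or `None`), while `consume()` returns a list of messages, which can be handy for batch processing
* Make sure to call `close()` on your consumer before exiting, after consuming any remaining messages
    * If you don't call `close()`, the Kafka Broker has to detect on its own that the consumer has left the consumer group, which only happens after the session timeout expires. Until then, that consumer's partitions are not reassigned and their messages go unprocessed. Try to avoid this if you can.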
    
* Consumer lag measures how far behind a consumer is
* Lag = Latest Topic Offset - Consumer Topic Offset
* Messages per second indicates throughput
* Kafka exposes real-time metrics through Java Management Extensions (JMX)

* Latency = time broker received - time produced
* High latency may indicate acks setting is too high
* High latency may indicate too many partitions
* High latency may indicate too many replicas
* Producer response rate tracks overall delivery rate
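The lag and latency formulas above can be worked through in a small sketch. The offsets and timestamps are made-up sample values; in practice the latest offset comes from the partition's high watermark and the consumer offset from its last commit.

```python
def consumer_lag(latest_offsets, committed_offsets):
    """Total lag across partitions: latest topic offset minus committed consumer offset."""
    # Partitions with no commit are treated as starting from offset 0 (a simplification)
    return sum(latest_offsets[p] - committed_offsets.get(p, 0) for p in latest_offsets)


def latency_ms(produced_ms, broker_received_ms):
    """Latency = time the broker received the message - time it was produced."""
    return broker_received_ms - produced_ms


# Hypothetical per-partition offsets for a three-partition topic
latest = {0: 120, 1: 98, 2: 143}
committed = {0: 115, 1: 98, 2: 130}
print(consumer_lag(latest, committed))  # 5 + 0 + 13 = 18
print(latency_ms(1_000, 1_045))         # 45
```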

**Performance Considerations**
Monitoring Kafka Consumers, Producers, and Brokers for performance is an important part of using Kafka. There are many metrics by which to measure your Kafka cluster. Focus on these key metrics to get started:

* Consumer Lag: The difference between the latest offset in the topic and the most recently committed consumer offset
* Producer Response Rate: The rate at which the broker is responding to the producer indicating message status
* Producer Request Latency: The length of time a producer has to wait for a response from the broker after sending a message
* Broker Disk Space
* Broker Elections

### Data Schemas and Apache Avro

* Data Schema - Define the shape of a particular kind of data. Specifically, data schemas define the expected fields, their names, and value types for those fields. Data schemas may also indicate whether fields are required or optional.
* Apache Avro - A data serialization framework which includes facilities for defining and communicating data schemas. Avro is widely used in the Kafka ecosystem and data engineering generally.
* Record (Avro) - A single encoded record in the defined Avro format
* Primitive Type (Avro) - In Avro, a primitive type is a type which requires no additional specification - null, boolean, int, long, float, double, bytes, string.
* Complex Type (Avro) - In Avro, a complex type models data structures which may involve nesting or other advanced functionality: records, enums, maps, arrays, unions, fixed.
* Schema Evolution - The process of modifying an existing schema with new, deleted, or modified fields.
* Schema Compatibility - Determines whether or not two given versions of a schema are usable by a given client
* Backward Compatibility - means that consumer code developed against the most recent version of an Avro Schema can read data produced with the prior version of a schema without modification.
* Forward Compatibility - means that consumer code developed against the previous version of an Avro Schema can consume data produced with the newest version of a schema without modification.
* Full Compatibility - means that consumers developed against the latest schema can consume data using the previous schema, and that consumers developed against the previous schema can consume data from the latest schema as well. In other words, full compatibility means that a schema change is both forward and backward compatible.
* None Compatibility - disables compatibility checking by Schema Registry.

**What are Data Schemas?**

* Data schemas help us define:
    * The shape of the data
    * The names of fields
    * The expected types of values
    * Whether certain data fields are optional or required.
* Data schemas provide expectations for applications so that they can properly ingest or produce data that match that specification
* Data schemas are used for communication between software
* Data schemas can help us create more efficient representations with compression
* Data schemas help systems develop independently of each other
* Data schemas are critical in data systems and applications today
    * gRPC in Kubernetes
    * Apache Avro in the Hadoop Ecosystem
    
```python
import asyncio
from dataclasses import dataclass, field
import json

from confluent_kafka import Consumer, Producer
from faker import Faker


faker = Faker()

BROKER_URL = "PLAINTEXT://localhost:9092"


async def produce(topic_name):
    """Produces data into the Kafka Topic"""
    p = Producer({"bootstrap.servers": BROKER_URL})
    while True:
        p.produce(topic_name, ClickEvent().serialize())
        await asyncio.sleep(1.0)


async def consume(topic_name):
    """Consumes data from the Kafka Topic"""
    c = Consumer({"bootstrap.servers": BROKER_URL, "group.id": "0"})
    c.subscribe([topic_name])
    while True:
        message = c.poll(1.0)
        if message is None:
            print("no message received by consumer")
        elif message.error() is not None:
            print(f"error from consumer {message.error()}")
        else:
            # The web team told us to expect the keys "email", "uri", and "timestamp".
            # After 10 events the producer silently renames "email" to "user_email",
            # so the KeyError branch shows what an uncoordinated schema change does.
            try:
                print(ClickEvent.deserialize(message.value()))
            except KeyError as e:
                print(f"Failed to unpack message {e}")
        await asyncio.sleep(1.0)


def main():
    """Runs the producer and consumer until interrupted"""
    try:
        asyncio.run(produce_consume("com.udacity.lesson3.exercise1.clicks"))
    except KeyboardInterrupt:
        print("shutting down")


async def produce_consume(topic_name):
    """Runs the Producer and Consumer tasks"""
    t1 = asyncio.create_task(produce(topic_name))
    t2 = asyncio.create_task(consume(topic_name))
    await t1
    await t2


@dataclass
class ClickEvent:
    email: str = field(default_factory=faker.email)
    timestamp: str = field(default_factory=faker.iso8601)
    uri: str = field(default_factory=faker.uri)

    num_calls = 0  # class-level counter, not a dataclass field

    def serialize(self):
        # Simulates an uncoordinated schema change after the 10th event
        email_key = "email" if ClickEvent.num_calls < 10 else "user_email"
        ClickEvent.num_calls += 1
        return json.dumps(
            {"uri": self.uri, "timestamp": self.timestamp, email_key: self.email}
        )

    @classmethod
    def deserialize(cls, json_data):
        click_json = json.loads(json_data)
        return cls(
            email=click_json["email"],
            timestamp=click_json["timestamp"],
            uri=click_json["uri"],
        )


if __name__ == "__main__":
    main()
```

* Data schemas help systems evolve independently from each other. This is beneficial at an application and an organizational level within our companies.
* Data schemas describe the expected keys, value types, and whether certain keys are optional or required.
* Data schemas can be used to create more efficient representations of our data model

Apache Avro is a widely used data schema system in the data engineering space, and especially in the Apache Kafka ecosystem.

**Avro Schema**

* Apache Avro records are defined in JSON.
* Avro records include a required name, such as "user"
* Avro records must include a type defined as record
* Avro records may optionally include a namespace, such as "com.udacity"
* Avro records are required to include an array of fields that define the names of the expected fields and their associated type. Such as "fields": [{"name": "age", "type": "int"}]
* Avro can support optional fields by specifying the field type as a union of null and some other type, such as "fields": [{"name": "age", "type": ["null", "int"]}]
* Avro records are made up of complex and primitive types
    * Complex types are other records, arrays, maps, and others

```json
{
  "type": "record",
  "name": "stock.price_change",
  "namespace": "com.udacity",
  "fields": [
      {"name": "ticker", "type": "string"},
      {"name": "prev_price", "type": "int"},
      {"name": "price", "type": "int"},
      {"name": "cause", "type": ["null", "string"]}
  ]
}
```
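To make the field and union rules concrete, here is a deliberately tiny validator for the record above. It is a hand-rolled illustration, not the Avro library: it handles only the `null`, `string`, and `int` primitives and unions used by this schema, and it treats a missing key as `null`. Real projects would use an Avro implementation such as the official `avro` package or `fastavro`.

```python
import json

SCHEMA = json.loads("""
{
  "type": "record",
  "name": "stock.price_change",
  "namespace": "com.udacity",
  "fields": [
    {"name": "ticker", "type": "string"},
    {"name": "prev_price", "type": "int"},
    {"name": "price", "type": "int"},
    {"name": "cause", "type": ["null", "string"]}
  ]
}
""")

# Python values accepted for each Avro primitive this schema uses (illustration only)
PRIMITIVES = {
    "null": lambda v: v is None,
    "string": lambda v: isinstance(v, str),
    "int": lambda v: isinstance(v, int) and not isinstance(v, bool) and -2**31 <= v < 2**31,
}


def matches(avro_type, value):
    """True if value fits avro_type; a list of types is an Avro union."""
    if isinstance(avro_type, list):
        return any(matches(t, value) for t in avro_type)
    return PRIMITIVES[avro_type](value)


def validate(schema, record):
    """Return a list of problems; an empty list means the record fits the schema."""
    problems = []
    for f in schema["fields"]:
        value = record.get(f["name"])  # a missing key is treated as null
        if not matches(f["type"], value):
            problems.append(f"{f['name']}: {value!r} does not match {f['type']}")
    return problems


print(validate(SCHEMA, {"ticker": "UDA", "prev_price": 100, "price": 105}))
print(validate(SCHEMA, {"ticker": "UDA", "prev_price": "100", "price": 105}))
```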

**Schema Registry**

* Provides an HTTP REST API for managing Avro schemas
* Many Kafka clients natively support Schema Registry interactions for you
* Reduces network overhead, allowing producers and consumers to register schemas one time
* Simplifies using Avro, reducing the barrier to entry for developers
* Uses a Kafka topic to store state
* Deployed as one or more web servers, with one leader
* Uses ZooKeeper to manage elections
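A sketch of the HTTP call behind registering a schema, using only the standard library. It assumes a Schema Registry on its default port 8081 and uses the `POST /subjects/<subject>/versions` endpoint; the helper name and the sample schema are hypothetical. The request is built but not sent, so it can be inspected offline.

```python
import json
import urllib.request

# Assumption: Schema Registry running locally on its default port
SCHEMA_REGISTRY_URL = "http://localhost:8081"


def build_register_request(subject, schema):
    """Build (but do not send) a request registering `schema` under `subject`."""
    # The registry expects the schema itself as a JSON-encoded string
    body = json.dumps({"schema": json.dumps(schema)}).encode("utf-8")
    return urllib.request.Request(
        f"{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


schema = {
    "type": "record",
    "name": "click_event",
    "namespace": "com.udacity",
    "fields": [{"name": "email", "type": "string"}],
}
# "<topic>-value" follows the default subject naming convention for message values
request = build_register_request("com.udacity.lesson.views-value", schema)
```

Against a running registry, `urllib.request.urlopen(request)` returns a JSON body containing the registered schema's `id`, which clients then reference instead of resending the full schema.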

**Schema Compatibility**

* The process of schema change is known as Schema Evolution
* Schema Evolution is caused by a modification to an existing data schema
    * Adding or removing a field
    * Making a field optional
    * Changing a field type
* Schema Registry can track schema compatibility between schemas
    * Compatibility is used to determine whether or not a particular schema version is usable by a data consumer
    * Consumers may opt to use this compatibility information to preemptively refuse to process data that is incompatible with their current configuration
    * Schema Registry supports four categories of compatibility
        * Backward / Backward Transitive
        * Forward / Forward Transitive
        * Full / Full Transitive
        * None
* Managing compatibility requires both producer and consumer code to determine the compatibility of schema changes and send those updates to Schema Registry

* Backward compatibility means that consumer code developed against the most recent version of an Avro Schema can read data produced with the prior version of a schema without modification.
    * The deletion of a field or the addition of a new optional field are backward compatible changes.
    * Update consumers before updating producers to ensure that consumers can handle the new data type
* The BACKWARD compatibility type indicates compatibility with the current version (N) and the immediately prior version (N-1)
    * Unless you specify otherwise, Schema Registry always assumes that changes are BACKWARD compatible
* The BACKWARD_TRANSITIVE compatibility type indicates compatibility with all prior versions (1 → N)

* Forward compatibility means that consumer code developed against the previous version of an Avro Schema can consume data produced with the newest version of a schema without modification
    * The deletion of an optional field or the addition of a new field are forward compatible changes
    * Producers need to be updated before consumers
* The FORWARD compatibility type indicates that data produced with the latest schema (N) is usable by consumers using the previous schema version (N-1)
* The FORWARD_TRANSITIVE compatibility type indicates that data produced with the latest schema (N) is usable by all consumers using any previous schema version (1 → N-1)

* Full compatibility means that consumers developed against the latest schema can consume data using the previous schema, and that consumers developed against the previous schema can consume data from the latest schema as well. In other words, full compatibility means that a schema change is both forward and backward compatible.
    * Changing the default value for a field is an example of a full compatible change.
    * The order in which producers or consumers are updated does not matter.
* The FULL compatibility type indicates that data produced is both forward and backward compatible with the current (N) and previous (N-1) schema.
* The FULL_TRANSITIVE compatibility type indicates that data produced is both forward and backward compatible with the current (N) and all previous (1 → N-1) schemas.
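The field rules behind backward and forward compatibility can be sketched by mimicking how an Avro reader resolves a record written with a different schema version: fields the reader expects but the writer did not send fall back to the reader's default, and fields the reader does not know are ignored. This is a simplified illustration (no type promotion or nested records), not Schema Registry's compatibility checker; the schema versions `V1` and `V2` are hypothetical.

```python
def read_with_schema(reader_fields, written_record):
    """Project a decoded record onto the reader's fields, Avro-style (simplified)."""
    result = {}
    for field in reader_fields:
        name = field["name"]
        if name in written_record:
            result[name] = written_record[name]
        elif "default" in field:
            result[name] = field["default"]  # writer did not send it: use the default
        else:
            raise ValueError(f"no value or default for field {name!r}")
    return result  # fields the reader does not declare are dropped


V1 = [{"name": "ticker", "type": "string"}, {"name": "price", "type": "int"}]
# V2 adds an optional field with a default of null
V2 = V1 + [{"name": "cause", "type": ["null", "string"], "default": None}]

old_record = {"ticker": "UDA", "price": 105}
new_record = {"ticker": "UDA", "price": 105, "cause": "earnings"}

# Backward: a V2 consumer reads V1 data, filling in the default
print(read_with_schema(V2, old_record))
# Forward: a V1 consumer reads V2 data, ignoring the unknown field
print(read_with_schema(V1, new_record))
```

Adding a field without a default breaks backward compatibility in this model, since a reader on the new schema has nothing to fill in when it meets old data.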

### Kafka Connect and REST Proxy

* Kafka Connect - A web server and framework for integrating Kafka with external data sources such as SQL databases, log files, and HTTP endpoints.
* JAR - Java ARchive. Used to distribute Java code reusably in a library format under a single file.
* Connector - A JAR built on the Kafka Connect framework which integrates with an external system to either source data into Kafka or sink data out of Kafka
* Source - A Kafka client putting data into Kafka from an external location, such as a data store
* Sink - A Kafka client removing data from Kafka into an external location, such as a data store
* JDBC - Java Database Connectivity. A Java programming abstraction over SQL database interactions.
* Task - Responsible for actually interacting with and moving data within a Kafka connector. One or more tasks make up a connector.
* Kafka REST Proxy - A web server providing APIs for producing and consuming from Kafka, as well as fetching cluster metadata.

* Kafka Connect supports a number of Connectors for common data sources
    * Files on disk
    * Amazon S3 and Google Cloud Storage
    * SQL databases such as MySQL and Postgres
    * HDFS
* Kafka Connect has an extensive REST API for managing and creating Connectors
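A sketch of creating a connector through the Kafka Connect REST API with the standard library. It assumes a Connect worker on its default port 8083 and the FileStream source connector that ships with Apache Kafka (which may need to be added to the worker's plugin path depending on your Kafka version); the connector name, file path, and helper name are hypothetical.

```python
import json
import urllib.request

# Assumption: a Kafka Connect worker on its default REST port
CONNECT_URL = "http://localhost:8083"


def build_create_connector_request(name, config):
    """Build (but do not send) a POST /connectors request for a new connector."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Stream lines of a local log file into a topic (names are placeholders)
request = build_create_connector_request(
    "clicks-file-source",
    {
        "connector.class": "FileStreamSource",
        "tasks.max": "1",
        "file": "/tmp/clicks.log",
        "topic": "com.udacity.lesson.views",
    },
)
```

Sending it with `urllib.request.urlopen(request)` creates the connector; `GET /connectors` lists existing connectors and `GET /connectors/<name>/status` reports the state of its tasks.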

* REST Proxy is a powerful tool for integrating applications with Kafka that could not otherwise use it.

* REST Proxy offers a comprehensive API for producing and consuming data from Kafka topics, and even provides fine-grained control over consumer groups, offsets, and partitions.
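A sketch of producing JSON records through REST Proxy, which is how an application without a Kafka client can still write to a topic. It assumes Confluent REST Proxy on its default port 8082 and the v2 API, where the `Content-Type` header names the embedded format; the helper name is hypothetical and the request is built but not sent.

```python
import json
import urllib.request

# Assumption: Confluent REST Proxy on its default port, using the v2 API
REST_PROXY_URL = "http://localhost:8082"


def build_produce_request(topic, values):
    """Build (but do not send) a request producing JSON values to `topic`."""
    body = json.dumps({"records": [{"value": v} for v in values]}).encode("utf-8")
    return urllib.request.Request(
        f"{REST_PROXY_URL}/topics/{topic}",
        data=body,
        # The content type declares the embedded format (JSON) and API version (v2)
        headers={"Content-Type": "application/vnd.kafka.json.v2+json"},
        method="POST",
    )


request = build_produce_request(
    "com.udacity.lesson.views", [{"email": "a@example.com", "uri": "/home"}]
)
```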

### Stream processing fundamentals

* Join (Streams) - The process of combining two or more streams into an output stream, typically on some related key attribute.
* Filtering (Streams) - The process of removing certain events in a data stream based on a condition
* Aggregating (Streams) - The process of summing, reducing, or otherwise grouping data based on a key attribute
* Remapping (Streams) - The process of modifying the input stream data structure into a different output structure. This may include the addition or removal of fields on a given event.
* Windowing (Streams) - Defining a period of time from which data is analyzed. Once data falls outside of that period of time, it is no longer valid for streaming analysis.
* Tumbling Window (Streams) - The tumbling window defines a fixed block of time which rolls over once the duration has elapsed. A tumbling window of one hour, started now, would collect all data for the next 60 minutes. Then, at the 60 minute mark, it would close that window and begin collecting a fresh set of data for the next 60 minutes. Tumbling windows never overlap, so each event belongs to exactly one window; the data in the topic itself is not deleted.
* Hopping Window (Streams) - Hopping windows advance in defined increments of time. A hopping window consists of a window length, e.g. 30 minutes, and an increment time, e.g. 5 minutes. Every time the increment time expires, the window is advanced forward by the increment.
* Sliding Window (Streams) - Sliding Windows work identically to Hopping Windows, except the increment period is much smaller -- typically measured in seconds. Sliding windows are constantly updated and always represent the most up-to-date state of a given stream aggregation.
* Stream - Streams contain all events in a topic, immutable, and in order. As new events occur, they are simply appended to the end of the stream.
* Table - Tables are the result of aggregation operations in stream processing applications. They are a roll-up, point-in-time view of data.
* Stateful - Stateful operations must store the intermediate results of combining multiple events to represent the latest point-in-time value for a given key
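The windowing and aggregation ideas above can be sketched in plain Python. This assumes non-negative integer timestamps (e.g. seconds), half-open `[start, end)` windows, and window starts aligned to time 0 (Kafka Streams similarly aligns windows to the epoch); the function names are hypothetical. The counting function is a small stateful aggregation whose result behaves like a table keyed by `(key, window start)`.

```python
from collections import Counter


def tumbling_window(ts, size):
    """Return the single [start, end) tumbling window containing timestamp ts."""
    start = ts - (ts % size)
    return (start, start + size)


def hopping_windows(ts, size, advance):
    """Return every [start, end) hopping window containing ts.

    Windows start at multiples of `advance`, so with advance < size they overlap.
    """
    # Earliest aligned start whose window still reaches past ts (never below 0)
    first = max(((ts - size) // advance + 1) * advance, 0)
    return [(s, s + size) for s in range(first, ts + 1, advance)]


def count_per_tumbling_window(events, size):
    """Stateful aggregation: count (key, timestamp) events per key and window."""
    table = Counter()
    for key, ts in events:
        table[(key, tumbling_window(ts, size)[0])] += 1
    return dict(table)


print(tumbling_window(7, 10))     # (0, 10)
print(hopping_windows(7, 10, 5))  # [(0, 10), (5, 15)]
events = [("home", 1), ("home", 4), ("cart", 8), ("home", 12)]
print(count_per_tumbling_window(events, 10))
```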

### Stream Processing with Faust

* DSL - Domain Specific Language. A language specialized for a specific task, such as building database queries or stream processing applications.
* Dataclass (Python) - A special type of Class in which instances are meant to represent data, but not contain mutating functions
* Changelog - An append-only log of changes made to a particular component. In the case of Faust and other stream processors, this tracks all changes to a given processor.
* Processor (Faust) - Functions that take a value and return a value. Can be added in a pre-defined list of callbacks to stream declarations.
* Operations (Faust) - Actions that can be applied to an incoming stream to create an intermediate stream containing some modification, such as a group-by or filter

Faust is built using modern Python features such as asyncio. Faust is embeddable as a library in existing applications. It can also be configured to be deployed as a stand-alone application in your infrastructure. Faust implements storage, time windowing, streams, tables, and many of the aggregate functions. It is important to note that Faust requires Python 3.6+ and does not support Avro or Schema Registry natively at this time.

```python
import faust

# Create the Faust app with a name and broker
# See: https://faust.readthedocs.io/en/latest/userguide/application.html#application-parameters
app = faust.App("hello-world-faust", broker="kafka://localhost:9092")

# Connect Faust to com.udacity.streams.clickevents
# See: https://faust.readthedocs.io/en/latest/userguide/application.html#app-topic-create-a-topic-description
topic = app.topic("com.udacity.streams.clickevents")


# The agent runs this coroutine as events arrive on the topic
# See: https://faust.readthedocs.io/en/latest/userguide/application.html#app-agent-define-a-new-stream-processor
@app.agent(topic)
async def clickevent(clickevents):
    # Iterate over the stream and print each event
    # See: https://faust.readthedocs.io/en/latest/userguide/agents.html#the-stream
    async for event in clickevents:
        print(event)


if __name__ == "__main__":
    app.main()
```

**Python Dataclasses**

* A dataclass is a special type of Class in which instances are meant to represent data, but not contain mutating functions.
* Python dataclass objects can be marked as frozen, which makes them immutable
    * Nothing in Python is truly immutable, but this attribute gets you about as close as you can get
* dataclass fields are declared with type annotations, but Python does not enforce those types at runtime. Pair them with a static type checker (or a Faust Record with `validation=True`) to make sure you're always working with data in the expected format.
* Can be paired with the asdict function to quickly transform dataclasses into dictionaries
* New in Python 3.7
* Default to using dataclass to work with data coming into and out of your Faust applications unless you have a good reason not to
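A short standard-library sketch of the points above: a frozen dataclass rejects attribute assignment, `asdict` converts an instance to a dictionary, and type annotations are not checked at runtime. The `ClickEvent` fields are illustrative.

```python
from dataclasses import FrozenInstanceError, asdict, dataclass


@dataclass(frozen=True)
class ClickEvent:
    email: str
    uri: str
    timestamp: str


event = ClickEvent(email="a@example.com", uri="/home", timestamp="2020-01-01T00:00:00")
print(asdict(event))  # {'email': 'a@example.com', 'uri': '/home', 'timestamp': '2020-01-01T00:00:00'}

# frozen=True blocks attribute assignment after creation
try:
    event.uri = "/cart"
except FrozenInstanceError as e:
    print(f"cannot modify: {e}")

# Annotations are not enforced: this int is accepted as the email
unchecked = ClickEvent(email=123, uri="/home", timestamp="2020-01-01T00:00:00")
print(type(unchecked.email))  # <class 'int'>
```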

**Faust Deserialization**
* All data model classes must inherit from the faust.Record class if you wish to use them with a Faust topic.
* It is a good idea to specify the serializer type on your data model (e.g. `serializer="json"`) so that Faust will deserialize data in this format by default.
* It is a good practice to set validation=True on your data models. When set to true, Faust will enforce that the data being deserialized from Kafka matches the expected type.
    * E.g., if we expect a str for a field, but receive an int, Faust will raise an error.
* Use Faust codecs to build custom serialization and deserialization

**Faust Serialization**
* Serialization in Faust leverages the same faust.Record that we saw in the deserialization section. Faust runs the serializer in reverse to serialize the data for the output stream.
* Multiple serialization codecs may be chained for a given model with `|` in the `serializer` setting
    * e.g., combining the `json` and `binary` codecs means that, when serializing, the data is encoded to JSON and then Base64-encoded.
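To see what a JSON-then-Base64 codec chain does to the bytes, here is a standard-library sketch of both stages and their reverse. It only mimics the idea; it does not use Faust's codec API, and the function names are hypothetical.

```python
import base64
import json


def dumps_json_then_base64(obj):
    """Serialize: encode to JSON first, then Base64-encode the resulting bytes."""
    return base64.b64encode(json.dumps(obj).encode("utf-8"))


def loads_base64_then_json(data):
    """Deserialize: undo the stages in reverse order."""
    return json.loads(base64.b64decode(data).decode("utf-8"))


payload = dumps_json_then_base64({"email": "a@example.com"})
print(payload)
print(loads_base64_then_json(payload))  # {'email': 'a@example.com'}
```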
    
### KSQL

KSQL is a SQL-like abstraction developed by Confluent, which provides users the ability to create streams and tables.

* Kafka Streams - A Java library for constructing stream processing applications. KSQL translates SQL statements to Kafka Streams applications.
* User Defined Function (UDF) - An extension to the SQL capabilities of KSQL written by the user. For KSQL, these are written in Java.
* Key (KSQL) - Data which uniquely identifies the value contained in this data message relative to other pieces of data in the stream. For example, a user_id may uniquely identify a user object.
* Session Windowing (KSQL) - A system that keeps track of the last time a particular key was seen. When a new record with the same key arrives, the difference between the timestamps is calculated. If the difference is larger than a defined session window (the inactivity gap), then a new window is started for that session. If the difference is less than the defined session window, the new record is added to the existing window.
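The session windowing rule can be sketched for a single key in plain Python. This simplified version sorts all timestamps up front and works in memory; KSQL processes records incrementally and can also merge sessions when a late record bridges a gap. The function name is hypothetical.

```python
def sessionize(timestamps, inactivity_gap):
    """Split one key's timestamps into sessions separated by gaps > inactivity_gap."""
    sessions = []
    for ts in sorted(timestamps):
        # Extend the current session if the gap since its last record is small enough
        if sessions and ts - sessions[-1][-1] <= inactivity_gap:
            sessions[-1].append(ts)
        else:
            sessions.append([ts])  # otherwise start a new session
    return sessions


print(sessionize([1, 3, 4, 20, 22, 40], inactivity_gap=5))
# [[1, 3, 4], [20, 22], [40]]
```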

KSQL is a Java application built on top of the Kafka Streams Java stream processing library. KSQL is a web-server with a REST API that accepts incoming or preconfigured requests containing SQL-like commands. These commands are translated by the KSQL server into the equivalent Kafka Streams application and then executed.

Users can interact with KSQL via a REST API, its dedicated CLI, or predefined SQL files.
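A sketch of sending a statement to the KSQL REST API with the standard library. It assumes a KSQL server on its default port 8088 and the `POST /ksql` endpoint used for statements such as `CREATE STREAM`; the stream definition and helper name are hypothetical, and the request is built but not sent.

```python
import json
import urllib.request

# Assumption: a KSQL server on its default port
KSQL_URL = "http://localhost:8088"


def build_ksql_request(statement, properties=None):
    """Build (but do not send) a POST /ksql request carrying a KSQL statement."""
    body = json.dumps(
        {"ksql": statement, "streamsProperties": properties or {}}
    ).encode("utf-8")
    return urllib.request.Request(
        f"{KSQL_URL}/ksql",
        data=body,
        headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
        method="POST",
    )


# Hypothetical stream over a JSON click topic
request = build_ksql_request(
    "CREATE STREAM clickevents (email VARCHAR, uri VARCHAR) "
    "WITH (KAFKA_TOPIC='com.udacity.streams.clickevents', VALUE_FORMAT='JSON');"
)
```

`SELECT` queries that stream results back go to the `/query` endpoint instead.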

## Spark

* RDD stands for Resilient Distributed Dataset:
    * Resilient because its fault tolerance comes from maintaining RDD lineage: if a partition is lost during an operation, Spark can recompute it by replaying the recorded transformations.
    * Distributed because the data is distributed across many partitions and workers.
    * Dataset because it is a collection of partitioned data. RDDs are in-memory, immutable, lazily evaluated, cacheable, and typed (we don't see typing much in Python, but you'd see it in Scala or Java).


```python
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[2]").setAppName("RDD Example")
sc = SparkContext(conf=conf)

# Another way of setting configurations: call these on conf before creating
# the SparkContext (changes made after it starts are not picked up)
#conf.setMaster('some url')
#conf.set('spark.executor.memory', '2g')
#conf.set('spark.executor.cores', '4')
#conf.set('spark.cores.max', '40')
#conf.set('spark.logConf', True)

# sparkContext.parallelize materializes a local collection into an RDD
# documentation: https://spark.apache.org/docs/2.1.1/programming-guide.html#parallelized-collections
rdd = sc.parallelize([('Richard', 22), ('Alfred', 23), ('Loki', 4), ('Albert', 12), ('Alfred', 9)])

print(rdd.collect())  # [('Richard', 22), ('Alfred', 23), ('Loki', 4), ('Albert', 12), ('Alfred', 9)]

# create two different RDDs of (key, value) pairs
left = sc.parallelize([("Richard", 1), ("Alfred", 4)])
right = sc.parallelize([("Richard", 2), ("Alfred", 5)])

# join pairs up values that share a key; the order of the results is not guaranteed
joined_rdd = left.join(right)
collected = joined_rdd.collect()

print(collected)  # e.g. [('Alfred', (4, 5)), ('Richard', (1, 2))]


# Notice we're using the pyspark.sql library here
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master("local") \
        .appName("CSV file loader") \
        .getOrCreate()

# Runtime settings can be changed with spark.conf.set, but resource settings such as
# executor/driver memory are fixed at startup; pass those to the builder via .config(...)
#spark.conf.set("spark.executor.memory", '8g')
#spark.conf.set('spark.executor.cores', '3')
#spark.conf.set('spark.cores.max', '3')
#spark.conf.set("spark.driver.memory", '8g')

file_path = "./AB_NYC_2019.csv"
# header=True treats the first row as column names instead of data
df = spark.read.csv(file_path, header=True)

df.printSchema()

# show up to 10 distinct neighbourhoods without truncating long values
df.select('neighbourhood').distinct().show(10, False)
```

**Partitioning in Spark**

By default in Spark, a partition is created for each block of the file in HDFS (128MB is the default block size for Hadoop) if you are using HDFS as your file system. If you read a file into an RDD from AWS S3 or some other source, Spark uses 1 partition per 32MB of data. You can override this default when creating an RDD, or reshuffle an existing RDD into a different number of partitions with `rdd.repartition(<the partition number you want to repartition to>)`. For example, `rdd.repartition(10)` changes the number of partitions to 10.

In local mode, Spark uses as many partitions as there are cores, so this will depend on your machine. You can override this by adding a configuration parameter `spark-submit --conf spark.default.parallelism=<some number>`.

So hypothetically, if you have a 200 MB file and load it into an RDD, how many partitions will this RDD have? If the file is on HDFS, it will produce 2 partitions (one of 128 MB and one of 72 MB). If the file is on AWS S3 or some other file system, it will produce 7 partitions (six of 32 MB and one of 8 MB).
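The arithmetic behind those counts is a ceiling division of file size by split size. This is a rough estimate under the rules of thumb above; actual partition counts can also depend on settings such as a requested minimum number of partitions and on the input format.

```python
import math


def estimated_partitions(file_size_mb, split_size_mb):
    """One partition per split; the final partition holds any remainder."""
    return math.ceil(file_size_mb / split_size_mb)


print(estimated_partitions(200, 128))  # HDFS blocks: 2 partitions (128 MB + 72 MB)
print(estimated_partitions(200, 32))   # 32 MB splits: 7 partitions (6 x 32 MB + 8 MB)
```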

**Hash Partitioning**

Hash partitioning in Spark is no different from the usual way of using a hash key in the data world to distribute data across partitions uniformly.

Usually this is defined by

`partition = key.hashCode() % numPartitions`

This mode of partitioning is used when you want to evenly distribute your data across partitions.
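A sketch of the `hash(key) % numPartitions` rule in Python. `zlib.crc32` stands in for Java's `hashCode()` (Spark's own partitioner differs in its hash function); it is used because Python's built-in `hash()` for strings is randomized per interpreter run, which would send the same key to different partitions across processes.

```python
import zlib


def hash_partition(key, num_partitions):
    """Assign a key to a partition as hash(key) mod num_partitions."""
    # crc32 is stable across processes and non-negative, so the result is in range
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


keys = ["Richard", "Alfred", "Loki", "Albert", "Alfred"]
print({key: hash_partition(key, 4) for key in keys})
```

Every occurrence of a key lands in the same partition, which is what lets key-based operations such as joins and aggregations find all of a key's records in one place.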

**Range Partitioning**

Range partitioning is another well-known partitioning method in the data world. Range partitioning divides each partition in a continuous but non-overlapping way.

Let's pretend there is a table called employees, and it has the following schema:

```sql
CREATE TABLE employees (
    employee_id INT NOT NULL,
    first_name VARCHAR(30),
    last_name VARCHAR(30),
    ...
)
```

Range partitioning would come into play where you partition the employees table by employee_id, like this:

```sql
PARTITION BY RANGE (employee_id) (
    PARTITION p0 VALUES LESS THAN (11),
    PARTITION p1 VALUES LESS THAN (21),
    PARTITION p2 VALUES LESS THAN (31),
    ...
)
```

In reality, you'd want to use range partition over a timestamp, but this example gives you a rough idea of what range partitioning means.

In Spark, you can use `DataFrame.repartitionByRange()` (or a `RangePartitioner` in the Scala RDD API) to partition your data by ranges of a column. Range partitioning in Spark ensures that every range is contained in a single partition. This becomes useful when you want to reduce shuffles across machines, for example when you know for sure all your parent RDDs need to stay in the same partition.
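The partition lookup for the `VALUES LESS THAN` example above can be sketched with a binary search over the exclusive upper bounds. Sending ids at or past the last bound to an extra catch-all partition is an assumption, since the SQL example elides what happens past 31.

```python
from bisect import bisect_right

# Exclusive upper bounds taken from the VALUES LESS THAN example above
BOUNDARIES = [11, 21, 31]


def range_partition(employee_id, boundaries=BOUNDARIES):
    """Return the index of the first partition whose upper bound exceeds the id."""
    # Ids >= the last bound get index len(boundaries): a catch-all partition
    return bisect_right(boundaries, employee_id)


print([range_partition(i) for i in (1, 10, 11, 20, 25, 31)])  # [0, 0, 1, 1, 2, 3]
```

In PySpark, `df.repartitionByRange(3, "employee_id")` applies the same idea, except that Spark samples the data to choose the boundaries instead of taking them from you.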

**DataFrames and Datasets - Key Points**

You can think of DataFrames as tables in a relational database, or dataframes in Python’s pandas library. DataFrames provide memory management and optimized execution plans.

**DataFrames**

DataFrames appeared in Spark Release 1.3.0. We already know that both Datasets and DataFrames are an organized collection of data in columns, but the biggest difference is that DataFrames do not provide type safety. DataFrames are similar to the tables in a relational database. Unlike RDDs, DataFrames and Datasets are part of the spark.sql library, which means you can create a temporary view of these tables and apply SQL queries.

DataFrames allow users to process a large amount of structured data by providing a schema. The schema is another feature that is very similar to a relational database, indicating the types of data that should be stored in each column (String, Timestamp, Double, Long, etc.; these are available in the spark.sql.types library), and also whether the column can be nullable or not. The aspect that is different from relational databases is that DataFrames and Datasets have no notion of primary/foreign keys - you as a developer define these as you create your DataFrame or Dataset.

**Datasets**

A Dataset is a core building block in Spark SQL that is strongly typed, unlike DataFrames. You can think of Datasets as an extension of the DataFrame API with type safety. The Dataset API has been available since Spark 1.6. Although Datasets and DataFrames are part of the Spark SQL component, RDDs, Datasets, and DataFrames still share common features: immutability, resilience, and the capability of distributed in-memory computing.

A Dataset provides the features of an RDD and a DataFrame:

* The convenience of an RDD, as it is an extended library of a Spark DataFrame
* Performance optimization of a DataFrame using Catalyst
* Enforced type-safety
* Datasets are not available in Python, only in Java and Scala. So we won’t be spending much time with Datasets in this course, since we focus on Python.

**Spark Streaming**

Spark DStream (Discretized Stream) is the basic abstraction and building block of Spark Streaming. A DStream is a continuous stream of RDDs. It receives input from various sources like Kafka, Flume, Kinesis, or TCP sockets (we'll mostly be using sockets or Kafka). A DStream can also be generated by applying transformations to an existing DStream.

Structured Streaming is a programming model, introduced in Spark 2.0, to provide support for building scalable and fault-tolerant applications using Spark SQL.

Internally, Structured Streaming is processed using a micro-batch. It processes data streams as a series of small batch jobs.

With Structured Streaming, users/developers don't have to handle common streaming concerns like fault tolerance, data loss, or state loss themselves; the engine takes care of them. The application can guarantee fault tolerance using checkpointing.

The advantages of using Structured Streaming are:

* Continuous update of the final result
* Can be used in either Scala, Python, or Java
* Computations are optimized due to using the same Spark SQL component (Catalyst)


Structured Streaming is a newer streaming strategy that evolved from Discretized Streams. Among other changes, it decouples state management (saving state to a state store) and metadata checkpointing from the application. Because these concerns are handled by the engine rather than the application code, developers can achieve fault-tolerant end-to-end execution more easily.

Spark UI is a web interface that gets created when you submit a Spark job. It's a convenient resource for the developer to monitor the status of the job execution. The developer can inspect jobs, stages, storages, environment, and executors in this page, as well as the visualized version of the DAGs (Directed Acyclic Graph) of the Spark job.

When an action is called on an RDD, Spark generates a DAG of the operations needed to compute it. Unlike Hadoop MapReduce, which always has exactly two stages (a Map stage and a Reduce stage), a Spark DAG can contain many stages.

The DAG scheduler divides the operators into stages of tasks, splitting at shuffle boundaries, and pipelines narrow operators (such as consecutive map and filter calls) together within a single stage to optimize execution.
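The stage-splitting rule can be illustrated with a toy model. This is a simplified sketch, not Spark's actual DAGScheduler: a hypothetical pipeline is described as (operator, needs_shuffle) pairs, a new stage starts at every shuffle boundary, and consecutive narrow operators are pipelined into the same stage. The helper split_into_stages is hypothetical.

```python
from typing import List, Tuple

# Hypothetical pipeline description: (operator name, requires a shuffle?)
# Wide operators such as reduceByKey need a shuffle; narrow ones such as map do not.
Pipeline = List[Tuple[str, bool]]


def split_into_stages(ops: Pipeline) -> List[List[str]]:
    """Toy model of stage splitting: cut at every shuffle boundary and
    pipeline consecutive narrow operators into the same stage."""
    stages: List[List[str]] = [[]]
    for name, needs_shuffle in ops:
        if needs_shuffle and stages[-1]:
            stages.append([])  # a shuffle boundary starts a new stage
        stages[-1].append(name)
    return stages


word_count: Pipeline = [
    ("textFile", False),
    ("flatMap", False),
    ("map", False),
    ("reduceByKey", True),
    ("mapValues", False),
]
stages = split_into_stages(word_count)
print(stages)
```

A classic word count therefore yields two stages, which matches what the Spark UI typically shows for such a job; a MapReduce-style engine would need a separate job for every additional shuffle.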

**What is a Schema?**

Generally, a schema is the description of the structure of your data. It tells you how your data is organized; you can think of it as the blueprint of your data. DataFrames and Datasets use this concept: the schema can be inferred from the data at run time (implicit) or defined by the developer ahead of time (explicit).

A StructField describes a single column as a name, a data type, and a nullable flag (True/False). You wrap a list of StructField objects in a StructType to build a schema.

In Scala and Java, StructType and StructField belong to the org.apache.spark.sql.types package; in PySpark they live in pyspark.sql.types. Either way, they need to be imported separately.


**Configurations/Tuning Key Points**

There are a few ways to tune your Spark Structured Streaming application. Before tuning, go through your application and try to answer these questions:

* Study the memory available for your application. Do you have enough memory to process your Spark job? If not, consider vertical scaling. If you do have enough memory but limited resources, consider horizontal scaling.
* Study your query plans - do they make sense? Are you doing unnecessary shuffles/aggregations? Can you reduce your shuffles?
* What’s the throughput/latency of your data?
* How are you saving your data? Are you persisting your data to memory or to disk only, or to both memory and disk?
* Are you breaking your lineage anywhere?


A few techniques that help with fault tolerance and performance:

* Write Ahead Logs (WAL): operations are logged to a file before they are applied. When a driver or node fails and the application is restarted, the operations logged in the WAL can be replayed against the data.
* Broadcasting: Spark allows users to keep a read-only variable cached on each machine rather than shipping a copy of it with every task. This is useful when many tasks need the same lookup data, for example when joining a large dataset with a small one, because the small dataset is sent to each node once instead of over the network with every task.
* Salting: a popular method of redistributing a skewed dataset. A random "salt" (a fake key component) is added to the key so that records sharing one hot key are spread across several partitions.
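Salting for an aggregation can be sketched in pure Python as a two-phase count. This is an illustration under assumed inputs, not Spark code: the skewed records, NUM_SALTS, and the helper names are hypothetical. Phase 1 counts per salted key, so the hot key is split into several groups that could be processed in parallel; phase 2 strips the salt and combines the few partial results.

```python
import random
from collections import Counter
from typing import Dict, List, Tuple

NUM_SALTS = 4  # hypothetical: spread each key over up to 4 sub-keys


def salt(records: List[str], rng: random.Random) -> List[Tuple[str, int]]:
    """Attach a random salt to every key, e.g. "hot" -> ("hot", 2)."""
    return [(key, rng.randrange(NUM_SALTS)) for key in records]


def two_phase_count(records: List[str], seed: int = 0) -> Dict[str, int]:
    rng = random.Random(seed)
    # Phase 1: partial counts per salted key (the hot key is now split
    # into several smaller groups)
    partial = Counter(salt(records, rng))
    # Phase 2: drop the salt and combine the partial results per original key
    final: Counter = Counter()
    for (key, _salt), n in partial.items():
        final[key] += n
    return dict(final)


skewed = ["hot"] * 1000 + ["cold"] * 10
salted_groups = Counter(salt(skewed, random.Random(0)))
hot_groups = sum(1 for key, _ in salted_groups if key == "hot")
print(hot_groups, two_phase_count(skewed))
```

The totals are the same as an unsalted count; the gain is that no single group has to hold all 1000 "hot" records.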

KafkaSourceProvider requires these options:

* exactly one of subscribe, subscribePattern, or assign
* kafka.bootstrap.servers

```python
kafka_df = (
    spark.readStream
    .format("kafka")  # set the data ingestion format to Kafka
    .option("subscribe", "<topic_name>")  # required, even though it is passed via option()
    .option("kafka.bootstrap.servers", "localhost:9092")  # host and port of the bootstrap server
    .load()
)
```

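* Write Ahead Logs provide fault tolerance by saving logs to a checkpoint directory. For receiver-based DStreams, the WAL is enabled by setting the Spark property spark.streaming.receiver.writeAheadLog.enable to true. Structured Streaming writes its own offset and commit logs to the query's checkpointLocation.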

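* Spark triggers determine how often a streaming query is executed. The trigger is set with .trigger() on the writeStream query builder, for example processingTime="10 seconds" to start a micro-batch every 10 seconds, or once=True to process the available data in a single micro-batch and then stop. If no trigger is set, the next micro-batch starts as soon as the previous one finishes.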

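```python
import logging
from pyspark.sql import SparkSession


def run_spark_job(spark):
    # Entry point: the built-in "rate" source generates (timestamp, value) rows
    df = spark \
        .readStream \
        .format("rate") \
        .option("rowsPerSecond", 90000) \
        .load()
    # Show schema for the incoming resources for checks
    df.printSchema()

    # A streaming DataFrame has no eager count(); use a streaming aggregation
    agg_df = df.groupBy().count()

    # TODO play around with processingTime and once parameters in trigger to see
    # how the progress report changes, e.g. .trigger(processingTime="10 seconds")
    # Aggregations need "complete" (or "update") output mode on the console sink
    query = agg_df \
        .writeStream \
        .format("console") \
        .outputMode("complete") \
        .trigger(once=True) \
        .queryName("Once") \
        .option("checkpointLocation", "/tmp/checkpoint") \
        .start()

    # Block until the query finishes; a once-trigger stops after one micro-batch
    query.awaitTermination()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("StructuredStreamingSetup") \
        .getOrCreate()

    logger.info("Spark started")

    run_spark_job(spark)

    spark.stop()
```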

* KafkaSourceProvider provides a Kafka consumer within Spark, so we don't need to write separate consumer modules.
* Managing offsets is crucial for optimizing your Kafka and Spark microservices. You're in full control of offset management, and you'll have to choose the strategy that best fits your business context.
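One place where offset decisions show up is the Kafka source's startingOffsets option, which accepts "earliest", "latest", or a JSON string of per-partition offsets in which -2 means earliest and -1 means latest. A minimal sketch that builds such a JSON string; the topic name and offsets are hypothetical, and starting_offsets is a hypothetical helper.

```python
import json

EARLIEST, LATEST = -2, -1  # special offset values understood by startingOffsets


def starting_offsets(topic: str, offsets_by_partition: dict) -> str:
    """Build the per-partition JSON string for the startingOffsets option."""
    return json.dumps({topic: {str(p): o for p, o in offsets_by_partition.items()}})


# Hypothetical: resume partition 0 at offset 23, read partition 1 from the beginning
offsets_json = starting_offsets("JSON_TOPICS", {0: 23, 1: EARLIEST})
print(offsets_json)
# The string would then be passed as .option("startingOffsets", offsets_json)
```

Note that startingOffsets only applies when a query starts fresh; a query restarted from an existing checkpoint resumes from the offsets recorded there.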

**Kafka Broker with Spark Structured Streaming**

To recap from previous lessons, a Kafka broker receives messages from producers. A Kafka broker is a server, and we can think of a Kafka cluster as being composed of one or more Kafka brokers. Producers publish data (push messages) into Kafka topics, and Kafka consumers pull messages from a Kafka topic.

Both Spark Streaming (DStreams) and Structured Streaming provide built-in Kafka integration for reading data from Kafka and writing data back to it.

Each row read from the Kafka source has the following schema:

* key[binary]
* value[binary]
* topic[string]
* partition[int]
* offset[long]
* timestamp[timestamp]
* timestampType[int]


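```python
import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
import pyspark.sql.functions as psf


def run_spark_job(spark):
    # Requires the spark-sql-kafka-0-10 package on the classpath.
    # maxOffsetsPerTrigger caps the number of offsets read per micro-batch.
    # (maxRatePerPartition is a DStreams setting, not a Kafka source option.)
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "JSON_TOPICS") \
        .option("startingOffsets", "earliest") \
        .option("maxOffsetsPerTrigger", 10) \
        .load()

    df.printSchema()

    # The Kafka value column is binary; cast it to a string before parsing
    kafka_df = df.selectExpr("CAST(value AS STRING)")

    # Schema of the JSON payload carried in each Kafka message
    jsonSchema = StructType([StructField("status", StringType(), True),
                             StructField("timestamp", TimestampType(), True)])

    # Parse the JSON string into a struct, then flatten its fields into columns
    json_df = kafka_df \
        .select(psf.from_json(psf.col("value"), jsonSchema).alias("JSON_TOPICS")) \
        .select("JSON_TOPICS.*")

    query = json_df \
        .writeStream \
        .outputMode("append") \
        .format("console") \
        .start()

    query.awaitTermination()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("KafkaSparkStructuredStreaming") \
        .getOrCreate()

    logger.info("Spark started")

    run_spark_job(spark)

    spark.stop()
```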

Given problem: we have a hypothetical Spark Streaming application that receives data from Kafka. Kafka produces 60,000 records every 2 minutes, but at its maximum load the application takes about 2 minutes to process each micro-batch of 500 records. How can we improve the speed of this application?
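A quick back-of-the-envelope calculation shows how far behind the application is. It uses the figures from the problem statement and assumes each 500-record micro-batch takes 2 minutes.

```python
# Figures from the problem statement
window_s = 120                # "every 2 minutes"
produced_per_window = 60_000  # records Kafka produces every 2 minutes
batch_records = 500           # records in one micro-batch
batch_seconds = 120           # assumed time taken per micro-batch (2 minutes)

input_rate = produced_per_window / window_s      # records/second arriving
processing_rate = batch_records / batch_seconds  # records/second processed
speedup_needed = input_rate / processing_rate

print(f"input: {input_rate:.0f} rec/s, processed: {processing_rate:.2f} rec/s")
print(f"the job must become ~{speedup_needed:.0f}x faster just to keep up")
```

A gap of roughly 120x is too large for configuration tweaks alone, which is why the options below combine algorithmic changes with more parallelism.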

* We can tweak the application's algorithm to speed it up.

    * Once the algorithm is tweaked, how can you check whether most or all of the CPU cores are working?
    * By default, Kafka partitions map 1:1 to Spark partitions when Spark reads from Kafka. So we can increase parallelism by increasing the number of partitions in the Kafka topic, which in turn increases the number of Spark partitions.
* We can check whether the input data is balanced or skewed. The Spark UI shows the throughput of each partition and how many cores are consistently working. On a small cluster, you can also use the htop command to see whether all your cores are busy.
* Increase driver and executor memory: out-of-memory issues can frequently be solved by increasing executor and driver memory. Always leave some memory overhead (usually around 10%) on top of the JVM heap.
* You could also set spark.streaming.kafka.maxRatePerPartition to a higher number and see whether data ingestion increases. This setting applies to DStreams; with Structured Streaming, the Kafka source's maxOffsetsPerTrigger option plays a similar role.