<a href="https://colab.research.google.com/github/mariyaperchyk/codepubEventDrivenDesign/blob/main/Event_Driven_Design_Music_Streaming_App_Task.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Event Driven Design - Music Streaming App

This notebook demonstrates how to set up a Kafka cluster (Kafka and ZooKeeper) and implements a producer and a consumer to simulate a music streaming application.
The producer generates a stream of plays (based on a CSV file) and writes these plays to a Kafka topic.
The consumer listens to the topic and aggregates the plays for analytics, e.g., which genre is played most often.

A few lines of code have been left out for you to fill in!

# Setup

## Download and setup Kafka and Zookeeper instances
For demo purposes, the following instances are set up locally:

- Kafka (broker: 127.0.0.1:9092): a Kafka broker is a server that runs Kafka. It stores the data and handles requests from clients/applications.

- ZooKeeper (node: 127.0.0.1:2181): ZooKeeper keeps the state of the Kafka cluster, e.g. the location of partitions and the configuration of topics.


In [None]:
!curl -sSOL https://archive.apache.org/dist/kafka/2.7.0/kafka_2.13-2.7.0.tgz
!tar -xzf kafka_2.13-2.7.0.tgz

Using the default configurations (provided by Apache Kafka), we spin up one Kafka broker and one ZooKeeper node.

In [None]:
!./kafka_2.13-2.7.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.13-2.7.0/config/zookeeper.properties
!./kafka_2.13-2.7.0/bin/kafka-server-start.sh -daemon ./kafka_2.13-2.7.0/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10


Once the instances are started as daemon processes, grep for `kafka` in the process list. The two Java processes correspond to our ZooKeeper and Kafka instances. Make sure you have both processes running.

In [None]:
!ps -ef | grep kafka
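
Besides inspecting the process list, you can check whether the two services accept TCP connections on the addresses listed above. The helper below is only a sketch: the name `wait_for_port` and its timeout values are hypothetical and not part of Kafka. Keep in mind that an open port only shows that something is listening, not that the broker has finished starting.

```python
import socket
import time


def wait_for_port(host, port, timeout=30.0, interval=0.5):
    """Return True once host:port accepts a TCP connection, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening yet
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

Calling `wait_for_port('127.0.0.1', 2181)` and `wait_for_port('127.0.0.1', 9092)` after starting the daemons would then replace guessing a fixed sleep duration.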

## Create Kafka topic

We can now create one Kafka topic with the following specs:

- music-streams: partitions=1, replication-factor=1

In [None]:
!./kafka_2.13-2.7.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic music-streams

Let's take a look at the topic that we created. We can describe it to get configuration details.

In [None]:
!./kafka_2.13-2.7.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic music-streams
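
The partition count matters because keyed messages are spread across partitions by hashing the key, so all plays with the same key end up in the same partition. The following is a simplified illustration of that idea only: it uses CRC32 as a stand-in hash, and `pick_partition` is a hypothetical helper, not the partitioner the Kafka client actually runs.

```python
import zlib


def pick_partition(key, num_partitions):
    """Map a message key to a partition index via a stable hash (illustrative only)."""
    return zlib.crc32(key.encode('utf-8')) % num_partitions


genres = ['rock', 'pop', 'rock', 'jazz']
# With a single partition, every key maps to partition 0
print([pick_partition(g, 1) for g in genres])
# With more partitions, equal keys still map to the same partition
print([pick_partition(g, 2) for g in genres])
```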


## Install required dependencies

In [None]:
!pip install confluent-kafka

# Implement Producer

Download our dataset of music streams

In [None]:
!curl -o music-streams.csv -s https://owncloud.hpi.de/s/bDHMeaX3029rT2E/download
!ls -la

Let's take a look at the data

In [None]:
!head music-streams.csv

`music-streams.csv` simulates a music streaming application. Each row in the file represents a play of a song. Here, a play consists of a title, an artist, a genre and the year in which the song was produced. The same song might appear in the file multiple times, as a user might listen to it again and again.
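
To make the row structure concrete, here is a small sketch that parses a few made-up rows with `csv.DictReader`. Only the `Top Genre` column name and the `;` delimiter are taken from this notebook; the other column names (`Title`, `Artist`, `Year`) and all sample values are assumptions for illustration, so check them against the output of `head` above.

```python
import csv
import io

# Made-up sample in the assumed layout of music-streams.csv
SAMPLE = (
    "Title;Artist;Top Genre;Year\n"
    "Song A;Artist X;rock;1970\n"
    "Song B;Artist Y;pop;2005\n"
    "Song A;Artist X;rock;1970\n"
)


def read_plays(csvfile):
    """Yield one dict per play, keyed by the column names of the header row."""
    yield from csv.DictReader(csvfile, delimiter=';')


plays = list(read_plays(io.StringIO(SAMPLE)))
print(len(plays))             # number of plays, repeated songs included
print(plays[0]['Top Genre'])  # genre of the first play
```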

Now it's time to implement the producer. The producer should read the plays from the `music-streams.csv` file and produce play events, which are written to the `music-streams` topic.

In [None]:
import csv
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer

def get_producer():
    producer_conf = {'bootstrap.servers': '127.0.0.1:9092',
                     'key.serializer': StringSerializer('utf_8'),
                     'value.serializer': StringSerializer('utf_8')}
    return SerializingProducer(producer_conf)

def write_to_kafka(topic_name, file):
  count=0
  producer = get_producer()
  with open(file) as csvfile:
    reader = csv.DictReader(csvfile, delimiter=';')

    for row in reader:
      # Serve delivery reports from earlier produce() calls so the local queue keeps draining
      producer.poll(0.0)
      producer.produce(
          topic=topic_name,
          key=row['Top Genre'],
          # Note: the value is the string representation of the row
          # (a dict; an OrderedDict on Python versions before 3.8)
          value=str(row))
      count+=1
  producer.flush()
  print("Wrote {0} messages into topic: {1}".format(count, topic_name))

write_to_kafka('music-streams', 'music-streams.csv')

# Implement Consumer

In [None]:
from confluent_kafka import DeserializingConsumer
from confluent_kafka.serialization import StringDeserializer
from collections import defaultdict, OrderedDict


def get_consumer(topic_name, consumer_group):
  consumer_conf = {'bootstrap.servers': '127.0.0.1:9092',
                   'key.deserializer': StringDeserializer('utf_8'),
                   'value.deserializer': StringDeserializer('utf_8'),
                   'group.id': consumer_group,
                   'auto.offset.reset': "earliest"}

  consumer = DeserializingConsumer(consumer_conf)
  consumer.subscribe([topic_name])
  return consumer

def get_artist_statistics(topic_name, consumer_group):
  consumer = get_consumer(topic_name, consumer_group)
  artist_statistics = defaultdict(int)

  while True:
    try:
      # Let's consume events from the topic.
      # The consumer's poll() function returns a single event per call.
      # See docs here https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Consumer.poll

      # Note: If no event arrives within the given timeout, poll() returns None.
      # In this case, we can stop the execution and break out of the while-loop.


      # The returned event has a key(), in our case the genre, and a value(), which is the play event itself.
      # The value() is the row from the csv file as written by the producer: the string representation of a dict
      # (an OrderedDict on Python versions before 3.8).
      # To transform the string back into a dict, we can use Python's eval() function,
      # see https://realpython.com/python-eval-function/
      # Only use eval() on data you produced yourself, because it executes arbitrary code.


      # Once you have successfully transformed the value, we can start counting, e.g.,
      # how often an artist was played.
      # For that, we can use `artist_statistics`, which is a defaultdict, and just increment the count for the current artist
      # artist_statistics[currentArtist?] += 1
      
      
    except KeyboardInterrupt:
      break
  return artist_statistics

# Note: Once a consumer group has consumed all messages from a topic and committed its offsets, it will not receive them again
# Try changing the consumer group string to consume all events again
statistics = get_artist_statistics('music-streams', 'group1')
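
If you get stuck, the sketch below shows one possible shape of the polling loop. So that it runs without a broker, it uses `FakeConsumer` and `FakeMessage`, which are hypothetical stand-ins that only mimic `poll()`, `key()` and `value()`; the `Artist` column name and the sample rows are assumptions as well. Treat it as an illustration of the steps described in the comments above, not as the reference solution.

```python
from collections import defaultdict, OrderedDict


class FakeMessage:
    """Hypothetical stand-in for a Kafka message with key() and value()."""
    def __init__(self, key, value):
        self._key, self._value = key, value

    def key(self):
        return self._key

    def value(self):
        return self._value


class FakeConsumer:
    """Hypothetical stand-in for a consumer: poll() returns None when drained."""
    def __init__(self, messages):
        self._messages = list(messages)

    def poll(self, timeout=None):
        return self._messages.pop(0) if self._messages else None


def count_artists(consumer, timeout=5.0):
    artist_statistics = defaultdict(int)
    while True:
        msg = consumer.poll(timeout)
        if msg is None:  # nothing arrived within the timeout: assume we are done
            break
        # Only eval() strings you produced yourself; OrderedDict is exposed
        # so that reprs written by older Python versions can be parsed too
        play = eval(msg.value(), {'__builtins__': {}, 'OrderedDict': OrderedDict})
        artist_statistics[play['Artist']] += 1  # 'Artist' is an assumed column name
    return artist_statistics


rows = [{'Artist': 'Artist X', 'Top Genre': 'rock'},
        {'Artist': 'Artist Y', 'Top Genre': 'pop'},
        {'Artist': 'Artist X', 'Top Genre': 'rock'}]
fake = FakeConsumer(FakeMessage(r['Top Genre'], str(r)) for r in rows)
stats = count_artists(fake)
print(dict(stats))
```

With a real consumer, the same loop body would go inside the `try` block of `get_artist_statistics`, using the consumer returned by `get_consumer`.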

In [None]:
from collections import Counter

# Let's see which artists were played the most
c = Counter(statistics)
print(c.most_common(5))


[('The Beatles', 2451), ('Queen', 2085), ('Coldplay', 1888), ('Michael Jackson', 1511), ('The Rolling Stones', 1496)]


# For the curious minds:


*   Write and consume messages with different serialization formats, e.g. JSON or Avro
*   Manage Kafka partitioning and parallelize your consumers, by starting multiple consumers from the same consumer group
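
As a starting point for the first suggestion, here is a minimal sketch of a JSON round trip using only the standard library. The idea is that `json.dumps` could take the place of `str(row)` on the producer side and `json.loads` the place of `eval()` on the consumer side. The helper names and the sample row are hypothetical; wiring them into the producer and consumer is left to you.

```python
import json


def serialize_play(row):
    """Producer side: turn a row dict into a JSON string."""
    return json.dumps(dict(row), ensure_ascii=False)


def deserialize_play(value):
    """Consumer side: turn the JSON string back into a dict, no eval() needed."""
    return json.loads(value)


# Made-up row with assumed column names
row = {'Title': 'Song A', 'Artist': 'Artist X', 'Top Genre': 'rock', 'Year': '1970'}
value = serialize_play(row)
print(value)
print(deserialize_play(value) == row)
```

Unlike `eval()`, `json.loads` only parses data and never executes code, which makes it the safer choice once messages come from producers you do not control.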

