In [1]:
import json
import os
import threading
import uuid
from pathlib import Path
from time import sleep

from dotenv import load_dotenv
from faker import Faker
from kafka import KafkaProducer

import spotify_pb2

In [2]:
# Read the notebook's environment variables from the .env file under /resources.
# The call is left as the cell's last expression so its boolean result is displayed.
dotenv_path = Path('/resources') / '.env'
load_dotenv(dotenv_path=dotenv_path)

True

In [3]:
# Kafka connection settings, read from the process environment.
# Each value is None when the corresponding variable is not set.
kafka_host = os.environ.get('KAFKA_HOST')
kafka_topic = os.environ.get('KAFKA_TOPIC_NAME')
# NOTE(review): this reads the same variable as kafka_topic, so it holds the
# topic *name*, not a partition value - looks like a copy/paste slip; confirm
# which environment variable was intended before relying on it.
kafka_topic_partition = os.environ.get('KAFKA_TOPIC_NAME')

In [4]:
import json
from kafka import KafkaConsumer
from collections import defaultdict

# Running tally of 'Play' / 'Like' interactions per artist. It is shared by
# every consumer started in this notebook, including ones on other threads.
artist_counts = defaultdict(int)

# Serialises access to artist_counts: `counts[key] += 1` is a read-modify-write
# and is not atomic when several consumer threads run concurrently.
_artist_counts_lock = threading.Lock()

# Interaction values that count towards an artist's tally.
_COUNTED_INTERACTIONS = ('Play', 'Like')


def _record_interaction(artist_data):
    """Update ``artist_counts`` from one decoded event and return the ranking.

    Args:
        artist_data: dict decoded from a message; the 'artist' and
            'interaction' keys are read, everything else is ignored.

    Returns:
        A list of ``(artist, count)`` tuples sorted by count, highest first.
        The snapshot is taken under the lock, so it is consistent even while
        other threads keep counting.
    """
    artist_name = artist_data.get('artist')
    interaction = artist_data.get('interaction')

    with _artist_counts_lock:
        # Only string artist names are counted: a missing artist would
        # otherwise show up as `None` in the ranking, and an unhashable
        # value (list/dict) would raise TypeError on the dict lookup.
        if isinstance(artist_name, str) and interaction in _COUNTED_INTERACTIONS:
            artist_counts[artist_name] += 1
        return sorted(artist_counts.items(), key=lambda item: item[1], reverse=True)


def start_consumer(group_id, client_id, topic_name):
    """Consume JSON events from a Kafka topic and print a running artist ranking.

    Every message is expected to be a UTF-8 encoded JSON object. Events whose
    'interaction' is 'Play' or 'Like' increment the shared ``artist_counts``
    entry for their 'artist'. After each valid message the message metadata
    and the current ranking are printed. Messages that cannot be decoded,
    parsed, or that are not JSON objects are reported and skipped instead of
    terminating the consumer.

    Args:
        group_id: Kafka consumer group to join.
        client_id: client identifier; also used as the prefix of printed lines.
        topic_name: topic to subscribe to.

    Returns:
        None. The function blocks while iterating over the consumer and
        always closes the consumer before returning or propagating an error.
    """
    # No offsets are committed by this function (auto-commit is disabled and
    # commit() is never called), so progress is not persisted here. Where no
    # committed offset exists for the group, reading starts from the earliest
    # available record.
    consumer = KafkaConsumer(
        topic_name,
        group_id=group_id,
        client_id=client_id,
        bootstrap_servers=[f'{kafka_host}:9092'],
        auto_offset_reset='earliest',
        enable_auto_commit=False
    )

    print(f"{client_id} started and subscribed to {topic_name}...")

    try:
        for message in consumer:
            raw_value = message.value
            if raw_value is None:
                # A record without a value has nothing to decode.
                print("Message has no value. Skipping message.")
                continue

            try:
                message_value = raw_value.decode('utf-8')
                artist_data = json.loads(message_value)
            except UnicodeDecodeError as e:
                print(f"Error decoding message: {e}. Skipping message.")
                continue
            except json.JSONDecodeError as e:
                print(f"Error parsing JSON: {e}. Skipping message.")
                continue

            if not isinstance(artist_data, dict):
                # Valid JSON that is not an object (list, number, string...)
                # has no 'artist' / 'interaction' fields to read.
                print(f"Unexpected JSON payload of type {type(artist_data).__name__}. "
                      "Skipping message.")
                continue

            sorted_artists = _record_interaction(artist_data)

            # Build the whole report first and emit it with a single print so
            # reports from concurrently running consumers interleave less.
            separator = "-" * 50
            report_lines = [
                f"{client_id} Topic: {message.topic}, "
                f"Partition: {message.partition}, Offset: {message.offset}, "
                f"Key: {message.key}, Value: {message_value}",
                separator,
                "Current Top Artists:",
            ]
            report_lines.extend(
                f" {index}. {artist}: {count}"
                for index, (artist, count) in enumerate(sorted_artists, start=1)
            )
            report_lines.append(separator)
            print("\n".join(report_lines))
    except KeyboardInterrupt:
        # Python delivers KeyboardInterrupt to the main thread only, so this
        # branch runs only when start_consumer is called on the main thread.
        print("Shutting down consumer...")
    finally:
        consumer.close()


In [5]:
import threading

topic_name = kafka_topic

# Start two consumers on separate threads; both join the group
# "consumer_group" and subscribe to the same topic. Each thread is started
# right after it is created, in the order consumer_1 then consumer_2.
consumer_threads = []
for consumer_client_id in ("consumer_1", "consumer_2"):
    worker = threading.Thread(
        target=start_consumer,
        args=("consumer_group", consumer_client_id, topic_name),
    )
    worker.start()
    consumer_threads.append(worker)

thread1, thread2 = consumer_threads

# join() blocks this cell until each consumer thread has finished.
thread1.join()
thread2.join()

consumer_1 started and subscribed to spotify...
consumer_2 started and subscribed to spotify...
consumer_2 Topic: spotify, Partition: 1, Offset: 697, Key: None, Value: {"track_id": "05daa2bd-2cc8-494e-bf68-c3cc4a05807c", "track_name": "near", "artist": "SZA", "genre": "Country", "popularity": 25, "Duration": 2, "tempo": 82, "explicit": false, "interaction": "Like", "listened_from": "Indonesia", "ts": 1569786104}
--------------------------------------------------
Current Top Artists:
 1. SZA: 1
--------------------------------------------------
consumer_2 Topic: spotify, Partition: 1, Offset: 698, Key: None, Value: {"track_id": "7cb40791-91e7-4236-8d86-327b99cd4f6f", "track_name": "water", "artist": "Justin Bieber", "genre": "R&B", "popularity": 36, "Duration": 4, "tempo": 100, "explicit": true, "interaction": "Play", "listened_from": "Indonesia", "ts": 1310970624}
--------------------------------------------------
Current Top Artists:
 1. SZA: 1
 2. Justin Bieber: 1
-------------------

KeyboardInterrupt: 

In [9]:
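# Disabled experiment: Protobuf-based variant of the consumer above.
# It fails with a protobuf error while parsing type 'Spotify'.
# NOTE(review): presumably the topic carries JSON payloads rather than
# serialized Spotify messages (see the JSON values in the consumer output
# above) - confirm. Also note that with the value_deserializer below,
# message.value would already be a Spotify object, so the .decode('utf-8')
# call further down would not apply to it.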

# from kafka import KafkaConsumer
# from collections import defaultdict
# import spotify_pb2 

# artist_counts = defaultdict(int)
# spotify = spotify_pb2.Spotify()

# def start_consumer(group_id, client_id, topic_name):
#     consumer = KafkaConsumer(
#         topic_name,
#         group_id=group_id,
#         client_id=client_id,
#         bootstrap_servers=[f'{kafka_host}:9092'],
#         value_deserializer=lambda v: spotify_pb2.Spotify.FromString(v),  # Deserialize Protobuf
#         auto_offset_reset='earliest',
#         enable_auto_commit=False
#     )

#     print(f"{client_id} started and subscribed to {topic_name}...")
    
#     try:
#         for message in consumer:
#             print(f"Raw message value: {message.value}")
#              
#             message_value = message.value.decode('utf-8')
#
#             artist_name = message_value.artist
#             interaction = message_value.interaction
            
#             if interaction in ['Play', 'Like']:
#                 artist_counts[artist_name] += 1
            
#             sorted_artists = sorted(artist_counts.items(), key=lambda x: x[1], reverse=True)
            
#             print(f"{client_id} Topic: {message.topic}, "
#                   f"Partition: {message.partition}, Offset: {message.offset}, "
#                   f"Key: {message.key}, Value: {message_value}")
#             print("-" * 50)
#             print("Current Top Artists:")
#             for index, (artist, count) in enumerate(sorted_artists, start=1):
#                 print(f" {index}. {artist}: {count}")
#             print("-" * 50)
            
#     except KeyboardInterrupt:
#         print("Shutting down consumer...")
#     finally:
#         consumer.close()
