In [14]:
# Data format
# import datetime
from data_tmp import UsageEvent


from base_bloom import BloomFilter
from base_hll import HyperLogLog
# from base_reservoir import
# from base_map_reduce import
 

from typing import List, Dict, Set, Optional, Tuple
from datetime import datetime, timedelta
import mmh3
from collections import defaultdict
import itertools
import math
import random
from bitarray import bitarray


In [15]:
UsageEvent

data_tmp.UsageEvent

In [29]:
class StreamingAnalyser:
    def __init__(self,
                 hll_precision: int = 14,
                 bloom_size: int = 100000,
                 bloom_hash_count: int = 5,
                 time_window: timedelta = timedelta(hours=1)):
        
        # Ініціалізація компонентів
        # Унікальні глядачі
        self.track_listeners = defaultdict(lambda: HyperLogLog(hll_precision))
        # Повторні перегляди
        self.repeat_listens  = BloomFilter(bloom_size, bloom_hash_count)
        
        # Статистика прослуховувань
        self.listen_counts = defaultdict(int)
        self.completion_stats = defaultdict(lambda: defaultdict(int))
        self.device_stats = defaultdict(lambda: defaultdict(int))
        
        # Аналіз плейлистів та рекомендацій
        self.user_sequences = defaultdict(list)
        self.track_correlations = defaultdict(lambda: defaultdict(int))
        
        # Часові метрики
        self.time_window = time_window
        self.window_stats = []
        self.hourly_stats = defaultdict(lambda: defaultdict(int))
        
        # Додаткова статистика
        self.duration_points = defaultdict(list)

    def proccesor(self, event):
        """Обробляє подію прослуховування."""
        # Конвертуємо timestamp в datetime
        event_time = datetime.strptime(event.timestamp, '%Y-%m-%d %H:%M:%S')
        
        # Оновлюємо HyperLogLog
        self.track_listeners[event.music_id].add(event.user_id)
        
        # Перевіряємо повторне прослуховування
        listen_key = f"{event.user_id}:{event.music_id}:{event_time.date()}"
        is_repeat = self.repeat_listens.check(listen_key)
        self.repeat_listens.add(listen_key)
        
        # Оновлюємо всі метрики
        self._update_listen_stats(event)
        self._update_device_stats(event)
        self._update_user_sequences(event)
        self._update_time_window(event, event_time)
        self._update_duration_stats(event)


  
    def _update_listen_stats(self, event: UsageEvent) -> None:
        """Оновлює статистику прослуховувань."""
        self.listen_counts[event.music_id] += 1
        
        if event.completed.lower() == 'true':
            self.completion_stats[event.music_id]['completed'] += 1
        self.completion_stats[event.music_id]['total'] += 1
    
    def _update_device_stats(self, event: UsageEvent) -> None:
        """Оновлює статистику пристроїв."""
        self.device_stats[event.music_id][event.device] += 1
    
    def _update_user_sequences(self, event: UsageEvent) -> None:
        """Оновлює послідовності прослуховувань для рекомендацій."""
        user_seq = self.user_sequences[event.user_id]
        if user_seq:
            last_track = user_seq[-1]
            self.track_correlations[last_track][event.music_id] += 1
        
        user_seq.append(event.music_id)
        if len(user_seq) > 10:  # Зберігаємо останні 10 треків
            user_seq.pop(0)
    
    def _update_time_window(self, event: UsageEvent, event_time: datetime) -> None:
        """Підтримує ковзне вікно для аналізу трендів."""
        self.window_stats = [
            (ts, track) for ts, track in self.window_stats 
            if event_time - ts <= self.time_window
        ]
        
        self.window_stats.append((event_time, event.music_id))
        self.hourly_stats[event.music_id][event_time.hour] += 1
    
    def _update_duration_stats(self, event: UsageEvent) -> None:
        """Оновлює статистику тривалості прослуховування."""
        try:
            duration = float(event.duration_point)
            self.duration_points[event.music_id].append(duration)
        except (ValueError, TypeError):
            pass  # Пропускаємо некоректні значення
    
    def get_trending_tracks(self, top_n: int = 10) -> List[Tuple[str, int]]:
        """Повертає трендові треки на основі поточного вікна."""
        window_counts = defaultdict(int)
        for _, track_id in self.window_stats:
            window_counts[track_id] += 1
        
        return sorted(
            window_counts.items(),
            key=lambda x: x[1],
            reverse=True
        )[:top_n]
    
    def get_track_recommendations(self, track_id: str, limit: int = 5) -> List[Tuple[str, float]]:
        """Повертає рекомендації на основі кореляцій прослуховувань."""
        correlations = self.track_correlations[track_id]
        if not correlations:
            return []
        
        total_plays = sum(correlations.values())
        normalized = [
            (track, count / total_plays)
            for track, count in correlations.items()
        ]
        
        return sorted(normalized, key=lambda x: x[1], reverse=True)[:limit]
    
    def get_track_stats(self, track_id: str) -> Dict[str, any]:
        """Повертає детальну статистику для треку."""
        completion_stats = self.completion_stats[track_id]
        duration_points = self.duration_points[track_id]
        
        return {
            'total_plays': self.listen_counts[track_id],
            'unique_listeners': int(self.track_listeners[track_id].estimate()),
            'avg_duration': sum(duration_points) / len(duration_points) if duration_points else 0,
            'completion_rate': (
                completion_stats['completed'] / completion_stats['total']
                if completion_stats['total'] > 0 else 0
            ),
            'device_distribution': {
                device: count / self.listen_counts[track_id]
                for device, count in self.device_stats[track_id].items()
            },
            'hourly_distribution': dict(self.hourly_stats[track_id])
        }

    

In [30]:
import random
import string
import time
from dataclasses import dataclass
from typing import Generator

def random_string(length: int) -> str:
    """Generate a random string of uppercase letters and digits."""
    return ''.join(random.choices(string.ascii_uppercase + string.digits, k=length))

def generate_usage_events():
    """A generator that yields random UsageEvent objects."""
    devices = ["mobile", "desktop", "tablet", "smart_tv"]
    track_ids = [f"track_{i}" for i in range(100)]
    user_ids = [f"user_{i}" for i in range(1000)]

    while True:
        user_id = random.choice(user_ids)
        music_id = random.choice(track_ids)
        timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        duration_point = str(random.randint(30, 300))  # Duration in seconds (30 to 300 seconds)
        completed = random.choice(["true", "false"])
        device = random.choice(devices)

        yield UsageEvent(
            user_id=user_id,
            music_id=music_id,
            timestamp=timestamp,
            duration_point=duration_point,
            completed=completed,
            device=device
        )

In [43]:
analizer = StreamingAnalyser()
event_generator = generate_usage_events()


for _ in range(10000):  # Generate 5 events for demonstration
    event = next(event_generator)
    print(event)
    analizer.proccesor(event)

UsageEvent(user_id='user_457', music_id='track_34', timestamp='2025-01-19 17:22:44', duration_point='250', completed='false', device='smart_tv')
UsageEvent(user_id='user_436', music_id='track_37', timestamp='2025-01-19 17:22:44', duration_point='290', completed='false', device='smart_tv')
UsageEvent(user_id='user_979', music_id='track_52', timestamp='2025-01-19 17:22:44', duration_point='219', completed='false', device='smart_tv')
UsageEvent(user_id='user_109', music_id='track_82', timestamp='2025-01-19 17:22:44', duration_point='240', completed='false', device='tablet')
UsageEvent(user_id='user_497', music_id='track_84', timestamp='2025-01-19 17:22:44', duration_point='52', completed='true', device='smart_tv')
UsageEvent(user_id='user_634', music_id='track_30', timestamp='2025-01-19 17:22:44', duration_point='68', completed='false', device='smart_tv')
UsageEvent(user_id='user_434', music_id='track_74', timestamp='2025-01-19 17:22:44', duration_point='207', completed='false', device='d

UsageEvent(user_id='user_542', music_id='track_80', timestamp='2025-01-19 17:22:44', duration_point='99', completed='false', device='mobile')
UsageEvent(user_id='user_51', music_id='track_50', timestamp='2025-01-19 17:22:44', duration_point='64', completed='false', device='desktop')
UsageEvent(user_id='user_843', music_id='track_35', timestamp='2025-01-19 17:22:44', duration_point='112', completed='true', device='smart_tv')
UsageEvent(user_id='user_888', music_id='track_64', timestamp='2025-01-19 17:22:44', duration_point='204', completed='false', device='mobile')
UsageEvent(user_id='user_977', music_id='track_29', timestamp='2025-01-19 17:22:44', duration_point='207', completed='true', device='tablet')
UsageEvent(user_id='user_634', music_id='track_85', timestamp='2025-01-19 17:22:44', duration_point='165', completed='false', device='tablet')
UsageEvent(user_id='user_26', music_id='track_71', timestamp='2025-01-19 17:22:44', duration_point='153', completed='false', device='mobile')
Us

In [44]:
for track_id, plays in analizer.get_trending_tracks():
    print(f"\nТрек {track_id}")

        # Детальна статистика
    stats = analizer.get_track_stats(track_id)
    print(f"Загальні прослуховування: {stats['total_plays']}")
    print(f"Унікальні слухачі: {stats['unique_listeners']}")
    print(f"Середня тривалість прослуховування: {stats['avg_duration']:.2f}")
    print(f"Відсоток завершених прослуховувань: {stats['completion_rate']:.1%}")
    
    # Розподіл за пристроями
    print("\nРозподіл за пристроями:")
    for device, percentage in stats['device_distribution'].items():
        print(f"  - {device}: {percentage:.1%}")
    
    # Рекомендації
    print("\nРекомендовані треки:")
    for rec_id, score in analizer.get_track_recommendations(track_id):
        print(f"  - {rec_id} (score: {score:.2f})")


Трек track_84
Загальні прослуховування: 132
Унікальні слухачі: 53
Середня тривалість прослуховування: 174.12
Відсоток завершених прослуховувань: 53.0%

Розподіл за пристроями:
  - smart_tv: 26.5%
  - tablet: 24.2%
  - desktop: 24.2%
  - mobile: 25.0%

Рекомендовані треки:
  - track_44 (score: 0.03)
  - track_32 (score: 0.03)
  - track_99 (score: 0.03)
  - track_53 (score: 0.03)
  - track_30 (score: 0.03)

Трек track_94
Загальні прослуховування: 124
Унікальні слухачі: 62
Середня тривалість прослуховування: 161.62
Відсоток завершених прослуховувань: 46.0%

Розподіл за пристроями:
  - tablet: 30.6%
  - smart_tv: 22.6%
  - desktop: 25.0%
  - mobile: 21.8%

Рекомендовані треки:
  - track_84 (score: 0.05)
  - track_60 (score: 0.04)
  - track_46 (score: 0.04)
  - track_61 (score: 0.03)
  - track_14 (score: 0.03)

Трек track_70
Загальні прослуховування: 120
Унікальні слухачі: 56
Середня тривалість прослуховування: 160.16
Відсоток завершених прослуховувань: 49.2%

Розподіл за пристроями:
  - t