# Kick Stream İzleyici Simülasyonu

Bu notebook, Kick platformundaki yayınların izleyici sayısını simüle eder.

## TPU Bağlantısı
Bu kod ağ istekleri ve thread'lerle çalışır - derin öğrenme içermediği için TPU kullanmaz. Ancak aynı Colab ortamında TPU kullanılmak istenirse ilk kurulum burada yapılabilir.

In [1]:
# TPU bağlantısı - Bu kod TPU kullanmıyor ancak aynı notebook'ta TPU kullanmak isterseniz
import os
try:
    import tensorflow as tf
    resolver = tf.distribute.cluster_resolver.TPUClusterResolver()
    print('TPU bulunan cihazlar: ', resolver.cluster_spec().as_dict()['worker'])
    tf.config.experimental_connect_to_cluster(resolver)
    tf.tpu.experimental.initialize_tpu_system(resolver)
    print("TPU kullanıma hazır")
except:
    print("TPU bağlantısı bulunamadı")

TPU bağlantısı bulunamadı


## Gereksinimleri Kurma

Aşağıdaki hücre gerekli kütüphaneleri yükler:

In [2]:
!pip install requests m3u8 curl_cffi
print("Gerekli kütüphaneler yüklendi")

Gerekli kütüphaneler yüklendi


## Kod Tanımlamaları

Aşağıdaki hücre tüm sınıfları ve fonksiyonları tanımlar:

In [3]:
import re
import requests
import time
import m3u8
from concurrent.futures import ThreadPoolExecutor
import json
import sys
from typing import Optional, Dict, Any
import os
from urllib.parse import urlparse, urljoin
import threading
import logging
import curl_cffi
import atexit
from google.colab import output

# Google Colab'da hücre çıktısında otomatik kaydırma için
output.enable_custom_widget_manager()

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class ViewerCounter:
    def __init__(self):
        self._count = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:
            self._count += 1
            logger.info(f"Active viewers: {self._count}")

    def decrement(self):
        with self._lock:
            self._count = max(0, self._count - 1)
            logger.info(f"Active viewers: {self._count}")

    @property
    def count(self):
        with self._lock:
            return self._count

viewer_counter = ViewerCounter()

class Kick:
    def __init__(self, proxy: Optional[str] = None):
        self.session = self._create_session(proxy)

    def _create_session(self, proxy: Optional[str]) -> curl_cffi.Session:
        session = curl_cffi.Session(impersonate="firefox135")
        headers = {
            "sec-ch-ua": "\"Google Chrome\";v=\"131\", \"Not)A;Brand\";v=\"99\", \"Microsoft Edge\";v=\"131\"",
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": "\"Windows\"",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9",
            "Accept-Encoding": "gzip, deflate, br",
            "Accept-Language": "en-US,en;q=0.9",
            "Cache-Control": "max-age=0",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1"
        }
        session.headers.update(headers)

        if proxy:
            session.proxies = {"http": proxy, "https": proxy}

        return session

    def get_stream_url(self, username: str) -> Optional[str]:
        try:
            response = self.session.get(f"https://kick.com/{username}")
            pattern = r'playback_url\\":\\"(https://[^\\"]+)'
            match = re.search(pattern, response.text)

            if match:
                playback_url = match.group(1)
                return playback_url.replace('\\\\', '\\').replace('\\/', '/')
            return None
        except Exception as e:
            logger.error(f"Error getting stream URL: {e}")
            return None

class M3U8Handler:
    def __init__(self, master_url: str, session: Optional[Any] = None):
        self.master_url = master_url
        self.session = session or requests.Session()
        self.base_url = self._get_base_url(master_url)
        self.stop_event = threading.Event()
        self.playback_thread = None

    def _get_base_url(self, url: str) -> str:
        parsed = urlparse(url)
        return f"{parsed.scheme}://{parsed.netloc}{os.path.dirname(parsed.path)}/"

    def _resolve_url(self, segment_url: str) -> str:
        if segment_url.startswith('http'):
            return segment_url
        return urljoin(self.base_url, segment_url)

    def fetch_playlist(self) -> Optional[m3u8.M3U8]:
        try:
            response = self.session.get(self.master_url)
            return m3u8.loads(response.text)
        except Exception as e:
            logger.error(f"Error fetching master playlist: {e}")
            return None

    def get_lowest_bandwidth_stream(self) -> Optional[str]:
        playlist = self.fetch_playlist()
        if not playlist:
            return None

        if not playlist.playlists:
            return self.master_url

        min_bandwidth = float('inf')
        lowest_variant = None

        for variant in playlist.playlists:
            bandwidth = variant.stream_info.bandwidth
            if bandwidth < min_bandwidth:
                min_bandwidth = bandwidth
                lowest_variant = variant

        if lowest_variant:
            return self._resolve_url(lowest_variant.uri)
        return None

    def fetch_media_playlist(self, playlist_url: str) -> Optional[m3u8.M3U8]:
        try:
            response = self.session.get(playlist_url)
            return m3u8.loads(response.text)
        except Exception as e:
            logger.error(f"Error fetching media playlist: {e}")
            return None

    def fetch_segment(self, segment_url: str) -> bool:
        full_url = self._resolve_url(segment_url)
        try:
            start_time = time.time()
            response = self.session.get(full_url, stream=True)
            for chunk in response.iter_content(chunk_size=1):
                if chunk:
                    break
            duration = time.time() - start_time
            logger.info(f"Segment fetched in {duration:.2f}s: {full_url}")
            return True
        except Exception as e:
            logger.error(f"Error fetching segment: {e}")
            return False

    def simulate_playback(self, media_playlist_url: str) -> None:
        viewer_counter.increment()
        try:
            while not self.stop_event.is_set():
                media_playlist = self.fetch_media_playlist(media_playlist_url)
                if not media_playlist or not media_playlist.segments:
                    break

                for segment in media_playlist.segments[:min(2, len(media_playlist.segments))]:
                    if self.stop_event.is_set():
                        break
                    self.fetch_segment(segment.uri)
                    time.sleep(40)
                if media_playlist.is_endlist:
                    break
                time.sleep(5)
        finally:
            viewer_counter.decrement()

    def start(self) -> bool:
        lowest_stream_url = self.get_lowest_bandwidth_stream()
        if not lowest_stream_url:
            logger.error("Could not find a stream to play")
            return False

        self.playback_thread = threading.Thread(target=self.simulate_playback, args=(lowest_stream_url,))
        self.playback_thread.daemon = True
        self.playback_thread.start()
        return True

    def stop(self) -> None:
        self.stop_event.set()
        if self.playback_thread and self.playback_thread.is_alive():
            self.playback_thread.join(timeout=5)

def cleanup_handlers(handlers):
    logger.info("Cleaning up handlers...")
    for handler in handlers:
        handler.stop()


def main(viewers: int = 1, username: str = "username", proxy: Optional[str] = None):
    active_handlers = []
    m3u8_url = None
    session = None
    try:
        # Sadece bir kez Kick ile m3u8 url'si al
        kick = Kick(proxy)
        m3u8_url = kick.get_stream_url(username)
        session = kick.session
        if not m3u8_url:
            logger.error("Could not find playback URL")
            return

        # Colab'da çok yüksek sayıda viewer optimize edelim
        max_batch = 100  # Aynı anda başlatılacak maksimum handler sayısı
        batches = [viewers // max_batch + (1 if viewers % max_batch > 0 else 0)]
        batch_sizes = [min(max_batch, viewers - i * max_batch) for i in range(batches[0])]

        logger.info(f"{len(batch_sizes)} batch halinde çalıştırılacak: {batch_sizes}")

        for batch_idx, batch_size in enumerate(batch_sizes):
            logger.info(f"Batch {batch_idx+1}/{len(batch_sizes)} başlatılıyor: {batch_size} handler")

            # Tüm handlerlar aynı m3u8 url'sini kullanacak ve segment isteği atacak
            for _ in range(batch_size):
                handler = M3U8Handler(m3u8_url, session=session)
                if handler.start():
                    active_handlers.append(handler)
                else:
                    logger.error("Failed to start a handler.")
                time.sleep(0.05)  # Handler başlatma arası 50ms gecikme, DNS ve bağlantı yükünü azaltır

            logger.info(f"Batch {batch_idx+1} tamamlandı: {len(active_handlers)} aktif handler")

            # Batch'ler arası 5 saniye bekleyelim
            if batch_idx < len(batch_sizes) - 1:
                logger.info(f"Sonraki batch için 5 saniye bekleniyor...")
                time.sleep(5)

        logger.info(f"Successfully started {len(active_handlers)} viewers (hepsi segment isteği atıyor, kick.com'a tekrar istek atmıyor)")
        atexit.register(cleanup_handlers, active_handlers)

        try:
            while True:
                logger.info(f"Current active viewers: {viewer_counter.count}")
                time.sleep(60)
        except KeyboardInterrupt:
            logger.info("Received keyboard interrupt, shutting down...")

    finally:
        cleanup_handlers(active_handlers)
        logger.info("Program sonlandırıldı.")

## Programı Çalıştırma

Aşağıdaki hücreyi çalıştırarak yayın izleyicileri simüle edilir. Parametreleri isteğinize göre ayarlayabilirsiniz.

In [None]:
# İzleyici sayısı ve yayıncı adını ayarlayın
viewers = 5000  # 5000 yerine daha makul bir sayı ile başlayalım
username = "irkaleva"  # İzlenmek istenen yayıncı adı

# Programı çalıştırma
print(f"{viewers} izleyici simülasyonu başlatılıyor...")
main(viewers=viewers, username=username)

5000 izleyici simülasyonu başlatılıyor...


ERROR:__main__:Error fetching segment: Failed to perform, curl: (35) TLS connect error: error:100003f2:SSL routines:OPENSSL_internal:SSLV3_ALERT_UNEXPECTED_MESSAGE. See https://curl.se/libcurl/c/libcurl-errors.html first for more details.
ERROR:__main__:Error fetching media playlist: Failed to perform, curl: (28) Operation timed out after 30002 milliseconds with 0 bytes received. See https://curl.se/libcurl/c/libcurl-errors.html first for more details.
ERROR:__main__:Error fetching media playlist: Failed to perform, curl: (35) TLS connect error: error:10000129:SSL routines:OPENSSL_internal:TLS13_DOWNGRADE. See https://curl.se/libcurl/c/libcurl-errors.html first for more details.
ERROR:__main__:Error fetching media playlist: Failed to perform, curl: (35) TLS connect error: error:10000129:SSL routines:OPENSSL_internal:TLS13_DOWNGRADE. See https://curl.se/libcurl/c/libcurl-errors.html first for more details.
ERROR:__main__:Error fetching media playlist: Failed to perform, curl: (35) TLS c

## Programı Durdurma

Tüm handler'ları manuel olarak durdurmak için aşağıdaki hücreyi çalıştırın:

In [None]:
# Kernel'i yeniden başlatmadan önce bu hücreyi çalıştırın
import os
os._exit(0)  # Tüm thread'leri sonlandırır