# Wersja z dostępem do środowiska

Tą wersję można przejść również posiadając nowy obraz dockerowy i uruchomiony docker desktop na własnym komputerze.

1. Przejdź do przeglądarki i uruchom stronę ze środowiskiem (w przypadku Docker uruchom `localhost:8888`).
2. Uruchom (w jupyter lab za pomocą ikony terminala) nowy terminal

3. Przejdź do katalogu głównego i wypisz listę wszystkich elementów. Sprawdź czy na liście znajduje się katalog `kafka`.

   ```bash
   cd ~
   ls -all
   ```
4. Uruchom polecenie sprawdzające listę topiców serwera Kafki
    ```bash
    kafka/bin/kafka-topics.sh --list --bootstrap-server broker:9092
    ```
5. Dodaj topic o nazwie streaming
   ```bash
   kafka/bin/kafka-topics.sh --bootstrap-server broker:9092 --create --topic streaming
   ```
6. Sprawdź listę tematów ponownie upewniając się, że posiadasz temat `streaming`
7. Uruchom nowy terminal na notatniku i utwórz producenta w konsoli generującego dane do nowego topicu
```bash
kafka/bin/kafka-console-producer.sh --bootstrap-server broker:9092 --topic streaming
```

Aby sprawdzić czy wysyłanie wiadomości działa uruchom kolejne okno terminala i wpisz następującą komendę realizującą konsumenta w konsoli: 

```bash
kafka/bin/kafka-console-consumer.sh --bootstrap-server broker:9092 --topic streaming --from-beginning
```
> Pamiętaj aby uruchamiać komendy z odpowiedniego katalogu. 

In [23]:
%%file stream_stock.py

import json
import requests
import sys
from datetime import datetime, timedelta
from time import sleep
import random
import yfinance as yf

from kafka import KafkaProducer

def fetch_stock_data():
    with open('/home/jovyan/notebooks/stock_indexes.txt', 'r') as file:
        for line in file:
            symbol = line.strip()
            stock = yf.Ticker(symbol)
            try:
                stock_info = stock.info
                if stock_info:
                    message = {'type': 'Stock',
                               "date": datetime.now().isoformat(),
                               'symbol': stock_info.get('symbol', 'No Data'),
                               'currentPrice': stock_info.get('currentPrice', 'No Data'),
                               'open': stock_info.get('open', 'No Data'),
                               'previousClose': stock_info.get('previousClose', 'No Data'),
                               'dayLow': stock_info.get('dayLow', 'No Data'),
                               'dayHigh': stock_info.get('dayHigh', 'No Data'),
                               'recommendation': stock_info.get('recommendationKey', 'No Data'),
                               
                               'targetHighPrice': stock_info.get('targetHighPrice', 'No Data'),
                               'targetLowPrice': stock_info.get('targetLowPrice', 'No Data'),
                               'targetMeanPrice': stock_info.get('targetMeanPrice', 'No Data'),
                               'targetMedianPrice': stock_info.get('targetMedianPrice', 'No Data'),
                               'recommendationMean': stock_info.get('recommendationMean', 'No Data'),
                               
                               'totalRevenue': stock_info.get('totalRevenue', 'No Data'),
                               'revenuePerShare': stock_info.get('revenuePerShare', 'No Data'),
                               'totalDebt': stock_info.get('totalDebt', 'No Data'),
                               'debtToEquity': stock_info.get('debtToEquity', 'No Data'),
                               'totalCash': stock_info.get('totalCash', 'No Data'),
                               'totalCashPerShare': stock_info.get('totalCashPerShare', 'No Data'),
                               'ebitda': stock_info.get('ebitda', 'No Data'),
                               'earningsGrowth': stock_info.get('earningsGrowth', 'No Data'),
                               'revenueGrowth': stock_info.get('revenueGrowth', 'No Data')
                              }
                    yield message
                else:
                    print("No data for symbol:", symbol)
            except:
                print("Errorfor symbol", symbol)

        sleep(10)

if __name__ == "__main__":
    SERVER = "broker:9092"

    stock_producer = KafkaProducer(
        bootstrap_servers=[SERVER],
        value_serializer=lambda x: json.dumps(x).encode("utf-8"),
        api_version=(3, 7, 0),)
    
    try:
        while True:
            for stock_data in fetch_stock_data():
                stock_producer.send("stock", value=stock_data)

    except KeyboardInterrupt:
        stock_producer.close()

Overwriting stream_stock.py


In [20]:
%%file stream_crypto.py

import json
import requests
import sys
from datetime import datetime, timedelta
from time import sleep
import random
import yfinance as yf

from kafka import KafkaProducer

def fetch_crypto_data():
    with open('/home/jovyan/notebooks/crypto_index.txt', 'r') as file:
        ids = file.readlines()
        ids = [id.strip() for id in ids]
    
    for id in ids:
        url = f"https://api.coingecko.com/api/v3/coins/markets?ids={id}&vs_currency=usd"
    
        headers = {
            "accept": "application/json",
            "x-cg-demo-api-key": "CG-4zHc6u3aAS8gNQbRbR5ruUPb"
        }
    
        response = requests.get(url, headers=headers)
    
        if response.status_code == 200:
            data = response.json()
            if data:
                entry = {
                    "type": "Crypto",
                    "date": datetime.now().isoformat(), 
                    "name": data[0]["name"],
                    "current_price": data[0]["current_price"],
                    "high_24h": data[0]["high_24h"],
                    "low_24h": data[0]["low_24h"],
                    "price_change_24h": data[0]["price_change_24h"],
                    "price_change_percentage_24h": data[0]["price_change_percentage_24h"],
                    "market_cap": data[0]["market_cap"],
                    "total_volume": data[0]["total_volume"],
                    "ath": data[0]["ath"],
                    "atl": data[0]["atl"]
                }
                yield entry
            else:
                print(f"No data for ID: {id}")
        else:
            print(f"Error for ID: {id} code: {response.status_code}")

    sleep(10)

if __name__ == "__main__":
    SERVER = "broker:9092"
    
    crypto_producer = KafkaProducer(
        bootstrap_servers=[SERVER],
        value_serializer=lambda x: json.dumps(x).encode("utf-8"),
        api_version=(3, 7, 0),)

    
    try:
        while True:
            
            for crypto_data in fetch_crypto_data():
                crypto_producer.send("crypto", value=crypto_data)
                
    except KeyboardInterrupt:
        crypto_producer.close()

Overwriting stream_crypto.py
