In [None]:
from google.cloud import pubsub_v1
import json
import requests 
import time
from google.oauth2 import service_account

class BTC_API:
    """Poll the Binance BTC/USDT ticker and forward each reading to Pub/Sub.

    Attributes:
        key: URL of the ticker endpoint (despite the name, not an API key).
        PROJECT_ID: Google Cloud project that owns the topic.
        TOPIC_NAME: Name of the Pub/Sub topic to publish to.
        topic_path: Fully qualified topic path built from the two values above.
        credentials_file: Path to the service-account JSON key file.
        request_timeout: Seconds to wait for the ticker HTTP request.
        publish_timeout: Seconds to wait for Pub/Sub to acknowledge a message.
    """

    def __init__(self):
        self.key = "https://api.binance.com/api/v3/ticker/price?symbol=BTCUSDT"
        self.PROJECT_ID = "big-d-project-404815"
        self.TOPIC_NAME = "bitcoin-topic"
        self.topic_path = f"projects/{self.PROJECT_ID}/topics/{self.TOPIC_NAME}"
        self.credentials_file = '../big-d-project-404815-e51da3ccf3cc.json'
        self.request_timeout = 10
        self.publish_timeout = 30
        # Created lazily on the first publish so that constructing a BTC_API
        # never touches the credentials file or the network.
        self._publisher = None

    def _get_publisher(self):
        """Return the Pub/Sub publisher client, creating it on first use."""
        if self._publisher is None:
            # Authenticate with the service-account JSON key file.
            self._publisher = pubsub_v1.PublisherClient(
                credentials=service_account.Credentials.from_service_account_file(
                    filename=self.credentials_file),
            )
        return self._publisher

    def get_data(self):
        """Fetch the current ticker reading and return the decoded JSON body.

        Returns:
            The parsed JSON payload of the response.

        Raises:
            requests.RequestException: On connection errors, timeouts, or a
                non-2xx HTTP status.
            ValueError: If the response body is not valid JSON.
        """
        # Without a timeout a stalled connection would block the poller forever.
        response = requests.get(self.key, timeout=self.request_timeout)
        # Fail loudly on HTTP errors instead of treating an error body as a quote.
        response.raise_for_status()
        return response.json()

    def send_to_pubsub(self, data):
        """Serialize ``data`` as JSON and publish it to the configured topic.

        Publishing failures are reported on stdout and not re-raised, so a
        polling loop calling this method keeps running. Errors while creating
        the client (for example an unreadable credentials file) and errors
        while serializing ``data`` do propagate.

        Args:
            data: Any JSON-serializable object.
        """
        # Set up the Pub/Sub client (reused across calls).
        publisher = self._get_publisher()
        # Convert the payload to JSON text.
        data_json = json.dumps(data)
        # Publish the message.
        try:
            future = publisher.publish(self.topic_path, data=data_json.encode('utf-8'))
            # publish() only queues the message; waiting on the future is what
            # surfaces delivery errors so the handler below can report them.
            future.result(timeout=self.publish_timeout)
        except Exception as e:
            print(f"Eror during publishing message: {e}")
        else:
            print(f"Message published : {data_json}")

    def listen_for_messages(self, wait_time=15):
        """Fetch and publish a ticker reading every ``wait_time`` seconds, forever.

        A failed fetch is reported on stdout and retried on the next cycle
        instead of terminating the loop.

        Args:
            wait_time: Seconds to sleep before each fetch.
        """
        while True:
            time.sleep(wait_time)
            try:
                data = self.get_data()
            except (requests.RequestException, ValueError) as e:
                # Transient network/API trouble: skip this cycle, keep polling.
                print(f"Error during fetching data: {e}")
                continue
            self.send_to_pubsub(data)
            
if __name__ == "__main__":
    # Script entry point: start polling with the default interval.
    # The polling loop has no exit condition, so this call does not return.
    BTC_API().listen_for_messages()

In [7]:
import unittest
from unittest.mock import patch, MagicMock
from api import BTC_API  # Replace 'your_module' with the actual name of your module

class TestBTC_API(unittest.TestCase):
    """Unit tests for ``BTC_API`` that need no network access or credentials."""

    @patch('requests.get')
    def test_get_data(self, mock_get):
        """``get_data`` returns the decoded JSON body of the ticker response."""
        # Set up the mocked HTTP response.
        mock_response = MagicMock()
        mock_response.json.return_value = {'symbol': 'BTCUSDT', 'price': '12345.67'}
        mock_get.return_value = mock_response

        # Exercise get_data.
        btc_api = BTC_API()
        data = btc_api.get_data()

        self.assertEqual(data, {'symbol': 'BTCUSDT', 'price': '12345.67'})
        # Exactly one request, aimed at the configured ticker URL. Only the
        # first positional argument is checked so that extra keyword
        # arguments (such as a timeout) do not break the test.
        mock_get.assert_called_once()
        self.assertEqual(mock_get.call_args[0][0], btc_api.key)

    @patch('google.oauth2.service_account.Credentials.from_service_account_file')
    @patch('google.cloud.pubsub_v1.PublisherClient')
    def test_send_to_pubsub(self, mock_publisher_client, mock_from_file):
        """``send_to_pubsub`` publishes the JSON-encoded payload to the topic."""
        # Set up the mocked publisher. The credential loader is patched too so
        # the test never reads a real service-account key file from disk.
        mock_publisher = MagicMock()
        mock_publisher_client.return_value = mock_publisher

        # Exercise send_to_pubsub.
        btc_api = BTC_API()
        btc_api.send_to_pubsub({'test': 'data'})

        # Verify a single publish with the expected topic and payload bytes.
        mock_from_file.assert_called_once()
        mock_publisher.publish.assert_called_once_with(
            btc_api.topic_path, data=b'{"test": "data"}')

if __name__ == '__main__':
    # Notebook-friendly invocation: the placeholder argv stops unittest from
    # parsing the host process's command line, and exit=False keeps it from
    # calling sys.exit() when the run finishes.
    unittest.main(exit=False, argv=['first-arg-is-ignored'])


..
----------------------------------------------------------------------
Ran 2 tests in 0.006s

OK


Message published : {"test": "data"}
