In [None]:
import unittest
from unittest.mock import Mock, patch
from io import BytesIO
import pandas as pd
from jobs.silver_layer import SilverLayerProcessor
from plugins.storage_client import AzureStorageClient
from plugins.spark_manager import SparkManager
from pyspark.sql import SparkSession
from pyspark.sql import Row


class TestSilverLayerProcessor(unittest.TestCase):
    """Unit tests for ``SilverLayerProcessor`` with every collaborator mocked.

    The storage client, the Spark manager and the Spark session are all
    ``Mock`` objects, so the tests only observe which calls the processor
    makes on them.
    """

    def setUp(self):
        """Create a processor wired to mocked storage and Spark dependencies."""
        spark_session = Mock(spec=SparkSession)
        breweries_schema = Mock()

        # The manager hands out the mocked session and schema on request.
        spark_manager = Mock(spec=SparkManager)
        spark_manager.create_spark_session.return_value = spark_session
        spark_manager.get_breweries_schema.return_value = breweries_schema

        storage_client = Mock(spec=AzureStorageClient)

        # Keep handles on the mocks so individual tests can configure them
        # and assert against them.
        self.mock_storage_client = storage_client
        self.mock_spark_manager = spark_manager
        self.mock_spark = spark_session
        self.mock_schema = breweries_schema

        self.processor = SilverLayerProcessor(
            storage_client=storage_client,
            spark_manager=spark_manager,
        )

    @patch('pyspark.sql.DataFrame')
    def test_process(self, df_mock):
        """Run ``process()`` on one NY brewery and check the calls it makes.

        Asserts that the bronze JSON blob is downloaded once, that a Spark
        DataFrame is created once, de-duplicated and repartitioned by
        ``"state"``, that ``filter`` is invoked, and that exactly one upload
        happens.
        """
        storage = self.mock_storage_client
        spark = self.mock_spark

        # --- arrange -------------------------------------------------------
        raw_payload = '[{"id": "1", "name": "Test Brewery", "state": "NY"}]'
        storage.download_blob.return_value = raw_payload

        # Every DataFrame transformation hands back the same mock so that
        # calls can be asserted on a single object.
        spark.createDataFrame.return_value = df_mock
        df_mock.dropDuplicates.return_value = df_mock
        df_mock.repartition.return_value = df_mock

        # select(...).distinct().collect() yields a single state row.
        distinct_states = df_mock.select.return_value.distinct.return_value
        distinct_states.collect.return_value = [Row(state='NY')]

        # filter(...).toPandas() yields the pandas frame for that state.
        ny_frame = pd.DataFrame(
            {
                'id': ['1'],
                'name': ['Test Brewery'],
                'state': ['NY'],
            }
        )
        df_mock.filter.return_value.toPandas.return_value = ny_frame

        # --- act -----------------------------------------------------------
        self.processor.process()

        # --- assert --------------------------------------------------------
        storage.download_blob.assert_called_once_with(
            container='bronze-layer',
            blob_path='breweries_data.json',
            file_type='json',
        )
        df_mock.dropDuplicates.assert_called_once()
        df_mock.repartition.assert_called_once_with("state")

        # NOTE(review): comparing a mock attribute with a string evaluates to
        # a plain bool (False) rather than a Spark column expression, so this
        # only pins the last ``filter`` call to that value. Presumably the
        # processor builds its predicate the same way; confirm.
        expected_predicate = df_mock.state == 'NY'
        df_mock.filter.assert_called_with(expected_predicate)

        storage.upload_blob.assert_called_once()
        spark.createDataFrame.assert_called_once()


# Run the test suite only when this file is executed directly, not on import.
if __name__ == "__main__":
    unittest.main()