In [None]:
from pathlib import Path
import json
import re
import os
import ssl
import unittest
from unittest import mock
from platform import python_version
import pandas as pd
from pandas.testing import assert_frame_equal
import numpy as np
import scipy
import sklearn
from scipy import special
from scipy import integrate
from sklearn import datasets
from sklearn.model_selection import train_test_split
import matplotlib
import matplotlib.pyplot as plt
import kafka
from kafka import KafkaConsumer, KafkaProducer, TopicPartition
from kafka.producer.buffer import SimpleBufferPool
from kafka.errors import KafkaConfigurationError
import boto3

def get_major_minor(s):
    """Return the leading ``major.minor`` part of a dotted version string.

    Only the first two dot-separated components are kept, so ``'3.11.4'``
    becomes ``'3.11'``. A string with fewer than two components is returned
    unchanged.
    """
    components = s.split('.')
    leading_two = components[:2]
    return '.'.join(leading_two)

def load_expected_versions() -> dict:
    """Load the expected dependency versions from ``./expected_versions.json``.

    The path is relative, so the file is looked up in the current working
    directory of the running process.

    Returns:
        The parsed JSON document. Callers in this file treat it as a mapping
        of dependency name to version string.

    Raises:
        FileNotFoundError: If ``expected_versions.json`` does not exist.
        json.JSONDecodeError: If the file does not contain valid JSON.
    """
    lock_file = Path('./expected_versions.json')

    # JSON is UTF-8 by specification; do not depend on the locale's default
    # text encoding, which differs between platforms.
    with open(lock_file, 'r', encoding='utf-8') as file:
        return json.load(file)

def get_expected_version(dependency_name: str) -> str:
    """Return the expected ``major.minor`` version for ``dependency_name``.

    The raw value is looked up in the module-level ``expected_versions``
    mapping. Any leading non-digit characters (for example a ``v`` prefix)
    are stripped, and only the first two dot-separated components are kept.

    Args:
        dependency_name: Key under which the dependency is recorded in
            ``expected_versions``.

    Returns:
        The expected version reduced to ``major.minor``.

    Raises:
        KeyError: If ``dependency_name`` has no recorded version.
    """
    raw_value = expected_versions.get(dependency_name)
    if raw_value is None:
        # Fail with the offending name instead of an opaque TypeError from re.sub.
        raise KeyError(f"no expected version recorded for {dependency_name!r}")

    # str() tolerates versions stored as JSON numbers (e.g. 3.11) rather than strings.
    raw_version = re.sub(r'^\D+', '', str(raw_value))
    return get_major_minor(raw_version)

class TestPythonVersion(unittest.TestCase):
    """Checks that the running interpreter matches the expected Python version."""

    def test_version(self):
        # Compare on major.minor only; patch-level differences are ignored.
        expected_major_minor = get_expected_version('Python')
        actual_major_minor = get_major_minor(python_version())
        self.assertEqual(actual_major_minor, expected_major_minor, "incorrect version")

class TestPandas(unittest.TestCase):
    """Version check and basic smoke tests for pandas."""

    def setUp(self):
        # 4-row, 2-column fixture shared by the shape, indexing, sampling
        # and drop tests; rebuilt for every test so tests stay independent.
        self.sample_df = pd.DataFrame({
            'apples': [3, 2, 0, 1],
            'oranges': [0, 3, 7, 2],
        })

    def test_version(self):
        expected_major_minor = get_expected_version('Pandas')
        actual_major_minor = get_major_minor(pd.__version__)
        self.assertEqual(actual_major_minor, expected_major_minor, "incorrect version")

    def test_dataframe_creation(self):
        sample_df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
        # Check against the public class rather than the internal pd.core path.
        self.assertIsInstance(sample_df, pd.DataFrame)

    def test_equal_dataframes(self):
        df1 = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
        df2 = pd.DataFrame({'a': [1, 2], 'b': [3.0, 4.0]})
        # assert_frame_equal raises AssertionError on mismatch, which fails
        # the test; values match here once dtypes are ignored.
        assert_frame_equal(df1, df2, check_dtype=False)

    def test_unequal_dataframes(self):
        df1 = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
        df2 = pd.DataFrame({'a': [1, 2], 'b': [3.0, 5.0]})
        with self.assertRaises(AssertionError):
            assert_frame_equal(df1, df2, check_dtype=False)

    def test_dataframe_shape(self):
        self.assertEqual(self.sample_df.shape, (4, 2))

    def test_index_out_of_bounds(self):
        # Column position 3 does not exist in a two-column frame.
        with self.assertRaises(IndexError):
            _ = self.sample_df.iat[0, 3]

    def test_sampling(self):
        half_sampled_df = self.sample_df.sample(frac=0.5)
        self.assertEqual(len(half_sampled_df), 2)

    def test_drop(self):
        self.assertEqual(self.sample_df.drop(columns=['apples']).shape, (4, 1))

class TestNumpy(unittest.TestCase):
    """Version check and basic smoke tests for NumPy."""

    def test_version(self):
        expected_major_minor = get_expected_version('Numpy')
        actual_major_minor = get_major_minor(np.__version__)
        self.assertEqual(actual_major_minor, expected_major_minor, "incorrect version")

    def test_array_creation(self):
        arr = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9])
        self.assertIsInstance(arr, np.ndarray)

    # NOTE: the method name is misspelled ("opeartions"); it is kept as-is so
    # existing test IDs do not change.
    def test_array_opeartions(self):
        arr = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9])

        self.assertEqual(arr.sum(), 45)
        self.assertEqual(len(arr), 9)
        self.assertEqual(arr.max(), 9)
        self.assertEqual(arr.min(), 1)

    def test_array_statistical_functions(self):
        arr = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9])

        self.assertEqual(np.median(arr), 5)
        self.assertEqual(np.mean(arr), 5)
        # The standard deviation is irrational (sqrt(60/9)); compare
        # approximately so the last floating-point digit cannot fail the test.
        self.assertAlmostEqual(np.std(arr), 2.581988897471611)

class TestScipy(unittest.TestCase):
    """Version check and basic smoke tests for SciPy."""

    def test_version(self):
        expected_major_minor = get_expected_version('Scipy')
        actual_major_minor = get_major_minor(scipy.__version__)
        self.assertEqual(actual_major_minor, expected_major_minor, "incorrect version")

    def test_scipy_special(self):
        self.assertEqual(special.exp10(3), 1000.0)
        self.assertEqual(special.exp2(10), 1024.0)
        self.assertEqual(special.sindg(90), 1)
        self.assertEqual(special.cosdg(0), 1)

    def test_scipy_integrate(self):
        # Integral of 10**x over [0, 1] is 9 / ln(10) ~= 3.9086503371292665.
        value, abs_error = integrate.quad(special.exp10, 0, 1)

        # quad returns floating-point results plus an error *estimate*; both
        # may differ in the last digits between builds, so do not compare the
        # tuple for exact equality.
        self.assertAlmostEqual(value, 3.9086503371292665, places=10)
        self.assertLess(abs_error, 1e-8)

class TestSKLearn(unittest.TestCase):
    """Version check and basic smoke tests for scikit-learn."""

    def test_version(self):
        expected_major_minor = get_expected_version('Scikit-learn')
        actual_major_minor = get_major_minor(sklearn.__version__)
        self.assertEqual(actual_major_minor, expected_major_minor, "incorrect version")

    def test_sklearn_dataset(self):
        data_set = datasets.load_iris()
        # Use the public sklearn.utils.Bunch name; the private ``_bunch``
        # module is an implementation detail that may move between releases.
        self.assertIsInstance(data_set, sklearn.utils.Bunch)

    def test_sklearn_train_test_split(self):
        my_iris = datasets.load_iris()
        X = my_iris.data
        Y = my_iris.target

        # 30% of the 150 iris samples go to the test split (45 rows), leaving 105.
        X_traindata, X_testdata, Y_traindata, Y_testdata = train_test_split(
            X, Y, test_size=0.3, random_state=1)

        self.assertEqual(X_traindata.shape, (105, 4))
        self.assertEqual(X_testdata.shape, (45, 4))
        self.assertEqual(Y_traindata.shape, (105,))
        self.assertEqual(Y_testdata.shape, (45,))

class TestMatplotlib(unittest.TestCase):
    """Version check and basic smoke test for Matplotlib."""

    def test_version(self):
        expected_major_minor = get_expected_version('Matplotlib')
        actual_major_minor = get_major_minor(matplotlib.__version__)
        self.assertEqual(actual_major_minor, expected_major_minor, "incorrect version")

    def test_matplotlib_figure_creation(self):
        fig = plt.figure(figsize=(8, 5))
        # pyplot keeps every figure it creates alive until it is closed;
        # release it even if the assertion below fails.
        self.addCleanup(plt.close, fig)
        self.assertIsInstance(fig, matplotlib.figure.Figure)


class TestKafkaPython(unittest.TestCase):
    """Version check and offline smoke tests for the kafka client package."""

    def test_version(self):
        wanted = get_expected_version('Kafka-Python-ng')
        installed = get_major_minor(kafka.__version__)
        self.assertEqual(installed, wanted, "incorrect version")

    def test_buffer_pool(self):
        # A buffer that was written to and handed back to the pool must read
        # as empty when it is allocated again.
        buffer_pool = SimpleBufferPool(1000, 1000)

        first_buffer = buffer_pool.allocate(1000, 1000)
        digits = ''.join(str(number) for number in range(100))
        payload = digits.encode('utf-8')
        first_buffer.write(payload)
        buffer_pool.deallocate(first_buffer)

        second_buffer = buffer_pool.allocate(1000, 1000)
        self.assertEqual(second_buffer.read(), b'')

    def test_session_timeout_larger_than_request_timeout_raises(self):
        # session_timeout_ms (50000) exceeds request_timeout_ms (40000); the
        # consumer is expected to reject this combination at construction.
        consumer_options = {
            'bootstrap_servers': 'localhost:9092',
            'api_version': (0, 9),
            'group_id': 'foo',
            'session_timeout_ms': 50000,
            'request_timeout_ms': 40000,
        }
        with self.assertRaises(KafkaConfigurationError):
            KafkaConsumer(**consumer_options)

class TestBoto3(unittest.TestCase):
    """Version check and default-session smoke test for boto3.

    ``boto3.Session`` is replaced by an autospec'd mock around every test.
    """

    def setUp(self):
        patcher = mock.patch('boto3.Session', autospec=True)
        self.session_patch = patcher
        self.Session = patcher.start()

    def tearDown(self):
        # Drop any default session a test installed, then undo the patch.
        boto3.DEFAULT_SESSION = None
        self.session_patch.stop()

    def test_version(self):
        wanted = get_expected_version('Boto3')
        installed = get_major_minor(boto3.__version__)
        self.assertEqual(installed, wanted, "incorrect version")

    def test_create_default_session(self):
        mocked_session = self.Session.return_value

        boto3.setup_default_session()

        # The default session must be the instance produced by the patched class.
        self.assertEqual(boto3.DEFAULT_SESSION, mocked_session)

@unittest.skip("RHAIENG-509: TestSecurity tests all fail")
class TestSecurity(unittest.TestCase):
    """Checks on the Jupyter server's security-related configuration.

    The settings are read from environment variables, falling back to
    defaults under ``/etc/jupyter/ssl`` and ``localhost``. The whole class is
    currently skipped (see the reason on the decorator).
    """

    def test_jupyter_password_env(self):
        self.assertIn("JUPYTER_PASSWORD", os.environ, "Missing JUPYTER_PASSWORD env variable for login protection")

    def test_ssl_files_exist(self):
        cert_file = os.environ.get("JUPYTER_SSL_CERT", "/etc/jupyter/ssl/cert.pem")
        key_file = os.environ.get("JUPYTER_SSL_KEY", "/etc/jupyter/ssl/key.pem")
        self.assertTrue(os.path.exists(cert_file), f"SSL cert not found: {cert_file}")
        self.assertTrue(os.path.exists(key_file), f"SSL key not found: {key_file}")

    def test_ssl_certificate_validity(self):
        cert_file = os.environ.get("JUPYTER_SSL_CERT", "/etc/jupyter/ssl/cert.pem")
        key_file = os.environ.get("JUPYTER_SSL_KEY", "/etc/jupyter/ssl/key.pem")
        context = ssl.create_default_context()
        try:
            # The private key lives in its own file (see test_ssl_files_exist);
            # without ``keyfile`` the key would be looked for inside the
            # certificate file and loading would fail for a valid pair.
            context.load_cert_chain(certfile=cert_file, keyfile=key_file)
        except (ssl.SSLError, OSError) as e:
            # SSLError: malformed or mismatched cert/key; OSError: unreadable file.
            self.fail(f"Invalid SSL cert: {e}")

    def test_host_not_public(self):
        host = os.environ.get("JUPYTER_HOST", "localhost")
        self.assertIn(host, ["localhost", "127.0.0.1"], f"Jupyter host is publicly exposed: {host}")

# Load the expected versions once, before the tests run: get_expected_version()
# reads this module-level name.
expected_versions = load_expected_versions()
# argv=[''] makes unittest use this argument list instead of the host
# process's command line, and exit=False stops it from calling sys.exit()
# when the run finishes.
unittest.main(argv=[''], verbosity=2, exit=False)