In [190]:
import inspect
import json
import numpy as np
import pandas as pd
import requests
import uuid
import time
import redis
import math
from tqdm.auto import tqdm
from sklearn.base import BaseEstimator
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, ParameterGrid, ParameterSampler

class MLTaskManager:
    def __init__(self, session_id, api_url=None, api_key=None, status_endpoint="/job-status"):
        """
        Initialize the task manager with a session ID and API details.
        
        Args:
            session_id (str): Unique identifier for the current session
            api_url (str): Base URL of the API endpoint
            api_key (str): API key for authentication (if required)
            status_endpoint (str): Endpoint for checking job status
        """
        self.session_id = session_id
        self.api_url = api_url
        # self.api_key = api_key

    
    def extract_model_details(self, estimator):
        """
        Extract model type and hyperparameters from a scikit-learn estimator.
        
        Args:
            estimator: A scikit-learn estimator, GridSearchCV, or RandomizedSearchCV object
            
        Returns:
            dict: Dictionary containing model type and hyperparameters
        """
        if isinstance(estimator, (GridSearchCV, RandomizedSearchCV)):
            base_estimator = estimator.estimator
            model_type = type(base_estimator).__name__
            search_type = "GridSearchCV" if isinstance(estimator, GridSearchCV) else "RandomizedSearchCV"
            
            # Get param distributions/grid
            if isinstance(estimator, GridSearchCV):
                param_search = {
                    'param_grid': estimator.param_grid
                }
            else:  # RandomizedSearchCV
                param_search = {
                    'param_distributions': estimator.param_distributions,
                    'n_iter': estimator.n_iter
                }
            
            # Get CV parameters
            cv_params = {
                'cv': estimator.cv,
                'scoring': estimator.scoring,
                'refit': estimator.refit,
                'verbose': estimator.verbose,
                'error_score': estimator.error_score,
                'return_train_score': estimator.return_train_score
            }
            
            hyperparams = {
                'base_estimator_params': {k.split('__')[-1]: v for k, v in base_estimator.get_params().items()},
                'search_params': param_search,
                'cv_params': cv_params
            }
            
            return {
                'model_type': model_type,
                'search_type': search_type,
                'hyperparameters': hyperparams
            }
        else:
            model_type = type(estimator).__name__
            hyperparams = {k: v for k, v in estimator.get_params().items()}
            
            return {
                'model_type': model_type,
                'hyperparameters': hyperparams
            }
    
    def api_request(self, endpoint, method="post", data=None, params=None):
        """
        Make an API request with proper error handling.
        
        Args:
            endpoint (str): API endpoint (appended to base api_url)
            method (str): HTTP method (post, get, etc.)
            data (dict): Data to send in the request body
            params (dict): URL parameters
            
        Returns:
            dict: API response
        """
        if not self.api_url:
            print("Warning: API URL not provided.")
            return {"status": "error", "message": "API URL not provided"}
        
        url = f"{self.api_url.rstrip('/')}/{endpoint.lstrip('/')}"

        
        headers = {}
        # if self.api_key:
        #     headers["Authorization"] = f"Bearer {self.api_key}"
        
        if data:
            headers["Content-Type"] = "application/json"
        
        try:
            # Convert numpy/pandas types to native Python types
            def json_serializer(obj):
                if isinstance(obj, (np.float32, np.float64)):
                    if math.isnan(obj) or math.isinf(obj):
                        return None  # Convert NaN and Inf to null
                    return float(obj)
                elif isinstance(obj, (np.int32, np.int64)):
                    return int(obj)
                elif isinstance(obj, np.ndarray):
                    return obj.tolist()
                elif isinstance(obj, (pd.DataFrame, pd.Series)):
                    return obj.to_dict()
                return str(obj)

            def clean_dict(data):
                """Recursively replace NaN and Inf values in a dictionary with None."""
                if isinstance(data, dict):
                    return {k: clean_dict(v) for k, v in data.items()}
                elif isinstance(data, list):
                    return [clean_dict(v) for v in data]
                elif isinstance(data, float) and (math.isnan(data) or math.isinf(data)):
                    return None  # Convert NaN/Inf to None
                return data

            
            if data:
                data = json.loads(json.dumps(data, default=json_serializer))
                data = clean_dict(data) 
            
            if method.lower() == "get":
                response = requests.get(url, params=params, headers=headers)
            elif method.lower() == "post":
                response = requests.post(url, json=data, params=params, headers=headers)
            else:
                return {"status": "error", "message": f"Unsupported HTTP method: {method}"}
            
            response.raise_for_status()
            return response.json()
        
        except requests.exceptions.RequestException as e:
            print(f"API request failed: {str(e)}")
            print(f"ERROR HERE")
            return {"status": "error", "message": str(e)}
        
        except Exception as e:
            print(f"Error processing request: {str(e)}")
            print(f"ERROR HERE")
            return {"status": "error", "message": str(e)}
    
    def send_to_training_api(self, payload):
        """
        Send job configuration to the distributed training API endpoint.
        
        Args:
            payload (dict): The data to send to the API
            
        Returns:
            dict: API response
        """
        return self.api_request(f"train/{self.session_id}", method="post", data=payload)
    
    def check_job_status(self, job_id):
        """
        Check the status of a training job.
        
        Args:
            job_id (str): ID of the job to check
            
        Returns:
            dict: Job status information
        """
        
        return self.api_request(
            f"check_status/{self.session_id}/{job_id}",
            method="get", 
        )
    
    # def wait_for_completion(self, job_id, polling_interval=5, timeout=3600):
    #     """
    #     Wait for a job to complete, with a progress bar.
        
    #     Args:
    #         job_id (str): ID of the job to monitor
    #         polling_interval (int): Seconds between status checks
    #         timeout (int): Maximum seconds to wait
            
    #     Returns:
    #         dict: Final job status
    #     """
    #     start_time = time.time()
    #     progress_bar = None
    #     last_progress = 0
    #     total_tasks = 0
        
    #     print(f"Monitoring job: {job_id}")
        
    #     while True:
    #         # Check if we've exceeded the timeout
    #         if time.time() - start_time > timeout:
    #             if progress_bar:
    #                 progress_bar.close()
    #             return {"status": "error", "message": f"Timeout after {timeout} seconds"}
            
    #         # Get job status
    #         status = self.check_job_status(job_id)
            
    #         # Handle error in status check
    #         if status.get("job_status") == "error":
    #             if progress_bar:
    #                 progress_bar.close()
    #             return status
            
    #         # Extract progress information
    #         job_status = status.get("job_status", "unknown")
    #         current_progress = status.get("progress", {})
    #         completed_tasks = current_progress.get("completed_tasks", 0)
            
    #         # If this is the first time we're getting task information, set up the progress bar
    #         if total_tasks == 0 and "total_subtasks" in current_progress:
    #             total_tasks = current_progress.get("total_subtasks", 100)
    #             progress_bar = tqdm(total=total_tasks, desc="Training Progress")
            
    #         # Update progress bar if it exists
    #         if progress_bar and completed_tasks > last_progress:
    #             progress_bar.update(completed_tasks - last_progress)
    #             last_progress = completed_tasks
            
    #         # Check if job is complete
    #         if job_status.lower() in ["completed", "failed", "error"]:
    #             if progress_bar:
    #                 # Ensure progress bar is at 100% for completed jobs
    #                 if job_status.lower() == "completed" and last_progress < total_tasks:
    #                     progress_bar.update(total_tasks - last_progress)
    #                 progress_bar.close()
                
    #             # Return the final status
    #             return status
            
    #         # Wait before polling again
    #         time.sleep(polling_interval)

    def wait_for_completion(self, job_id, polling_interval=5, timeout=3600):
        """
        Wait for a job to complete, with a progress bar and structured response for frontend.
        
        Args:
            job_id (str): ID of the job to monitor
            polling_interval (int): Seconds between status checks
            timeout (int): Maximum seconds to wait
            
        Returns:
            dict: Final job status in structured format
        """
        start_time = time.time()
        progress_bar = None
        last_progress = 0
        total_tasks = 0
        
        print(f"Monitoring job: {job_id}")
        
        while True:
            # Check if we've exceeded the timeout
            if time.time() - start_time > timeout:
                if progress_bar:
                    progress_bar.close()
                return {
                    "job_id": job_id,
                    "status": "error",
                    "completed_tasks": last_progress,
                    "total_subtasks": total_tasks,
                    "progress_percentage": (last_progress / total_tasks) * 100 if total_tasks else 0,
                    "message": f"Timeout after {timeout} seconds"
                }
            
            # Get job status
            status = self.check_job_status(job_id)
            print(json.dumps(status, indent=2))
            
            # Handle error in status check
            if status.get("job_status") == "error":
                if progress_bar:
                    progress_bar.close()
                return {
                    "job_id": job_id,
                    "status": "error",
                    "completed_tasks": last_progress,
                    "total_subtasks": total_tasks,
                    "progress_percentage": (last_progress / total_tasks) * 100 if total_tasks else 0,
                    "message": status.get("message", "An error occurred while fetching job status")
                }
            
            # Extract progress information
            job_status = status.get("job_status", "unknown")
            completed_tasks = status.get("completed_tasks", 0)
            total_tasks = status.get("total_subtasks", 100)  # Default to 100 if unknown
            
            # Initialize progress bar
            if progress_bar is None:
                progress_bar = tqdm(total=total_tasks, desc="Training Progress")
            
            # Update progress bar if progress has changed
            if progress_bar and completed_tasks > last_progress:
                progress_bar.update(completed_tasks - last_progress)
                last_progress = completed_tasks
            
            # Calculate progress percentage
            progress_percentage = (completed_tasks / total_tasks) * 100 if total_tasks else 0
    
            # If job is completed, return response
            if job_status.lower() in ["completed", "failed", "error"]:
                if progress_bar:
                    if job_status.lower() == "completed" and last_progress < total_tasks:
                        progress_bar.update(total_tasks - last_progress)
                    progress_bar.close()
                
                return {
                    "job_id": job_id,
                    "status": job_status,
                    "completed_tasks": completed_tasks,
                    "total_subtasks": total_tasks,
                    "progress_percentage": progress_percentage,
                    "message": "Job completed successfully" if job_status == "completed" else f"Job ended with status: {job_status}"
                }
            
            # Wait before polling again
            time.sleep(polling_interval)
    
            # Return intermediate status for frontend polling
            return {
                "job_id": job_id,
                "status": job_status,
                "completed_tasks": completed_tasks,
                "total_subtasks": total_tasks,
                "progress_percentage": progress_percentage,
                "message": "Job is still in progress"
            }
    
    def create_job(self, estimator, dataset_id, train_params=None, wait_for_completion=False, polling_interval=5, timeout=3600):
        """
        Create a new job configuration without sending dataset data.
        
        Args:
            estimator: A scikit-learn estimator, GridSearchCV, or RandomizedSearchCV object
            dataset_id (str): ID of the dataset to be used for training (already on backend)
            train_params (dict): Additional training parameters (test_size, random_state, etc.)
            wait_for_completion (bool): Whether to wait for the job to complete
            polling_interval (int): Seconds between status checks if waiting
            timeout (int): Maximum seconds to wait if waiting
            
        Returns:
            dict: Response containing status and job details
        """
        # Generate a unique job ID
        job_id = str(uuid.uuid4())
        
        # Extract model details
        model_details = self.extract_model_details(estimator)
        
        # Set default train parameters if not provided
        if train_params is None:
            train_params = {}
        
        # Add default test_size if not specified
        if 'test_size' not in train_params and not isinstance(estimator, (GridSearchCV, RandomizedSearchCV)):
            train_params['test_size'] = 0.2
        
        # Create the job configuration
        job_config = {
            'job_id': job_id,
            'session_id': self.session_id,
            'dataset_id': dataset_id,
            'model_details': model_details,
            'train_params': train_params,
            'timestamp': pd.Timestamp.now().isoformat()
        }
        
        # Send to training API
        api_response = self.send_to_training_api(job_config)
        
        # Print results for debugging
        print(f"Created job configuration: {json.dumps(job_config, indent=2)}")
        print(f"API response: {api_response}")
        
        result = {
            'status': 'success' if api_response.get("status") != "error" else 'error', 
            'job_id': job_id,
            'job_config': job_config,
            'api_response': api_response
        }
        
        # Wait for job completion if requested
        if wait_for_completion and api_response.get("status") != "error":
            print(f"Waiting for job {job_id} to complete...")
            final_status = self.wait_for_completion(job_id, polling_interval, timeout)
            result['final_status'] = final_status
        
        return result

In [192]:
# def create_subtasks(job_config):
#     """
#     Create subtasks for distributed processing based on the job configuration.
    
#     Args:
#         job_config (dict): The job configuration from create_job()
        
#     Returns:
#         list: List of subtask configurations
#     """
#     job_id = job_config['job_id']
#     session_id = job_config['session_id']
#     dataset_id = job_config['dataset_id']
#     model_details = job_config['model_details']
#     train_params = job_config.get('train_params', {})
    
#     subtasks = []
    
#     # If this is a search job (GridSearchCV or RandomizedSearchCV)
#     if 'search_type' in model_details:
#         search_type = model_details['search_type']
#         base_model_type = model_details['model_type']
#         base_params = model_details['hyperparameters']['base_estimator_params']
#         cv_params = model_details['hyperparameters'].get('cv_params', {})
#         search_params = model_details['hyperparameters'].get('search_params', {})
        
#         # Get CV value for train/test splitting
#         cv = cv_params.get('cv', 5)
        
#         # For GridSearchCV, create one subtask per parameter combination
#         if search_type == 'GridSearchCV':
#             param_grid = search_params.get('param_grid', {})
#             param_combinations = list(ParameterGrid(param_grid))
            
#             for i, params in enumerate(param_combinations):
#                 # Combine base parameters with this specific combination
#                 full_params = {**base_params, **params}
                
#                 subtask_id = f"{job_id}-subtask-{i+1}"
#                 subtask = {
#                     'subtask_id': subtask_id,
#                     'job_id': job_id,
#                     'session_id': session_id,
#                     'dataset_id': dataset_id,
#                     'model_type': base_model_type,
#                     'parameters': full_params,
#                     'train_params': {
#                         'cv': cv,  # Use CV value for validation
#                         **train_params
#                     }
#                 }
#                 subtasks.append(subtask)
        
#         # For RandomizedSearchCV, create requested number of random combinations
#         elif search_type == 'RandomizedSearchCV':
#             param_distributions = search_params.get('param_distributions', {})
#             n_iter = search_params.get('n_iter', 10)
#             random_state = cv_params.get('random_state', 42)
            
#             # Generate random parameter combinations
#             param_combinations = list(ParameterSampler(
#                 param_distributions, n_iter, random_state=random_state))
            
#             for i, params in enumerate(param_combinations):
#                 # Combine base parameters with this specific combination
#                 full_params = {**base_params, **params}
                
#                 subtask_id = f"{job_id}-subtask-{i+1}"
#                 subtask = {
#                     'subtask_id': subtask_id,
#                     'job_id': job_id,
#                     'session_id': session_id,
#                     'dataset_id': dataset_id,
#                     'model_type': base_model_type,
#                     'parameters': full_params,
#                     'train_params': {
#                         'cv': cv,  # Use CV value for validation
#                         **train_params
#                     }
#                 }
#                 subtasks.append(subtask)
    
#     # For regular estimator, create a single subtask
#     else:
#         subtask_id = f"{job_id}-subtask-1"
#         subtask = {
#             'subtask_id': subtask_id,
#             'job_id': job_id,
#             'session_id': session_id,
#             'dataset_id': dataset_id,
#             'model_type': model_details['model_type'],
#             'parameters': model_details['hyperparameters'],
#             'train_params': train_params
#         }
#         subtasks.append(subtask)
    
#     return subtasks


# def save_subtasks_to_redis(subtasks, redis_config=None):
#     """
#     Save subtasks to Redis under session_id -> job_id -> subtasks structure.
    
#     Args:
#         subtasks (list): List of subtask configurations
#         redis_config (dict): Redis connection configuration
        
#     Returns:
#         dict: Result of the operation
#     """
#     if not subtasks:
#         return {"status": "error", "message": "No subtasks to save"}
    
#     # Default Redis configuration
#     if redis_config is None:
#         redis_config = {
#             'host': 'localhost',
#             'port': 6379,
#             'db': 0,
#             'password': None
#         }
    
#     try:
#         # Connect to Redis
#         r = redis.Redis(
#             host=redis_config['host'],
#             port=redis_config['port'],
#             db=redis_config['db'],
#             password=redis_config['password'],
#             decode_responses=True
#         )
        
#         # Get session_id and job_id from the first subtask
#         session_id = subtasks[0]['session_id']
#         job_id = subtasks[0]['job_id']
        
#         # Create key structure
#         session_key = f"ml:sessions:{session_id}"
#         job_key = f"{session_key}:jobs:{job_id}"
        
#         # Store job metadata
#         r.hset(job_key, "total_subtasks", len(subtasks))
#         r.hset(job_key, "completed_subtasks", 0)
#         r.hset(job_key, "status", "pending")
#         r.hset(job_key, "created_at", pd.Timestamp.now().isoformat())
        
#         # Add job to session list
#         r.sadd(f"{session_key}:jobs", job_id)
        
#         # Save each subtask
#         for subtask in subtasks:
#             subtask_id = subtask['subtask_id']
#             subtask_key = f"{job_key}:subtasks:{subtask_id}"
            
#             # Convert subtask to JSON
#             subtask_json = json.dumps(subtask, default=str)
            
#             # Store subtask
#             r.set(subtask_key, subtask_json)
            
#             # Add to pending subtasks list
#             r.rpush(f"{job_key}:pending_subtasks", subtask_id)
            
#             # Add to subtasks set
#             r.sadd(f"{job_key}:subtasks", subtask_id)
        
#         return {
#             "status": "success",
#             "message": f"Saved {len(subtasks)} subtasks to Redis",
#             "session_id": session_id,
#             "job_id": job_id
#         }
    
#     except redis.RedisError as e:
#         return {"status": "error", "message": f"Redis error: {str(e)}"}
    
#     except Exception as e:
#         return {"status": "error", "message": f"Error saving to Redis: {str(e)}"}

In [228]:
def example_usage():
    import requests

    session_id = ""
    dataset_id = "iris"
    columns = ['sepal_length','sepal_width','petal_length','petal_width']
    target = 'species'
    url = "http://127.0.0.1:5001/create_session"  # Replace with the actual API URL

# Make a GET request to obtain the session ID
    response = requests.post(url)

# Check if the request was successful
    if response.status_code == 200 or 201:
        session_id = response.json().get("session_id")  # Adjust key if needed
        print(f"Session ID: {session_id}")
    else:
        print(f"Failed to get session ID: {response.status_code}, {response.text}")
        
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.linear_model import LogisticRegression
    from scipy.stats import randint, uniform
    
    
    # Initialize the task manager with API details
    task_manager = MLTaskManager(
        api_url=f"http://127.0.0.1:5001",
        session_id=session_id,
        api_key="your-api-key-here",
    )
    
    # Example 1: Create a single model job
    # rf = RandomForestClassifier(n_estimators=100, max_depth=5)
    # job_response = task_manager.create_job(
    #     rf, 
    #     dataset_id="iris",
    #     train_params={
    #         'test_size': 0.25,
    #         'random_state': 42,
    #         'feature_columns': columns,
    #         'target_column': target
    #     }
    # )
    
    # # Create subtasks for this job
    # subtasks = create_subtasks(job_response['job_config'])
    # print(f"Created {len(subtasks)} subtasks")
    # print(json.dumps(subtasks, indent=2))
    
    # Save subtasks to Redis
    # redis_result = save_subtasks_to_redis(
    #     subtasks,
    #     redis_config={
    #         'host': 'redis-server.example.com',
    #         'port': 6379,
    #         'db': 0,
    #         'password': 'your-redis-password'
    #     }
    # )
    # print(f"Redis result: {redis_result}")
    
    # Example 2: Create a GridSearchCV job
    param_grid = {
        'C': [0.1, 1.0, 10.0, 100],
        'solver': ['liblinear', 'lbfgs']
    }
    lr = LogisticRegression()
    grid_search = GridSearchCV(lr, param_grid, cv=5)
    
    job_response = task_manager.create_job(
        grid_search, 
        dataset_id=dataset_id,
        train_params={
            'random_state': 42,
            'feature_columns': columns,
            'target_column': target
        },
        # wait_for_completion=True
    )
    
    # Create and save subtasks for GridSearchCV
    subtasks = create_subtasks(job_response['job_config'])
    print(f"Created {len(subtasks)} subtasks for GridSearchCV")
    print(json.dumps(subtasks, indent=2))
    
    # redis_result = save_subtasks_to_redis(subtasks)
    # print(f"Redis result: {redis_result}")

    # lr = LogisticRegression()
    # random_search = RandomizedSearchCV(lr, param_grid, cv=5)
    
    # job_response = task_manager.create_job(
    #     random_search, 
    #     dataset_id=dataset_id,
    #     train_params={
    #         'random_state': 42,
    #         'feature_columns': columns,
    #         'target_column': target
    #     }
    # )
    
    # # Create and save subtasks for GridSearchCV
    # subtasks = create_subtasks(job_response['job_config'])
    # print(f"Created {len(subtasks)} subtasks for RandomizedSearch")
    # print(json.dumps(subtasks, indent=2))
    
    # redis_result = save_subtasks_to_redis(subtasks)
    # print(f"Redis result: {redis_result}")


if __name__ == "__main__":
    example_usage()

Session ID: 9db8c432-b7d6-406a-97ca-3d5315910d2e
Created job configuration: {
  "job_id": "fcc21e56-a466-4d8c-b7da-32179e4e0f49",
  "session_id": "9db8c432-b7d6-406a-97ca-3d5315910d2e",
  "dataset_id": "iris",
  "model_details": {
    "model_type": "LogisticRegression",
    "search_type": "GridSearchCV",
    "hyperparameters": {
      "base_estimator_params": {
        "C": 1.0,
        "class_weight": null,
        "dual": false,
        "fit_intercept": true,
        "intercept_scaling": 1,
        "l1_ratio": null,
        "max_iter": 100,
        "multi_class": "auto",
        "n_jobs": null,
        "penalty": "l2",
        "random_state": null,
        "solver": "lbfgs",
        "tol": 0.0001,
        "verbose": 0,
        "warm_start": false
      },
      "search_params": {
        "param_grid": {
          "C": [
            0.1,
            1.0,
            10.0,
            100
          ],
          "solver": [
            "liblinear",
            "lbfgs"
          ]
      

In [25]:
!pip install confluent_kafka

Collecting confluent_kafka
  Downloading confluent_kafka-2.8.2-cp312-cp312-macosx_11_0_arm64.whl.metadata (22 kB)
Downloading confluent_kafka-2.8.2-cp312-cp312-macosx_11_0_arm64.whl (3.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m16.6 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: confluent_kafka
Successfully installed confluent_kafka-2.8.2


In [65]:
from confluent_kafka.admin import AdminClient

conf = {'bootstrap.servers': '127.0.0.1:9092'}
admin_client = AdminClient(conf)

try:
    topics = admin_client.list_topics(timeout=5)
    print("Kafka Connection Successful! Available Topics:", topics.topics.keys())
except Exception as e:
    print("Kafka Connection Failed:", e)

Kafka Connection Successful! Available Topics: dict_keys(['result', 'results', 'train', '__consumer_offsets'])


In [49]:
from kafka import KafkaAdminClient, KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError

KAFKA_BROKER = "127.0.0.1:9092"  # Change this if your Kafka runs on a different host/IP



# Function to test Kafka Producer


# Function to test Kafka Consumer
def test_consumer():
    try:
        consumer = KafkaConsumer(
            "result",  # Test topic
            bootstrap_servers=KAFKA_BROKER,
            auto_offset_reset="earliest",
            enable_auto_commit=True,
            consumer_timeout_ms=5000  # Wait for messages for 5 seconds
        )
        print("Consumer Test Passed: Connected successfully.")
        return consumer
    except KafkaError as e:
        print("Kafka Consumer Failed:", e)
        return False

# Run all tests
if __name__ == "__main__":

    # test_producer()
    consumer = test_consumer()


    for message in consumer:

        print(f"Processing message: {message.value}")
        result = message.value


Consumer Test Passed: Connected successfully.


KeyboardInterrupt: 

In [220]:
import redis
import json

# Connect to Redis
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)

# List of keys to check
keys_to_check = [
    # "active_sessions:6fac2199-edda-4a11-b209-125197a3140b:jobs:e8ede610-5a27-423e-956e-b8fb11215837:subtasks:e8ede610-5a27-423e-956e-b8fb11215837-subtask-4",
    # "active_sessions:6fac2199-edda-4a11-b209-125197a3140b:jobs:e8ede610-5a27-423e-956e-b8fb11215837:subtasks",
    # "active_sessions:6fac2199-edda-4a11-b209-125197a3140b:jobs:e8ede610-5a27-423e-956e-b8fb11215837:pending_subtasks",
    # "jobs:e8ede610-5a27-423e-956e-b8fb11215837:completion",
    # "jobs:e8ede610-5a27-423e-956e-b8fb11215837:result",
    # "jobs:e8ede610-5a27-423e-956e-b8fb11215837:status",
    "active_sessions:6fac2199-edda-4a11-b209-125197a3140b:jobs:e8ede610-5a27-423e-956e-b8fb11215837"

]

# Fetch and display data for each key
for key in keys_to_check:
    value = redis_client.hget("status",key)  # Try to get the key value
    if value:
        try:
            value = json.loads(value)  # Attempt to parse JSON if applicable
        except json.JSONDecodeError:
            pass  # Keep value as is if it's not JSON

        print(f"Key: {key}\nValue: {value}\n{'='*50}")
    else:
        print(f"Key: {key} does not exist or has no value.\n{'='*50}")


ConnectionError: Error 61 connecting to localhost:6379. Connection refused.