In [1]:
import os
import json
import pandas as pd
import re

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

class WorkflowSynthesizer:
    """
    Class for synthesizing a workflow from a dictionary of jobs. 
    The jobs and the workflow are represented as dataframes with time-series data.
    """

    def __init__(self):
        """
        Initialize a new instance of the WorkflowSynthesizer class.
        """
        self.workflow = pd.DataFrame()

    def synthetize(self, data_for_jobs):
        """
        Synthesize a workflow from the provided jobs.

        Args:
            data_for_jobs (dict): A dictionary where keys are job names 
            and values are DataFrames with columns 'timestamp', 'bytesRead', 
            and 'bytesWritten'.
        """
        min_timestamp = min(df['timestamp'].min() for df in data_for_jobs.values())
        max_timestamp = max(df['timestamp'].max() for df in data_for_jobs.values())
        
        # Create a DataFrame with a uniform timestamp range
        synthetic_timestamps = np.arange(min_timestamp, max_timestamp + 1, 5000)
        synthetic_df = pd.DataFrame({'timestamp': synthetic_timestamps})
        
        for job_name, job_df in data_for_jobs.items():
            # Merge with the synthetic DataFrame to get uniform timestamps
            merged_df = pd.merge(synthetic_df, job_df, on='timestamp', how='left').fillna(0)
            
            # Sum up the bytesRead and bytesWritten across all jobs
            if self.workflow.empty:
                self.workflow = merged_df
            else:
                self.workflow['bytesRead'] += merged_df['bytesRead']
                self.workflow['bytesWritten'] += merged_df['bytesWritten']

    def to_dict(self):
        """
        Convert the synthesized workflow to a dictionary format.

        Returns:
            dict: The synthesized workflow in dictionary format.
        """
        output = {}
        output['timestamp'] = self.workflow['timestamp'].tolist()
        output['bytesRead'] = self.workflow['bytesRead'].tolist()
        output['bytesWritten'] = self.workflow['bytesWritten'].tolist()
        return output
    
    def plot_workflow(self):
        """
        Plot the time series of bytesRead and bytesWritten for the synthesized workflow.
        """
        plt.figure(figsize=(14, 6))
        plt.plot(self.workflow['timestamp'], self.workflow['bytesRead'], label='Bytes Read')
        plt.plot(self.workflow['timestamp'], self.workflow['bytesWritten'], label='Bytes Written')
        plt.xlabel('Timestamp')
        plt.ylabel('Bytes')
        plt.title('Bytes Read and Written Over Time')
        plt.legend()
        plt.grid(True)
        plt.show()

# Example usage
data_for_jobs = {
    'job1': pd.DataFrame({'timestamp': [1687960785000, 1687960790000, 1687960795000], 'bytesRead': [10, 20, 30], 'bytesWritten': [5, 15, 25]}),
    'job2': pd.DataFrame({'timestamp': [1687961145000, 1687961150000, 1687961155000], 'bytesRead': [40, 50, 60], 'bytesWritten': [35, 45, 55]})
}

synthesizer = WorkflowSynthesizer()
synthesizer.synthetize(data_for_jobs)
print(synthesizer.workflow)
print(synthesizer.to_dict())
#synthesizer.plot_workflow()


        timestamp  bytesRead  bytesWritten
0   1687960785000       10.0           5.0
1   1687960790000       20.0          15.0
2   1687960795000       30.0          25.0
3   1687960800000        0.0           0.0
4   1687960805000        0.0           0.0
..            ...        ...           ...
70  1687961135000        0.0           0.0
71  1687961140000        0.0           0.0
72  1687961145000       40.0          35.0
73  1687961150000       50.0          45.0
74  1687961155000       60.0          55.0

[75 rows x 3 columns]
{'timestamp': [1687960785000, 1687960790000, 1687960795000, 1687960800000, 1687960805000, 1687960810000, 1687960815000, 1687960820000, 1687960825000, 1687960830000, 1687960835000, 1687960840000, 1687960845000, 1687960850000, 1687960855000, 1687960860000, 1687960865000, 1687960870000, 1687960875000, 1687960880000, 1687960885000, 1687960890000, 1687960895000, 1687960900000, 1687960905000, 1687960910000, 1687960915000, 1687960920000, 1687960925000, 16879609300

In [3]:
def camel_case_to_snake_case(name):
    """
    Convert a string from CamelCase to snake_case.
    
    Parameters:
        name (str): The string in CamelCase.
        
    Returns:
        str: The string in snake_case.
    """
    return re.sub('([a-z0-9])([A-Z])', r'\1_\2', re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)).lower()

# Update the get_column_names function to include the renamed columns using the new prefix strategy
def get_column_names(json_file_name):
    """
    Given a JSON file name, this function returns a list of relevant column names.
    
    Parameters:
        json_file_name (str): The name of the JSON file.
        
    Returns:
        list: A list of strings representing column names.
    """
    # Extract the prefix from the file name and convert it to snake_case
    prefix = camel_case_to_snake_case(json_file_name.split('.')[0])
    
    column_name_mapping = {
        "accessPatternRead.json": ["timestamp", f"{prefix}_random", f"{prefix}_sequential", f"{prefix}_stride", f"{prefix}_unclassified"],
        "accessPatternWrite.json": ["timestamp", f"{prefix}_random", f"{prefix}_sequential", f"{prefix}_stride", f"{prefix}_unclassified"],
        "ioSizesRead.json": ["timestamp", f"{prefix}_0B_16B", f"{prefix}_16B_4KB", f"{prefix}_4KB_128KB", f"{prefix}_128KB_1MB", f"{prefix}_1MB_16MB", f"{prefix}_16MB_128MB", f"{prefix}_128MB_+"],
        "ioSizesWrite.json": ["timestamp", f"{prefix}_0B_16B", f"{prefix}_16B_4KB", f"{prefix}_4KB_128KB", f"{prefix}_128KB_1MB", f"{prefix}_1MB_16MB", f"{prefix}_16MB_128MB", f"{prefix}_128MB_+"],
        "operationsCount.json": ["timestamp", f"{prefix}_read", f"{prefix}_write"],
        "volume.json": ["timestamp", "bytesRead", "bytesWritten"]
    }
    
    return column_name_mapping.get(json_file_name, [])

# Test the above function
test_file_names = ["accessPatternRead.json", "ioSizesWrite.json", 
                   "operationsCount.json", "volume.json"]
for name in test_file_names:
    print(f"For {name}, the columns are: {get_column_names(name)}")


For accessPatternRead.json, the columns are: ['timestamp', 'access_pattern_read_random', 'access_pattern_read_sequential', 'access_pattern_read_stride', 'access_pattern_read_unclassified']
For ioSizesWrite.json, the columns are: ['timestamp', 'io_sizes_write_0B_16B', 'io_sizes_write_16B_4KB', 'io_sizes_write_4KB_128KB', 'io_sizes_write_128KB_1MB', 'io_sizes_write_1MB_16MB', 'io_sizes_write_16MB_128MB', 'io_sizes_write_128MB_+']
For operationsCount.json, the columns are: ['timestamp', 'operations_count_read', 'operations_count_write']
For volume.json, the columns are: ['timestamp', 'bytesRead', 'bytesWritten']


In [5]:
import os


def is_file_extension(filename, expected_extension):
    """
    Check if the file has the expected extension.
    
    Parameters:
        filename (str): The name of the file.
        expected_extension (str): The expected file extension (without the dot).
        
    Returns:
        bool: True if the file has the expected extension, False otherwise.
    """
    _, file_extension = os.path.splitext(filename)
    
    return file_extension == f".{expected_extension}"

# Tests
assert is_file_extension("example.json", "json") == True
assert is_file_extension("example.txt", "json") == False
assert is_file_extension("example.json", "txt") == False
assert is_file_extension("example.JSON", "json") == False  # Case-sensitive
assert is_file_extension("example", "json") == False       # No extension

print("All tests passed!")



All tests passed!


In [9]:
# Initialize an empty dictionary to hold the data for each job
data_for_jobs = {}

# Loop through all items in the current directory
for wf_folder in os.listdir():
    # Check if the item is a folder and if its name is numeric (i.e., a job number)
    print(f"----\nProcessing workflow : {wf_folder}...")
    if os.path.isdir(wf_folder):
        # Initialize a dictionary to hold the data for this specific job
        data_for_this_job = {}
        # Loop through all JSON files in this folder
        for job_folder in os.listdir(wf_folder):
            is_folder = os.path.isdir(os.path.join(os.getcwd(), wf_folder, job_folder))
            print(f"Found job_folder: {job_folder}, is_folder:{is_folder}")
            # Ensure that we are dealing with job folder
            if is_folder:
                print(f"job_folder: {job_folder}")
                # Initialize a DataFrame to hold the data for this specific job
                df_for_this_job = pd.DataFrame()
                print(f"    Browsing {job_folder}...")
                for json_file in os.listdir(os.path.join(wf_folder, job_folder)):
                    if is_file_extension(json_file, "json"):
                        print(f"      Processing {json_file}...")
                        # Construct the full path to the JSON file
                        json_file_path = os.path.join(os.getcwd(), wf_folder, job_folder, json_file)
                        
                        # Read the JSON file into a list of lists
                        with open(json_file_path, 'r') as f:
                            json_data = json.load(f)
                        
                        # Create a temporary DataFrame from the JSON data
                        df_temp = pd.DataFrame(json_data, columns=
                                            get_column_names(json_file))
                        
                        # Merge the temporary DataFrame into the DataFrame for this job, based on the 'timestamp' column
                        if df_for_this_job.empty:
                            df_for_this_job = df_temp
                        else:
                            df_for_this_job = pd.merge(df_for_this_job, 
                                                       df_temp, 
                                                       on='timestamp', 
                                                       how='outer')
                            
                    # Save the DataFrame for this job to a CSV file
                    csv_file_path = os.path.join(os.getcwd(), wf_folder, f"{job_folder}.csv")
                    
                    #df_for_this_job.to_csv(csv_file_path, index=False)
                    print(f"            Saving here... {job_folder}.csv")
                    
                    # Add this DataFrame to the dictionary
                    data_for_jobs[job_folder] = df_for_this_job

            else:
                print(f"Skipping {job_folder}...")
# synthesizer = WorkflowSynthesizer()
# synthesizer.synthetize(data_for_jobs)
# print(synthesizer.workflow)
# print(synthesizer.to_dict())
# synthesizer.plot_workflow()


def list_and_classify_directory_contents(directory_path):
    """
    List and classify the contents of a given directory into folders and files.
    
    Parameters:
        directory_path (str): The path to the directory to list.
        
    Returns:
        None: Prints the classification results.
    """
    for item in os.listdir(directory_path):
        item_path = os.path.join(directory_path, item)
        
        if os.path.isdir(item_path):
            print(f"{item} -> Folder")
        elif os.path.isfile(item_path):
            print(f"{item} -> File")
        else:
            print(f"{item} -> Unknown")

# Usage example
# Replace 'your_directory_path_here' with the path of the directory you want to list and classify.
# Uncomment the line below to run the function.
# list_and_classify_directory_contents('your_directory_path_here')




----
 Processing workflow : ECMWF-649c3c40cc9340246f87cb58...
Found job_folder: 371912, is_folder:True
job_folder: 371912
    Browsing 371912...
      Processing ioSizesWrite.json...
            Saving here... 371912.csv
      Processing volume.json...
            Saving here... 371912.csv
      Processing ioSizesRead.json...
            Saving here... 371912.csv
      Processing operationsCount.json...
            Saving here... 371912.csv
      Processing accessPatternWrite.json...
            Saving here... 371912.csv
      Processing accessPatternRead.json...
            Saving here... 371912.csv
Found job_folder: 371913, is_folder:True
job_folder: 371913
    Browsing 371913...
      Processing ioSizesWrite.json...
            Saving here... 371913.csv
      Processing volume.json...
            Saving here... 371913.csv
      Processing ioSizesRead.json...
            Saving here... 371913.csv
      Processing operationsCount.json...
            Saving here... 371913.csv
      Pro