In [21]:
with open('libraries.py') as f:
    code = f.read()
exec(code)

with open('functions.py') as f:
    code = f.read()
exec(code)

In [22]:
# determine user
user = getpass.getuser()
if user == 'peymansh':
    main_folder_path = '/Users/peymansh/Dropbox (MIT)/Research/AI and Occupations/ai-exposure'
    data_path = f'{main_folder_path}/output'

In [23]:
def get_tasks(onet_data_path,
              occupation_code):

    # Load the data
    onet = pd.read_csv(onet_data_path)
    onet = onet.sort_values(by=['year', 'occ_code', 'occ_title', 'task_id'])
    onet = onet[onet['year'] == 2023].reset_index(drop=True)

    # Get list of tasks
    my_df = onet[(onet.occ_code == f'{occupation_code}') & (onet.year == 2023)]
    tasks = my_df['task'].unique().tolist()
    return tasks

### Generate all possible partition schemes for the set of tasks (ignoring structre of the DAG)

In [24]:
from itertools import combinations

def partitions(set_):
    if not set_:
        yield []
        return
    for i in range(1, len(set_) + 1):
        for part in combinations(set_, i):
            remaining = set(set_) - set(part)
            if not remaining:
                yield [list(part)]
            else:
                for b in partitions(list(remaining)):
                    yield [list(part)] + b

def generate_unique_partitions(numbers):
    all_partitions = set()
    for partition in partitions(numbers):
        # Create a frozenset of frozensets to make each partition hashable and order-independent
        partition_set = frozenset(frozenset(part) for part in partition)
        all_partitions.add(partition_set)
    
    # Convert the frozensets back to lists for the final output
    unique_partitions = [list(map(list, partition)) for partition in all_partitions]

    # Sort elements
    unique_partitions = sorted([sorted(x) for x in unique_partitions], key=len)
    return unique_partitions

### Check if partition scheme is "valid" (i.e., if its non-singleton partitions are a connected graph)

In [25]:
def is_connected(matrix):
    # Number of nodes in the matrix
    num_nodes = matrix.shape[0]
    
    # Visited array to keep track of visited nodes
    visited = np.zeros(num_nodes, dtype=bool)
    
    # Helper function to perform DFS
    def dfs(node):
        visited[node] = True
        # Visit all the neighbors of the current node
        for neighbor in range(num_nodes):
            if matrix[node, neighbor] == 1 and not visited[neighbor]:
                dfs(neighbor)
            elif matrix[neighbor, node] == 1 and not visited[neighbor]:
                dfs(neighbor)
    
    # Start DFS from the first node (node 0)
    dfs(0)
    
    # If all nodes are visited, the matrix is connected
    return np.all(visited)


def validate_partition_using_connectedness(adjacency_matrix, tasks_list):
    # Return valid if Singleton
    if len(tasks_list) == 1:
        return True
    # Check if partition forms connected graph
    else:
        # Subset original adjacency matrix
        subset_matrix = adjacency_matrix[np.ix_(tasks_list, tasks_list)]

        # check if subset matrix is a connected graph
        subset_matrix_connected = is_connected(subset_matrix)

        # return true if connected and false otherwise
        return subset_matrix_connected

### Compute costs of all "valid" execution plans
#### New check for validity: automated cost of tasks in non-singleton partition must be less than human costs doing partition tasks separately

In [26]:
def get_partition_boundary(adjacency_matrix, partition):
    # create a matrix whose columns are nodes not in the partition and whose rows are nodes in the partition
    # (subset adjacency matrix to outgoing edges of partition nodes --i.e., rows-- and incoming edges of non-partition nodes --i.e., columns.)
    reduced_matrix = np.delete(adjacency_matrix, partition, axis=1) 
    reduced_matrix = reduced_matrix[partition, :]

    # find nodes in partition w/ an edge to non-partition nodes
    partition_boundary_tasks = [i for i in partition if np.any(reduced_matrix[partition.index(i), :])]

    return partition_boundary_tasks


def compute_partition_cost(adjacency_matrix, M_dict, A_dict, D_dict, AI_quality, partition):
    # initialize task_done_by_human as False
    # (only if partition is singleton and human cost <= automated cost partition is done manually)
    task_done_by_human = False

    # initialize partition boundary tasks as empty set []
    partition_boundary_tasks = []

    # if partition is a singleton 
    # pick minimum of human and management cost
    if len(partition) == 1:
        partition_is_valid = True # single-node partition is always valid

        # calculate human cost
        human_cost = sum(M_dict[key] for key in partition)

        # calculate management cost
        AI_cost = sum(A_dict[key] for key in partition)
        difficulty = sum(D_dict[key] for key in partition)
        management_cost = AI_cost * (AI_quality ** (-1 * difficulty))
        
        # pick the minimum of the two
        if human_cost < management_cost:
            partition_cost = human_cost
            task_done_by_human = True
        else:
            partition_cost = management_cost
            partition_boundary_tasks = partition
    

    # if partition not a singleton 
    # calculate management cost and return if partition passes a sanity check
    if len(partition) > 1:
        # calculate human cost
        human_cost = sum(M_dict[key] for key in partition)

        # calculate management cost
        # first get boundary tasks in partition
        partition_boundary_tasks = get_partition_boundary(adjacency_matrix, partition)

        # sanity check: no partition should have inner boundary of empty set
        if len(partition_boundary_tasks) == 0:
            # raise ValueError(f'Inner boundary of partition {partition} is empty set.')
            return 100000000, [], [], False
        
        # if partition has at least one boundary task calculate management cost
        # use boundary tasks for calculating management costs and partition tasks for difficulty
        AI_cost = sum(A_dict[key] for key in partition_boundary_tasks)
        difficulty = sum(D_dict[key] for key in partition)
        management_cost = AI_cost * (AI_quality ** (-1 * difficulty))

        # sanity check for partition validity: 
        # if human cost < management cost partition is invalid (should not have been formed)
        if human_cost < management_cost:
            partition_cost = 100000000 # (value doesn't matter)
            partition_is_valid = False
        else:
            partition_cost = management_cost
            partition_is_valid = True
            partition_boundary_tasks
    
    return partition_cost, partition_boundary_tasks, task_done_by_human, partition_is_valid


In [27]:
########## Random Thought: maybe better to sort valid_partitions on descending partition order to avoid recalculating single node partitions everytime? 
# tho the downside is that we have to first do the heavy calculations first...


def execute_plans(adjacency_matrix, valid_partitions, M_dict, A_dict, D_dict, alpha):
    execution_plan = []
    execution_plan_augmented_tasks = []
    execution_plan_human_tasks = []
    execution_plan_cost = []
    counter = 0
    for scheme in valid_partitions:
        # initialize scheme cost
        # and partitions that are done manually
        scheme_cost = 0
        augmented_tasks = []
        human_tasks = []
        
        for partition in scheme:
            # calculate partition cost 
            partition_cost, partition_boundary_tasks, task_done_by_human, partition_is_valid = compute_partition_cost(adjacency_matrix, M_dict, A_dict, D_dict, alpha, partition)
        
            # if (automated) partition is invalid ignore partition scheme
            # and stop calculating costs of further partitions
            if not partition_is_valid:
                break

            if task_done_by_human:
                human_tasks.append(partition)
            
            if not task_done_by_human:
                for boundary_task in partition_boundary_tasks:
                    augmented_tasks.append([boundary_task])

            # if (automated) partition passes sanity check
            # add this partition's cost to partition scheme cost
            scheme_cost += partition_cost
        
        # if stopped because an (automated) partition wasn't valid
        # ignore current partition scheme and continue
        if not partition_is_valid:
            continue
        
        # if partition scheme makes sense append costs
        execution_plan.append(scheme)
        execution_plan_augmented_tasks.append(augmented_tasks)
        execution_plan_human_tasks.append(human_tasks)
        execution_plan_cost.append(scheme_cost)

    return execution_plan, execution_plan_augmented_tasks, execution_plan_human_tasks, execution_plan_cost

### Combine steps into a function to run a for loop over

In [28]:
def DAG_costMin(input_path, output_path, unique_partitions, n=1000):
    # set alpha as AI quality metric
    epsilon = 1e-8
    alpha_list = np.linspace(epsilon, 1-epsilon, n).tolist()



    # read DAG
    dag_df = pd.read_csv(input_path)

    # remove edges if comment column labeled with "TriangleRemovedFlag" (edge is there for plotting purposes and is not part of the actual DAG)
    if 'comment' in dag_df.columns:
        dag_df = dag_df[~dag_df['comment'].str.endswith('TriangleRemovedFlag')]

    # get task stats
    tasks_stats = pd.read_csv(f'{occupation_folder}/{occupation}_taskStats.csv')



    
    # extract list of tasks and create a dictionary for indexing tasks
    tasks_list = tasks_stats['task'].unique()
    tasks_dict = {i: node for i, node in enumerate(tasks_list, start=0)}

    # create numpy array of adjacency matrix
    adjacency_matrix = np.zeros((len(tasks_list), len(tasks_list)), dtype=int)
    aux_dict = {value: key for key, value in tasks_dict.items()}
    for _, row in dag_df.iterrows():
        source_index = aux_dict[row['source']]
        target_index = aux_dict[row['target']]
        adjacency_matrix[source_index, target_index] = 1




    # add task_dict key and reset index
    aux_dict = {value: key for key, value in tasks_dict.items()}
    tasks_stats['dict_index'] = tasks_stats.apply(lambda row: aux_dict[row.task], axis=1)
    tasks_stats = tasks_stats.sort_values(by='dict_index')
    tasks_stats = tasks_stats.set_index('dict_index', drop=False)
    tasks_stats.index.name = None




    # create dictionaries for human cost, management cost, and difficulty
    M_dict = dict(zip(tasks_stats['dict_index'], tasks_stats['human_cost']))
    A_dict = dict(zip(tasks_stats['dict_index'], tasks_stats['management_cost']))
    D_dict = dict(zip(tasks_stats['dict_index'], tasks_stats['difficulty']))



    # Get valid partitioning schemes
    generate_valid_partitions_start = time.time()
    valid_partitions = []
    for scheme in unique_partitions:
        # Set valid partitions count to 0
        valid_partition_count = 0
        for partition in scheme:
            valid_partition = validate_partition_using_connectedness(adjacency_matrix, partition)
            if valid_partition:
                valid_partition_count += 1
        
        # If number of valid partitions within a partition scheme is equal to 
        # number of partitions in partition scheme then partition scheme is valid
        if valid_partition_count == len(scheme):
            valid_partitions.append(scheme)
    generate_valid_partitions_time = (time.time() - generate_valid_partitions_start)/60
    print(f"valid execution plans generation: {generate_valid_partitions_time:.2f} minutes")

    # Print stats
    print(f'Number of valid partitioning schemes given DAG structure: {len(valid_partitions)}')


    
    # run once to get stat
    execution_plan, execution_plan_augmented_tasks, execution_plan_human_tasks, execution_plan_cost = execute_plans(adjacency_matrix, valid_partitions, M_dict, A_dict, D_dict, 0.5)
    print(f'Number of valid execution plans: {len(execution_plan)}')




    random.seed(1)
    minimum_cost_list = []
    number_of_optimal_schemes_list = []
    optimal_execution_plan_list = []
    optimal_plan_augmentedTasks_list = []
    optimal_plan_augmentedTasks_count_list = []
    optimal_plan_humanTasks_list = []
    optimal_plan_humanTasks_count_list = []
    for counter, alpha in enumerate(alpha_list):
        # if counter % 100 == 0:
        #     print(f'-- Running {counter}th alpha --')

        # get list of execution plans and costs for this alpha
        execution_plan, execution_plan_augmented_tasks, execution_plan_human_tasks, execution_plan_cost = execute_plans(adjacency_matrix, valid_partitions, M_dict, A_dict, D_dict, alpha)

        # choose minimum
        minimum_cost = min(execution_plan_cost)
        minimum_cost_index = [index for index, value in enumerate(execution_plan_cost) if value == minimum_cost]

        # in rare cases there are more than one optimal plan
        if len(minimum_cost_index) > 1:
            optimal_execution_scheme = [execution_plan[index] for index in minimum_cost_index]
            optimal_execution_human_tasks = [execution_plan_human_tasks[index] for index in minimum_cost_index]
            optimal_execution_augmented_tasks = [execution_plan_augmented_tasks[index] for index in minimum_cost_index]
            # print(alpha)
            # print(optimal_execution_scheme)
            # print(optimal_execution_human_tasks)
            # print(f'Multiple Execution Plans for alpha={alpha}')
        else:
            optimal_execution_scheme = execution_plan[minimum_cost_index[0]]
            optimal_execution_human_tasks = execution_plan_human_tasks[minimum_cost_index[0]]
            optimal_execution_augmented_tasks = execution_plan_augmented_tasks[minimum_cost_index[0]]

        # append lists
        minimum_cost_list.append(minimum_cost)
        number_of_optimal_schemes_list.append(len(minimum_cost_index))
        optimal_execution_plan_list.append(optimal_execution_scheme)
        optimal_plan_augmentedTasks_list.append(optimal_execution_augmented_tasks)
        optimal_plan_augmentedTasks_count_list.append(len(optimal_execution_augmented_tasks))
        optimal_plan_humanTasks_list.append(optimal_execution_human_tasks)
        optimal_plan_humanTasks_count_list.append(len(optimal_execution_human_tasks))

    # save outputs
    output_df = pd.DataFrame({
        'alpha': alpha_list,
        'optimal_schemes_count': number_of_optimal_schemes_list,
        'cost': minimum_cost_list,
        'optimal_scheme': optimal_execution_plan_list,
        'optimal_scheme_augmented_tasks': optimal_plan_augmentedTasks_list,
        'augmented_tasks_count': optimal_plan_augmentedTasks_count_list,
        'optimal_scheme_human_tasks': optimal_plan_humanTasks_list,
        'human_tasks_count': optimal_plan_humanTasks_count_list
    })
    output_df.to_csv(output_path, index=False)

## Main Code

In [29]:
import time
start_time = time.time()

# number of alphas to sweep over
n = 100
print(f'Number of alphas to sweept over: {n}')

onet_data_path = f'{data_path}/data/onet_occupations_yearly.csv'

occupation_list = ['pileDriverOperators', 'dredgeOperators', 'gradersAndSortersForAgriculturalProducts',
                   'insuranceUnderwriters', 'insuranceAppraisersForAutoDamage', 'floorSandersAndFinishers', 
                   'reinforcingIronAndRebarWorkers', 'travelAgents', 'dataEntryKeyer', 
                   'athletesAndSportsCompetitors', 'audiovisualEquipmentInstallerAndRepairers', 'hearingAidSpecialists', 
                   'personalCareAides', 'proofreadersAndCopyMarkers', 'chiropractors', 
                   'shippingReceivingAndInventoryClerks', 'cooksShortOrder', 'orthodontists',
                   'subwayAndStreetcarOperators', 'packersAndPackagersHand', 'hoistAndWinchOperators', 
                   'forgingMachineSettersOperatorsAndTenders', 'avionicsTechnicians', 'dishwashers', 
                   'dispatchersExceptPoliceFireAndAmbulance', 'familyMedicinePhysicians', 'MachineFeedersAndOffbearers'
                   ]

# occupation_list = ['travelAgents', 'insuranceUnderwriters', 'pileDriverOperators'
#                    ]



# occupation_list = ['pileDriverOperators', 'dredgeOperators', 'gradersAndSortersForAgriculturalProducts',
#                    'insuranceUnderwriters', 'insuranceAppraisersForAutoDamage', 'floorSandersAndFinishers', 
#                    'reinforcingIronAndRebarWorkers', 'travelAgents', 'dataEntryKeyer', 
#                    'athletesAndSportsCompetitors'
#                    ]


Number of alphas to sweept over: 100


In [30]:
num_tasks_current = 0
num_tasks_previous = 0
for occupation in occupation_list:
    print(f'\n---------------------- Running: {occupation} ----------------------')
    occupation_start_time = time.time()

    # generate occupation-specific strings
    GPT_input_occupation, plot_title_occupation, occupation_code, occupation_folder = pick_occupation(occupation)


    # Get occupation tasks to create all possible partitions
    tasks = get_tasks(onet_data_path, occupation_code)
    num_tasks_current = len(tasks)
    print(f'Number of non-target tasks: {num_tasks_current}')

    if num_tasks_current < 10:
        n = 100
    else: 
        n = 100

    # if number of tasks in new occupation has increased generate new set of possible partitions
    if num_tasks_current != num_tasks_previous:
        unique_partitions_start_time = time.time()

        # Generate list of numbers for non-"Target" tasks in occupation
        tasks_list_numbers = list(range(num_tasks_current))

        # Generate all possible partitioning schemes
        unique_partitions = generate_unique_partitions(tasks_list_numbers)
        unique_partitions_end_time = time.time()

        unique_partitions_execution_time = unique_partitions_end_time - unique_partitions_start_time
        print(f'Time to generate all possible partition schemes: {unique_partitions_execution_time:.2f} seconds')
    
    # update num_tasks_previous for next iteration and print stats
    num_tasks_previous = num_tasks_current
    print(f'Number of all possible partitioning schemes: {len(unique_partitions)}')


    # Manual DAG
    M_input_path = f'{occupation_folder}/{occupation}_M_DAG_df.csv'
    M_output_path = f'{occupation_folder}/{occupation}_costMin_M.csv'

    # First Last Task DAG
    N_input_path = f'{occupation_folder}/{occupation}_N_GPT_DAG_df.csv'
    N_output_path = f'{occupation_folder}/{occupation}_costMin_N.csv'

    # First Last Task DAG
    CN_input_path = f'{occupation_folder}/{occupation}_CN_GPT_DAG_df.csv'
    CN_output_path = f'{occupation_folder}/{occupation}_costMin_CN.csv'

    # First Last Task DAG
    FLT_input_path = f'{occupation_folder}/{occupation}_FLT_GPT_DAG_df.csv'
    FLT_output_path = f'{occupation_folder}/{occupation}_costMin_FLT.csv'

    # Conditioned First Last Task DAG
    CFLT_input_path = f'{occupation_folder}/{occupation}_CFLT_GPT_DAG_df.csv'
    CFLT_output_path = f'{occupation_folder}/{occupation}_costMin_CFLT.csv'

    # Partitioned DAG
    P_input_path = f'{occupation_folder}/{occupation}_P_GPT_DAG_df.csv'
    P_output_path = f'{occupation_folder}/{occupation}_costMin_P.csv'

    # Conditioned Partitioned DAG
    CP_input_path = f'{occupation_folder}/{occupation}_CP_GPT_DAG_df.csv'
    CP_output_path = f'{occupation_folder}/{occupation}_costMin_CP.csv'
    


    # create list of all DAGs
    if occupation in ['travelAgents', 'insuranceUnderwriters', 'pileDriverOperators']:
        DAG_indicator_list = ['Manual DAG', 'Naive DAG', 'Conditioned Naive DAG', 'First-Last Task DAG', 'Conditioned First-Last Task DAG', 'Partitioned DAG', 'Conditioned Partitioned DAG']
        input_paths_list = [M_input_path, N_input_path, CN_input_path, FLT_input_path, CFLT_input_path, P_input_path, CP_input_path]
        output_paths_list = [M_output_path, N_output_path, CN_output_path, FLT_output_path, CFLT_output_path, P_output_path, CP_output_path]
    else:
        DAG_indicator_list = ['Naive DAG', 'Conditioned Naive DAG', 'First-Last Task DAG', 'Conditioned First-Last Task DAG', 'Partitioned DAG', 'Conditioned Partitioned DAG']
        input_paths_list = [N_input_path, CN_input_path, FLT_input_path, CFLT_input_path, P_input_path, CP_input_path]
        output_paths_list = [N_output_path, CN_output_path, FLT_output_path, CFLT_output_path, P_output_path, CP_output_path]


    for DAG_indicator, input_path, output_path in zip(DAG_indicator_list, input_paths_list, output_paths_list):
        print(f'\n-------Running: {occupation} - {DAG_indicator}-------')
        
        DAG_start_time = time.time()
        DAG_costMin(input_path, output_path, unique_partitions, n)
        DAG_end_time = time.time()

        DAG_execution_time = DAG_end_time - DAG_start_time
        print(f"\n{occupation} {DAG_indicator} runtime: {DAG_execution_time:.2f} seconds")

    occupation_end_time = time.time()
    occupation_execution_time = (occupation_end_time - occupation_start_time)/60
    print(f"\n\n************* {occupation} runtime: {occupation_execution_time:.2f} minutes *************")
    runtime_since_start = (time.time() - start_time)/60
    print(f"\nruntime since start: {runtime_since_start:.2f} minutes\n")


end_time = time.time()
execution_time = (end_time - start_time)/60
print(f"\n\nTotal Runtime: {execution_time:.2f} minutes")


---------------------- Running: pileDriverOperators ----------------------
Number of non-target tasks: 5
Time to generate all possible partition schemes: 0.00 seconds
Number of all possible partitioning schemes: 52

-------Running: pileDriverOperators - Manual DAG-------
valid execution plans generation: 0.00 minutes
Number of valid partitioning schemes given DAG structure: 26
Number of valid execution plans: 24

pileDriverOperators Manual DAG runtime: 0.05 seconds

-------Running: pileDriverOperators - Naive DAG-------
valid execution plans generation: 0.00 minutes
Number of valid partitioning schemes given DAG structure: 30
Number of valid execution plans: 15

pileDriverOperators Naive DAG runtime: 0.05 seconds

-------Running: pileDriverOperators - Conditioned Naive DAG-------
valid execution plans generation: 0.00 minutes
Number of valid partitioning schemes given DAG structure: 26
Number of valid execution plans: 15

pileDriverOperators Conditioned Naive DAG runtime: 0.05 seconds

FileNotFoundError: [Errno 2] No such file or directory: '/Users/peymansh/Dropbox (MIT)/Research/AI and Occupations/ai-exposure/output/daily_tasks_occupations_analysis/personalCareAides/personalCareAides_P_GPT_DAG_df.csv'