# FIT5148: Assessment 1
## Distributed Databases and Big Data


### Student Name: Enzo Reyes

### Student Number: 21879818


### Introduction to Approach

For this assignment I wanted to try a re-usable approach, where upon each task wasn't an isolated unit of item/work, but it used a framework, as such things took a different approach from the tutorials.

Each part of function was made to operate as if it where working in a pipeline, on certain tasks I have a _kernel_ function which is that data pipeline that is run on each processor.

Also I wanted to to thing in which there was no possible race condition, as such most of the time the way I wrote things was based on determining if there was a race condition and making sure that the data was either copied if it was manipulated or isolated to one processor. Luckily a lot of this depended a lot on the Scheduler.

The other thing was that I didn't operate on the whole data structure, each function only works on its mailbox and reads values from the shared data frame as if it were read-only.

In [1]:
import operator
import csv

from multiprocessing import Pool

#Define the number of workers
__N_WORKERS__ = 4  # Working with 4 CPU
__CRASH_DATA__ = "2018_DATA_SA_Crash.csv"
__UNIT_DATA__  = "2018_DATA_SA_Units.csv"

In [2]:
#Mock Classes that simulate a Pandas frame

class loc_class:
    def __init__(self):
        self.rows = []
        self.csv_df = None
    
    def add_row(self, item):
        self.rows.append(item)
    
    def __getitem__(self, key):
        return self.rows[key]

class csv_data_frame:
    def __init__(self):
        self.size = 0
        self.header = {}
        self.reverse_key = {}

    def generate_row_alias(self, row):
        insert_key = {}
        for key in self.reverse_key.keys():
            insert_key[self.reverse_key[key]] = row[key]
        return insert_key
        
    def read_csv(self,filename):
        self.loc = loc_class()
        with open(filename,'rt')as f:
          data = csv.reader(f)
          row_val = 0
          for row in data:
            if row_val == 0 :
                for i in range(len(row)):
                    self.header[row[i]] = []
                    self.reverse_key[i] = row[i]
            else:
                for i in range(len(row)):
                    header_key = self.reverse_key[i]
                    self.header[header_key].append(row[i])
                self.loc.add_row(self.generate_row_alias(row))
            row_val = row_val + 1
            self.size = row_val
            
    def __len__(self):
        return self.size-1

In [3]:
def print_matrix(data):
    for lin in data:
        print("+---"*len(lin)+"+")
        for inlin in lin:
            print("|",str(inlin),"", end="")
        print("|")
    print("+---"*len(lin)+"+")

In [4]:
#load the data
#crash_data_df = pd.read_csv(__CRASH_DATA__)
#unit_data_df = pd.read_csv(__UNIT_DATA__)
crash_data_df = csv_data_frame()
crash_data_df.read_csv(__CRASH_DATA__)
unit_data_df = csv_data_frame()
unit_data_df.read_csv(__UNIT_DATA__)


In [5]:
#Helper Functions to do things

#Check if a value is an integer used to remove values which cause havoc with some 
#of the other functions
def is_int(in_val):
    try:
        x = int(in_val)
        return True
    except ValueError:
        return False

#Buffer Logic Class - Makes the code cleaner when dealing with Merging
class Buffer:
    def __init__(self):
        self.exhausted = False
        self.size = 0
        self.location = 0

    def peek(self):
        return self.queue[self.location]

    def take(self):
        self.location = self.location + 1

    def is_exhausted(self):
        if self.location >= self.size:
            return True
        return False
    
#Manages a set of return buffers
class BufferManager:
    def __init__(self):
        self.num_of_buffers = 0
        self.buffers = []

    #Returns the value in the buffers that is min
    #Allows a cleaner implementation of merge
    def get_min(self):
        min_buff = None
        for buff in self.buffers:
            if buff.is_exhausted() == False:
                if min_buff == None:
                    min_buff = buff
                if buff.peek()[1] < min_buff.peek()[1]:
                    min_buff = buff
        ret_val = min_buff.peek()
        min_buff.take()
        return ret_val
    
    def is_exhausted(self):
        ret_val = True
        for buff in self.buffers:
            ret_val = ret_val and buff.is_exhausted()
        return ret_val
    
    def add_buffer(self, temp_buff):
        buff = Buffer()
        buff.size = len(temp_buff)
        buff.queue = temp_buff
        self.buffers.append(buff)
        self.num_of_buffers = self.num_of_buffers + 1
        
#A function that was identified to be common on all group by functions        
def collate_table(hash_to_collate):
    return_group_aggregate = {}
    for key in hash_to_collate.keys():
        return_group_aggregate[key] = sum(hash_to_collate[key])
    return return_group_aggregate 


#Used by the Heap-Sort algorithm - a lot of code to deal 
# with useless nan's and non-values which make this algo break
def generate_heap(input_list, size_of_list, subtree_index):
    largest = subtree_index
    left = 2 * subtree_index + 1     
    right = 2 * subtree_index + 2   
    if is_int(input_list[subtree_index][1]):
        subtree_index_val = int(input_list[subtree_index][1])
    else:
        subtree_index_val = 0
    
    largest_val = subtree_index_val

    if left < size_of_list:
        left_val = 0
        if is_int(input_list[left][1]):
            left_val = int(input_list[left][1])
        
        if subtree_index_val < left_val: 
            largest = left 
            largest_val = left_val

    if right < size_of_list: 
        if is_int(input_list[right][1]):
            right_val = int(input_list[right][1])
        else:
            right_val = 0

        if largest_val < right_val: 
            largest = right 
    if largest != subtree_index: 
        input_list[subtree_index],input_list[largest] = input_list[largest],input_list[subtree_index]  
        generate_heap(input_list, size_of_list, largest) 

#A HOT-Encoder to generate values when doing rangepartitions on STRING literals
def single_char_encoder(input_string, num_of_workers):
    ascii_start = 97
    ascii_end = 122
    char_val = ord(input_string.lower()[0])
    if char_val >= ascii_start and char_val <= ascii_end:
        partition_range = ((ascii_end - ascii_start) +1 ) / (num_of_workers)
        return int((char_val-ascii_start) // partition_range)
    return ord(input_string.lower()[0]) % num_of_workers




#Display formatter turns the output of the tasks to a dataframe thats more friendly
def display_result(result_list, read_df):
    my_list = []
    columns = ['Date of Crash', 'Suburb', 'Postcode', 'Number of Casualties']
    
    
    for item in result_list:
        if isinstance(item, tuple) :
            row_num = item[0]
        else:
            row_num = item
        date_of_crash = str(read_df.loc[row_num]['Year']) + "-" + read_df.loc[row_num]['Month'] + "-" + read_df.loc[row_num]['Day'] 
        suburb = read_df.loc[row_num]['Suburb']
        postcode = read_df.loc[row_num]['Postcode']
        cas = read_df.loc[row_num]['Total Cas']
        my_list.append([date_of_crash,suburb,postcode, cas])
    print_matrix(my_list)

#Display formatter turns the output of the tasks to a dataframe thats more friendly
def display_result_task_2(results, show_max=20):
    my_list = []
    cycle = 0
    columns = ['Date of Crash', 'Time', 'Suburb', 'Gender', 'Age', 'Number of Casualties', 'License Type']
    
    for key in results[0].keys():
        items = results[0][key]
        doc = str(crash_data_df.loc[key]['Year']) + "-" + crash_data_df.loc[key]['Month'] + "-" + crash_data_df.loc[key]['Day']
        time = crash_data_df.loc[key]['Time']
        suburb = crash_data_df.loc[key]['Suburb']     
        cas = crash_data_df.loc[key]['Total Cas']
        for it in items:    
            gender = unit_data_df.loc[it]['Sex']
            age = unit_data_df.loc[it]['Age']
            lic_type = unit_data_df.loc[it]['Licence Type']
            my_list.append([doc,time,suburb,gender,age,cas,lic_type])
            cycle = cycle + 1
        if cycle >= show_max:
            break
    print_matrix(my_list)

#Display the following aggregate into a Dataframe
def display_aggregate(values, column_one_name, column_two_name, show_max=20):
    curr=0
    target = values[0]
    for merge_list in values:
        if curr != 0:
            target.update(values[curr])
        curr = curr + 1
    columns = [column_one_name, column_two_name]
    my_list = []
    size = 0
    for key, value in target.items():
        my_list.append([key,value])
        size = size + 1
        if size > show_max:
            break
    print_matrix(my_list)

The next section of code is the schedulers - this is what does the initial partitioning as it schedules what each worker in the Python pool does. 

The implementation is just a list of ID's that scheduled to an array, if additional copy data is required a tuple of 
(ID, VAL) - is implemented. These scheduler do not expect a join, that is they don't generate an INDEX

In [6]:
#Commmon Function that initialises the worker mailboxes so that each have their ID list, 
#this is common to all schedulers so made an own function
def init_mailbox(worker_pool_size = __N_WORKERS__):
    #Worker process mail
    mailbox = []
    for i in range(worker_pool_size):
        mailbox.append([]) 
    return mailbox

#The round-robin (random-equal) partition/scheduler it has a few tweaks to make it work on the assignment
#it has a copy function which takes a single value for optimisation when for example sorting single values
#It also removes invalid functions if enabled
#Inputs:
# -- data_frame : The Data frame to get ID and generate the work items
# -- copy_item : The Item to copy to the worker mailbox
# -- drop_invalid: Drop items if they are not a valid or expected data format
# -- worker_pool_size: The number of processors to be activated
# -- drop_func: Function that determines data validity - drops work item if False
#Return: a List of Location ID, split across a set of array equal to the worker_pool_size
def round_robin(data_frame, copy_item=None, drop_invalid=True, worker_pool_size = __N_WORKERS__, drop_func=is_int):
    mailbox = init_mailbox()
    if copy_item == None:
        for i in range(len(data_frame)):
            processor = i % worker_pool_size
            mailbox[processor].append(i)
        return mailbox
    else:
        for i in range(len(data_frame)):
            processor = i % worker_pool_size
            if drop_invalid:
                if drop_func(data_frame.loc[i][copy_item]):
                    item_val = (i, int(data_frame.loc[i][copy_item]))
                    mailbox[processor].append(item_val)
            else:
                item_val = (i, data_frame.loc[i][copy_item])
                mailbox[processor].append(item_val)  
        return mailbox

#The range_partition_scheduler partition/scheduler it takes an encoder which returns the CPU that
#a specific item should be sent to a CPU
#Inputs:
# -- data_frame : The Data frame to get ID and generate the work items
# -- range_key : The value that determines the range to be scheduled on
# -- range_encoder: A function that returns the CPU to be allocated to a specific work item
# -- worker_pool_size: The number of processors to be activated
#Return: a List of Location ID, split over the worker_pool_size
def range_partition_scheduler(data_frame, range_key, 
                              range_encoder = single_char_encoder, worker_pool_size = __N_WORKERS__):
    mailbox = init_mailbox(worker_pool_size)
    for i in range(len(data_frame)):
        range_value = data_frame.loc[i][range_key]
        processor = single_char_encoder(range_value, worker_pool_size)
        mailbox[processor].append(i)  
    return mailbox

#This is similar to the partition scheduler above, except in one aspect
#it is used after a group-by function has executed, and thus does not
#operate on the DF to generate ID, rather it acts on the data structure directly
def range_partition_scheduler_post_group(grouped_proc_values, range_encoder = single_char_encoder, 
                                         worker_pool_size = __N_WORKERS__):
    mailbox = init_mailbox(worker_pool_size)
    for proc_group_val in grouped_proc_values:
        for key in proc_group_val.keys():
            processor = single_char_encoder(key, worker_pool_size)
            mailbox[processor].append((key,proc_group_val[key]))
    return mailbox

#The hash_group_scheduler partition/scheduler assigns specific hashgroups to a processor, based on a
#Grouping
#The scheduler tries to make sure that values that are grouped are not some how lost in a dependency 
#loop.
#Inputs:
# -- data_frame : The Data frame to get ID and generate the work items
# -- hash_key : The value that determines the hash group to be scheduled on
# -- has_value: A function that returns the CPU to be allocated to a specific work item
# -- value_key : The key to group on
# -- drop_func: Function that determines data validity - drops work item if False
# -- worker_pool_size: The number of processors to be activated
#Return: a List of Location ID, split over the worker_pool_size
def hash_group_scheduler(data_frame, hash_key, has_value=True, value_key=None, 
                         drop_func=is_int, worker_pool_size = __N_WORKERS__):
    mailbox = init_mailbox(worker_pool_size)
    counter = 0
    cpu_index = {}
    for i in range(len(data_frame)):
        if has_value:
            if drop_func(data_frame.loc[i][value_key]) == False:
                continue
            if data_frame.loc[i][hash_key] in cpu_index.keys():
                mailbox[cpu_index[data_frame.loc[i][hash_key]]].append(i)
            else:
                cpu_index[data_frame.loc[i][hash_key]] = counter % worker_pool_size
                counter = counter + 1
        else:
            if data_frame.loc[i][hash_key] in cpu_index.keys():
                mailbox[cpu_index[data_frame.loc[i][hash_key]]].append(i)
            else:
                cpu_index[data_frame.loc[i][hash_key]] = counter % worker_pool_size
                counter = counter + 1
    return mailbox

The next of schedulers are the join schedulers, they work by generating one-time indecies on the values that are going to be joined, they return an ITEM_INDEX variable which is an index for the names and the mailbox which is the work to be done for each partition

Each Scheduler also deals with data-dependencies, that is it makes sure that items are not going to be sent to 
disparate processors which cannot read the data.

This works by assigning the parent table - the one that generates the index - to one processor. Then the inner table join attribute is scanned to see which index it points to, it then uses that index to get the CPU allocation, then it uses that to map that work item to that CPU.

In [7]:
#The broad_cast_join_scheduler assigns specific items to a processor, 
#The scheduler guarantees that each item in the inner table is located to its parent table
#The scheduler is similar/based to round robin
#Inputs:
# -- data_frame : The Data frame to get ID and generate the work items
# -- data_frame_dep : The data frame that depends (or inner table) on the parent data frame
# -- index_name: the Attribute which will be used to join both tables
# -- worker_pool_size: The number of processors to be activated
#Return: a List of Location ID, split over the worker_pool_size + and Index
def broad_cast_join_scheduler(data_frame_index, data_frame_dep, index_name,
                              worker_pool_size = __N_WORKERS__):
    mailbox = init_mailbox()
    item_index = {}
    cpu_index = {}
    for i in range(len(data_frame_index)):
        item_index[data_frame_index.loc[i][index_name]] = i
        processor = i % worker_pool_size
        cpu_index[data_frame_index.loc[i][index_name]] = processor

    for i in range(len(data_frame_dep)):
        mailbox[cpu_index[data_frame_dep.loc[i][index_name]]].append(i)
        
    return (item_index, mailbox)

#The range_partition_join_scheduler assigns specific items to a processor, 
#The scheduler guarantees that each item in the inner table is located to its parent table
#The scheduler is similar works out the item ranges and splits them over the processors
#Inputs:
# -- data_frame : The Data frame to get ID and generate the work items
# -- data_frame_dep : The data frame that depends (or inner table) on the parent data frame
# -- index_name: the Attribute which will be used to join both tables
# -- worker_pool_size: The number of processors to be activated
#Return: a List of Location ID, split over the worker_pool_size + and Index
def range_partition_join_scheduler(data_frame_index, data_frame_dep, index_name, 
                                   worker_pool_size = __N_WORKERS__):
    mailbox = init_mailbox()
    item_index = {}
    cpu_index = {}
    partition_range = (len(data_frame_index) +1 ) / (worker_pool_size)
    for i in range(len(data_frame_index)):
        item_index[data_frame_index.loc[i][index_name]] = i
        processor = int(i // partition_range)
        cpu_index[data_frame_index.loc[i][index_name]] = processor

    for i in range(len(data_frame_dep)):
        mailbox[cpu_index[data_frame_dep.loc[i][index_name]]].append(i)
        
    return (item_index, mailbox)

### Filter and Searches 

The Search is done through two functions which are quite large as the deal with a whole range of conditions and
multiple data types:

Linear-Filter is "Linear Search" I called it filter since it just does that removes stuff that is not the value
specified. However It has a remove field - if enabled it lets everything else pass EXCEPT the value specified.

Binary Search is very similar to a normal binary search except it has the range specifiers which allow one
to use it for range searches.

In [8]:
#Search Linearly for values, and if the value passes move the ID forward
#Inputs:
# -- search_key : The column searching
# -- search_item : The value to match
# -- worker_mailbox: The input list of ID's to match
# -- data_frame: The data frame that the ID's reference to
# -- remove: Rather than pass the value - Remove ID that match the value
# -- alternate_field: Look in the JOINED innertable rather than the OUTER
#Return: a List of Location ID, that match the specified criteria

def linear_filter(search_key, search_item, worker_mailbox, data_frame, remove=False, alternate_field=False):
    filter_list = []
    if isinstance(worker_mailbox, dict):
        ret_dict = {}
        for key in worker_mailbox:
            if alternate_field:
                ret_box = []
                for inner_table_loc in worker_mailbox[key]:
                    inner_val = data_frame.loc[inner_table_loc]
                    if inner_val[search_key] == search_item:
                        ret_box.append(inner_table_loc)
                if len(ret_box) >=1 :
                    ret_dict[key] = ret_box
            else:
                row_val = data_frame.loc[key]
                if row_val[search_key] == search_item:
                    ret_dict[key] = worker_mailbox[key]
        return ret_dict
    for item in worker_mailbox:
        if isinstance(item, tuple) :
            row_num = item[0]
        else:
            row_num = item
        row_val = data_frame.loc[row_num]
        if remove == False:
            if row_val[search_key] == search_item:
                filter_list.append(item)
        else:
            if row_val[search_key] != str(search_item):
                filter_list.append(item)
    return filter_list

#Use a binary search, and if the value passes move the ID forward
#Inputs:
# -- search_key : The column searching
# -- search_item : The value to match
# -- worker_mailbox: The input list of ID's to match
# -- data_frame: The data frame that the ID's reference to
# -- range_search: Search for a RANGE - MIN OR MAX
# -- range_greater: If True its a Greater-Equal Search
#Return: a List of Location ID, that match the specified criteria
def binary_search(search_key, search_item, worker_mailbox, data_frame, range_search=False, range_greater=True):
    worker_mailbox.sort(key = operator.itemgetter(1)) #Sort List
    partition_start = 0
    partition_end = len(worker_mailbox) - 1
    return_list = []
    while(partition_start <= partition_end):
        mid = (partition_start + partition_end)//2
        if int(worker_mailbox[mid][1]) == search_item :
            if range_search:
                if range_greater:
                    return worker_mailbox[mid:]
                else:
                    return worker_mailbox[:mid]
            else:
                return_list.append(worker_mailbox[mid][0])
                #Move up one, if this is the only item it would terminate
                partition_start = mid + 1
        else:
            if search_item < int(worker_mailbox[mid][1]):
                partition_end = mid - 1
            else:
                partition_start = mid + 1
    return return_list

### Other Functions

This includes the other operations/functions required to complete the assingment such as merge, sorting, joins


In [9]:
#This function depends on the JOIN SChedulers to setup the appropriate data structure
#It is based on the index, and does a hash-based join, it places the items that 
#match the join criteria in to the work mail box so the format is
# PARENT_ID: [CHILD1, CHILD2]
#Inputs:
# -- worker_mailbox: The input list
# -- df_ix: The Parent (INDEX) data frame
# -- df_dep: The child, dependent list, whose values will be embededd
# -- index: the index tables of the parent
# -- index_name: the name of the attribute used to match the CHILD table
def parallel_join(worker_mailbox, df_ix, df_dep, index, index_name):
    joined_list = {}
    for row_num in worker_mailbox:
        item  = df_dep.loc[row_num]
        idx_num = index[item[index_name]]
        if idx_num in joined_list:
            joined_list[idx_num].append(row_num)
        else:
            joined_list[idx_num] = [row_num]
    return joined_list

#Heap Sort - This function also depends on the scheduler doing its thing right to setup the datastructure
# so it can sort it depends on the input list being in [(ID,VAL), (ID,VAL)] - will sort on VAL
#Inputs:
# -- input_list: The input list to sort
# -- data_frame: The data frame

def heap_sort(input_list, data_frame):  
    size = len(input_list) 
    for i in range(size-1, -1, -1): 
        generate_heap(input_list, size, i) 
    for i in range(size-1, -1, -1): 
        input_list[i], input_list[0] = input_list[0], input_list[i] 
        generate_heap(input_list, i, 0) 
    return input_list

#Merges the results returned by individial partitions
#Inputs:
# -- input queues : the returned queues after sorting that need to be merged
def merge(input_queues):
    buffer_manager = BufferManager()
    sorted_list = []
    for item in input_queues:
        buffer_manager.add_buffer(item)
    while(buffer_manager.is_exhausted() == False):
        sorted_list.append(buffer_manager.get_min())
    return sorted_list

#This aggregates the - GROUPS - a set of data based on a key - It then executes the sum
#Inputs:
# -- worker_mailbox: The input list
# -- data_frame: The data frame that the ID's reference to
# -- group_attribute:  How to Group the items
# -- sum_attribute : What to add
def aggregate_item_by_group(worker_mailbox, data_frame, group_attribute, sum_attribute):
    hash_group = {}
    for row_num in worker_mailbox:
        if data_frame.loc[row_num][group_attribute] not in hash_group.keys():
            hash_group[data_frame.loc[row_num][group_attribute]] = []
        hash_group[data_frame.loc[row_num][group_attribute]].append(int(data_frame.loc[row_num][sum_attribute]))
    return collate_table(hash_group)

#This aggregates the - GROUPS - However it assumes another GROUP has happened before and thus does not need
#to refer to the original data structure
#Inputs:
# -- worker_mailbox: The input list 
def aggregate_grouped_values(worker_mailbox):
    hash_group = {}
    for item in worker_mailbox:
        if item[0] not in hash_group.keys():
            hash_group[item[0]] = []
        hash_group[item[0]].append(int(item[1]))
    return collate_table(hash_group)

## Tasks 

Now this is just the code - I'll be walking through it however I won't run them UNTIL the end once the __MAIN__ macro has passed to demonstrate that the code works correctly - This is because the PYTHON script wont have the correct pool until that MACRO is run the the workers are set



In [10]:

#The task Kernel using two linear_filters to filter the data out
#returns the list of filtered items (ID)
def task_1_1_kernel(worker_mailbox, data_frame):
    suburb_filtered = linear_filter('Suburb','ADELAIDE', worker_mailbox, data_frame)
    month_filtered = linear_filter('Month','January',suburb_filtered,data_frame)
    return month_filtered

def task_1_1():
    mailbox = round_robin(crash_data_df)
    #Launch kernels
    kernel_results = [pool.apply_async(task_1_1_kernel, (mailbox[i], crash_data_df)) for i in range(__N_WORKERS__)]
    values = [res.get() for res in kernel_results]
    flatten = lambda l: [item for sublist in l for item in sublist]
    merged =  flatten(values)[:20]
    return merged

#None of the code above will work UNTIL THIS IS EXECUTED
if __name__ == '__main__':   
    #Create the pool now to not waste time re-creating the thread pool
    pool = Pool(processes=__N_WORKERS__)

%time display_result(task_1_1(), crash_data_df)



+---+---+---+---+
| 2018-January-Monday | ADELAIDE | 5000 | 1 |
+---+---+---+---+
| 2018-January-Wednesday | ADELAIDE | 5000 | 1 |
+---+---+---+---+
| 2018-January-Saturday | ADELAIDE | 5000 | 0 |
+---+---+---+---+
| 2018-January-Saturday | ADELAIDE | 5000 | 0 |
+---+---+---+---+
| 2018-January-Sunday | ADELAIDE | 5000 | 4 |
+---+---+---+---+
| 2018-January-Tuesday | ADELAIDE | 5000 | 1 |
+---+---+---+---+
| 2018-January-Monday | ADELAIDE | 5000 | 0 |
+---+---+---+---+
| 2018-January-Saturday | ADELAIDE | 5000 | 1 |
+---+---+---+---+
| 2018-January-Tuesday | ADELAIDE | 5000 | 2 |
+---+---+---+---+
| 2018-January-Tuesday | ADELAIDE | 5000 | 0 |
+---+---+---+---+
| 2018-January-Tuesday | ADELAIDE | 5000 | 0 |
+---+---+---+---+
| 2018-January-Saturday | ADELAIDE | 5000 | 0 |
+---+---+---+---+
| 2018-January-Monday | ADELAIDE | 5000 | 0 |
+---+---+---+---+
| 2018-January-Monday | ADELAIDE | 5000 | 0 |
+---+---+---+---+
| 2018-January-Tuesday | ADELAIDE | 5000 | 0 |
+---+---+---+---+
| 2018

In [11]:
pool.terminate()

##### Task 1.2

For this task I chose ROUND-ROBIN, It is a search for a specific item, and if I wanted to fill up the
CPU tasks, I want to do so evenly as possible to achieve possible maximum throughput 
Only round-robin can possibly guarantee an even distribution of tasks across all processors and hence
why I chose it. 
The task is not dependent on any other value than it self.

In [12]:
#Kernel just executes the binary search 
def task_1_2_kernel(worker_mailbox, data_frame):
    return binary_search('Total Cas', 7, worker_mailbox, data_frame)


def task_1_2():
    mailbox = round_robin(crash_data_df, 'Total Cas')
    kernel_results = [pool.apply_async(task_1_2_kernel, (mailbox[i], crash_data_df)) for i in range(__N_WORKERS__)]
    values = [res.get() for res in kernel_results]
    flatten = lambda l: [item for sublist in l for item in sublist]
    merged =  flatten(values)
    return merged

#None of the code above will work UNTIL THIS IS EXECUTED
if __name__ == '__main__':   
    #Create the pool now to not waste time re-creating the thread pool
    pool = Pool(processes=__N_WORKERS__)

%time display_result(task_1_2(), crash_data_df)

+---+---+---+---+
| 2018-January-Tuesday | MOUNT SCHANK | 5291 | 7 |
+---+---+---+---+
| 2018-July-Tuesday | CRAIGMORE | 5114 | 7 |
+---+---+---+---+
CPU times: user 665 ms, sys: 113 ms, total: 777 ms
Wall time: 1.1 s


In [13]:
pool.terminate()

##### Task 1.3

For this task I chose ROUND-ROBIN, It is a search for a specific item, and if I wanted to fill up the
CPU tasks, I want to do so evenly as possible to achieve possible maximum throughput 
Only round-robin can possibly guarantee an even distribution of tasks across all processors and hence
why I chose it. 
The task is not dependent on any other value than it self.

For the Greater search I used the binary filter as the implementation was straight forward to return the python slice to left or right of the value. No need to use the linear search as only the pivot point is required in the binary version and this is much faster

For the suburb search I still used  a linear filter to remove suburbs other than adelaide, this avoid complications with trying to measure the "index" of a word - its safer to do so on an unknown type

Lastly in the kernel I added an additional filter to remove the value 3, this is because whats asked is for >3 not >=3, the binary search does a >=3.. this removes any additional stray data.

In [14]:
def task_1_3_kernel(worker_mailbox, data_frame):
    suburb_filtered = linear_filter('Suburb','ADELAIDE', worker_mailbox, data_frame)
    great_search =  binary_search('Total Cas', 3, suburb_filtered, data_frame, True, True)
    return linear_filter('Total Cas',3, great_search, data_frame, True)

def task_1_3():
    mailbox = round_robin(crash_data_df, 'Total Cas')
    kernel_results = [pool.apply_async(task_1_3_kernel, (mailbox[i], crash_data_df)) for i in range(__N_WORKERS__)]
    values = [res.get() for res in kernel_results]
    flatten = lambda l: [item for sublist in l for item in sublist]
    merged =  flatten(values)
    return merged

#None of the code above will work UNTIL THIS IS EXECUTED
if __name__ == '__main__':   
    #Create the pool now to not waste time re-creating the thread pool
    pool = Pool(processes=__N_WORKERS__)
    
%time display_result(task_1_3(), crash_data_df)

+---+---+---+---+
| 2018-January-Sunday | ADELAIDE | 5000 | 4 |
+---+---+---+---+
CPU times: user 706 ms, sys: 125 ms, total: 831 ms
Wall time: 1.15 s


In [15]:
pool.terminate()

##### Task 2.1

The _broad_cast_join_scheduler_ function is based on the round-robin scheduler, with additional modifications to make it a divide and broadcast algorith. It also has additional key feature that round-robin doesnt. Round-Robin assumes no dependencies between the tasks, this one does. This and _range_partition_join_scheduler_ __ASSUME__ that a Hash Join is going to be used as implemented by the _parallel_join_ function. 

The choice was made in due to data-dependency resolution, I wanted to avoid sharing data that requires CPU cross boundry communication since _PYTHON's_ concurrency is quite poor. 

No _kernel task_ function exists since the JOIN is the final operation

In [16]:
def task_2_1():
    schedule = broad_cast_join_scheduler(crash_data_df, unit_data_df, 'REPORT_ID')
    kernel_results = [pool.apply_async(parallel_join, (schedule[1][i],crash_data_df, unit_data_df, schedule[0], 'REPORT_ID')) for i in range(__N_WORKERS__)]
    values = [res.get(timeout=10) for res in kernel_results]
    return values

#None of the code above will work UNTIL THIS IS EXECUTED
if __name__ == '__main__':   
    #Create the pool now to not waste time re-creating the thread pool
    pool = Pool(processes=__N_WORKERS__)
    
%time display_result_task_2(task_2_1())

+---+---+---+---+---+---+---+
| 2018-October-Wednesday | 11:20 am | MITCHELL PARK | Male | 018 | 0 | Provisional 1  |
+---+---+---+---+---+---+---+
| 2018-October-Wednesday | 11:20 am | MITCHELL PARK | Male | XXX | 0 | Full |
+---+---+---+---+---+---+---+
| 2018-October-Wednesday | 11:20 am | MITCHELL PARK | Female | 066 | 0 |  |
+---+---+---+---+---+---+---+
| 2018-October-Wednesday | 11:20 am | MITCHELL PARK |  |  | 0 |  |
+---+---+---+---+---+---+---+
| 2018-January-Monday | 04:00 am | CROYDON | Female | 030 | 3 | Full |
+---+---+---+---+---+---+---+
| 2018-January-Monday | 04:00 am | CROYDON | Male | 032 | 3 | Full |
+---+---+---+---+---+---+---+
| 2018-January-Monday | 10:00 am | KENSINGTON | Female | 058 | 0 | Provisional 2 |
+---+---+---+---+---+---+---+
| 2018-January-Monday | 10:00 am | KENSINGTON | Female | 022 | 0 | Full |
+---+---+---+---+---+---+---+
| 2018-January-Monday | 10:00 am | KENSINGTON | Male | 073 | 0 | Full |
+---+---+---+---+---+---+---+
| 2018-January-Monday 

In [17]:
pool.terminate()

##### Task 2.2

The _range_partition_join_scheduler_ tries to partition across the range, it was discovered that because of the indexing and hashing properties required for the join to work, the unique_id field format wasn't an issue. It was already in line with what PANDAS was reporting as its index, and when used as a HASH the format wasn't an issue.

In [18]:
def task_2_2_kernel(worker_mailbox, df_ix, df_dep, index, index_name):
    join_list = parallel_join(worker_mailbox, df_ix, df_dep, index, index_name)
    return linear_filter('Suburb','ADELAIDE', join_list, df_ix)

def task_2_2():
    schedule = range_partition_join_scheduler(crash_data_df, unit_data_df, 'REPORT_ID')
    kernel_results = [pool.apply_async(task_2_2_kernel, (schedule[1][i],crash_data_df, unit_data_df, schedule[0], 'REPORT_ID')) for i in range(__N_WORKERS__)]
    values = [res.get(timeout=10) for res in kernel_results]
    return values

#None of the code above will work UNTIL THIS IS EXECUTED
if __name__ == '__main__':   
    #Create the pool now to not waste time re-creating the thread pool
    pool = Pool(processes=__N_WORKERS__)
    
%time display_result_task_2(task_2_2())

+---+---+---+---+---+---+---+
| 2018-January-Monday | 04:15 am | ADELAIDE | Male | 049 | 2 | Full |
+---+---+---+---+---+---+---+
| 2018-January-Monday | 04:15 am | ADELAIDE | Male | 032 | 2 | Full |
+---+---+---+---+---+---+---+
| 2018-January-Monday | 04:10 pm | ADELAIDE | Female | 056 | 0 | Full |
+---+---+---+---+---+---+---+
| 2018-January-Monday | 04:10 pm | ADELAIDE | Female | 036 | 0 | Full |
+---+---+---+---+---+---+---+
| 2018-January-Monday | 04:10 pm | ADELAIDE |  |  | 0 |  |
+---+---+---+---+---+---+---+
| 2018-January-Tuesday | 11:18 am | ADELAIDE | Male | 087 | 0 | Full |
+---+---+---+---+---+---+---+
| 2018-January-Tuesday | 11:18 am | ADELAIDE |  |  | 0 |  |
+---+---+---+---+---+---+---+
| 2018-January-Tuesday | 05:10 pm | ADELAIDE | Female | 037 | 0 | Full |
+---+---+---+---+---+---+---+
| 2018-January-Tuesday | 05:10 pm | ADELAIDE | Unknown | XXX | 0 | Unknown |
+---+---+---+---+---+---+---+
| 2018-January-Wednesday | 06:15 pm | ADELAIDE | Male | 059 | 1 |  |
+---+--

In [19]:
pool.terminate()

##### Task 3.1

I used round-robin for mostly performance reasons, I wanted to make sure all the processors had their worker queues full with tasks. As the input did not depend on any other external factors. A range scheduler could of worked well here as well.


In [20]:
def task_3_1():
    mailbox = round_robin(crash_data_df, 'Total Cas')
    kernel_results = [pool.apply_async(heap_sort, (mailbox[i], crash_data_df)) for i in range(__N_WORKERS__)]
    values = [res.get(timeout=10) for res in kernel_results]
    return display_result(merge(values)[:20], crash_data_df)

#None of the code above will work UNTIL THIS IS EXECUTED
if __name__ == '__main__':   
    #Create the pool now to not waste time re-creating the thread pool
    pool = Pool(processes=__N_WORKERS__)
    
%time task_3_1()

+---+---+---+---+
| 2018-May-Saturday | ROSTREVOR | 5073 | 0 |
+---+---+---+---+
| 2018-August-Wednesday | KILKENNY | 5009 | 0 |
+---+---+---+---+
| 2018-October-Monday | ST GEORGES | 5064 | 0 |
+---+---+---+---+
| 2018-September-Saturday | RIDGEHAVEN | 5097 | 0 |
+---+---+---+---+
| 2018-August-Saturday | BIRKENHEAD | 5015 | 0 |
+---+---+---+---+
| 2018-August-Wednesday | WINGFIELD | 5013 | 0 |
+---+---+---+---+
| 2018-November-Saturday | TORRENSVILLE | 5031 | 0 |
+---+---+---+---+
| 2018-October-Monday | WINGFIELD | 5013 | 0 |
+---+---+---+---+
| 2018-October-Sunday | COWIRRA | 5238 | 0 |
+---+---+---+---+
| 2018-May-Saturday | UPPER STURT | 5156 | 0 |
+---+---+---+---+
| 2018-May-Saturday | EDEN HILLS | 5050 | 0 |
+---+---+---+---+
| 2018-May-Tuesday | BROOKLYN PARK | 5032 | 0 |
+---+---+---+---+
| 2018-May-Friday | MANNINGHAM | 5086 | 0 |
+---+---+---+---+
| 2018-August-Wednesday | MODBURY HEIGHTS | 5092 | 0 |
+---+---+---+---+
| 2018-November-Monday | NACKARA | 5440 | 0 |
+---+---

In [21]:
pool.terminate()

##### Task 3.2

Like in 3.1 I went with round robin to make full use of the available CPU's

In [22]:
def task_3_2():
    mailbox = round_robin(unit_data_df, 'Veh Year', True)
    kernel_results = [pool.apply_async(heap_sort, (mailbox[i], unit_data_df)) for i in range(__N_WORKERS__)]
    values = [res.get(timeout=10) for res in kernel_results]
    #Here is the big difference, we now need to merge these in a pipeline
    #There needs to be a schedule that distributes the load across all workers
    #We are also going to reduce the amount of items so it should be really N/2
    n_pools = int(len(values) // 2)
    worker_box = []
    while n_pools >= 2 :
        worker_box = []
        for i in range(n_pools):
            worker_box.append([]) 
        for i in range(len(values)):
            processor = int(i // n_pools)
            worker_box[processor].append(values[i])
        parallel_merge_results = [pool.apply_async(merge, ([worker_box[i]])) for i in range(n_pools)]
        values = [res.get(timeout=10) for res in parallel_merge_results]
        n_pools = int(len(values) // 2)
    disp_val =  merge(values)
    my_list = []
    columns = ['Report Id', 'Unit No', 'Vehicle Reg State', 'Vehicle Year', 'State License']
    for item in disp_val:
        if isinstance(item, tuple) :
            row_num = item[0]
        else:
            row_num = item
        rep_id = str(unit_data_df.loc[row_num]['REPORT_ID'])  
        unit_no = unit_data_df.loc[row_num]['Unit No']
        reg = unit_data_df.loc[row_num]['Veh Reg State']
        year = unit_data_df.loc[row_num]['Veh Year']
        lic = unit_data_df.loc[row_num]['Lic State']
        my_list.append([rep_id,unit_no, reg, year, lic])
        if len(my_list) > 20:
            break
    print_matrix(my_list)

#None of the code above will work UNTIL THIS IS EXECUTED
if __name__ == '__main__':   
    #Create the pool now to not waste time re-creating the thread pool
    pool = Pool(processes=__N_WORKERS__)
    
%time task_3_2()



+---+---+---+---+---+
| 2018-9371-15/08/2019 | 01 | SA | 1900 | UNKNOWN |
+---+---+---+---+---+
| 2018-12792-15/08/2019 | 01 | SA | 1900 | SA |
+---+---+---+---+---+
| 2018-6385-15/08/2019 | 01 | SA | 1900 | SA |
+---+---+---+---+---+
| 2018-12484-15/08/2019 | 01 | SA | 1934 | SA |
+---+---+---+---+---+
| 2018-2351-15/08/2019 | 01 | SA | 1951 | SA |
+---+---+---+---+---+
| 2018-2981-15/08/2019 | 01 | SA | 1955 | SA |
+---+---+---+---+---+
| 2018-238-15/08/2019 | 01 | SA | 1957 | SA |
+---+---+---+---+---+
| 2018-1583-15/08/2019 | 01 | SA | 1957 | SA |
+---+---+---+---+---+
| 2018-7265-15/08/2019 | 02 | VIC | 1962 | SA |
+---+---+---+---+---+
| 2018-4197-15/08/2019 | 01 | SA | 1964 | UNKNOWN |
+---+---+---+---+---+
| 2018-11816-15/08/2019 | 01 | VIC | 1965 | SA |
+---+---+---+---+---+
| 2018-12413-15/08/2019 | 02 | SA | 1965 | SA |
+---+---+---+---+---+
| 2018-6392-15/08/2019 | 01 | SA | 1965 | SA |
+---+---+---+---+---+
| 2018-2281-15/08/2019 | 02 | SA | 1966 | SA |
+---+---+---+---+--

In [23]:
pool.terminate() ## Terminate the pool to prevent leaks

##### Task 4.1

For this task I implemented a psuedo hash-scheduler in _hash_group_scheduler_ psuedo because it uses Python dictionaries and not a hash-function, and more importantly its a group based scheduler. It makes an explicit assumption that the groups are going to be the main unit of scheduling. This also means that the CPU load is also unbalanced, as the data might not follow the group. The groups might be scheduled evenly but the tasks might not be so we'd endup with skewed worker queues.

The reason for use was very simple, even thought it has a skewed distribution it also prevents overlap of tasks. Dependencies are located to one processor. Which made the implementation much easier

In [24]:
def task_4_1():
    mailbox = hash_group_scheduler(crash_data_df, 'Crash Type', has_value=True, value_key='Total Cas')
    kernel_results = [pool.apply_async(aggregate_item_by_group, (mailbox[i], crash_data_df, 'Crash Type', 'Total Cas')) for i in range(__N_WORKERS__)]
    values = [res.get(timeout=10) for res in kernel_results]
    return display_aggregate(values,'Crash Type', 'Number')
    
#None of the code above will work UNTIL THIS IS EXECUTED
if __name__ == '__main__':   
    #Create the pool now to not waste time re-creating the thread pool
    pool = Pool(processes=__N_WORKERS__)

%time task_4_1()

+---+---+
| Right Angle | 1266 |
+---+---+
| Hit Pedestrian | 297 |
+---+---+
| Other | 26 |
+---+---+
| Hit Object on Road | 8 |
+---+---+
| Rear End | 1680 |
+---+---+
| Hit Animal | 44 |
+---+---+
| Side Swipe | 387 |
+---+---+
| Right Turn | 604 |
+---+---+
| Hit Parked Vehicle | 224 |
+---+---+
| Left Road - Out of Control | 52 |
+---+---+
| Hit Fixed Object | 846 |
+---+---+
| Roll Over | 449 |
+---+---+
| Head On | 234 |
+---+---+
CPU times: user 678 ms, sys: 105 ms, total: 783 ms
Wall time: 974 ms


In [25]:
pool.terminate() ## Terminate the pool to prevent leaks

##### Task 4.2

There are two key points as to why this can be better:

1. Round Robin - Makes no dependencies assumptions, this is because the algorithm doesn't require it, where as the _hash_group_scheduler_ tries to avoid making these depency assumptions, the two phase method doesn't even bother, making its implementation much simpler, easier and possibly can be applied to more scenarios than the one in 4.1
2. Performance 12s vs 3.1 seconds 

In [26]:
def task_4_2():
    mailbox = round_robin(crash_data_df)
    kernel_results = [pool.apply_async(aggregate_item_by_group, (mailbox[i], crash_data_df, 'Suburb', 'Total Cas')) for i in range(__N_WORKERS__)]
    values = [res.get(timeout=10) for res in kernel_results]
    mailbox = range_partition_scheduler_post_group(values)
    kernel_results = [pool.apply_async(aggregate_grouped_values, ([mailbox[i]])) for i in range(__N_WORKERS__)]
    values = [res.get(timeout=10) for res in kernel_results]
    return display_aggregate(values,'Suburb', 'Number')

#None of the code above will work UNTIL THIS IS EXECUTED
if __name__ == '__main__':   
    #Create the pool now to not waste time re-creating the thread pool
    pool = Pool(processes=__N_WORKERS__)
    
%time task_4_2()

+---+---+
| CROYDON | 12 |
+---+---+
| FOREST RANGE | 3 |
+---+---+
| BOLIVAR | 9 |
+---+---+
| CURRENCY CREEK | 10 |
+---+---+
| GILLES PLAINS | 11 |
+---+---+
| ELIZABETH EAST | 25 |
+---+---+
| ALDINGA | 10 |
+---+---+
| FULLARTON | 11 |
+---+---+
| BRIGHTON | 19 |
+---+---+
| GLENUNGA | 5 |
+---+---+
| ELIZABETH DOWNS | 17 |
+---+---+
| ADELAIDE | 211 |
+---+---+
| BEDFORD PARK | 19 |
+---+---+
| CRAIGMORE | 18 |
+---+---+
| ELIZABETH | 20 |
+---+---+
| ELIZABETH PARK | 26 |
+---+---+
| BROADVIEW | 15 |
+---+---+
| FINDON | 19 |
+---+---+
| ENFIELD | 33 |
+---+---+
| GEPPS CROSS | 46 |
+---+---+
| BURRA | 3 |
+---+---+
CPU times: user 634 ms, sys: 113 ms, total: 747 ms
Wall time: 935 ms


In [27]:
pool.terminate() ## Terminate the pool to prevent leaks

##### Task 5

For this task, I wanted to use the Two-Phase method for the parallel group by, however that meant I was locked to use the broad_cast_join_scheduler I already have. It does a lot of good dependency tracking when doing the join. The Join has to come first since the Group attribute is not the join attribute. 
For the license location I place the linear-filter in the next step of the pipeline, I had done this because by this stage the attributes have been joined and it makes the filtering easier and predictable (all the ones not matching the criteria are all removed)

Then there are two aggreate functions, the first run in the kernel task to group by the suburb and total casualties, and the similarly as the Two-Phase method another aggregate is called to do the global aggregation 

I wanted to use Two-Phase due to its impressive performance, given that this task includes all the above operations already done, and others seemed significantly slower.

In [28]:
def task_5_1_kernel(worker_mailbox, df_ix, df_dep, index, index_name):
    join_list = parallel_join(worker_mailbox, df_ix, df_dep, index, index_name)
    filtered_list =  linear_filter('Licence Type','Unlicenced', join_list, df_dep, False, True)
    return aggregate_item_by_group(filtered_list.keys(), df_ix, 'Suburb', 'Total Cas')

def task_5():
    schedule = broad_cast_join_scheduler(crash_data_df, unit_data_df, 'REPORT_ID')
    kernel_results = [pool.apply_async(task_5_1_kernel, (schedule[1][i],crash_data_df, unit_data_df, schedule[0], 'REPORT_ID')) for i in range(__N_WORKERS__)]
    values = [res.get(timeout=10) for res in kernel_results]
    mailbox = range_partition_scheduler_post_group(values)
    kernel_results = [pool.apply_async(aggregate_grouped_values, ([mailbox[i]])) for i in range(__N_WORKERS__)]
    values = [res.get(timeout=10) for res in kernel_results]
    return display_aggregate(values,'Suburb', 'Number')

#None of the code above will work UNTIL THIS IS EXECUTED
if __name__ == '__main__':   
    #Create the pool now to not waste time re-creating the thread pool
    pool = Pool(processes=__N_WORKERS__)
    
%time task_5()

+---+---+
| GILLES PLAINS | 4 |
+---+---+
| ANDREWS FARM | 3 |
+---+---+
| BLAIR ATHOL | 1 |
+---+---+
| ETHELTON | 0 |
+---+---+
| ELIZABETH DOWNS | 4 |
+---+---+
| ELIZABETH SOUTH | 0 |
+---+---+
| FULHAM GARDENS | 0 |
+---+---+
| GAWLER | 0 |
+---+---+
| ELIZABETH EAST | 3 |
+---+---+
| DAVOREN PARK | 2 |
+---+---+
| BLAKEVIEW | 2 |
+---+---+
| GLENELG | 0 |
+---+---+
| ADELAIDE | 5 |
+---+---+
| ASCOT PARK | 1 |
+---+---+
| EVANSTON PARK | 1 |
+---+---+
| CAMPBELLTOWN | 0 |
+---+---+
| ELIZABETH PARK | 5 |
+---+---+
| FLINDERS PARK | 2 |
+---+---+
| GEPPS CROSS | 0 |
+---+---+
| GAWLER WEST | 3 |
+---+---+
| CRAIGMORE | 0 |
+---+---+
CPU times: user 1.44 s, sys: 221 ms, total: 1.66 s
Wall time: 2 s


In [29]:
pool.terminate() ## Terminate the pool to prevent leaks