# **Assignment 1:**

Monash ID:    **30155843**

Monash username: **mmah0021**



## <center>**FIT 5148 - Distributed Databases and Big Data**</center>

### **Background:** 

Everyone who travels on the Adelaide Metro public transport network has the right to an enjoyable journey, and it is essential to travel with consideration for others. South Australia's Department of Planning, Transport and Infrastructure (DPTI) collects information on road crashes for further assessment, in an effort to improve road safety.

Our goal is to use the Python multiprocessing framework so that this information can be queried quickly and efficiently, using different partitioning methods and parallel algorithms.

We use the parallel methods learned in this unit (parallel search, join, sort and group-by) to perform various activities on the dataset.



## Preparation

Import the Python libraries needed for the parallel methods.
<br>
<br>

In [3]:
import pandas as pd
import numpy as np
import multiprocessing as mp
from multiprocessing import Pool
from datetime import datetime

#### Importing the .csv files into Jupyter notebook to prepare data for further analysis

In [4]:
crashDf = pd.read_csv('2018_DATA_SA_Crash.csv')
unitsDf = pd.read_csv('2018_DATA_SA_Units.csv')
crashReader = crashDf.values.tolist()
unitsReader = unitsDf.values.tolist()

## Task 1: Parallel Search

### 1.1 

In this task I have chosen linear search with round-robin partitioning to find all crash events in the suburb of Adelaide in the month of January, displaying the date of the crash, suburb name, postcode and number of casualties.

**Round robin partition**

Linear search key = ['ADELAIDE','January'] (suburb, month).
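As a small illustration of the idea, the sketch below deals a handful of records to three partitions in turn and then scans every partition for the key. The records and the helper names `round_robin` and `scan` are hypothetical and only stand in for the crash data and the notebook's own functions.

```python
# Toy records as (suburb, month) pairs; hypothetical data, not from the crash file.
records = [
    ("ADELAIDE", "January"),
    ("CROYDON", "January"),
    ("ADELAIDE", "March"),
    ("ADELAIDE", "January"),
    ("KENSINGTON", "July"),
    ("ADELAIDE", "January"),
    ("CROYDON", "May"),
]


def round_robin(data, n):
    """Deal records to n partitions in turn: record i goes to partition i % n."""
    partitions = [[] for _ in range(n)]
    for i, record in enumerate(data):
        partitions[i % n].append(record)
    return partitions


def scan(partition, key):
    """Linear search: compare every record in the partition with the key."""
    return [record for record in partition if record == key]


partitions = round_robin(records, 3)
sizes = [len(p) for p in partitions]          # sizes differ by at most one
matches = [scan(p, ("ADELAIDE", "January")) for p in partitions]
total_matches = sum(len(m) for m in matches)  # matches can sit in any partition
```

Because round-robin placement ignores the record contents, a match can be in any partition, which is why every partition has to be searched.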

In [6]:
# Round-robin data partitioning function
def rr_partition(data, n):
    """
    Perform data partitioning on data

    Arguments:
    data -- an input dataset which is a list
    n -- the number of processors

    Return:
    result -- the partitioned subsets of data
    """
    result = []
    for i in range(n):
        result.append([])
    
    # For each element, perform the following
    for index, element in enumerate(data): 
        # Calculate the index of the bin that the current data point will be assigned
        index_bin = index % n
        result[index_bin].append(element)
    
    return result

In [7]:
# Linear search function
def linear_search(data, key):
    """
    Perform linear search on data for the given key

    Arguments:
    data -- an input dataset which is a list or a numpy array
    key -- the query as [suburb, month]

    Return:
    resultList -- one dict per matching record (date, suburb name, postcode, number of casualties)
    """
    
    resultList = []
    
    for x in data:
        # check for key matching
        if x[2] == key[0] and x[11] == key[1]:
            result = {}
            result['Date'] = (str(x[10])+'-'+x[11]+'-'+x[12])
            result['Suburb Name'] = (x[2])
            result['Postcode'] = (x[3])
            result['Number of casualties'] = (x[6])
            resultList.append(result)
    
    return resultList

In [8]:
# Parallel searching algorithm for exact match
def parallel_search(data, query, n_processor):
    """
    Perform parallel search for exact match on data for the given key

    Arguments:
    data -- an input dataset which is a list
    query -- a query record
    n_processor -- the number of parallel processors

    Return:
    results -- the matched record information, one list per partition
    """

    # Pool: a Python class enabling parallel processing.
    # Setting processes to n_processor means the pool runs
    # at most 'n_processor' worker processes at the same time.
    pool = mp.Pool(processes=n_processor)

    # Perform data partitioning first
    DD = rr_partition(data, n_processor)

    # 'pool.apply()' blocks until the function call has finished.
    # 'pool.apply_async()' returns immediately with a result handle, and each
    # call can be allocated to a different process. So pool.apply_async()
    # lets the partitions be searched in parallel, while pool.apply() does not.
    # With range- or hash-partitioned data, pool.apply() is sufficient because
    # we already know which single partition has to be searched.
    pending = [pool.apply_async(linear_search, [d, query]) for d in DD]

    # Collect the results only after every search has been submitted;
    # calling get() inside the submission loop would serialise the searches.
    results = [p.get() for p in pending]

    pool.close()
    pool.join()

    return results


parallel_search(crashReader, ['ADELAIDE','January'], 3)

[[{'Date': '2018-January-Monday',
   'Suburb Name': 'ADELAIDE',
   'Postcode': 5000,
   'Number of casualties': 0},
  {'Date': '2018-January-Tuesday',
   'Suburb Name': 'ADELAIDE',
   'Postcode': 5000,
   'Number of casualties': 0},
  {'Date': '2018-January-Wednesday',
   'Suburb Name': 'ADELAIDE',
   'Postcode': 5000,
   'Number of casualties': 0},
  {'Date': '2018-January-Tuesday',
   'Suburb Name': 'ADELAIDE',
   'Postcode': 5000,
   'Number of casualties': 0},
  {'Date': '2018-January-Wednesday',
   'Suburb Name': 'ADELAIDE',
   'Postcode': 5000,
   'Number of casualties': 0},
  {'Date': '2018-January-Wednesday',
   'Suburb Name': 'ADELAIDE',
   'Postcode': 5000,
   'Number of casualties': 1},
  {'Date': '2018-January-Wednesday',
   'Suburb Name': 'ADELAIDE',
   'Postcode': 5000,
   'Number of casualties': 1},
  {'Date': '2018-January-Saturday',
   'Suburb Name': 'ADELAIDE',
   'Postcode': 5000,
   'Number of casualties': 0},
  {'Date': '2018-January-Sunday',
   'Suburb Name': 'ADE

### 1.2 

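The data is range partitioned on Total Cas with boundaries [2, 5]: range 1 holds values below 2, range 2 holds values from 2 to 4, and range 3 holds values of 5 and above.
A binary search then looks for the value 7. Only range 3 can contain it, so that partition (already sorted on Total Cas) is searched for the lower bound and the upper bound of the records equal to 7, and that segment is returned.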
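The lower-bound and upper-bound idea can also be sketched with the standard-library `bisect` module rather than hand-written loops. This is an alternative for illustration, not the notebook's own implementation; the records and the helper name `equal_range` are hypothetical.

```python
from bisect import bisect_left, bisect_right

# Hypothetical (report, total_cas) records, already sorted by total_cas.
partition = [("r1", 5), ("r2", 6), ("r3", 7), ("r4", 7), ("r5", 9)]
keys = [cas for _, cas in partition]


def equal_range(records, sorted_keys, key):
    """Return every record whose key equals `key`, using two binary searches."""
    lo = bisect_left(sorted_keys, key)   # index of the first key >= key
    hi = bisect_right(sorted_keys, key)  # index just past the last key <= key
    return records[lo:hi]


hits = equal_range(partition, keys, 7)    # the two records with total_cas == 7
misses = equal_range(partition, keys, 8)  # empty: lo and hi coincide
```

When the key is absent both searches return the same index, so the slice is empty and no special case is needed.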

In [9]:
# Range data partitioning function
def range_partition_q1(data, range_indices):
    """
    Perform range data partitioning on data based on 'Total Cas' (index 6)

    Arguments:
    data -- an input dataset which is a list
    range_indices -- the sorted list of range boundaries to split on

    Return:
    result -- the partitioned subsets of data
    """
    
    result = []
    
    # First, we sort the dataset according to 'Total Cas'
    new_data = list(data)
    new_data.sort(key = lambda x: x[6])
    
    # For each boundary, perform the following
    for boundary in range_indices: 
        # Find the elements belonging to this range - according to 'Total Cas'
        s = [x for x in new_data if x[6] < boundary] 
        # Add the partitioned list to the result (it may be empty)
        result.append(s) 
        # The data is sorted, so s is a prefix of new_data: drop it
        new_data = new_data[len(s):] 

    # Append the last remaining data list
    result.append(new_data) 
    
    return result

In [10]:
def fetch_lower_bound(data, low, high, key):
    low_index = -1
    middle = 0
    while low <= high:
        middle = int(low + (high - low + 1) / 2)
        midVal = data[middle]
  
        if midVal[6] < key:
            # if mid is less than key, all elements in range [low, mid] are also less 
            # so we now search in [mid + 1, high] 
            low = middle + 1
        elif midVal[6] > key:
            # if mid is greater than key, all elements in range [mid + 1, high] are also greater 
            # so we now search in [low, mid - 1] 
            high = middle - 1
        elif midVal[6] == key:
            # if mid is equal to key, we note down the last found index then we search for more in left side of mid 
            # so we now search in [low, mid - 1] 
            low_index = middle
            high = middle - 1
    return low_index 


def fetch_upper_bound(data, low, high, key):
    upper_index = -1
    middle = 0
    while low <= high:
        middle = int(low + (high - low + 1) / 2)
        midVal = data[middle]
  
        if midVal[6] < key:
            # if mid is less than key, then all elements in range [low, mid - 1] are also less 
            # so we now search in [mid + 1, high] 
            low = middle + 1
        elif midVal[6] > key:
            # if mid is greater than key, then all elements in range [mid + 1, high] are also greater so we now search in  
            # [low, mid - 1] 
            high = middle - 1
        elif midVal[6] == key:
            # if mid is equal to key, we note down the last found index then we search for more in right side
            # of mid so we now search in [mid + 1, high] 
            upper_index = middle 
            low = middle + 1
    return upper_index


# Binary search function
def binary_search(data, key):
    """
    Perform binary search on sorted data for the given key

    Arguments:
    data -- an input dataset which is a list sorted on 'Total Cas' (index 6)
    key -- the 'Total Cas' value to search for

    Return:
    resultList -- all records whose 'Total Cas' equals key
    """
    lower = 0
    upper = len(data)-1
    
    # fetches lower index of the range
    lower_ind = fetch_lower_bound(data, lower, upper, key)
    # fetches upper index of the range
    upper_ind = fetch_upper_bound(data, lower, upper, key)
    
    # return the list elements from lower index till upper index
    resultList = data[lower_ind: upper_ind+1]
    
    return resultList

In [11]:
# Parallel searching algorithm for exact match on range-partitioned data
def parallel_search(data, query, n_processor):
    """
    Perform a search for an exact 'Total Cas' match on range-partitioned data

    Arguments:
    data -- the input dataset which is a list
    query -- the 'Total Cas' value to search for (e.g. 7)
    n_processor -- the number of parallel processors
    
    Return:
    results -- the matched record information
    """
    
    results = []

    pool = Pool(processes=n_processor)

    range_indices = [2, 5]   # ideally pass this into the function as a variable
    DD = range_partition_q1(data, range_indices)
    for index,element in enumerate(range_indices):
        if query < element:
            m = DD[index]
            break
        else:
            m = DD[-1]
    result = pool.apply(binary_search, [m, query])
    results.append(result)
    
    return results

parallel_search(crashReader, 7, 10)

[[['2018-222-15/08/2019',
   '3 Country',
   'MOUNT SCHANK',
   5291,
   'DISTRICT COUNCIL OF GRANT',
   2,
   7,
   0,
   4,
   3,
   2018,
   'January',
   'Tuesday',
   '05:30 pm',
   110,
   'Cross Road',
   'CURVED, VIEW OPEN',
   'Level',
   'Not Applicable',
   'Sealed',
   'Dry',
   'Not Raining',
   'Daylight',
   'Right Angle',
   2,
   'Driver Rider',
   '3: SI',
   'Give Way Sign',
   nan,
   nan,
   1504113.89,
   1329441.75,
   15041141329442.0],
  ['2018-6762-15/08/2019',
   '2 Metropolitan',
   'CRAIGMORE',
   5114,
   'CITY OF PLAYFORD.',
   2,
   7,
   0,
   0,
   7,
   2018,
   'July',
   'Tuesday',
   '03:30 pm',
   50,
   'Cross Road',
   'Straight road',
   'Slope',
   'Not Applicable',
   'Sealed',
   'Dry',
   'Not Raining',
   'Daylight',
   'Right Angle',
   1,
   'Driver Rider',
   '2: MI',
   'Roundabout',
   nan,
   nan,
   1339137.31,
   1695802.0,
   13391371695802.0]]]

In [12]:
crashDf[crashDf['Total Cas'] == 7]

| | REPORT_ID | Stats Area | Suburb | Postcode | LGA Name | Total Units | Total Cas | Total Fats | Total SI | Total MI | ... | Crash Type | Unit Resp | Entity Code | CSEF Severity | Traffic Ctrls | DUI Involved | Drugs Involved | ACCLOC_X | ACCLOC_Y | UNIQUE_LOC |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 221 | 2018-222-15/08/2019 | 3 Country | MOUNT SCHANK | 5291 | DISTRICT COUNCIL OF GRANT | 2 | 7 | 0 | 4 | 3 | ... | Right Angle | 2 | Driver Rider | 3: SI | Give Way Sign | | | 1504113.89 | 1329441.75 | 15041140000000.0 |
| 6761 | 2018-6762-15/08/2019 | 2 Metropolitan | CRAIGMORE | 5114 | CITY OF PLAYFORD. | 2 | 7 | 0 | 0 | 7 | ... | Right Angle | 1 | Driver Rider | 2: MI | Roundabout | | | 1339137.31 | 1695802.0 | 13391370000000.0 |


### 1.3


In this task I have chosen hash partitioning, linear search and a parallel search algorithm.

With hash partitioning, rows are distributed across partitions according to a hash of the partitioning attribute, which spreads them in an apparently random and often fairly even way. The advantage is that a query can be broken down into parallel queries on the partitions, each handling a smaller amount of data; however, the results then have to be combined centrally.

A linear search sequentially checks each item in the list and collects every item that matches the target value. By contrast, range partitioning maps data to partitions based on boundaries defined by ranges of column values for each partition; applications that manage historical data often find that method useful.

A linear search for 'ADELAIDE' is run in each hash partition whose key (Total Cas) is greater than 3.
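The sketch below illustrates one point that is easy to miss with hash partitioning: several key values can share a bucket, so a search inside a bucket still has to check the key itself. The records and the helper names `hash_partition` and `search_range` are hypothetical, and a modulo hash is assumed.

```python
from collections import defaultdict

# Hypothetical (suburb, total_cas) records.
records = [("ADELAIDE", 1), ("ADELAIDE", 4), ("CROYDON", 4),
           ("ADELAIDE", 7), ("KENSINGTON", 5), ("ADELAIDE", 10)]


def hash_partition(data, n):
    """Place each record in bucket total_cas % n."""
    buckets = defaultdict(list)
    for record in data:
        buckets[record[1] % n].append(record)
    return buckets


def search_range(data, low, high, suburb, n):
    """Find records for `suburb` with low <= total_cas < high."""
    buckets = hash_partition(data, n)
    hits = []
    for cas in range(low, high):
        # Only the bucket that cas hashes to can hold matching records,
        # but that bucket may also hold other values that collide with it.
        for record in buckets.get(cas % n, []):
            if record[0] == suburb and record[1] == cas:
                hits.append(record)
    return hits


hits = search_range(records, 4, 8, "ADELAIDE", 3)
```

With three buckets, the values 1, 4, 7 and 10 all land in bucket 1, so matching on the suburb alone would also return the records with 1 and 10 casualties.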

In [14]:
# Define a simple hash function.
def s_hash(x, n):
    """
    Define a simple hash function for demonstration

    Arguments:
    x -- an input record
    n -- the number of processors

    Return:
    result -- the hash value of x
    """
    result = x%n 

    return result

# Hash data partitioning function. 
# We will use the "s_hash" function defined above to realise this partitioning
def h_partition(data, n):
    """
    Perform hash data partitioning on data

    Arguments:
    data -- an input dataset which is a list
    n -- the number of processors

    Return:
    dic -- a dictionary mapping each hash value to its list of records
    """
    
    dic = {} # We will use a dictionary
    for x in data: # For each data record, perform the following
        h = s_hash(x[6], n) # Get the hash key of the input ('Total Cas')
        # Create an empty value set if the key does not exist yet,
        # then add the record to the value set of the key
        dic.setdefault(h, []).append(x)
    
    return dic

In [15]:
# Linear search function
def linear_search_suburb(data, key):
    """
    Perform linear search on data for the given suburb

    Arguments:
    data -- an input dataset which is a list or a numpy array
    key -- the suburb name to match

    Return:
    resultList -- all records whose suburb equals key
    """
    
    resultList = []
    
    for x in data:
        # Match suburb
        if x[2] == key:
            resultList.append(x)
    
    return resultList

In [16]:
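# Parallel searching algorithm for range selection
def parallel_search_range(data, query_range, n_processor):
    """
    Perform parallel search for range selection on data for the given key

    Arguments:
    data -- the input dataset which is a list
    query_range -- the 'Total Cas' range [low, high); high is excluded (e.g. [4, 8])
    n_processor -- the number of parallel processors
    
    Return:
    results -- the matched record information, one list per queried value
    """
    
    pool = Pool(processes=n_processor)

    # Perform data partitioning first
    DD = h_partition(data, n_processor) 
    
    pending = []
    for query in range(query_range[0], query_range[1], 1):
        # Each element in DD has a pair (hash key: records);
        # a hash key with no records has no entry, so default to an empty list
        query_hash = s_hash(query, n_processor)
        d = list(DD.get(query_hash, []))
        # Submit without waiting so the partitions are searched in parallel
        pending.append((query, pool.apply_async(linear_search_suburb, [d, 'ADELAIDE'])))

    # A partition can hold several 'Total Cas' values that share a hash value,
    # so keep only the records whose 'Total Cas' equals the queried value
    results = [[x for x in p.get() if x[6] == query] for query, p in pending]

    pool.close()
    pool.join()

    return results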

parallel_search_range(crashReader, [4, 8], 8)

[[['2018-601-15/08/2019',
   '1 City',
   'ADELAIDE',
   5000,
   'CITY OF ADELAIDE',
   8,
   4,
   0,
   2,
   2,
   2018,
   'January',
   'Sunday',
   '09:12 pm',
   50,
   'Not Divided',
   'Straight road',
   'Level',
   'Not Applicable',
   'Sealed',
   'Dry',
   'Not Raining',
   'Night',
   'Hit Pedestrian',
   1,
   'Driver Rider',
   '3: SI',
   'No Control',
   nan,
   nan,
   1329806.36,
   1670224.76,
   13298061670225.0]],
 [],
 [],
 []]

In [17]:
crashDf[((crashDf['Suburb'] == 'ADELAIDE') & (crashDf['Total Cas'] > 3))]

| | REPORT_ID | Stats Area | Suburb | Postcode | LGA Name | Total Units | Total Cas | Total Fats | Total SI | Total MI | ... | Crash Type | Unit Resp | Entity Code | CSEF Severity | Traffic Ctrls | DUI Involved | Drugs Involved | ACCLOC_X | ACCLOC_Y | UNIQUE_LOC |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 600 | 2018-601-15/08/2019 | 1 City | ADELAIDE | 5000 | CITY OF ADELAIDE | 8 | 4 | 0 | 2 | 2 | ... | Hit Pedestrian | 1 | Driver Rider | 3: SI | No Control | | | 1329806.36 | 1670224.76 | 13298060000000.0 |


## Task 2 : Parallel Join

### 2.1

This task joins the two tables on the REPORT_ID attribute. Each processor runs a nested-loop join, and the work is distributed with a divide and broadcast-based parallel join algorithm: one table is divided into sub-tables that are distributed equally across the processors, the other table is broadcast to every processor, and a join is then applied on each processor. We assume a shared-memory architecture, so no replication of the broadcast table occurs.
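A minimal sketch of divide and broadcast on toy data follows. It uses `multiprocessing.pool.ThreadPool`, which offers the same `apply_async` interface as `Pool`, only so that the example runs without pickling concerns; that choice, the tables and the helper names are assumptions made for illustration.

```python
from multiprocessing.pool import ThreadPool

# Hypothetical tables keyed on a report id in position 0.
units = [("r1", "Male"), ("r2", "Female"), ("r1", "Female"),
         ("r3", "Male"), ("r9", "Male")]
crashes = [("r1", "ADELAIDE"), ("r2", "CROYDON"), ("r3", "KENSINGTON")]


def nested_loop_join(left, right):
    """Pair every left record with every right record that shares its id."""
    return [(u[0], u[1], c[1]) for u in left for c in right if u[0] == c[0]]


def divide_and_broadcast_join(left, right, n_workers):
    # Divide: deal the left table round-robin into n_workers pieces.
    pieces = [left[i::n_workers] for i in range(n_workers)]
    with ThreadPool(n_workers) as pool:
        # Broadcast: every worker receives the whole right table.
        pending = [pool.apply_async(nested_loop_join, (piece, right))
                   for piece in pieces]
        # Collect only after all tasks are submitted, so they can overlap.
        partial = [p.get() for p in pending]
    # Concatenate the per-worker results into one joined table.
    return [row for rows in partial for row in rows]


joined = divide_and_broadcast_join(units, crashes, 2)
```

Since only one table is divided, each left record meets the full right table exactly once, so the union of the partial results equals the serial join regardless of how the left table is split.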


In [18]:
def NL_join(T1, T2, option):
    """
    Perform the nested-loop join algorithm.
    The join attribute is REPORT_ID (index 0) in the input tables T1 & T2

    Arguments:
    T1 & T2 -- Tables to be joined
    option -- selects the join condition and output fields (1: Task 2.1, 2: Task 2.2, 3: Task 5)

    Return:
    results -- the joined records
    """
    results = []
    
    # For each record of T1
    for tr1 in T1:
        # For each record of T2
        for tr2 in T2:
            # option1 : Task2.1 
            if option == 1:
                # joins on REPORT_ID
                if (tr1[0] == tr2[0]):
                    # Store the joined records into the result list
                    result = {}
                    result['Date'] = (str(tr2[10])+'-'+tr2[11]+'-'+tr2[12])
                    result['Time'] = (tr2[13])
                    result['Suburb Name'] = (tr2[2])
                    result['Gender'] = (tr1[7])
                    result['Age'] = (tr1[8])
                    result['Number of Casualties'] = (tr1[2])
                    result['License Type'] = (tr1[11])
                    results.append(result)
            # option2 : Task2.2 
            elif option == 2:
                # joins on REPORT_ID and checks for suburb adelaide
                if (tr1[0] == tr2[0] and tr2[2] == "ADELAIDE"):
                    # Store the joined records into the result list
                    result = {}
                    result['Date'] = (str(tr2[10])+'-'+tr2[11]+'-'+tr2[12])
                    result['Time'] = (tr2[13])
                    result['Gender'] = (tr1[7])
                    result['Age'] = (tr1[8])
                    result['Number of Casualties'] = (tr1[2])
                    result['License Type'] = (tr1[11])
                    results.append(result)
            # option3 : Task5
            elif option == 3:
                # joins on REPORT_ID and checks for license type as unlicensed
                if (tr1[0] == tr2[0] and tr2[11] == "Unlicenced"):
                    # Store the joined records into the result list
                    result = []
                    result.append(tr1[2])
                    result.append(tr1[6])
                    results.append(result)

    return results

In [19]:
# Divide and broadcast-based parallel join
def DDP_join(T1, T2, n_processor):
    """
    Perform a divide and broadcast-based parallel join algorithm.
    The join attribute is REPORT_ID (index 0) in the input tables T1 & T2

    Arguments:
    T1 -- the table to be divided among the processors
    T2 -- the table to be broadcast to every processor
    n_processor -- the number of parallel processors

    Return:
    result -- the first 20 joined records from the first partition
    """
    
    results = []
    
    # Partition T1 into sub-tables using rr_partition().
    # The number of the sub-tables must be the equal to the n_processor
    T1_subsets = rr_partition(T1, n_processor)
    # Pool: a Python method enabling parallel processing. 
    pool = mp.Pool(processes = n_processor)
    
    midResults = []
    
    for t1 in T1_subsets:
        # Apply a join on each processor
        
        # Note that as we assume a shared-memory architecture, no replication
        # of the broadcast table (in this case, table T2) occurs.
        output = pool.apply_async(NL_join, [t1, T2, 1])
        
        midResults.append(output)

    for result in midResults:
        results.append(result.get())

    return results[0][0:20]

DDP_join(unitsReader, crashReader, 10)

[{'Date': '2018-October-Wednesday',
  'Time': '11:20 am',
  'Suburb Name': 'MITCHELL PARK',
  'Gender': 'Male',
  'Age': '018',
  'Number of Casualties': 0,
  'License Type': 'Provisional 1 '},
 {'Date': '2018-January-Monday',
  'Time': '04:00 am',
  'Suburb Name': 'CROYDON',
  'Gender': 'Female',
  'Age': '030',
  'Number of Casualties': 1,
  'License Type': 'Full'},
 {'Date': '2018-January-Monday',
  'Time': '10:00 am',
  'Suburb Name': 'KENSINGTON',
  'Gender': 'Female',
  'Age': '058',
  'Number of Casualties': 0,
  'License Type': 'Provisional 2'},
 {'Date': '2018-January-Monday',
  'Time': '02:30 pm',
  'Suburb Name': 'ONE TREE HILL',
  'Gender': 'Male',
  'Age': '040',
  'Number of Casualties': 0,
  'License Type': 'Full'},
 {'Date': '2018-January-Monday',
  'Time': '04:10 pm',
  'Suburb Name': 'ADELAIDE',
  'Gender': nan,
  'Age': nan,
  'Number of Casualties': 0,
  'License Type': nan},
 {'Date': '2018-January-Monday',
  'Time': '05:20 pm',
  'Suburb Name': 'WESTBOURNE PARK',


### 2.2

This task uses range partitioning on the join attribute. The partitioning key is the first digit of the crash number in REPORT_ID, which is the character at index 5 (for example, the '2' in '2018-222-15/08/2019').


The ranges are [0, 3), [3, 6) and 6 and above.

A disjoint partitioning-based parallel join algorithm is then performed: both tables are partitioned into sub-tables using range_partition_q2(), and each pair of matching sub-tables is joined on its own processor.
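The sketch below shows on toy data why a disjoint partitioning join only needs to join sub-tables with the same index. The tables, the integer join key and the helper names are hypothetical.

```python
# Hypothetical tables; the join key is an integer in position 0.
units = [(1, "Male"), (4, "Female"), (7, "Male"), (4, "Male"), (8, "Female")]
crashes = [(1, "ADELAIDE"), (4, "CROYDON"), (7, "KENSINGTON"), (5, "GLENELG")]


def range_partition(table, bounds):
    """Put each record in the first bin whose bound exceeds its key, else the last bin."""
    bins = [[] for _ in range(len(bounds) + 1)]
    for record in table:
        for i, bound in enumerate(bounds):
            if record[0] < bound:
                bins[i].append(record)
                break
        else:
            bins[-1].append(record)
    return bins


def nested_loop_join(left, right):
    """Pair every left record with every right record that shares its key."""
    return [(u[0], u[1], c[1]) for u in left for c in right if u[0] == c[0]]


bounds = [3, 6]
unit_bins = range_partition(units, bounds)
crash_bins = range_partition(crashes, bounds)

# Bin i of one table is joined only with bin i of the other.
joined = [row
          for u_bin, c_bin in zip(unit_bins, crash_bins)
          for row in nested_loop_join(u_bin, c_bin)]
```

Both tables are split with the same boundaries on the same key, so two records that can join always fall into bins with the same index and no cross-bin comparison is required.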


In [20]:
items = crashDf['REPORT_ID'].tolist()
items = [item[5] for item in items]
set(items)

{'1', '2', '3', '4', '5', '6', '7', '8', '9'}

In [21]:
# Range data partitioning function for the join attribute
def range_partition_q2(data, range_indices):
    """
    Perform range data partitioning on data based on the join attribute

    Arguments:
    data -- an input dataset which is a list
    range_indices -- the sorted list of range boundaries to split on

    Return:
    result -- the partitioned subsets of data
    """
    
    # The partitioning key is the digit at index 5 of REPORT_ID (x[0]),
    # converted to int so that it can be compared with the boundaries
    def key(x):
        return int(x[0][5])
    
    # One bin per boundary, plus a final bin for the remaining data
    result = [[] for _ in range(len(range_indices) + 1)]
    
    # Visit the records in key order and place each one in the first bin
    # whose boundary is greater than its key
    for x in sorted(data, key = key):
        for i, boundary in enumerate(range_indices):
            if key(x) < boundary:
                result[i].append(x)
                break
        else:
            # No boundary is greater than the key: use the last bin
            result[-1].append(x)
    
    return result

In [22]:
# Include this package for parallel processing
import multiprocessing as mp

def DPBP_join(T1, T2, n_processor):
    """
    Perform a disjoint partitioning-based parallel join algorithm.
    The join attribute is the numeric attribute in the input tables T1 & T2

    Arguments:
    T1 & T2 -- Tables to be joined
    n_processor -- the number of parallel processors

    Return:
    result -- the joined table
    """
    
    results = []
    
    # Partition T1 & T2 into sub-tables using range_partition_q2().
    # The same boundaries are used for both tables so that matching
    # REPORT_IDs fall into sub-tables with the same index.
    # Range is set from 0-3, 3-6, and 6 and beyond.
    T1_subsets = range_partition_q2(T1, [3, 6])
    T2_subsets = range_partition_q2(T2, [3, 6])
    
    # Pool: a Python class enabling parallel processing. 
    pool = mp.Pool(processes = n_processor)
    
    midResults = []
    for i in range(len(T1_subsets)):
        # Apply a join on each processor
        output = pool.apply_async(NL_join, [T1_subsets[i], T2_subsets[i], 2])
        midResults.append(output)
        
    for result in midResults:
        results.append(result.get())
    
    return results[0][0:20]

DDP_join(unitsReader, crashReader, 3)

[{'Date': '2018-October-Wednesday',
  'Time': '11:20 am',
  'Suburb Name': 'MITCHELL PARK',
  'Gender': 'Male',
  'Age': '018',
  'Number of Casualties': 0,
  'License Type': 'Provisional 1 '},
 {'Date': '2018-October-Wednesday',
  'Time': '11:20 am',
  'Suburb Name': 'MITCHELL PARK',
  'Gender': nan,
  'Age': nan,
  'Number of Casualties': 0,
  'License Type': nan},
 {'Date': '2018-January-Monday',
  'Time': '02:14 am',
  'Suburb Name': 'GOLDEN GROVE',
  'Gender': 'Male',
  'Age': '022',
  'Number of Casualties': 0,
  'License Type': 'Full'},
 {'Date': '2018-January-Monday',
  'Time': '02:26 am',
  'Suburb Name': 'ELIZABETH SOUTH',
  'Gender': 'Male',
  'Age': '034',
  'Number of Casualties': 2,
  'License Type': 'Full'},
 {'Date': '2018-January-Monday',
  'Time': '04:00 am',
  'Suburb Name': 'ELIZABETH VALE',
  'Gender': 'Unknown',
  'Age': 'XXX',
  'Number of Casualties': 0,
  'License Type': 'Unknown'},
 {'Date': '2018-January-Monday',
  'Time': '04:00 am',
  'Suburb Name': 'ELIZAB

## Task 3 : Parallel Sort

###  3.1


This task first uses quicksort, a divide-and-conquer algorithm: it selects an element as the pivot and partitions the given list around the pivot. The second part is a k-way merging algorithm, which takes several sorted lists and merges them into a single sorted list. The serial external sort first splits the file into runs small enough to fit into main memory and sorts each run in memory with quicksort; the resulting runs are then merged into successively larger runs until the file is sorted. Finally, the parallel merge-all sorting method applies this serial sort to each partition in parallel and merges the sorted partitions into the final result.
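The two phases can be sketched on toy data with the standard library, using `sorted` for the runs and `heapq.merge` for the k-way merge. This stands in for the hand-written quicksort and merge used in the notebook; the records and the run size are hypothetical.

```python
import heapq

# Hypothetical (report, total_cas) records.
records = [("a", 3), ("b", 0), ("c", 2), ("d", 1), ("e", 0), ("f", 5), ("g", 1)]
run_size = 3  # assumed number of records that fit in memory at once

# Sort phase: cut the input into runs and sort each run on total_cas.
runs = [sorted(records[i:i + run_size], key=lambda r: r[1])
        for i in range(0, len(records), run_size)]

# Merge phase: repeatedly take the smallest head among all runs.
merged = list(heapq.merge(*runs, key=lambda r: r[1]))
merged_keys = [r[1] for r in merged]
```

The merge only ever looks at the current head of each run, which is why it works even when the runs together are too large to hold in memory.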


In [23]:
def qsort(arr, index): 

    """ 
    Quicksort a list of records
    
    Arguments:
    arr -- the input list to be sorted
    index -- the position of the sort key within each record

    Return:
    result -- the sorted arr
    """
    if len(arr) <= 1:
        return arr
    else:
        #take the first element as the pivot
        pivot = arr[0]
        left_arr = [x for x in arr[1:] if x[index] < pivot[index]]
        right_arr = [x for x in arr[1:] if x[index] >= pivot[index]]
        value = qsort(left_arr, index) + [pivot] + qsort(right_arr, index)
        return value

In [24]:
# Let's first look at 'k-way merging algorithm' that will be used 
# to merge sub-record sets in our external sorting algorithm.
import sys

# Find the smallest record
def find_min_3_1(records):    
    """ 
    Find the smallest record
    
    Arguments:
    records -- the input record set

    Return:
    result -- the smallest record's index
    """
    m = records[0]
    index = 0
    for i in range(len(records)):
        if(records[i][6] < m[6]):  # Checking total cas
            index = i
            m = records[i]
    return index

def k_way_merge(record_sets):
    """ 
    K-way merging algorithm
    
    Arguments:
    record_sets -- the set of multiple sorted sub-record sets

    Return:
    result -- the sorted and merged record set
    """
    
    # indexes will keep the indexes of sorted records in the given buffers
    indexes = []
    for x in record_sets:
        indexes.append(0) # initialisation with 0

    # final result will be stored in this variable
    result = []  
    #print(record_sets)
    while(True):
        merged_result = [] # the merging unit (i.e. # of the given buffers)
        
        # This loop gets the current position of every buffer
        for i in range(len(record_sets)):
            if(indexes[i] >= len(record_sets[i])):
                merged_result.append([sys.maxsize]*33)  # Making a list of max elements to check for end of list
            else:
                merged_result.append(record_sets[i][indexes[i]])  
        
        # find the smallest record 
        smallest = find_min_3_1(merged_result)
    
        # if we only have sys.maxsize on the tuple, we reached the end of every record set
        if(merged_result[smallest] == [sys.maxsize]*33): # Making a list of max elements to check for end of list
            break

        # This record is the next on the merged list
        result.append(record_sets[smallest][indexes[smallest]])
        indexes[smallest] +=1
   
    return result

In [25]:
def serial_sorting(dataset, buffer_size):
    """
    Perform a serial external sorting method based on sort-merge
    The buffer size determines the size of each sub-record set

    Arguments:
    dataset -- the entire record set to be sorted
    buffer_size -- the buffer size determining the size of each sub-record set

    Return:
    result -- the sorted record set
    """
    
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return
    
    result = []
    
    # --- Sort Phase ---
    sorted_set = []
    
    # Read buffer_size pages at a time into memory and
    # sort them, and write out a sub-record set (i.e. variable: subset)
    start_pos = 0
    N = len(dataset)
    while True:
        if ((N - start_pos) > buffer_size):
            # read B-records from the input, where B = buffer_size
            subset = dataset[start_pos:start_pos + buffer_size] 
            # sort the subset (using quicksort defined above)
            sorted_subset = qsort(subset, 6)  # Added index for quick sorting on
            sorted_set.append(sorted_subset)
            start_pos += buffer_size
        else:
            # read the last B-records from the input, where B is less than buffer_size
            subset = dataset[start_pos:] 
            # sort the subset (using quicksort defined above)
            sorted_subset = qsort(subset, 6)  # Added index for quick sorting on
            sorted_set.append(sorted_subset)
            break
    
    # --- Merge Phase ---
    merge_buffer_size = buffer_size - 1
    dataset = sorted_set
    while True:
        merged_set = []

        N = len(dataset)
        start_pos = 0
        while True:
            if ((N - start_pos) > merge_buffer_size): 
                # read C-record sets from the merged record sets, where C = merge_buffer_size
                subset = dataset[start_pos:start_pos + merge_buffer_size]
                merged_set.append(k_way_merge(subset)) # merge lists in subset
                start_pos += merge_buffer_size
            else:
                # read C-record sets from the merged sets, where C is less than merge_buffer_size
                subset = dataset[start_pos:]
                merged_set.append(k_way_merge(subset)) # merge lists in subset
                break

        dataset = merged_set
        if (len(dataset) <= 1): # if the size of merged record set is 1, then stop 
            result = merged_set
            break
    
    return result

In [26]:
# Include this package for parallel processing
import multiprocessing as mp

def parallel_merge_all_sorting(dataset, n_processor, buffer_size):
    """
    Perform a parallel merge-all sorting method

    Arguments:
    dataset -- entire record set to be sorted
    n_processor -- number of parallel processors
    buffer_size -- buffer size determining the size of each sub-record set

    Return:
    result -- the merged record set
    """
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return
    
    result = []
    
    # Pre-requisite: Perform data partitioning using round-robin partitioning
    subsets = rr_partition(dataset, n_processor)
    
    # Pool: a Python method enabling parallel processing. 
    pool = mp.Pool(processes = n_processor)

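    # ----- Sort phase -----
    # Submit every partition first so the workers can sort concurrently;
    # calling .get() straight after apply_async would block on each task in turn.
    async_results = [pool.apply_async(serial_sorting, [s, buffer_size]) for s in subsets]
    sorted_set = []
    for r in async_results:
        # serial_sorting returns a list holding one sorted record set; unpack it
        sorted_set.append(*r.get())
    pool.close()
    pool.join()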
    
    # ---- Final merge phase ----
    #print("sorted entire set:" + str(sorted_set))
    result = k_way_merge(sorted_set)
    
    return result[0:20]

final_result = parallel_merge_all_sorting(crashReader, 4, 5)
header_list = ["REPORT_ID","Stats Area","Suburb","Postcode","LGA Name","Total Units","Total Cas",
               "Total Fats","Total SI","Total MI","Year","Month","Day","Time","Area Speed","Position Type",
               "Horizontal Align","Vertical Align","Other Feat","Road Surface","Moisture Cond","Weather Cond",
               "DayNight","Crash Type","Unit Resp","Entity Code","CSEF Severity","Traffic Ctrls","DUI Involved",
               "Drugs Involved","ACCLOC_X","ACCLOC_Y","UNIQUE_LOC"]
displayDf = pd.DataFrame(final_result, columns = header_list)
displayDf.head(20)

Unnamed: 0,REPORT_ID,Stats Area,Suburb,Postcode,LGA Name,Total Units,Total Cas,Total Fats,Total SI,Total MI,...,Crash Type,Unit Resp,Entity Code,CSEF Severity,Traffic Ctrls,DUI Involved,Drugs Involved,ACCLOC_X,ACCLOC_Y,UNIQUE_LOC
0,2018-1-15/08/2019,2 Metropolitan,MITCHELL PARK,5043,CC MARION.,4,0,0,0,0,...,Right Angle,2,Driver Rider,1: PDO,No Control,,,1324362.05,1662130.48,13243620000000.0
1,2018-9-15/08/2019,2 Metropolitan,KENSINGTON,5068,"CC OF NORWOOD,PAYNEHAM & ST PETERS",3,0,0,0,0,...,Right Angle,1,Driver Rider,1: PDO,Traffic Signals,,,1332255.7,1671076.68,13322560000000.0
2,2018-13-15/08/2019,2 Metropolitan,ONE TREE HILL,5114,CITY OF PLAYFORD.,3,0,0,0,0,...,Hit Fixed Object,1,Driver Rider,1: PDO,No Control,,,1342295.01,1695080.42,13422950000000.0
3,2018-21-15/08/2019,2 Metropolitan,WESTBOURNE PARK,5041,CC MITCHAM.,3,0,0,0,0,...,Right Turn,1,Driver Rider,1: PDO,Traffic Signals,,,1327546.38,1664994.52,13275460000000.0
4,2018-25-15/08/2019,2 Metropolitan,WOODVILLE WEST,5011,CITY OF CHARLES STURT,3,0,0,0,0,...,Hit Parked Vehicle,2,Driver Rider,1: PDO,No Control,Y,,1322376.77,1674822.22,13223770000000.0
5,2018-33-15/08/2019,2 Metropolitan,SEATON,5023,CITY OF CHARLES STURT,2,0,0,0,0,...,Hit Fixed Object,1,Driver Rider,1: PDO,No Control,,,1321463.24,1674243.72,13214630000000.0
6,2018-45-15/08/2019,2 Metropolitan,BOLIVAR,5110,CITY OF SALISBURY,2,0,0,0,0,...,Side Swipe,2,Driver Rider,1: PDO,No Control,,,1328187.5,1689720.89,13281880000000.0
7,2018-49-15/08/2019,3 Country,PORT WAKEFIELD,5550,WAKEFIELD REGIONAL COUNCIL,2,0,0,0,0,...,Hit Fixed Object,1,Driver Rider,1: PDO,No Control,,,1294782.61,1748943.79,12947830000000.0
8,2018-69-15/08/2019,2 Metropolitan,PARA HILLS,5096,CITY OF SALISBURY,2,0,0,0,0,...,Hit Parked Vehicle,2,Driver Rider,1: PDO,No Control,,,1333560.24,1683844.15,13335600000000.0
9,2018-73-15/08/2019,2 Metropolitan,QUEENSTOWN,5014,CITY OF PORT ADELAIDE ENFIELD,3,0,0,0,0,...,Side Swipe,1,Driver Rider,1: PDO,No Control,,,1320600.17,1678608.97,13206000000000.0
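
The sort phase above only runs in parallel if all partitions are submitted before any result is collected. The following is a minimal sketch of that submit-then-collect pattern, not the notebook's implementation. It assumes `concurrent.futures.ThreadPoolExecutor` as a stand-in for `mp.Pool` so that it runs in any environment, and `sort_partition`, `parallel_sort_sketch` and the toy data are hypothetical names introduced for illustration.

```python
from concurrent.futures import ThreadPoolExecutor
import heapq

def sort_partition(partition):
    """Hypothetical stand-in for the per-processor sort."""
    return sorted(partition)

def parallel_sort_sketch(partitions, n_workers):
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        # Submit every partition first ...
        futures = [executor.submit(sort_partition, p) for p in partitions]
        # ... then collect, so no task waits on an earlier result
        sorted_runs = [f.result() for f in futures]
    # Final merge of the sorted runs (the "merge-all" step)
    return list(heapq.merge(*sorted_runs))

partitions = [[5, 1, 9], [4, 8, 2], [7, 3, 6]]
print(parallel_sort_sketch(partitions, 3))
```

Threads are used here only to show the submission order; for CPU-bound sorting a process pool would be the more likely choice.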


### 3.2


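This task uses the parallel binary-merge sort method. The code below sorts the units data by vehicle year (`Veh Year`), so the column is pre-processed first: rows with a missing or `XXXX` year are removed and the remaining values are converted to integers.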

In [27]:
unitsDf['Veh Year'].unique()

array(['2006', nan, '2017', '2015', '2005', '2009', '2004', '2011',
       '2000', '2008', '2003', '2016', '1999', '2001', '2002', '2014',
       'XXXX', '2007', '1998', '1997', '1996', '2013', '2010', '1994',
       '1993', '1990', '2012', '1995', '1988', '1989', '1973', '1991',
       '1986', '1957', '1992', '1980', '1982', '1977', '1985', '1978',
       '1971', '1987', '1969', '1984', '1981', '1966', '1951', '2018',
       '1955', '1983', '1968', '1964', '1976', '1967', '1900', '1965',
       '1979', '1962', '1974', '1934'], dtype=object)

In [28]:
tempUnitsDf = unitsDf.copy()
tempUnitsDf = tempUnitsDf[(tempUnitsDf['Veh Year'] != 'XXXX')]
tempUnitsDf = tempUnitsDf.dropna(subset=['Veh Year'])

In [29]:
tempUnitsDf = tempUnitsDf.astype({"Veh Year": int})

In [30]:
tempUnitsDf['Veh Year'].unique()

array([2006, 2017, 2015, 2005, 2009, 2004, 2011, 2000, 2008, 2003, 2016,
       1999, 2001, 2002, 2014, 2007, 1998, 1997, 1996, 2013, 2010, 1994,
       1993, 1990, 2012, 1995, 1988, 1989, 1973, 1991, 1986, 1957, 1992,
       1980, 1982, 1977, 1985, 1978, 1971, 1987, 1969, 1984, 1981, 1966,
       1951, 2018, 1955, 1983, 1968, 1964, 1976, 1967, 1900, 1965, 1979,
       1962, 1974, 1934])

In [31]:
def qsort_q3(arr, index): 

    """ 
    Quicksort a list
    
    Arguments:
    arr -- the input list to be sorted
    index -- the position of the integer sort key within each record

    Return:
    result -- the sorted arr
    """
    if len(arr) <= 1:
        return arr
    else:
        #take the first element as the pivot
        pivot = arr[0]
        left_arr = [x for x in arr[1:] if int(x[index]) < int(pivot[index])]
        right_arr = [x for x in arr[1:] if int(x[index]) >= int(pivot[index])]
        value = qsort_q3(left_arr, index) + [pivot] + qsort_q3(right_arr, index)
        return value
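
For comparison, here is a minimal sketch using Python's built-in `sorted`, which orders records by the same integer key without recursion. `sort_by_index` and the toy records are hypothetical and not part of the assignment code.

```python
def sort_by_index(records, index):
    """Return the records sorted by the integer value at position `index`."""
    return sorted(records, key=lambda rec: int(rec[index]))

# Toy records: [report id, vehicle year as a string]
toy_units = [["r1", "2006"], ["r2", "1934"], ["r3", "2006"], ["r4", "1990"]]
print(sort_by_index(toy_units, 1))
```

Because `sorted` is stable, records with equal keys (here `r1` and `r3`) keep their original relative order.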

In [32]:
# Let's first look at 'k-way merging algorithm' that will be used 
# to merge sub-record sets in our external sorting algorithm.
import sys

# Find the smallest record
def find_min(records):    
    """ 
    Find the smallest record
    
    Arguments:
    records -- the input record set

    Return:
    result -- the smallest record's index
    """
    m = records[0]
    index = 0
    for i in range(len(records)):
        if(int(records[i][5]) < int(m[5])):  # Comparing for min veh year
            index = i
            m = records[i]
    return index

def k_way_merge_binary(record_sets):
    """ 
    K-way merging algorithm
    
    Arguments:
    record_sets -- the set of multiple sorted sub-record sets

    Return:
    result -- the sorted and merged record set
    """
    
    # indexes will keep the indexes of sorted records in the given buffers
    indexes = []
    for x in record_sets:
        indexes.append(0) # initialisation with 0

    # final result will be stored in this variable
    result = []  
    #print(record_sets)
    while(True):
        merged_result = [] # the merging unit (i.e. # of the given buffers)
        
        # This loop gets the current position of every buffer
        for i in range(len(record_sets)):
            if(indexes[i] >= len(record_sets[i])):
                merged_result.append([2030]*33)  # Maximum year for finding last element
            else:
                merged_result.append(record_sets[i][indexes[i]])  
        
        # find the smallest record 
        smallest = find_min(merged_result)
    
        # if the smallest record is the sentinel, we reached the end of every record set
        if(merged_result[smallest] == [2030]*33):  # Maximum year for finding last element
            break

        # This record is the next on the merged list
        result.append(record_sets[smallest][indexes[smallest]])
        indexes[smallest] +=1
   
    return result
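
The merge above relies on a sentinel record (`[2030]*33`) to detect exhausted buffers. As a hedged alternative, the standard library's `heapq.merge` performs a k-way merge of already-sorted inputs without any sentinel. The sketch below is an illustration only; `k_way_merge_heap` and the toy runs are hypothetical names.

```python
import heapq

def k_way_merge_heap(record_sets, key_index):
    """Merge already-sorted record sets on the integer value at key_index."""
    return list(heapq.merge(*record_sets, key=lambda rec: int(rec[key_index])))

# Three sorted runs of toy records: [label, year]
runs = [
    [["a", 1990], ["b", 2005]],
    [["c", 1985], ["d", 2010]],
    [["e", 2000]],
]
merged = k_way_merge_heap(runs, 1)
print([rec[1] for rec in merged])
```

Each input run must already be sorted on the same key, which is what the sort phase produces.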

In [33]:
def serial_sorting_binary(dataset, buffer_size):
    """
    Perform a serial external sorting method based on sort-merge
    The buffer size determines the size of each sub-record set

    Arguments:
    dataset -- the entire record set to be sorted
    buffer_size -- the buffer size determining the size of each sub-record set

    Return:
    result -- the sorted record set
    """
    
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return
    
    result = []
    
    # --- Sort Phase ---
    sorted_set = []
    
    # Read buffer_size pages at a time into memory and
    # sort them, and write out a sub-record set (i.e. variable: subset)
    start_pos = 0
    N = len(dataset)
    while True:
        if ((N - start_pos) > buffer_size):
            # read B-records from the input, where B = buffer_size
            subset = dataset[start_pos:start_pos + buffer_size] 
            # sort the subset (using quicksort defined above)
            sorted_subset = qsort_q3(subset, 5)  # adding index for quick sorting - veh year
            sorted_set.append(sorted_subset)
            start_pos += buffer_size
        else:
            # read the last B-records from the input, where B is less than buffer_size
            subset = dataset[start_pos:] 
            # sort the subset (using quicksort defined above)
            sorted_subset = qsort_q3(subset, 5)  # adding index for quick sorting - veh year
            sorted_set.append(sorted_subset)
            break
    
    # --- Merge Phase ---
    merge_buffer_size = buffer_size - 1
    dataset = sorted_set
    while True:
        merged_set = []

        N = len(dataset)
        start_pos = 0
        while True:
            if ((N - start_pos) > merge_buffer_size): 
                # read C-record sets from the merged record sets, where C = merge_buffer_size
                subset = dataset[start_pos:start_pos + merge_buffer_size]
                merged_set.append(k_way_merge_binary(subset)) # merge lists in subset
                start_pos += merge_buffer_size
            else:
                # read C-record sets from the merged sets, where C is less than merge_buffer_size
                subset = dataset[start_pos:]
                merged_set.append(k_way_merge_binary(subset)) # merge lists in subset
                break

        dataset = merged_set
        if (len(dataset) <= 1): # if the size of merged record set is 1, then stop 
            result = merged_set
            break
    
    return result

In [34]:
def parallel_binary_merge_sorting(dataset, n_processor, buffer_size):
    """
    Perform a parallel binary-merge sorting method

    Arguments:
    dataset -- entire record set to be sorted
    n_processor -- number of parallel processors
    buffer_size -- buffer size determining the size of each sub-record set

    Return:
    result -- the merged record set
    """
    
    if (buffer_size <= 2):
        print("Error: buffer size should be greater than 2")
        return
    
    result = []

    # Pre-requisite: Perform data partitioning using round-robin partitioning
    subsets = rr_partition(dataset, n_processor)
    
    # Pool: a Python method enabling parallel processing. 
    pool = mp.Pool(processes = n_processor)

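    # ----- Sort phase -----
    # Submit every partition first so the workers can sort concurrently;
    # calling .get() straight after apply_async would block on each task in turn.
    async_results = [pool.apply_async(serial_sorting_binary, [s, buffer_size]) for s in subsets]
    sorted_set = []
    for r in async_results:
        # serial_sorting_binary returns a list holding one sorted record set; unpack it
        sorted_set.append(*r.get())
    pool.close()
    pool.join()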
    
    # ---- Final merge phase ----
    #print("sorted entire set:" + str(sorted_set))
    dataset = sorted_set
    while True:
        merged_set = []

        N = len(dataset)
        start_pos = 0
        pool = mp.Pool(processes = max(1, N//2))  # mp.Pool needs at least one process, also when N == 1

        while True:
            if ((N - start_pos) > 2): 
                subset = dataset[start_pos:start_pos + 2]
                merged_set.append(pool.apply(k_way_merge_binary, [subset]))
                start_pos += 2
            else:
                subset = dataset[start_pos:]
                merged_set.append(pool.apply(k_way_merge_binary, [subset]))
                break
        
        pool.close()
        dataset = merged_set
        
        if (len(dataset) == 1): # if the size of merged record set is 1, then stop 
            result = merged_set
            break
    
    return result[0][0:30]


result = parallel_binary_merge_sorting(tempUnitsDf.values.tolist(), 4, 5)
for item in result:
    outputList = {}
    outputList['Report ID'] = item[0]
    outputList['Unit No'] = item[1]
    outputList['Vehicle Registration State'] = item[3]
    outputList['Vehicle Year'] = item[5]
    outputList['License State'] = item[9]
    print(outputList)

{'Report ID': '2018-12792-15/08/2019', 'Unit No': 1, 'Vehicle Registration State': 'SA', 'Vehicle Year': 1900, 'License State': 'SA'}
{'Report ID': '2018-6385-15/08/2019', 'Unit No': 1, 'Vehicle Registration State': 'SA', 'Vehicle Year': 1900, 'License State': 'SA'}
{'Report ID': '2018-9371-15/08/2019', 'Unit No': 1, 'Vehicle Registration State': 'SA', 'Vehicle Year': 1900, 'License State': 'UNKNOWN'}
{'Report ID': '2018-12484-15/08/2019', 'Unit No': 1, 'Vehicle Registration State': 'SA', 'Vehicle Year': 1934, 'License State': 'SA'}
{'Report ID': '2018-2351-15/08/2019', 'Unit No': 1, 'Vehicle Registration State': 'SA', 'Vehicle Year': 1951, 'License State': 'SA'}
{'Report ID': '2018-2981-15/08/2019', 'Unit No': 1, 'Vehicle Registration State': 'SA', 'Vehicle Year': 1955, 'License State': 'SA'}
{'Report ID': '2018-1583-15/08/2019', 'Unit No': 1, 'Vehicle Registration State': 'SA', 'Vehicle Year': 1957, 'License State': 'SA'}
{'Report ID': '2018-238-15/08/2019', 'Unit No': 1, 'Vehicle Re
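
The final merge phase of binary-merge sorting combines the sorted runs two at a time, halving the number of runs in each round. A minimal serial sketch of those rounds, assuming plain sortable values and the hypothetical name `binary_merge_rounds`:

```python
import heapq

def binary_merge_rounds(sorted_runs, key=None):
    """Merge sorted runs pairwise, round by round, until one run remains."""
    runs = [list(r) for r in sorted_runs]
    if not runs:
        return []
    while len(runs) > 1:
        next_runs = []
        # Take the runs two at a time; an odd run out is carried over as-is
        for i in range(0, len(runs), 2):
            pair = runs[i:i + 2]
            next_runs.append(list(heapq.merge(*pair, key=key)))
        runs = next_runs
    return runs[0]

toy_runs = [[1, 4], [2, 9], [3], [0, 7], [5]]
print(binary_merge_rounds(toy_runs))
```

In a parallel setting, the merges within one round are independent of each other, so each pair could be handed to a separate worker.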

## Task 4 : Parallel Group By:

### 4.1


The Pandas `DataFrame.groupby` function splits data into groups based on a given criterion and then aggregates each group. Here the same result is produced with a parallel merge-all group-by: each processor aggregates its own partition into a local dictionary keyed by the group-by attribute, and the local dictionaries are then merged into one global result. The code below sums the total fatalities (`Total Fats`) for each `Crash Type`, and the Pandas result is shown afterwards for comparison.
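
As a minimal serial sketch of the merge-all idea, assuming toy `(crash type, fatalities)` records: `local_sum` and `merge_all` are hypothetical names, and `collections.Counter` is used only as a convenient summing dictionary.

```python
from collections import Counter

def local_sum(partition, key_index, value_index):
    """Local aggregation: sum value_index per key_index within one partition."""
    totals = Counter()
    for record in partition:
        totals[record[key_index]] += record[value_index]
    return totals

def merge_all(local_results):
    """Global aggregation: add up the local sums from every partition."""
    merged = Counter()
    for local in local_results:
        merged.update(local)  # update() adds counts and keeps zero totals
    return dict(merged)

partitions = [
    [("Head On", 1), ("Rear End", 0)],
    [("Head On", 2), ("Other", 0)],
]
locals_ = [local_sum(p, 0, 1) for p in partitions]
print(merge_all(locals_))
```

Only the small local dictionaries are combined in the global step, which is why the method suits cases where the number of distinct groups is modest.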



In [35]:
crashDf['Crash Type'].unique()

array(['Right Angle', 'Rear End', 'Right Turn', 'Hit Fixed Object',
       'Other', 'Hit Animal', 'Hit Parked Vehicle', 'Roll Over',
       'Hit Pedestrian', 'Side Swipe', 'Left Road - Out of Control',
       'Head On', 'Hit Object on Road'], dtype=object)

In [36]:
# The first step in the merge-all groupby method
def local_groupby(dataset, key_index, value_index):
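    """
    Perform a local groupby method (sum aggregation)

    Arguments:
    dataset -- the record set (one partition) to be aggregated
    key_index -- index of the group-by attribute in each record
    value_index -- index of the numeric attribute to be summed

    Return:
    result -- a dictionary mapping each group-by key to its summed value
    """

    groups = {}  # named `groups` to avoid shadowing the built-in `dict`
    for record in dataset:
        key = record[key_index]
        val = record[value_index]
        if key not in groups:
            groups[key] = 0
        groups[key] += val
    return groups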

In [37]:
def parallel_merge_all_groupby(dataset):
    """
    Perform a parallel merge_all groupby method

    Arguments:
    dataset -- entire record set to be merged

    Return:
    result -- the aggregated record dictionary according to the group_by attribute index
    """
    
    result = {}

    # Define the number of parallel processors: the number of sub-datasets.
    n_processor = len(dataset)

    # Pool: a Python method enabling parallel processing. 
    pool = mp.Pool(processes = n_processor)

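    # ----- Local aggregation step -----
    # Submit all partitions first, then collect, so the local group-bys can run
    # concurrently (pool.apply would block on each call in turn).
    # Key index 23 = Crash Type, value index 7 = Total Fats.
    async_results = [pool.apply_async(local_groupby, [s, 23, 7]) for s in dataset]
    local_result = [r.get() for r in async_results]
    pool.close()
    pool.join()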

    # ---- Global aggregation step ----
    # Let's assume that the global operator is sum.
    # Implement here
    for r in local_result:
        for key, val in r.items():
            if key not in result:
                result[key] = 0
            result[key] += val    
    
    return result

# Partitioning the data - initial placement
partitioned_data = rr_partition(crashReader, 5)
parallel_merge_all_groupby(partitioned_data)

{'Right Angle': 5,
 'Hit Fixed Object': 25,
 'Rear End': 3,
 'Right Turn': 4,
 'Hit Parked Vehicle': 3,
 'Hit Animal': 2,
 'Roll Over': 11,
 'Side Swipe': 2,
 'Hit Pedestrian': 6,
 'Head On': 18,
 'Left Road - Out of Control': 1,
 'Other': 0,
 'Hit Object on Road': 0}

In [38]:
crashDf.groupby('Crash Type').sum()

Crash Type,Postcode,Total Units,Total Cas,Total Fats,Total SI,Total MI,Year,Area Speed,Unit Resp,ACCLOC_X,ACCLOC_Y,UNIQUE_LOC
Head On,1347970,566,234,18,54,162,524680,18940,408,348526200.0,434505000.0,3485263000000000.0
Hit Animal,983765,386,44,2,8,34,375348,17970,338,246183500.0,324347800.0,2461836000000000.0
Hit Fixed Object,11537193,4806,846,25,136,685,4502158,156090,2551,2978656000.0,3741736000.0,2.978657e+16
Hit Object on Road,206445,92,9,0,2,7,80720,2950,48,53517120.0,66791760.0,535171200000000.0
Hit Parked Vehicle,5145539,2239,224,3,19,202,2038180,53635,1454,1344339000.0,1682914000.0,1.344339e+16
Hit Pedestrian,2139637,886,298,6,59,233,845542,22915,679,557496700.0,701044200.0,5574968000000000.0
Left Road - Out of Control,685921,130,52,1,5,46,260322,10875,201,168997800.0,223683700.0,1689979000000000.0
Other,455249,181,26,0,4,22,177584,6750,197,116484100.0,148580000.0,1164841000000000.0
Rear End,18218205,8425,1680,3,44,1633,7216368,229045,5534,4756208000.0,5984002000.0,4.756209e+16
Right Angle,12941984,5271,1266,5,80,1181,5103522,148865,3840,3367754000.0,4230559000.0,3.367754e+16


### 4.2

This task uses the two-phase method: each processor first aggregates its own partition locally, the local results are then redistributed by range on the group-by attribute, and a global aggregation produces the final result. The two-phase method works well when the number of groups is small, while the redistribution method works well when the number of groups is large.
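
The three steps (local aggregation, redistribution, global aggregation) can be sketched serially as follows. This is an illustration under stated assumptions rather than the notebook's implementation: `two_phase_groupby`, `local_sum`, `first_letter_bucket` and the toy `(suburb, casualties)` records are hypothetical.

```python
def local_sum(partition, key_index, value_index):
    """Phase 1 helper: sum value_index per key_index within one partition."""
    totals = {}
    for record in partition:
        key = record[key_index]
        totals[key] = totals.get(key, 0) + record[value_index]
    return totals

def two_phase_groupby(partitions, key_index, value_index, bucket_of, n_buckets):
    # Phase 1: each partition aggregates locally
    local_results = [local_sum(p, key_index, value_index) for p in partitions]

    # Redistribution: send each (key, partial sum) to the bucket that owns the key
    buckets = [[] for _ in range(n_buckets)]
    for local in local_results:
        for key, partial in local.items():
            buckets[bucket_of(key)].append((key, partial))

    # Phase 2: each bucket aggregates the partial sums it received
    result = {}
    for bucket in buckets:
        for key, partial in bucket:
            result[key] = result.get(key, 0) + partial
    return result

def first_letter_bucket(key):
    """Range rule assumed for the example: below 'H', 'H' up to 'M', the rest."""
    if key < "H":
        return 0
    if key < "M":
        return 1
    return 2

partitions = [
    [("ADELAIDE", 2), ("MARION", 1)],
    [("ADELAIDE", 3), ("HACKHAM", 0)],
]
totals = two_phase_groupby(partitions, 0, 1, first_letter_bucket, 3)
print(totals)
```

Because every key is owned by exactly one bucket, the second phase could run on each bucket independently.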

In [39]:
# Range data partitioning function, adapted to the dictionary produced by local_groupby
def range_partition_q4(data, range_indices):
    """
    Perform range data partitioning on the locally aggregated data

    Arguments:
    data -- a dictionary mapping each group-by key to its local aggregate
    range_indices -- the sorted list of range boundaries to split on

    Return:
    result -- the partitioned subsets of data, as lists of [key, value] pairs
    """
    result = []
    
    # First, convert the dictionary to [key, value] pairs and sort them by key
    new_data = [ [k,v] for k, v in data.items() ]
    new_data.sort(key = lambda x: x[0])
    
    # Calculate the number of bins
    n_bin = len(range_indices) 

    # For each bin, perform the following
    for i in range(n_bin): 
        # Find elements to be belonging to each range
        s = [x for x in new_data if x[0] < range_indices[i]] 
        # Add the partitioned list to the result
        result.append(s) 
        # Remove the partitioned list from the dataset. new_data is sorted by key,
        # so the selected elements form a prefix; slicing by length also handles
        # an empty partition, where s[len(s)-1] would raise an IndexError
        new_data = new_data[len(s):] 

    # Append the last remaining data list
    result.append([x for x in new_data if x[0] >= range_indices[n_bin-1]]) 
    
    return result
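
A more compact way to express the same range rule is a binary search over the boundaries. The sketch below assumes the boundaries are sorted and uses `bisect.bisect_right` from the standard library; `range_partition_pairs` and the toy dictionary are hypothetical.

```python
from bisect import bisect_right

def range_partition_pairs(pairs, boundaries):
    """Place each (key, value) pair in the range its key falls into.

    A key equal to a boundary goes to the upper range, matching the
    `x[0] < boundary` test used for the lower range above.
    """
    partitions = [[] for _ in range(len(boundaries) + 1)]
    for key, value in sorted(pairs):
        partitions[bisect_right(boundaries, key)].append([key, value])
    return partitions

local = {"MARION": 1, "ADELAIDE": 5, "HACKHAM": 0}
parts = range_partition_pairs(local.items(), ["H", "M"])
print(parts)
```

Empty ranges simply stay empty here, so no special case is needed when a partition contains no keys below a boundary.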

In [40]:
def two_phase_merge_all_groupby(dataset):
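    """
    Perform a two-phase parallel groupby method: local aggregation,
    range redistribution on the group-by key, then global aggregation

    Arguments:
    dataset -- the partitioned record set (a list of sub-datasets)

    Return:
    result -- the aggregated record dictionary according to the group_by attribute index
    """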
    
    result = {}

    # Define the number of parallel processors: the number of sub-datasets.
    n_processor = len(dataset)

    # Pool: a Python method enabling parallel processing. 
    pool = mp.Pool(processes = n_processor)

    # ----- Local aggregation step -----
    # Key index 2 = Suburb, value index 6 = Total Cas
    local_result = []
    for s in dataset:
        # call the local aggregation method
        local_result.append(pool.apply(local_groupby, [s, 2, 6]))
    pool.close()
    
    # ----- Redistribution step -----
    # Range-partition each local result on the suburb name. The number of target
    # partitions is the number of boundaries plus one, not the number of local results.
    range_indices = ['H','M']
    redistribution_results = [ [] for i in range(len(range_indices) + 1)]
    for item in local_result:
        range_data = range_partition_q4(item, range_indices)
        for i in range(len(redistribution_results)):
            redistribution_results[i].extend(range_data[i])
        
    # ---- Global aggregation step ----
    # Let's assume that the global operator is sum.
    # Implement here
    for r in redistribution_results:
        for item in r:
            key = item[0]
            val = item[1]
            if key not in result:
                result[key] = 0
            result[key] += val    
    
    return result

# Partitioning the data - initial placement
partitioned_data = rr_partition(crashReader, 3)
final_results = two_phase_merge_all_groupby(partitioned_data)
for item in sorted(final_results.keys()):
    print(item,':', final_results[item])

ABERFOYLE PARK : 6
ABMINGA STATION : 0
ADELAIDE : 211
ADELAIDE AIRPORT : 6
AGERY : 1
ALBERT PARK : 10
ALBERTON : 9
ALDGATE : 7
ALDINGA : 10
ALDINGA BEACH : 12
ALFORD : 1
ALLENBY GARDENS : 2
ALMA : 1
ANANGU PITJANTJATJARA YANKUNYTJATJARA : 3
ANDREWS FARM : 12
ANGAS PLAINS : 0
ANGASTON : 9
ANGLE PARK : 2
ANGLE VALE : 7
ANNA CREEK : 2
ANNADALE : 5
APAMURRA : 1
ARCOONA : 2
ARDROSSAN : 0
ARMAGH : 3
ARTHURTON : 1
ASCOT PARK : 6
ASHBOURNE : 3
ASHFORD : 2
ASHTON : 1
ATHELSTONE : 10
ATHOL PARK : 5
AUSTRALIA PLAINS : 1
AVENUE RANGE : 1
BALAKLAVA : 3
BALD HILLS : 0
BALGOWAN : 1
BALHANNAH : 3
BANGHAM : 0
BANGOR : 6
BANKSIA PARK : 2
BARMERA : 7
BARNA : 0
BARNDIOOTA : 1
BAROOTA : 1
BASKET RANGE : 2
BEACHPORT : 0
BEAUFORT : 2
BEAUMONT : 0
BEDFORD PARK : 19
BEETALOO VALLEY : 4
BELAIR : 14
BELALIE EAST : 4
BELALIE NORTH : 0
BELLEVUE HEIGHTS : 2
BELTANA : 0
BERRI : 5
BETHEL : 2
BEULAH PARK : 2
BEVERLEY : 10
BIBARINGA : 3
BIGGS FLAT : 0
BIRDWOOD : 8
BIRKENHEAD : 9
BLACK FOREST : 3
BLACK POINT : 0
BLACK S

WANBI : 0
WANDILO : 3
WANGARY : 1
WANGOLINA : 0
WANILLA : 0
WARD BELT : 4
WARD HILL : 0
WARNERTOWN : 1
WAROOKA : 0
WARRADALE : 22
WASLEYS : 4
WATERFALL GULLY : 1
WATERLOO : 0
WATERLOO CORNER : 23
WATERVALE : 1
WATTLE FLAT : 9
WATTLE PARK : 5
WATTLE RANGE : 2
WATTLE RANGE EAST : 1
WAYVILLE : 20
WEETULTA : 0
WELBOURN HILL : 1
WELLAND : 5
WELLINGTON : 1
WELLINGTON EAST : 0
WEST BEACH : 13
WEST CROYDON : 5
WEST HINDMARSH : 11
WEST LAKES : 15
WEST LAKES SHORE : 4
WEST RANGE : 0
WEST RICHMOND : 7
WESTALL : 0
WESTBOURNE PARK : 13
WESTERN FLAT : 1
WESTLAKES : 1
WHITE HILL : 0
WHITE SANDS : 0
WHITES FLAT : 0
WHITES VALLEY : 2
WHYALLA : 1
WHYALLA BARSON : 2
WHYALLA NORRIE : 7
WHYALLA PLAYFORD : 9
WHYALLA STUART : 2
WHYTE YARCOWIE : 1
WIGLEY FLAT : 2
WILD DOG VALLEY : 0
WILD HORSE PLAINS : 0
WILGENA : 0
WILLALO : 0
WILLAMULKA : 0
WILLASTON : 8
WILLIAMSTOWN : 14
WILLOW CREEK : 1
WILLOWIE : 0
WILLUNGA : 7
WILLUNGA HILL : 4
WILLUNGA SOUTH : 3
WILMINGTON : 4
WINDSOR : 3
WINDSOR GARDENS : 25
WINGFIELD

In [41]:
crashDf.groupby('Suburb').sum()

Suburb,Postcode,Total Units,Total Cas,Total Fats,Total SI,Total MI,Year,Area Speed,Unit Resp,ACCLOC_X,ACCLOC_Y,UNIQUE_LOC
ABERFOYLE PARK,77385,33,6,0,1,5,30270,735,26,1.991382e+07,2.481991e+07,1.991382e+14
ABMINGA STATION,5440,1,0,0,0,0,2018,110,1,1.502159e+06,1.957757e+06,1.502159e+13
ADELAIDE,2805000,1193,211,2,19,190,1132098,28825,883,7.452455e+08,9.371572e+08,7.452456e+15
ADELAIDE AIRPORT,113050,44,6,0,4,2,38342,860,33,2.512545e+07,3.172131e+07,2.512545e+14
AGERY,11116,3,1,0,0,1,4036,200,2,2.493580e+06,3.517071e+06,2.493580e+13
ALBERT PARK,75210,30,10,0,1,9,30270,880,19,1.982621e+07,2.514387e+07,1.982621e+14
ALBERTON,85238,35,9,0,4,5,34306,970,30,2.245829e+07,2.852737e+07,2.245829e+14
ALDGATE,61848,25,7,0,0,7,24216,700,24,1.609068e+07,1.992755e+07,1.609068e+14
ALDINGA,165536,70,10,0,2,8,64576,2350,53,4.213633e+07,5.226964e+07,4.213633e+14
ALDINGA BEACH,144844,61,12,0,2,10,56504,1455,38,3.679309e+07,4.567852e+07,3.679309e+14


## Task 5 : Parallel Group By Join

### 5.1 

Please use appropriate data partitioning, group-by and join techniques, and briefly explain why you have chosen them.

Approach: join the crash and units data on `REPORT_ID`, restricted to units whose licence type is `Unlicenced`, then apply a parallel merge-all group-by to the join result.
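
To make the join-then-group-by pipeline concrete, here is a minimal serial sketch on toy data. Everything in it is an assumption made for illustration: the record layouts, the summed attribute (`total_cas`), and the name `join_and_group` are hypothetical, and a hash lookup is used in place of whichever join the notebook applies.

```python
def join_and_group(crashes, units, licence_type):
    """Join units to crashes on report id, filter by licence type,
    then sum a crash attribute per suburb."""
    # Build a hash index on the crash table: report id -> (suburb, total_cas)
    crash_by_id = {report_id: (suburb, total_cas)
                   for report_id, suburb, total_cas in crashes}

    totals = {}
    for report_id, unit_licence in units:
        # Keep only units with the requested licence type that match a crash
        if unit_licence == licence_type and report_id in crash_by_id:
            suburb, total_cas = crash_by_id[report_id]
            totals[suburb] = totals.get(suburb, 0) + total_cas
    return totals

# Toy tables: (report id, suburb, total casualties) and (report id, licence type)
crashes = [("c1", "ADELAIDE", 2), ("c2", "BURRA", 0), ("c3", "ADELAIDE", 1)]
units = [("c1", "Unlicenced"), ("c1", "Full"), ("c2", "Unlicenced"), ("c3", "Full")]
print(join_and_group(crashes, units, "Unlicenced"))
```

Filtering before aggregating keeps the grouped result small, since only suburbs with at least one matching unit appear in it.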

In [42]:
unitsDf['Licence Type'].unique()

array(['Provisional 1 ', 'Full', nan, 'Provisional 2', 'Unknown',
       'Learners', 'Unlicenced', 'Probationary', 'Disqualified'],
      dtype=object)

In [43]:
def DDP_join(T1, T2, n_processor):
    """
    Perform a divide and broadcast-based parallel join algorithms.
    The join attribute is the numeric attribute in the input tables T1 & T2

    Arguments:
    T1 & T2 -- Tables to be joined
    n_processor -- the number of parallel processors

    Return:
    result -- the joined table
    """
    
    results = []
    
    # Partition T1 into sub-tables using rr_partition().
    # The number of the sub-tables must be the equal to the n_processor
    T1_subsets = rr_partition(T1, n_processor)
    # Pool: a Python method enabling parallel processing. 
    pool = mp.Pool(processes = n_processor)
    
    midResults = []
    
    for t1 in T1_subsets:
        # Apply a join on each processor
        
        # Note that as we assume a shared-memory architecture, no replication
        # of the broadcast table (in this case table T2, the smaller table) occurs.
        output = pool.apply_async(NL_join, [t1, T2, 3])
        
        midResults.append(output)

    for result in midResults:
        results.append(result.get())

    # Release the worker processes once all partial joins have been collected
    pool.close()
    pool.join()

    return results
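
The divide-and-broadcast idea can be checked on toy data without a process pool: one table is split round-robin, the other is made available in full to every partition, and the partial joins are concatenated. In the sketch below, `round_robin`, `nested_loop_join` and `divide_and_broadcast_join` are hypothetical stand-ins and do not reproduce the notebook's `rr_partition` or `NL_join`.

```python
def round_robin(records, n):
    """Divide step: deal the records out to n partitions in turn."""
    parts = [[] for _ in range(n)]
    for i, rec in enumerate(records):
        parts[i % n].append(rec)
    return parts

def nested_loop_join(t1, t2):
    """Join two tables on their first field with a nested loop."""
    return [(a, b) for a in t1 for b in t2 if a[0] == b[0]]

def divide_and_broadcast_join(T1, T2, n_processor):
    joined = []
    for part in round_robin(T1, n_processor):
        # Every partition of T1 is joined against the whole of T2 (broadcast)
        joined.extend(nested_loop_join(part, T2))
    return joined

T1 = [("c1", "ADELAIDE"), ("c2", "BURRA"), ("c3", "CLARE")]
T2 = [("c1", "Full"), ("c3", "Unlicenced"), ("c3", "Full")]
print(divide_and_broadcast_join(T1, T2, 2))
```

Since each record of `T1` lands in exactly one partition and sees all of `T2`, the combined output contains the same pairs as a single join over the full tables, only in a different order.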

In [44]:
def group_by_join(dataset1, dataset2):
    """
    Join two datasets in parallel, then perform a parallel merge_all groupby on the join result

    Arguments:
    dataset1 & dataset2 -- the record sets to be joined

    Return:
    result -- the aggregated record dictionary according to the group_by attribute index
    """
    
    result = {}

    # Define the number of parallel processors: the number of sub-datasets.
    
    join_results = DDP_join(dataset1, dataset2, 10)
    
    n_processor = len(join_results)

    # Pool: a Python method enabling parallel processing. 
    pool = mp.Pool(processes = n_processor)

    # ----- Local aggregation step -----
    # Implement here
    local_result = []
    for s in join_results:
        # call the local aggregation method
        local_result.append(pool.apply(local_groupby, [s, 0, 1]))
    pool.close()

    # ---- Global aggregation step ----
    # Let's assume that the global operator is sum.
    # Implement here
    for r in local_result:
        for key, val in r.items():
            if key not in result:
                result[key] = 0
            result[key] += val    
    
    return result

final_results = group_by_join(crashReader, unitsReader)
for item in sorted(final_results.keys()):
    print(item,':', final_results[item])

ADELAIDE : 5
ALBERT PARK : 0
ALDINGA : 0
ANDREWS FARM : 3
ASCOT PARK : 1
BEDFORD PARK : 1
BELAIR : 1
BEULAH PARK : 2
BIRDWOOD : 0
BIRKENHEAD : 0
BLAIR ATHOL : 1
BLAKEVIEW : 2
BOLIVAR : 2
BRAHMA LODGE : 1
BURRA : 2
BURTON : 2
CAMPBELLTOWN : 0
CHELTENHAM : 1
CHRISTIE DOWNS : 1
CHRISTIES BEACH : 0
CLARE : 0
CLAY WELLS : 1
COLLINSWOOD : 0
COOBER PEDY : 1
COONDAMBO : 0
COWANDILLA : 0
CRAIGMORE : 0
CUDLEE CREEK : 0
CURRENCY CREEK : 4
DAVOREN PARK : 2
DAWESLEY : 5
DRY CREEK : 3
EDWARDSTOWN : 1
ELIZABETH : 0
ELIZABETH DOWNS : 4
ELIZABETH EAST : 3
ELIZABETH GROVE : 1
ELIZABETH NORTH : 0
ELIZABETH PARK : 5
ELIZABETH SOUTH : 0
ELIZABETH VALE : 1
ENFIELD : 1
ETHELTON : 0
EVANSTON PARK : 1
EYRE : 4
FAIRVIEW PARK : 0
FINDON : 1
FLINDERS PARK : 2
FORESTVILLE : 0
FULHAM : 0
FULHAM GARDENS : 0
GAWLER : 0
GAWLER EAST : 0
GAWLER WEST : 3
GEPPS CROSS : 0
GILLES PLAINS : 4
GLANVILLE : 1
GLENELG : 0
GLENSIDE : 0
GLOBE DERBY PARK : 1
GOMERSAL : 0
GREENHILL : 0
GREENWITH : 1
HACKHAM : 2
HACKNEY : 0
HAINES : 2