# FIT5148 - Big data management and processing

# Assignment 1 - Solution Workbook


**Instructions:**
- You will be using Python 3.
- Read the assignment instruction carefully and implement the algorithms in this workbook. 
- You can introduce new cells as necessary.

**Your details**
- Name: Sadia Karim
- Student ID: 23386320

Let's get started!

In [1]:
# Import required modules
import csv
import sys
import pprint
import geohash2
import multiprocessing as mp

from datetime import datetime 
from multiprocessing import Pool # For multiprocessing

In [2]:
### Task 0: Reading the CSV files ###

# Create empty lists
climate_historic = []
fire_historic = []
fire_historic_copy = []

# Open the climate_historic CSV file
with open('climate_historic.csv', mode='r', encoding='utf-8-sig') as climate_historic_file: 
    
  # Read the climate_historic CSV file, append each row to the array
    for row in csv.reader(climate_historic_file):
        climate_historic.append(row) 
    
# Open the fire_historic CSV file 
with open('fire_historic.csv', mode='r', encoding='utf-8-sig') as fire_historic_file: 
    
  # Read the fire_historic CSV file, append each row to the array
    for row in csv.reader(fire_historic_file):
        fire_historic.append(row)
        

In [3]:
################################################ FUNCTIONS ################################################

In [4]:
# Round-robin data partitionining function
def rr_partition(data, n):
    
    # Create n number of bins
    result = []
    
    # Create empty lists based on the number of n
    for i in range(n):
        result.append([])
   
    # For each bin, perform the following
    for index, element in enumerate(data): 
        
        # Calculate the index of the bin that the current data point will be assigned
        index_bin = (int) (index % n)
        
        # Append the data point to the correct bin index
        result[index_bin].append(element)
    
    return result

In [5]:
# Date sorting function - the two files have different types of date data, so we need to handle both
def date_sort_function(data, column, file_type):

    input_data = list(data)
    if file_type == "fire":
        
        # Convert fire date string to datetime object to enable sorting on this value
        date_sorted = sorted(input_data, key=lambda x: datetime.strptime(x[column], '%Y-%m-%dT%H:%M:%S'))
    
    # Convert climate date string to datetime object to enable sorting on this value
    elif file_type == "climate":
        date_sorted = sorted(input_data, key=lambda x: datetime.strptime(x[column], '%Y-%m-%d'))

    return date_sorted


In [6]:
# Float sorting function
def float_sort_function(data, column):

    input_data = list(data)

    # Skipping the first line in the file which is a heading, sort the data based on an int value
    float_data_sorted = sorted(input_data[1:], key=lambda x: int(x[column]))

    return float_data_sorted


In [7]:
def str_range_partition(data, column, range_indices):
    result = []

    # Calculate the number of bins
    n_bin = len(range_indices)
    
    # For each bin, perform the following
    for i in range(n_bin): 

        # Find elements to be belonging to each range
        s = [x for x in data if x[column] < str(range_indices[i])] 

        # Add the partitioned list to the result
        result.append(s) 
        
        # Find the last element in the previous partition
        last_element = s[len(s)-1]

        # Find the index of of the last element
        last = data.index(last_element)

        # Remove the partitioned list from the dataset
        data = data[int(last)+1:] 

    # Append the last remaining data list
    result.append([x for x in data if x[column] >= str(range_indices[n_bin-1])])

    return result

In [8]:
def number_range_partition(data, column, range_indices):
    result = []

    # Calculate the number of bins
    n_bin = len(range_indices)
    
    # For each bin, perform the following
    for i in range(n_bin): 

        # Find elements to be belonging to each range
        s = [x for x in data if x[column] < int(range_indices[i])] 

        # Add the partitioned list to the result
        result.append(s) 
        
        # Find the last element in the previous partition
        last_element = s[len(s)-1]

        # Find the index of of the last element
        last = data.index(last_element)

        # Remove the partitioned list from the dataset
        data = data[int(last)+1:] 

    # Append the last remaining data list
    result.append([x for x in data if x[column] >= int(range_indices[n_bin-1])])

    return result

In [9]:
def get_range_limits(range_indices):
    
    # If there are 2 range indices
    if len(range_indices) == 2:
        
        # Assign first index as the lower limit
        lower_limit = range_indices[0]
        
        # Assign second index as upper_limit1
        upper_limit1 = range_indices[1]
        
        return lower_limit, upper_limit1
    
    
    else:
        # Since there can only be a maximum of 3 indices:
        
        # Assign first index as the lower limit
        lower_limit = range_indices[0]

        # Assign second index as the upper_limit1
        upper_limit1 = range_indices[1]
        
        # Assign third index as the upper_limit2
        upper_limit2 = range_indices[2]
        
        return lower_limit, upper_limit1, upper_limit2



In [10]:
# This function is to find which partition the query is in 
def bin_search(query, lower_limit, upper_limit, range_partitions):
    query_location = []

    # If query is less than the lower limit
    if query < lower_limit:
        
        # Location of query is in first partition
        query_location = range_partitions[0]

    # If query is greater than or equal to lower limit, but less than upper_limit
    elif lower_limit <= query < upper_limit:
        
        # Location of query is in second partition
        query_location = range_partitions[1]

    # Else query is greater than or equal to upper_limit
    else:
        
        # Location of query is in third partition
        query_location = range_partitions[2]

    return query_location


In [11]:
# Linear Search function
def linear_search(data, key):

    matched_record = None
    position = -1 # not found position

    for x in data:
        # If x is matched with key
        if x == key: 
            matched_record = x
            
            # Get the index of x
            position = data.index(x) 
            break
    
    return position, matched_record

In [12]:
# # Binary Search function returning first and last instances of key

## Pass only one column of data in
def binary_search_first_instance(data, key):
    start = 0
    end = len(data) - 1
    results = []

    # While loop to go through the list 
    while start <= end:
        # mid value is middle of list
        mid = (start+end) // 2

        # If value in middle of list is less than query_value, take the right half of the list
        if data[mid] < key:
            # New start value is entry after mid 
            start = mid + 1
       
        # If value in middle of list is more than query_value, take the left of the list
        elif data[mid] > key:
            # New end value is entry before mid
            end = mid - 1


        else:
            # Check if mid is the beginning of the list
            if mid - 1 < 0:
                # Return mid if it is the beginning of the list
                results = mid
                return results
            
            # Check the value to the left of mid to find the first instance of key
            # If it does not equal key, then current mid is the first instance 
            if data[mid - 1] != key:
                results = mid
                return results
            
            # Else ignore all values to the right and assign a new end point for list 
            end = mid - 1
            
    return results


# ################################################################            

## Pass only one column of data in
def binary_search_last_instance(data, key):

    start = 0 
    end = len(data) - 1 
    results = []
    
    # While loop to go through the list     
    while start <= end:
        # mid value is middle of list
        mid = (start+end) // 2 
        
        # If value in middle of list is less than query_value, take the right half of the list
        if data[mid] < key:
            
            # New end value is entry before mid
            start = mid + 1
            
        # If value in middle of list is more than query_value, take the left of the list
        elif data[mid] > key:
            # New end value is entry before mid
            end = mid - 1

            
        else:
            # Check if mid is the end of the list
            if mid + 1 > len(data) - 1:
                
                # Return mid if it is the beginning of the list
                results = mid
                return results
            
            # Check the value to the right of mid to find the last instance of key
            # If it does not equal key, then current mid is the last instance 
            if data[mid + 1] != key:
                results = mid
                return results
            
            # Else ignore all values to the lest and assign a new start point for list 
            start = mid + 1
            
    return results

# ################################################################   

def binary_search_all(query, key):
    results = []
    first_instance = 0
    last_instance = 0
    
    # Find first instance of query
    first_instance = binary_search_first_instance(query, key)
    
    # Find last instance of query
    last_instance = binary_search_last_instance(query, key)
    
    # Loop through the two instances to get all instances of the query
    while first_instance <=  last_instance:
        results.append(first_instance)
        first_instance = first_instance + 1
        
    return results


In [13]:
def SM_join(s_T1, T1_index, s_T2, T2_index, data_type, conf_check, range_min, range_max, val_1, val_2, val_3, val_4):
#  Input s_T1 is a sorted list, s_T2 is a sorted list

    result = []

    i = j = 0
    while True:
        r = s_T1[i][T1_index]
        
        # If data type is date, extract the date portion of the datetime string
        if data_type == "date_str":
            r = r[:10]
            
        s = s_T2[j][T2_index]

        # If join attribute s_T1(i) < join attribute s_T2(i)
        if r < s:
            # Iterate to next entry in s_T1
            i = i + 1

        # If join attribute s_T1(1) > join attribute s_T2(1)
        elif r > s:
            # Iterate to next entry in s_T2
            j = j + 1


        # If values are a match
        elif r == s :
            # If  we are doing a confidence check, check if confidence value is between 90 - 100 inclusive
                if conf_check == "conf_check_yes":
                    conf_val = s_T1[i][4]
                
                    # Convert conf_val to a float and append if it is within the required range
                    if float(conf_val) in range(range_min,range_max):
                        result.append([s_T1[i][val_1], s_T2[j][val_2], s_T1[i][val_3], s_T1[i][val_4]]) 
                        
                    i = i + 1
                    
                elif conf_check == "conf_check_no":
                    # No confidence check, append values that match to result[]
                    result.append([s_T1[i][val_1], s_T2[j][val_2], s_T1[i][val_3], s_T1[i][val_4]])
                    
                    i = i + 1
                    
                # Since there are repeated dates in table 1 (fire_partitions), we want all of these values. 
                # Only when the date in table 1 becomes greater than table 2, then we iterate 
                # to the next value in table 2.
                if r > s:
                    j = j + 1

        # if either s_T1(i) or s_T2(j) is EOF Then break
        if (i == len(s_T1)) or (j == len(s_T2)):
            break

    return result


In [14]:
def qsort(arr, index): 

    # If length of array is 1, return the array
    if len(arr) <= 1:
        return arr
    
    else:
        # take the first element as the pivot. This will only work for numerical values
        pivot = float(arr[0][index])
        
        # If the value at index in entry is less than the pivot, add entry to left array
        left_arr = [x for x in arr[1:] if float(x[index]) < pivot]
        
        # If the value at index in entry is greater than or equal to the pivot, add entry to right array
        right_arr = [x for x in arr[1:] if float(x[index]) >= pivot]
        
        # Continue to sort the arrays 
        value = qsort(left_arr, index) + [arr[0]] + qsort(right_arr, index)

        return value
            

In [15]:
# Find the smallest record
def find_min(records):
    
    # Find location of minimum record, starting from first record element
    m = records[0]
    index = 0
    for i in range(len(records)):
        if (records[i] < m):
            index = i
            m = records[i]
    return index


def k_way_merge(record_sets, sort_index):

    # indexes will keep the indexes of sorted records in the given buffers
    indexes = []
    for x in record_sets:
        indexes.append(0)  # initialisation with 0

    # final result will be stored in this variable
    result = []

    # the merging unit (i.e. # of the given buffers)
    tuple = []

    while (True):
        # initialise tuple
        tuple = []  

        # This loop gets the current position of every buffer
        for i in range(len(record_sets)):
            if (indexes[i] >= len(record_sets[i])):
                tuple.append(sys.maxsize)
                
            else:
                tuple.append(float(record_sets[i][indexes[i]][sort_index]))

        # find the smallest record
        smallest = find_min(tuple)

        # if we only have sys.maxsize on the tuple, we reached the end of every record set
        if (tuple[smallest] == sys.maxsize):
            break

        # This record is the next on the merged list
        result.append(record_sets[smallest][indexes[smallest]])
        indexes[smallest] += 1

    return result


In [16]:
def binary_merge_sort(sorted_set, index_val):

    j = 0

    while True:
        
        # if length of array is 1, we have merged all the sublists
        if len(sorted_set) <= 1:
            return sorted_set
        
        result = [] # clearing the result array
        
        for j in range(0, len(sorted_set) - 1, 2): # start, end, step
            buffers = []  
            i = 0
            
            while i < 2:

                # Get next 2 lists in sorted_list 
                buffers.append(sorted_set[j + i])
                i += 1

            # Merge the two lists using k-way merge
            merged = k_way_merge(buffers, index_val)
            
            # Append to result
            result.append(merged)

        # Check if odd number of lists
        mod_1 = len(sorted_set) % 2
        
        # If odd number of lists, we get the last list and k_way_merge it to the second last list
        if mod_1 != 0:
            
            # Create new sublist and perform  k-way-merge
            new_list = sorted_set[-1: -2 :-1]
            merged = k_way_merge(new_list, index_val)
            
            # Reset sorted_set to result[]
            sorted_set = result

            # Append the newly merged list to result
            result.append(merged)
            
        # If even number of lists, run through loop again
        else:
            sorted_set = result
            

In [17]:
# Input data is a list of dictionaries
def redistribution(input_data, date_range):
    
    new_dict = {}
    start_date = date_range[0]
    end_date = date_range[1]
    
    # Loop through the dictionaries inside the list
    for i in input_data:
        for key in i.keys():
            
            # Get the year value
            date_year = int(key[:4])
            
            # If the year value is less than first date range, date_key is 1
            if date_year <= date_range[0]:
                date_key = 1
                
            # If the year value is less than or equal to second date range, date_key is 2
            elif date_year <= date_range[1]:
                date_key = 2
                
            # If the year value is greater than second date range, date_key is 3   
            else:
                date_key = 3

            # If the date does not exist inside new_dict, create it 
            if date_key not in new_dict.keys():
                new_dict[date_key] = {}
                new_dict[date_key][key] = []
                
                # Append the key and value to the date_key sub-dictionary
                for val in i[key]:
                    new_dict[date_key][key].append(val)
                    
            # If the key and value do not appear in the sub-dictionary,
            # create a new entry and append key and value
            else:
                if key not in new_dict[date_key]:
                    new_dict[date_key][key] = []
                    for val in i[key]:
                        new_dict[date_key][key].append(val)

                # Extend the existing key entry in the dictionary with the new value
                else:
                    new_dict[date_key][key].extend(i[key])

    return new_dict

In [18]:
# This function is for performing local_groupby on date data 
def local_groupby(dataset, val_index):

    dict = {}
    
    # For each pair in the dataset
    for index, record in enumerate(dataset):

        # Key is the date value from the datetime value
        key = record[val_index]
        key = key[:10]
        
        # If the key does not exist in dictionary, add new entry and set count to 1
        if key not in dict:
            dict[key] = 0
            
        # Add one for every count of key
        dict[key] += 1
        
    return dict


In [19]:
# This function is for performing local_groupby on temperature data
def local_groupby_temp(dataset, val_index, temp_index):
    
    dict = {}
    
    # For each pair in the dataset
    for index, record in enumerate(dataset):

        # Key is the date value from the datetime value
        key = record[val_index]
        key = key[:10]
        
        # If the key does not exist in dictionary, add new entry and add the temperature value
        if key not in dict:
            dict[key] = [record[temp_index]]
            
        # Append every new temp val found for the date
        dict[key].append(record[temp_index])
    return dict


In [20]:
# Specifically for Q5
def local_groupby_temp2(dataset):
    
    new_dict = {}
    
    for i in dataset:
        # If loc hash not in new_dict, append loc and temp value
        if i[2] not in new_dict.keys():
            new_dict[i[2]] = [i[1]]
            
        # Append temp value to existing loc
        else:
            new_dict[i[2]].append(i[1])
            
    return new_dict

In [21]:
# Input is a dictionary
def get_average_temp(data_dict):
    
    temp_dict = {}
    
    # For the keys in the data being passed 
    for year in data_dict.keys(): 

        # Get the date value from the keys
        for date_val in data_dict[year].keys(): 
            
            # Get all the values from that date_val
            date_data = data_dict[year][date_val] 
            
            # Convert all the values to int
            date_data = [int(i) for i in date_data] 
            
            # Calculate the average based on sum and length of numbers
            avg_val = sum(date_data)/len(date_data)
            
            # Round temperature to 2 decimal places
            avg_val = "{:.2f}".format(avg_val)
            
            # Append key and value to temperature dictionary
            temp_dict[date_val] = avg_val

    return temp_dict


In [22]:
# Input is a dictionary
def get_average_temp_loc(data_dict):
    ## dict as = {"loc1": [[1,2,3,4,5]], "loc2": [[1,2,3,4,5]
    
    loc_dict = {}

    for loc in data_dict.keys():
    
        # Get all the values from that location
        loc_data = data_dict[loc][0] 

        # Convert the values to int
        loc_data = [int(i) for i in loc_data] 

        # Calculate the average based on sum and length of numbers
        avg_val = sum(loc_data)/len(loc_data)

        # Round temperature to 2 decimal places
        avg_val = "{:.2f}".format(avg_val)

        # Append key and value to location dictionary
        loc_dict[loc] = avg_val 

    return loc_dict


In [23]:
################################################ /FUNCTIONS ################################################

In [24]:
##### TASK 1.1 - Round-Robin Partitioning, Linear Search #####
"""
STEPS OUTLINE:
1. Round Robin partition data
2. Linear search
3. Use 'pool' to create parallelism
4. Find and return temperature and relative humidity on October 23 2019
"""
input_data = list(climate_historic[1:])
# climate_date_data = [item[1] for item in input_data]
results = []
query = '2019-10-23'
n_processor = 3

pool = Pool(processes = n_processor) 


"""Step 1: Round Robin partition data"""
data_partitions = rr_partition(input_data, 3)


"""Step 2 & 3: Linear search in parallel"""
for d in data_partitions:
        climate_date = [item[1] for item in d] # Get climate date from each entry in data_partitions
        
        # Linear search in parallel
        result = pool.apply_async(linear_search, [climate_date, query])
        output = result.get() 
        results.append(output)

# Loop through the values in results list to find the query index in bin
for counter, value in enumerate(results):
    if str(value) not in "(-1, None)":
        
        # Assign the first value in entry to be index
        date_index = value[0]
#         print(date_index)
        
        # Assign 
        date_bin_index = results.index(value)
#         print(date_bin_index)


"""Step 4: Find and return temperature and relative humidity on October 23 2019"""
# find the value at the index in the partition
data_entry = data_partitions[date_bin_index][date_index]

# Return value for temperature
temperature = data_entry[2]

# Return value for humidity
humidity = data_entry[3]

print("Date:",query + "\nAir temperature:", str(temperature) + u"\N{DEGREE SIGN}C" "\nRelative humidity:", humidity + "%")

Date: 2019-10-23
Air temperature: 13°C
Relative humidity: 50.4%


In [25]:
### TASK 1.2 - Range Partitioning, Binary Search ###
"""
STEPS OUTLINE:
1. Assume data is sorted by date already
2. Range partition data
3. Find partition where requested data is stored
4. Binary search
5. Use 'pool' to create parallelism
6. Find and return all fire records on October 23 2019 
"""

# input_data = fire_historic
query = "2019-10-23" # query term
n_processor = 3 # number of parallel processors
partition_range = [2013, 2015]
results = []
n_processor = 3
column = 2
date_list = []

lower_limit = str(partition_range[0])
upper_limit = str(partition_range[1])

# query = datetime.strptime(query, '%Y-%m-%d')
# Activate 3 processors
pool = Pool(processes = n_processor) 

"""Step 1: Import data  based on date"""
# We will use fire data that has already been sorted
fire_datetime_sorted = date_sort_function(fire_historic[1:], 2, "fire")
input_data = list(fire_datetime_sorted)

"""Step 2: Range partition Data"""

# We want to partition the data based on the 'year', however we want to partition them evenly to be useful
# in future functions
fire_partitions = str_range_partition(input_data, column, partition_range)


"""Step 3: Find partition where values from 2019 are stored"""
# Find the partition in which the values are stored using the bin_search function
partition_location = bin_search(query, lower_limit, upper_limit, fire_partitions)

# Loop through all the values in the partition to find the date string only from the datetime value
for each in partition_location:

    # Get the index which has datetime value
    datetime_val = each[2]
    
    # Store the date value in a list to pass to the pool function. Only take the first 10 chars to get date data
    date_list.append(datetime_val[:10])


"""Step 4 & 5: Binary Search in parallel"""
search_results = pool.apply(binary_search_all, [date_list, query])


"""Step 6: Find and return all fire records on October 23 2019"""
# Loop through results, return the full data entry in the partition and append it to the results list
for i in search_results:
    results.append(partition_location[i])

# Print results
print("The following fire records are from "+query+":")
pprint.pprint(results)

The following fire records are from 2019-10-23:
[['-37.232', '143.252', '2019-10-23T04:10:50', '30.5', '80', '55'],
 ['-36.216', '146.388', '2019-10-23T04:11:00', '19.9', '81', '59'],
 ['-36.218', '146.377', '2019-10-23T04:11:00', '22.5', '83', '60']]


In [26]:
### TASK 1.3 - Range Partitioning, Binary Search ###
"""
STEPS OUTLINE:
1. Sort data by temperature
2. Range partition data
3. Find partition where requested data is stored
4. Binary search
5. Use 'pool' to create parallelism
6. Find the latitude, longitude, surface temperature and confidence when the surface
temperature (°C) was between 65 °C and 100 °C.

7. JUSTIFICATION FOR RANGE PARTITION AND BINARY SEARCH:
--------------------------------------------------------
I decided to use range partitioning and binary search for this task for a few reasons. Because we are
looking for data between two temperature values, and I can easily sort the data by the date value and allocate it
to one specific partition, thus using less time to search as it doesn't need to look at the other partitions
because the data we are not looking for does not sit there.

Because binary search is easily done on a range based on the function I wrote in the previous task, it made sense
for me to run it here as well. Because of the way the binary function is set up, it only needs to run 72 times 
to return the first and last instances of each query number, as opposed to linear search which needs to run every 
single time for every single instance that the query value exists in the list, which in this case is 7928 times.

I have returned only the first 20 entries in the answer.
--------------------------------------------------------
"""

n_processor = 3 # number of parallel processors
partition_range = [65, 101]
results = []
column = 5 # we know temperature values are in column 6, so assign 5 to get values

lower_limit = partition_range[0]
upper_limit = partition_range[1]

temp_min = 65
temp_max = 100

# Define pool processing function
pool = Pool(processes = n_processor) 


"""Step 1: Sort data based on temperature"""
# input_data = list(fire_historic_copy)
input_data = list(fire_datetime_sorted)


# Convert temperature strings to floats
for i in input_data[1:]:
    i[column] = float(i[column])
    
# Sort data based on temperature data
temperature_data_sorted = float_sort_function(input_data, column)


"""Step 2: Range Partition data"""
# Partition the data using the data
temperature_partitions = number_range_partition(temperature_data_sorted, column, partition_range)


"""Step 3: Find partitions where temperature values from 65 to 100 are stored"""
# We need to find which partition(s) our data lies in so we can run the functions correctly.
# The bin_search function finds which range the min and max temps are in.
temprange_partition_location_min = bin_search(temp_min, lower_limit, upper_limit, temperature_partitions)
temprange_partition_location_max = bin_search(temp_max, lower_limit, upper_limit, temperature_partitions)


"""Step 4: Binary search & Step 5: Use 'pool' to create parallelism"""
# If the bin index is the same (i.e data is only in one partition), we use only one bin.
if temperature_partitions.index(temprange_partition_location_min) == temperature_partitions.index(temprange_partition_location_max):
    
    # Place each surface temperature value in temprange_partition_location_min into a list:
    temp_celsius = [item[column] for item in temprange_partition_location_min]

    # Loop through all the temperature values. Add +1 to the max range so we include the value
    for i in range(temp_min, temp_max+1):
    #     temp_search_results = binary_search_all(temp_celsius, i)
        temp_search_results = pool.apply(binary_search_all, [temp_celsius, i])

        for j in temp_search_results:
    #         print(temprange_partition_location_min[j])
            results.append(temprange_partition_location_min[j])


else:
    # Get the two bins that our data is in 
    temp_celsius_min = [item[column] for item in temprange_partition_location_min]
    temp_celsius_max = [item[column] for item in temprange_partition_location_max]

    
    # Loop through the temperature ranges. Add +1 to the max range so we include the value
    for i in range(temp_min, temp_max + 1):
        
        # If the temperature value is in the lower bin, perform parallel search here
        if i in temp_celsius_min:
            temp_search_results = pool.apply(binary_search_all, [temp_celsius_min, i])
            
            # Append each result to the results list
            for j in temp_search_results:
                results.append(temprange_partition_location_min[j])
        
        # If the temperature value is in the upper bin, perform parallel search here
        elif i in temp_celsius_max:
            temp_search_results = pool.apply(binary_search_all, [temp_celsius_max, i])

            # Append each result to the results list
            for j in temp_search_results:
                results.append(temprange_partition_location_max[j])



"""Step 6: Find the latitude, longitude, surface temperature and confidence when the surface
temperature (°C) was between 65 °C and 100 °C."""

for i in results[0:20]:
    print("+-------------------------------------------------------------------------------------------+")
    print("| Latitude:", i[0], " | Longitude:", i[1], " | Surface Temperature:", str(i[5])+u"\N{DEGREE SIGN}C", " | Confidence:", i[4],"|")

+-------------------------------------------------------------------------------------------+
| Latitude: -38.37  | Longitude: 146.285  | Surface Temperature: 65.0°C  | Confidence: 88 |
+-------------------------------------------------------------------------------------------+
| Latitude: -38.312  | Longitude: 146.259  | Surface Temperature: 65.0°C  | Confidence: 100 |
+-------------------------------------------------------------------------------------------+
| Latitude: -37.938  | Longitude: 145.763  | Surface Temperature: 65.0°C  | Confidence: 89 |
+-------------------------------------------------------------------------------------------+
| Latitude: -37.386  | Longitude: 145.015  | Surface Temperature: 65.0°C  | Confidence: 89 |
+-------------------------------------------------------------------------------------------+
| Latitude: -37.363  | Longitude: 145.007  | Surface Temperature: 65.0°C  | Confidence: 88 |
+----------------------------------------------------------------

In [27]:
### TASK 2.1 - Divide & Broadcast Parallel Join, Sort-Merge Join ###

"""
STEPS OUTLINE:
Step 1: Divide larger table into no. of processors
==> I am passing fire_partitions to the function as it has already been partitioned by date to 3 partitions in a 
previous function 

Step 2: Broadcast smaller table to each partition
==> Since we are using shared memory, we do not need to broadcast Table 2 (climate_historic)

Step 3: Perform local sort-merge join on each partition
==> I have skipped the first row in climate_historic as it is the header row

Step 4: Return Surface Temperature, Air Temperature, Relative Humidity, and Wind Speed
"""


def divide_broadcast_join(T1, T1_index, T2,  T2_index, n_processor, data_type, conf_check, range_min, range_max, val_1, val_2, val_3, val_4):
    results = []
    pool = Pool(processes = n_processor)
    T1 = list(T1)
    T2 = list(T2)

#   For each partition in Table 
    for partition in T1:
#       Run the sort-merge join between the two tables and return surface temp, air temp, relative humidity and wind speed
        result = pool.apply_async(SM_join, [partition, T1_index, T2, T2_index, data_type, conf_check, range_min, range_max, val_1, val_2, val_3, val_4])
        output = result.get()
        results.append(output)
    
    return results

"""Run function"""
print('Surface Temperature  |  Air Temperature  |  Relative Humidity  |  Wind Speed')
# Setting range limits to be from negative infinity to infinity
dbj = divide_broadcast_join(fire_partitions, 2, climate_historic[1:], 1, 3, "date_str", "conf_check_no", float("-inf"), float("inf"), 5, 2, 3, 4)
dbj[0][:20] # Returning 20 results. Since this is a list of lists, we need to access [0] to get inside list.

Surface Temperature  |  Air Temperature  |  Relative Humidity  |  Wind Speed


[['57', '16', '10.4', '83'],
 [64.0, '18', '26.7', '80'],
 [71.0, '18', '20.9', '92'],
 [62.0, '22', '27.1', '87'],
 [76.0, '22', '54.5', '95'],
 [55.0, '22', '17.3', '81'],
 [79.0, '22', '60.8', '96'],
 [66.0, '22', '48.5', '100'],
 [96.0, '22', '120.8', '100'],
 [66.0, '22', '50.2', '100'],
 [67.0, '22', '51.8', '100'],
 [40.0, '22', '13.8', '81'],
 [64.0, '22', '45', '100'],
 [53.0, '22', '29.2', '100'],
 [86.0, '22', '90.9', '100'],
 [96.0, '22', '120.4', '100'],
 [54.0, '22', '28.3', '100'],
 [48.0, '22', '22.6', '100'],
 [106.0, '22', '159', '100'],
 [69.0, '22', '53', '100']]

In [28]:
### TASK 2.2 - Disjoint Partitioning-Based Parallel Join, Sort-Merge Join ###

"""
STEPS OUTLINE:
Step 1: Partition both lists by range
==> I am passing fire_partitions to the function as it has already been partitioned by date previously. 
I am partitioning climate_historic with the same function

Step 2: Run Sort-Merge local join on both lists
==> Running sort merge on both  lists

Step 3: Return DateTime, Air Temperature, Surface Temperature, and Confidence when 
Confidence is between 90 and 100 inclusive.
==> I have skipped the first row in climate_historic as it is the header row
"""

def disjoint_partitioning_parallel_join(T1, T1_index, T2, T2_index, n_processor, data_type, conf_check, range_min, range_max, val_1, val_2, val_3, val_4):

    pool = Pool(processes = n_processor)
    
    results = []
    T1 = list(T1) # fire_partitions copy
    T2 = list(T2) # climate_historic

# Step 1: Partition both data sets by the same range
    T1_partitioned = T1 # Already partitioned
    T2_partitioned = str_range_partition(T2, 1, [2013,2015])
    
# Step 2: Run Sort-Merge local join on both lists
    for i in T1_partitioned:
        for j in T2_partitioned:

#       Run the sort-merge join between the two tables and return surface temp, air temp, relative humidity and wind speed
            result = pool.apply_async(SM_join, [i, T1_index, j, T2_index, data_type, conf_check, range_min, range_max, val_1, val_2, val_3, val_4])
            output = result.get()
            results.append(output)
        
#     results = results[0:100]
    return results
#     return

"""Step 3: Run function"""
print('DateTime  |  Air Temperature  |  Surface Temperature  |  Confidence')
dppj = disjoint_partitioning_parallel_join(fire_partitions, 2, climate_historic[1:], 1, 3, "date_str", "conf_check_yes", 90, 100, 2, 2, 5, 4)

# Returning 20 results. Since this is a list of lists, we need to access [0] to get inside list.
dppj[0][:20] 

DateTime  |  Air Temperature  |  Surface Temperature  |  Confidence


[['2009-01-26T04:28:00', '18', 71.0, '92'],
 ['2009-01-29T04:57:20', '22', 76.0, '95'],
 ['2009-01-29T04:57:20', '22', 79.0, '96'],
 ['2009-01-29T16:01:30', '22', 45.0, '95'],
 ['2009-01-29T16:01:30', '22', 44.0, '94'],
 ['2009-01-29T16:01:30', '22', 45.0, '96'],
 ['2009-01-29T23:45:30', '22', 79.0, '96'],
 ['2009-01-29T23:45:30', '22', 67.0, '90'],
 ['2009-01-29T23:45:30', '22', 84.0, '98'],
 ['2009-01-30T04:02:30', '20', 83.0, '98'],
 ['2009-01-30T04:02:30', '20', 66.0, '90'],
 ['2009-01-30T04:02:30', '20', 75.0, '94'],
 ['2009-01-30T04:02:30', '20', 71.0, '92'],
 ['2009-01-30T04:02:30', '20', 70.0, '92'],
 ['2009-01-30T04:02:30', '20', 77.0, '95'],
 ['2009-01-30T04:02:30', '20', 77.0, '95'],
 ['2009-01-30T04:02:40', '20', 71.0, '92'],
 ['2009-01-30T04:02:40', '20', 75.0, '94'],
 ['2009-01-30T13:24:10', '20', 45.0, '96'],
 ['2009-01-30T13:24:10', '20', 44.0, '92']]

In [29]:
### TASK 3.1 - Parallel Binary merge-sort, round robin partition on climate_historic. ###

"""STEPS OUTLINE:

Step 1: Partition data

Step 2: Local sort  - carried  out  on each partition

Step 3: Binary merge

Step 4: Return top 20 days with least air temperature. Display date and air temperature in output

--------------------------------------------------------
JUSTIFICATION FOR DATA PARTITION TECHNIQUE: 
I have decided to partition my data using round robin because I wanted to ensure equal load 
distribution at the beginning. I was reluctant to partition by temperature range 
as it would mean one processor does the bulk, if not all of the work. I was also reluctant to partition by 
date range in case some years had more fires than others. There is going to be some skew regardless, as is 
inevitable in such partitioning
--------------------------------------------------------
"""

def parallel_binary_merge_sort(input_data, n_processor, val_index):
    
    pool = Pool(processes = n_processor)   
    merged = []
    sorted_set = []
    input_data = list(input_data)
    result = []
    
    # Step 1: partition the data
    partitions = rr_partition(input_data, 3)
    pool = Pool(processes = n_processor)
    
    # Step 2: Local sort using quicksort function
    for i in partitions:
        for each in i:
            each[2] = float(each[2]) # Convert air temperature value to float so we can sort on it
        
        sorted_set.append(pool.apply_async(qsort, [i, 2]).get())

    # Step 3: Binary merge the lists
    final = binary_merge_sort(sorted_set, 2)
    
    # Step 4: Return top 20 days with least air temperature
    date_sorted = sorted(final[0][0:20], key = lambda x: datetime.strptime(x[1], '%Y-%m-%d'))
    for each in date_sorted:
        print("+---------------------------------------+")
        print("| Date:",each[1], "|", "Temperature:",str(each[2])+(u"\N{DEGREE SIGN}C |"))

    return date_sorted

"""Run function - Return top 20 days with least air temperature."""
pbms = parallel_binary_merge_sort(climate_historic[1:], 3, 2)

+---------------------------------------+
| Date: 2009-06-30 | Temperature: 5.0°C |
+---------------------------------------+
| Date: 2009-07-01 | Temperature: 5.0°C |
+---------------------------------------+
| Date: 2009-08-02 | Temperature: 5.0°C |
+---------------------------------------+
| Date: 2010-07-01 | Temperature: 5.0°C |
+---------------------------------------+
| Date: 2010-07-02 | Temperature: 5.0°C |
+---------------------------------------+
| Date: 2010-08-03 | Temperature: 5.0°C |
+---------------------------------------+
| Date: 2011-07-02 | Temperature: 5.0°C |
+---------------------------------------+
| Date: 2011-07-03 | Temperature: 5.0°C |
+---------------------------------------+
| Date: 2011-08-04 | Temperature: 5.0°C |
+---------------------------------------+
| Date: 2012-07-02 | Temperature: 5.0°C |
+---------------------------------------+
| Date: 2012-07-03 | Temperature: 5.0°C |
+---------------------------------------+
| Date: 2012-08-04 | Temperature: 

In [35]:
### 3.2 - Parallel Merge-All Sort Algorithm ###
"""STEPS OUTLINE:

Step 1: Local Sort using QuickSort. I will be using fire_partitions which is the fire data already
partitioned in a previous step.

Step 2: Final Merge using K-Way Merge

Step 3: Return the top 100 fires with the least surface temperature

"""

def parallel_merge_all_sorting(dataset, n_processor, index):

    result = []
    
    # Data is already partitioned  into 3 subsets
    subsets = dataset 
    
    # Pool: a Python method enabling parallel processing. 
    pool = Pool(processes = n_processor)

    # Step 1: Local Sort using quicksort function, append to list sorted_set
    sorted_set = []
    for s in subsets:
        # sort the data using the quicksort function
        sorted_set.append([*pool.apply_async(qsort, [s, index]).get()])
    pool.close()

    # Step 2: Final Merge using k_way_merge to sort all the lists in sorted_set
    result = k_way_merge(sorted_set, 5)
    
    # Step 3: Return the top 100 results, displaying all columns
    result = result[0:100]
    print("The top 100 fires with the least surface temperature:")
    return result

"""Run function"""
parallel_merge_all_sorting(fire_partitions, 3, 5)

The top 100 fires with the least surface temperature:


[['-36.941', '143.268', '2017-11-11T13:30:08', '11.6', '80', 37.0],
 ['-38.259', '145.978', '2018-03-10T15:15:10', '13.3', '80', 37.0],
 ['-37.665', '148.51', '2018-05-08T13:18:24', '17.5', '80', 37.0],
 ['-37.87', '146.142', '2019-03-02T15:32:57', '10', '80', 37.0],
 ['-37.437', '146.781', '2019-03-07T12:34:41', '42.7', '80', 37.0],
 ['-35.932', '145.178', '2019-03-17T13:11:59', '10.9', '80', 37.0],
 ['-36.332', '147.521', '2019-04-30T15:14:10', '14.6', '80', 37.0],
 ['-37.88', '146.138', '2019-05-16T15:14:32', '16.4', '80', 37.0],
 ['-37.318', '143.355', '2019-05-18T13:23:58', '14.1', '80', 37.0],
 ['-37.336', '148.073', '2017-09-24T13:30:09', '29.2', '82', 38.0],
 ['-35.816', '141.24', '2018-01-20T16:09:19', '19.8', '82', 38.0],
 ['-36.724', '146.337', '2018-02-28T12:59:23', '12.9', '81', 38.0],
 ['-37.356', '146.732', '2018-03-10T13:36:03', '29.9', '82', 38.0],
 ['-38.303', '143.005', '2018-03-19T13:29:50', '13.6', '82', 38.0],
 ['-38.312', '142.989', '2018-03-21T13:17:37', '12.7',

In [31]:
# Task 4.1 - Parallel GroupBy
"""STEPS OUTLINE:

Step 1: Local aggregate in each processor

Step 2: Global aggregate

Step 3: Return total number of fires each day

--------------------------------------------------------
JUSTIFICATION FOR DATA PARTITION TECHNIQUE:
Since the fire data has already been partitioned by date in a previous task, I will be reusing these partitions
for this task. This means the data has already been sorted by date, and so the global aggregate function
is a simple matter of joining the three results from local_result together.
--------------------------------------------------------
"""

def parallel_merge_all_groupby(dataset, val_index):

    result = {}
    
    # Define the number of parallel processors: the number of sub-datasets.
    n_processor = len(dataset)

    # Pool: a Python method enabling parallel processing. 
    pool = mp.Pool(processes = n_processor)

    # Step 1: Local aggregate in each processor
    local_result = []
    for i in dataset:
        # call the local_groupby method
        local_result.append(pool.apply(local_groupby, [i, val_index]))
    pool.close()
    
    # Step 2: Global aggregate
    for each in local_result:
        result.update(each)

    # Step 3: Return total number of fires each day
    return result

"""Run function"""
parallel_merge_all_groupby(fire_partitions, 2)

{'2009-01-20': 1,
 '2009-01-26': 2,
 '2009-01-29': 41,
 '2009-01-30': 45,
 '2009-01-31': 2,
 '2009-02-05': 4,
 '2009-02-06': 2,
 '2009-02-07': 410,
 '2009-02-08': 88,
 '2009-02-09': 137,
 '2009-02-10': 91,
 '2009-02-11': 35,
 '2009-02-12': 34,
 '2009-02-13': 88,
 '2009-02-14': 20,
 '2009-02-15': 39,
 '2009-02-16': 25,
 '2009-02-17': 27,
 '2009-02-18': 11,
 '2009-02-19': 14,
 '2009-02-20': 15,
 '2009-02-21': 6,
 '2009-02-22': 16,
 '2009-02-23': 37,
 '2009-02-24': 46,
 '2009-02-25': 24,
 '2009-02-26': 26,
 '2009-02-27': 18,
 '2009-02-28': 3,
 '2009-03-01': 16,
 '2009-03-02': 10,
 '2009-03-10': 1,
 '2009-03-19': 10,
 '2009-03-20': 4,
 '2009-03-22': 2,
 '2009-03-23': 5,
 '2009-03-24': 1,
 '2009-03-25': 1,
 '2009-03-26': 2,
 '2009-03-27': 5,
 '2009-03-28': 21,
 '2009-03-29': 36,
 '2009-03-30': 54,
 '2009-03-31': 69,
 '2009-04-01': 17,
 '2009-04-02': 22,
 '2009-04-07': 6,
 '2009-04-08': 2,
 '2009-04-09': 27,
 '2009-04-10': 1,
 '2009-04-11': 6,
 '2009-04-12': 8,
 '2009-04-14': 19,
 '2009-04-1

In [32]:
# Task 4.2 - Parallel Two-Phase Method
"""STEPS OUTLINE:

Step 1: Initial data placement using Round-Robin 

Step 2: Local aggregate data using date as group-by attribute

Step 3: Global aggregate data using range partitioning

Step 4: Find average surface temperature for each day

--------------------------------------------------------
WHY IS PARALLEL TWO-PHASE METHOD BETTER THAN THE TRADITIONAL METHOD?
The Two-Phase method allows for true parallelism  amongst the processors, as each processor has its
own set of data that it needs to perform on, Compared to the traditional method where all the local 
aggregated data is directed to one processor to complete the global aggregation - leading to an extremely 
heavy load on only one processor.
--------------------------------------------------------

"""

def parallel_two_phase_method(dataset, val_index, temp_index):
    result = {}
    n_processor = 3
    partition_range = [2013, 2015]
    local_result = []

    # Step 1: Initial data placement using round-robin
    partitions = rr_partition(dataset, 3)

    # Pool: a Python method enabling parallel processing.
    pool = mp.Pool(processes = n_processor)

    
    # Step 2: Local aggregate using date as group-by attribute
    for i in partitions:
        local_result.append(pool.apply(local_groupby_temp, [i, val_index, temp_index]))
    pool.close()

    
    # Step 3: Redistribute local_results into new_partitions using based on date range
    new_dict = redistribution(local_result, partition_range)


    # Step 4: Find average surface temperature for each day
    average = get_average_temp(new_dict)
    
    return average

"""Run function"""
parallel_two_phase_method(fire_historic[1:], 2, 5)

{'2019-10-29': '66.00',
 '2019-10-23': '58.00',
 '2019-10-21': '78.27',
 '2019-10-10': '53.50',
 '2019-10-03': '57.29',
 '2019-10-01': '60.50',
 '2019-09-19': '82.00',
 '2019-09-05': '59.50',
 '2019-09-03': '63.43',
 '2019-08-27': '44.50',
 '2019-07-29': '74.50',
 '2019-07-05': '57.29',
 '2019-07-04': '73.11',
 '2019-06-27': '84.00',
 '2019-06-26': '65.14',
 '2019-06-25': '74.33',
 '2019-06-24': '82.33',
 '2019-06-20': '62.50',
 '2019-06-14': '48.00',
 '2019-06-11': '64.56',
 '2019-06-07': '48.00',
 '2019-06-01': '47.00',
 '2019-05-30': '55.43',
 '2019-05-25': '66.00',
 '2019-05-24': '70.75',
 '2019-05-23': '62.45',
 '2019-05-22': '55.70',
 '2019-05-21': '58.50',
 '2019-05-19': '74.39',
 '2019-05-18': '55.67',
 '2019-05-17': '64.05',
 '2019-05-16': '61.09',
 '2019-05-14': '66.80',
 '2019-05-13': '61.14',
 '2019-05-10': '72.67',
 '2019-05-08': '53.00',
 '2019-05-07': '67.39',
 '2019-05-06': '54.69',
 '2019-05-05': '48.00',
 '2019-05-04': '57.67',
 '2019-05-03': '67.67',
 '2019-05-01': '

In [33]:
# Geohash library: https://github.com/dbarthe/geohash/
# eg) geohash2.encode(42.6, -5.6, precision=5)

# Task 5 - Parallel GroupBy Join (Join first, then GroupBy)

"""STEPS OUTLINE:

Step 1: Range partition data

Step 2: Sort-Merge Join based on date attribute.

Step 3: Hash latitude/longitude data

Step 4: Traditional groupby location

Step 5: Find average air temperature in each location

JUSTIFICATION:

RANGE PARTITIONING: I have decided to use existing fire_partitions which has the data partitioned into 3 by date
to reduce the effort required in preparing the data. 

SORT-MERGE JOIN: I have joined the partitions to climate_historic as it returns the joined partitions, and since 
we have shared memory, the climate data does not need to be redistributed

TRADITIONAL GROUPBY: Although the two-phase method is true parallelism, for our case, since the data is not 
that large, we can get a reasonable performance from the traditional method. 

Because our join attribute is based on date, and our groupby attribute is based on hashed location, we
Join first, then do GroupBy
"""

def parallel_groupby_join(T1, T1_index, T2, T2_index, n_processor ,data_type, conf_check, range_min, range_max, val_1, val_2, val_3, val_4):
    results = []
    pool = Pool(processes = n_processor)
    T1 = list(T1) # Already partitioned by range
    T2 = list(T2)  # Since we are in a shared-memory architecture, we do not need to redistribute T2
    local_result = []
        
    # Pool: a Python method enabling parallel processing. 
    pool = mp.Pool(processes = n_processor)
    
    # Step 2: Divide and Broadcast, run Sort-Merge local join on each processor
    for partition in T1:
        # Run the sort-merge join between the two tables and return datetime, air temp, latitude and longitude
        results.append([*pool.apply(SM_join, [partition, T1_index, T2, T2_index, data_type, conf_check, range_min, range_max, val_1, val_2, val_3, val_4])])

        
    # Step 3: Hash latitude/longitude data
    for i in results:
#         pprint.pprint(i)
        
        # Run geohash function over every latitude and longitude pair in each row
#         print(results)
        for each in i:
            geohash = geohash2.encode(float(each[2]), float(each[3]), precision = 3)
            each.insert(2, geohash)
            
            # Remove the latitude and longitude columns from the list.
            # This way we get to keep the date and temperature values without needing a new list
            del each[3], each[3] 

            
        # Traditional local groupby 
        result1 = pool.apply_async(local_groupby_temp2, [i])
        output1 = result1.get()
        local_result.append(output1)
        
        
    # Create a location result dictionary and append the location and air temperature
    location_result = {}
    for each in local_result:
        for key, val in each.items():
            
            if key not in location_result:
                location_result[key] = []
                location_result[key].append(val)

            else:
                location_result[key].extend([val])
        
    # Get average air temperature in each location
    average =  get_average_temp_loc(location_result)

    return average

"""Run function"""
parallel_groupby_join(fire_partitions, 2, climate_historic[1:], 1, 3, "date_str", "conf_check_no", float("-inf"), float("inf"), 2, 2, 0, 1)


{'r1q': '17.21',
 'r1s': '17.92',
 'r1p': '19.66',
 'r30': '17.75',
 'r1r': '18.94',
 'r32': '18.11',
 'r1m': '17.02',
 'r38': '20.73',
 'r1k': '17.04',
 'r1j': '18.07',
 'r33': '18.06',
 'r36': '16.41',
 'r1w': '17.53',
 'r1n': '17.31',
 'r1x': '17.19',
 'r1h': '16.41',
 'r39': '16.87',
 'r1t': '17.53',
 'r1u': '17.95',
 'r1v': '16.33'}