# ![Spark Logo](http://spark-mooc.github.io/web-assets/images/ta_Spark-logo-small.png) + ![Python Logo](http://spark-mooc.github.io/web-assets/images/python-logo-master-v3-TM-flattened_small.png)
# **Insight Data Engineering Coding Challenge**


###  **Task 1: Word Frequency count in the tweets**
#### In this task, the frequency of every word in the tweets is computed in parallel with Spark.
#### The steps are as follows:
##### 1. Read the input text file into a Spark RDD with one element per line (tweet).
##### 2. Split each line into words on whitespace using **flatMap**.
##### 3. Map each word to a pair, creating a pair RDD of (word, 1).
##### 4. Use **reduceByKey** to sum the counts for each word across all partitions.
##### 5. Sort the RDD by key (the word) and write the result to the output file.
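The five steps above can be mirrored on an in-memory list of lines, which is handy for checking the expected contents of ft1.txt on a small sample without a Spark cluster. This is a minimal sketch: `word_count` and the sample tweets are hypothetical, and `collections.Counter` stands in for the flatMap, map and reduceByKey chain.

```python
from collections import Counter

def word_count(lines):
    """Plain-Python analogue of flatMap -> map -> reduceByKey -> sortByKey."""
    counts = Counter()
    for line in lines:              # step 1: one element per input line
        for word in line.split():   # step 2: split on whitespace (flatMap)
            counts[word] += 1       # steps 3-4: emit (word, 1) and sum per key
    return sorted(counts.items())   # step 5: sort by word

sample = ["is #spark better", "is spark better than hadoop"]
for word, count in word_count(sample):
    print(word, count)
```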

```python
# Main tweet input, used by both tasks
file_name = 'tweets.txt'

# Create an RDD with one element per line of the input file
# (`sc` is the SparkContext provided by the PySpark shell/notebook)
tweets = sc.textFile(file_name)
```

```python
# Split each tweet on whitespace, emit (word, 1) pairs,
# sum the counts per word with reduceByKey and
# sort by word (Unicode code-point order)
tweet_word_count_rdd = (tweets.flatMap(lambda line: line.split())
                              .map(lambda word: (word, 1))
                              .reduceByKey(lambda x, y: x + y)
                              .sortByKey())
```
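**reduceByKey** is cheaper than grouping all values per key because Spark first merges the values for each key inside every partition and only then shuffles the partial results, so at most one (word, partial count) pair per word leaves each partition. The sketch below imitates that two-stage merge on plain Python lists; the partition layout and the helper names `combine_partition` and `reduce_by_key` are hypothetical and only illustrate the idea, they are not Spark internals.

```python
from operator import add

def combine_partition(pairs, func):
    """Map-side combine: merge the values of each key within one partition."""
    partial = {}
    for key, value in pairs:
        partial[key] = func(partial[key], value) if key in partial else value
    return partial

def reduce_by_key(partitions, func):
    """Merge the per-partition partial results, as the reduce side would."""
    merged = {}
    for partition in partitions:
        for key, value in combine_partition(partition, func).items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

# Two hypothetical partitions of (word, 1) pairs
partitions = [
    [("spark", 1), ("is", 1), ("spark", 1)],
    [("spark", 1), ("fast", 1)],
]
print(sorted(reduce_by_key(partitions, add).items()))
```

Only three partial pairs (`spark: 2`, `is: 1` from the first partition) plus two from the second would cross the network here, instead of all five raw pairs.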

```python
import io

# Collect the sorted (word, count) pairs to the driver and write one
# "word count" line per word; io.open takes care of the UTF-8 encoding
with io.open('ft1.txt', 'w', encoding='utf-8') as f:
    for word, count in tweet_word_count_rdd.collect():
        f.write(u"{} {}\n".format(word, count))
```
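**sortByKey** compares string keys with Python's ordinary string comparison, which orders them by Unicode code point rather than alphabetically. In ft1.txt this means words starting with `#` or a digit come before capitalized words, capitalized words come before lowercase ones, and a word starting with `é` comes after one starting with `z`. The sample words below are made up.

```python
words = ["spark", "Spark", "#spark", "2016", "zebra", "été"]

# Python compares strings character by character, by Unicode code point
print(sorted(words))                       # ['#spark', '2016', 'Spark', 'spark', 'zebra', 'été']
print([ord(w[0]) for w in sorted(words)])  # [35, 50, 83, 115, 122, 233]
```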

### **Task 2: Rolling median of unique words per tweet**
#### The rolling median of the number of unique words per tweet is computed with two [heaps](https://en.wikipedia.org/wiki/Heap_(data_structure)) as follows:
##### 1. Two heaps are maintained: a max heap and a min heap.
##### 2. The max heap holds the smaller half of the values seen so far and the min heap holds the larger half, so the root of each heap sits next to the median.
##### 3. Values are processed one at a time, in input order, and after each insertion the heaps are rebalanced so that their sizes differ by at most 1.
##### 4. When both heaps contain the same number of elements, the median is the average of the two roots.
##### 5. Otherwise, the median is the root of the heap with more elements.
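The five rules above can be sketched as a small self-contained class that keeps both heaps as local state. This is a minimal sketch, not the notebook's `get_median`: the class name `RollingMedian` is hypothetical, and it compares each new value with the root of the max heap instead of the previous median. Either choice keeps every value in the max heap less than or equal to every value in the min heap.

```python
import heapq

class RollingMedian:
    """Two-heap rolling median: a max heap (stored negated) for the lower
    half and a min heap for the upper half of the values seen so far."""

    def __init__(self):
        self.low = []   # max heap via negation: -self.low[0] is its largest value
        self.high = []  # min heap: self.high[0] is its smallest value

    def add(self, value):
        # Send the value to the lower half only if it is below the lower half's maximum
        if self.low and value < -self.low[0]:
            heapq.heappush(self.low, -value)
        else:
            heapq.heappush(self.high, value)
        # Rebalance so that the sizes differ by at most 1
        if len(self.low) > len(self.high) + 1:
            heapq.heappush(self.high, -heapq.heappop(self.low))
        elif len(self.high) > len(self.low) + 1:
            heapq.heappush(self.low, -heapq.heappop(self.high))
        return self.median()

    def median(self):
        # Equal sizes: average of the two roots; otherwise the root of the larger heap
        if len(self.low) == len(self.high):
            return (-self.low[0] + self.high[0]) / 2.0
        if len(self.low) > len(self.high):
            return float(-self.low[0])
        return float(self.high[0])

rm = RollingMedian()
print([rm.add(v) for v in [3, 1, 4, 1, 5]])  # [3.0, 2.0, 3.0, 2.0, 3.0]
```

Each `add` costs O(log n) for the heap pushes and pops, and reading the median is O(1) because both roots sit at index 0.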

### Spark is used for calculating the rolling median as below:
##### 1. Spark computes the number of unique words in each tweet in parallel with **map**.
##### 2. The max and min heaps are created with Python's **[heapq](https://docs.python.org/2/library/heapq.html)** library, which only implements a min heap; the max heap is simulated by pushing -1 * value.
##### 3. The current median and both heaps are wrapped in broadcast variables. Broadcast variables are read-only copies shipped to the executors: updates made inside a task are not sent back to the driver or to other tasks. Since each rolling median depends on every earlier tweet, the heap updates must run sequentially, in input order, in a single process (the driver), and the results are written to the output file.
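A rolling median cannot be split across independent tasks, because every value depends on all earlier values. The plain-Python simulation below shows the effect: the counts are split into two partitions and each "task" starts from its own copy of the empty state, as it would if every task received its own deserialized copy of a broadcast value. `copy.deepcopy` stands in for that per-task copy, and the helper `rolling_medians` is hypothetical; it uses a brute-force `statistics.median` rather than heaps to keep the example short.

```python
import copy
import statistics

def rolling_medians(values, state):
    """Append each value to `state` (the values seen so far) and return
    the median after every step."""
    result = []
    for value in values:
        state.append(value)
        result.append(statistics.median(state))
    return result

counts = [3, 1, 4, 1, 5, 9]
initial_state = []

# Sequential run: one state object sees every value, in order
sequential = rolling_medians(counts, copy.deepcopy(initial_state))

# Simulated distributed run: each task gets its own copy of the initial state,
# so the second partition never sees the values of the first
partitions = [counts[:3], counts[3:]]
distributed = []
for part in partitions:
    distributed.extend(rolling_medians(part, copy.deepcopy(initial_state)))

print(sequential)   # [3, 2.0, 3, 2.0, 3, 3.5]
print(distributed)  # [3, 2.0, 3, 1, 3.0, 5]
```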

```python
# Split a tweet on whitespace and count its unique words
def get_tweet_length(tweet):
    """
    Get the count of unique words in a tweet.
    Args:
        tweet (str) - a single tweet.
    Returns:
        count of unique words in the tweet.
    """
    words = tweet.split()
    return len(set(words))

# RDD holding the unique word count of each tweet, in input order
tweet_words_rdd = tweets.map(get_tweet_length)
```
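Because `get_tweet_length` compares tokens exactly, uniqueness is case- and punctuation-sensitive: `Spark`, `spark`, `spark,` and `#spark` are four different words. The snippet below repeats the same logic under the hypothetical name `count_unique_words` so that it runs on its own; the sample tweets are made up.

```python
def count_unique_words(tweet):
    """Same logic as get_tweet_length: number of unique whitespace-separated tokens."""
    return len(set(tweet.split()))

print(count_unique_words("Spark spark spark, #spark"))  # 4
print(count_unique_words("is is is"))                    # 1
```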

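```python
import heapq

# State for the rolling median: the list of medians so far (seeded with 0),
# a max heap (stored as negated values) and a min heap.
# Empty lists are already valid heaps, so no heapify call is needed.
PRESENT_MEDIAN = [0]
MAX_HEAP_LIST = []
MIN_HEAP_LIST = []

# Wrap the state in broadcast variables. Executors only receive read-only
# copies: changes made inside a task are never sent back to the driver,
# so get_median must run on the driver to keep one consistent set of heaps.
current_median = sc.broadcast(PRESENT_MEDIAN)
MAX_HEAP = sc.broadcast(MAX_HEAP_LIST)
MIN_HEAP = sc.broadcast(MIN_HEAP_LIST)
```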
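`heapq` only provides a min heap, so the max heap stores negated values: the root at index 0 is then the negation of the largest original value. Reading either root as `heap[0]` is O(1), which is cheaper than `heapq.nsmallest(1, heap)[0]`, although both return the same element. The sample values below are made up.

```python
import heapq

values = [5, 1, 8, 3]

min_heap = []  # heapq keeps the smallest value at index 0
max_heap = []  # negated values, so the largest original value is at index 0
for v in values:
    heapq.heappush(min_heap, v)
    heapq.heappush(max_heap, -v)

print(min_heap[0])               # 1, the smallest value
print(-max_heap[0])              # 8, the largest value
print(-heapq.heappop(max_heap))  # 8, popped from the simulated max heap
```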


```python
# Compare the sizes of the two heaps
def get_size():
    """
    Get the relative sizes of MIN_HEAP and MAX_HEAP.
    Returns:
        0 - if both heaps are balanced
        1 - if MAX_HEAP contains more elements than MIN_HEAP
        -1 - otherwise
    """
    if len(MIN_HEAP.value) == len(MAX_HEAP.value):
        return 0
    elif len(MIN_HEAP.value) < len(MAX_HEAP.value):
        return 1
    else:
        return -1
```


```python
# Function to calculate the rolling median
def get_median(val):
    """
    Add one tweet's unique word count to the heaps and return the new median.

    Args:
        val (int) - number of unique words in the current tweet
    Uses:
        current_median, MAX_HEAP, MIN_HEAP - the broadcast state created above;
        current_median.value[-1] is the median after the previous tweet
    Returns:
        Rolling median after processing this tweet
    """
    size_diff = get_size()

    # Heaps are balanced: the new value makes one heap larger by 1,
    # and the median is the root of that heap
    if size_diff == 0:
        if val < current_median.value[-1]:
            heapq.heappush(MAX_HEAP.value, -val)
            current_median.value.append(-MAX_HEAP.value[0])
        else:
            heapq.heappush(MIN_HEAP.value, val)
            current_median.value.append(MIN_HEAP.value[0])
        return current_median.value[-1]

    # MAX_HEAP has one more element than MIN_HEAP
    elif size_diff == 1:
        if val < current_median.value[-1]:
            # Move the largest value of the lower half to MIN_HEAP,
            # then add val to the lower half
            heapq.heappush(MIN_HEAP.value, -heapq.heappop(MAX_HEAP.value))
            heapq.heappush(MAX_HEAP.value, -val)
        else:
            heapq.heappush(MIN_HEAP.value, val)

    # MIN_HEAP has one more element than MAX_HEAP
    else:
        if val < current_median.value[-1]:
            heapq.heappush(MAX_HEAP.value, -val)
        else:
            # Move the smallest value of the upper half to MAX_HEAP,
            # then add val to the upper half
            heapq.heappush(MAX_HEAP.value, -heapq.heappop(MIN_HEAP.value))
            heapq.heappush(MIN_HEAP.value, val)

    # The heaps are balanced again: the median is the average of the two roots
    # (heap[0] is the root of a heapq heap)
    min_heap_root = MIN_HEAP.value[0]
    max_heap_root = -MAX_HEAP.value[0]
    current_median.value.append((min_heap_root + max_heap_root) / 2.0)
    return current_median.value[-1]
```
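When changing `get_median`, a small invariant check helps catch rebalancing mistakes. The helper below is hypothetical and not part of the original notebook: it checks the heap property of both lists, that their sizes differ by at most 1, and that the largest value of the lower half does not exceed the smallest value of the upper half. It assumes the max heap stores negated values, as `MAX_HEAP` does.

```python
def check_heap_invariants(max_heap, min_heap):
    """Raise AssertionError if the two-heap invariants are broken.

    max_heap stores negated values; min_heap stores the raw values.
    """
    # Both lists must satisfy the heapq property: parent <= child
    for heap in (max_heap, min_heap):
        for i in range(1, len(heap)):
            assert heap[(i - 1) // 2] <= heap[i], "heap property violated"
    # The sizes may differ by at most one
    assert abs(len(max_heap) - len(min_heap)) <= 1, "heaps unbalanced"
    # Every value in the lower half must be <= every value in the upper half
    if max_heap and min_heap:
        assert -max_heap[0] <= min_heap[0], "lower half exceeds upper half"

# Example: lower half {1, 3} stored negated, upper half {4, 5}
check_heap_invariants([-3, -1], [4, 5])
```

For instance, `check_heap_invariants(MAX_HEAP.value, MIN_HEAP.value)` could be called on the driver after each `get_median` call while testing.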


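```python
# Compute the rolling median for each tweet's unique word count.
# The counts are collected to the driver so that get_median runs in a single
# process, in input order, on one shared set of heaps; calling it inside
# tweet_words_rdd.map would give each task its own copy of the broadcast heaps.
with open('ft2.txt', 'w') as out_file:
    for count in tweet_words_rdd.collect():
        out_file.write(str(get_median(count)) + "\n")
```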
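On a small input, the contents of ft2.txt can be cross-checked against a brute-force reference that recomputes the median of every prefix with `statistics.median` (Python 3.4+). The function name `reference_rolling_medians` and the sample counts are hypothetical, and the function is only meant for testing, since it re-sorts every prefix. Because `get_median` returns a heap root (an `int`) after an odd number of tweets and an average (a `float`) after an even number, ft2.txt mixes lines such as `3` and `2.0`, so the two should be compared as numbers rather than as strings.

```python
import statistics

def reference_rolling_medians(counts):
    """Brute-force rolling median: the median of every prefix of `counts`.

    statistics.median sorts each prefix, so this is only suitable for
    small test inputs.
    """
    return [float(statistics.median(counts[:i + 1])) for i in range(len(counts))]

sample_counts = [3, 1, 4, 1, 5]
print(reference_rolling_medians(sample_counts))  # [3.0, 2.0, 3.0, 2.0, 3.0]
```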