Note: This is my answer to F-Secure's test.
# Generate Statistics using MapReduce 
Input:
```
{
    fullname1: {
        'age': #value,
        'address': #value,
        'occupation': #value
    },
    fullname2: {
        # as above
    }
}
```
Output:
```
{
    'lastname1': {
        'count': #occurrence,
        'age':{
            'age1': #occurrence,
            'age2': #occurrence
        },
        'address':{
            'address1': #occurrence,
            'address2': #occurrence
        },
        'occupation':{
            'job1': #occurrence
        }
    }
}
```
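
For a concrete picture of this mapping, here is a minimal sketch on a made-up three-person input. The sample names, the `lastname_stats` helper, and the use of `collections.Counter` are illustrative assumptions, not the solution developed in the rest of this notebook.

```python
from collections import Counter

def lastname_stats(people):
    """Group records by last name and count each attribute value."""
    stats = {}
    for fullname, info in people.items():
        # the last name is taken to be the final space-separated token
        lastname = fullname.split(' ')[-1]
        entry = stats.setdefault(lastname, {
            'count': 0,
            'age': Counter(),
            'address': Counter(),
            'occupation': Counter(),
        })
        entry['count'] += 1
        entry['age'][str(info['age'])] += 1
        entry['address'][info['address']] += 1
        entry['occupation'][info['occupation']] += 1

    # turn the Counters into plain dicts so the result matches the schema
    return {
        name: {k: (dict(v) if isinstance(v, Counter) else v) for k, v in entry.items()}
        for name, entry in stats.items()
    }

# hypothetical sample data in the input format
sample = {
    'Anna Virtanen': {'age': 30, 'address': 'Helsinki', 'occupation': 'engineer'},
    'Mika Virtanen': {'age': 30, 'address': 'Espoo', 'occupation': 'teacher'},
    'Liisa Korhonen': {'age': 41, 'address': 'Helsinki', 'occupation': 'engineer'},
}

sample_stats = lastname_stats(sample)
print(sample_stats['Virtanen'])
# {'count': 2, 'age': {'30': 2}, 'address': {'Helsinki': 1, 'Espoo': 1}, 'occupation': {'engineer': 1, 'teacher': 1}}
```

Ages are stored as string keys here, mirroring the JSON output, where object keys are always strings.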

In [1]:
import json 
import pandas as pd
import collections # for namedtuple and defaultdict
from functools import reduce

In [2]:
doc = "data.json"
def read_file(doc):
    # open json file
    with open(doc) as f:
        data = json.load(f)
    return data

## Export file to CSV

In [3]:
def to_dataframe(data):
    fullname = list()
    age = list()
    address = list()
    occupation = list()

    # access to each item's data per format: {name: {v}}
    for name, v in data.items():
        fullname.append(name)

        # access to {age: , address: , occupation: }
        age.append(v['age'])
        address.append(v['address'])
        occupation.append(v['occupation'])

    # build the DataFrame only after the lists are filled; building it
    # first copies the empty lists and exports a CSV with headers only
    df = pd.DataFrame({'Full Name': fullname,
         'Age': age,
         'Address': address,
         'Occupation': occupation})

    return df

In [4]:
# export to CSV file
to_dataframe(read_file(doc)).to_csv(r"C:\Users\USER\Desktop\output.csv",index=False)

## Generate Stats

Steps include:
* Re-define the data schema, from dictionary to tuple.
* Extract information of interest using MAP function.
* Generate the statistics using REDUCE function.
* Export the file to JSON format.

In [5]:
# Define a tuple 
Datapoint = collections.namedtuple('Datapoint', [
    'fullname',
    'age',
    'address',
    'occupation'
])


data = read_file(doc)
# re-structure the data
datapoints = list()
for name, v in data.items():
    datapoints.append(Datapoint(fullname=name, age=v['age'], address=v['address'], occupation=v['occupation']))

In [6]:
def transform(x):
    '''
        The function is to extract the information of interest
    '''
    return {'lastname': x.fullname.split(' ')[-1], 'age': x.age, 'address': x.address, 'occupation': x.occupation}

Map the transform function over the list of Datapoints

In [7]:
# Map the transform function with NamedTuple
result = list(map(
    transform,
    datapoints
))
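
As a quick illustration of what the map step produces, the sketch below repeats the `Datapoint` and `transform` definitions so that it runs on its own, then applies them to two made-up records (the names and values in `sample_points` are hypothetical).

```python
import collections

Datapoint = collections.namedtuple('Datapoint', [
    'fullname', 'age', 'address', 'occupation'
])

def transform(x):
    # keep the last space-separated token of the full name as the last name
    return {'lastname': x.fullname.split(' ')[-1], 'age': x.age,
            'address': x.address, 'occupation': x.occupation}

# hypothetical records, not taken from data.json
sample_points = [
    Datapoint('Anna Maria Virtanen', 30, 'Helsinki', 'engineer'),
    Datapoint('Liisa Korhonen', 41, 'Espoo', 'teacher'),
]

mapped = list(map(transform, sample_points))
print(mapped[0]['lastname'])  # Virtanen
```

Because only the final token is kept, middle names are dropped, and a multi-word surname would be reduced to its last word.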

Create a reducer to calculate the frequency

In [8]:
def reducer(acc, val):
    '''
        The function is used to calculate the occurrence
        acc: accumulator, val: value
    '''
    
    try:
        acc[val['lastname']]['count'] += 1 
        
    except (KeyError, TypeError):
        # first time this last name is seen (KeyError for a plain dict,
        # TypeError when a defaultdict(list) hands back an empty list):
        # instantiate the nested dictionaries
        acc[val['lastname']] = dict()
        acc[val['lastname']]['age'] = dict()
        acc[val['lastname']]['address'] = dict()
        acc[val['lastname']]['occupation'] = dict()
        acc[val['lastname']]['count'] = 1
 
    # calculate the occurrence of AGE
    age = acc[val['lastname']]['age']
    try:
        age[str(val['age'])] += 1
    except KeyError:
        age[str(val['age'])] = 1
        
    
    # calculate the occurrence of ADDRESS
    address = acc[val['lastname']]['address']
    try:
        address[val['address']] += 1
    except KeyError:
        address[val['address']] = 1
        
    
    # calculate the occurrence of OCCUPATION
    occupation = acc[val['lastname']]['occupation']
    try:
        occupation[val['occupation']] += 1
    except KeyError:
        occupation[val['occupation']] = 1
    
    return acc

Calculate the occurrences using Reduce

In [9]:
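# Create the accumulator; the reducer overwrites each new entry with a
# nested dict, so the list default never ends up in the result
dd = collections.defaultdict(list)

# Calculate the occurrences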
results = reduce(
    reducer, 
    result, 
    dd
)

Save the result as .JSON

In [10]:
with open("C:\\Users\\USER\\Desktop\\output.json", "w") as json_file:
    json.dump(results, json_file)
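
To check the saved structure without writing to the desktop path, a result can be round-tripped through `json.dumps` and `json.loads`. This is only a sketch: `sample_result` is a made-up value shaped like `results`, not actual output.

```python
import collections
import json

# hypothetical accumulator shaped like the reduce output
sample_result = collections.defaultdict(list)
sample_result['Virtanen'] = {
    'count': 2,
    'age': {'30': 2},
    'address': {'Helsinki': 2},
    'occupation': {'engineer': 2},
}

# a defaultdict is a dict subclass, so it serializes as a JSON object
text = json.dumps(sample_result, indent=4)
restored = json.loads(text)

print(type(restored).__name__)  # dict
```

The restored value is a plain `dict` with the same content, so the `defaultdict` wrapper does not leak into the file.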

## Parallel Computing to reduce runtime for big files

`multiprocess` is a fork of `multiprocessing` that uses `dill` for serialization, which lets functions defined interactively in IPython be sent to worker processes. More info: https://pypi.org/project/multiprocess/ 

In [11]:
import time

In [12]:
!pip install multiprocess
import multiprocess



Imagine having 1000 replicates of `data.json`: with only 20 files the difference is negligible, and a single (fast) processor would yield a better result.

In [13]:
doc = 'data.json'
docs = [read_file(doc)]*1000

Re-structure the data as above, but with an additional for-loop

In [14]:
# maximum processors 

new_dd = collections.defaultdict(list)
dpoints = list()
for doc in docs:
    # iterate over the current replicate, not the global `data`
    for name, v in doc.items():
        dpoints.append(Datapoint(fullname=name, age=v['age'], address=v['address'], occupation=v['occupation']))

        
start = time.time()

pool = multiprocess.Pool(4)
p_mapped = pool.map(transform, dpoints)
reduce(reducer, p_mapped, new_dd)

end = time.time()
print('\nTime to complete: %.5f' % (end-start))


Time to complete: 4.41349


In [15]:
# only 1 processor 

new_dd = collections.defaultdict(list)
dpoints = list()
for doc in docs:
    # iterate over the current replicate, not the global `data`
    for name, v in doc.items():
        dpoints.append(Datapoint(fullname=name, age=v['age'], address=v['address'], occupation=v['occupation']))

        
start = time.time()

pool = multiprocess.Pool(1)
p_mapped = pool.map(transform, dpoints)
reduce(reducer, p_mapped, new_dd)

end = time.time()
print('\nTime to complete: %.5f' % (end-start))


Time to complete: 5.39989
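
In both runs only the map step goes through the pool; `reduce` still walks the mapped list in a single process. One way the reduce step could be split as well is to reduce each chunk separately and then merge the partial results. The sketch below shows only that merge logic, without a pool. `merge_stats` and the two sample partials are hypothetical, and the code assumes each partial result has the same shape as the output of `reducer`.

```python
from functools import reduce

FIELDS = ('age', 'address', 'occupation')

def merge_stats(left, right):
    """Combine two partial results into a new dict, summing all counts."""
    merged = {}
    for part in (left, right):
        for lastname, stats in part.items():
            target = merged.setdefault(
                lastname,
                {'count': 0, 'age': {}, 'address': {}, 'occupation': {}},
            )
            target['count'] += stats['count']
            for field in FIELDS:
                for value, n in stats[field].items():
                    target[field][value] = target[field].get(value, 0) + n
    return merged

# hypothetical partial results, e.g. one per chunk of the mapped list
partial_a = {
    'Virtanen': {'count': 2, 'age': {'30': 2},
                 'address': {'Helsinki': 1, 'Espoo': 1},
                 'occupation': {'engineer': 2}},
}
partial_b = {
    'Virtanen': {'count': 1, 'age': {'30': 1},
                 'address': {'Espoo': 1},
                 'occupation': {'teacher': 1}},
    'Korhonen': {'count': 1, 'age': {'41': 1},
                 'address': {'Helsinki': 1},
                 'occupation': {'engineer': 1}},
}

total = reduce(merge_stats, [partial_a, partial_b], {})
print(total['Virtanen']['count'])  # 3
```

Because the counts are simply added, the partials can be merged in any order, which is what would make a per-chunk reduce safe to run in separate processes.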
