# Multiprocessing

In [1]:
# we use the same loading code

import zipfile
import pandas as pd
import io
import json

countries = {}
categories = {}

with zipfile.ZipFile("../datasets/youtube-new.zip") as z:
    for file in z.filelist:
        print(f'processing {file.filename}')
        key = file.filename.split('.')[0]        
        if file.filename.endswith('.csv'):
            with z.open(file.filename) as f:
                countries[key] = pd.read_csv(io.StringIO(f.read().decode('latin-1')))
        if file.filename.endswith('.json'):                
            with z.open(file.filename) as f:
                categories[key] = json.loads(f.read())
            
    

processing US_category_id.json
processing JPvideos.csv
processing KR_category_id.json
processing MX_category_id.json
processing MXvideos.csv
processing DE_category_id.json
processing KRvideos.csv
processing FRvideos.csv
processing CA_category_id.json
processing RUvideos.csv
processing GB_category_id.json
processing GBvideos.csv
processing FR_category_id.json
processing IN_category_id.json
processing USvideos.csv
processing JP_category_id.json
processing RU_category_id.json
processing CAvideos.csv
processing DEvideos.csv
processing INvideos.csv


In [8]:
# Our aggregation function

from collections import Counter

def unique_tags(x, top: int=10):
    print('starting unique tags function')
    # split by '|'
    res = x.apply(lambda x: x.split('|')) 
    # flatten in single list
    flat = [elem.strip('"') for row in res for elem in row]
    # return a set (so it keeps only unique elements)
    return Counter(flat).most_common(top)


### Multiprocessing code

In [9]:
# Parallel processing with Pool.apply_async()

import multiprocessing as mp
cpu_count = mp.cpu_count()
pool = mp.Pool(cpu_count)
print(f'created a cpu pool with {cpu_count} CPUs')


results = []
print('Creating tasks')
for country, df in countries.items():
    results.append(pool.apply_async(unique_tags, args=(df['tags'], 25))
                    )                                                 
print('Closing the pool, processes are starting now')
pool.close()
print('Blocks Execution until all processes in the pool are done')
pool.join() 

print('Displaying results')
output = [p.get() for p in results]
print(output)

starting unique tags function
starting unique tags function
created a cpu pool with 2 CPUs
Creating tasks
Closing the pool, processes are starting now
Blocks Execution until all processes in the pool are done
starting unique tags function
starting unique tags function
starting unique tags function
starting unique tags function
starting unique tags function
starting unique tags function
starting unique tags function
starting unique tags function
Displaying results
[[('[none]', 3200), ('æ\x96\x99ç\x90\x86', 800), ('ã\x81\x8aã\x82\x82ã\x81\x97ã\x82\x8d', 675), ('ã\x83\x8bã\x83¥ã\x83¼ã\x82¹', 592), ('ç\x8c«', 591), ('ã\x81\x8aç¬\x91ã\x81\x84', 571), ('ã\x82¢ã\x83\x8bã\x83¡', 530), ('ã\x81\x8bã\x82\x8fã\x81\x84ã\x81\x84', 484), ('é\x87£ã\x82\x8a', 480), ('å¤§é£\x9fã\x81\x84', 444), ('é\x9d¢ç\x99½', 443), ('funny', 440), ('cat', 430), ('ã\x82²ã\x83¼ã\x83\xa0', 427), ('å\x8b\x95ç\x94»', 419), ('ç\x88\x86ç¬\x91', 411), ('å\x8f¯æ\x84\x9bã\x81\x84', 388), ('è¡\x9dæ\x92\x83', 384), ('UUUM', 381),

## Fix: How do we know which country ?

In [11]:
def unique_tags_mp(country, x, top: int=10):
    print(f'starting unique tags function {country}')
    # split by '|'
    res = x.apply(lambda x: x.split('|')) 
    # flatten in single list
    flat = [elem.strip('"') for row in res for elem in row]
    # return a set (so it keeps only unique elements)
    print(f'done with {country}')
    return {country: Counter(flat).most_common(top)}


In [12]:
# Parallel processing with Pool.apply_async()
cpu_count = mp.cpu_count()
pool = mp.Pool(cpu_count)
print(f'created a cpu pool with {cpu_count} CPUs')


results = []
print('Creating tasks')
for country, df in countries.items():
    results.append(pool.apply_async(unique_tags_mp, args=(country, df['tags'], 25))
                    )                                                 
print('Closing the pool, processes are starting now')
pool.close()
print('Blocks Execution until all processes in the pool are done')
pool.join() 

print('Displaying results')
output =  {}
for p in results:
    output.update(p.get())
print(output)

starting unique tags function JPvideos
starting unique tags function MXvideos
created a cpu pool with 2 CPUs
Creating tasks
Closing the pool, processes are starting now
Blocks Execution until all processes in the pool are done
done with JPvideos
starting unique tags function KRvideos
done with MXvideos
done with KRvideos
starting unique tags function FRvideos
done with FRvideos
starting unique tags function RUvideos
starting unique tags function GBvideos
done with RUvideos
done with GBvideos
starting unique tags function USvideos
starting unique tags function CAvideos
done with USvideos
done with CAvideos
starting unique tags function DEvideos
starting unique tags function INvideos
done with DEvideos
done with INvideos
Displaying results
{'JPvideos': [('[none]', 3200), ('æ\x96\x99ç\x90\x86', 800), ('ã\x81\x8aã\x82\x82ã\x81\x97ã\x82\x8d', 675), ('ã\x83\x8bã\x83¥ã\x83¼ã\x82¹', 592), ('ç\x8c«', 591), ('ã\x81\x8aç¬\x91ã\x81\x84', 571), ('ã\x82¢ã\x83\x8bã\x83¡', 530), ('ã\x81\x8bã\x82\x8fã\

### 5. For each of the top 25 tags (worldwide), calculate the ratios: like, dislike & comments (by view)


#### _if we have time_
    
Steps:
1. Aggregate results from the exercise above to find the top25 worldwide tags
2. Create a multiprocessing job to get the like, dislikes, comments, views aggregated by tag for each country
3. Aggregate the results
4. Compute ratios