In [1]:
from dask.distributed import Client, LocalCluster
from dask import delayed

In [2]:
import dask.bag as db
import os
import json
from operator import itemgetter
from operator import add

from nltk.tokenize import RegexpTokenizer
from nltk.corpus import stopwords
from nltk import word_tokenize

from collections import Counter

import time

import numpy as np

In [None]:
# Create the Dask distributed client with default arguments.
# The bare `client` expression below shows it as the cell output.
client = Client()
client

In [None]:
# Load the data and repartition:
# every file matching the glob is read as text, each line is parsed with
# json.loads, and the resulting bag is repartitioned into 10 partitions.
filename = os.path.join('data', 'papers_in_json_singleline', '*.json')
lines = db.read_text(filename)
js = lines.map(json.loads).repartition(10)

## Definition of functions

In [None]:
# Merge all the body-text sections of each file into a single string
def merge(record):
    """Concatenate the ``'text'`` field of every entry in ``record``.

    ``record`` is iterated in order and each entry is indexed with the key
    ``'text'``; the pieces are glued together with no separator.

    Returns the concatenated string (``''`` when ``record`` is empty).
    """
    return ''.join(section['text'] for section in record)

# Build one merged body text per record, then preview the first result.
texts = js.pluck("body_text").map(merge)
texts.take(1)

In [None]:
# Function for text cleaning:
# we remove punctuation, numbers and stopwords.
# Stopwords are taken from a library but are also defined by us.
# After this step, each file is a list of words.

# Regex tokenizer matching runs of word characters (pattern \w+); used by clean_func.
tokenizer = RegexpTokenizer(r'\w+')
# Extra words that clean_func excludes in addition to the NLTK English stopword list.
no_words={"i", "as", "or", "it", "et", "also", "may"}

def clean_func(text):
    """Turn a raw text into a list of cleaned words.

    The text is split with ``word_tokenize``; a token is kept only when it is
    purely alphabetic and its lower-cased form is neither an NLTK English
    stopword nor one of the module-level ``no_words``. The kept tokens are
    joined with spaces and split again with the module-level ``tokenizer``.

    Returns the list produced by ``tokenizer.tokenize``.
    """
    english = set(stopwords.words('english'))
    raw_tokens = word_tokenize(text)
    excluded = english.union(no_words)

    kept = [tok for tok in raw_tokens
            if tok.lower() not in excluded and tok.isalpha()]

    # Whitespace is not a word character for the \w+ tokenizer, so joining
    # with single spaces is enough to keep the tokens apart.
    return tokenizer.tokenize(" ".join(kept))

# Apply the cleaning function to every merged text and preview the first word list.
text_clean = texts.map(clean_func)
text_clean.take(1)

In [None]:
# We transform each list into a list of dictionaries
# in which the unique words and their frequencies are stored.

def count_words(text):
    """Count the occurrences of each distinct word in ``text``.

    Parameters
    ----------
    text : iterable of hashable
        The words to count. It is consumed exactly once, so one-shot
        iterators (generators) are counted correctly as well.

    Returns
    -------
    list of dict
        One ``{"word": <word>, "counts": <occurrences>}`` entry per distinct
        word, in order of first appearance.
    """
    # A single Counter pass; building it twice doubled the work and produced
    # an empty result for iterators exhausted by the first pass.
    return [{"word": word, "counts": total}
            for word, total in Counter(text).items()]

# Per-file word counts (a list of {"word", "counts"} dicts per file); preview the first.
words = text_clean.map(count_words)
words.take(1)

In [None]:
# Finally, we sum over all files using the foldby method.
# We pass the bag to foldby after flattening it;
# the method groups the dictionaries by the value associated with the key 'word'
# and then sums the counts of each word.

def incr_amount(tot, x):
    """Fold step: add the ``'counts'`` value of ``x`` to the running total ``tot``."""
    increment = x['counts']
    return tot + increment


# Flatten the per-file lists into a single bag of {"word", "counts"} dicts and
# fold it by the 'word' key: incr_amount adds each dict's counts to a running
# total (starting at 0) and `add` merges partial totals (starting at 0).
total_counts = words.flatten().foldby('word', binop=incr_amount, 
                   initial=0, 
                   combine=add, 
                   combine_initial=0).compute()

total_counts

In [None]:
# Finally, we order the obtained list by its second element (the total), largest first.

total_counts_ordered = sorted(total_counts, key=itemgetter(1), reverse=True)
total_counts_ordered

In [None]:
# Timing of the whole operation: the full pipeline (parse, repartition, merge,
# clean, count, fold) plus the final sort is run between the two timestamps.

start = time.time()
word_count = (lines.map(json.loads).repartition(10).pluck('body_text')
                   .map(merge).map(clean_func)
                   .map(count_words).flatten()
                   .foldby('word', binop=incr_amount, 
                        initial=0, combine=add, 
                        combine_initial=0).compute())
word_sorted = sorted(word_count, key=itemgetter(1), reverse=True)
end = time.time()

# Elapsed wall-clock seconds and the ten entries with the highest totals.
print("time: ", end - start)
print(word_sorted[:10])

In [None]:
# Close the current client before a new one is created on an explicit cluster.
client.close()

In [None]:
# Start a local cluster with exactly 2 workers and connect a new client to it.
cluster = LocalCluster(n_workers=2)
client = Client(cluster)

In [None]:
# Same timed pipeline as before, now run with the 2-worker cluster's client active.
start = time.time()
word_count = (lines.map(json.loads).repartition(10).pluck('body_text')
                   .map(merge).map(clean_func)
                   .map(count_words).flatten()
                   .foldby('word', binop=incr_amount, 
                        initial=0, combine=add, 
                        combine_initial=0).compute())
word_sorted = sorted(word_count, key=itemgetter(1), reverse=True)
end = time.time()

# Elapsed wall-clock seconds and the ten entries with the highest totals.
print("time: ", end - start)
print(word_sorted[:10])

In [None]:
def get_time(workers=4, partitions=10):
    """Time the word-count pipeline on a freshly created local cluster.

    Parameters
    ----------
    workers : int
        Number of workers passed to ``LocalCluster(n_workers=...)``.
    partitions : int
        Number of partitions the parsed bag is repartitioned into.

    Returns
    -------
    float
        Wall-clock seconds spent running the pipeline and sorting the result.
        Cluster start-up and shutdown are not included.

    The cluster and client are used as context managers so that both are
    closed even when the computation raises (client first, then cluster).
    """
    with LocalCluster(n_workers=workers) as my_cluster, Client(my_cluster):
        start = time.time()  # start taking time
        word_count = (lines.map(json.loads).repartition(partitions).pluck('body_text')
                           .map(merge).map(clean_func)
                           .map(count_words).flatten()
                           .foldby('word', binop=incr_amount,
                                initial=0, combine=add,
                                combine_initial=0).compute())
        # The sort belongs to the timed operation; its result is not needed here.
        sorted(word_count, key=itemgetter(1), reverse=True)
        end = time.time()
    return end - start
    

In [None]:
#for nw in range (3,5):
#    print("nw ", nw, " time ", get_time(nw))

In [None]:
# Grid of (workers, partitions) combinations to time. The commented-out lists
# are a larger grid; only the single combination (8, 8) is currently run.
#works = [1,2,3,4,5,6,7,8]
#parts = [1,2,5,10,50, 100, 150]
works = [8]
parts = [8]
for w in works:
    for p in parts:
        print("nw: ", w, " Partitions ", p, " time: ", get_time(w,p))

In [None]:
# Time the pipeline with 12 workers and 12 partitions.
get_time(12, 12)

In [None]:
# Time the pipeline with 24 workers and 48 partitions.
get_time(24, 48)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import bokeh.palettes as palette

In [None]:
# Split the sorted entries into bar lengths (element 1) and labels (element 0).
height = [entry[1] for entry in word_sorted]
bars = [entry[0] for entry in word_sorted]
y_pos = np.arange(len(bars))

# Number of leading entries that are actually drawn.
index = 30
fig, ax = plt.subplots(figsize=(10,8))

# Horizontal bars, one colour per bar from the inferno palette.
ax.barh(y_pos[:index], height[:index],color=palette.inferno(index))
plt.yticks(y_pos[:index], bars[:index])

# Light grid drawn underneath the bars.
ax.set_axisbelow(True)
ax.grid(True, which="both", ls="-",color='0.93')

## Part 2

In [None]:
# Re-create the bag of parsed JSON records (10 partitions) for this part.
filename = os.path.join('data', 'papers_in_json_singleline', '*.json')
lines = db.read_text(filename)
js = lines.map(json.loads).repartition(10)

In [None]:
# Local cluster with 4 workers and a client connected to it.
cluster=LocalCluster(n_workers=4)
client= Client(cluster)

In [None]:
# Preview the 'authors' entry found under 'metadata' for the first two records.
js.pluck('metadata').pluck('authors').take(2)

In [None]:
# Bag holding, for each record, the value of metadata -> authors.
authors = js.pluck('metadata').pluck('authors')

In [None]:
def flatten(record):
    """Reduce one author record to a flat dictionary of five fields.

    ``record`` must provide ``'first'``, ``'last'`` and ``'affiliation'``.
    From the affiliation the function reads ``'institution'``,
    ``'laboratory'`` and ``'location' -> 'country'``; any of these that is
    missing or equal to the empty string is reported as ``"Unknown"``.

    Returns a dict with the keys ``'name'``, ``'surname'``, ``'University'``,
    ``'Laboratory'`` and ``'Country'``.
    """
    affiliation = record['affiliation']

    def lookup(mapping, key):
        # Absent keys and empty strings both collapse to "Unknown".
        value = mapping[key] if key in mapping.keys() else "Unknown"
        return "Unknown" if value == '' else value

    university = lookup(affiliation, 'institution')
    laboratory = lookup(affiliation, 'laboratory')

    # The country sits one level deeper, inside the optional 'location' entry.
    if 'location' in affiliation.keys():
        country = lookup(affiliation['location'], 'country')
    else:
        country = "Unknown"

    return {
        'name':       record['first'],
        'surname':    record['last'],
        'University': university,
        'Laboratory': laboratory,
        'Country':    country,
    }

# Flatten the per-record author lists into single authors and preview 12 flattened records.
authors.flatten().map(flatten).take(12)

In [None]:
# Convert the flattened author records to a dataframe and show its first rows.
auth_df = authors.flatten().map(flatten).to_dataframe()
auth_df.head()

In [None]:
# The 10 most frequent values of the University column.
univs = auth_df.University.value_counts().nlargest(10).compute()
univs

In [None]:
# The 10 most frequent values of the Laboratory column.
labos = auth_df.Laboratory.value_counts().nlargest(10).compute()
labos

In [None]:
# The 10 most frequent values of the Country column.
countries = auth_df.Country.value_counts().nlargest(10).compute()
countries

In [None]:
def get_time_univs(workers=4, partitions=10):
    """Time the top-10 University ranking on a freshly created local cluster.

    Parameters
    ----------
    workers : int
        Number of workers passed to ``LocalCluster(n_workers=...)``.
    partitions : int
        Number of partitions the parsed bag is repartitioned into
        (previously ignored in favour of a hard-coded 10).

    Returns
    -------
    float
        Wall-clock seconds spent building the author dataframe and computing
        the ranking; cluster start-up and shutdown are not included.

    Relies on the module-level ``lines`` bag and on the module-level name
    ``flatten`` referring to the author-record flattener when this is called.
    The cluster and client are context-managed so both are closed even when
    the computation raises.
    """
    with LocalCluster(n_workers=workers) as my_cluster, Client(my_cluster):
        start = time.time()  # start taking time
        auth_df = (lines.map(json.loads).repartition(partitions)
                        .pluck('metadata').pluck('authors')
                        .flatten().map(flatten)
                        .to_dataframe())
        auth_df.University.value_counts().nlargest(10).compute()
        end = time.time()
    return end - start

def get_time_countries(workers=4, partitions=10):
    """Time the top-10 Country ranking on a freshly created local cluster.

    Parameters
    ----------
    workers : int
        Number of workers passed to ``LocalCluster(n_workers=...)``.
    partitions : int
        Number of partitions the parsed bag is repartitioned into
        (previously ignored in favour of a hard-coded 10).

    Returns
    -------
    float
        Wall-clock seconds spent building the author dataframe and computing
        the ranking; cluster start-up and shutdown are not included.

    Relies on the module-level ``lines`` bag and on the module-level name
    ``flatten`` referring to the author-record flattener when this is called.
    The cluster and client are context-managed so both are closed even when
    the computation raises.
    """
    with LocalCluster(n_workers=workers) as my_cluster, Client(my_cluster):
        start = time.time()  # start taking time
        auth_df = (lines.map(json.loads).repartition(partitions)
                        .pluck('metadata').pluck('authors')
                        .flatten().map(flatten)
                        .to_dataframe())
        auth_df.Country.value_counts().nlargest(10).compute()
        end = time.time()
    return end - start

def get_N_univs(workers=4, partitions=10, N=10):
    """Return the ``N`` most frequent University values.

    Parameters
    ----------
    workers : int
        Number of workers passed to ``LocalCluster(n_workers=...)``.
    partitions : int
        Number of partitions the parsed bag is repartitioned into
        (previously ignored in favour of a hard-coded 10).
    N : int
        How many of the largest value counts to keep.

    Returns
    -------
    The computed result of ``value_counts().nlargest(N)`` on the
    ``University`` column.

    Relies on the module-level ``lines`` bag and on the module-level name
    ``flatten`` referring to the author-record flattener when this is called.
    The cluster and client are context-managed so both are closed even when
    the computation raises.
    """
    with LocalCluster(n_workers=workers) as my_cluster, Client(my_cluster):
        auth_df = (lines.map(json.loads).repartition(partitions)
                        .pluck('metadata').pluck('authors')
                        .flatten().map(flatten)
                        .to_dataframe())
        univs = auth_df.University.value_counts().nlargest(N).compute()
    return univs
def get_N_countries(workers=4, partitions=10, N=10):
    """Return the ``N`` most frequent Country values.

    Parameters
    ----------
    workers : int
        Number of workers passed to ``LocalCluster(n_workers=...)``.
    partitions : int
        Number of partitions the parsed bag is repartitioned into
        (previously ignored in favour of a hard-coded 10).
    N : int
        How many of the largest value counts to keep.

    Returns
    -------
    The computed result of ``value_counts().nlargest(N)`` on the
    ``Country`` column.

    Relies on the module-level ``lines`` bag and on the module-level name
    ``flatten`` referring to the author-record flattener when this is called.
    The cluster and client are context-managed so both are closed even when
    the computation raises.
    """
    with LocalCluster(n_workers=workers) as my_cluster, Client(my_cluster):
        auth_df = (lines.map(json.loads).repartition(partitions)
                        .pluck('metadata').pluck('authors')
                        .flatten().map(flatten)
                        .to_dataframe())
        countries = auth_df.Country.value_counts().nlargest(N).compute()
    return countries
    

In [None]:
# Time the University ranking with workers=10, partitions=10.
get_time_univs(10,10)

In [None]:
# Time the Country ranking with workers=10, partitions=10.
get_time_countries(10,10)

In [None]:
# Top 10 University values, computed with workers=10, partitions=10.
get_N_univs(10,10, 10)

In [None]:
# Top 10 Country values, computed with workers=10, partitions=10.
get_N_countries(10,10, 10)

# Part 3

In [3]:
# Local cluster with 4 workers and a client connected to it; the bare
# `client` expression shows it as the cell output.
cluster=LocalCluster(n_workers=4)
client= Client(cluster)
client


0,1
Client  Scheduler: tcp://127.0.0.1:37207  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 12  Memory: 16.80 GB


In [4]:
# Re-create the bag of parsed JSON records (10 partitions) for this part.
filename = os.path.join('data', 'papers_in_json_singleline', '*.json')
lines = db.read_text(filename)
js = lines.map(json.loads).repartition(10)

In [5]:
# Keep only the paper id and the metadata of each record, in that order,
# and take the first such element as a sample.
metas = js.pluck(["paper_id", "metadata"])
m=metas.take(1)[0]

In [6]:
# Title of the sample paper (m[1] is its metadata).
print (m[1]["title"])

PfSWIB, a potential chromatin regulator for var gene regulation and parasite development in Plasmodium falciparum


In [7]:
import io
def load_vectors(fname):
    """Load word vectors from a text file into a dictionary.

    The first line must hold two integers separated by whitespace; it is
    parsed (so a malformed header still raises ``ValueError``) but its values
    are not used. Every following line is stripped of trailing whitespace and
    split on single spaces: the first field is the word, the remaining fields
    are converted to floats.

    Parameters
    ----------
    fname : str or path-like
        Path of the vector file, read as UTF-8 with undecodable bytes ignored.

    Returns
    -------
    dict
        Maps each word to its list of floats.
    """
    data = {}
    # The context manager closes the file even if a line fails to parse.
    with open(fname, 'r', encoding='utf-8', newline='\n', errors='ignore') as fin:
        _n_words, _dim = map(int, fin.readline().split())
        for line in fin:
            tokens = line.rstrip().split(' ')
            data[tokens[0]] = list(map(float, tokens[1:]))
    return data
# NOTE(review): absolute, user-specific path; it will not resolve on another machine.
model = load_vectors('/home/alessandro/Downloads/wiki-news-300d-1M.vec')
# To get the embedding of the word 'hello':
model['hello']

[-0.192,
 0.1544,
 0.0467,
 0.0592,
 0.1369,
 -0.0772,
 -0.0384,
 0.0537,
 0.1435,
 -0.1353,
 -0.053,
 -0.0668,
 0.0185,
 0.0873,
 0.0903,
 0.1663,
 0.0035,
 -0.2102,
 0.201,
 -0.0249,
 -0.0279,
 -0.3241,
 -0.0066,
 -0.0264,
 -0.1628,
 -0.1094,
 -0.0882,
 0.0097,
 0.1228,
 0.0059,
 -0.051,
 0.0649,
 0.1577,
 0.0174,
 0.0991,
 0.1328,
 -0.0586,
 0.1814,
 -0.0098,
 0.1877,
 0.0518,
 -0.0697,
 -0.0629,
 -0.1981,
 -0.1373,
 -0.0811,
 -0.0631,
 -0.0639,
 0.1244,
 -0.0247,
 0.0225,
 -0.3174,
 -0.8462,
 -0.1248,
 0.0824,
 0.0212,
 0.0996,
 -0.0344,
 0.1383,
 -0.0983,
 0.1637,
 -0.1906,
 -0.1969,
 0.1473,
 0.1318,
 -0.07,
 0.0304,
 -0.0345,
 0.1507,
 -0.1786,
 0.1038,
 -0.0035,
 -0.093,
 0.1255,
 0.1431,
 0.105,
 -0.0332,
 0.1255,
 0.0339,
 0.2101,
 -0.205,
 -0.1511,
 -0.2529,
 -0.0887,
 0.0172,
 -0.0392,
 0.1185,
 0.0742,
 -0.0041,
 -0.0321,
 -0.1839,
 -0.0481,
 -0.0944,
 -0.0761,
 0.1848,
 0.1434,
 -0.1804,
 0.1901,
 -0.064,
 -0.0045,
 -0.3591,
 0.1271,
 0.131,
 -0.1333,
 0.0142,
 0.0421,
 -

In [None]:
# Embed the sample title word by word.
text = (m[1]["title"])
text_split = text.split()

text_embedded = []
for t in text_split:
    try:
        text_embedded.append(model[t])
    except KeyError:
        # The word has no entry in the embedding model: skip it. Only
        # KeyError is caught so that other errors are not silently hidden.
        pass

text_embedded

In [None]:
# Look up a single word; fall back to a one-element zero array when the word
# has no entry in the model. Only KeyError is caught so that other errors
# (for example `model` not being loaded) are not silently hidden.
try:
    t = np.array(model['PfSWIB'])
except KeyError:
    t = np.zeros(1)

t

In [37]:
def embedd (text):
    """Embed a text word by word using the module-level ``model``.

    ``text`` is split on whitespace and each word is looked up in ``model``.
    Words without an entry are skipped, so the result may be shorter than the
    number of words (and is empty when no word is known).

    Returns the list of the values found, in word order.
    """
    text_embedded = []
    for word in text.split():
        try:
            text_embedded.append(model[word])
        except KeyError:
            # Unknown word: skip it. Only KeyError is caught so that real
            # failures are not silently swallowed.
            continue
    return text_embedded

def reco_emb(reco):
    """Replace the title of a ``{'paper_id', 'title'}`` record by its embedding.

    The title is embedded word by word with ``embedd``; looking up the whole
    title string as a single key in ``model`` fails for any title that is not
    itself a vocabulary entry.

    Returns a dict with the unchanged ``'paper_id'`` and, under ``'title'``,
    the list returned by ``embedd`` for the title.
    """
    return {
        "paper_id": reco['paper_id'],
        "title": embedd(reco['title'])
    }

# NOTE(review): this definition reuses the name `flatten`, so from here on it
# replaces any earlier module-level function with the same name.
def flatten(reco):
    """Turn an indexable ``(paper_id, metadata)`` element into a flat dict.

    Element 0 is used as the paper id; element 1 is indexed with ``'title'``.

    Returns ``{"paper_id": ..., "title": ...}``.
    """
    paper_id = reco[0]
    title = reco[1]["title"]
    return dict(paper_id=paper_id, title=title)

In [34]:
# Compute the flattened {paper_id, title} records and wrap the resulting
# sequence in a new bag; then preview the first three.
titles = db.from_sequence(metas.map(flatten).compute())
titles.take(3)


#titles = metas.map(flatten_embedding)
#titles.take(3)

({'paper_id': '000a0fc8bbef80410199e690191dc3076a290117',
  'title': 'PfSWIB, a potential chromatin regulator for var gene regulation and parasite development in Plasmodium falciparum'},
 {'paper_id': '000affa746a03f1fe4e3b3ef1a62fdfa9b9ac52a',
  'title': 'Correlation between antimicrobial consumption and incidence of health-care- associated infections due to methicillin- resistant Staphylococcus aureus and vancomycin-resistant enterococci at a university hospital in Taiwan from 2000 to 2010'},
 {'paper_id': '000b0174f992cb326a891f756d4ae5531f2845f7',
  'title': 'Full Title: A systematic review of MERS-CoV (Middle East Respiratory Syndrome Coronavirus) 2 seroprevalence and viral RNA prevalence in dromedary camels: implications for animal vaccination'})

In [46]:
# Take three title records and show the first one.
t = titles.take(3)
#reco_emb(t[0])
t[0]

{'paper_id': '000a0fc8bbef80410199e690191dc3076a290117',
 'title': 'PfSWIB, a potential chromatin regulator for var gene regulation and parasite development in Plasmodium falciparum'}

In [41]:
# NOTE(review): elsewhere in this notebook `t` is assigned `titles.take(3)`, whose
# displayed output looks like a tuple; if so `t.map` does not exist here and
# `titles.map(reco_emb)` was presumably intended. Confirm.
et = t.map(reco_emb).compute()


KeyboardInterrupt: 

In [None]:
et



In [None]:
import itertools

#c = list(itertools.product(a, b))

In [None]:
# All ordered pairs of elements of small_titl, as a bag with 10 partitions.
# NOTE(review): `small_titl` is not defined in the visible part of the notebook;
# confirm where it comes from.
grid = db.from_sequence(list(itertools.product(small_titl, small_titl))).repartition(10)

In [None]:
# Preview two pairs, allowing take() to look into up to 5 partitions.
grid.take(2, npartitions=5)

In [None]:
# Display the full list of pairs built locally.
# NOTE(review): `small_titl` is not defined in the visible part of the notebook.
list(itertools.product(small_titl, small_titl))