# Statistiques Wikipedia - énoncé

In [1]:
from pyquickhelper.ipythonhelper import add_notebook_menu
add_notebook_menu()

## Exercice 1 : parallélisation du téléchargement

On peut paralléliser le téléchargement de différentes façons :

* avec des [threads](https://en.wikipedia.org/wiki/Thread_(computing)) (librairie [threading](https://docs.python.org/3/library/threading.html) : synchronisation rapide mais parfois délicate et mémoire partagée entre threads
* avec des [processus](https://fr.wikipedia.org/wiki/Processus_(informatique)) (librairie [multiprocessing](https://docs.python.org/3.5/library/multiprocessing.html), [joblib](https://pythonhosted.org/joblib/), [jupyter](http://www.xavierdupre.fr/app/ensae_teaching_cs/helpsphinx3/notebooks/td2a_cenonce_session_2D.html) : synchronisation lente, pas de mémoire partagée
* avec un [cluster](https://fr.wikipedia.org/wiki/Grappe_de_serveurs), [jupyter](http://www.xavierdupre.fr/app/ensae_teaching_cs/helpsphinx3/notebooks/td2a_cenonce_session_2D.html) : synchronisation lente, pas de mémoire partagée, parallélisme en grande dimension

La page [ParallelProcessing](https://wiki.python.org/moin/ParallelProcessing) recense des modules qui implémente cela mais elle n'est pas très à jour. Il faut vérifier si les modules proposés sont encore maintenus.

### Approche avec des threads

In [None]:
import threading, time, os
from datetime import datetime, timedelta
from mlstatpy.data.wikipedia import download_pageviews
folder = "wikipv"
if not os.path.exists(folder):
    os.mkdir(folder)

class DownloadThread(threading.Thread) :
    """thread definition, it downloads a stream one after another
    until a queue is empty"""
    def __init__ (self, qu, name, folder) :
        threading.Thread.__init__ (self)
        self.qu = qu
        self.name = name
        self.folder = folder
       
    def run (self) :
        while not self.qu.empty():
            date = self.qu.get(False)
            if date is None:
                break
            print(self.name, "download", date, "len(qu)", self.qu.qsize())
            try:
                download_pageviews(date, folder=self.folder)
            except Exception as e:
                print("skipping dt", dt, "rerun to get it", e)
        print("done", self.name)
        self.qu.task_done()
            
# on créé des files et les threads associés
import queue       
queues = [queue.Queue() for i in range(0, 3)]
m = [DownloadThread(q, "thread %d" % i, folder) for i, q in enumerate(queues)]

# on remplit les files
dt = datetime.now() - timedelta(15)
hour = timedelta(hours=1)
for h in range(0, 24*7):
    queues[h%3].put(dt)
    dt += hour
    
# on démarre les threads
for t in m:
    t.start()
    
# on attend qu'elles se vident
for i, q in enumerate(queues):
    print("attendre file", i, [q.qsize() for q in queues])
    while not q.empty():
        time.sleep(1)

thread 0 download 2016-08-27 22:30:34.234240 len(qu) 55
thread 1 download 2016-08-27 23:30:34.234240 len(qu) 55
thread 0 download 2016-08-28 01:30:34.234240 len(qu) 54
thread 2 download 2016-08-28 00:30:34.234240 len(qu) 55
thread 1 download 2016-08-28 02:30:34.234240 len(qu) 54
attendre file 0 [54, 54, 55]
thread 0 download 2016-08-28 04:30:34.234240 len(qu) 53
thread 1 download 2016-08-28 05:30:34.234240 len(qu) 53
thread 2 download 2016-08-28 03:30:34.234240 len(qu) 54
thread 0 download 2016-08-28 07:30:34.234240 len(qu) 52
thread 1 download 2016-08-28 08:30:34.234240 len(qu) 52
thread 2 download 2016-08-28 06:30:34.234240 len(qu) 53
thread 0 download 2016-08-28 10:30:34.234240 len(qu) 51
thread 2 download 2016-08-28 09:30:34.234240 len(qu) 52
thread 1 download 2016-08-28 11:30:34.234240 len(qu) 51
thread 0 download 2016-08-28 13:30:34.234240 len(qu) 50
thread 1 download 2016-08-28 14:30:34.234240 len(qu) 50
thread 0 download 2016-08-28 16:30:34.234240 len(qu) 49
thread 1 download 2

### Parallélisation avec des processus

Il n'est pas toujours évident de comprendre ce qu'il se passe quand l'erreur se produit dans un processus différent. Si on change le *backend* pour ``"threading"``, l'erreur devient visible. Voir [Parallel](https://pythonhosted.org/joblib/generated/joblib.Parallel.html?highlight=parallel). Le code ne fonctionne pas lorsque ``n_jobs > 1`` sous Windows et que le backend est celui par défaut.

In [None]:
from joblib import Parallel, delayed
from datetime import datetime, timedelta
import os
folder = "wikipv"
if not os.path.exists(folder):
    os.mkdir(folder)
    
# on remplit les files
dt = datetime.now() - timedelta(14)
hour = timedelta(hours=1)
dates = [dt + hour*i for i in range(0,24)]
    
def downloadp2(dt, folder):
    from mlstatpy.data.wikipedia import download_pageviews
    download_pageviews(dt, folder=folder)

# L'instruction ne marche pas toujours depuis un notebook,
# dans ce cas, il faut exécuter un programme.
Parallel(n_jobs=1, verbose=5)(delayed(downloadp2)(dt, folder) for dt in dates)

## Filtrage pour ne garder que les lignes avec fr\t

In [None]:
def filtre(input, country):
    import os
    print(input)
    output = input + "." + country
    if not os.path.exists(output):
        with open(input, "r", encoding="utf-8") as f:
            with open(output, "w", encoding="utf-8") as g:
                for line in f:
                    if line.startswith(country):
                        g.write(line)

import os
from joblib import Parallel, delayed
folder = "wikipv"
files = os.listdir(folder)  
files = [os.path.join(folder, _) for _ in files if _.startswith("pageviews") and _.endswith("0000")]

Parallel(n_jobs=3, verbose=5, backend="threading")(delayed(filtre)(name, "fr") for name in files)

wikipv\pageviews-20160827-210000wikipv\pageviews-20160827-220000
wikipv\pageviews-20160827-230000

wikipv\pageviews-20160828-000000
wikipv\pageviews-20160828-010000
wikipv\pageviews-20160828-020000
wikipv\pageviews-20160828-030000
wikipv\pageviews-20160828-040000
wikipv\pageviews-20160828-050000
wikipv\pageviews-20160828-060000
wikipv\pageviews-20160828-070000
wikipv\pageviews-20160828-080000
wikipv\pageviews-20160828-090000
wikipv\pageviews-20160828-100000
wikipv\pageviews-20160828-110000


[Parallel(n_jobs=3)]: Done  12 tasks      | elapsed:   53.4s


wikipv\pageviews-20160828-120000


## Insérer le fichier dans une base de données SQL

In [None]:
import pandas
df = pandas.read