# Reservoir Sampling distribué - énoncé

In [1]:
# pyensae is imported first; the %blob_* / %hd_* magics used below come from it
# (the connection objects shown later are pyensae.remote.* instances).
import pyensae
# NOTE(review): %nb_menu presumably renders a navigation menu for this notebook — confirm.
%nb_menu

## Création d'un fichier à échantillonner

In [148]:
# Build the test file: 100000 rows "<i>\t<letter><i>" (letter cycles A..Z),
# then one extra row "100001\tAAAAAA" written without a trailing newline.
# Text mode: on Windows every "\n" is written as "\r\n".
with open("sample4.txt", "w", encoding="utf8") as f:
    f.writelines("{0}\t{1}{0}\n".format(i, chr(65 + i % 26)) for i in range(100000))
    f.write("100001\tAAAAAA")

In [150]:
# Show the first lines of the generated file.
%head sample4.txt

## connexion

In [8]:
import os

blobhp = {}
if "HDCREDENTIALS" not in os.environ:
    # No environment variable: ask for the credentials through an HTML form.
    # NOTE(review): key_save="blobhp" presumably stores the submitted values into `blobhp` — confirm.
    from pyquickhelper.ipythonhelper import open_html_form
    params = {"blob_storage": "", "password1": "", "hadoop_server": "",
              "password2": "", "username": "axavier"}
    r = open_html_form(params=params, title="server + hadoop + credentials", key_save="blobhp")
else:
    # HDCREDENTIALS holds exactly five fields separated by "**"
    # (unpacking raises ValueError otherwise).
    (blobhp["blob_storage"], blobhp["password1"], blobhp["hadoop_server"],
     blobhp["password2"], blobhp["username"]) = os.environ["HDCREDENTIALS"].split("**")
    # Only the type is displayed, presumably so the credentials are not echoed in the output.
    r = type(blobhp)
r

dict

In [9]:
import pyensae

# Copy the credentials into plain global names.
# NOTE(review): %hd_open takes no argument, so it presumably reads these globals — confirm.
blobstorage, blobpassword = blobhp["blob_storage"], blobhp["password1"]
hadoop_server, hadoop_password = blobhp["hadoop_server"], blobhp["password2"]
username = blobhp["username"]

In [10]:
# Open the connections: the magic returns an AzureClient and a BlobService (see output).
client, bs = %hd_open
client, bs

(<pyensae.remote.azure_connection.AzureClient at 0x959b9b0>,
 <azure.storage.blob.blobservice.BlobService at 0x959b9e8>)

## upload du fichier

In [11]:
%blob_up sample3.txt /$PSEUDO/sampling/sample4.txt

'$PSEUDO/sampling/sample4.txt'

In [12]:
# List the blobs stored under $PSEUDO/sampling to check the upload.
%blob_ls /$PSEUDO/sampling

Unnamed: 0,name,last_modified,content_type,content_length,blob_type
0,axavier/sampling/datafu-1.2.0.jar,"Fri, 13 Nov 2015 00:03:49 GMT",application/octet-stream,1600826,BlockBlob
1,axavier/sampling/out_sampled_rs4_2015.txt,"Fri, 13 Nov 2015 01:08:22 GMT",,0,BlockBlob
2,axavier/sampling/out_sampled_rs4_2015.txt/_SUC...,"Fri, 13 Nov 2015 01:08:22 GMT",application/octet-stream,0,BlockBlob
3,axavier/sampling/out_sampled_rs4_2015.txt/part...,"Fri, 13 Nov 2015 01:08:21 GMT",application/octet-stream,12785,BlockBlob
4,axavier/sampling/sample.txt,"Fri, 13 Nov 2015 00:02:50 GMT",application/octet-stream,1377780,BlockBlob
5,axavier/sampling/sample2.txt,"Fri, 13 Nov 2015 00:35:55 GMT",application/octet-stream,1377793,BlockBlob
6,axavier/sampling/sample3.txt,"Fri, 13 Nov 2015 00:39:40 GMT",application/octet-stream,1377793,BlockBlob
7,axavier/sampling/sample4.txt,"Sun, 15 Nov 2015 12:24:22 GMT",application/octet-stream,1377793,BlockBlob
8,axavier/sampling/sample4_hash.txt,"Fri, 13 Nov 2015 14:50:39 GMT",,0,BlockBlob
9,axavier/sampling/sample4_hash.txt/_SUCCESS,"Fri, 13 Nov 2015 14:50:39 GMT",application/octet-stream,0,BlockBlob


## Code python pour le reservoir sampling

In [18]:
# 10000 labels "<i><letter>" where the letter cycles through a..z.
ensemble = [str(n) + chr(97 + n % 26) for n in range(10000)]
ensemble[:5]

['0a', '1b', '2c', '3d', '4e']

In [19]:
import random


def reservoir_sampling(ensemble, k, rng=None):
    """
    Draw a uniform random sample of at most ``k`` elements (Algorithm R).

    The input is read once, in order, and is never indexed or measured with
    ``len``, so ``ensemble`` may be any iterable (list, generator, file...).

    :param ensemble: iterable to sample from
    :param k: sample size; ``k <= 0`` gives an empty sample
    :param rng: object exposing ``randint`` (e.g. ``random.Random(seed)``) to make
        the draw reproducible; defaults to the ``random`` module
    :return: list of ``min(k, n)`` elements, ``n`` being the number of input elements
    """
    rng = random if rng is None else rng
    echantillon = []
    for i, e in enumerate(ensemble):
        if len(echantillon) < k:
            # the first k elements fill the reservoir
            echantillon.append(e)
        else:
            # element i (0-based) enters the reservoir with probability k / (i + 1)
            j = rng.randint(0, i)
            if j < k:
                echantillon[j] = e
    return echantillon

# Sample 10 labels; no seed is set, so the result changes at every run.
reservoir_sampling(ensemble, 10)

['8681x',
 '8356k',
 '5490e',
 '4405l',
 '5890o',
 '2689l',
 '8672o',
 '3603p',
 '8599t',
 '6086c']

## python à jython

On s'assure que le code précédent fonctionne en Jython (implémentation de Python qui s'exécute sur la machine virtuelle Java). On s'inspire pour cela de la documentation [jython-udfs](https://pig.apache.org/docs/r0.12.0/udf.html#jython-udfs).

### On crée d'abord un script PIG pour récupérer le schéma et les premières lignes

In [34]:
%%PIG sample_explore.pig

-- Exploration script: load the file and describe each relation (nothing is stored).
ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt' 
        USING PigStorage('\t') AS (x:int, v:chararray) ;
DESCRIBE ensemble;
-- GROUP ... ALL puts every row into a single group holding the whole relation as a bag
ens_group = GROUP ensemble ALL;
DESCRIBE ens_group;
-- FLATTEN turns the bag back into one row per element
sampled = FOREACH ens_group GENERATE FLATTEN(ensemble);
DESCRIBE sampled;

-- Uncomment to print the first rows of each relation:
--ens10 = LIMIT ensemble 10;
--ens_group10 = LIMIT ens_group 10;
--DUMP ens10;
--DUMP ens_group10;

Si la fonction suivante provoque une erreur ::

    AzureException: STATUS: 403, JSON: Expecting value: line 1 column 1 (char 0)
    <Response [403]>
    unable to submit job: sample_explore.pig
    
Vérifier les identifiants utilisés pour se connecter.

In [35]:
# Submit the Pig script; the magic returns a dict holding the job id.
jid = %hd_pig_submit sample_explore.pig
jid

{'id': 'job_1446540516812_0185'}

In [38]:
st = %hd_job_status jid["id"]
# Fields worth watching; re-run this cell until jobComplete is True.
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])

('job_1446540516812_0185', None, None, False, 'RUNNING')

La sortie d'erreur (*stderr*) du job contient les informations souhaitées :

In [41]:
# Show the last 5 lines of the job's stderr.
%hd_tail_stderr jid["id"] -n 5

Et la sortie du second dump :: 
    
    (all,{(100001,AAAAAA),(99999,D99999),(99998,C99998)...

### Le code Jython

In [1]:
import pyensae

In [2]:
%%PYTHON reservoir_sample.py

import random

@schemaFunction("rsSchema")
def rsSchema(input):
    return input

@outputSchemaFunction("rsSchema")
def reservoir_sampling(ensemble):
    ensemble = eval(ensemble)
    k = 10
    N = len(ensemble)
    echantillon = []
    for i, e in enumerate(ensemble):
        if len(echantillon) < k:
            echantillon.append(e)
        else:
            j = random.randint(0, i)
            if j < k:
                echantillon[j] = e
    return echantillon

In [4]:
%%jython reservoir_sample.py reservoir_sampling
{(100001,"AAAAAA"),(99999,"D99999"),(99998,"C99998")}

On uploade le script :

In [16]:
# Upload the Jython script next to the other Pig scripts.
# NOTE(review): container and user ("hdblobstorage/axavier") are hard-coded here while
# other cells use $PSEUDO; they must match $SCRIPTPIG used by REGISTER — confirm.
%blob_up reservoir_sample.py hdblobstorage/axavier/scripts/pig/reservoir_sample.py

'axavier/scripts/pig/reservoir_sample.py'

In [20]:
# Check that the script is in the scripts folder (same hard-coded path as the upload above).
%blob_ls hdblobstorage/axavier/scripts/pig

Unnamed: 0,name,last_modified,content_type,content_length,blob_type
0,axavier/scripts/pig/reservoir_sample.py,"Sun, 15 Nov 2015 13:11:01 GMT",application/octet-stream,513,BlockBlob
1,axavier/scripts/pig/sample.pig,"Fri, 13 Nov 2015 01:07:11 GMT",application/octet-stream,392,BlockBlob
2,axavier/scripts/pig/sample.pig.log,"Fri, 13 Nov 2015 01:08:35 GMT",,0,BlockBlob
3,axavier/scripts/pig/sample.pig.log/exit,"Fri, 13 Nov 2015 01:08:35 GMT",application/octet-stream,3,BlockBlob
4,axavier/scripts/pig/sample.pig.log/stderr,"Fri, 13 Nov 2015 01:08:31 GMT",application/octet-stream,16755,BlockBlob
5,axavier/scripts/pig/sample.pig.log/stdout,"Fri, 13 Nov 2015 01:08:31 GMT",application/octet-stream,0,BlockBlob
6,axavier/scripts/pig/sample_explore.pig,"Sun, 15 Nov 2015 12:32:38 GMT",application/octet-stream,372,BlockBlob
7,axavier/scripts/pig/sample_explore.pig.log,"Sun, 15 Nov 2015 12:33:17 GMT",,0,BlockBlob
8,axavier/scripts/pig/sample_explore.pig.log/exit,"Sun, 15 Nov 2015 12:33:17 GMT",application/octet-stream,3,BlockBlob
9,axavier/scripts/pig/sample_explore.pig.log/stderr,"Sun, 15 Nov 2015 12:33:07 GMT",application/octet-stream,1426,BlockBlob


On ajoute le code jython au script précédent :

In [11]:
%%PIG sample_explore_complete.pig

-- USING names the script engine (jython); AS gives the namespace through which
-- the functions of the script are called (myrs.<function>).
REGISTER '$CONTAINER/$SCRIPTPIG/reservoir_sample.py' USING jython AS myrs;

ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
        USING PigStorage('\t') AS (x:int, v:chararray) ;
DESCRIBE ensemble;
-- a single group whose bag holds the whole relation
ens_group = GROUP ensemble ALL;
DESCRIBE ens_group;
-- sample the bag with the Jython UDF, then turn the sample back into rows
sampled = FOREACH ens_group GENERATE FLATTEN(myrs.reservoir_sampling(ensemble));
DESCRIBE sampled;

STORE sampled
INTO '$CONTAINER/$PSEUDO/sampling/sample_rs.txt' USING PigStorage();

In [17]:
# Submit the script using the Jython UDF; the magic returns a dict holding the job id.
jid = %hd_pig_submit sample_explore_complete.pig
jid

{'id': 'job_1446540516812_0187'}

In [23]:
st = %hd_job_status jid["id"]
# Fields worth watching; re-run this cell until jobComplete is True.
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])

('job_1446540516812_0187', None, 'done', True, 'SUCCEEDED')

In [24]:
# Show the last 100 lines of the job's stderr (look for Pig errors here).
%hd_tail_stderr jid["id"] -n 100

À corriger plus tard. Dans l'immédiat, on utilisera la bibliothèque [datafu](https://datafu.incubator.apache.org/docs/datafu/guide/sampling.html). Si le cluster ne reconnaît pas la bibliothèque, voir la section java pour comprendre comment l'importer. On la déclare dans le script par l'instruction ``REGISTER``.

In [60]:
%%PIG sample_explore_datafu.pig

-- datafu is not part of Pig: register the jar uploaded in the "version java" section.
REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';
-- RS(bag) keeps a reservoir sample of at most 1000 tuples of the bag.
DEFINE RS datafu.pig.sampling.ReservoirSample('1000');

ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt' 
        USING PigStorage('\t') AS (x:int, v:chararray) ;
DESCRIBE ensemble;
-- a single group whose bag holds the whole relation
ens_group = GROUP ensemble ALL;
DESCRIBE ens_group;
-- FLATTEN turns the sampled bag back into rows
sampled = FOREACH ens_group GENERATE FLATTEN(RS(ensemble));
DESCRIBE sampled;

STORE sampled 
INTO '$CONTAINER/$PSEUDO/sampling/sample_datafu_rs.txt' USING PigStorage();

In [61]:
# Submit the datafu version; the magic returns a dict holding the job id.
jid = %hd_pig_submit sample_explore_datafu.pig
jid

{'id': 'job_1446540516812_0193'}

In [72]:
st = %hd_job_status jid["id"]
# Fields worth watching; re-run this cell until jobComplete is True.
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])

('job_1446540516812_0193', '50% complete', None, False, 'RUNNING')

In [71]:
# Show the last 100 lines of the job's stderr.
%hd_tail_stderr jid["id"] -n 100

In [None]:
# Look for the output folder sample_datafu_rs.txt.
%blob_ls /$PSEUDO/sampling

## version distribuée

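Astuce : on distribue le calcul, puis on recombine les échantillons en faisant un dernier reservoir sampling, mais pondéré : chaque élément de l'échantillon partiel $i$ reçoit un poids $n_i / k_i$, où $n_i$ est la taille du sous-ensemble traité et $k_i$ celle de son échantillon.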

## version distribuée améliorée

Le problème de la version précédente : chaque sous-ensemble, traité d'un seul bloc, utilise une séquence de nombres aléatoires dont on ne sait pas grand-chose. Si les mêmes graines (*seeds*) sont utilisées, il est possible que les séquences, même si elles simulent le hasard, soient extrêmement corrélées d'un bloc à l'autre. Il faut remédier à cela.

Il faut également s'assurer que les blocs ne sont pas déséquilibrés (*skewed*).

In [80]:
%%PIG_azure script_rs.pig

-- MD5 and RS both come from datafu, which must be registered (as in the other scripts).
REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';

DEFINE MD5 datafu.pig.hash.MD5();
-- the datafu class is ReservoirSample: RS(bag) keeps at most 100 tuples of the bag
DEFINE RS datafu.pig.sampling.ReservoirSample('100');

ensemble = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt'
        USING PigStorage('\t') AS (x:int, v:chararray) ;
DESCRIBE ensemble;

-- number of occurrences of each distinct (x, v) row
ens_group = GROUP ensemble BY (x,v);
DESCRIBE ens_group;

compte_group = FOREACH ens_group
            GENERATE group.x AS x,
                     group.v AS v,
                     COUNT(ensemble) AS nb_ligne ;
DESCRIBE compte_group;

-- the MD5 of v is the key used to split the data into blocks
hash_group = FOREACH compte_group
                GENERATE x, v, nb_ligne,
                        MD5(v) as val;
DESCRIBE hash_group;

-- NOTE(review): in sample4.txt every v is distinct, so each MD5 value forms its own
-- group of one row; grouping on a bucket of the hash would give larger blocks.
group_hash = GROUP hash_group BY val ;
DESCRIBE group_hash;

-- one reservoir sample per block, with the block size
rs_parall = FOREACH group_hash GENERATE
                    COUNT(hash_group) AS nb_hash,
                    RS(hash_group) AS Rs ;
DESCRIBE rs_parall;

-- No STORE/DUMP is active: only the DESCRIBE statements run.
--STORE hash_group 
--INTO '$CONTAINER/$PSEUDO/sampling/sample4_hash.txt' USING PigStorage();

In [81]:
# Submit the block-wise sampling script; the magic returns a dict holding the job id.
jid=%hd_pig_submit script_rs.pig
jid

{'id': 'job_1446540516812_0177'}

In [83]:
st = %hd_job_status jid["id"]
# Fields worth watching; re-run this cell until jobComplete is True.
(st["id"],st["percentComplete"],st["completed"],
st["status"]["jobComplete"],st["status"]["state"])

('job_1446540516812_0177', None, 'done', True, 'SUCCEEDED')

In [84]:
# Show the last 100 lines of the job's stderr (DESCRIBE output, errors).
%hd_tail_stderr jid["id"] -n 100

In [77]:
# List the outputs stored under $PSEUDO/sampling.
%blob_ls /$PSEUDO/sampling

Unnamed: 0,name,last_modified,content_type,content_length,blob_type
0,axavier/sampling/datafu-1.2.0.jar,"Fri, 13 Nov 2015 00:03:49 GMT",application/octet-stream,1600826,BlockBlob
1,axavier/sampling/out_sampled_rs4_2015.txt,"Fri, 13 Nov 2015 01:08:22 GMT",,0,BlockBlob
2,axavier/sampling/out_sampled_rs4_2015.txt/_SUC...,"Fri, 13 Nov 2015 01:08:22 GMT",application/octet-stream,0,BlockBlob
3,axavier/sampling/out_sampled_rs4_2015.txt/part...,"Fri, 13 Nov 2015 01:08:21 GMT",application/octet-stream,12785,BlockBlob
4,axavier/sampling/sample.txt,"Fri, 13 Nov 2015 00:02:50 GMT",application/octet-stream,1377780,BlockBlob
5,axavier/sampling/sample2.txt,"Fri, 13 Nov 2015 00:35:55 GMT",application/octet-stream,1377793,BlockBlob
6,axavier/sampling/sample3.txt,"Fri, 13 Nov 2015 00:39:40 GMT",application/octet-stream,1377793,BlockBlob
7,axavier/sampling/sample4.txt,"Fri, 13 Nov 2015 00:41:49 GMT",application/octet-stream,1377793,BlockBlob
8,axavier/sampling/sample4_hash.txt,"Fri, 13 Nov 2015 14:50:39 GMT",,0,BlockBlob
9,axavier/sampling/sample4_hash.txt/_SUCCESS,"Fri, 13 Nov 2015 14:50:39 GMT",application/octet-stream,0,BlockBlob


In [78]:
# Download the output folder as one local file (presumably concatenating its part files).
# NOTE(review): sample4_hash.txt only exists if the STORE at the end of script_rs.pig was uncommented.
%blob_downmerge /$PSEUDO/sampling/sample4_hash.txt sample4_hash.txt

'sample4_hash.txt'

In [79]:
# Show the first lines of the downloaded file.
%head sample4_hash.txt

## version java

On s'inspire de l'exemple suivant [Sampling](http://datafu.incubator.apache.org/docs/datafu/guide/sampling.html).
 On télécharge [datafu 1.2](http://datafu.incubator.apache.org/docs/datafu/) depuis [Maven](http://mvnrepository.com/artifact/com.linkedin.datafu/datafu/1.2.0). Ce n'est pas la dernière version ; pour une version plus récente, il faut suivre les instructions pour *builder* datafu (voir [documentation](http://datafu.incubator.apache.org/docs/datafu/1.2.0/)). En particulier, la version pondérée du reservoir sampling n'est pas disponible (voir [history](https://github.com/apache/incubator-datafu/commits/master/datafu-pig/src/main/java/datafu/pig/sampling/WeightedReservoirSample.java) ; la version 1.2.0 est sortie en décembre 2013).
 
L'implémentation [java](https://github.com/apache/incubator-datafu/blob/master/datafu-pig/src/main/java/datafu/pig/sampling/ReservoirSample.java) n'a pas l'air de résoudre un problème qui peut survenir si la taille de l'échantillon demandée est trop grande. Voir section suivante.

In [4]:
import pyensae
# central.maven.org has been retired and Maven Central only answers over HTTPS:
# the jar is fetched from repo1.maven.org instead.
pyensae.download_data("datafu-1.2.0.jar", url="https://repo1.maven.org/maven2/com/linkedin/datafu/datafu/1.2.0/")

'datafu-1.2.0.jar'

In [47]:
# Upload the jar to the path used by the REGISTER statements of the Pig scripts.
%blob_up datafu-1.2.0.jar /$PSEUDO/sampling/datafu-1.2.0.jar

'$PSEUDO/sampling/datafu-1.2.0.jar'

In [153]:
# Check that the jar is stored under $PSEUDO/sampling.
%blob_ls /$PSEUDO/sampling

Unnamed: 0,name,last_modified,content_type,content_length,blob_type
0,axavier/sampling/datafu-1.2.0.jar,"Fri, 13 Nov 2015 00:03:49 GMT",application/octet-stream,1600826,BlockBlob
1,axavier/sampling/sample.txt,"Fri, 13 Nov 2015 00:02:50 GMT",application/octet-stream,1377780,BlockBlob
2,axavier/sampling/sample2.txt,"Fri, 13 Nov 2015 00:35:55 GMT",application/octet-stream,1377793,BlockBlob
3,axavier/sampling/sample3.txt,"Fri, 13 Nov 2015 00:39:40 GMT",application/octet-stream,1377793,BlockBlob
4,axavier/sampling/sample4.txt,"Fri, 13 Nov 2015 00:41:49 GMT",application/octet-stream,1377793,BlockBlob


In [261]:
%%PIG_azure sample.pig

-- datafu is not part of Pig: register the jar uploaded above.
REGISTER '$CONTAINER/$PSEUDO/sampling/datafu-1.2.0.jar';

-- RS(bag) keeps a reservoir sample of at most 1000 tuples of the bag.
DEFINE RS datafu.pig.sampling.ReservoirSample('1000');

dset = LOAD '$CONTAINER/$PSEUDO/sampling/sample4.txt' 
        USING PigStorage('\t') AS (x:int, v:chararray) ;
-- GROUP ... ALL puts the whole relation in one bag; FLATTEN turns the sample back into rows.
sampled = FOREACH (GROUP dset ALL) GENERATE FLATTEN(RS(dset));
STORE sampled INTO '$CONTAINER/$PSEUDO/sampling/out_sampled_rs4_2015.txt' USING PigStorage() ;

In [262]:
# Submit the datafu sampling script; jid holds the job id.
jid = %hd_pig_submit sample.pig

In [265]:
st = %hd_job_status jid["id"]
# Fields worth watching; re-run this cell until jobComplete is True.
st["id"],st["percentComplete"],st["completed"],st["status"]["jobComplete"],st["status"]["state"]

('job_1446540516812_0136', None, None, False, 'RUNNING')

In [266]:
# Show the last 10 lines of the job's stderr.
%hd_tail_stderr jid["id"] -n 10

In [274]:
# Look for the output folder out_sampled_rs4_2015.txt.
%blob_ls /$PSEUDO/sampling

Unnamed: 0,name,last_modified,content_type,content_length,blob_type
0,axavier/sampling/datafu-1.2.0.jar,"Fri, 13 Nov 2015 00:03:49 GMT",application/octet-stream,1600826,BlockBlob
1,axavier/sampling/out_sampled_rs4_2015.txt,"Fri, 13 Nov 2015 01:08:22 GMT",,0,BlockBlob
2,axavier/sampling/out_sampled_rs4_2015.txt/_SUC...,"Fri, 13 Nov 2015 01:08:22 GMT",application/octet-stream,0,BlockBlob
3,axavier/sampling/out_sampled_rs4_2015.txt/part...,"Fri, 13 Nov 2015 01:08:21 GMT",application/octet-stream,12785,BlockBlob
4,axavier/sampling/sample.txt,"Fri, 13 Nov 2015 00:02:50 GMT",application/octet-stream,1377780,BlockBlob
5,axavier/sampling/sample2.txt,"Fri, 13 Nov 2015 00:35:55 GMT",application/octet-stream,1377793,BlockBlob
6,axavier/sampling/sample3.txt,"Fri, 13 Nov 2015 00:39:40 GMT",application/octet-stream,1377793,BlockBlob
7,axavier/sampling/sample4.txt,"Fri, 13 Nov 2015 00:41:49 GMT",application/octet-stream,1377793,BlockBlob
8,axavier/sampling/sampled4_2015.txt,"Fri, 13 Nov 2015 00:50:20 GMT",,0,BlockBlob
9,axavier/sampling/sampled4_2015.txt/_SUCCESS,"Fri, 13 Nov 2015 00:50:20 GMT",application/octet-stream,0,BlockBlob


In [278]:
# Download the output folder as one local file.
# NOTE(review): -o presumably overwrites an existing local file — confirm.
%blob_downmerge /$PSEUDO/sampling/out_sampled_rs4_2015.txt out_sampled_rs4_2015.txt -o

'out_sampled_rs4_2015.txt'

In [279]:
# Show the first sampled rows.
%head out_sampled_rs4_2015.txt

## fin

In [280]:
# Close the connection opened by %hd_open.
%blob_close

True

## version avec itérateur