# Map/Reduce avec PIG sur Azure - correction

In [1]:
from jyquickhelper import add_notebook_menu
add_notebook_menu()

## Données

On considère le jeu de données suivant : [Localization Data for Person Activity Data Set](https://archive.ics.uci.edu/ml/datasets/Localization+Data+for+Person+Activity) qu'on récupère comme indiqué dans le notebook de l'énoncé.

In [2]:
from pyquickhelper.ipythonhelper import open_html_form
params={"blob_storage":"", "password1":"", "hadoop_server":"", "password2":"", "username":"xavierdupre"}
open_html_form(params=params,title="server + hadoop + credentials", key_save="blobhp")

In [3]:
blobstorage = blobhp["blob_storage"]
blobpassword = blobhp["password1"]
hadoop_server = blobhp["hadoop_server"]
hadoop_password = blobhp["password2"]
username = blobhp["username"]

In [4]:
import pyensae
%load_ext pyensae
%load_ext pyenbc
%hd_open

(<pyensae.remote.azure_connection.AzureClient at 0xafe7e10>,
 <azure.storage.blob.blobservice.BlobService at 0xafe7e48>)

## Exercice 1 : GROUP BY

In [5]:
import pandas, sqlite3
con = sqlite3.connect("ConfLongDemo_JSI.db3")
df = pandas.read_sql("""SELECT activity, count(*) as nb FROM person GROUP BY activity""", con)
con.close()
df.head()

Unnamed: 0,activity,nb
0,falling,2973
1,lying,54480
2,lying down,6168
3,on all fours,5210
4,sitting,27244


On vérifie que le fichier qu'on veut traiter est bien là :

In [6]:
%blob_ls /testensae/ConfLongDemo_JSI.small.txt

Unnamed: 0,name,last_modified,content_type,content_length,blob_type
0,testensae/ConfLongDemo_JSI.small.txt,"Thu, 29 Oct 2015 00:23:00 GMT",application/octet-stream,132727,BlockBlob


Il faut maintenant le faire avec PIG.

In [7]:
%%PIG_azure solution_groupby.pig

myinput = LOAD '$CONTAINER/testensae/ConfLongDemo_JSI.small.txt' 
          using PigStorage(',') 
          AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;

gr = GROUP myinput BY activity ;
avgact = FOREACH gr GENERATE group, COUNT(myinput) ; 

STORE avgact INTO '$CONTAINER/$PSEUDO/testensae/ConfLongDemo_JSI.small.group.2015.txt' USING PigStorage() ;

On soumet le job :

In [8]:
jid = %hd_pig_submit solution_groupby.pig
jid

{'id': 'job_1445989166328_0009'}

On vérifie le status du job :

In [9]:
st = %hd_job_status jid["id"]
st["id"],st["percentComplete"],st["completed"],st["status"]["jobComplete"],st["status"]["state"]

('job_1445989166328_0009', '100% complete', None, False, 'RUNNING')

On regarde si la compilation s'est bien passée :

In [10]:
%hd_tail_stderr jid["id"]

On regarde le contenu du répertoire sur le blob storage :

In [11]:
df=%blob_ls /$PSEUDO/testensae
list(df["name"])

['xavierdupre/testensae',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt/_SUCCESS',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt/part-r-00000',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt/_SUCCESS',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt/part-r-00000',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt/_SUCCESS',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt/part-r-00000',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt/_SUCCESS',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt/part-m-00000',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.walking2015.txt',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.walking2015.txt

In [12]:
import os
if os.path.exists("results.group.2015.xt") : os.remove("results.group.2015.txt")
%blob_downmerge /$PSEUDO/testensae/ConfLongDemo_JSI.small.group.2015.txt results.group.2015.txt

'results.group.2015.txt'

In [13]:
%lsr res.*[.]txt

Unnamed: 0,directory,last_modified,name,size
0,False,2015-10-29 01:56:11.025867,.\results.group.2015.txt,89
1,False,2015-10-29 01:46:45.425028,.\results.txt,21.65 Kb
2,False,2015-10-29 01:46:46.705466,.\results_allfiles.txt,21.65 Kb


In [14]:
%head results.group.2015.txt

## Exercice 2 : JOIN

In [15]:
con = sqlite3.connect("ConfLongDemo_JSI.db3")
df = pandas.read_sql("""SELECT person.*, A.nb FROM person INNER JOIN (
                            SELECT activity, count(*) as nb FROM person GROUP BY activity) AS A
                            ON person.activity == A.activity""", con)
con.close()
df.head()

Unnamed: 0,index,sequence,tag,timestamp,dateformat,x,y,z,activity,nb
0,0,A01,010-000-024-033,633790226051280329,27.05.2009 14:03:25:127,4.062931,1.892434,0.507425,walking,32710
1,1,A01,020-000-033-111,633790226051820913,27.05.2009 14:03:25:183,4.291954,1.78114,1.344495,walking,32710
2,2,A01,020-000-032-221,633790226052091205,27.05.2009 14:03:25:210,4.359101,1.826456,0.968821,walking,32710
3,3,A01,010-000-024-033,633790226052361498,27.05.2009 14:03:25:237,4.087835,1.879999,0.466983,walking,32710
4,4,A01,010-000-030-096,633790226052631792,27.05.2009 14:03:25:263,4.324462,2.07246,0.488065,walking,32710


Idem, maintenant il faut le faire avec PIG.

In [16]:
%%PIG_azure solution_groupby_join.pig

myinput = LOAD '$CONTAINER/testensae/ConfLongDemo_JSI.small.txt' 
          using PigStorage(',') 
          AS (index:long, sequence, tag, timestamp:long, dateformat, x:double,y:double, z:double, activity) ;

gr = GROUP myinput BY activity ;
avgact = FOREACH gr GENERATE group, COUNT(myinput) ; 

joined = JOIN myinput BY activity, avgact BY group ;

STORE joined INTO '$CONTAINER/$PSEUDO/testensae/ConfLongDemo_JSI.small.group.join.2015.txt' USING PigStorage() ;

In [17]:
jid = %hd_pig_submit solution_groupby_join.pig
jid

{'id': 'job_1445989166328_0011'}

In [18]:
st = %hd_job_status jid["id"]
st["id"],st["percentComplete"],st["completed"],st["status"]["jobComplete"],st["status"]["state"], st["userargs"]["file"]

('job_1445989166328_0011',
 '100% complete',
 'done',
 True,
 'SUCCEEDED',
 'wasb://hdblobstorage@hdblobstorage.blob.core.windows.net/xavierdupre/scripts/pig/solution_groupby_join.pig')

In [19]:
df=%blob_ls /$PSEUDO/testensae
df

Unnamed: 0,name,last_modified,content_type,content_length,blob_type
0,xavierdupre/testensae,"Tue, 25 Nov 2014 00:50:34 GMT",application/octet-stream,0,BlockBlob
1,xavierdupre/testensae/ConfLongDemo_JSI.small.g...,"Thu, 29 Oct 2015 00:55:09 GMT",,0,BlockBlob
2,xavierdupre/testensae/ConfLongDemo_JSI.small.g...,"Thu, 29 Oct 2015 00:55:09 GMT",application/octet-stream,0,BlockBlob
3,xavierdupre/testensae/ConfLongDemo_JSI.small.g...,"Thu, 29 Oct 2015 00:55:08 GMT",application/octet-stream,89,BlockBlob
4,xavierdupre/testensae/ConfLongDemo_JSI.small.g...,"Thu, 29 Oct 2015 00:58:43 GMT",,0,BlockBlob
5,xavierdupre/testensae/ConfLongDemo_JSI.small.g...,"Thu, 29 Oct 2015 00:58:43 GMT",application/octet-stream,0,BlockBlob
6,xavierdupre/testensae/ConfLongDemo_JSI.small.g...,"Thu, 29 Oct 2015 00:58:42 GMT",application/octet-stream,144059,BlockBlob
7,xavierdupre/testensae/ConfLongDemo_JSI.small.g...,"Tue, 25 Nov 2014 01:16:11 GMT",,0,BlockBlob
8,xavierdupre/testensae/ConfLongDemo_JSI.small.g...,"Tue, 25 Nov 2014 01:16:11 GMT",application/octet-stream,0,BlockBlob
9,xavierdupre/testensae/ConfLongDemo_JSI.small.g...,"Tue, 25 Nov 2014 01:16:10 GMT",application/octet-stream,144059,BlockBlob


In [20]:
set(df.name)

{'xavierdupre/testensae',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt/_SUCCESS',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.2015.txt/part-r-00000',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.2015.txt',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.2015.txt/_SUCCESS',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.2015.txt/part-r-00000',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt/_SUCCESS',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.join.txt/part-r-00000',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt/_SUCCESS',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.group.txt/part-r-00000',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_walking.txt',
 'xavierdupre/testensae/ConfLongDemo_JSI.small.keep_

In [21]:
if os.path.exists("results.join.2015.txt") : os.remove("results.join.2015.txt")
%blob_downmerge /$PSEUDO/testensae/ConfLongDemo_JSI.small.group.join.2015.txt results.join.2015.txt

'results.join.2015.txt'

In [22]:
%head results.join.2015.txt

<h3 id="prol">Prolongements</h3>

[PIG](http://pig.apache.org/) n'est pas la seule façon d'exécuter des jobs Map/Reduce. [Hive](https://hive.apache.org/) est un langage dont la syntaxe est très proche de celle du SQL. L'article [Comparing Pig Latin and SQL for Constructing Data Processing Pipelines](https://developer.yahoo.com/blogs/hadoop/comparing-pig-latin-sql-constructing-data-processing-pipelines-444.html) explicite les différences des deux approches.

**langage haut niveau**

Ce qu'il faut retenir est que le langage PIG est un langage haut niveau. Le programme est compilé en une séquence d'opérations Map/Reduce transparente pour l'utilisateur. Le temps de développement est très réduit lorsqu'on le compare au même programme écrit en Java. Le compilateur construit un plan d'exécution ([quelques exemples ici](http://chimera.labs.oreilly.com/books/1234000001811/ch07.html#explain)) et infère le nombre de machines requises pour distribuer le job. Cela suffit pour la plupart des besoins, cela nécessite.

**petits jeux**

Certains jobs peuvent durer des heures, il est conseillée de les essayer sur des petits jeux de données avant de les faire tourner sur les vrais données. Il est toujours frustrant de s'apercevoir qu'un job a planté au bout de deux heures car une chaîne de caractères est vide et que ce cas n'a pas été prévu.

Avec ces petits jeux, il est possible de faire tourner et conseillé de tester le job d'abord sur la passerelle ([exécution local](http://archive.cloudera.com/cdh/3/pig/tutorial.html#Running+the+Pig+Scripts+in+Local+Mode)) avant de le lancer sur le cluster. Avec pyensae, il faut ajouter l'option ``-local`` à la commande [hd_pig_submit](http://www.xavierdupre.fr/app/pyensae/helpsphinx/pyensae/remote/magic_azure.html?highlight=hd_pig_submit#pyensae.remote.magic_azure.MagicAzure.hd_pig_submit).

**concaténer les fichiers divisés**

Un programme PIG ne produit pas un fichier mais plusieurs fichiers dans un répertoire. La commande [getmerge](http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-common/FileSystemShell.html) télécharge ces fichiers sur la passerelle et les fusionne en un seul.

**ordre des lignes**

Les jobs sont distribués, même en faisant rien (LOAD + STORE), il n'est pas garanti que l'ordre des lignes soit préservé. La probabilié que ce soit le cas est quasi nulle.