# HDInsight, PIG

Short examples on how to connect to a cluster from a notebook and submit a job (Azure + PIG).

In [1]:
from jyquickhelper import add_notebook_menu
add_notebook_menu()

## Download the data

In [2]:
url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00222/"
file = "bank.zip"
import pyensae
data = pyensae.download_data(file, website=url)

In [3]:
import pandas
df = pandas.read_csv("bank-full.csv", sep=";")

In [4]:
df.head()

Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
0,58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
1,44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
2,33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
3,47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no
4,33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no


In [5]:
df.to_csv("bank_full_tab_no.txt", sep="\t", index=False, header=None)

## Connect to the cluster

In [6]:
import pyensae
blobstorage = 
blobpassword = 
hadoop_server = 
hadoop_password = 
username = "centrale"
client, bs =  %hd_open
client, bs

(<pyensae.remote.azure_connection.AzureClient at 0x1a349a00550>,
 <azure.storage.blob.blockblobservice.BlockBlobService at 0x1a349a314a8>)

## Upload the data

In [7]:
%blob_up bank_full_tab_no.txt hdblobstorage/centrale2/bank_full_tab_no.txt

'centrale2/bank_full_tab_no.txt'

In [8]:
%blob_ls hdblobstorage/centrale2

Unnamed: 0,name,last_modified,content_type,content_length,blob_type
0,centrale2/bank_full_tab_no.txt,2016-06-16 10:18:58+00:00,,3751188,BlockBlob


## Submit a PIG query

In [9]:
mapping = {'int64': 'double', 'float': 'double', 'object': 'chararray'}
schema = ["%s:%s" % (_[0], mapping.get(str(_[1]), _[1])) for _ in zip(df.columns, df.dtypes)]
schema = ", ".join(schema)
schema

'age:double, job:chararray, marital:chararray, education:chararray, default:chararray, balance:double, housing:chararray, loan:chararray, contact:chararray, day:double, month:chararray, duration:double, campaign:double, pdays:double, previous:double, poutcome:chararray, y:chararray'

On ajoute l'instruction [DESCRIBE](http://pig.apache.org/docs/r0.16.0/test.html#describe).

In [10]:
%%PIG_azure aggage3.pig
values = LOAD '$CONTAINER/centrale/bank_full_tab_no.txt' USING PigStorage('\t') AS (age:double, 
                    job:chararray, marital:chararray, education:chararray, 
                   default:chararray, balance:double, housing:chararray, loan:chararray, 
                   contact:chararray, day:double, month:chararray, duration:double, 
                   campaign:double, pdays:double, previous:double, poutcome:chararray, y:chararray);
DESCRIBE values;
gr = GROUP values BY loan ;
DESCRIBE gr;
agg = FOREACH gr GENERATE group, AVG(age) AS avg_age ;
DESCRIBE agg;
STORE agg INTO '$CONTAINER/centrale/bank_full_tab_no_agg.txt' USING PigStorage('\t') ;

In [11]:
jid = %hd_pig_submit aggage3.pig

In [12]:
jid

{'id': 'job_1466069083851_0005'}

In [13]:
%hd_queue

[{'detail': None, 'id': 'job_1466069083851_0005'},
 {'detail': None, 'id': 'job_1466069083851_0004'},
 {'detail': None, 'id': 'job_1466069083851_0003'},
 {'detail': None, 'id': 'job_1466069083851_0002'},
 {'detail': None, 'id': 'job_1466069083851_0001'}]

In [14]:
df = %hd_job_status jid['id']
df["status"]["state"]

'RUNNING'

In [15]:
%hd_tail_stderr -n 100 jid['id']

In [16]:
%%PIG_azure aggage4.pig
values = LOAD '$CONTAINER/centrale/bank_full_tab_no.txt' USING PigStorage('\t') AS (age:double, 
                                                    job:chararray, marital:chararray, education:chararray, 
                                                   default:chararray, balance:double, housing:chararray, loan:chararray, 
                                                   contact:chararray, day:double, month:chararray, duration:double, 
                                                   campaign:double, 
                                                   pdays:double, previous:double, poutcome:chararray, y:chararray);
DESCRIBE values;
gr = GROUP values BY loan ;
DESCRIBE gr;
agg = FOREACH gr GENERATE group, AVG(values.age) AS avg_age ;
DESCRIBE agg;
STORE agg INTO '$CONTAINER/centrale/bank_full_tab_no_agg2.txt' USING PigStorage('\t') ;

In [17]:
jid = %hd_pig_submit aggage4.pig

In [18]:
jid

{'id': 'job_1466069083851_0008'}

In [19]:
%hd_queue

[{'detail': None, 'id': 'job_1466069083851_0009'},
 {'detail': None, 'id': 'job_1466069083851_0008'},
 {'detail': None, 'id': 'job_1466069083851_0007'},
 {'detail': None, 'id': 'job_1466069083851_0006'},
 {'detail': None, 'id': 'job_1466069083851_0005'},
 {'detail': None, 'id': 'job_1466069083851_0004'},
 {'detail': None, 'id': 'job_1466069083851_0003'},
 {'detail': None, 'id': 'job_1466069083851_0002'},
 {'detail': None, 'id': 'job_1466069083851_0001'}]

In [20]:
df = %hd_job_status jid['id']
df["status"]["state"]

'RUNNING'

In [21]:
hd_tail_stderr -n 50 jid['id']

In [22]:
%blob_ls /centrale

Unnamed: 0,name,last_modified,content_type,content_length,blob_type
0,centrale/bank_full.csv,2016-06-15 22:17:59+00:00,,4610348,BlockBlob
1,centrale/bank_full_tab.txt,2016-06-15 22:19:46+00:00,,3751306,BlockBlob
2,centrale/bank_full_tab_no.txt,2016-06-15 23:00:52+00:00,,3751306,BlockBlob
3,centrale/bank_full_tab_no_agg.txt,2016-06-16 10:32:11+00:00,,0,BlockBlob
4,centrale/bank_full_tab_no_agg.txt/_SUCCESS,2016-06-16 10:32:11+00:00,,0,BlockBlob
5,centrale/bank_full_tab_no_agg.txt/part-r-00000,2016-06-16 10:32:11+00:00,,49,BlockBlob
6,centrale/bank_full_tab_no_agg2.txt,2016-06-16 21:13:14+00:00,,0,BlockBlob
7,centrale/bank_full_tab_no_agg2.txt/_SUCCESS,2016-06-16 21:13:14+00:00,,0,BlockBlob
8,centrale/bank_full_tab_no_agg2.txt/part-r-00000,2016-06-16 21:13:13+00:00,,49,BlockBlob
9,centrale/scripts/pig/aggage.pig,2016-06-15 23:15:54+00:00,,782,BlockBlob


In [23]:
%blob_downmerge --help

usage: blob_downmerge [-h] [-o] remotepath localfile

download a set of files from a blob storage folder, files will be merged, we
assume the container is the first element to the remote path

positional arguments:
  remotepath       remote path of the folder to download
  localfile        local name for the downloaded merged file

optional arguments:
  -h, --help       show this help message and exit
  -o, --overwrite  overwrite the local file
usage: blob_downmerge [-h] [-o] remotepath localfile



In [24]:
%blob_down /centrale/bank_full_tab_no_agg2.txt/part-r-00000 agg_hadoop3.txt

'agg_hadoop3.txt'

In [25]:
import pandas
df = pandas.read_csv("agg_hadoop3.txt", sep="\t", header=-1)
df

Unnamed: 0,0,1
0,no,41.008823
1,yes,40.555632
2,loan,


J'ai oublié d'enlever le header. On vérifie que les calcus sont bons en les faisant en local.

In [26]:
df = pandas.read_csv("bank-full.csv", sep=";")
df.head()

Unnamed: 0,age,job,marital,education,default,balance,housing,loan,contact,day,month,duration,campaign,pdays,previous,poutcome,y
0,58,management,married,tertiary,no,2143,yes,no,unknown,5,may,261,1,-1,0,unknown,no
1,44,technician,single,secondary,no,29,yes,no,unknown,5,may,151,1,-1,0,unknown,no
2,33,entrepreneur,married,secondary,no,2,yes,yes,unknown,5,may,76,1,-1,0,unknown,no
3,47,blue-collar,married,unknown,no,1506,yes,no,unknown,5,may,92,1,-1,0,unknown,no
4,33,unknown,single,unknown,no,1,no,no,unknown,5,may,198,1,-1,0,unknown,no


In [27]:
df[["loan", "age"]].groupby("loan").mean()

Unnamed: 0_level_0,age
loan,Unnamed: 1_level_1
no,41.008823
yes,40.555632
