EMR, HIVE, PRESTO and SPARK
===

**Creation of EMR CLUSTER.**

We create a cluster with Hadoop, Hive, HCatalog, Presto, Spark, Zeppelin and Hue.

Preparation
====
Before creating the EMR cluster, we set the environment variables **VPCSGID** and **SUBNET** from the outputs of the CloudFormation stack named in `STACKNAME`:

In [None]:
import os

import boto3

# Read the outputs of the workshop CloudFormation stack (its name is in STACKNAME)
cf = boto3.client('cloudformation', region_name='eu-west-1')
response = cf.describe_stacks(StackName=os.getenv('STACKNAME'))
outputs = response['Stacks'][0]['Outputs']

# Export the security group and subnet IDs for the shell cells below
for output in outputs:
    if output['OutputKey'] == 'VPCSGID':
        os.environ['VPCSGID'] = output['OutputValue']
    if output['OutputKey'] == 'Subnet':
        os.environ['SUBNET'] = output['OutputValue']
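
The loop above does nothing when an expected output key is missing, which only shows up later as an empty `$SUBNET` or `$VPCSGID`. A minimal sketch of a stricter variant, assuming the same output keys; the helper name `stack_outputs_to_env` and the sample values are hypothetical and not part of boto3:

```python
import os

def stack_outputs_to_env(outputs, mapping, environ=None):
    """Copy selected CloudFormation outputs into environment variables.

    outputs: the list found at response['Stacks'][0]['Outputs']
    mapping: {OutputKey: environment variable name}
    environ: target mapping, defaults to os.environ
    Hypothetical helper for illustration; raises if an expected key is absent.
    """
    if environ is None:
        environ = os.environ
    by_key = {o['OutputKey']: o['OutputValue'] for o in outputs}
    missing = sorted(k for k in mapping if k not in by_key)
    if missing:
        raise KeyError("stack outputs not found: " + ", ".join(missing))
    for key, env_name in mapping.items():
        environ[env_name] = by_key[key]
    return {env_name: by_key[key] for key, env_name in mapping.items()}

# Made-up values, written into a plain dict so the real environment is untouched
sample_outputs = [
    {'OutputKey': 'VPCSGID', 'OutputValue': 'sg-00000000'},
    {'OutputKey': 'Subnet', 'OutputValue': 'subnet-00000000'},
]
demo_env = {}
print(stack_outputs_to_env(sample_outputs, {'VPCSGID': 'VPCSGID', 'Subnet': 'SUBNET'}, demo_env))
```

In the notebook, passing the real `outputs` list and leaving `environ` unset would write to `os.environ`, as the original cell does.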

In [None]:
%env BUCKET=aws-potus-eu-west-1

In [None]:
!aws ec2 create-key-pair --key-name $USER$WORKSHOP --query 'KeyMaterial' --output text --region eu-west-1 \
   > $USER$WORKSHOP.pem

In [None]:
!chmod 400 $USER$WORKSHOP.pem

In [None]:
!aws emr create-default-roles --region eu-west-1

In [None]:
!aws emr create-cluster --release-label emr-5.11.0 --name "EMR cluster" \
  --applications Name=Hadoop Name=Hue Name=Spark Name=Hive Name=Zeppelin Name=HCatalog Name=Presto \
  --ec2-attributes KeyName=$USER$WORKSHOP,SubnetId=$SUBNET,AdditionalMasterSecurityGroups=$VPCSGID --use-default-roles \
  --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m4.xlarge \
    InstanceGroupType=CORE,InstanceCount=1,InstanceType=m4.xlarge \
  --region eu-west-1

In [None]:
%env CLUSTER=

In [None]:
!aws emr describe-cluster --cluster-id $CLUSTER --region eu-west-1
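
The cluster is usable once its state is `WAITING` (idle) or `RUNNING` (executing a step). A minimal sketch of reading the state and the master address out of a `describe-cluster` response; the helper names and the sample values are made up for illustration:

```python
def cluster_summary(response):
    """Return (state, master_dns) from an EMR describe-cluster response dict.

    master_dns is None while EMR has not yet assigned a master node.
    """
    cluster = response['Cluster']
    state = cluster['Status']['State']
    master_dns = cluster.get('MasterPublicDnsName')
    return state, master_dns

def is_ready(state):
    # WAITING: cluster is up and idle; RUNNING: cluster is up and executing a step
    return state in ('WAITING', 'RUNNING')

# Made-up response fragment, shaped like the JSON printed by the cell above
sample = {'Cluster': {'Status': {'State': 'WAITING'},
                      'MasterPublicDnsName': 'ec2-0-0-0-0.eu-west-1.compute.amazonaws.com'}}
state, dns = cluster_summary(sample)
print(state, dns, is_ready(state))
```

With boto3, a dictionary of this shape is returned by `boto3.client('emr', region_name='eu-west-1').describe_cluster(ClusterId=os.getenv('CLUSTER'))`.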

**Usage of EMR CLUSTER with pyhive**

We will open connections to Hive and Presto, create Hive external tables over the data on S3, and then run a sample query first with Hive and then with Presto.

In [None]:
from pyhive import hive
from pyhive import presto
import os

# HiveServer2 listens on port 10000; run Hive queries on the MapReduce engine
hive_conn = hive.Connection(host="IP-OF-MASTER-EMR", port=10000,
                            configuration={'hive.execution.engine': 'mr'})

presto_conn = presto.Connection(host="IP-OF-MASTER-EMR", port=8889)

# S3 prefix holding the workshop data, one sub-folder per candidate
base = "s3://" + os.getenv('USER') + "-" + os.getenv('WORKSHOP') + "-aws-bigdata-workshop/split/"
columns = "(id BIGINT, name STRING, text STRING, time BIGINT, isodate TIMESTAMP)"
# '\\n' reaches Hive as the two characters \n, which Hive reads as a newline
row_format = "ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\\n'"

createclinton = ("CREATE EXTERNAL TABLE hillary " + columns + " " + row_format +
                 " LOCATION '" + base + "Hillary/'")
createtrump = ("CREATE EXTERNAL TABLE donald " + columns + " " + row_format +
               " LOCATION '" + base + "Donald/'")

In [None]:
hive_cursor=hive_conn.cursor()
hive_cursor.execute(createtrump)
hive_cursor.execute(createclinton)

In [None]:
hive_cursor=hive_conn.cursor()
hive_cursor.execute("select count(*) from hillary")
print(hive_cursor.fetchall())

In [None]:
presto_cursor=presto_conn.cursor()
presto_cursor.execute('SELECT count(*) FROM hillary')
print(presto_cursor.fetchall())

Other sample queries to try:
```
select count(*) from hillary
select count(distinct id) from hillary
select count(distinct id) from hillary where text like '%obama%'
```
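
Both pyhive cursors follow the Python DB-API (`execute`, `fetchall`), so the same loop can run these queries against either engine and time them. A minimal sketch; `run_queries` is a hypothetical helper, and an in-memory SQLite table with made-up rows stands in for the cluster so the code runs anywhere:

```python
import sqlite3
import time

def run_queries(cursor, queries):
    """Run each query on a DB-API cursor; return a list of (query, rows, seconds)."""
    results = []
    for query in queries:
        start = time.perf_counter()
        cursor.execute(query)
        rows = cursor.fetchall()
        results.append((query, rows, time.perf_counter() - start))
    return results

queries = [
    "select count(*) from hillary",
    "select count(distinct id) from hillary",
    "select count(distinct id) from hillary where text like '%obama%'",
]

# Stand-in data (not the workshop data set): a tiny SQLite table named like the Hive one
demo = sqlite3.connect(":memory:")
demo.execute("create table hillary (id integer, name text, text text)")
demo.executemany("insert into hillary values (?, ?, ?)",
                 [(1, "a", "thanks obama"), (2, "b", "hello"), (2, "b", "hello")])
for query, rows, seconds in run_queries(demo.cursor(), queries):
    print(query, rows, round(seconds, 4))
```

On the cluster, `run_queries(hive_conn.cursor(), queries)` and `run_queries(presto_conn.cursor(), queries)` would give a side-by-side view of the two engines.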

**SPARK with zeppelin**

Connect to Zeppelin at http://EMR-MASTER-IP:8890/ and create a new Spark notebook.
We will use the Hive tables created earlier.

Copy and paste the following code blocks into the notebook.

ANALYZE DATA WITH SPARK SQL:
```
%sql
select date_sub(isodate, 1) as date, 'clinton' as candidate, count(distinct id) as count
from hillary
group by 1
union
select date_sub(isodate, 1) as date, 'trump' as candidate, count(distinct id) as count
from donald
group by 1
order by date
```

Do sentiment analysis with SPARK
```
import org.apache.spark.sql.DataFrame

// AFINN lexicon: one "word<TAB>score" pair per line, collected on the driver as a Map
val af = sc.textFile("s3://aws-potus-eu-west-1/sample/AFINN-en-165.txt").map(_.split("\t")).map(x => (x(0), x(1).toInt)).collectAsMap()
case class Tweet(id: String, name: String, message: String, time: String, isodate: String, score: Integer)

// Score each tweet as the sum of the AFINN scores of its words (unknown words count as 0)
def sentiment(candidate: String): DataFrame = {
    val tweets = sc.textFile("s3://aws-potus-eu-west-1/split/" + candidate + "/*").map(_.split("\\|")).flatMap {
        row =>
            if (row.length == 5) {
                val score = row(2).split(" ").map(word => af.getOrElse(word.toLowerCase, 0)).sum
                Some(Tweet(row(0), row(1), row(2), row(3), row(4), score))
            } else {
                None // skip malformed lines
            }
    }
    tweets.toDF()
}

// Register one temporary view per candidate for the %sql paragraph below
val trump = sentiment("Donald")
trump.createOrReplaceTempView("donaldwithsentiment")
val clinton = sentiment("Hillary")
clinton.createOrReplaceTempView("hillarywithsentiment")
```
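
The scoring rule in the Spark code is simple: split the tweet on spaces, lower-case each word, look it up in the AFINN map, and sum the hits. A plain-Python restatement for a single tweet, as an illustration only; the function names and the two-word lexicon are made up and do not come from the real AFINN file:

```python
def parse_afinn(lines):
    """Parse 'word<TAB>score' lines into a dict of word -> integer score."""
    pairs = (line.rstrip("\n").split("\t") for line in lines)
    return {word: int(score) for word, score in pairs}

def tweet_score(text, afinn):
    """Sum the scores of the space-separated words; unknown words score 0."""
    return sum(afinn.get(word.lower(), 0) for word in text.split(" "))

# Tiny made-up lexicon, for illustration only
afinn = parse_afinn(["good\t3\n", "bad\t-3\n"])
print(tweet_score("A GOOD good day", afinn))
print(tweet_score("bad, really", afinn))
```

As in the Spark version, a word with punctuation attached (`bad,`) does not match the lexicon, because the text is split on spaces only.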

ANALYZE WITH SPARK SQL, INCLUDING THE SENTIMENT SCORE:
```
%sql
select date_sub(isodate, 1) as date, 'clinton' as candidate, count(distinct id) as count, sum(score)
from hillarywithsentiment
group by 1
union
select date_sub(isodate, 1) as date, 'trump' as candidate, count(distinct id) as count, sum(score)
from donaldwithsentiment
group by 1
order by date
```


ALTERNATIVE: QUERY WITH SPARK SQL PROGRAMMATICALLY
```
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
val result = sqlContext.sql("SELECT count(*) from hillary")
result.show()
val df = sqlContext.sql("SELECT * from hillary")
df.printSchema()
df.groupBy("name").count().show()
```

**Usage of EMR CLUSTER with EMR steps**

EMR steps can run Pig, Hive, or Java/Spark jobs.

For this workshop we will use Hive steps.

In [None]:
!aws s3 cp s3://$BUCKET/sample/hillary.q . --only-show-errors
!cat hillary.q

In [None]:
!aws emr add-steps --region eu-west-1 --cluster-id $CLUSTER \
  --steps Type=HIVE,Name='Hive program',ActionOnFailure=CONTINUE,Args=[-f,s3://$BUCKET/sample/hillary.q,-d,INPUT=s3://$BUCKET/sample/,-d,OUTPUT=s3://$BUCKET/sample,arg1,arg2]


In [None]:
!aws emr list-steps --cluster-id $CLUSTER --region eu-west-1|jq '.Steps[0]'
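
The step runs asynchronously, so the cell above may need to be re-run until the state stops changing. A minimal sketch of reading the name and state from the `list-steps` JSON; the helper names and the sample document are made up for illustration, and the first entry is taken because `list-steps` returns the most recent step first:

```python
import json

# States after which a step will not change any more
TERMINAL_STATES = {'COMPLETED', 'CANCELLED', 'FAILED', 'INTERRUPTED'}

def latest_step(list_steps_json):
    """Return (name, state) of the first step in `aws emr list-steps` JSON output."""
    step = json.loads(list_steps_json)['Steps'][0]
    return step['Name'], step['Status']['State']

def is_finished(state):
    return state in TERMINAL_STATES

# Made-up sample document, for illustration only
sample = '{"Steps": [{"Id": "s-EXAMPLE", "Name": "Hive program", "Status": {"State": "RUNNING"}}]}'
name, state = latest_step(sample)
print(name, state, is_finished(state))
```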

In [None]:
!aws s3 cp s3://$BUCKET/sample/select-count-hillary.q . --only-show-errors
!cat select-count-hillary.q

In [None]:
!aws emr add-steps --region eu-west-1 --cluster-id $CLUSTER \
  --steps Type=HIVE,Name='Hive program',ActionOnFailure=CONTINUE,Args=[-f,s3://$BUCKET/sample/select-count-hillary.q,-d,INPUT=s3://$BUCKET/sample/,-d,OUTPUT=s3://$BUCKET/sample,arg1,arg2]


In [None]:
!aws emr list-steps --cluster-id $CLUSTER --region eu-west-1|jq '.Steps[0]'

In [None]:
!aws s3 ls s3://$BUCKET/sample/output/

In [None]:
!aws s3 cp s3://$BUCKET/sample/output/000000_0 .
!cat 000000_0
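
If the output file holds several columns, they may look glued together when printed with `cat`. A minimal sketch of splitting them, assuming the script kept Hive's default field delimiter for text output, Ctrl-A (`\x01`); the function name and sample content are made up, since the actual content depends on the `.q` script:

```python
def parse_hive_output(text, delimiter="\x01"):
    """Split Hive text output into rows of string fields, skipping empty lines."""
    return [line.split(delimiter) for line in text.split("\n") if line]

# Made-up sample content, for illustration only
sample = "first\x01123\nsecond\x01456\n"
print(parse_hive_output(sample))
```

In the notebook, the real file could be read with `open('000000_0').read()` and passed to the same function.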