# H2O Sparkling Water EMR Starter (Python)

### This notebook assumes that the following environment variables are set:

```
PATH=$PATH:$HOME/.local/bin:$HOME/bin
export PATH
export PATH=/mnt/opt/anaconda520:/mnt/opt/anaconda520/bin:$PATH
export PYSPARK_PYTHON=/mnt/opt/anaconda520/bin/python
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf
export MASTER="yarn"
```

It also assumes that the PySparkling Python module can be found here:

`/mnt/opt/sparkling-water-2.4.11/py/build/dist/h2o_pysparkling_2.4-2.4.11.zip`

In [1]:
from pyspark.sql import SparkSession

# Stop a SparkSession left over from an earlier run of this notebook.
# On a fresh kernel `spark` is not defined yet; that NameError is the only
# failure ignored here, so interrupts and real errors from stop() still surface.
# NOTE(review): the next cell repeats this import and stop, so this cell is
# redundant and could be removed.
try:
    spark.stop()
except NameError:
    pass

In [2]:
import pyspark
from pyspark.sql import SparkSession

# Stop a SparkSession left over from an earlier run. On a fresh kernel `spark`
# is undefined; that NameError is the only failure deliberately ignored.
try:
    spark.stop()
except NameError:
    pass

# Build the session in client deploy mode with Hive support and dynamic
# allocation turned off.
# NOTE(review): dynamic allocation is presumably disabled so the executors
# hosting H2O nodes are not released while Sparkling Water runs — confirm.
spark = (
    SparkSession.builder
    .appName("Example_Notebook")
    .config("spark.submit.deployMode", "client")
    .config("spark.dynamicAllocation.enabled", "false")
    .enableHiveSupport()
    .getOrCreate()
)

# Report which PySpark installation the kernel picked up.
print("Pyspark version:", pyspark.__version__)
print("Pyspark module location:", pyspark.__file__)

Pyspark version: 2.4.0
Pyspark module location: /mnt/opt/anaconda520/lib/python3.6/site-packages/pyspark/__init__.py


In [3]:
# Show the active SparkSession as this cell's output.
spark

### Optional: get the parameters for `config_pyspark` from a working `pyspark` shell, then replace some of them

This might also fix problems with environment variables and with not being able to connect to the YARN cluster.

Run `pyspark` in a terminal, then run `sc.getConf().getAll()` in the PySpark shell.
Paste the resulting list into `config_pyspark` below.

```
config_pyspark =[(u'spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES', u'http://ip-172-31-36-110.us-east-2.compute.internal:20888/proxy/application_1558398699025_0001'), (u'spark.eventLog.enabled', u'true'), (u'spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem', u'2'), (u'spark.yarn.executor.memoryOverheadFactor', u'0.1875'), (u'spark.driver.extraLibraryPath', u'/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native'), (u'spark.sql.parquet.output.committer.class', u'com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter'), (u'spark.app.id', u'application_1558398699025_0001'), (u'spark.driver.appUIAddress', u'http://ip-172-31-36-110.us-east-2.compute.internal:4040'), (u'spark.blacklist.decommissioning.timeout', u'1h'), (u'spark.yarn.historyServer.address', u'ip-172-31-36-110.us-east-2.compute.internal:18080'), (u'spark.driver.host', u'ip-172-31-36-110.us-east-2.compute.internal'), (u'spark.yarn.appMasterEnv.SPARK_PUBLIC_DNS', u'$(hostname -f)'), (u'spark.executor.cores', u'4'), (u'spark.sql.emr.internal.extensions', u'com.amazonaws.emr.spark.EmrSparkSessionExtensions'), (u'spark.executor.extraJavaOptions', u"-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'"), (u'spark.eventLog.dir', u'hdfs:///var/log/spark/apps'), (u'spark.sql.hive.metastore.sharedPrefixes', u'com.amazonaws.services.dynamodbv2'), (u'spark.sql.warehouse.dir', u'hdfs:///user/spark/warehouse'), (u'spark.serializer.objectStreamReset', u'100'), (u'spark.executorEnv.PYTHONPATH', u'/usr/lib/spark/python/lib/py4j-0.10.7-src.zip:/usr/lib/spark/python/:<CPS>{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.7-src.zip'), (u'spark.submit.deployMode', u'client'), (u'spark.ui.proxyBase', u'/proxy/application_1558398699025_0001'), (u'spark.history.fs.logDirectory', 
u'hdfs:///var/log/spark/apps'), (u'spark.ui.filters', u'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'), (u'spark.sql.parquet.fs.optimized.committer.optimization-enabled', u'true'), (u'spark.driver.extraClassPath', u'/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar'), (u'spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem', u'true'), (u'spark.history.ui.port', u'18080'), (u'spark.shuffle.service.enabled', u'true'), (u'spark.executor.memory', u'4269M'), (u'spark.hadoop.yarn.timeline-service.enabled', u'false'), (u'spark.driver.port', u'32873'), (u'spark.resourceManager.cleanupExpiredHost', u'true'), (u'spark.executor.id', u'driver'), (u'spark.executor.extraClassPath', u'/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar'), (u'spark.yarn.dist.files', u'file:/etc/spark/conf/hive-site.xml'), (u'spark.app.name', u'PySparkShell'), (u'spark.files.fetchFailure.unRegisterOutputOnHost', u'true'), (u'spark.driver.extraJavaOptions', 
u"-XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p'"), (u'spark.master', u'yarn'), (u'spark.decommissioning.timeout.threshold', u'20'), (u'spark.sql.catalogImplementation', u'hive'), (u'spark.stage.attempt.ignoreOnDecommissionFetchFailure', u'true'), (u'spark.rdd.compress', u'True'), (u'spark.executor.extraLibraryPath', u'/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native'), (u'spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS', u'ip-172-31-36-110.us-east-2.compute.internal'), (u'spark.yarn.isPython', u'true'), (u'spark.dynamicAllocation.enabled', u'true'), (u'spark.ui.showConsoleProgress', u'true'), (u'spark.blacklist.decommissioning.enabled', u'true')]
# Keys from the captured shell configuration that must not be copied into the
# new session: the old shell's app identity and history UI port, plus the two
# settings overridden just below.
# NOTE(review): other captured entries also look specific to the old shell
# (e.g. spark.driver.host, spark.driver.port, spark.ui.proxyBase,
# spark.executor.id) — consider adding them here too.
EXCLUDED_KEYS = {
    'spark.app.name',
    'spark.app.id',
    'spark.history.ui.port',
    'spark.executor.memory',
    'spark.dynamicAllocation.enabled',
}

# Single pass: report each dropped entry and keep the rest, in original order.
kept_params = []
for key_value in config_pyspark:
    if key_value[0] in EXCLUDED_KEYS:
        print('Removing:', key_value)
    else:
        kept_params.append(key_value)
config_pyspark = kept_params

# Overrides applied on top of the captured configuration.
config_pyspark.append(('spark.executor.memory', '2176M'))
config_pyspark.append(('spark.dynamicAllocation.enabled', 'false'))
print('Number of parameters', len(config_pyspark))

from pyspark.conf import SparkConf
# Build a fresh SparkConf instead of calling setAll on the running context's
# private `_conf`: a conf already handed to Spark is not meant to be modified.
config_params = SparkConf().setAll(config_pyspark)

# Stop the current session (if any) so getOrCreate() below builds a new one
# with the merged configuration; a missing `spark` is the only ignored error.
try:
    spark.stop()
except NameError:
    pass

spark = (
    SparkSession.builder
    .appName("Example_Notebook")
    .config(conf=config_params)
    .enableHiveSupport()
    .getOrCreate()
)
spark
```

In [4]:
import sys

# PySparkling distribution built from Sparkling Water 2.4.11; update this
# path when switching Sparkling Water versions.
PYSPARKLING_ZIP = '/mnt/opt/sparkling-water-2.4.11/py/build/dist/h2o_pysparkling_2.4-2.4.11.zip'

# Guard the append so re-running this cell does not add duplicate entries
# to sys.path.
if PYSPARKLING_ZIP not in sys.path:
    sys.path.append(PYSPARKLING_ZIP)

import pysparkling
# Explicit names instead of `from pysparkling import *`; H2OContext is used
# in the next cell.
from pysparkling import H2OConf, H2OContext

In [5]:
# NOTE(review): `h2o` is not used in this cell; presumably imported for later
# H2O API calls.
import h2o
# Create (or reuse) the H2O context on top of the SparkSession `spark`;
# H2OContext comes from the pysparkling import in the previous cell.
hc = H2OContext.getOrCreate(spark)

Connecting to H2O server at http://ip-172-31-46-145.us-east-2.compute.internal:54321 ... successful.


0,1
H2O cluster uptime:,15 secs
H2O cluster timezone:,Etc/UTC
H2O data parsing timezone:,UTC
H2O cluster version:,3.24.0.3
H2O cluster version age:,15 days
H2O cluster name:,sparkling-water-jupyterlab_application_1558494441471_0003
H2O cluster total nodes:,2
H2O cluster free memory:,8.31 Gb
H2O cluster total cores:,4
H2O cluster allowed cores:,8



Sparkling Water Context:
 * H2O name: sparkling-water-jupyterlab_application_1558494441471_0003
 * cluster size: 2
 * list of used nodes:
  (executorId, host, port)
  ------------------------
  (2,ip-172-31-35-227.us-east-2.compute.internal,54321)
  (1,ip-172-31-36-140.us-east-2.compute.internal,54321)
  ------------------------

  Open H2O Flow in browser: http://ip-172-31-46-145.us-east-2.compute.internal:54321 (CMD + click in Mac OSX)

    
 * Yarn App ID of Spark application: application_1558494441471_0003
    
