# AWS Hail on EMR Bokeh Plotting Example

This is taken from the [Hail Plotting Tutorial](https://hail.is/docs/0.2/tutorials/08-plotting.html) with adjustments for use with SageMaker Notebook instances and EMR.

### List EMR Master Nodes

`~/SageMaker/bin/list-clusters` will output the IP of each master node in your account and check Livy connectivity.

In [1]:
%%bash
~/SageMaker/bin/list-clusters

[1mCluster Name                     Cluster ID           Livy Endpoint                Livy TCP Check                [0m
aperry                           j-I6W634D27S7X       http://172.21.20.83:8998     Success                       


Collect the Cluster Name from the output above.  Replace `<CLUSTER_NAME>` below and run the cell to collect the EMR Master node IP. That IP will be used for the Livy connection and Bokeh plot transfers.

In [2]:
%%bash --out LIVY_ENDPOINT
~/SageMaker/bin/list-clusters | grep aperry | awk '{ print $3 }'

In [3]:
%%local
import re

LIVY_ENDPOINT = LIVY_ENDPOINT.strip()
EMR_MASTER_IP = re.sub('http://([0-9.]+):([0-9]{4})', '\\1', LIVY_ENDPOINT)

Use the Livy Endpoint above and start your session name `-s`, language `-l python`, the livy endpoint `-u`, and authentication type `-t`.

In [4]:
%reload_ext sparkmagic.magics
%spark add -s jsmith -l python -u $LIVY_ENDPOINT -t None

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1585336697337_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


### Plot Retrieval from Master

HTTP requests are sent to the EMR master node to pull the remote Hail plots back to this notebook server.  Notebook plots will be stored in a `plots` directory along side to your notebook.  If the plots directory does not exist, it will be created.

Once the plots have been pulled locally they will be shown inline.  

In [5]:
%%local
from IPython.display import display, HTML
import requests, os

def displayRemoteHtml(remoteFile):
    url = 'http://' + EMR_MASTER_IP + remoteFile
    req = requests.get(url)
    
    # Strip leading /
    localPath = remoteFile[1:]
    # Drop the filename and create local target directory
    targetPath = "/".join(localPath.split('/')[0:-1])
    if not os.path.exists(targetPath):
        os.makedirs(targetPath, exist_ok=True)
    open(localPath, 'wb').write(req.content)
    
    html = HTML(filename=localPath)
    display(html)


## Plotting Tutorial

The Hail plot module allows for easy plotting of data. This notebook contains examples of how to use the plotting functions in this module, many of which can also be found in the first tutorial.

In [6]:
# If using Hail version < 0.2.24, uncomment the following
# sc.addPyFile('/opt/hail/hail-python.zip')

import hail as hl
hl.init(sc)

from bokeh.io import show, output_notebook
from bokeh.layouts import gridplot
output_notebook()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

  to the path to the Hail Python module directory HAIL_DIR,
  e.g. /path/to/python/site-packages/hail:
    spark.jars=HAIL_DIR/hail-all-spark.jar
    spark.driver.extraClassPath=HAIL_DIR/hail-all-spark.jar
    spark.executor.extraClassPath=./hail-all-spark.jar
  'pip-installed Hail requires additional configuration options in Spark referring\n'
Running on Apache Spark version 2.4.4
SparkUI available at http://ip-172-21-20-83.bbq.lan:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.34-914bd8a10ca2
LOGGING: writing to /hail-20200327-1923-0.2.34-914bd8a10ca2.log

In [7]:
hl.utils.get_1kg('data/')
mt = hl.read_matrix_table('data/1kg.mt')
table = (hl.import_table('data/1kg_annotations.txt', impute=True)
         .key_by('Sample'))
mt = mt.annotate_cols(**table[mt.s])
mt = hl.sample_qc(mt)

mt.describe()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
    'Population': str
    'SuperPopulation': str
    'isFemale': bool
    'PurpleHair': bool
    'CaffeineConsumption': int32
    'sample_qc': struct {
        dp_stats: struct {
            mean: float64, 
            stdev: float64, 
            min: float64, 
            max: float64
        }, 
        gq_stats: struct {
            mean: float64, 
            stdev: float64, 
            min: float64, 
            max: float64
        }, 
        call_rate: float64, 
        n_called: int64, 
        n_not_called: int64, 
        n_filtered: int64, 
        n_hom_ref: int64, 
        n_het: int64, 
        n_hom_var: int64, 
        n_non_ref: int64, 
        n_singleton: int64, 
        n_snp: int64, 
        n_insertion: int64, 
        n_deletion: int64, 
        n_transition: int64, 
        n_transversion: int64, 
        n_star: int64, 
      

### Histogram

The `histogram()` method takes as an argument an aggregated hist expression, as well as optional arguments for the legend and title of the plot.

In [8]:
from bokeh.plotting import figure, output_file, save

dp_hist = mt.aggregate_entries(hl.expr.aggregators.hist(mt.DP, 0, 30, 30))
p = hl.plot.histogram(dp_hist, legend='DP', title='DP Histogram')
output_file("/plots/histogram3.html")
save(p)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'/plots/histogram3.html'

In [9]:
%%local
displayRemoteHtml('/plots/histogram3.html')

This method, like all Hail plotting methods, also allows us to pass in fields of our data set directly. Choosing not to specify the `range` and `bins` arguments would result in a range being computed based on the largest and smallest values in the dataset and a default bins value of 50.

### Cumulative Histogram

The `cumulative_histogram()` method works in a similar way to `histogram()`.

In [10]:
p = hl.plot.cumulative_histogram(mt.DP, range=(0,30), bins=30)
output_file("/plots/cumulative_histogram.html")
save(p)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'/plots/cumulative_histogram.html'

In [11]:
%%local
displayRemoteHtml('/plots/cumulative_histogram.html')

### Scatter

The `scatter()` method can also take in either Python types or Hail fields as arguments for x and y.

In [12]:
p = hl.plot.scatter(mt.sample_qc.dp_stats.mean, mt.sample_qc.call_rate, xlabel='Mean DP', ylabel='Call Rate')
output_file("/plots/scatter.html")
save(p)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'/plots/scatter.html'

In [13]:
%%local
displayRemoteHtml('/plots/scatter.html')

We can also pass in a Hail field as a `label` argument, which determines how to color the data points.

In [14]:
mt = mt.filter_cols((mt.sample_qc.dp_stats.mean >= 4) & (mt.sample_qc.call_rate >= 0.97))
ab = mt.AD[1] / hl.sum(mt.AD)
filter_condition_ab = ((mt.GT.is_hom_ref() & (ab <= 0.1)) |
                        (mt.GT.is_het() & (ab >= 0.25) & (ab <= 0.75)) |
                        (mt.GT.is_hom_var() & (ab >= 0.9)))
mt = mt.filter_entries(filter_condition_ab)
mt = hl.variant_qc(mt).cache()
common_mt = mt.filter_rows(mt.variant_qc.AF[1] > 0.01)
gwas = hl.linear_regression_rows(y=common_mt.CaffeineConsumption, x=common_mt.GT.n_alt_alleles(), covariates=[1.0])
pca_eigenvalues, pca_scores, _ = hl.hwe_normalized_pca(common_mt.GT)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2020-03-27 19:24:47 Hail: INFO: linear_regression_rows: running on 250 samples for 1 response variable y,
    with input variable x, and 1 additional covariate...
2020-03-27 19:24:55 Hail: INFO: hwe_normalized_pca: running PCA using 9169 variants.
2020-03-27 19:24:58 Hail: INFO: pca: running PCA with 10 components...

In [15]:
p = hl.plot.scatter(pca_scores.scores[0], pca_scores.scores[1],
                    label=common_mt.cols()[pca_scores.s].SuperPopulation,
                    title='PCA', xlabel='PC1', ylabel='PC2', collect_all=True)
output_file("/plots/scatter2.html")
save(p)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'/plots/scatter2.html'
2020-03-27 19:25:31 Hail: WARN: cols(): Resulting column table is sorted by 'col_key'.
    To preserve matrix table column order, first unkey columns with 'key_cols_by()'
2020-03-27 19:25:31 Hail: INFO: Coerced sorted dataset
2020-03-27 19:25:31 Hail: INFO: Coerced sorted dataset

In [16]:
%%local
displayRemoteHtml('/plots/scatter2.html')

Hail's downsample aggregator is incorporated into the `scatter()`, `qq()`, and `manhattan()` functions. The `collect_all` parameter tells the plot function whether to collect all values or downsample. Choosing not to set this parameter results in downsampling.

In [17]:
p = hl.plot.scatter(pca_scores.scores[0], pca_scores.scores[1],
                    label=common_mt.cols()[pca_scores.s].SuperPopulation,
                    title='PCA', xlabel='PC1', ylabel='PC2', collect_all=True)
p2 = hl.plot.scatter(pca_scores.scores[0], pca_scores.scores[1],
                    label=common_mt.cols()[pca_scores.s].SuperPopulation,
                    title='PCA (downsampled)', xlabel='PC1', ylabel='PC2', collect_all=False, n_divisions=50)

# show(gridplot([p, p2], ncols=2, plot_width=400, plot_height=400))
grid = gridplot([p, p2], ncols=2, plot_width=400, plot_height=400)
output_file("/plots/gridplot.html")
save(grid)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'/plots/gridplot.html'
2020-03-27 19:25:33 Hail: INFO: Coerced sorted dataset
2020-03-27 19:25:33 Hail: INFO: Coerced sorted dataset
2020-03-27 19:25:33 Hail: INFO: Coerced sorted dataset
2020-03-27 19:25:33 Hail: INFO: Coerced sorted dataset

In [18]:
%%local
displayRemoteHtml('/plots/gridplot.html')

### 2-D histogram

For visualizing relationships between variables in large datasets (where scatter plots may be less informative since they highlight outliers), the `histogram_2d()` function will create a heatmap with the number of observations in each section of a 2-d grid based on two variables.

In [19]:
p = hl.plot.histogram2d(pca_scores.scores[0], pca_scores.scores[1])
output_file("/plots/2d-histogram.html")
save(p)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'/plots/2d-histogram.html'
2020-03-27 19:25:37 Hail: INFO: Ordering unsorted dataset with network shuffle

In [20]:
%%local
displayRemoteHtml('/plots/2d-histogram.html')

### Q-Q (Quantile-Quantile)

The `qq()` function requires either a Python type or a Hail field containing p-values to be plotted. This function also allows for downsampling.

In [21]:
p = hl.plot.qq(gwas.p_value, collect_all=True)
p2 = hl.plot.qq(gwas.p_value, n_divisions=75)

# show(gridplot([p, p2], ncols=2, plot_width=400, plot_height=400))
qq = gridplot([p, p2], ncols=2, plot_width=400, plot_height=400)
output_file("/plots/qq.html")
save(qq)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'/plots/qq.html'
2020-03-27 19:25:44 Hail: INFO: Ordering unsorted dataset with network shuffle
2020-03-27 19:25:49 Hail: INFO: Ordering unsorted dataset with network shuffle

In [22]:
%%local
displayRemoteHtml('/plots/qq.html')

### Manhattan

The `manhattan()` function requires a Hail field containing p-values.

In [23]:
p = hl.plot.manhattan(gwas.p_value)
output_file("/plots/manhattan.html")
save(p)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'/plots/manhattan.html'

In [24]:
%%local
displayRemoteHtml('/plots/manhattan.html')

We can also pass in a dictionary of fields that we would like to show up as we hover over a data point, and choose not to downsample if the dataset is relatively small.

In [25]:
hover_fields = dict([('alleles', gwas.alleles)])
p = hl.plot.manhattan(gwas.p_value, hover_fields=hover_fields, collect_all=True)
output_file("/plots/manhattan_hover.html")
save(p)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

'/plots/manhattan_hover.html'

In [26]:
%%local
displayRemoteHtml('/plots/manhattan_hover.html')

Remove the Livy notebook session

In [27]:
%spark cleanup