<a href="https://colab.research.google.com/github/sigmarkarl/sparkgor/blob/master/SparkGOR_Jupyter_Introduction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction to Spark + GOR

This notebook introduces the integration of Spark and GOR using the Google Colab environment. To run the examples, you need access to data in the GOR format, such as the reference data available as part of the GOR open source project.

This notebook can be run in one of two ways:

* Connect to a hosted Jupyter runtime with Google Drive
* Connect to a local Jupyter runtime with local file storage

For this demonstration, we use a local Jupyter runtime, which is set up as described on the following page: https://research.google.com/colaboratory/local-runtimes.html



## Initial setup

To run the examples in this notebook, you must first install pyspark, pandas, and numpy locally using your package manager. For example, if you use pip to install Python packages, the following commands install the necessary packages:

```
pip install pyspark
pip install pandas
pip install numpy
```

It is advisable to do this before running your local Jupyter server so that the modules will be available to you in the Jupyter environment.

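You should also ensure that the Jupyter server uses Python 3, which pyspark requires. For example, you can point PySpark at a specific interpreter by setting the `PYSPARK_PYTHON` environment variable (adjust the path to match your system):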

```
export PYSPARK_PYTHON=/usr/local/bin/python3
```


## Hosted Jupyter runtime

It is also possible to connect to a hosted Jupyter server by selecting **Connect to hosted runtime** in the dropdown menu at the top of this page. Keep in mind that the memory settings for this method are limited to what is made available by the Google Colab environment. These settings should be sufficient to run any of the examples here.

If you wish to use this method, the files are shared at the following location: https://drive.google.com/drive/folders/1lM6YKPZIexb_1D8pk6KnVd5AXn0Jp0ek

You will need to set up the files within your own Google Drive. Note that the files required for the examples exceed the free Drive storage limit, so you will need an account with a higher storage limit.

Use the following code block to mount the Google Drive and make the folder accessible to the hosted runtime.


In [None]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## Setting up the reference data and PheCode

To install and run a local Jupyter server, please follow the instructions on this page: https://research.google.com/colaboratory/local-runtimes.html

You must also download the reference data and PheCode files as outlined in the following sections of the GOR open source project:

* [Set up the reference data](https://github.com/gorpipe/gor#setting-up-reference-data-optional)
* [Set up the PheCode GWAS data](https://github.com/gorpipe/gor#setting-up-phecode-gwas-data-optional)

When you start your local Jupyter server, make sure to start it from within the folder where you have put your reference data files. Otherwise, you may need to edit the paths in the examples below.
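
A quick way to confirm that the server was started from the right folder is to check for the data files before running any queries. The sketch below assumes the relative paths used by the examples later in this notebook; the helper name `missing_reference_files` is hypothetical and not part of GOR.

```python
from pathlib import Path

# Relative paths referenced by the examples in this notebook.
EXPECTED_FILES = (
    "ref/dbsnp/dbsnp.gorz",
    "ref/genes.gorz",
    "ref/refgenes/refgenes_exons.gorz",
    "phecode_gwas/Phecode_adjust_f2.gord",
    "phecode_gwas/metadata/vep_single.gorz",
)


def missing_reference_files(base_dir=".", expected=EXPECTED_FILES):
    """Return the expected paths that do not exist under `base_dir`."""
    base = Path(base_dir)
    return [path for path in expected if not (base / path).exists()]


# Check the current working directory of the Jupyter server.
not_found = missing_reference_files()
if not_found:
    print("Not found relative to", Path.cwd())
    for path in not_found:
        print("  ", path)
else:
    print("All expected data files were found.")
```

If files are reported as missing, either restart the server from the data folder or adjust the paths in the examples.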

In [None]:
# Setting config variables for the PySpark session

from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("SparkGOR demo")
    .config("spark.jars.packages", "org.gorpipe:gor-spark:2.11.10,io.projectglow:glow-spark3_2.12:0.6.0")
    .config("spark.jars.excludes", "org.apache.logging.log4j:log4j-core,org.apache.hive:hive-exec,net.sourceforge.f2j:arpack_combined_all,org.apache.avro:avro-mapred")
    .getOrCreate()
)

# On lower-spec computers, the following additional settings may be necessary
# (add them to the builder chain above, before .getOrCreate()):
# .config("spark.driver.memory", "8g").config("spark.executor.memory", "8g")
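
One way to make the optional memory settings easier to toggle is to collect the configuration in a dictionary first and then apply it to the builder. This is a sketch only: the function names `spark_gor_conf` and `build_session` are hypothetical, the package coordinates are copied from the cell above, and pyspark is imported lazily so that the dictionary can be inspected without starting Spark.

```python
def spark_gor_conf(driver_memory=None, executor_memory=None):
    """Return the Spark settings used in this notebook as a dict."""
    conf = {
        "spark.jars.packages": ",".join([
            "org.gorpipe:gor-spark:2.11.10",
            "io.projectglow:glow-spark3_2.12:0.6.0",
        ]),
        "spark.jars.excludes": ",".join([
            "org.apache.logging.log4j:log4j-core",
            "org.apache.hive:hive-exec",
            "net.sourceforge.f2j:arpack_combined_all",
            "org.apache.avro:avro-mapred",
        ]),
    }
    # Memory settings are optional and only added when requested.
    if driver_memory:
        conf["spark.driver.memory"] = driver_memory
    if executor_memory:
        conf["spark.executor.memory"] = executor_memory
    return conf


def build_session(conf, app_name="SparkGOR demo", master="local[*]"):
    """Apply every key/value pair in `conf` to a SparkSession builder."""
    from pyspark.sql import SparkSession  # imported here so the dict helper works without pyspark

    builder = SparkSession.builder.master(master).appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


# Inspect the settings for a lower-spec machine without starting Spark.
for key, value in spark_gor_conf(driver_memory="8g", executor_memory="8g").items():
    print(key, "=", value)
```

Calling `build_session(spark_gor_conf(driver_memory="8g", executor_memory="8g"))` would then create the session with the extra memory settings applied.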

In [None]:
# Python helper functions:
# This section contains helper functions for translating Java DataFrames into Python DataFrames.

import types
from pyspark.mllib.common import _py2java, _java2py
from pyspark.sql import DataFrame

sc = spark.sparkContext

def pydataframe(self,qry):
    return _java2py(sc,self.dataframe(qry,None))

def gor(self,qry):
    # Runs a GOR query against this Spark DataFrame.
    # Note: this relies on the global `sgs` session, which is created in a later cell.
    df = _py2java(sc,self)
    ReflectionUtil = spark._jvm.py4j.reflection.ReflectionUtil
    Rowclass = ReflectionUtil.classForName("org.apache.spark.sql.Row")
    ct = spark._jvm.scala.reflect.ClassTag.apply(Rowclass)
    gds = spark._jvm.org.gorpipe.spark.GorDatasetFunctions(df,ct,ct)
    return _java2py(sc,gds.gor(qry,True,sgs))

def createGorSession(self):
    sgs = self._jvm.org.gorpipe.spark.SparkGOR.createSession(self._jsparkSession)
    sgs.pydataframe = types.MethodType(pydataframe,sgs)
    return sgs

def createGorSessionWOptions(self,gorproject,cachedir,config,alias):
    sgs = self._jvm.org.gorpipe.spark.SparkGOR.createSession(self._jsparkSession,gorproject,cachedir,config,alias)
    sgs.pydataframe = types.MethodType(pydataframe,sgs)
    return sgs

setattr(DataFrame, 'gor', gor)
setattr(SparkSession, 'createGorSession', createGorSession)
setattr(SparkSession, 'createGorSessionWOptions', createGorSessionWOptions)
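
The helper cell above attaches methods in two ways: `setattr` on a class makes the method available on every instance, while `types.MethodType` binds a function to a single object. The sketch below shows the same pattern in plain Python. `FakeSession` and `FakeGorHandle` are hypothetical stand-ins for `SparkSession` and the GOR session object, and the query string is only a placeholder.

```python
import types


class FakeSession:
    """Hypothetical stand-in for SparkSession (patched at class level)."""


class FakeGorHandle:
    """Hypothetical stand-in for the object returned by createSession."""

    def dataframe(self, qry, schema):
        return f"rows for: {qry}"


def pydataframe(self, qry):
    # Mirrors the notebook helper, minus the Java-to-Python conversion.
    return self.dataframe(qry, None)


def createGorSession(self):
    handle = FakeGorHandle()
    # Bind the function to this one object only.
    handle.pydataframe = types.MethodType(pydataframe, handle)
    return handle


# Add the method to the class, so every instance gains it.
setattr(FakeSession, "createGorSession", createGorSession)

session = FakeSession()
sgs_demo = session.createGorSession()
print(sgs_demo.pydataframe("gor example.gorz | top 1"))
```

Binding per object keeps the extra method off other instances of the same class, whereas the class-level `setattr` affects all of them.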

In [None]:
# Initialize SparkGOR session
# If you are using the GOR session with options, you can specify other folders for the config and project directories.

import os
sgs = spark.createGorSession()
#sgs = spark.createGorSessionWOptions(os.path.expanduser("~/gorproject"),"/tmp","config/gor_config.txt","config/gor_standard_aliases.txt")

# Examples from the SparkGOR paper

The following section runs examples from the [SparkGOR paper](https://arxiv.org/abs/2009.00061).

In [None]:
# Examples 2, 3, and 4

ordbsnp = sgs.pydataframe("select * from <(pgor ref/dbsnp/dbsnp.gorz | top 100000 | split rsIDs | rename rsIDs rsID) order by rsID")
ordbsnp.write.mode("overwrite").save("dbsnp.rsOrd.parquet")
sgs.setCreate("#myordrssnps#","select * from dbsnp.rsOrd.parquet where rsID like 'rs222%' order by chrom, pos")

ss = sgs.pydataframe("create #myphewas# = pgor [#myordrssnps#] | varjoin -l -r phecode_gwas/Phecode_adjust_f2.gord; nor [#myphewas#] | sort -c pval_mm:n,rsID")
ss.toPandas()

Unnamed: 0,Chrom,POS,reference,allele,rsID,pVal_mm,OR_mm,CASE_info,GC,QQ,BONF,HOLM,Source
0,chrY,4807709,T,C,rs2220163,,,,,,,,
1,chrY,4811710,A,G,rs2220164,,,,,,,,
2,chrY,4812194,T,C,rs2220165,,,,,,,,
3,chrY,3518194,A,C,rs2220353,,,,,,,,
4,chrY,3518396,G,C,rs2220354,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...
64,chr12,420072,A,G,rs2229350,,,,,,,,
65,chr12,406292,G,A,rs2229351,,,,,,,,
66,chr12,463248,G,A,rs2229352,,,,,,,,
67,chr12,463248,G,C,rs2229352,,,,,,,,


In [None]:
# Example 7
import pandas as pd
myPandasGenes = pd.DataFrame(["BRCA1","BRCA2"],columns=["gene"])
myGenes = spark.createDataFrame(myPandasGenes)
myGenes.createOrReplaceTempView("myGenes")
sgs.setCreateAndDefs("create #mygenes# = select gene from myGenes; def #genes# = ref/genes.gorz; def #exons# = ref/refgenes/refgenes_exons.gorz; def #dbsnp# = ref/dbsnp/dbsnp.gorz;")
sgs.setCreate("#myexons#", "gor #exons# | inset -c gene_symbol [#mygenes#]")
exonSnps = sgs.pydataframe("pgor [#myexons#] | join -segsnp -ir #dbsnp# | join -snpseg -r #genes#")
snpCount = exonSnps.groupBy("gene_symbol").count()
snpCount.toPandas()

Unnamed: 0,gene_symbol,count
0,RPL21P4,17
1,BRCA1,5596
2,BRCA2,8194


In [None]:
# Example 8
snpCount2 = sgs.pydataframe("select count(*) from <(pgor [#myexons#] | join -segsnp -ir #dbsnp# | join -snpseg -r #genes#) group by gene_symbol")
snpCount2.toPandas()

Unnamed: 0,count(1)
0,17
1,5596
2,8194


In [None]:
# Create parquet file from dbsnp.gorz
dbsnpGorz = spark.read.format("gorsat.spark.GorDataSource").load("ref/dbsnp/dbsnp.gorz").limit(1000)
#dbsnpGorz = sgs.pydataframe("select * from ref/dbsnp/dbsnp.gorz").limit(1000)
dbsnpGorz.write.mode("overwrite").save("dbsnp.parquet")
dbsnpGorz.toPandas()

Unnamed: 0,Chrom,POS,reference,allele,rsIDs
0,chr1,10020,AA,A,rs775809821
1,chr1,10039,A,C,rs978760828
2,chr1,10043,T,A,rs1008829651
3,chr1,10051,A,G,rs1052373574
4,chr1,10054,C,CC,rs1326880612
...,...,...,...,...,...
996,chr1,13505,C,A,rs1336834113
997,chr1,13507,G,A,rs1241683108
998,chr1,13508,T,C,rs4951868
999,chr1,13510,C,A,rs1283563088


In [None]:
# Example of how to remove a create statement
sgs.removeCreate("#myexons#")

'gor #exons# | inset -c gene_symbol [#mygenes#]'

In [None]:
# Example 9
dbsnpDf = spark.read.load("dbsnp.parquet")

myVars = dbsnpDf.gor("calc type = if(len(reference)=len(allele),'Snp','InDel')")
myVars.createOrReplaceTempView("myVars")
sgs.setDef("#VEP#","phecode_gwas/metadata/vep_single.gorz")
myVarsAnno = sgs.pydataframe("select * from myVars order by chrom,pos")
pyVarsAnno = myVarsAnno.gor("varnorm -left reference allele | group 1 -gc reference,allele,type -set -sc rsIDs | rename set_rsIDs rsIDs | varjoin -r -l -e 'NA' <(gor #VEP# | select 1-call,max_consequence)")
pyVarsAnno.toPandas()

Unnamed: 0,Chrom,POS,reference,allele,type,rsIDs,max_consequence
0,chr1,10020,,N,InDel,rs775809821,
1,chr1,10039,A,C,Snp,rs978760828,
2,chr1,10043,T,A,Snp,rs1008829651,
3,chr1,10051,A,G,Snp,rs1052373574,
4,chr1,10054,N,NC,InDel,rs1326880612,
...,...,...,...,...,...,...,...
1009,chr1,13505,C,A,Snp,rs1336834113,
1010,chr1,13507,G,A,Snp,rs1241683108,
1011,chr1,13508,T,C,Snp,rs4951868,
1012,chr1,13510,C,A,Snp,rs1283563088,
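
The `calc type` expression in Example 9 labels a variant as `Snp` when the reference and allele strings have the same length, and as `InDel` otherwise. The plain Python sketch below restates that rule for illustration; the function name `variant_type` is hypothetical, and the sample rows are taken from the `dbsnp.gorz` output shown earlier.

```python
def variant_type(reference, allele):
    """Same rule as the GOR expression: equal lengths -> 'Snp', else 'InDel'."""
    return "Snp" if len(reference) == len(allele) else "InDel"


# (reference, allele) pairs from the first rows of the dbsnp output.
rows = [("AA", "A"), ("A", "C"), ("T", "A"), ("C", "CC")]
for reference, allele in rows:
    print(reference, allele, variant_type(reference, allele))
```

Note that the rule compares lengths only, so a multi-base substitution with equal-length reference and allele is also labeled `Snp`.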
