In [1]:
import findspark
findspark.init()

# import SparkContext and SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import os

# Build one SparkSession from a SparkConf and take its SparkContext from it.
# Constructing SparkContext(conf=...) directly raises "Cannot run multiple
# SparkContexts at once" if this cell is re-run in the same kernel;
# getOrCreate() reuses the running session instead, so the cell is idempotent.
# NOTE(review): the MongoDB reads later in the notebook need the MongoDB Spark
# connector on the classpath (e.g. via spark.jars.packages); it is not
# configured here — confirm how it is supplied.
conf = (
    SparkConf()
    .setMaster('local')
    .setAppName('Data Prep')
)
ss = SparkSession.builder.config(conf=conf).getOrCreate()
sc = ss.sparkContext
ss

# Read data into Spark DataFrames

## Read CSV data directly into a PySpark DataFrame

In [2]:
# Load the study sample from CSV (first row is the header; column types are
# inferred). Kept under its own name so the MongoDB read of `study` in the
# following cells does not silently overwrite it.
study_csv = ss.read.csv('data/study_sample.csv', header=True, inferSchema=True)


## Read MongoDB collections into PySpark DataFrames

In [3]:
# Load the `study` collection of the `opentarget` database on the local MongoDB.
study = (
    ss.read
    .format('com.mongodb.spark.sql.DefaultSource')
    .option('uri', 'mongodb://127.0.0.1/opentarget.study')
    .load()
)

In [4]:
# Load the `credset` collection of the `opentarget` database on the local MongoDB.
credset = (
    ss.read
    .format('com.mongodb.spark.sql.DefaultSource')
    .option('uri', 'mongodb://127.0.0.1/opentarget.credset')
    .load()
)

In [5]:
# Load the `variant` collection of the `opentarget` database on the local MongoDB.
variant = (
    ss.read
    .format('com.mongodb.spark.sql.DefaultSource')
    .option('uri', 'mongodb://127.0.0.1/opentarget.variant')
    .load()
)

# Explore data using PySpark

Read a collection with a filter, expressed as a MongoDB `$match` aggregation pipeline:

In [6]:
import json

# Study to load from MongoDB; change this value to filter on a different study.
TARGET_STUDY_ID = 'GCST004695'

# Aggregation pipeline handed to the MongoDB connector's `pipeline` read option.
# Built with json.dumps so the id is always correctly quoted/escaped and the
# result is strict JSON, not a hand-written single-quoted string.
pipeline = json.dumps({'$match': {'study_id': TARGET_STUDY_ID}})

In [7]:
# Read only the `study` documents that match `pipeline`.
df = (
    ss.read
    .format('com.mongodb.spark.sql.DefaultSource')
    .option('uri', 'mongodb://127.0.0.1/opentarget.study')
    .option('pipeline', pipeline)
    .load()
)

In [9]:
# Print up to 20 rows of the filtered frame (long values are truncated).
df.show(n=20)

+--------------------+-------------------+------------+-------+---------+--------------+-------------+---------------+--------+-----------+--------------------+----------+--------------+---------------+-------------------+
|                 _id|   ancestry_initial|has_sumstats|n_cases|n_initial|num_assoc_loci|         pmid|     pub_author|pub_date|pub_journal|           pub_title|  study_id|trait_category|     trait_efos|     trait_reported|
+--------------------+-------------------+------------+-------+---------+--------------+-------------+---------------+--------+-----------+--------------------+----------+--------------+---------------+-------------------+
|{619c6ef50bf566f7...|["European=113006"]|        true|32384.0|   113006|             2|PMID:28604731|Hammerschlag AR| 6/12/17|  Nat Genet|Genome-wide assoc...|GCST004695|Nervous system|["EFO_0004698"]|Insomnia complaints|
+--------------------+-------------------+------------+-------+---------+--------------+-------------+------

In [10]:
# Print the column names and types of the `study` DataFrame.
study.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- ancestry_initial: string (nullable = true)
 |-- has_sumstats: boolean (nullable = true)
 |-- n_cases: double (nullable = true)
 |-- n_initial: integer (nullable = true)
 |-- num_assoc_loci: integer (nullable = true)
 |-- pmid: string (nullable = true)
 |-- pub_author: string (nullable = true)
 |-- pub_date: string (nullable = true)
 |-- pub_journal: string (nullable = true)
 |-- pub_title: string (nullable = true)
 |-- study_id: string (nullable = true)
 |-- trait_category: string (nullable = true)
 |-- trait_efos: string (nullable = true)
 |-- trait_reported: string (nullable = true)



In [11]:
# Print the column names and types of the `credset` DataFrame.
credset.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- bio_feature: null (nullable = true)
 |-- is95_credset: boolean (nullable = true)
 |-- is99_credset: boolean (nullable = true)
 |-- lead_alt: string (nullable = true)
 |-- lead_chrom: integer (nullable = true)
 |-- lead_pos: integer (nullable = true)
 |-- lead_ref: string (nullable = true)
 |-- lead_variant_id: string (nullable = true)
 |-- logabf: double (nullable = true)
 |-- multisignal_method: string (nullable = true)
 |-- phenotype_id: null (nullable = true)
 |-- postprob: double (nullable = true)
 |-- postprob_cumsum: double (nullable = true)
 |-- study_id: string (nullable = true)
 |-- tag_alt: string (nullable = true)
 |-- tag_beta: double (nullable = true)
 |-- tag_beta_cond: double (nullable = true)
 |-- tag_chrom: integer (nullable = true)
 |-- tag_pos: integer (nullable = true)
 |-- tag_pval: double (nullable = true)
 |-- tag_pval_cond: double (nullable = true)
 |-- tag_ref: string (nullable

In [11]:
# Print the column names and types of the `variant` DataFrame.
variant.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- alt_allele: string (nullable = true)
 |-- chr_id: integer (nullable = true)
 |-- chr_id_b37: integer (nullable = true)
 |-- gene_id_any: string (nullable = true)
 |-- gene_id_any_distance: integer (nullable = true)
 |-- gene_id_prot_coding: string (nullable = true)
 |-- gene_id_prot_coding_distance: integer (nullable = true)
 |-- gnomad_afr: double (nullable = true)
 |-- gnomad_eas: double (nullable = true)
 |-- gnomad_nfe: double (nullable = true)
 |-- gnomad_nfe_est: double (nullable = true)
 |-- gnomad_nfe_nwe: double (nullable = true)
 |-- gnomad_nfe_onf: double (nullable = true)
 |-- gnomad_oth: double (nullable = true)
 |-- index: integer (nullable = true)
 |-- most_severe_consequence: string (nullable = true)
 |-- phred: double (nullable = true)
 |-- position: integer (nullable = true)
 |-- position_b37: integer (nullable = true)
 |-- raw: double (nullable = true)
 |-- ref_allele: string (nullab

In [12]:
# Column names of the `study` DataFrame, in schema order.
study.columns

['_id',
 'ancestry_initial',
 'has_sumstats',
 'n_cases',
 'n_initial',
 'num_assoc_loci',
 'pmid',
 'pub_author',
 'pub_date',
 'pub_journal',
 'pub_title',
 'study_id',
 'trait_category',
 'trait_efos',
 'trait_reported']

In [13]:
# Print the first five rows of `study`.
study.show(n=5)

+--------------------+-------------------+------------+-------+---------+--------------+-------------+---------------+--------+-----------+--------------------+-------------+--------------+---------------+--------------------+
|                 _id|   ancestry_initial|has_sumstats|n_cases|n_initial|num_assoc_loci|         pmid|     pub_author|pub_date|pub_journal|           pub_title|     study_id|trait_category|     trait_efos|      trait_reported|
+--------------------+-------------------+------------+-------+---------+--------------+-------------+---------------+--------+-----------+--------------------+-------------+--------------+---------------+--------------------+
|{619c6ef50bf566f7...|["European=113006"]|        true|32384.0|   113006|             2|PMID:28604731|Hammerschlag AR| 6/12/17|  Nat Genet|Genome-wide assoc...|   GCST004695|Nervous system|["EFO_0004698"]| Insomnia complaints|
|{619c6ef50bf566f7...|["European=117890"]|        true| 5092.0|   117890|            10|    

In [14]:
# Number of rows in the `study` DataFrame (triggers a full read of the collection).
study.count()

202

In [15]:
# Preview the identifier and trait columns of the first five studies.
study.select('study_id', 'trait_category', 'trait_efos').show(5)

+-------------+--------------+---------------+
|     study_id|trait_category|     trait_efos|
+-------------+--------------+---------------+
|   GCST004695|Nervous system|["EFO_0004698"]|
|NEALE2_6148_2|Nervous system|["EFO_0000516"]|
|NEALE2_6147_5|Nervous system|["EFO_0009678"]|
|NEALE2_6148_5|Nervous system|["EFO_0001365"]|
|NEALE2_6148_4|Nervous system|["EFO_0001059"]|
+-------------+--------------+---------------+
only showing top 5 rows



# Dataframe Join Operations

## SQL Queries

In [12]:
# Register `study` as a temporary view so the SQL cells can query it by name.
study.createOrReplaceTempView('study')

In [13]:
# Register `credset` as a temporary view so the SQL cells can query it by name.
credset.createOrReplaceTempView('credset')

In [14]:
# Register `variant` as a temporary view so the SQL cells can query it by name.
variant.createOrReplaceTempView('variant')

In [15]:
# Studies in the "Nervous system" trait category. The literal uses standard SQL
# single quotes: Spark only treats double-quoted text as a string literal
# outside ANSI mode, and parses it as an identifier when
# spark.sql.ansi.doubleQuotedIdentifiers is enabled.
study_sql_results = ss.sql("SELECT * FROM study WHERE trait_category = 'Nervous system'")

In [16]:
# Show the id and trait columns for the first ten nervous-system studies.
study_sql_results.select(['study_id', 'trait_category', 'trait_efos']).show(n=10)

+-------------+--------------+--------------------+
|     study_id|trait_category|          trait_efos|
+-------------+--------------+--------------------+
|   GCST004695|Nervous system|     ["EFO_0004698"]|
|NEALE2_6148_2|Nervous system|     ["EFO_0000516"]|
|NEALE2_6147_5|Nervous system|     ["EFO_0009678"]|
|NEALE2_6148_5|Nervous system|     ["EFO_0001365"]|
|NEALE2_6148_4|Nervous system|     ["EFO_0001059"]|
|   GCST006475|Nervous system|["EFO_0003761","E...|
|   GCST005902|Nervous system|     ["EFO_0003761"]|
|   GCST005903|Nervous system|     ["EFO_0003761"]|
|   GCST005904|Nervous system|     ["EFO_0003761"]|
|   GCST005522|Nervous system|     ["EFO_0000614"]|
+-------------+--------------+--------------------+
only showing top 10 rows



In [17]:
# Inner-join each study to its credible-set rows on study_id, then to the
# variant annotations by matching the credible-set tag variant's chromosome,
# position, ref allele and alt allele. Rows without a match on either join
# are dropped.
# NOTE(review): assumes tag_chrom/tag_pos use the same genome build as
# variant.chr_id/position (not the *_b37 columns) — confirm.
join_query = ss.sql('''

        SELECT s.study_id, s.trait_category, v.chr_id, v.position, v.ref_allele, v.alt_allele, v.rs_id,
               v.most_severe_consequence, c.postprob, c.type
        FROM study s 
        JOIN credset c ON c.study_id = s.study_id
        JOIN variant v ON v.chr_id = c.tag_chrom AND v.position = c.tag_pos 
                        AND v.ref_allele = c.tag_ref AND v.alt_allele = c.tag_alt

''')

In [18]:
# Print the first five joined study/credset/variant rows.
join_query.show(n=5)

+-----------------+------------------+------+--------+----------+----------+---------+-----------------------+-----------+----+
|         study_id|    trait_category|chr_id|position|ref_allele|alt_allele|    rs_id|most_severe_consequence|   postprob|type|
+-----------------+------------------+------+--------+----------+----------+---------+-----------------------+-----------+----+
|       GCST004131|     Immune system|     1| 1394423|         A|         G|rs1240747|         intron_variant| 0.03513854|gwas|
|       GCST004131|     Immune system|     1| 1847030|         A|         G|rs6687065|         intron_variant|0.006404042|gwas|
|       GCST004131|     Immune system|     1| 1847030|         A|         G|rs6687065|         intron_variant|0.001979732|gwas|
|       GCST005531|     Immune system|     1| 2592766|         C|         T| rs745368|   non_coding_transc...|0.003813633|gwas|
|NEALE2_20002_1111|Respiratory system|     1| 8668805|         G|         A|rs4908505|         intron_va