# Preparation

## Download the Data

See the tutorial for the details:

```
$ mkdir /home/labuser/epidemiology && cd "$_" # mkdir and cd
$ wget https://transfer.sh/zGqkG/CHD.zip
$ wget https://transfer.sh/10rhH9/MENO.zip
$ wget https://transfer.sh/fiFTX/T2D.zip
```

## Get Checksum Files

```
$ wget https://raw.githubusercontent.com/Infomdss2018/infomdss/master/tutorial_6/check_sum_files/CHD.zip.sha1  
$ wget https://raw.githubusercontent.com/Infomdss2018/infomdss/master/tutorial_6/check_sum_files/MENO.zip.sha1 
$ wget https://raw.githubusercontent.com/Infomdss2018/infomdss/master/tutorial_6/check_sum_files/T2D.zip.sha1 
```

## Execute Checksum

```
$ sha1sum -c CHD.zip.sha1
$ sha1sum -c MENO.zip.sha1
$ sha1sum -c T2D.zip.sha1
```
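To make explicit what `sha1sum -c` checks, here is a minimal Python sketch. It assumes each `.sha1` file uses the usual `<hex digest>  <filename>` layout; the helper names `sha1_of_file` and `verify_checksum_file` are made up for this illustration and are not part of the tutorial.

```python
import hashlib
from pathlib import Path


def sha1_of_file(path, chunk_size=1 << 20):
    """Return the hex SHA-1 digest of a file, reading it in chunks."""
    digest = hashlib.sha1()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def verify_checksum_file(checksum_path):
    """Check every '<digest>  <filename>' line of a checksum file.

    Returns a dict mapping each file name to True (match) or False.
    File names are resolved relative to the checksum file's directory,
    which is an assumption of this sketch.
    """
    checksum_path = Path(checksum_path)
    results = {}
    for line in checksum_path.read_text().splitlines():
        if not line.strip():
            continue  # skip blank lines
        expected, name = line.strip().split(maxsplit=1)
        name = name.lstrip("*")  # a leading '*' marks binary mode
        actual = sha1_of_file(checksum_path.parent / name)
        results[name] = actual == expected.lower()
    return results
```

Calling `verify_checksum_file("CHD.zip.sha1")` from the download directory should then report whether `CHD.zip` matches its recorded digest.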

## Unzip Epidemiology Data

```
$ unzip CHD.zip -d data/
$ unzip MENO.zip -d data/
$ unzip T2D.zip -d data/
```

## Give Permissions

```
$ chmod 777 data/*
```

## Open Notebook

```
https://vmlabaXXX.westeurope.cloudapp.azure.com:8000 (replace XXX with the number of your VM)
```

# Assignments 

## 1. Load the three datasets within your Spark environment.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("epidemiology").getOrCreate()
sc = spark.sparkContext

chd_path = "/home/labuser/epidemiology/data/CHD_coronary_heart_disease_SAMPLE.csv"
men_path = "/home/labuser/epidemiology/data/MENO_Menopause_HapMap2_DayNG2015_18112015_SAMPLE.csv"
t2d_path = "/home/labuser/epidemiology/data/T2D_diabetes_type_two_SAMPLE.csv"

# Note that only the SAMPLE data is used; to test with the full data, uncomment the following lines
#chd_path = "/home/labuser/epidemiology/data/CHD_coronary_heart_disease.csv"
#men_path = "/home/labuser/epidemiology/data/MENO_Menopause_HapMap2_DayNG2015_18112015.csv"
#t2d_path = "/home/labuser/epidemiology/data/T2D_diabetes_type_two.csv"

# Output path, used when saving the cleaned data file (question 4)
epi_path = "/home/labuser/epidemiology/data/epi_tutorial_7.csv"

chd_RDD = sc.textFile(chd_path)
men_RDD = sc.textFile(men_path)
t2d_RDD = sc.textFile(t2d_path)

A good way of manipulating data is by using dataframes, which can be created from your RDD data. For example, we can create a dataframe, chd_df, from chd_RDD:

In [2]:
header = chd_RDD.first()

# split data set
temp_var = chd_RDD.filter(lambda line: line != header).map(lambda k: k.split(";"))
chd_df = temp_var.toDF(header.split(";"))

Now that we have a dataframe, we can, for example, show the first twenty rows:

In [3]:
chd_df.show()

+----------------+---+---------+-------------+----------------+------------------+-----------+-----+--------+--------+--------+----------+---------+
|      markername|chr|  bp_hg19|effect_allele|noneffect_allele|effect_allele_freq|median_info|model|    beta|  se_dgc|   p_dgc|het_pvalue|n_studies|
+----------------+---+---------+-------------+----------------+------------------+-----------+-----+--------+--------+--------+----------+---------+
|       rs7540212|  1|157259747|            G|               T|           .675984|        993|FIXED| .010379|.0101002| .304137|   .852783|       48|
|      rs35174775|  6| 24847434|            G|               A|           .685355|    .981575|FIXED|-.003668|.0102143|.7195163|    .85337|       47|
|      rs17717184| 16| 77829273|            C|               T|           .836133|        971|FIXED|-.008041|.0127475|.5281767|   .375501|       46|
|       rs6896654|  5| 78024784|            T|               A|           .785148|     .60226|FIXED| .0206

Or we can look up the data type of every column:

In [4]:
chd_df.schema

StructType(List(StructField(markername,StringType,true),StructField(chr,StringType,true),StructField(bp_hg19,StringType,true),StructField(effect_allele,StringType,true),StructField(noneffect_allele,StringType,true),StructField(effect_allele_freq,StringType,true),StructField(median_info,StringType,true),StructField(model,StringType,true),StructField(beta,StringType,true),StructField(se_dgc,StringType,true),StructField(p_dgc,StringType,true),StructField(het_pvalue,StringType,true),StructField(n_studies,StringType,true)))
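Every column is a `StringType` here because `split(";")` only produces strings. As a rough illustration of what type conversion involves, the plain-Python sketch below tries `int`, then `float`, and otherwise keeps the text. The helper `infer_value` is hypothetical, and Spark's own schema inference works per column rather than per value, so this is only an analogy.

```python
def infer_value(text):
    """Convert a raw CSV field to int, float, or leave it as a string."""
    if text is None or text == "":
        return None  # treat empty fields as missing
    for caster in (int, float):
        try:
            return caster(text)
        except ValueError:
            continue
    return text


# Header and values taken from a few columns of the first CHD row shown above.
header = "markername;chr;bp_hg19;effect_allele_freq"
line = "rs7540212;1;157259747;.675984"

row = dict(zip(header.split(";"), [infer_value(v) for v in line.split(";")]))
print(row)
```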

Or we can show all column names:

In [5]:
chd_df.columns

['markername',
 'chr',
 'bp_hg19',
 'effect_allele',
 'noneffect_allele',
 'effect_allele_freq',
 'median_info',
 'model',
 'beta',
 'se_dgc',
 'p_dgc',
 'het_pvalue',
 'n_studies']

Or we can create a new dataframe with a selection of the available columns:

In [6]:
chd_df.select(["markername", "chr", "bp_hg19"]).show()

+----------------+---+---------+
|      markername|chr|  bp_hg19|
+----------------+---+---------+
|       rs7540212|  1|157259747|
|      rs35174775|  6| 24847434|
|      rs17717184| 16| 77829273|
|       rs6896654|  5| 78024784|
|      rs10514937| 17| 46322298|
|      rs56336286|  2| 20835450|
|      rs72633026|  8| 39958554|
|      rs11937467|  4| 35492268|
|       rs4542601| 15| 58565099|
|chr5:161737169:D|  5|161737169|
|       rs2964449|  5|  5414800|
|      rs58541550|  2| 25839551|
|      rs13360903|  5|141821413|
|       rs6562633| 13| 71071896|
|      rs60116510| 19| 43447347|
|      rs78768893|  4|170804226|
|      rs13264028|  8| 54859940|
|      rs62481284|  7|155612556|
|      rs61052895|  3| 10147516|
|      rs61052895|  3| 10147516|
+----------------+---+---------+
only showing top 20 rows



### Answer to question 1

A fast way of opening a file, splitting the data on a specified delimiter, and creating a dataframe with an automatically inferred schema is shown below:

In [7]:
# Create dataframe with schema inference
chd_df = spark.read.option("delimiter",";").csv(chd_path, inferSchema=True, header=True)
men_df = spark.read.option("delimiter",";").csv(men_path, inferSchema=True, header=True)
t2d_df = spark.read.option("delimiter",";").csv(t2d_path, inferSchema=True, header=True)

# Show all dataframes to check if all went well:
chd_df.show()
men_df.show()
t2d_df.show()

+----------------+---+---------+-------------+----------------+------------------+-----------+-----+---------+---------+---------+----------+---------+
|      markername|chr|  bp_hg19|effect_allele|noneffect_allele|effect_allele_freq|median_info|model|     beta|   se_dgc|    p_dgc|het_pvalue|n_studies|
+----------------+---+---------+-------------+----------------+------------------+-----------+-----+---------+---------+---------+----------+---------+
|       rs7540212|  1|157259747|            G|               T|          0.675984|      993.0|FIXED| 0.010379|0.0101002| 0.304137|  0.852783|       48|
|      rs35174775|  6| 24847434|            G|               A|          0.685355|   0.981575|FIXED|-0.003668|0.0102143|0.7195163|   0.85337|       47|
|      rs17717184| 16| 77829273|            C|               T|          0.836133|      971.0|FIXED|-0.008041|0.0127475|0.5281767|  0.375501|       46|
|       rs6896654|  5| 78024784|            T|               A|          0.785148|    0.

## 2. Merge the datasets on markername, into one integrated raw data file titled ```epi.csv```, hereafter: “data file”.

*Tip: You will notice that the diabetes set does not have a markername column. However, it has a column named Chr:Position. This is a combined column, equivalent to the combination of two columns in the CHD file (chr and bp_hg19).*

1. First we create a new column in the CHD dataframe by merging the values from the ```chr``` and ```bp_hg19``` columns. We call this new column ```Chr:Position``` (so we can join it with the ```Chr:Position``` column from T2D).


2. We join CHD and MENO on the markername column. Since we only need the overlapping markers, we use an inner join.


3. Lastly, we inner-join T2D with the CHD-MENO join from step 2.
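
The three steps can be sketched in plain Python, without Spark, to show the logic of the joins. The rows below are made-up toy data with only a few columns, and `inner_join` is a hypothetical helper written for this illustration.

```python
# Toy rows (made up for illustration; the real files have many more columns).
chd = [
    {"markername": "rs1", "chr": 1, "bp_hg19": 100},
    {"markername": "rs2", "chr": 2, "bp_hg19": 200},
    {"markername": "rs3", "chr": 3, "bp_hg19": 300},
]
meno = [
    {"markername": "rs1", "effect": 0.04},
    {"markername": "rs2", "effect": 0.10},
]
t2d = [
    {"Chr:Position": "1:100", "Effect": -0.02},
    {"Chr:Position": "3:300", "Effect": 0.05},
]


def inner_join(left, right, key):
    """Inner join of two lists of dicts on a shared key column."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    return [
        {**l_row, **r_row}
        for l_row in left
        for r_row in index.get(l_row[key], [])
    ]


# (1) build the combined key from chr and bp_hg19
chd_keyed = [{"Chr:Position": f"{r['chr']}:{r['bp_hg19']}", **r} for r in chd]

# (2) join CHD with MENO on markername
chd_meno = inner_join(chd_keyed, meno, "markername")

# (3) join the result with T2D on Chr:Position
merged = inner_join(chd_meno, t2d, "Chr:Position")

print([row["markername"] for row in merged])  # only rs1 occurs in all three
```

Only markers present in all three toy sets survive, which is exactly the behaviour of chaining two inner joins.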


### Resources

This image explains the different joins:

<img src="https://i.stack.imgur.com/rOeAz.jpg" width="500px">


More on joins and other examples:

http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.join

In [8]:
from pyspark.sql.functions import col

def map_chr_bp_hg19(line):
    chr_val = line[1]
    bp_hg19_val = line[2]
    return [str(chr_val) + ':' + str(bp_hg19_val)] + list(line)

# (1) add an additional column to the CARDIO-dataframe, as explained in the TIP-section
df_chd_header = chd_df.schema.names
df_chd_chrpos = chd_df.rdd.map(map_chr_bp_hg19)
df_chd_chrpos = df_chd_chrpos.toDF(["Chr:Position"] + df_chd_header)

# (2) Join the CARDIO-dataframe and the MENOPAUSE-dataframe
df_join_chd_men = df_chd_chrpos.join(men_df, 'markerName', 'inner').cache()

# (3) Join the CARDIO-MENOPAUSE-dataframe and the DIABETES-dataframe
df_join_chd_men_t2d = df_join_chd_men.join(t2d_df, 'Chr:Position', 'inner').cache()

# show the result
df_join_chd_men_t2d.show()

+------------+----------+---+---------+-------------+----------------+------------------+-----------+-----+---------+---------+---------+----------+---------+-------+-------+----------+------+------+----+-------+-------+-------+------+-------+---------------+
|Chr:Position|markername|chr|  bp_hg19|effect_allele|noneffect_allele|effect_allele_freq|median_info|model|     beta|   se_dgc|    p_dgc|het_pvalue|n_studies|allele1|allele2|HapMap_eaf|effect|stderr|   p|Allele1|Allele2| Effect|StdErr|P-value|TotalSampleSize|
+------------+----------+---+---------+-------------+----------------+------------------+-----------+-----+---------+---------+---------+----------+---------+-------+-------+----------+------+------+----+-------+-------+-------+------+-------+---------------+
| 16:75152974| rs1579333| 16| 75152974|            C|               T|          0.679763|      984.0|FIXED|-0.002167| 0.010425| 0.835334|  0.300132|       46|      t|      c|      0.26|  0.04|  null|0.05|      T|      C|

## 3. Record how many SNPs (markers) in each dataset cannot be merged.

The number of markers that could not be merged is the number of rows in each loaded data file minus the number of rows in the joined dataframe.

So we can easily calculate the number of removed rows by using the ```count``` method.

In [9]:
# get the number of rows from the loaded dataframes
chd_count = df_chd_chrpos.count()
men_count = men_df.count()
t2d_count = t2d_df.count()

# count the rows in the joined dataframe
chd_men_t2d_count = df_join_chd_men_t2d.count()

# print the results
print("Number of joined lines: {0: 10d}".format(chd_men_t2d_count))
print("Removed rows from chd:  {0: 10d}".format(chd_count - chd_men_t2d_count))
print("Removed rows from meno: {0: 10d}".format(men_count - chd_men_t2d_count))
print("Removed rows from t2d:  {0: 10d}".format(t2d_count - chd_men_t2d_count))

Number of joined lines:         24
Removed rows from chd:          81
Removed rows from meno:         80
Removed rows from t2d:          79
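
Subtracting row counts assumes that every source row appears at most once in the join. The CHD sample contains repeated rows (rs61052895 is listed twice in the column selection shown earlier), so the subtraction can differ from the number of distinct markers that failed to merge. A set-based sketch of the stricter count, using made-up toy marker lists and a hypothetical helper `unmatched`:

```python
def unmatched(source_keys, merged_keys):
    """Return the distinct keys of a source that are absent from the merge."""
    return set(source_keys) - set(merged_keys)


# Toy marker lists (made up); "rs3" occurs twice in the CHD toy data.
chd_markers = ["rs1", "rs2", "rs3", "rs3"]
meno_markers = ["rs1", "rs2", "rs4"]
merged_markers = ["rs1", "rs2"]

print(sorted(unmatched(chd_markers, merged_markers)))   # ['rs3']
print(sorted(unmatched(meno_markers, merged_markers)))  # ['rs4']

# Row-count subtraction reports 2 for CHD, because the duplicate counts twice.
print(len(chd_markers) - len(merged_markers))  # 2
```

In Spark, a join with the `'left_anti'` join type should give the corresponding set of unmatched rows, for example by anti-joining each loaded dataframe against the merged one on its key column.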


## 4. Further sanitise the data file by removing any empty lines, unused columns, and identical rows, and save the cleaned data set.

In this example we only remove duplicated rows and rows with an empty value. This can be done in one line (3), but we can also do it in separate steps (1) and (2).

Note that the sum of the removed duplicated and incomplete lines is not the same as the number of lines removed from the cleaned dataframe. How is this possible?
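
One way to see why the counts do not add up is a small plain-Python sketch on made-up toy rows: a row that is both duplicated and incomplete is counted by each separate step, but it can only be removed once when the steps are combined. The helpers `is_complete` and `drop_duplicates` are hypothetical stand-ins for the Spark operations.

```python
def is_complete(row):
    """True when no field is None or an empty string."""
    return all(value is not None and value != "" for value in row)


def drop_duplicates(rows):
    """Keep the first occurrence of each row, preserving order."""
    return list(dict.fromkeys(rows))


# Toy rows (made up): ("b", None) is both duplicated and incomplete.
rows = [("a", 1), ("a", 1), ("b", None), ("b", None), ("c", 2), ("d", "")]

complete_rows = [r for r in rows if is_complete(r)]

dup_removed = len(rows) - len(drop_duplicates(rows))
incomplete_removed = len(rows) - len(complete_rows)
both_removed = len(rows) - len(drop_duplicates(complete_rows))

print(dup_removed, incomplete_removed, both_removed)  # 2 3 4
```

Here 2 + 3 = 5, yet only 4 rows disappear in the combined step, because one copy of `("b", None)` is counted once as a duplicate and again as an incomplete row.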

In [10]:
def remove_incomplete_rows(line):
    # keep a row only if none of its fields is empty or missing
    for word in line:
        if word == "" or word is None:
            return False
    return True

# (1) EXAMPLE: remove duplicated lines
epi_unique_df = df_join_chd_men_t2d.dropDuplicates().cache()
print ("Number of duplicate lines removed: {0}".format(chd_men_t2d_count - epi_unique_df.count()))

# (2) EXAMPLE: remove incomplete lines
epi_complete_df = df_join_chd_men_t2d.rdd.filter(remove_incomplete_rows).toDF()
print ("Number of incomplete lines removed: {0}".format(chd_men_t2d_count - epi_complete_df.count()))

# (3) remove incomplete and duplicated lines in one go
df_clean_epi = df_join_chd_men_t2d.rdd.filter(remove_incomplete_rows).toDF().dropDuplicates().cache()
print ("Number of incomplete or duplicate lines removed: {0}".format(chd_men_t2d_count - df_clean_epi.count()))

# print the results
print ("--- RESULTS ---")
print ("Clean data-set length: {0}".format(df_clean_epi.count()))
df_clean_epi.show()

Number of duplicate lines removed: 3
Number of incomplete lines removed: 5
Number of incomplete or duplicate lines removed: 7
--- RESULTS ---
Clean data-set length: 17
+------------+----------+---+---------+-------------+----------------+------------------+-----------+-----+---------+---------+---------+----------+---------+-------+-------+----------+------+------+----+-------+-------+-------+------+-------+---------------+
|Chr:Position|markername|chr|  bp_hg19|effect_allele|noneffect_allele|effect_allele_freq|median_info|model|     beta|   se_dgc|    p_dgc|het_pvalue|n_studies|allele1|allele2|HapMap_eaf|effect|stderr|   p|Allele1|Allele2| Effect|StdErr|P-value|TotalSampleSize|
+------------+----------+---+---------+-------------+----------------+------------------+-----------+-----+---------+---------+---------+----------+---------+-------+-------+----------+------+------+----+-------+-------+-------+------+-------+---------------+
|  2:13193545|rs13406567|  2| 13193545|            C

In [11]:
# map every row to a csv-like string
def toCSVLine(data):
    return ';'.join(str(d) for d in data)

# save the data file as semicolon-separated text (Spark writes a directory of part files)
lines = df_clean_epi.rdd.map(toCSVLine)
lines.saveAsTextFile(epi_path)
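
Joining fields with `';'.join(...)` writes missing values as the text `None`, does not quote a field that itself contains a semicolon, and drops the header. A hedged alternative for the per-row formatting, using Python's standard `csv` module, is sketched below; `to_csv_line` is a hypothetical replacement for `toCSVLine`, not something defined in the tutorial.

```python
import csv
import io


def to_csv_line(values, delimiter=";"):
    """Format one row as a delimited line, quoting fields when needed."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, delimiter=delimiter)
    # Write missing values as empty fields rather than the text "None".
    writer.writerow(["" if v is None else v for v in values])
    return buffer.getvalue().rstrip("\r\n")


print(to_csv_line(["Chr:Position", "markername", "beta"]))
print(to_csv_line(["16:75152974", "rs1579333", -0.002167]))
print(to_csv_line(["a;b", None, 1]))
```

Assuming the rest of the cell stays the same, `df_clean_epi.rdd.map(to_csv_line)` could be used in place of `toCSVLine`. Spark's `DataFrame.write.csv` with the `sep` and `header` options may be a simpler route when a header row is wanted.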