# pyspark_cloud_NGS

## Analysis of NGS data with PySpark



```
# Ce texte est au format code
```


## Install SRA tools, download and extract sequences from SRA files
- download sra tools

In [1]:
import os
def setup_sra_tool(url):
  os.chdir('/content')
  !wget $url
  !gunzip sratoolkit.2.9.6-1-ubuntu64.tar.gz
  !tar -xf sratoolkit.2.9.6-1-ubuntu64.tar


def get_sra(url, sra_path):
  os.chdir('/content')
  !wget $url
  sra_name = url[-11:]
  os.chdir(sra_path)
  !./fastq-dump /content/$sra_name -O /content/
  os.chdir('/content')
  
# set up SRA toolkit
url_tk= 'https://ftp-trace.ncbi.nlm.nih.gov/sra/sdk/2.9.6-1/sratoolkit.2.9.6-1-ubuntu64.tar.gz'
setup_sra_tool(url_tk)

# download and extract sra file
sra_url = 'https://sra-download.ncbi.nlm.nih.gov/traces/era6/ERR/ERR3014/ERR3014700'
tool_path = '/content/sratoolkit.2.9.6-1-ubuntu64/bin'
get_sra(sra_url, tool_path)

--2021-02-09 18:49:56--  https://ftp-trace.ncbi.nlm.nih.gov/sra/sdk/2.9.6-1/sratoolkit.2.9.6-1-ubuntu64.tar.gz
Resolving ftp-trace.ncbi.nlm.nih.gov (ftp-trace.ncbi.nlm.nih.gov)... 130.14.250.13, 2607:f220:41e:250::11, 2607:f220:41e:250::12, ...
Connecting to ftp-trace.ncbi.nlm.nih.gov (ftp-trace.ncbi.nlm.nih.gov)|130.14.250.13|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 84492294 (81M) [application/x-gzip]
Saving to: ‘sratoolkit.2.9.6-1-ubuntu64.tar.gz’


2021-02-09 18:49:59 (35.7 MB/s) - ‘sratoolkit.2.9.6-1-ubuntu64.tar.gz’ saved [84492294/84492294]

--2021-02-09 18:50:01--  https://sra-download.ncbi.nlm.nih.gov/traces/era6/ERR/ERR3014/ERR3014700
Resolving sra-download.ncbi.nlm.nih.gov (sra-download.ncbi.nlm.nih.gov)... 165.112.9.231, 165.112.9.232, 165.112.9.235
Connecting to sra-download.ncbi.nlm.nih.gov (sra-download.ncbi.nlm.nih.gov)|165.112.9.231|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 176233092 (168M) [application/oc

## Fastq class to process the fastq file

In [2]:
# py4javaerror happen when the path of the file is not correct
!pip install pyspark[sql]

from __future__ import print_function
from functools import wraps
import pyspark as spark
from pyspark import SparkConf
import time
from operator import add
import os 
from subprocess import STDOUT, check_call, check_output



class Fastq:
    def __init__(self, path:str) -> str:
        self.path = path
        self.install_java_scala()
        self.stop_context()
        self.sc = spark.SparkContext.getOrCreate(conf=self.set_conf())
        self.data = self.sc.textFile(self.path)

    def stop_context(self):
        try:
          self.sc.stop()
        except:
          pass

    def set_conf(self):
        conf = SparkConf().setAppName("App")
        conf = (conf.setMaster('local[*]')
          .set('spark.executor.memory', '4G')
          .set('spark.driver.memory', '16G')
          .set('spark.driver.maxResultSize', '8G'))
        return conf

    def install_java_scala(self):
        try:
          java_ver = check_output(['java', '-version'], stderr=STDOUT)
        except:
          java_ver = b''
        try:
          scala_ver = check_output(['scala', '-version'], stderr=STDOUT)
        except:
          scala_ver = b''
        if b'1.8.0_232' not in java_ver:
          java_8_install = ['apt-get', '--quiet', 'install',
                            '-y', 'openjdk-8-jdk-headless']
          java_set_alt = ['update-alternatives', '--set', 'java', 
                          '/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java' ] 
          check_call(java_8_install, stdout=open(os.devnull, 'wb'), 
                     stderr=STDOUT)
          os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" 
          check_call(java_set_alt)  
        if b'2.11.12' not in scala_ver:
          scala_install = ['apt-get', '--quiet', 'install', 'scala']
          check_call(scala_install)
          

    def _logging(func):
        @wraps(func)
        def log_print(instance, *args, **kwargs):
          start = time.time()
          res = func(instance, *args, **kwargs)
          print("Finished Executing {}  in {}s!".format(func.__name__, time.time() - start))
          return res
        return log_print

    @_logging
    def get_data(self):
        return self.data


    @_logging
    def count_bases(self):
      seqs = self.extract_seq()
      seqs = seqs.flatMap(lambda line: list(line)) 
      seqs = seqs.map(lambda c: (c, 1))
      return seqs.reduceByKey(lambda a, b: a+b)#\
            #  .map(lambda c: (c, 1)) \
            #  .reduceByKey(lambda k1, k2: k1 + k2)
      # counts.saveAsTextFile('outputs')
      # print("saved output")

    @_logging
    def count_bases(self):
      seqs = self.extract_seq()
      seqs = seqs.flatMap(lambda line: line.split())
      return seqs

    @_logging
    def extract_seq(self):
        return self.data.filter(lambda x: x.isalpha())

    @_logging
    def get_lengths(self):
        seqs = self.extract_seq()
        return seqs.map(lambda x: len(x))

    def extract_qual(self):
        pass

    def extract_meta(self):
        pass

Collecting pyspark[sql]
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 80kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 52.9MB/s 
Collecting pyarrow>=0.15.1
[?25l  Downloading https://files.pythonhosted.org/packages/33/67/2f4fcce1b41bcc7e88a6bfdb42046597ae72e5bc95c2789b7c5ac893c433/pyarrow-3.0.0-cp36-cp36m-manylinux2014_x86_64.whl (20.7MB)
[K     |████████████████████████████████| 20.7MB 83.5MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=966c6e2f049e57328a7339c89

In [3]:
# read file
os.chdir('/content')
fasta = Fastq('ERR3014700.fastq')

In [4]:
# show first read
fasta.data.take(4)

['@ERR3014700.1 M01569:152:000000000-AMPKW:1:1101:15463:1067 length=523',
 'CGAACTGAGCTACGATGACCACGAAGTCGAACTATACCGGGCGTTGGACGCTTATCGGGCGCGCATCGCCGTCGAGTACGTGCTGATTCGCGCCGTGCGCGACGAGATCTANNCTNNACTACGACGGGACAGCGGCGCGTTGCCACAGCGTTTCGCCTGCCACGTGCCACGGAACNTGTCNNNNNNNGTTGTTTGGGAACTTTGCCGTNATNNCTTGGCGCTNTGCAGAGCGCCAAGGCNNGANGGNNANNTTCCCAAACAANGCGCCAGGACANGTTCCGTNGNNNNTGGCAGGCGAAACGCTGTGGCAACGCGCCGCTGTCCCGTCGTAGTACAGCGTAGATCTCGTCGCGCACGGCGCGNNNNNGNCCGTACTCNACGGCGATGCGCGCCCGATAAGCGNCNAAAGCCCGGTNANGTTCGACGTCGTGGTCATCGTAGNTCAGNTNGAGNTCGGAAGANNGNNNGGAGGGTAAGAGTTTAGANNNCGGGTGTGTCCGTANACNTAAAAAAANAAATAATA',
 '+ERR3014700.1 M01569:152:000000000-AMPKW:1:1101:15463:1067 length=523',
 'CCCCCGGGEFGGGGGGFCFGGGGGFEGGGGGGGGGCGGGEEEEFGGGGGGGGEGGGGCCG>FGGGGGGGCEBFFDFGGF>EFGFGFFFGGGGGGDGEEECFGGGGGGGGGG##::##::@CFEGCFEDEGF7FEEGGGGGGGEGGGGGGDECECD8E5*;;:EECFFFFD>C:E>#*2AF#######12;EFC;88E8+*<FG:EFG=#*2##21:C56ED5#19BCC9<FGGGGGGGG##::#=C##:##::CCCEFGFGF#,:CEFGCCFG8#:CFFGGG#:####9:CFB@FEE7FF:F7=FFGGG8EFGGFGGGGGGG

In [5]:
# show read count
fasta.data.count()

1843156

In [6]:
# extract sequences alone from the fastq file
seqs = fasta.extract_seq()

Finished Executing extract_seq  in 0.0005970001220703125s!


In [7]:
seqs.take(4)

['CGAACTGAGCTACGATGACCACGAAGTCGAACTATACCGGGCGTTGGACGCTTATCGGGCGCGCATCGCCGTCGAGTACGTGCTGATTCGCGCCGTGCGCGACGAGATCTANNCTNNACTACGACGGGACAGCGGCGCGTTGCCACAGCGTTTCGCCTGCCACGTGCCACGGAACNTGTCNNNNNNNGTTGTTTGGGAACTTTGCCGTNATNNCTTGGCGCTNTGCAGAGCGCCAAGGCNNGANGGNNANNTTCCCAAACAANGCGCCAGGACANGTTCCGTNGNNNNTGGCAGGCGAAACGCTGTGGCAACGCGCCGCTGTCCCGTCGTAGTACAGCGTAGATCTCGTCGCGCACGGCGCGNNNNNGNCCGTACTCNACGGCGATGCGCGCCCGATAAGCGNCNAAAGCCCGGTNANGTTCGACGTCGTGGTCATCGTAGNTCAGNTNGAGNTCGGAAGANNGNNNGGAGGGTAAGAGTTTAGANNNCGGGTGTGTCCGTANACNTAAAAAAANAAATAATA',
 'GTCGTTTTGTTCACCGTCGCTGTGCAACGCGTGAAACAAGAGCGTGATGCGCACCTTCGGCGGTATGAAGAACGATTACGGAAAAACCGCGCACGGCGTCGGCAGTCTTTTNNGTNNCTTTGGGCGATGGGTCCGAGCTGCGGTATGGGTCACGGCGGCGTGTGTTTTATTGACGNAGATNNNNNNNTGTGACTAAAAACGTCCCAGCNCCAGAGCGATATGNTTAAATAAAAAAAATANGAAGTATTNTATTATGCGTGTCCTGGTTTTTATTTTTTGGATGNNTNGTNCCCATAAGGGTATGTTTCATCCTCNNACNTCNNCNNCCTTTATGCGANACATACATCCANAAATGAANANNNNGGACACGCATAATATGATACTACATATTTTTTTTATTGAAACATATCGCTCTGGGGCTGGGACGTTTTTAGTCANNNNNCNGCATCTTCGTCAATAAAAAAAACGCC

In [8]:
# compute read lengths
lens = fasta.get_lengths()

Finished Executing extract_seq  in 0.0017118453979492188s!
Finished Executing get_lengths  in 0.0018274784088134766s!


In [9]:
# show the lengths of the first 10 reads
lens.take(10)

[523, 600, 599, 600, 599, 600, 600, 529, 600, 538]

In [10]:
# get the average read length
len_sum = lens.reduce(lambda x, y: x+y)
len_sum//lens.count()

564

In [11]:
# count base occurance
bases = fasta.count_bases()

Finished Executing extract_seq  in 0.0004017353057861328s!
Finished Executing count_bases  in 0.0007264614105224609s!


In [12]:

bases.take(10)

['CGAACTGAGCTACGATGACCACGAAGTCGAACTATACCGGGCGTTGGACGCTTATCGGGCGCGCATCGCCGTCGAGTACGTGCTGATTCGCGCCGTGCGCGACGAGATCTANNCTNNACTACGACGGGACAGCGGCGCGTTGCCACAGCGTTTCGCCTGCCACGTGCCACGGAACNTGTCNNNNNNNGTTGTTTGGGAACTTTGCCGTNATNNCTTGGCGCTNTGCAGAGCGCCAAGGCNNGANGGNNANNTTCCCAAACAANGCGCCAGGACANGTTCCGTNGNNNNTGGCAGGCGAAACGCTGTGGCAACGCGCCGCTGTCCCGTCGTAGTACAGCGTAGATCTCGTCGCGCACGGCGCGNNNNNGNCCGTACTCNACGGCGATGCGCGCCCGATAAGCGNCNAAAGCCCGGTNANGTTCGACGTCGTGGTCATCGTAGNTCAGNTNGAGNTCGGAAGANNGNNNGGAGGGTAAGAGTTTAGANNNCGGGTGTGTCCGTANACNTAAAAAAANAAATAATA',
 'GTCGTTTTGTTCACCGTCGCTGTGCAACGCGTGAAACAAGAGCGTGATGCGCACCTTCGGCGGTATGAAGAACGATTACGGAAAAACCGCGCACGGCGTCGGCAGTCTTTTNNGTNNCTTTGGGCGATGGGTCCGAGCTGCGGTATGGGTCACGGCGGCGTGTGTTTTATTGACGNAGATNNNNNNNTGTGACTAAAAACGTCCCAGCNCCAGAGCGATATGNTTAAATAAAAAAAATANGAAGTATTNTATTATGCGTGTCCTGGTTTTTATTTTTTGGATGNNTNGTNCCCATAAGGGTATGTTTCATCCTCNNACNTCNNCNNCCTTTATGCGANACATACATCCANAAATGAANANNNNGGACACGCATAATATGATACTACATATTTTTTTTATTGAAACATATCGCTCTGGGGCTGGGACGTTTTTAGTCANNNNNCNGCATCTTCGTCAATAAAAAAAACGCC