## Wuhan seafood market pneumonia virus (2019 Novel Coronavirus)
## DNA Generation Sequencing

https://towardsdatascience.com/next-generation-sequencing-data-analysis-with-pyspark-888a1e0a079


## Install SRA tools, download and extract sequences from SRA files
- download sra tools

In [0]:
import os
def setup_sra_tool(url):
  os.chdir('/content')
  !wget $url
  !gunzip sratoolkit.2.9.6-1-ubuntu64.tar.gz
  !tar -xf sratoolkit.2.9.6-1-ubuntu64.tar


def get_sra(url, sra_path):
  os.chdir('/content')
  !wget $url
  sra_name = url[-11:]
  os.chdir(sra_path)
  !./fastq-dump /content/$sra_name -O /content/
  os.chdir('/content')
  
# set up SRA toolkit
url_tk= 'https://ftp-trace.ncbi.nlm.nih.gov/sra/sdk/2.9.6-1/sratoolkit.2.9.6-1-ubuntu64.tar.gz'
setup_sra_tool(url_tk)

# download and extract sra file
sra_url = 'https://sra-download.ncbi.nlm.nih.gov/traces/sra50/SRR/010646/SRR10902284'
tool_path = '/content/sratoolkit.2.9.6-1-ubuntu64/bin'
get_sra(sra_url, tool_path)

--2020-02-03 06:57:29--  https://ftp-trace.ncbi.nlm.nih.gov/sra/sdk/2.9.6-1/sratoolkit.2.9.6-1-ubuntu64.tar.gz
Resolving ftp-trace.ncbi.nlm.nih.gov (ftp-trace.ncbi.nlm.nih.gov)... 130.14.250.10, 2607:f220:41e:250::13
Connecting to ftp-trace.ncbi.nlm.nih.gov (ftp-trace.ncbi.nlm.nih.gov)|130.14.250.10|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 84492294 (81M) [application/x-gzip]
Saving to: ‘sratoolkit.2.9.6-1-ubuntu64.tar.gz’


2020-02-03 06:57:30 (79.5 MB/s) - ‘sratoolkit.2.9.6-1-ubuntu64.tar.gz’ saved [84492294/84492294]

--2020-02-03 06:57:35--  https://sra-download.ncbi.nlm.nih.gov/traces/sra50/SRR/010646/SRR10902284
Resolving sra-download.ncbi.nlm.nih.gov (sra-download.ncbi.nlm.nih.gov)... 165.112.9.231, 165.112.9.232, 165.112.9.238
Connecting to sra-download.ncbi.nlm.nih.gov (sra-download.ncbi.nlm.nih.gov)|165.112.9.231|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 82543031 (79M) [application/octet-stream]
Saving to: ‘SRR10

## Fastq class to process the fastq file

In [0]:
# py4javaerror happen when the path of the file is not correct
!pip install pyspark[sql]

from __future__ import print_function
from functools import wraps
import pyspark as spark
from pyspark import SparkConf
import time
from operator import add
import os 
from subprocess import STDOUT, check_call, check_output



class Fastq:
    def __init__(self, path:str) -> str:
        self.path = path
        self.install_java_scala()
        self.stop_context()
        self.sc = spark.SparkContext.getOrCreate(conf=self.set_conf())
        self.data = self.sc.textFile(self.path)

    def stop_context(self):
        try:
          self.sc.stop()
        except:
          pass

    def set_conf(self):
        conf = SparkConf().setAppName("App")
        conf = (conf.setMaster('local[*]')
          .set('spark.executor.memory', '4G')
          .set('spark.driver.memory', '16G')
          .set('spark.driver.maxResultSize', '8G'))
        return conf

    def install_java_scala(self):
        try:
          java_ver = check_output(['java', '-version'], stderr=STDOUT)
        except:
          java_ver = b''
        try:
          scala_ver = check_output(['scala', '-version'], stderr=STDOUT)
        except:
          scala_ver = b''
        if b'1.8.0_232' not in java_ver:
          java_8_install = ['apt-get', '--quiet', 'install',
                            '-y', 'openjdk-8-jdk-headless']
          java_set_alt = ['update-alternatives', '--set', 'java', 
                          '/usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java' ] 
          check_call(java_8_install, stdout=open(os.devnull, 'wb'), 
                     stderr=STDOUT)
          os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" 
          check_call(java_set_alt)  
        if b'2.11.12' not in scala_ver:
          scala_install = ['apt-get', '--quiet', 'install', 'scala']
          check_call(scala_install)
          

    def _logging(func):
        @wraps(func)
        def log_print(instance, *args, **kwargs):
          start = time.time()
          res = func(instance, *args, **kwargs)
          print("Finished Executing {}  in {}s!".format(func.__name__, time.time() - start))
          return res
        return log_print

    @_logging
    def get_data(self):
        return self.data


    @_logging
    def count_bases(self):
      seqs = self.extract_seq()
      seqs = seqs.flatMap(lambda line: list(line)) 
      seqs = seqs.map(lambda c: (c, 1))
      return seqs.reduceByKey(lambda a, b: a+b)#\
            #  .map(lambda c: (c, 1)) \
            #  .reduceByKey(lambda k1, k2: k1 + k2)
      # counts.saveAsTextFile('outputs')
      # print("saved output")

    @_logging
    def extract_seq(self):
        return self.data.filter(lambda x: x.isalpha())

    @_logging
    def get_lengths(self):
        seqs = self.extract_seq()
        return seqs.map(lambda x: len(x))

    def extract_qual(self):
        pass

    def extract_meta(self):
        pass

Collecting pyspark[sql]
[?25l  Downloading https://files.pythonhosted.org/packages/87/21/f05c186f4ddb01d15d0ddc36ef4b7e3cedbeb6412274a41f26b55a650ee5/pyspark-2.4.4.tar.gz (215.7MB)
[K     |████████████████████████████████| 215.7MB 50kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 33.8MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.4-py2.py3-none-any.whl size=216130387 sha256=0b0c0491302cb8192fb5c624955951983b27374e814974d39e66dfe8c35683d2
  Stored in directory: /root/.cache/pip/wheels/ab/09/4d/0d184230058e654eb1b04467dbc1292f00eaa186544604b471
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.4


In [0]:
# read file
os.chdir('/content')
fasta = Fastq('SRR10902284.fastq')

In [0]:
# show first read
fasta.data.take(4)

['@SRR10902284.1 1 length=305',
 'CGTATACTTCATTCAATTACATGTGCTAAAGTAAAGACTACCTTTCTGCTGAGGAAACAGCACCTATTTCCCACTGGAAGATAAGCGCCTGCTGCTTCCCTTAGATGTAATGCCATTCTCAAACTCCCCTCTCCCCTGGGATCAGGCCTGATTCCCCGTCACCGTAGTCACCATAGTAAGCTGAAAGCTACCATCGGGTGTCGAGCAGACGTTCCGAATGGGTCATCGCCGCCACGTATCCTCAGTGAAACAGAGTACTGTTCTCGCAAAACGAAAGTAGTCTTAACATACCTTAATACATAACA',
 '+SRR10902284.1 1 length=305',
 '-,168??>:/$//:1)(*,1/)(2\')(()\'#%)2$)15831-29;.0&#($*.+-..9<*:<561&\'-;<>6/4>::()*)%\'\'/%%:=?<//\'#%%,--*.\')8)**)$%\',-05,017*)$78>FC&3+29>6$"%-(%47$%&&(&\'016;=@=3:C664.4((.+0AB-\'\'+)*&%#$&))\'#11677((5*%(.)$\'*\'\'$$&&./$0*A006\'\'+.20)9=;=>7GD8**1300/)=.23$$.&-21*5?5CD8HGAC?)8>=-&\'(*/-\'\'\'*1-/03,+/))%%%%(%%%&$$\')&&']

In [0]:
# show read count
fasta.data.count()

1047560

In [0]:
# extract sequences alone from the fastq file
seqs = fasta.extract_seq()

Finished Executing extract_seq  in 0.0008852481842041016s!


In [0]:
seqs.take(4)

['CGTATACTTCATTCAATTACATGTGCTAAAGTAAAGACTACCTTTCTGCTGAGGAAACAGCACCTATTTCCCACTGGAAGATAAGCGCCTGCTGCTTCCCTTAGATGTAATGCCATTCTCAAACTCCCCTCTCCCCTGGGATCAGGCCTGATTCCCCGTCACCGTAGTCACCATAGTAAGCTGAAAGCTACCATCGGGTGTCGAGCAGACGTTCCGAATGGGTCATCGCCGCCACGTATCCTCAGTGAAACAGAGTACTGTTCTCGCAAAACGAAAGTAGTCTTAACATACCTTAATACATAACA',
 'CAGTAGTACTTCGTTCAGTTACGTATTGCTAAGGTTAAGACACTTTCTGCCGGTTGCGAGAACAGCACCTGTTTCCCACTGGAGGATATAGCCCCTTTTCACCCGTAGGTCGTATGCGGCATTAGCGTAAGTTTCCCTACGTTATCCCACTAATAGGCAGATTCTAAACATTTACTCACCCGTTCATACTAATCCGTCTAGCAAGCTAGACTTCATCGTTCGACTTGCATGTGTTAAGCCTGCCTTAGATATCCTCCAGTGGGGAAACAGGTGCTGTTTCTCGCAAAGGCGAAAGTAGTCTTAACCTTAGCAATACGTGAGC',
 'GTAATACTTCGTTCAGTTACATGTGCTAAGGTTAAGACTTGCTGCCTTTGCGAGAACAGCACCTGTTTTACTGGAGGATACGGTGGCGGCGACGACAGGTAAGGCATCTGCCCTATCAGCTTTCGATGGCGTCGCCATTACCTACCATAGTGACACGGTGACGGGAATCAGGGTTCAAATTCGGAGAGGAGCCTACGAGAGCAACTACCCACATCAAGGAAGGCAGCAGGCGCACCAAATTACCCTCCCGACCCGGGAAGTAGTGACGAAAAATATAACAATACGGGACTATCCTCCAGTGGGAAACAGGTGCTTGTTCTCGCAAAGGGGCAGAAAGTAGTCACTTAACCTTGGCAATACA

In [0]:
# compute read lengths
lens = fasta.get_lengths()

Finished Executing extract_seq  in 0.0015645027160644531s!
Finished Executing get_lengths  in 0.0019452571868896484s!


In [0]:
# show the lengths of the first 10 reads
lens.take(10)

[305, 322, 366, 399, 233, 326, 328, 327, 254, 270]

In [0]:
# get the average read length
len_sum = lens.reduce(lambda x, y: x+y)
len_sum//lens.count()

345

In [0]:
# count base occurance
bases = fasta.count_bases()

Finished Executing extract_seq  in 0.0023088455200195312s!
Finished Executing count_bases  in 0.07619762420654297s!


In [0]:

bases.take(10)

[('C', 22436039),
 ('N', 265540),
 ('G', 22393344),
 ('A', 21742728),
 ('T', 23731759)]

In [0]:
!pip freeze > requirements.txt

In [0]:
cat requirements.txt

absl-py==0.9.0
alabaster==0.7.12
albumentations==0.1.12
altair==4.0.0
asgiref==3.2.3
astor==0.8.1
astropy==4.0
atari-py==0.2.6
atomicwrites==1.3.0
attrs==19.3.0
audioread==2.1.8
autograd==1.3
Babel==2.8.0
backcall==0.1.0
backports.tempfile==1.0
backports.weakref==1.0.post1
beautifulsoup4==4.6.3
bleach==3.1.0
blis==0.2.4
bokeh==1.4.0
boto==2.49.0
boto3==1.10.47
botocore==1.13.47
Bottleneck==1.3.1
branca==0.3.1
bs4==0.0.1
bz2file==0.98
cachetools==4.0.0
certifi==2019.11.28
cffi==1.13.2
chainer==6.5.0
chardet==3.0.4
chart-studio==1.0.0
Click==7.0
cloudpickle==1.2.2
cmake==3.12.0
colorlover==0.3.0
community==1.0.0b1
contextlib2==0.5.5
convertdate==2.2.0
coverage==3.7.1
coveralls==0.5
crcmod==1.7
cufflinks==0.17.0
cvxopt==1.2.3
cvxpy==1.0.25
cycler==0.10.0
cymem==2.0.3
Cython==0.29.14
daft==0.0.4
dask==2.9.1
dataclasses==0.7
datascience==0.10.6
decorator==4.4.1
defusedxml==0.6.0
descartes==1.1.0
dill==0.3.1.1
distributed==1.25.3
Django==3.0.2
dlib==19.18.0
dm-sonnet==1.35
docopt==0.6.2
docu