# Load Package

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:


import os
cur_path = "/content/drive/MyDrive/Bigdataproject/"
os.chdir(cur_path)
!pwd

/content/drive/MyDrive/Bigdataproject


In [3]:
!pip install pyspark
!pip install pyspark[sql]
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317145 sha256=6f096cb4e211c3e6098efd250e6acbc276d8608c35e6b43cb1b1c61f62a48605
  Stored in directory: /root/.cache/pip/wheels/9f/34/a4/159aa12d0a510d5ff7c8f0220abbea42e5d81ecf588c4fd884
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
The following additional packages will be installed:
  libxtst6 openjdk-8-jre-

In [4]:
import pyspark as spark
import time
from operator import add
from pyspark.sql import SparkSession

In [5]:
import numpy as np
import random

# Step 0: Read Data

## Set spark session

In [None]:
# set spark session
ss = (SparkSession
  .builder
  .master("local[5]")
  .appName("fqproject")
  .getOrCreate())


## Set spark context

In [None]:
# set spark session
sc = (spark
  .SparkContext
  .getOrCreate(spark
    .SparkConf()
    .setAppName("fqproject")
    .setMaster('local[*]')
    .set('spark.executor.memory', '4G')
    .set('spark.driver.memory', '4G')
    .set('spark.driver.maxResultSize', '4G')))

## Read reads.fq

In [None]:
# read reads.fq and extract the second line of every four lines.
reads = (sc
  .textFile(cur_path + "data/reads.fq")
  .zipWithIndex()
  .filter(lambda x: (x[1]+1)%2==0 and (x[1]+1)%4!=0)
  .map(lambda x: x[0]))

In [None]:
reads.take(10)

['TCCTTACTGGTTTTGCAGGTAACTTATAGAGTATTTCCACTTCCCTTCTCCTATCCCTTGAAAAATTGTCATTTATTTCTCTTATCCATATGGCATAATC',
 'GGTTTTTCAGGTAACTTATAGAGTATTTCCACTTCCCTTCTCCTATCCCTGGAAAAATTGTCATTTATTTCTCTTATCCATATGGCATAATCAAAGAATA',
 'AGGTAACTTATAGAGTACTTCCACTTCCCTTCTCCTATCCCTTGAAAAATTGTCATTGATTTCTCTTATCCATATGGCATAATCAAAGAATAAATTGGTG',
 'CACTTCCCTTCTACTATCCCTTGAAAAATTGTCATTTATTTCTCTTATCCATATGGCATAATCAAAGAATAAATTGGTGATATTTGTTCAAAAATCCATG',
 'CCTTCTCCTATCCCTTGAAAAATTGTCATTTATTTCCCTTATCCATATGGCATAATCAAAGAATAAATTGGTGATATTTGTTCAAAAATCCATGCCTTTT',
 'ATCCCTTGAAAAATTGTCATTTATTTCTCTTATCCATATGGCATAATCAAAGAATAAATTGGTGATATTTGTTCAAAAATCCGTGCCTATTAGATTCATT',
 'TCTTATCCATATGGCATAATCAAAGAATAAATTGGTGATATTTGTTCAAAAATCCATGCCTATTAGATTCATTTAGAATATAAAAAAATTTTATTTTATT',
 'TATCCATATGGCATAATCAAAGAATCAATTGTTGATATTTGTTCAAAAATCCATGCCTATTAGATTCATTTAGAATATAAAAAAATTTTATTTTATTTTC',
 'TCAAAGAATAAATTGGTGATATTTGTTCAAAAATCCATGCCTATTAGATTCATTTAGATTATAAAAAAATTTTATTTTATTTTCACTTATTTCTTCTCCA',
 'TGAATAAATTGGTGATATTTGTTCAAAAATCCATGCCTACTAGATTCATTTAG

## Read reference.fa

In [None]:
# read reference
reference = (ss
  .read
  .csv(
      cur_path + "data/reference_chr21_20000000_20050000.fa",
      inferSchema=True,
      header=True))

In [None]:
reference.show(10)

+--------+--------+--------------------+
|   start|     end|            sequence|
+--------+--------+--------------------+
|20000000|20000100|CCCTTCTCCTATCCCTT...|
|20000100|20000200|TAGATTCATTTAGAATA...|
|20000200|20000300|TTCTTCTTCCTGAAGAA...|
|20000300|20000400|ACTTTTCAAGGATAGTT...|
|20000400|20000500|CTGACAGGACTTCTGCC...|
|20000500|20000600|TTTCCTTTTTTTTTTCT...|
|20000600|20000700|ATTATAAAAAGGGAGGG...|
|20000700|20000800|TTCTTTTCTTTTTCTCC...|
|20000800|20000900|ATAAATTTCTGCTTGAA...|
|20000900|20001000|TTCGTTAGTGTTTTTTA...|
+--------+--------+--------------------+
only showing top 10 rows



In [None]:
reference.count()

500

# Step 1: Cut reads into kmers = 15

## Define K-mers

In [None]:
# input a string s and cut k-mers
def extract_kmers(s: str, k: int = 15) -> list:
    return [s[i:i+k] for i in range(len(s)-k+1)]

## Map K-mers to reads

In [None]:
# K-mers of reads
reads_kmers = (reads
  .map(extract_kmers))

In [None]:
reads_kmers.take(2)

[['TCCTTACTGGTTTTG',
  'CCTTACTGGTTTTGC',
  'CTTACTGGTTTTGCA',
  'TTACTGGTTTTGCAG',
  'TACTGGTTTTGCAGG',
  'ACTGGTTTTGCAGGT',
  'CTGGTTTTGCAGGTA',
  'TGGTTTTGCAGGTAA',
  'GGTTTTGCAGGTAAC',
  'GTTTTGCAGGTAACT',
  'TTTTGCAGGTAACTT',
  'TTTGCAGGTAACTTA',
  'TTGCAGGTAACTTAT',
  'TGCAGGTAACTTATA',
  'GCAGGTAACTTATAG',
  'CAGGTAACTTATAGA',
  'AGGTAACTTATAGAG',
  'GGTAACTTATAGAGT',
  'GTAACTTATAGAGTA',
  'TAACTTATAGAGTAT',
  'AACTTATAGAGTATT',
  'ACTTATAGAGTATTT',
  'CTTATAGAGTATTTC',
  'TTATAGAGTATTTCC',
  'TATAGAGTATTTCCA',
  'ATAGAGTATTTCCAC',
  'TAGAGTATTTCCACT',
  'AGAGTATTTCCACTT',
  'GAGTATTTCCACTTC',
  'AGTATTTCCACTTCC',
  'GTATTTCCACTTCCC',
  'TATTTCCACTTCCCT',
  'ATTTCCACTTCCCTT',
  'TTTCCACTTCCCTTC',
  'TTCCACTTCCCTTCT',
  'TCCACTTCCCTTCTC',
  'CCACTTCCCTTCTCC',
  'CACTTCCCTTCTCCT',
  'ACTTCCCTTCTCCTA',
  'CTTCCCTTCTCCTAT',
  'TTCCCTTCTCCTATC',
  'TCCCTTCTCCTATCC',
  'CCCTTCTCCTATCCC',
  'CCTTCTCCTATCCCT',
  'CTTCTCCTATCCCTT',
  'TTCTCCTATCCCTTG',
  'TCTCCTATCCCTTGA',
  'CTCCTATCCC

# Step 2: Cut reference bins into kmers = 15

In [None]:
# K-mers of reference
reference_kmers = (reference
  .select('sequence')
  .rdd
  .flatMap(lambda x: x)
  .map(extract_kmers))

In [None]:
reference_kmers.take(2)

[['CCCTTCTCCTATCCC',
  'CCTTCTCCTATCCCT',
  'CTTCTCCTATCCCTT',
  'TTCTCCTATCCCTTG',
  'TCTCCTATCCCTTGA',
  'CTCCTATCCCTTGAA',
  'TCCTATCCCTTGAAA',
  'CCTATCCCTTGAAAA',
  'CTATCCCTTGAAAAA',
  'TATCCCTTGAAAAAT',
  'ATCCCTTGAAAAATT',
  'TCCCTTGAAAAATTG',
  'CCCTTGAAAAATTGT',
  'CCTTGAAAAATTGTC',
  'CTTGAAAAATTGTCA',
  'TTGAAAAATTGTCAT',
  'TGAAAAATTGTCATT',
  'GAAAAATTGTCATTT',
  'AAAAATTGTCATTTA',
  'AAAATTGTCATTTAT',
  'AAATTGTCATTTATT',
  'AATTGTCATTTATTT',
  'ATTGTCATTTATTTC',
  'TTGTCATTTATTTCT',
  'TGTCATTTATTTCTC',
  'GTCATTTATTTCTCT',
  'TCATTTATTTCTCTT',
  'CATTTATTTCTCTTA',
  'ATTTATTTCTCTTAT',
  'TTTATTTCTCTTATC',
  'TTATTTCTCTTATCC',
  'TATTTCTCTTATCCA',
  'ATTTCTCTTATCCAT',
  'TTTCTCTTATCCATA',
  'TTCTCTTATCCATAT',
  'TCTCTTATCCATATG',
  'CTCTTATCCATATGG',
  'TCTTATCCATATGGC',
  'CTTATCCATATGGCA',
  'TTATCCATATGGCAT',
  'TATCCATATGGCATA',
  'ATCCATATGGCATAA',
  'TCCATATGGCATAAT',
  'CCATATGGCATAATC',
  'CATATGGCATAATCA',
  'ATATGGCATAATCAA',
  'TATGGCATAATCAAA',
  'ATGGCATAAT

# Step 3: Collect all kmers and build a distinct kmer set

• Hint: you can use python “set” function

• Report the number of distinct kmers (N)

In [None]:
# Merge and distinct, collect to a set
kmers = set(reads_kmers 
  .union(reference_kmers)
  .flatMap(lambda x: x) 
  .distinct()
  .collect())

In [None]:
len(kmers)

72530

**The number of distinct kmers is 72,530**


In [None]:
reads

PythonRDD[40] at RDD at PythonRDD.scala:53

# Step4

In [None]:
from pyspark.ml.linalg import Vectors
import pandas as pd

In [None]:

# turn reads to list 
r_list = reads.collect()

In [None]:
# check feature vector of every kmer in each read
def check_kmer_features(kmer):
  m = []
  

  for i in range(len(r_list)):
    if kmer in r_list[i]:
      m.append(i)
  v = [1.0 for i in range(len(m))]
  return (kmer,Vectors.dense(Vectors.sparse(len(r_list),m,v)))

In [None]:
kmers_rdd = sc.parallelize(list(kmers))

In [None]:
reads_features = kmers_rdd.map(lambda x: check_kmer_features(x))

In [None]:
reads_features_df = reads_features.toDF(['kmer','read_features'])

In [None]:
#generate the dataframe with sparse vector
reads_features_df.show(1)

+---------------+--------------------+
|           kmer|       read_features|
+---------------+--------------------+
|CTCATCCCATTACCC|[0.0,0.0,0.0,0.0,...|
+---------------+--------------------+
only showing top 1 row



In [None]:
from pyspark.sql.functions import col
from pyspark.ml.functions import vector_to_array

In [None]:
reads_features_df = reads_features_df.withColumn("read", vector_to_array("read_features")).select(["kmer"] + [col("read")[i] for i in range(2000)])

In [None]:
read_name = {r_list[i]: 'reads' + str(i) for i in range(len(r_list))}

In [None]:
# old code
def check_kmer_features_old1(read):
  m = []
  kmers_list = list(kmers)
  for i in range(len(kmers_list)):
    if kmers_list[i] in read:
      m.append(i)
  v = [1.0 for i in range(len(m))]
  return (read_name[read],Vectors.sparse(len(kmers_list),m,v))


In [None]:
reads_features = reads.map(lambda x: check_kmer_features_old1(x))

In [None]:
reads_features.take(2)

[('reads0',
  SparseVector(72530, {173: 1.0, 853: 1.0, 1697: 1.0, 2461: 1.0, 2756: 1.0, 4009: 1.0, 4623: 1.0, 4892: 1.0, 6087: 1.0, 6582: 1.0, 7642: 1.0, 7938: 1.0, 9013: 1.0, 11645: 1.0, 13192: 1.0, 16390: 1.0, 17650: 1.0, 18185: 1.0, 18287: 1.0, 19252: 1.0, 19552: 1.0, 20320: 1.0, 20396: 1.0, 21788: 1.0, 22089: 1.0, 23402: 1.0, 23741: 1.0, 23943: 1.0, 23964: 1.0, 24722: 1.0, 25527: 1.0, 25942: 1.0, 26524: 1.0, 26766: 1.0, 27980: 1.0, 28197: 1.0, 30104: 1.0, 31464: 1.0, 34048: 1.0, 34662: 1.0, 35093: 1.0, 36084: 1.0, 37855: 1.0, 38460: 1.0, 40043: 1.0, 40470: 1.0, 40761: 1.0, 41115: 1.0, 41684: 1.0, 42492: 1.0, 42564: 1.0, 42906: 1.0, 43060: 1.0, 43484: 1.0, 44209: 1.0, 45091: 1.0, 46234: 1.0, 50524: 1.0, 51918: 1.0, 53785: 1.0, 54037: 1.0, 54184: 1.0, 54906: 1.0, 55883: 1.0, 56211: 1.0, 58218: 1.0, 59428: 1.0, 59616: 1.0, 61052: 1.0, 61059: 1.0, 61212: 1.0, 61334: 1.0, 61465: 1.0, 61883: 1.0, 61973: 1.0, 62577: 1.0, 63348: 1.0, 63439: 1.0, 65000: 1.0, 65351: 1.0, 65723: 1.0, 67217: 1

# Step5

In [None]:
reference_rdd = reference.select("sequence").rdd.map(lambda x: x['sequence'])

In [None]:
reference_list = reference_rdd.collect()
reference_list[0]

'CCCTTCTCCTATCCCTTGAAAAATTGTCATTTATTTCTCTTATCCATATGGCATAATCAAAGAATAAATTGGTGATATTTGTTCAAAAATCCATGCCTAT'

In [None]:
#generate the name of columns
start_rdd = reference.select("start").rdd.map(lambda x: x['start'])
end_rdd = reference.select("end").rdd.map(lambda x: x['end'])
s_list = start_rdd.collect()
e_list = end_rdd.collect()
reference_name = ['bin' + str(s_list[i]) + '_' +  str(e_list[i]) for i in range(len(s_list))]
reference_name[0]

'bin20000000_20000100'

In [None]:
# check feature vector of every kmer in each reference
def check_kmer_read_features_sequence(kmer):
  m = []
  for i in range(len(reference_list)):
    if kmer in reference_list[i]:
      m.append(i)
  v = [1.0 for i in range(len(m))]
  return (kmer,Vectors.dense(Vectors.sparse(len(reference_list),m,v)))

In [None]:
reference_features = kmers_rdd.map(lambda x:check_kmer_read_features_sequence(x))

In [None]:
# turn to dataframe
reference_features_df = reference_features.toDF(['kmer','reference_features'])

In [None]:
reference_features_df.show(1) 

+---------------+--------------------+
|           kmer|  reference_features|
+---------------+--------------------+
|CTCATCCCATTACCC|[0.0,0.0,0.0,0.0,...|
+---------------+--------------------+
only showing top 1 row



In [None]:
# expand the dense vector to different collomns
reference_features_df = reference_features_df.withColumn("xs", vector_to_array("reference_features")).select(["kmer"] + [col("xs")[i] for i in range(len(reference_name))])

In [None]:
reference_name = ['kmer'] + reference_name
reference_features_df = reference_features_df.select([col(c).alias(reference_name[i]) for i, c in enumerate(reference_features_df.columns)])

In [None]:
reference_name_d = {reference_list[i]: reference_name[i+1] for i in range(len(reference_list))}
len(reference_name_d)

500

In [None]:
# old code
def check_kmer_features_old(read):
  m = []
  kmers_list = list(kmers)
  for i in range(len(kmers_list)):
    if kmers_list[i] in read:
      m.append(i)
  v = [1.0 for i in range(len(m))]
  return (reference_name_d[read],Vectors.sparse(len(kmers_list),m,v))

In [None]:
# old code 
reference_features = reference_rdd.map(lambda x: check_kmer_features_old(x))
reference_features.take(1)

[('bin20000000_20000100',
  SparseVector(72530, {173: 1.0, 853: 1.0, 2587: 1.0, 2756: 1.0, 5004: 1.0, 6087: 1.0, 7345: 1.0, 7642: 1.0, 8863: 1.0, 9013: 1.0, 9275: 1.0, 11645: 1.0, 13476: 1.0, 13832: 1.0, 14097: 1.0, 15259: 1.0, 15334: 1.0, 15844: 1.0, 16322: 1.0, 16673: 1.0, 17226: 1.0, 17650: 1.0, 18185: 1.0, 19252: 1.0, 19552: 1.0, 20320: 1.0, 20396: 1.0, 21788: 1.0, 22089: 1.0, 23014: 1.0, 25527: 1.0, 25621: 1.0, 26022: 1.0, 26524: 1.0, 27980: 1.0, 33448: 1.0, 33756: 1.0, 34048: 1.0, 34650: 1.0, 36123: 1.0, 38460: 1.0, 38721: 1.0, 40470: 1.0, 40761: 1.0, 41115: 1.0, 41684: 1.0, 42040: 1.0, 42492: 1.0, 42564: 1.0, 43060: 1.0, 43786: 1.0, 44209: 1.0, 45091: 1.0, 46234: 1.0, 46869: 1.0, 47313: 1.0, 48110: 1.0, 49404: 1.0, 49868: 1.0, 50161: 1.0, 51918: 1.0, 53086: 1.0, 53752: 1.0, 53785: 1.0, 54037: 1.0, 54085: 1.0, 57822: 1.0, 58218: 1.0, 58702: 1.0, 59428: 1.0, 61052: 1.0, 61212: 1.0, 61973: 1.0, 62465: 1.0, 62577: 1.0, 63348: 1.0, 63439: 1.0, 65032: 1.0, 65351: 1.0, 67541: 1.0, 6754


# Step 6: Use minhash to reduce the feature dimension for reads and reference bins

In [None]:
# define minhash
def minhash(hashtable: list, entry: tuple) -> tuple:
  ans = []
  for ls in hashtable:
    for i in ls:
      if entry[1][ls[i]]==1:
        ans.append(i)
        break
  return (entry[0], ans)

In [None]:
# create hash table
def minhash_table(n: int) -> list:
  ans = []
  for i in range(n):
    ans.append(random.sample(range(72530), 72530))
  return ans

In [None]:
# set seed
random.seed(114514)

In [None]:
# create hash table
hashtable = minhash_table(1000)

In [None]:
# run minhash on reads
start = time.time()
print(time.localtime(start))
read_sig = reads_features.map(lambda x: minhash(hashtable, x))
read_sig_temp = read_sig.collect()
print(time.time()-start)

time.struct_time(tm_year=2023, tm_mon=4, tm_mday=11, tm_hour=20, tm_min=6, tm_sec=49, tm_wday=1, tm_yday=101, tm_isdst=0)
5877.96895980835


In [None]:
# run minhash on references
start = time.time()
print(time.localtime(start))
ref_sig = reference_features.map(lambda x: minhash(hashtable, x))
ref_sig_temp = ref_sig.collect()
print(time.time()-start)

time.struct_time(tm_year=2023, tm_mon=4, tm_mday=11, tm_hour=21, tm_min=44, tm_sec=47, tm_wday=1, tm_yday=101, tm_isdst=0)
2525.6211519241333


In [None]:
len(ref_sig_temp)

500

In [None]:
len(read_sig_temp)

2000

## Test sequential

In [None]:
# collect 500 sample for each
reads_features_50 = reads_features.take(50)
reference_features_50 = reference_features.take(50)

In [None]:
# run minhash on reads 50
start = time.time()
print(time.localtime(start))
read_sig_50 = list(map(lambda x: minhash(hashtable, x), reads_features_50))
print(time.time()-start)

time.struct_time(tm_year=2023, tm_mon=4, tm_mday=11, tm_hour=22, tm_min=46, tm_sec=42, tm_wday=1, tm_yday=101, tm_isdst=0)
284.1727809906006


In [None]:
# run minhash on ref 50
start = time.time()
print(time.localtime(start))
ref_sig_50 = list(map(lambda x: minhash(hashtable, x), reference_features_50))
print(time.time()-start)

time.struct_time(tm_year=2023, tm_mon=4, tm_mday=11, tm_hour=22, tm_min=51, tm_sec=26, tm_wday=1, tm_yday=101, tm_isdst=0)
278.68529081344604


# step 7

In [None]:
read_l = read_sig_temp.collect()

In [None]:
ref_l = ref_sig_temp.collect()

In [None]:
len(read_l)

2000

In [None]:
len(ref_l)

500

In [None]:
final_read = [i[0] for i in read_l]

In [None]:
final_ref = []
final_score = []
count = 0

for i in read_l:
  count += 1
  max_score = 0
  max_ref = ''
  for j in ref_l:
    score = len([1 for e in range(1000) if i[1][e] == j[1][e]])/1000
    if score > max_score:
      max_ref = j[0]
      max_score = score
  final_ref.append(max_ref)
  final_score.append(max_score)
  if count%100 == 0:
    print(count)




100
200
300
400
500
600
700
800
900
1000
1100
1200
1300
1400
1500
1600
1700
1800
1900
2000


In [None]:
final_ref[:10]

['bin20000000_20000100',
 'bin20000000_20000100',
 'bin20000000_20000100',
 'bin20000000_20000100',
 'bin20000000_20000100',
 'bin20000000_20000100',
 'bin20000000_20000100',
 'bin20000100_20000200',
 'bin20000000_20000100',
 'bin20000100_20000200']

In [None]:
final_score[:10]

[0.348, 0.253, 0.357, 0.692, 0.672, 0.641, 0.4, 0.181, 0.232, 0.374]

In [None]:
final_df = pd.DataFrame({'read': final_read,'reference': final_ref,'score': final_score})

In [None]:
final_df.head(5)

Unnamed: 0,read,reference,score
0,reads0,bin20000000_20000100,0.348
1,reads1,bin20000000_20000100,0.253
2,reads2,bin20000000_20000100,0.357
3,reads3,bin20000000_20000100,0.692
4,reads4,bin20000000_20000100,0.672


In [None]:
final_df.to_csv(cur_path + "data/final_result.csv", header=True, index=None, sep=',')


# Step 8

In [6]:
import pandas as pd
import numpy as np
from sklearn.metrics import mean_squared_error

In [7]:
def extract_start(string):
    return string[3:11]

In [9]:
benchmark = pd.read_csv(cur_path + 'data/read_position_benchmark.csv')
results = pd.read_csv(cur_path + "data/Final_result/final_result_k15_p1000.csv")

results['pred_start'] = results['reference'].astype(str).apply(extract_start)
result_compare = pd.concat([results, benchmark['reference_start']], axis=1).loc[:,['pred_start','reference_start']].astype(str)
result_compare = result_compare[result_compare['pred_start'].str.len() == 8].astype(int)

# Pearson
corr = np.corrcoef(result_compare['pred_start'], result_compare['reference_start'])[0,1]
print('Pearson: ',corr)

# MSE
y_true = result_compare['reference_start']
y_pred = result_compare['pred_start']
mse = mean_squared_error(y_true, y_pred)
print('MSE: ',mse)

Pearson:  0.9999971145120188
MSE:  896.5645
