In [69]:
import findspark
findspark.init()

from pyspark import  SparkContext
from pyspark.sql import SparkSession
import time
import pandas as pd
import matplotlib.pyplot as plt


def build_session(cores: int):
    spark_session = SparkSession\
        .builder\
        .master("spark://192.168.2.243:7077") \
        .appName("test")\
        .config("spark.executor.cores", 1)\
        .config("spark.cores.max", cores)\
        .getOrCreate()
    return spark_session

spark_context = spark_session.sparkContext

In [19]:
df1 = spark_session.read\
.option("header","true")\
.option("delimiter", "\t")\
.csv("hdfs://team-4-master:9000/1000genomes/All.chrX.LC54_GL.20101123_20110521.snp.low_coverage.genotypes_processed.vcf")


In [3]:
# Filter out others and only keep the SNP alleles data
df2 = df1.select("REF","ALT")


In [4]:
# Combine the two column so that now we have a column showing the mutation of the SNP
# e.g. mutated from a G to an A

df2.createOrReplaceTempView("df2")
df3 = spark_session.sql("SELECT CONCAT(REF, 'to',  ALT) as Mutation FROM df2")


In [5]:
# WE now can know which mutation is the most common one in this chromosome.
df4 = df3.select("Mutation").groupBy("Mutation").count().sort("count", ascending=False).show()


+--------+------+
|Mutation| count|
+--------+------+
|    GtoA|282532|
|    CtoT|279711|
|    TtoC|199767|
|    AtoG|197835|
|    CtoA| 71314|
|    GtoT| 70288|
|    CtoG| 57710|
|    GtoC| 57569|
|    TtoA| 55049|
|    AtoT| 54708|
|    AtoC| 50670|
|    TtoG| 50634|
+--------+------+



In [50]:
spark_context.stop()

In [70]:
#Scalability analysis (Strong Scaling)

number_of_cores = [1, 2, 3, 4]
results_strong = []

for cores in number_of_cores:
    #Build Session
    spark_session = build_session(cores)
    spark_context = spark_session.sparkContext
    
    #Starting timer for reading the files
    start_time = time.time()
    df1 = spark_session.read\
    .option("header","true")\
    .option("delimiter", "\t")\
    .csv("hdfs://team-4-master:9000/1000genomes/All.chrX.LC54_GL.20101123_20110521.snp.low_coverage.genotypes_processed.vcf")
    read_time = time.time() - start_time
    
    # Starting the timer for executing the analysis
    start_time_execution = time.time()
    # Filter out others and only keep the SNP alleles data
    df2 = df1.select("REF","ALT")
    
    # Combine the two column so that now we have a column showing the mutation of the SNP e.g. mutated from a G to an A
    df2.createOrReplaceTempView("df2")
    df3 = spark_session.sql("SELECT CONCAT(REF, 'to',  ALT) as Mutation FROM df2")
    
    # WE now can know which mutation is the most common one in this chromosome.
    df4 = df3.select("Mutation").groupBy("Mutation").count().sort("count", ascending=False).show()
    
    execution_time = time.time() - start_time_execution
    total_time = time.time() - start_time
    results_strong.append((cores, read_time, execution_time, total_time))
    spark_context.stop()
    
df_results_strong = pd.DataFrame(data=results_strong, columns=["cores", "total time", "read time", "execution time"])
df_results_strong


+--------+------+
|Mutation| count|
+--------+------+
|    GtoA|282532|
|    CtoT|279711|
|    TtoC|199767|
|    AtoG|197835|
|    CtoA| 71314|
|    GtoT| 70288|
|    CtoG| 57710|
|    GtoC| 57569|
|    TtoA| 55049|
|    AtoT| 54708|
|    AtoC| 50670|
|    TtoG| 50634|
+--------+------+

+--------+------+
|Mutation| count|
+--------+------+
|    GtoA|282532|
|    CtoT|279711|
|    TtoC|199767|
|    AtoG|197835|
|    CtoA| 71314|
|    GtoT| 70288|
|    CtoG| 57710|
|    GtoC| 57569|
|    TtoA| 55049|
|    AtoT| 54708|
|    AtoC| 50670|
|    TtoG| 50634|
+--------+------+

+--------+------+
|Mutation| count|
+--------+------+
|    GtoA|282532|
|    CtoT|279711|
|    TtoC|199767|
|    AtoG|197835|
|    CtoA| 71314|
|    GtoT| 70288|
|    CtoG| 57710|
|    GtoC| 57569|
|    TtoA| 55049|
|    AtoT| 54708|
|    AtoC| 50670|
|    TtoG| 50634|
+--------+------+

+--------+------+
|Mutation| count|
+--------+------+
|    GtoA|282532|
|    CtoT|279711|
|    TtoC|199767|
|    AtoG|197835|
|    Ct

Unnamed: 0,cores,total time,read time,execution time
0,1,6.24817,26.657454,32.905629
1,2,6.069549,20.5158,26.585356
2,3,5.835913,21.300663,27.136582
3,4,5.920547,21.033583,26.954135


In [1]:
#Plot the result for the strong scalability test