# 4-SparkDataframe

This notebook demonstrates how to read and process a tabular data file with the [Spark](https://spark.apache.org/docs/latest/sql-programming-guide.html) library. Apache Spark is a unified analytics engine for large-scale parallel data processing. Spark can handle datasets that are larger than the available memory (out-of-core) and process them in parallel on multiple cores.

Author: Peter W. Rose (pwrose@ucsd.edu)

In [1]:
import os
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc
import time

If the ```LOCAL_SCRATCH_DIR``` environment variable is not set, this notebook falls back to the ../data directory for temporary files.

In [2]:
DATA_DIR = os.getenv("LOCAL_SCRATCH_DIR", default="../data")
filename = os.path.join(DATA_DIR, "gene_info.tsv")
file_size = f"{os.path.getsize(filename)/1E9:.1f}"
RESULTS_DIR = "results"
os.makedirs(RESULTS_DIR, exist_ok=True)

### Setup Benchmark

The ```n_cores``` and ```file_format``` parameters are used for benchmarking (see [7-ParallelEfficiency.ipynb](7-ParallelEfficiency.ipynb)).
The cell below is [parameterized](https://papermill.readthedocs.io/en/latest/usage-parameterize.html#jupyterlab-3-0) so that [papermill](https://papermill.readthedocs.io/en/latest/index.html) can override these values as input parameters.

In [3]:
n_cores = 8
file_format = "csv"
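
A benchmark sweep over these two parameters could be driven from Python roughly as sketched below. This is only an illustration: the notebook filename ```4-SparkDataframe.ipynb```, the output naming scheme, and the helper names ```parameter_grid``` and ```run_benchmarks``` are assumptions, not part of this notebook. The only papermill call used is ```execute_notebook```.

```python
from itertools import product

NOTEBOOK = "4-SparkDataframe.ipynb"  # assumed filename of this notebook


def parameter_grid(core_counts, file_formats):
    """Build one parameter dict per (file_format, n_cores) combination."""
    return [
        {"n_cores": n, "file_format": fmt}
        for fmt, n in product(file_formats, core_counts)
    ]


def run_benchmarks(grid, notebook=NOTEBOOK):
    """Execute the notebook once per parameter set (hypothetical helper)."""
    import papermill as pm  # imported here so the grid can be built without papermill

    for params in grid:
        # assumed naming scheme for the executed copies of the notebook
        output = f"4-SparkDataframe_{params['file_format']}_{params['n_cores']}.ipynb"
        pm.execute_notebook(notebook, output, parameters=params)


grid = parameter_grid([1, 2, 4, 8], ["csv", "parquet"])
print(len(grid), "runs planned")
```

Calling ```run_benchmarks(grid)``` would then execute the notebook once for each entry, with papermill injecting the values in place of the defaults in the parameterized cell.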

In [4]:
start = time.time()

### Setup Spark

In [5]:
if n_cores > 0:
    # use n_cores for benchmarking
    spark = SparkSession.builder.master(f"local[{n_cores}]").appName("SparkDataframe").getOrCreate()
else:
    # use all available cores
    spark = SparkSession.builder.appName("SparkDataframe").getOrCreate()

spark.sparkContext.setLogLevel("ERROR")
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/13 17:48:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Read Data

In [6]:
# read only specified columns
column_names = ["GeneID", "Symbol", "Synonyms", "description", "type_of_gene", "#tax_id", "chromosome"]

if file_format == "csv":
    filename = os.path.join(DATA_DIR, "gene_info.tsv")
    genes = spark.read.option("header","true").option("sep", "\t").csv(filename)
elif file_format == "parquet":
    filename = os.path.join(DATA_DIR, "gene_info.parquet")
    genes = spark.read.parquet(filename)
else:
    # fail early: without a valid format, genes would be undefined below
    raise ValueError(f"invalid file format: {file_format}")
    
print("Filename:", filename)

genes = genes.select(column_names)
genes = genes.filter("type_of_gene == 'protein-coding'")
genes = genes.withColumnRenamed("#tax_id", "tax_id")

Filename: /scratch/pwrose/job_23000827/gene_info.tsv


### Process Data

In [7]:
groups = genes.groupBy(["tax_id"]).count()
groups = groups.toDF("tax_id", "count")
groups = groups.sort(col("count").desc())
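
To check what the group-count-sort pipeline above computes, the same aggregation can be written in pandas on a tiny in-memory table. The rows below are made-up toy values, not records from gene_info.tsv, and the names ```genes_pd``` and ```groups_pd``` are hypothetical.

```python
import pandas as pd

# Toy rows for illustration only (not taken from gene_info.tsv).
# tax_id is kept as a string, as it is when Spark reads a CSV without schema inference.
genes_pd = pd.DataFrame(
    {
        "GeneID": ["1", "2", "3", "4", "5"],
        "tax_id": ["9606", "9606", "10090", "9606", "10090"],
    }
)

# Count rows per tax_id, then sort by count in descending order.
groups_pd = (
    genes_pd.groupby("tax_id")
    .size()
    .reset_index(name="count")
    .sort_values("count", ascending=False)
    .reset_index(drop=True)
)
print(groups_pd)
```

Unlike this pandas version, which runs eagerly, the Spark statements above only build a query plan.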

Convert the Spark DataFrame to a pandas DataFrame. Spark evaluates lazily, so this call is what triggers the computation.

In [8]:
groups = groups.toPandas()

                                                                                

In [9]:
spark.stop()

### Display Results

#### Number of human protein-coding genes (tax_id = 9606)

In [10]:
groups.query("tax_id == '9606'")

| | tax_id | count |
|---|---|---|
| 480 | 9606 | 20646 |


#### Top 5 organisms with the most protein-coding genes

In [11]:
groups.head()

| | tax_id | count |
|---|---|---|
| 0 | 4565 | 104039 |
| 1 | 3708 | 90975 |
| 2 | 90675 | 82686 |
| 3 | 412133 | 72290 |
| 4 | 94328 | 68154 |


In [12]:
end = time.time()

In [13]:
if n_cores > 0:
    df = pd.DataFrame([{"cores": n_cores, "time": end-start, "size": file_size, "format": file_format, "dataframe": "Spark"}])
    output_file = f"4-SparkDataframe_{file_size}_{file_format}_{n_cores}.csv"
    df.to_csv(os.path.join(RESULTS_DIR, output_file), index=False)
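
Result files with ```cores``` and ```time``` columns, like the one written above, can be turned into speedup and parallel efficiency figures. The sketch below shows one common definition (speedup relative to the smallest core count, efficiency as speedup divided by the core ratio); the companion notebook may compute this differently. The function name ```parallel_efficiency``` is hypothetical and the timings are placeholder numbers, not measurements.

```python
import pandas as pd


def parallel_efficiency(results: pd.DataFrame) -> pd.DataFrame:
    """Add speedup and efficiency columns, relative to the smallest core count."""
    results = results.sort_values("cores").reset_index(drop=True)
    base = results.iloc[0]  # run with the fewest cores is the baseline
    results["speedup"] = base["time"] / results["time"]
    results["efficiency"] = results["speedup"] / (results["cores"] / base["cores"])
    return results


# Placeholder timings for illustration only (not benchmark results).
timings = pd.DataFrame({"cores": [1, 2, 4], "time": [40.0, 20.0, 16.0]})
summary = parallel_efficiency(timings)
print(summary)
```

An efficiency close to 1 means the run time shrinks almost in proportion to the number of cores; values well below 1 indicate overhead or serial bottlenecks.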

In [14]:
print(f"Spark: {end - start:.1f} sec.")

Spark: 14.2 sec.
