# Processing Javierre 2016 data

The Javierre interval data is split by the cell lines in which the data was recorded. This information needs to be taken into account when the data is processed. As this data is not expected to change, the most time-consuming steps are done here and the resulting data is saved as partitioned parquet.

In [None]:
import pandas as pd

from pyspark.sql import dataframe
from functools import reduce
import os

import pyspark.sql
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.conf import SparkConf

# Memory granted to the Spark driver and to the executor (the value is
# suffixed with 'g' when handed to Spark).
spark_mem_limit = 10

# All Spark settings collected as key/value pairs and applied in one call.
spark_conf = SparkConf().setAll([
    ('spark.driver.memory', f'{spark_mem_limit}g'),
    ('spark.executor.memory', f'{spark_mem_limit}g'),
    ('spark.driver.maxResultSize', '0'),
    ('spark.debug.maxToStringFields', '2000'),
    ('spark.sql.execution.arrow.maxRecordsPerBatch', '500000'),
    ('spark.ui.showConsoleProgress', 'false'),
])

# Local Spark session (reused if one already exists), with the driver bound
# to the loopback address.
spark = (
    pyspark.sql.SparkSession.builder
    .config(conf=spark_conf)
    .master('local[*]')
    .config('spark.driver.bindAddress', '127.0.0.1')
    .getOrCreate()
)


# Folder holding the raw Javierre 2016 input files.
data_folder = 'gs://genetics-portal-input/v2g_input/javierre2016/'

def read_data(input_directory: str, file_name: str) -> 'pyspark.sql.DataFrame':
    '''Read one Javierre 2016 dataset into a persisted Spark dataframe.

    A compressed, header-less tsv file is read with pandas from the provided
    directory, the columns are named and typed, and the result is converted
    into a Spark dataframe. The part of the file name before the first dot is
    stored as bio_feature (later normalized as cell line).
    The function doesn't care if the files are in a gs:// location or local.

    Columns in the returned dataframe:
    - chrom: chromosome for the first interval
    - start: start position for the first interval
    - end: end position for the first interval
    - name: name of the second interval + score
    - score: some sort of score (exact meaning not documented here)
    - annotation: meaning not documented here
    - bio_feature: raw cell type from the file name

    Params:
        input_directory: str - path to the input directory; a missing
            trailing '/' is tolerated
        file_name: str - name of the file to read; if a full path is given
            (e.g. an entry of a bucket listing) only its last component is used
    Returns:
        pyspark.sql.DataFrame - persisted Spark dataframe with the data
    '''
    # Bucket listings return full object paths; only the last component names
    # the file inside input_directory and carries the cell type.
    base_name = os.path.basename(file_name)

    # Plain string concatenation is used below, so make sure the directory
    # ends with a separator (an empty directory is left untouched).
    if input_directory and not input_directory.endswith('/'):
        input_directory += '/'

    print(f'Reading file: {file_name}')
    return spark.createDataFrame(
        pd.read_csv(input_directory + base_name, sep='\t', header=None)
        .rename(columns={0: 'chrom', 1: 'start', 2: 'end', 3: 'name', 4: 'score', 5: 'annotation'})
        .astype({'chrom': 'string', 'start': 'int', 'end': 'int', 'score': 'float'})
        .assign(bio_feature=base_name.split('.')[0])
    ).persist()

# Schema of the empty dataframe that seeds the union of all input files.
# `score` is declared as a double because read_data casts that column to float;
# declaring it as a string would leave the type of the combined column to
# Spark's implicit type coercion in the union.
schema = T.StructType([
    T.StructField("chrom", T.StringType(), True),
    T.StructField("start", T.IntegerType(), True),
    T.StructField("end", T.IntegerType(), True),
    T.StructField("name", T.StringType(), True),
    T.StructField("score", T.DoubleType(), True),
    T.StructField("annotation", T.StringType(), True),
    T.StructField("bio_feature", T.StringType(), True),
])

# Collect the names of the compressed input files found in data_folder.
# Only bare file names are kept, so they can be appended to data_folder.
if data_folder.startswith('gs://'):
    # Imported here because it is only needed when listing a bucket.
    import gcsfs

    # The bucket listing yields full object paths; reduce each to its name.
    file_list = [
        os.path.basename(x)
        for x in gcsfs.GCSFileSystem().ls(data_folder)
        if x.endswith('.txt.gz')
    ]
else:
    file_list = [x for x in os.listdir(data_folder) if x.endswith('.txt.gz')]

# Starting from an empty dataframe built from `schema`, read the input files
# one by one and append each of them to the running union.
df = reduce(
    lambda merged, part: merged.union(part),
    (read_data(data_folder, name) for name in file_list),
    spark.createDataFrame(data=[], schema=schema),
)

# Save the combined data ordered by chromosome and start position,
# replacing any earlier output at the same location.
(
    df.orderBy('chrom', 'start')
    .write.mode('overwrite')
    .parquet('gs://genetics-portal-input/v2g_input/javierre_2016_preprocessed.parquet')
)

# Print a preview of the combined data.
df.show()