In [4]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DoubleType
import time
import shutil
import os
import seaborn as sns

In [5]:
# Spark session and configuration.
# NOTE(review): with a local[N] master Spark runs inside the driver JVM, so the
# spark.executor.* settings below presumably have no effect here -- confirm
# whether they are kept on purpose for a cluster deployment.
spark = (SparkSession.builder.master("local[80]")
         .config('spark.executor.instances', 16)
         .config('spark.executor.cores', 16)
         .config('spark.executor.memory', '10g')
         .config('spark.driver.memory', '15g')
         .config('spark.memory.offHeap.enabled', True)
         .config('spark.memory.offHeap.size', '20g')
         # Fixed: the key used to be misspelled 'spark.dirver.maxResultSize', so
         # Spark ignored it. A bare '4096' is interpreted as bytes (below the
         # documented 1M minimum); an explicit unit is given instead.
         # NOTE(review): assumes 4096 MiB was the intended limit -- confirm.
         .config('spark.driver.maxResultSize', '4096m')
         .appName("amp.hell").getOrCreate())

# Enable Arrow-based columnar data transfer for PySpark, together with the
# matching fallback switch; both flags are set to "true" on the live session.
for arrow_key in (
    "spark.sql.execution.arrow.pyspark.enabled",
    "spark.sql.execution.arrow.pyspark.fallback.enabled",
):
    spark.conf.set(arrow_key, "true")

In [6]:
start = time.time()
path = '/beegfs/desy/user/bishara/projects/27_mlme/27.53_qqzz_onshell/data.4d.5helas_14.0tev.y/'
# 40 column names: x1..x4 followed by y1..y36.
header = [f'x{k}' for k in range(1, 5)] + [f'y{k}' for k in range(1, 37)]
# Every column is declared as a nullable double; no schema inference is requested.
schema = StructType([StructField(column, DoubleType(), True) for column in header])
# The input files are gzip-compressed text with a two-space field delimiter.
csv_reader = spark.read.options(delimiter="  ").schema(schema).format("csv")
df = csv_reader.load(path+'dlxy.*.unif_costh/batch.*.dat.gz')
# NOTE(review): with an explicit schema the load is presumably lazy, so this
# figure likely reflects file discovery rather than a full scan -- confirm.
print('read time:', time.time() - start)

start = time.time()
# Seeded random split of the loaded frame into train / validate / test with
# weights 0.5 / 0.25 / 0.25.
train_df, validate_df, test_df = df.randomSplit(weights=[0.5,0.25,0.25], seed=42)
train_path = '../data/train'
validate_path = '../data/validate'
test_path = '../data/test'
# Each split is written as gzip-compressed, comma-separated CSV with a header.
# mode("overwrite") lets Spark replace the output of a previous run itself; the
# earlier os.path.exists + shutil.rmtree(ignore_errors=True) combination could
# silently fail to delete the directory and then make the write abort because
# the path already existed.
for split_df, split_path in (
    (train_df, train_path),
    (validate_df, validate_path),
    (test_df, test_path),
):
    (split_df.write.mode("overwrite")
             .option("header", True)
             .option("delimiter", ",")
             .option("compression", "gzip")
             .csv(split_path))
print('write time:', time.time() - start)

read time: 31.28958487510681
write time: 73.77821731567383


In [7]:
start = time.time()
path = '/beegfs/desy/user/bishara/projects/27_mlme/27.53_qqzz_onshell/data.4d.5helas_14.0tev.mc/'
# 40 column names: x1..x4 followed by y1..y36.
header = [f'x{k}' for k in range(1, 5)] + [f'y{k}' for k in range(1, 37)]
# Every column is declared as a nullable double; no schema inference is requested.
schema = StructType([StructField(column, DoubleType(), True) for column in header])
# The input files are gzip-compressed text with a two-space field delimiter.
csv_reader = spark.read.options(delimiter="  ").schema(schema).format("csv")
df = csv_reader.load(path+'dlxy.*/batch.*.dat.gz')
# NOTE(review): with an explicit schema the load is presumably lazy, so this
# figure likely reflects file discovery rather than a full scan -- confirm.
print('read time:', time.time() - start)

start = time.time()
# Seeded 50/50 random split of the frame currently bound to `df`.
# Note: this rebinds validate_df / test_df and validate_path / test_path.
validate_df, test_df = df.randomSplit(weights=[0.5,0.5], seed=42)
validate_path = '../data/validate_mc'
test_path = '../data/test_mc'
# Each split is written as gzip-compressed, comma-separated CSV with a header.
# mode("overwrite") lets Spark replace the output of a previous run itself; the
# earlier os.path.exists + shutil.rmtree(ignore_errors=True) combination could
# silently fail to delete the directory and then make the write abort because
# the path already existed.
for split_df, split_path in (
    (validate_df, validate_path),
    (test_df, test_path),
):
    (split_df.write.mode("overwrite")
             .option("header", True)
             .option("delimiter", ",")
             .option("compression", "gzip")
             .csv(split_path))
print('write time:', time.time() - start)

read time: 40.68884587287903
write time: 62.522849559783936


In [14]:
# Export at most 1,000,000 rows of train_df, restricted to the columns
# x1..x4, y1 and y2, as gzip-compressed comma-separated CSV with a header.
# mode("overwrite") makes the cell re-runnable: without it a second execution
# fails because the 'dataset' directory already exists.
# NOTE(review): limit() is applied without an explicit ordering, so which rows
# end up in the subset is presumably not guaranteed -- confirm if that matters.
(train_df.limit(1000000)
        .select('x1', 'x2', 'x3', 'x4', 'y1', 'y2')
        .write.mode("overwrite")
        .option("header",True)
        .option("delimiter",",")
        .option("compression","gzip")
        .csv('dataset'))