In [1]:
import numpy as np
import pandas as pd
import glob
import sys
import h5py
#from netCDF4 import Dataset
from datetime import datetime
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from scipy.spatial import cKDTree

import pyarrow as pa
import pyarrow.parquet as pqw

from functools import reduce
import operator
import gc

In [2]:
# Plot styling: serif text in Times New Roman at 16 pt, STIX glyphs for math.
plt.rc('font', family='serif', serif='Times New Roman')
plt.rcParams.update({'font.size': 16, 'mathtext.fontset': 'stix'})

In [48]:
# PySpark packages
from pyspark import Row, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.window import Window as W
import pyspark.sql.functions as F
import pyspark.sql.types as T

# Commented-out alternative (not executed), condensed here for reference:
#   master "yarn", appName "spark-shell",
#   spark.driver.maxResultSize = 32g, spark.driver.memory = 32g,
#   spark.executor.memory = 6g (or 14g), spark.executor.cores = 1 (or 2),
#   spark.executor.instances = 30 (or 60),
#   spark.jars.packages = graphframes:graphframes:0.7.0-spark2.4-s_2.11

# Active session: standalone master at sohnic:7077 with a 100g driver.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .master("spark://sohnic:7077")
    .config("spark.driver.memory", "100g")
    .getOrCreate()
)

# Handle on the underlying SparkContext; checkpoints are written to HDFS.
sc = spark.sparkContext
sc.setCheckpointDir("hdfs://sohnic:54310/tmp/checkpoints")

# Runtime SQL settings: longer plan/schema strings in debug output, and
# Arrow-based conversion between pandas and Spark DataFrames.
spark.conf.set("spark.sql.debug.maxToStringFields", 500)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [49]:
# Display the first ten (key, value) pairs of the active SparkContext configuration.
sc.getConf().getAll()[:10]

[('spark.app.submitTime', '1724489754988'),
 ('spark.rdd.compress', 'True'),
 ('spark.master', 'spark://sohnic:7077'),
 ('spark.driver.port', '38217'),
 ('spark.app.startTime', '1724489755183'),
 ('spark.sql.warehouse.dir', 'file:/home/lshin/spark-warehouse'),
 ('spark.app.name', 'MyApp'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.pyFiles', ''),
 ('spark.driver.host', 'sohnic')]

In [7]:
#h5dir = '/mnt/raid5/lshin/'
#flist = !ls /mnt/raid5/lshin

h5dir = '/home/lshin/TNG300/snap99/'
flist = !ls /home/lshin/TNG300/snap99

h5list = flist[1:]#[2:]
print(h5list[:5])

['snap099_sorted_x0_y0_z0.csv', 'snap099_sorted_x0_y0_z1.csv', 'snap099_sorted_x0_y0_z2.csv', 'snap099_sorted_x0_y0_z3.csv', 'snap099_sorted_x0_y0_z4.csv']


In [10]:
# Number of input files selected for conversion.
numh5list = len(h5list)
print(numh5list)

600


In [11]:
# Full local path of the first input file (h5dir already ends with '/').
h5dir+h5list[0]

'/home/lshin/TNG300/snap99/snap099_sorted_x0_y0_z0.csv'

In [13]:
%%time
# Load the first CSV into pandas as a sample and preview its first rows.
df = pd.read_csv(h5dir+h5list[0])
df.head()

CPU times: user 584 ms, sys: 50.4 ms, total: 635 ms
Wall time: 633 ms


Unnamed: 0,px,py,pz,vx,vy,vz,mass
0,15389.08931,954.616907,4401.295627,-119.992165,-28.731781,-266.06906,0.000588
1,15389.030575,954.66996,4401.003768,-101.488754,12.874359,-215.97354,0.000428
2,15389.011885,955.174487,4400.624592,-84.58567,-22.560968,-274.01807,0.000623
3,15389.240237,955.166115,4401.306553,-48.979668,-38.06464,-236.63635,0.000476
4,15389.082537,955.356952,4401.132427,-76.940155,-1.374161,-279.7991,0.000767


# Save selected features as a Parquet dataset

In [34]:
# Spark schema for the particle table: position (px, py, pz), velocity
# (vx, vy, vz) and mass, each a nullable FloatType column.
schema = T.StructType([
    T.StructField(column, T.FloatType(), True)
    for column in ('px', 'py', 'pz', 'vx', 'vy', 'vz', 'mass')
])

In [17]:
# Pull each feature column out of the pandas frame as a plain Python list.
px, py, pz, vx, vy, vz, mass = (
    df[column].tolist()
    for column in ('px', 'py', 'pz', 'vx', 'vy', 'vz', 'mass')
)

In [18]:
%%time
sparkdf = spark.createDataFrame(zip(px, py, pz, vx, vy, vz, mass),schema)

CPU times: user 3.09 s, sys: 52.8 ms, total: 3.14 s
Wall time: 4.56 s


In [19]:
%%time
sparkdf.show(3,truncate=True)

[Stage 0:>                                                          (0 + 1) / 1]

+---------+--------+---------+-----------+----------+----------+------------+
|       px|      py|       pz|         vx|        vy|        vz|        mass|
+---------+--------+---------+-----------+----------+----------+------------+
|15389.089|954.6169|4401.2954|-119.992165|-28.731781|-266.06906| 5.879042E-4|
| 15389.03|  954.67| 4401.004|-101.488754| 12.874359|-215.97354|4.2798722E-4|
|15389.012|955.1745|4400.6245|  -84.58567|-22.560968|-274.01807| 6.225057E-4|
+---------+--------+---------+-----------+----------+----------+------------+
only showing top 3 rows

CPU times: user 2.99 ms, sys: 3.45 ms, total: 6.44 ms
Wall time: 1.55 s


                                                                                

In [20]:
# Number of rows in sparkdf.
sparkdf.count()

                                                                                

829890

In [21]:
# Confirm the column names and types Spark assigned to sparkdf.
sparkdf.printSchema()

root
 |-- px: float (nullable = true)
 |-- py: float (nullable = true)
 |-- pz: float (nullable = true)
 |-- vx: float (nullable = true)
 |-- vy: float (nullable = true)
 |-- vz: float (nullable = true)
 |-- mass: float (nullable = true)



In [22]:
# HDFS directory prefix used to build the output paths below.
#outdir = 'hdfs://sohnic:54310/user/lshin/'
outdir = 'hdfs://sohnic:54310/data/TNG300/snap99/'

In [27]:
# Output path for the first file: same name with "csv" swapped for
# "parquet.snappy", placed under outdir.
outname = f'{outdir}{h5list[0].replace("csv", "parquet.snappy")}'
print(outname)

hdfs://sohnic:54310/data/TNG300/snap99/snap099_sorted_x0_y0_z0.parquet.snappy


In [28]:
%%time
sparkdf.write.option("compression", "snappy") \
    .mode("overwrite") \
    .save(outname)

                                                                                

CPU times: user 13.7 ms, sys: 6.92 ms, total: 20.6 ms
Wall time: 2.76 s


# Check the Parquet output

In [45]:
%%time
# Read all parquets in the directory
newsparkdf = spark.read.option("header","true").option("recursiveFileLookup","true").parquet(outname)

CPU times: user 4.33 ms, sys: 8 µs, total: 4.34 ms
Wall time: 124 ms


In [46]:
# Schema of the data as read back from Parquet.
newsparkdf.printSchema()

root
 |-- px: float (nullable = true)
 |-- py: float (nullable = true)
 |-- pz: float (nullable = true)
 |-- vx: float (nullable = true)
 |-- vy: float (nullable = true)
 |-- vz: float (nullable = true)
 |-- mass: float (nullable = true)



In [47]:
%%time
# Row count of the Parquet data read back into newsparkdf.
# NOTE(review): the recorded output (1206812) differs from the 829890 rows
# counted before the write, and the execution counts are out of order --
# re-run from a fresh kernel to confirm the round trip.
print(newsparkdf.count())

1206812
CPU times: user 1.99 ms, sys: 0 ns, total: 1.99 ms
Wall time: 254 ms


In [48]:
%%time
newsparkdf.limit(2).toPandas().transpose()

CPU times: user 8.11 ms, sys: 3.91 ms, total: 12 ms
Wall time: 353 ms


Unnamed: 0,0,1
px,204866.796875,204869.765625
py,204772.015625,204746.625
pz,197395.640625,197410.6875
vx,9.04139,254.601059
vy,-134.318375,-92.379158
vz,-66.458954,-7.044174
mass,0.000831,0.000476


In [49]:
# Preview the three position columns.
newsparkdf.select('px', 'py', 'pz').show(n=3)

+---------+---------+---------+
|       px|       py|       pz|
+---------+---------+---------+
| 204866.8|204772.02|197395.64|
|204869.77|204746.62|197410.69|
|204835.03|204758.92|197448.14|
+---------+---------+---------+
only showing top 3 rows



In [38]:
%%time
newsparkdf.select(['px','py','pz']).describe().toPandas().set_index('summary').transpose()

CPU times: user 14 ms, sys: 0 ns, total: 14 ms
Wall time: 700 ms


summary,count,mean,stddev,min,max
px,829890,7333.757431926775,6489.1686268262965,0.35823357,20499.875
py,829890,7210.045494197404,4223.622205323317,0.09408737,20467.947
pz,829890,24904.02383216116,9929.99639630118,5.216663,34166.35


# Repartition

In [39]:
%%time
newsparkdf.cache()
newsparkdf.repartition(10,"px").count()

CPU times: user 4.39 ms, sys: 0 ns, total: 4.39 ms
Wall time: 756 ms


829890

# Converting all files

In [27]:
# Preview the combined output name: everything before '_x0' in the first file
# name, with the ".parquet.snappy" suffix, under outdir.
outdir+h5list[0].split('_x0')[0]+ ".parquet.snappy"

'hdfs://sohnic:54310/data/TNG300/snap99/snap099_sorted.parquet.snappy'

In [45]:
%%time
sparkdf = spark.createDataFrame([], schema)

for i in tqdm(range(len(h5list))):
    df = pd.read_csv(h5dir + h5list[i])
    tempdf = spark.createDataFrame(df[['px', 'py', 'pz', 'vx', 'vy', 'vz', 'mass']])
    sparkdf = sparkdf.union(tempdf)  # Append to the existing DataFrame
    #print(i)

# Save to Parquet
outname = outdir+'parquet/'+h5list[0].split('_x0')[0]+ ".parquet.snappy"
print(outname)
sparkdf.write.option("compression", "snappy").mode("overwrite").save(outname)

  0%|          | 0/600 [00:00<?, ?it/s]

  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_categorical_dtype(s.dtype):
  elif is_ca

hdfs://sohnic:54310/data/TNG300/snap99/parquet/snap099_sorted.parquet.snappy


24/08/24 20:18:13 WARN DAGScheduler: Broadcasting large task binary with size 1552.0 KiB
                                                                                

CPU times: user 10min 10s, sys: 53.2 s, total: 11min 3s
Wall time: 2h 9min 5s


In [51]:
%%time
#check
newsparkdf = spark.read.option("header","true").option("recursiveFileLookup","true").parquet(outname)

24/08/24 22:35:17 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.


CPU times: user 6.95 ms, sys: 267 µs, total: 7.21 ms
Wall time: 6.3 s


In [53]:
%%time
# Total number of rows in the combined Parquet dataset.
print(newsparkdf.count())



711967480
CPU times: user 28 ms, sys: 4 ms, total: 32 ms
Wall time: 10.1 s


                                                                                