# Testing Converted HDFS parquet

- DM particles?!? for sanity check?!?

In [1]:
import numpy as np
import pandas as pd
import glob
import sys
import h5py
#from netCDF4 import Dataset
from datetime import datetime
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from scipy.spatial import cKDTree

import pyarrow as pa
import pyarrow.parquet as pq

from functools import reduce
import operator
import gc

In [2]:
# plot settings
plt.rc('font', family='serif') 
plt.rcParams.update({'font.size': 16})
plt.rcParams['mathtext.fontset'] = 'stix'

### Define SparkSession
> 200 vCPU + 1.4 TB Memory

In [3]:
# PySpark packages
from pyspark import SparkContext   
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark import Row
from pyspark.sql.window import Window as W


spark = SparkSession.builder \
    .master("yarn") \
    .appName("spark-shell") \
    .config("spark.driver.maxResultSize", "32g") \
    .config("spark.driver.memory", "32g") \
    .config("spark.executor.memory", "7g") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.instances", "200") \
    .getOrCreate()


sc = spark.sparkContext
sc.setCheckpointDir("hdfs://spark00:54310/tmp/checkpoints")

spark.conf.set("spark.sql.debug.maxToStringFields", 500)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [4]:
sc.getConf().getAll()[:10]

[('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
  'http://spark14:8088/proxy/application_1750653327224_0691'),
 ('spark.driver.memory', '32g'),
 ('spark.driver.appUIAddress', 'http://spark00:4040'),
 ('spark.driver.maxResultSize', '32g'),
 ('spark.app.id', 'application_1750653327224_0691'),
 ('spark.ui.proxyBase', '/proxy/application_1750653327224_0691'),
 ('spark.master', 'yarn'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.serializer.objectStreamReset', '100')]

### Sanity Check for converted snapshot files 

In [5]:
alldmfiles = !hdfs dfs -du -s -h /common/data/illustris/tng50/snapshot84/dm/*

In [6]:
alldmfiles[:5]

['857.4 M  2.5 G  /common/data/illustris/tng50/snapshot84/dm/snap_084.0_dm.parquet.snappy',
 '837.9 M  2.5 G  /common/data/illustris/tng50/snapshot84/dm/snap_084.100_dm.parquet.snappy',
 '820.4 M  2.4 G  /common/data/illustris/tng50/snapshot84/dm/snap_084.101_dm.parquet.snappy',
 '862.6 M  2.5 G  /common/data/illustris/tng50/snapshot84/dm/snap_084.102_dm.parquet.snappy',
 '852.7 M  2.5 G  /common/data/illustris/tng50/snapshot84/dm/snap_084.103_dm.parquet.snappy']

In [7]:
len(alldmfiles)

680

> Now, it seems ok.

### Read Snapshot Parquets 

In [8]:
dmpath = 'hdfs://spark00:54310/common/data/illustris/tng50/snapshot84/dm/'

In [9]:
%%time
# Read all parquets in the directory
dmdf = \
    spark.read.option("header","true") \
    .option("recursiveFileLookup","true").parquet(dmpath)

CPU times: user 2.64 ms, sys: 0 ns, total: 2.64 ms
Wall time: 5.5 s


In [10]:
dmdf.printSchema()

root
 |-- Coordinates: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- ParticleIDs: long (nullable = true)
 |-- Potential: float (nullable = true)
 |-- SubfindDMDensity: float (nullable = true)
 |-- SubfindDensity: float (nullable = true)
 |-- SubfindHsml: float (nullable = true)
 |-- SubfindVelDisp: float (nullable = true)
 |-- Velocities: array (nullable = true)
 |    |-- element: float (containsNull = true)



In [11]:
%%time
rows = dmdf.take(5)  # Returns list of Row objects
rows_dict = [row.asDict() for row in rows]
df = pd.DataFrame(rows_dict)

# Transpose and label
df_transposed = df.transpose()
df_transposed.columns = [f"Row_{i}" for i in range(len(rows))]
print(df_transposed)

                                                              Row_0  \
Coordinates       [23536.061986956738, 13258.062109017172, 14277...   
ParticleIDs                                              6587293544   
Potential                                            -116118.578125   
SubfindDMDensity                                           0.000038   
SubfindDensity                                             0.000038   
SubfindHsml                                                2.408194   
SubfindVelDisp                                           272.708282   
Velocities        [366.7105407714844, -17.06654930114746, 377.21...   

                                                              Row_1  \
Coordinates       [23527.317624073607, 13258.758814691157, 14276...   
ParticleIDs                                              7027990360   
Potential                                              -125185.4375   
SubfindDMDensity                                           0.000087   
Subfi

In [12]:
%%time
dmdf.count()

CPU times: user 2.45 ms, sys: 3.19 ms, total: 5.64 ms
Wall time: 40.2 s


10077696000

- 2160**3 = 10077696000 
- Okey.. it looks correct

#### Optionally, the number of gas particles is .. 

In [13]:
gaspath = 'hdfs://spark00:54310/common/data/illustris/tng50/snapshot84/gas/'

In [14]:
%%time
# Read all parquets in the directory
gasdf = \
    spark.read.option("header","true") \
    .option("recursiveFileLookup","true").parquet(gaspath)

CPU times: user 2.23 ms, sys: 0 ns, total: 2.23 ms
Wall time: 3.47 s


In [15]:
gasdf.printSchema()

root
 |-- CenterOfMass: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- Coordinates: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- Density: float (nullable = true)
 |-- ElectronAbundance: float (nullable = true)
 |-- EnergyDissipation: float (nullable = true)
 |-- GFM_AGNRadiation: float (nullable = true)
 |-- GFM_CoolingRate: float (nullable = true)
 |-- GFM_Metallicity: float (nullable = true)
 |-- GFM_Metals: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- GFM_MetalsTagged: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- GFM_WindDMVelDisp: float (nullable = true)
 |-- GFM_WindHostHaloMass: float (nullable = true)
 |-- InternalEnergy: float (nullable = true)
 |-- InternalEnergyOld: float (nullable = true)
 |-- Machnumber: float (nullable = true)
 |-- MagneticField: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- MagneticFieldDivergence: float 

In [16]:
%%time
rows = gasdf.take(5)  # Returns list of Row objects
rows_dict = [row.asDict() for row in rows]
df = pd.DataFrame(rows_dict)

# Transpose and label
df_transposed = df.transpose()
df_transposed.columns = [f"Row_{i}" for i in range(len(rows))]
print(df_transposed)

                                                                      Row_0  \
CenterOfMass              [7509.58251953125, 9916.2880859375, 4669.45507...   
Coordinates               [7509.683889474307, 9916.351981456668, 4669.41...   
Density                                                            0.000002   
ElectronAbundance                                                  1.175983   
EnergyDissipation                                                       0.0   
GFM_AGNRadiation                                                        0.0   
GFM_CoolingRate                                                        -0.0   
GFM_Metallicity                                                    0.015087   
GFM_Metals                [0.7285040616989136, 0.2564089000225067, 0.001...   
GFM_MetalsTagged          [0.0013146514538675547, 0.00808743666857481, 0...   
GFM_WindDMVelDisp                                                256.340454   
GFM_WindHostHaloMass                                

In [17]:
%%time
gasdf.count()

CPU times: user 8.95 ms, sys: 6.3 ms, total: 15.2 ms
Wall time: 2min 7s


8807829610