# Classification for Stars vs. Non-Stellar Objects 

#### In this notebook, we explore the RefCat data and extract training data sets with **labels of stars**.

#### Some Issues:
- Imbalanced partitions, because the Parquet files were converted directly from HEALPix-indexed astropy tables. We need to repartition the data and save it again as a single new PySpark DataFrame (done in this notebook).
- Huge data size. Many in-memory operations can run into trouble, so only disk-to-disk operations work reliably.

## Import Basic Packages 

In [1]:
import numpy as np
import pandas as pd
import glob
import sys
import h5py
#from netCDF4 import Dataset
from datetime import datetime
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
from scipy.spatial import cKDTree

import pyarrow as pa
import pyarrow.parquet as pq

from functools import reduce
import operator
import gc

# Show up to 300 rows and up to 200 characters per column value
pd.set_option('display.max_rows', 300)
pd.set_option('display.max_colwidth', 200)

In [2]:
import os

from astropy.table import Table
from matplotlib.ticker import MultipleLocator

from astropy.utils.exceptions import AstropyWarning
import warnings
warnings.simplefilter('ignore', category=AstropyWarning)

In [3]:
# plot settings
#plt.rc('font', family='serif') 
#plt.rc('font', serif='Times New Roman') 
plt.rcParams.update({'font.size': 16})
plt.rcParams['mathtext.fontset'] = 'stix'

## PySpark Session

In [4]:
%%time
# PySpark packages
from pyspark import SparkContext   
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark import Row
from pyspark.sql.window import Window as W


spark = SparkSession.builder \
    .master("yarn") \
    .appName("spark-shell") \
    .config("spark.driver.maxResultSize", "32g") \
    .config("spark.driver.memory", "32g") \
    .config("spark.executor.memory", "7g") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.instances", "200") \
    .config("spark.sql.hive.filesourcePartitionFileCacheSize", "1048576000") \
    .getOrCreate()


sc = spark.sparkContext
sc.setCheckpointDir("hdfs://spark00:54310/tmp/checkpoints")

spark.conf.set("spark.sql.debug.maxToStringFields", 500)
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

CPU times: user 11.9 ms, sys: 7.94 ms, total: 19.8 ms
Wall time: 32.5 s


> Acquiring resources from the YARN cluster takes some time.
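
The size of the request gives a feel for why this step is slow. The tally below is a rough sketch: it assumes Spark's default executor memory overhead (the larger of 384 MiB and 10% of the executor memory), which the session configuration does not override, and it ignores any rounding YARN applies to container sizes. `container_memory_mib` is a hypothetical helper.

```python
# Rough tally of what the session configuration asks YARN for.
# Assumption: default overhead = max(384 MiB, 10% of executor memory).

EXECUTOR_INSTANCES = 200          # spark.executor.instances
EXECUTOR_CORES = 1                # spark.executor.cores
EXECUTOR_MEMORY_MIB = 7 * 1024    # spark.executor.memory = "7g"

def container_memory_mib(executor_memory_mib, overhead_factor=0.10, min_overhead_mib=384):
    """Heap plus the assumed default overhead for one executor container."""
    overhead = max(min_overhead_mib, int(executor_memory_mib * overhead_factor))
    return executor_memory_mib + overhead

per_container = container_memory_mib(EXECUTOR_MEMORY_MIB)
total_memory_gib = EXECUTOR_INSTANCES * per_container / 1024
total_cores = EXECUTOR_INSTANCES * EXECUTOR_CORES

print(f"per executor container: {per_container} MiB")
print(f"total executor memory : {total_memory_gib:.0f} GiB over {total_cores} cores")
```

Under these assumptions the session asks for roughly 1.5 TiB of executor memory in 200 single-core containers, on top of the 32g driver.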

## Reading Reference Catalog Files

- Read the converted Spark DataFrame stored in the **Parquet format**

In [5]:
hdfsheader = 'hdfs://spark00:54310'
workpath = '/user/shong/work/sedfit/spherex/data/temp/'
datapath = '/user/shong/data/spherex/star-classification/input-ref-cat/'

#### Check the catalog files

In [6]:
outlist = !hadoop fs -ls {hdfsheader+datapath}

In [7]:
len(outlist)

12289

In [8]:
outlist[0]

'Found 12288 items'

In [9]:
outlist[12288]

'drwxr-xr-x   - shong supergroup          0 2024-05-11 14:45 hdfs://spark00:54310/user/shong/data/spherex/star-classification/input-ref-cat/Gaia_DR3.LS.PS1DR1.CatWISE.AllWISE.2MASS_NSIDE32_012287.parquet.snappy'
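
The counts fit together once the first line is accounted for: `hadoop fs -ls` prints a `Found N items` header followed by one line per entry, so `len(outlist)` is `N + 1`. The file names also carry `NSIDE32`, and a HEALPix grid with `nside = 32` has `12 * nside**2 = 12288` pixels, which matches the item count. The sketch below checks both on a two-line stand-in for `outlist`. `parse_ls_listing` is a hypothetical helper, and reading the numeric suffix as a HEALPix pixel index is an assumption based on the file names.

```python
import re

def parse_ls_listing(lines):
    """Split `hadoop fs -ls` output into (declared_count, paths).

    Hypothetical helper: assumes the first line is 'Found N items' and that
    the path is the last whitespace-separated field of every other line.
    """
    match = re.match(r"Found (\d+) items", lines[0])
    if match is None:
        raise ValueError("unexpected listing header: " + repr(lines[0]))
    paths = [line.split()[-1] for line in lines[1:] if line.strip()]
    return int(match.group(1)), paths

def healpix_npix(nside):
    """Number of pixels in a HEALPix grid of resolution `nside`."""
    return 12 * nside ** 2

# Stand-in for `outlist`: the header plus the last entry of the listing.
sample = [
    "Found 12288 items",
    "drwxr-xr-x   - shong supergroup          0 2024-05-11 14:45 "
    "hdfs://spark00:54310/user/shong/data/spherex/star-classification/"
    "input-ref-cat/Gaia_DR3.LS.PS1DR1.CatWISE.AllWISE.2MASS_NSIDE32_012287.parquet.snappy",
]

declared, paths = parse_ls_listing(sample)
last_index = int(re.search(r"NSIDE32_(\d+)\.parquet", paths[-1]).group(1))

print(declared == healpix_npix(32))   # item count equals the pixel count
print(last_index == declared - 1)     # consistent with indices running from 0
```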

#### Read *only* one catalog file

> The full catalog is too big for in-memory calculations. This is rare, but it is the case here. Hence, to check the schema and display a few rows, we use this one-file sample and avoid loading all the data into memory.

In [10]:
# Parquet files carry their own schema, so the CSV-only "header" option is not needed
onedf = spark.read \
.parquet(hdfsheader+datapath+'Gaia_DR3.LS.PS1DR1.CatWISE.AllWISE.2MASS_NSIDE32_012287.parquet.snappy')

In [11]:
onedf.printSchema()

root
 |-- SPHERExRefID: long (nullable = true)
 |-- Gaia_DR3_source_id: long (nullable = true)
 |-- LegacySurvey_uid: long (nullable = true)
 |-- PS1_DR1_StackObject_objID: long (nullable = true)
 |-- CatWISE_source_id: string (nullable = true)
 |-- AllWISE_designation: string (nullable = true)
 |-- 2MASS_designation: string (nullable = true)
 |-- ra: double (nullable = true)
 |-- dec: double (nullable = true)
 |-- ra_error: double (nullable = true)
 |-- dec_error: double (nullable = true)
 |-- coord_src: long (nullable = true)
 |-- pmra: double (nullable = true)
 |-- pmra_error: double (nullable = true)
 |-- pmdec: double (nullable = true)
 |-- pmdec_error: double (nullable = true)
 |-- parallax: double (nullable = true)
 |-- parallax_error: double (nullable = true)
 |-- ref_epoch: double (nullable = true)
 |-- astrometric_params_solved: short (nullable = true)
 |-- CatWISE_PMRA: double (nullable = true)
 |-- CatWISE_PMDec: double (nullable = true)
 |-- CatWISE_sigPMRA: double (nullable = true)
 ...

In [12]:
%%time
onedf.limit(4).toPandas().T

CPU times: user 669 µs, sys: 16.6 ms, total: 17.2 ms
Wall time: 3.61 s


| | 0 | 1 | 2 | 3 |
|---|---|---|---|---|
| SPHERExRefID | 1781226356739145733 | 1781226357108244482 | 1781226357225684993 | 1781226361000558592 |
| Gaia_DR3_source_id | -9999 | -9999 | 6917053110906356480 | -9999 |
| LegacySurvey_uid | 1381033849129578 | 1381033849129483 | 1381033849129619 | 1381033849129863 |
| PS1_DR1_StackObject_objID | 105943148214193291 | -9999 | 105933148223679716 | -9999 |
| CatWISE_source_id | | 3146m016_b0-026737 | 3146m016_b0-007758 | |
| AllWISE_designation | | J205916.46-014255.9 | J205917.35-014302.2 | |
| 2MASS_designation | | | 20591737-0143021 | |
| ra | 314.821382 | 314.818459 | 314.822332 | 314.829754 |
| dec | -1.714273 | -1.715548 | -1.717279 | -1.719934 |
| ra_error | 0.000002 | 0.000012 | 0.0 | 0.000007 |


In [13]:
len(onedf.columns)

101

> The total number of data columns is **101**. 

#### Now read all reference catalog files to a single dataframe

In [14]:
%%time
# "header" is a CSV option and has no effect on Parquet, so only recursiveFileLookup is kept
rawdf = spark.read.option("recursiveFileLookup","true").parquet(hdfsheader+datapath)

CPU times: user 1.29 ms, sys: 1.49 ms, total: 2.78 ms
Wall time: 8.1 s


> As explained above, we need to handle this full DataFrame with care, even though our Spark/Hadoop cluster is large and capable.
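
A back-of-the-envelope estimate shows why collecting the full table to the driver is risky. The sketch below assumes about 8 bytes per value (accurate for the `long` and `double` columns, an underestimate for the string columns) and compares that with the `spark.driver.maxResultSize` of 32g set earlier. `max_collectable_rows` is a hypothetical helper, and the result is only an order-of-magnitude guide because serialization and pandas overheads are ignored.

```python
def max_collectable_rows(n_columns, limit_bytes, bytes_per_value=8):
    """Rough upper bound on the number of rows that fit in `limit_bytes`.

    Assumes a fixed `bytes_per_value` for every column, which ignores
    strings, nulls, and any serialization or pandas overhead.
    """
    return limit_bytes // (n_columns * bytes_per_value)

DRIVER_MAX_RESULT_BYTES = 32 * 1024 ** 3   # spark.driver.maxResultSize = "32g"
N_COLUMNS = 101                            # len(rawdf.columns)

rows = max_collectable_rows(N_COLUMNS, DRIVER_MAX_RESULT_BYTES)
print(f"rough ceiling: {rows:,} full-width rows in 32 GiB")
```

Under these assumptions the ceiling is a few tens of millions of full-width rows, so anything larger should stay on the cluster and be written straight back to disk.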

## Gaia Subsets and Photometry from Other Catalogs

#### Choose columns we need

In [15]:
listcol = rawdf.columns
print(len(listcol))

101


In [16]:
colbasic = ['SPHERExRefID', 'Gaia_DR3_source_id', 'ra', 'dec']

In [17]:
colphot = ['Gaia_G','Gaia_BP','Gaia_RP','LS_g','LS_r','LS_z',
           'PS1_g','PS1_r','PS1_i','PS1_z','PS1_y',
           '2MASS_J','2MASS_H','2MASS_Ks','WISE_W1','WISE_W2','WISE_W3','WISE_W4']

In [18]:
colphoterror = ['Gaia_G_error','Gaia_BP_error','Gaia_RP_error','LS_g_error','LS_r_error','LS_z_error',
           'PS1_g_error','PS1_r_error','PS1_i_error','PS1_z_error','PS1_y_error',
           '2MASS_J_error','2MASS_H_error','2MASS_Ks_error',
           'WISE_W1_error','WISE_W2_error','WISE_W3_error','WISE_W4_error']

In [19]:
collabel = ['class']

In [20]:
#%%time
#rawdf.select(colbasic+colphot+collabel).describe().toPandas().set_index('summary').transpose()

#### What is the `class` column?

In [21]:
#%%time
#rawdf.select('class').distinct().collect()

> output: four classes <br> [Row(class=1), Row(class=4), Row(class=2), Row(class=0)]

### Gaia-Compatible Subset of the RefCat

In [22]:
df = rawdf.select(colbasic+colphot+colphoterror+collabel).filter(F.col("Gaia_DR3_source_id") > 0)
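
The filter works because unmatched sources appear to carry a negative placeholder rather than a null: in the four-row sample of `onedf` shown earlier, three rows have `Gaia_DR3_source_id` equal to `-9999`. Treating `-9999` as the "no match" sentinel is an assumption drawn from that sample. A plain-Python sketch of the same predicate on those four values, with `has_gaia_match` as a hypothetical helper:

```python
# Gaia_DR3_source_id values copied from the four-row sample of `onedf`.
sample_ids = [-9999, -9999, 6917053110906356480, -9999]

def has_gaia_match(source_id):
    """Mirror of the Spark predicate F.col("Gaia_DR3_source_id") > 0.

    Spark's filter also drops rows where the comparison is null,
    so None is treated as "no match" here.
    """
    return source_id is not None and source_id > 0

kept = [sid for sid in sample_ids if has_gaia_match(sid)]
print(kept)
```

Only the one row with a real Gaia DR3 identifier survives, which is the behavior wanted for a Gaia-compatible subset.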

In [23]:
df.printSchema()

root
 |-- SPHERExRefID: long (nullable = true)
 |-- Gaia_DR3_source_id: long (nullable = true)
 |-- ra: double (nullable = true)
 |-- dec: double (nullable = true)
 |-- Gaia_G: double (nullable = true)
 |-- Gaia_BP: double (nullable = true)
 |-- Gaia_RP: double (nullable = true)
 |-- LS_g: double (nullable = true)
 |-- LS_r: double (nullable = true)
 |-- LS_z: double (nullable = true)
 |-- PS1_g: double (nullable = true)
 |-- PS1_r: double (nullable = true)
 |-- PS1_i: double (nullable = true)
 |-- PS1_z: double (nullable = true)
 |-- PS1_y: double (nullable = true)
 |-- 2MASS_J: double (nullable = true)
 |-- 2MASS_H: double (nullable = true)
 |-- 2MASS_Ks: double (nullable = true)
 |-- WISE_W1: double (nullable = true)
 |-- WISE_W2: double (nullable = true)
 |-- WISE_W3: double (nullable = true)
 |-- WISE_W4: double (nullable = true)
 |-- Gaia_G_error: double (nullable = true)
 |-- Gaia_BP_error: double (nullable = true)
 |-- Gaia_RP_error: double (nullable = true)
 |-- LS_g_error: double (nullable = true)
 ...

In [24]:
df = df.repartition(2000)

> Our current Parquet files were converted directly from HEALPix-indexed astropy tables, which left oddly imbalanced partitions. We need to repartition the data and save it.
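
Whether a repartition is worthwhile can be judged from the row count of each partition. On the cluster these counts could come from grouping by `F.spark_partition_id()` and counting; the sketch below covers only the summary step in plain Python. `skew_summary` is a hypothetical helper and the example counts are made up, not measurements from this catalog.

```python
from statistics import mean

def skew_summary(partition_counts):
    """Summarize how unevenly rows are spread over partitions.

    Returns the smallest, largest and mean partition size plus the
    max/mean ratio; a ratio near 1 means well-balanced partitions.
    """
    if not partition_counts:
        raise ValueError("need at least one partition count")
    avg = mean(partition_counts)
    return {
        "min": min(partition_counts),
        "max": max(partition_counts),
        "mean": avg,
        "max_over_mean": max(partition_counts) / avg if avg else float("inf"),
    }

# Made-up row counts: a skewed layout versus an even one.
skewed = [10, 20, 30, 940]
balanced = [250, 250, 250, 250]

print(skew_summary(skewed)["max_over_mean"])
print(skew_summary(balanced)["max_over_mean"])
```

Calling `repartition(2000)` without column arguments shuffles the rows into partitions of roughly equal size, which should bring this ratio close to 1.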

In [25]:
df.cache()

DataFrame[SPHERExRefID: bigint, Gaia_DR3_source_id: bigint, ra: double, dec: double, Gaia_G: double, Gaia_BP: double, Gaia_RP: double, LS_g: double, LS_r: double, LS_z: double, PS1_g: double, PS1_r: double, PS1_i: double, PS1_z: double, PS1_y: double, 2MASS_J: double, 2MASS_H: double, 2MASS_Ks: double, WISE_W1: double, WISE_W2: double, WISE_W3: double, WISE_W4: double, Gaia_G_error: double, Gaia_BP_error: double, Gaia_RP_error: double, LS_g_error: double, LS_r_error: double, LS_z_error: double, PS1_g_error: double, PS1_r_error: double, PS1_i_error: double, PS1_z_error: double, PS1_y_error: double, 2MASS_J_error: double, 2MASS_H_error: double, 2MASS_Ks_error: double, WISE_W1_error: double, WISE_W2_error: double, WISE_W3_error: double, WISE_W4_error: double, class: smallint]

In [26]:
#df.show(3,truncate=True)

In [27]:
#%%time
#df.limit(3).toPandas().T

In [28]:
#%%time
#df.select(colbasic+colphot+collabel).describe().toPandas().set_index('summary').transpose()

### Save the trimmed dataframe 

In [29]:
outdatapath = '/user/shong/data/spherex/star-classification/reduced-data/'

In [30]:
hdfsheader+outdatapath

'hdfs://spark00:54310/user/shong/data/spherex/star-classification/reduced-data/'

In [31]:
hdfsheader+outdatapath+'Gaia-Compatible-Subset.parquet.snappy'

'hdfs://spark00:54310/user/shong/data/spherex/star-classification/reduced-data/Gaia-Compatible-Subset.parquet.snappy'

In [33]:
%%time
# State the output format explicitly instead of relying on the session default
df.write.format("parquet").option("compression", "snappy").mode("overwrite") \
    .save(hdfsheader+outdatapath+'Gaia-Compatible-Subset.parquet.snappy')

CPU times: user 208 ms, sys: 135 ms, total: 343 ms
Wall time: 1h 19min 55s
