# Compute Cl on galaxy overdensities on DC2-Data

- author : Sylvie Dagoret-Campagne
- affliliation : IJCLab/in2p3/CNRS
- creation date : July 28th 2020


- Run on Spark cluster


In [1]:
%matplotlib inline

import os

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

In [2]:
import pyarrow

In [3]:
pyarrow.__version__

'0.14.1'

In [4]:
from pyspark.sql.functions import col, pandas_udf 
from pyspark.sql.types import LongType

from pyspark.sql.types import IntegerType,FloatType
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [5]:
# here is how we create a function ('Ang2Pix') that can be called by dataframes
# it takes as input the "ra" and "dec" values (which are not very different from theta/phi)
# and returns the pixel number (but as pandas series for efficiency)
import numpy as np
import pandas as pd
import healpy as hp

nside=512

In [6]:
npix = hp.nside2npix(nside)
lmax = 3 * nside

In [7]:
def Ang2Pix_func(ra: pd.Series, dec: pd.Series) -> pd.Series:
    return pd.Series(hp.ang2pix(nside,np.radians(90-dec),np.radians(ra)))

In [8]:
pd_ang2pix = pandas_udf(Ang2Pix_func, returnType=IntegerType())
#gal = gal.withColumn("ihealpix",pd_ang2pix(gal["RA"],gal["DEC"]))

## Get parquet files


In [9]:
! hadoop dfs -ls /lsst/DC2

DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

Found 18 items
drwxr-xr-x   - stephane.plaszczynski lsst           0 2020-06-12 10:23 /lsst/DC2/DR6axCdc2.parquet
drwxr-xr-x   - stephane.plaszczynski lsst           0 2020-04-23 17:15 /lsst/DC2/cosmoDC2
-rw-r--r--   3 stephane.plaszczynski lsst 83486737444 2020-05-28 19:33 /lsst/DC2/dc2_object_run2.2i_dr3.parquet
-rw-r--r--   3 stephane.plaszczynski lsst 44384598496 2020-06-05 13:15 /lsst/DC2/dc2_object_run2.2i_dr6.parquet
drwxr-xr-x   - stephane.plaszczynski lsst           0 2020-06-26 10:36 /lsst/DC2/dc2_object_run2.2i_dr6b
drwxr-xr-x   - stephane.plaszczynski lsst           0 2020-07-28 18:46 /lsst/DC2/dc2_object_run2.2i_dr6c
drwxr-xr-x   - stephane.plaszczynski lsst           0 2019-10-21 15:54 /lsst/DC2/df1.parquet
drwxr-xr-x   - stephane.plaszczynski lsst           0 2019-11-08 16:58 /lsst/DC2/df2.parquet
drwxr-xr-x   - stephane.plaszczynski lsst           0 2019-12-06 18:

### Read files with spark

In [10]:
datafile="/lsst/DC2/dc2_object_run2.2i_dr3.parquet"

In [None]:
from pyspark.sql import SparkSession

# Initialise our Spark session
spark = SparkSession.builder.getOrCreate()

# Read the data as DataFrame
df = spark.read.format("parquet").load(datafile)

In [None]:
df = df.repartition(df.rdd.getNumPartitions())

### DC2 Object catalog Schema

In [None]:
# Check what we have in the file
df.printSchema()

In [None]:
df_gal=df.filter('extendedness == true')

In [None]:
# Show all available tracts
df_gal.select('tract').distinct().show()

In [None]:
df_stat=df_gal.groupBy("tract").count()

In [None]:
df_stat.show()

In [None]:
dp_stat=df_stat.toPandas()
dp_stat.info()

In [None]:
indexes=np.arange(len(dp_stat))

In [None]:
width=0.8
fig = plt.figure(figsize=(20,6))
ax = fig.add_subplot(111)
ax.bar(indexes,dp_stat["count"].values,width,color='red')
ax.set_xticks(indexes)
ax.set_title("Number of count of extended obj per tract")
xtickNames = ax.set_xticklabels(dp_stat["tract"].values)
plt.setp(xtickNames, rotation=90, fontsize=10)
plt.yscale('log')

The DM stack includes functionality to get the tract and patch number corresponding to a certain position `(RA,DEC)`. However, it is out of the scope of this tutorial.

Apache Spark provides `filter` mechanisms, which you can use to speed up data access if you only need a certain chunks of the dataset.
For the object catalog, the chunks are broken into `tract` and `patch`, and hence those are the `filters` you can use:

In [None]:
idx=0
for the_tract in dp_stat["tract"].values:
    selection_name = "tract == {}".format(the_tract)

    # Retrieve the ra,dec coordinates of all sources within tract number 4430
    data = df_gal.select('ra', 'dec').where(selection_name).collect()
    #data = df.select('ra', 'dec').collect()
    
    print("======= tract {} ================".format(the_tract))
    #data.describe().show()

    if dp_stat["count"].values[idx]>1e4:
    
        # `collect` returns list of list[ra, dec], so for 
        # plotting purpose we tranpose the output:
        ra, dec = np.transpose(data)

        # Plot a 2d histogram of sources
        plt.figure(figsize=(10,7))
        plt.hist2d(ra, dec, 100)
        plt.gca().set_aspect('equal')
        plt.colorbar(label='Number of objects')
        plt.xlabel('RA [deg]')
        plt.ylabel('dec [deg]');
        plt.title("tract = {}".format(the_tract))
        plt.show()
        
    idx+=1
    if idx>2:
        break

In [None]:
pd_ang2pix = pandas_udf(Ang2Pix_func, returnType=IntegerType())
df_gal_healpix = df_gal.withColumn("ihealpix",pd_ang2pix(df_gal["ra"],df_gal["dec"]))

In [None]:
df_gal_healpix.printSchema()

In [None]:
df_gal_healpix_small=df_gal_healpix.select('ra','dec','ihealpix')

In [None]:
df_gal_healpix_small.describe().show()

In [None]:
m=df_gal_healpix_small.groupBy('ihealpix').count()
m.show(5)

In [None]:
#m.orderBy('ihealpix', ascending=True).show(10)

In [None]:
# get to python world (to Pandas) : 
# note that here is the action (lazy evaluation so far)
p=m.toPandas()
p.info()

In [None]:
p

In [None]:
# the following doesn't has anything to do with spark (only Healpix)
hpMap = np.zeros(hp.nside2npix(nside))
#fill the map from the pandas object
hpMap[p['ihealpix'].values]=p['count'].values
#plot using standard healpy function
hp.mollview(hpMap,cmap="jet")
hp.graticule(color='white')

In [None]:

plt.hist(hpMap,bins=int(100))
plt.yscale('log')
plt.show()


In [None]:
cl = hp.anafast(hpMap)
ell = np.arange(len(cl))

In [None]:
plt.plot(ell,cl,"ro")
plt.yscale('log')
plt.xlabel("$\ell$")
plt.ylabel("$C_\ell$")
plt.title("{} : $C_\ell$ by healpix/anafast".format("DC2Run2.2i"))

In [None]:
lmax 

In [None]:
hpMap.shape