# Introduction

## Requirements

* pyspark 2.3
* pandas
* numpy
* jupyter
* matplotlib
* astropy
* pyarrow (required by `pandas_udf`)

## Hadoop

<img src="images/hadoop.jpg">

* Open source distributed processing and storage platform
* Layered architecture
* Fault tolerant

## HDFS

<img src="images/hdfs.jpg">

* Replication
* Tiered storage
* Rack awareness

## YARN

<img src="images/yarn.png">

* Queues, priority, fair sharing
* Accounting

## Spark

<img src="images/spark.png">

* RDD
* DataFrame

# Let's create a mock!

In [205]:
%matplotlib notebook

import numpy as np
import os
import pandas as pd
import sys

In [206]:
from pyspark.sql import SparkSession, Row

# Reuse the active Spark session if there is one, otherwise create it.
spark = SparkSession.builder.getOrCreate()
# Handle to the SparkContext that backs the session.
sc = spark.sparkContext

In [207]:
sc

## Halo catalog

### From CosmoHub

In [208]:
# Original CosmoHub query: selects id, particle count, position and velocity
# from cosmohub.my_halos, keeping rows with pid = -1 in octant 1 whose
# azimuth (ATAN2(y, x), wrapped to [0, 360) degrees) and elevation
# (90 degrees minus the polar angle measured from the +z axis) both lie
# between 0 and 3 degrees.
# NOTE: `halo_sql` is rebound by the "3x3 patch" cell that follows.
halo_sql = """
    SELECT id AS halo_id, num_p AS halo_num_p,
        x AS halo_x, y AS halo_y, z AS halo_z,
        vx AS halo_vx, vy AS halo_vy, vz AS halo_vz
    FROM cosmohub.my_halos
    WHERE pid=-1 AND octant=1
        AND PMOD((180.0/PI() * ATAN2(y,x)) + 360., 360.) BETWEEN 0 AND 3
        AND 90 - (180.0/PI() * ATAN2(SQRT(x*x+y*y),z)) BETWEEN 0 AND 3
"""

In [4]:
# 3x3 patch: the same halo columns read from handson.halos_3x3 (presumably a
# pre-extracted 3x3 degree patch -- TODO confirm), plus halo_r, the Euclidean
# distance of each halo from the origin.
halo_sql = """
    SELECT id AS halo_id, num_p AS halo_num_p,
         x AS halo_x,   y AS halo_y,   z AS halo_z,
        vx AS halo_vx, vy AS halo_vy, vz AS halo_vz,
        SQRT(x*x + y*y + z*z) AS halo_r
    FROM handson.halos_3x3
"""

In [None]:
# Run the query, keep only 5 rows for the demo, and mark the result for caching.
df = spark.sql(halo_sql).limit(5).cache()

### From local file

In [209]:
# Alternative source: load the halo sample from a local bz2-compressed CSV,
# keep only 5 rows for the demo, and mark the result for caching.
# NOTE(review): the path is absolute and machine-specific; point it at the
# location of halo_sample.csv.bz2 on your own system.
df = spark.read.csv(
    'file:////home/tallada/Projectes/git/lsst_hadoop_handson/halo_sample.csv.bz2',
    comment='#',  # skip lines that start with '#'
    header=True,  # first line holds the column names
    inferSchema=True,  # let Spark infer the column types from the data
).limit(5).cache()

### Show sample

In [210]:
df

DataFrame[halo_id: bigint, halo_num_p: int, halo_x: double, halo_y: double, halo_z: double, halo_vx: double, halo_vy: double, halo_vz: double]

In [211]:
df.show()

+-------+----------+-------+-------+-------+--------+--------+--------+
|halo_id|halo_num_p| halo_x| halo_y| halo_z| halo_vx| halo_vy| halo_vz|
+-------+----------+-------+-------+-------+--------+--------+--------+
|3839342|        44|447.352|2.12842|0.88135| 1324.26|-2761.71|-2170.23|
|3844782|       433|571.893|9.55396|9.76489| -667.56|  756.82| -605.93|
|3848430|        23|317.961|3.21973|5.12695|-1024.17|   248.3| -160.01|
|3856622|        56|444.658|2.62061|0.99072| 2246.39|-1378.16|-1777.96|
|3862350|      1554|509.772| 2.0625|6.91553| -744.19|  -458.2|   38.89|
+-------+----------+-------+-------+-------+--------+--------+--------+



## Halo mass

In [212]:
from pyspark.sql import functions as f

In [213]:
# Mass of a single simulation particle (units are not stated in this notebook).
particle_mass = 2.398e9

def halo_lm(num_p):
    """Return log10 of the halo mass, i.e. ``log10(particle_mass * num_p)``.

    ``num_p`` is the halo particle count; in this notebook it is a Spark
    column, so the result is a Spark column expression.
    """
    halo_mass = particle_mass * num_p
    return f.log10(halo_mass)

In [214]:
# Add the log10 halo mass as a new column computed from the particle count.
df = df.withColumn('halo_lm', halo_lm(df['halo_num_p']))

In [215]:
df

DataFrame[halo_id: bigint, halo_num_p: int, halo_x: double, halo_y: double, halo_z: double, halo_vx: double, halo_vy: double, halo_vz: double, halo_lm: double]

In [216]:
df.show()

+-------+----------+-------+-------+-------+--------+--------+--------+------------------+
|halo_id|halo_num_p| halo_x| halo_y| halo_z| halo_vx| halo_vy| halo_vz|           halo_lm|
+-------+----------+-------+-------+-------+--------+--------+--------+------------------+
|3839342|        44|447.352|2.12842|0.88135| 1324.26|-2761.71|-2170.23|11.023301855249017|
|3844782|       433|571.893|9.55396|9.76489| -667.56|  756.82| -605.93|12.016337075116196|
|3848430|        23|317.961|3.21973|5.12695|-1024.17|   248.3| -160.01|10.741577014780423|
|3856622|        56|444.658|2.62061|0.99072| 2246.39|-1378.16|-1777.96|11.128037205769031|
|3862350|      1554|509.772| 2.0625|6.91553| -744.19|  -458.2|   38.89|12.571300193227726|
+-------+----------+-------+-------+-------+--------+--------+--------+------------------+



## Halo R

In [217]:
def halo_r(x, y, z):
    """Return the Euclidean distance from the origin to the point (x, y, z)."""
    squared_norm = x*x + y*y + z*z
    return f.sqrt(squared_norm)

In [218]:
# Add the radial distance of each halo, computed from its Cartesian position.
df = df.withColumn('halo_r', halo_r(df['halo_x'], df['halo_y'], df['halo_z']))

In [219]:
df.show()

+-------+----------+-------+-------+-------+--------+--------+--------+------------------+------------------+
|halo_id|halo_num_p| halo_x| halo_y| halo_z| halo_vx| halo_vy| halo_vz|           halo_lm|            halo_r|
+-------+----------+-------+-------+-------+--------+--------+--------+------------------+------------------+
|3839342|        44|447.352|2.12842|0.88135| 1324.26|-2761.71|-2170.23|11.023301855249017| 447.3579314749196|
|3844782|       433|571.893|9.55396|9.76489| -667.56|  756.82| -605.93|12.016337075116196| 572.0561464379119|
|3848430|        23|317.961|3.21973|5.12695|-1024.17|   248.3| -160.01|10.741577014780423| 318.0186312129769|
|3856622|        56|444.658|2.62061|0.99072| 2246.39|-1378.16|-1777.96|11.128037205769031|444.66682593475593|
|3862350|      1554|509.772| 2.0625|6.91553| -744.19|  -458.2|   38.89|12.571300193227726|509.82307759205145|
+-------+----------+-------+-------+-------+--------+--------+--------+------------------+------------------+



## Halo True Redshift

In [220]:
import astropy.units as u

from astropy.cosmology import LambdaCDM, z_at_value

from pyspark.sql import types as t
from pyspark.sql.functions import pandas_udf, PandasUDFType

In [221]:
# LambdaCDM cosmology with the dark-energy density set to 1 - Om0, so matter
# and dark energy sum to one.
# NOTE(review): with H0=100, distances in Mpc presumably correspond to Mpc/h -- confirm.
cosmology = LambdaCDM(H0=100, Om0=0.319, Ode0=1-0.319, Ob0=0.049)

In [222]:
def halo_true_redshift(halo_r):
    """Return the redshift whose comoving distance equals ``halo_r`` Mpc.

    The inversion is done numerically by astropy's ``z_at_value``. When this
    notebook was run, passing an array instead of a scalar raised ValueError.
    """
    distance = halo_r * u.Mpc
    return z_at_value(cosmology.comoving_distance, distance)

In [223]:
halo_true_redshift(400)

0.13808325920114975

In [224]:
halo_true_redshift(
    np.array([400, 500, 600])
)

ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()

In [225]:
# Lookup table for a fast, vectorised distance -> redshift conversion:
# sample the redshift range evenly and tabulate the comoving distance at
# each sample.
halo_z_min = 0
halo_z_max = 2.5
halo_z_n_bins = 1000  # number of sample points passed to np.linspace

halo_z_bins = np.linspace(halo_z_min, halo_z_max, halo_z_n_bins)
# `.value` drops the astropy unit and keeps the bare numbers.
halo_r_bins = cosmology.comoving_distance(halo_z_bins).value

def r_to_z(halo_r):
    """Convert comoving distance to redshift by linear interpolation.

    Interpolates over the tabulated (halo_r_bins, halo_z_bins) pairs.
    Distances below the table map to halo_z_min and distances above it map
    to halo_z_max. Accepts a scalar or an array-like.
    """
    return np.interp(
        halo_r,
        halo_r_bins,
        halo_z_bins,
        left=halo_z_min,
        right=halo_z_max,
    )

@pandas_udf('double', PandasUDFType.SCALAR)
def halo_true_redshift(halo_r):
    """Scalar pandas UDF mapping a Series of distances to a Series of redshifts.

    Each batch is converted with ``r_to_z`` and the input index is kept on
    the returned Series. This rebinds the name of the scalar, astropy-based
    function defined earlier in the notebook.
    """
    redshifts = r_to_z(halo_r)
    return pd.Series(redshifts, index=halo_r.index)

In [226]:
r_to_z(400)

0.13808350695226523

In [227]:
r_to_z(np.array([400, 500, 600]))

array([0.13808351, 0.17422571, 0.21110203])

In [228]:
# Apply the pandas UDF to the halo_r column to get each halo's redshift.
df = df.withColumn('halo_true_redshift', halo_true_redshift(df['halo_r']))

In [229]:
df

DataFrame[halo_id: bigint, halo_num_p: int, halo_x: double, halo_y: double, halo_z: double, halo_vx: double, halo_vy: double, halo_vz: double, halo_lm: double, halo_r: double, halo_true_redshift: double]

In [230]:
df.show()

+-------+----------+-------+-------+-------+--------+--------+--------+------------------+------------------+-------------------+
|halo_id|halo_num_p| halo_x| halo_y| halo_z| halo_vx| halo_vy| halo_vz|           halo_lm|            halo_r| halo_true_redshift|
+-------+----------+-------+-------+-------+--------+--------+--------+------------------+------------------+-------------------+
|3839342|        44|447.352|2.12842|0.88135| 1324.26|-2761.71|-2170.23|11.023301855249017| 447.3579314749196|0.15511076199346596|
|3844782|       433|571.893|9.55396|9.76489| -667.56|  756.82| -605.93|12.016337075116196| 572.0561464379119|0.20072066339424757|
|3848430|        23|317.961|3.21973|5.12695|-1024.17|   248.3| -160.01|10.741577014780423| 318.0186312129769| 0.1089703868010368|
|3856622|        56|444.658|2.62061|0.99072| 2246.39|-1378.16|-1777.96|11.128037205769031|444.66682593475593| 0.1541393350525811|
|3862350|      1554|509.772| 2.0625|6.91553| -744.19|  -458.2|   38.89|12.571300193227726|