# Apache Spark for astronomy: hands-on session 2

### Context

Welcome to the series of notebooks on Apache Spark! The main goal of this series is to get familiar with Apache Spark, and in particular its Python API, PySpark, in the context of astronomy. In this second notebook, we will learn through concrete examples how to interface Spark with popular scientific libraries (NumPy, pandas, ...).

### Learning objectives

- Interfacing popular Python scientific libraries with Apache Spark
- Developing your own modules for Spark

Throughout this series of exercises, we will use the same dataset as in the first session:

In [4]:
# Load data into a Spark DataFrame
df = spark.read.format("parquet").load("../../data/clusters.parquet")

## User defined functions and column creation

Similarly to `map` and `mapPartitions`, you may want to define your own functions, but this time to create new DataFrame columns. In Python, the efficient way to do this is with "Pandas User Defined Functions" (vectorized functions), which operate on batches of rows passed as pandas Series rather than on one row at a time.

**Exercise (£):** Use a pandas UDF to compute the distance of each row to the center, and store the result in a new DataFrame column:

In [5]:
from pyspark.sql.functions import pandas_udf, PandasUDFType, col
from pyspark.sql.types import DoubleType
import numpy as np
import pandas as pd

@pandas_udf(DoubleType(), PandasUDFType.SCALAR)
def compute_distance_to_center(x, y, z):
    """ Compute the distance to the center (0, 0, 0)
    
    Parameters
    ----------
    x, y, z : pandas Series
        coordinates of the rows in the current batch
        
    Returns
    ----------
    series: pandas Series
        Series containing distance to the center for each row
    """
    r_square = x*x + y*y + z*z
    return pd.Series(np.sqrt(r_square))

df.withColumn(
    "distance", 
    compute_distance_to_center(
        col("x"),
        col("y"),
        col("z")
    )
).show(5)

+-------------------+-------------------+------------------+---+------------------+
|                  x|                  y|                 z| id|          distance|
+-------------------+-------------------+------------------+---+------------------+
|-1.4076402686194887|  6.673344773733206| 8.208460943517498|  2|10.672104415540714|
| 0.6498424376672443|  3.744291410605022|1.0917784706793445|  0|3.9539845207540685|
| 1.3036201950328201|-2.0809475280266656| 4.704460741202294|  1|5.3067616389669645|
|-1.3741641126376476|  4.791424573067701| 2.562770404033503|  0| 5.604807631993114|
| 0.3454761504864363| -2.481008091382492|2.3088066072973583|  1|3.4066615432062313|
+-------------------+-------------------+------------------+---+------------------+
only showing top 5 rows
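
Because the body of a scalar pandas UDF is ordinary vectorized pandas/NumPy code, it can be checked on plain pandas Series before Spark is involved. The following is a minimal sketch under that assumption: `distance_to_center` is a hypothetical, undecorated copy of the function above, and the input values are taken from the first row of the output table.

```python
import numpy as np
import pandas as pd

def distance_to_center(x: pd.Series, y: pd.Series, z: pd.Series) -> pd.Series:
    """Same body as the UDF above, without the Spark decorator (hypothetical helper)."""
    return pd.Series(np.sqrt(x * x + y * y + z * z))

# First row of the table above, plus the origin as a sanity check
x = pd.Series([-1.4076402686194887, 0.0])
y = pd.Series([6.673344773733206, 0.0])
z = pd.Series([8.208460943517498, 0.0])

distances = distance_to_center(x, y, z)
print(distances.tolist())
```

If the local result matches what you expect, the same body can then be decorated with `pandas_udf` as in the exercise.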



**Exercise (£££):** As in session 1, find the barycentre of each cluster in the dataset, but this time using aggregation and a user defined function (hint: look for `PandasUDFType.GROUPED_MAP`).

In [15]:
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def compute_barycentre(pdf: pd.DataFrame):
    """ Compute the barycentre of a group of rows
    
    Parameters
    ----------
    pdf : pandas DataFrame
        pandas DataFrame containing the rows of one group (here, one cluster id)
        
    Returns
    ----------
    Pandas DataFrame with barycentre coordinates.
    """
    # We can use pandas methods directly
    mean = pdf.mean()
    # This is just to reconstruct a Pandas DataFrame with the result
    # i: colname, j: mean value
    out = {i:[j] for i, j in zip(mean.keys(), mean.values)}
    return pd.DataFrame(out)

df.groupBy("id").apply(compute_barycentre).show()

+-------------------+-------------------+------------------+---+
|                  x|                  y|                 z| id|
+-------------------+-------------------+------------------+---+
| 0.9084311322436593|-1.5335608883132903| 2.926201255363395|  1|
|-1.2364938227997018| 7.7837163227456205| 9.292937669035544|  2|
|  1.001314312562809|  4.250879907797302|2.0216900721305446|  0|
+-------------------+-------------------+------------------+---+
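
The function passed to a grouped-map UDF receives one pandas DataFrame per group and returns a pandas DataFrame, so its logic can also be exercised with plain pandas. A minimal sketch, assuming a toy dataset invented for illustration: `barycentre` and `toy` are hypothetical names, and the loop over `groupby` only mimics locally what Spark does for each group.

```python
import pandas as pd

def barycentre(pdf: pd.DataFrame) -> pd.DataFrame:
    """Return a one-row DataFrame with the mean x, y, z of the group (hypothetical helper)."""
    mean = pdf[["x", "y", "z"]].mean()
    return pd.DataFrame({
        "x": [mean["x"]],
        "y": [mean["y"]],
        "z": [mean["z"]],
        # All rows of a group share the same id, so keep the first one
        "id": [pdf["id"].iloc[0]],
    })

# Toy data (made up): two points in cluster 0, one point in cluster 1
toy = pd.DataFrame({
    "x": [0.0, 2.0, 1.0],
    "y": [0.0, 4.0, 1.0],
    "z": [1.0, 3.0, 5.0],
    "id": [0, 0, 1],
})

# Apply the function group by group, as Spark would do for each "id"
result = pd.concat(
    [barycentre(group) for _, group in toy.groupby("id")],
    ignore_index=True,
)
print(result)
```

Unlike `pdf.mean()` in the exercise, this variant averages only the coordinate columns, so `id` keeps its integer type.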



In [7]:
import numpy as np

def yield_barycentre(part):
    """ Yield the barycentre of the rows in the partition
    
    Parameters
    ----------
    part : Iterator
        Iterator containing partition data
        
    Yields
    ----------
    barycentre: array of 3 floats
        mean x, y, z of the rows inside the partition,
        or [None, None, None] if the partition is empty
    """
    try:
        partition_data = [*part]
        x, y, z, _ = np.transpose(partition_data)
        yield np.mean([x, y, z], axis=1)
    except ValueError:
        # Empty partition: there is nothing to unpack into x, y, z
        yield [None, None, None]

# Let's sort our DataFrame by "id": the sort range-partitions the rows,
# so that rows sharing an id end up in the same partition
df_repart = df.orderBy("id")

# mapPartitions is an RDD method (not available for DataFrames in PySpark)
print("Cluster coordinates:")
df_repart.rdd.mapPartitions(yield_barycentre).collect()

Cluster coordinates:


[array([1.00131431, 4.25087991, 2.02169007]),
 array([ 0.90843113, -1.53356089,  2.92620126]),
 array([-1.23649382,  7.78371632,  9.29293767]),
 [None, None, None]]
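
The `[None, None, None]` entry above comes from an empty partition. A possible variant, sketched below under the assumption that rows are `(x, y, z, id)` tuples, tests for emptiness explicitly instead of relying on the `ValueError` raised by the failed unpacking. `yield_barycentre_explicit` is a hypothetical name, and the input rows are made up so that the function can be tried without Spark.

```python
import numpy as np

def yield_barycentre_explicit(part):
    """Yield the mean (x, y, z) of a partition, or Nones if it is empty."""
    rows = list(part)
    if not rows:
        # Empty partition: nothing to average
        yield [None, None, None]
        return
    # Each row is (x, y, z, id); transpose to get one array per column
    x, y, z, _ = np.transpose(rows)
    yield np.mean([x, y, z], axis=1)

# Made-up rows standing in for one partition of the DataFrame
rows = [(0.0, 0.0, 1.0, 0), (2.0, 4.0, 3.0, 0)]

full = next(yield_barycentre_explicit(iter(rows)))
empty = list(yield_barycentre_explicit(iter([])))
print(full, empty)
```

Since the function only needs an iterator, it can be passed to `mapPartitions` in the same way as `yield_barycentre`.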