In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np

## User Defined Function to find median

In [2]:
from pyspark.sql.functions import collect_list, udf
from pyspark.sql.types import DoubleType, FloatType

# PySpark UDF that reduces a collected list of values to its median.
# np.nanmedian ignores NaN entries, so missing values do not affect the result.
# The result is declared as DoubleType because float() yields a 64-bit Python
# float; FloatType is single precision and would silently round the median.
median_agg = udf(lambda s: float(np.nanmedian(s)), DoubleType())

# Helper that makes the median UDF usable like a built-in aggregate function.
def median(column):
    """Build the median aggregation expression for `column`.

    The values of `column` are gathered with collect_list and the resulting
    list is passed to the median_agg UDF.
    """
    collected_values = collect_list(column)
    return median_agg(collected_values)

## Creating pandas DataFrame

In [3]:
# Read the Titanic training set indexed by PassengerId and keep only the
# three columns used in the rest of the notebook.
titanic_df = pd.read_csv(
    "../Titanic-Disaster/data/train.csv", index_col='PassengerId'
)[['Survived', 'Age', 'Fare']]
titanic_df.head()

Unnamed: 0_level_0,Survived,Age,Fare
PassengerId,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
1,0,22.0,7.25
2,1,38.0,71.2833
3,1,26.0,7.925
4,1,35.0,53.1
5,0,35.0,8.05


In [4]:
# pandas median of Fare, used as the reference for the Spark result below.
titanic_df['Fare'].median()

14.4542

In [5]:
# pandas median of Age, used as the reference for the Spark result below.
titanic_df['Age'].median()

28.0

## Creating pyspark DataFrame

In [6]:
# Obtain the SparkSession: reuse an existing one if available, otherwise create it.
spark = SparkSession.builder.getOrCreate()

In [7]:
# Convert the pandas frame to a Spark DataFrame and register it as the
# temporary view "titanic".
spark.createDataFrame(titanic_df).createOrReplaceTempView("titanic")
# Work with the DataFrame looked up through the registered view.
titanic_spark_dt = spark.table("titanic")
# Show the catalog contents to confirm the view was registered.
spark.catalog.listTables()

[Table(name='titanic', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [8]:
# Median of Fare over the whole DataFrame (no grouping), computed via the UDF.
titanic_spark_dt.agg(median('Fare')).show()

+----------------------------------+
|<lambda>(collect_list(Fare, 0, 0))|
+----------------------------------+
|                           14.4542|
+----------------------------------+



In [9]:
# Median of Age over the whole DataFrame (no grouping), computed via the UDF.
titanic_spark_dt.agg(median('Age')).show()

+---------------------------------+
|<lambda>(collect_list(Age, 0, 0))|
+---------------------------------+
|                             28.0|
+---------------------------------+

