<a href="https://colab.research.google.com/github/mitchtheuvenet/pdp-assignment-3/blob/master/assignment3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Assignment 3

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-3.0.0-preview2/spark-3.0.0-preview2-bin-hadoop3.2.tgz
!tar -xvf spark-3.0.0-preview2-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark

In [59]:
import os

# Export the JDK and Spark locations in one step.
# NOTE(review): these paths presumably match what the setup cell produces on Colab — confirm.
os.environ.update({
  "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
  "SPARK_HOME": "/content/spark-3.0.0-preview2-bin-hadoop3.2",
})

In [60]:
import findspark

# Locate the Spark installation (SPARK_HOME is set in the previous cell) so that
# the `pyspark` imports in the following cells resolve.
findspark.init()

In [61]:
from pyspark.sql import SparkSession

# Build (or reuse) a local session for this notebook; the parenthesised chain
# avoids fragile backslash line continuations.
spark = (
  SparkSession.builder
  .master('local')
  .appName('Assignment 3 Titanic')
  .config('spark.executor.memory', '1gb')
  .getOrCreate()
)

# Low-level context, used below to read the CSV as an RDD.
sc = spark.sparkContext

In [62]:
# Read the CSV as lines of text and split every line into its comma-separated fields.
rdd = sc.textFile('titanic.csv').map(lambda line: line.split(','))

In [None]:
from pyspark.sql import Row
# Import only the types that are used instead of `import *`, so it stays clear
# where each name comes from and the notebook namespace is not polluted.
from pyspark.sql.types import BooleanType, FloatType, IntegerType

# Name the positional CSV fields. Every value is still a string at this point.
# NOTE(review): lines are taken as-is; if titanic.csv starts with a header line it
# becomes a row whose casts below yield null — confirm against the data file.
df_raw = rdd.map(lambda fields: Row(survived=fields[0],
                                    p_class=fields[1],
                                    name=fields[2],
                                    sex=fields[3],
                                    age=fields[4],
                                    siblings_spouses_aboard=fields[5],
                                    parents_children_aboard=fields[6],
                                    fare=fields[7])).toDF()

# Cast the columns used numerically / logically later on. 'p_class' is left as a
# string, which is why the cells below compare it against '1', '2' and '3'.
df = (df_raw
      .withColumn('age', df_raw['age'].cast(IntegerType()))
      .withColumn('fare', df_raw['fare'].cast(FloatType()))
      .withColumn('survived', df_raw['survived'].cast(BooleanType())))

df.show()

## 3a. Calculate the conditional probability that a person survives, given their sex and passenger class:

*P(S = true | G = female, C = 1)*

*P(S = true | G = female, C = 2)*

*P(S = true | G = female, C = 3)*

*P(S = true | G = male, C = 1)*

*P(S = true | G = male, C = 2)*

*P(S = true | G = male, C = 3)*

In [64]:
def calcSurvivalP(df, sex, p_class):
  """Return the survival ratio for one (sex, passenger class) group.

  Args:
    df: Spark DataFrame with 'sex', 'p_class' and 'survived' columns.
    sex: value matched against the 'sex' column (e.g. 'female').
    p_class: value matched against the 'p_class' column. That column is kept
      as a string in this notebook, so pass '1', '2' or '3'.

  Returns:
    The string '<survivors>/<passengers>' for the selected group, or '0/0'
    when no passenger matches.
  """
  passengers = df.rdd.filter(
      lambda row: row['sex'] == sex and row['p_class'] == p_class)

  # Count the RDD directly. Converting it back with spark.createDataFrame() only
  # to call count() adds a schema-inference job, depends on the global `spark`,
  # and raises when the filter matches nothing (no schema can be inferred).
  passenger_count = passengers.count()

  # A null 'survived' value compares unequal to True and is therefore not counted.
  survivor_count = passengers.filter(lambda row: row['survived'] == True).count()

  return str(survivor_count) + '/' + str(passenger_count)

In [None]:
# Print the survival ratio for every (sex, class) pair: all classes for
# 'female' first, then all classes for 'male'.
for sex in ('female', 'male'):
  for p_class in ('1', '2', '3'):
    label = 'P(S = true | G = ' + sex + ', C = ' + p_class + ')'
    print(label + ' = ' + calcSurvivalP(df, sex, p_class))

## 3b. What is the probability that a child who is in third class and is 10 years old or younger survives? Since the number of data points that satisfy the condition is small, use the "Bayesian" approach and represent your probability as a beta distribution. Calculate a belief distribution for:

*S = true | A <= 10, C = 3*

## You can express your answer as a parameterized distribution

In [68]:
def calcSurvivalPChild(df, max_age=10, p_class='3', prior_alpha=0, prior_beta=0):
  """Describe the survival belief for young passengers as a Beta distribution.

  Args:
    df: Spark DataFrame with 'age', 'p_class' and 'survived' columns.
    max_age: inclusive upper age bound of the group (default 10).
    p_class: value matched against the string column 'p_class' (default '3').
    prior_alpha: pseudo-count added to the number of survivors.
    prior_beta: pseudo-count added to the number of deceased.
      With the default zero pseudo-counts the parameters are the raw counts,
      which is not a proper Beta distribution if either count is 0; pass
      prior_alpha=1, prior_beta=1 to start from a uniform Beta(1, 1) prior.

  Returns:
    A string of the form 'Beta(⍺ = <alpha>, β = <beta>)'.
  """
  # Rows whose age could not be parsed hold None; comparing None with a number
  # raises TypeError, so they are excluded explicitly.
  passengers = df.rdd.filter(
      lambda row: row['age'] is not None
      and row['age'] <= max_age
      and row['p_class'] == p_class)

  # Count the RDD directly: spark.createDataFrame() is unnecessary here and
  # raises when the filtered RDD is empty.
  passenger_count = passengers.count()
  survivor_count = passengers.filter(lambda row: row['survived'] == True).count()
  deceased_count = passenger_count - survivor_count

  alpha = survivor_count + prior_alpha
  beta = deceased_count + prior_beta

  return 'Beta(⍺ = ' + str(alpha) + ', β = ' + str(beta) + ')'

In [None]:
# Belief distribution for S = true | A <= 10, C = 3 (third-class passengers aged 10 or younger).
print(calcSurvivalPChild(df))

## 3c. How much did people pay to be on the ship? Calculate the expectation of fare conditioned on class:

*E[X | C = 1]*

*E[X | C = 2]*

*E[X | C = 3]*

In [53]:
def calcFarePerClassE(df, p_class):
  """Return the mean fare paid by passengers of one class.

  Args:
    df: Spark DataFrame with 'p_class' and 'fare' columns.
    p_class: value matched against the string column 'p_class' ('1', '2', '3').

  Returns:
    The average of 'fare' over the matching rows, converted with str().
    'None' is returned when no row matches.
  """
  class_passengers = df.filter(df['p_class'] == p_class)

  # Aggregate 'fare' by name. Averaging every numeric column and then picking
  # the result by position breaks as soon as the set of numeric columns changes,
  # and indexing the collected rows fails when the class has no passengers.
  avg_fare = class_passengers.agg({'fare': 'avg'}).first()[0]

  return str(avg_fare)

In [None]:
# Print the expected fare for each passenger class in ascending class order.
for p_class in ('1', '2', '3'):
  print('E[X | C = ' + p_class + '] = ' + calcFarePerClassE(df, p_class))

In [56]:
# Shut down the Spark session created near the top of the notebook.
spark.stop()