<a href="https://colab.research.google.com/github/kajalpani/PySpark_work/blob/main/spark_dataframe_titanic.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
# Export the Java and Spark install locations as environment variables in one call.
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
    }
)

In [3]:
import findspark

# Must run before importing pyspark: init() makes the Spark install referenced by
# SPARK_HOME importable from this Python process.
findspark.init()
from pyspark.sql import SparkSession

# Get (or create) a local-mode session; "local[*]" is the master URL passed to the builder.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)
# With eager evaluation on, a DataFrame left as a cell's last expression is rendered as a table.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

In [8]:
# Read the Titanic training CSV: the first row is used as the header and
# column types are inferred from the data.
titanic_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv('titanic_train.csv')
)
titanic_df

passenger_id,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,boat,body,home.dest,survived
1216,3,"Smyth, Miss. Julia",female,,0,0,335432,7.7333,,Q,13,,,1
699,3,"Cacic, Mr. Luka",male,38.0,0,0,315089,8.6625,,S,,,Croatia,0
1267,3,"Van Impe, Mrs. Je...",female,30.0,1,1,345773,24.15,,S,,,,0
449,2,"Hocking, Mrs. Eli...",female,54.0,1,3,29105,23.0,,S,4,,"Cornwall / Akron, OH",1
576,2,"Veal, Mr. James",male,40.0,0,0,28221,13.0,,S,,,"Barre, Co Washing...",0
1083,3,"Olsen, Mr. Henry ...",male,28.0,0,0,C 4001,22.525,,S,,173.0,,0
898,3,"Johnson, Mr. Will...",male,19.0,0,0,LINE,0.0,,S,,,,0
560,2,"Sinkkonen, Miss. ...",female,30.0,0,0,250648,13.0,,S,10,,Finland / Washing...,1
1079,3,"Ohman, Miss. Velin",female,22.0,0,0,347085,7.775,,S,C,,,1
908,3,"Jussila, Miss. Ma...",female,21.0,1,0,4137,9.825,,S,,,,0


In [9]:
# Print each column's name, inferred type and nullability.
titanic_df.printSchema()

root
 |-- passenger_id: integer (nullable = true)
 |-- pclass: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- age: double (nullable = true)
 |-- sibsp: integer (nullable = true)
 |-- parch: integer (nullable = true)
 |-- ticket: string (nullable = true)
 |-- fare: double (nullable = true)
 |-- cabin: string (nullable = true)
 |-- embarked: string (nullable = true)
 |-- boat: string (nullable = true)
 |-- body: double (nullable = true)
 |-- home.dest: string (nullable = true)
 |-- survived: integer (nullable = true)



In [10]:
# Preview the first five rows; limit() returns a DataFrame, which is displayed as the cell output.
titanic_df.limit(5)

passenger_id,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,boat,body,home.dest,survived
1216,3,"Smyth, Miss. Julia",female,,0,0,335432,7.7333,,Q,13.0,,,1
699,3,"Cacic, Mr. Luka",male,38.0,0,0,315089,8.6625,,S,,,Croatia,0
1267,3,"Van Impe, Mrs. Je...",female,30.0,1,1,345773,24.15,,S,,,,0
449,2,"Hocking, Mrs. Eli...",female,54.0,1,3,29105,23.0,,S,4.0,,"Cornwall / Akron, OH",1
576,2,"Veal, Mr. James",male,40.0,0,0,28221,13.0,,S,,,"Barre, Co Washing...",0


In [12]:
# Keep only the passenger id and the survival flag, then preview five rows.
(
    titanic_df
    .select(["passenger_id", "survived"])
    .limit(5)
)

passenger_id,survived
1216,1
699,0
1267,0
449,1
576,0


In [14]:
# First five passengers who are at least 25 years old and survived.
(
    titanic_df
    .filter((titanic_df["age"] >= 25) & (titanic_df["survived"] == 1))
    .limit(5)
)

passenger_id,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,boat,body,home.dest,survived
449,2,"Hocking, Mrs. Eli...",female,54.0,1,3,29105,23.0,,S,4,,"Cornwall / Akron, OH",1
560,2,"Sinkkonen, Miss. ...",female,30.0,0,0,250648,13.0,,S,10,,Finland / Washing...,1
43,1,"Bucknell, Mrs. Wi...",female,60.0,0,0,11813,76.2917,D15,C,8,,"Philadelphia, PA",1
233,1,"Potter, Mrs. Thom...",female,56.0,0,1,11767,83.1583,C50,C,7,,"Mt Airy, Philadel...",1
1088,3,"Olsson, Mr. Oscar...",male,32.0,0,0,347079,7.775,,S,A,,,1


In [15]:
# Name and sex of the first five survivors aged 25 or older.
# Filter before projecting: age and survived are not among the selected columns, so
# applying where() after select() depends on Spark resolving columns that were projected away.
(
    titanic_df
    .where((titanic_df.age >= 25) & (titanic_df.survived == 1))
    .select("name", "sex")
    .limit(5)
)

name,sex
"Hocking, Mrs. Eli...",female
"Sinkkonen, Miss. ...",female
"Bucknell, Mrs. Wi...",female
"Potter, Mrs. Thom...",female
"Olsson, Mr. Oscar...",male


In [16]:
# Average fare over all passengers. The column is referenced by its schema name ("fare")
# so the cell does not depend on case-insensitive column resolution; the result column is avg(fare).
titanic_df.agg({"fare": "avg"})

avg(Fare)
34.012700942284994


In [21]:
# Mean fare for each passenger class, listed from the highest pclass value down.
(
    titanic_df
    .groupby("pclass")
    .agg({"fare": "avg"})
    .sort("pclass", ascending=False)
)

pclass,avg(fare)
3,13.77492935010483
2,21.259914457831325
1,91.1504660194174


In [27]:
# Average fare for passengers strictly older than 25. The column is referenced by its
# schema name ("fare") rather than "Fare", so the result column is avg(fare).
titanic_df.filter(titanic_df.age > 25).agg({"fare": "avg"})

avg(Fare)
45.22928710526312


In [29]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def round_float_down(x):
  """Round a number down to the nearest integer.

  Args:
    x: A numeric value, or None. The function is applied to the nullable
      `fare` column, so missing values must be tolerated.

  Returns:
    The largest int that is less than or equal to x, or None when x is None.
  """
  # Local import keeps the function self-contained without touching the file's import cell.
  import math

  if x is None:
    # Propagate the missing value instead of raising TypeError from int(None).
    return None
  # floor() rounds toward negative infinity; int() would truncate toward zero.
  return math.floor(x)

# Wrap the Python function as a Spark UDF whose declared return type is integer.
round_float_dwn_udf = udf(f=round_float_down, returnType=IntegerType())

# Show name and fare next to the UDF result for the first five rows.
(
    titanic_df
    .select(
        'name',
        'fare',
        round_float_dwn_udf('fare').alias("Fare rounded down"),
    )
    .limit(5)
)

name,fare,Fare rounded down
"Smyth, Miss. Julia",7.7333,7
"Cacic, Mr. Luka",8.6625,8
"Van Impe, Mrs. Je...",24.15,24
"Hocking, Mrs. Eli...",23.0,23
"Veal, Mr. James",13.0,13


In [30]:
# Register titanic_df as a temporary view named "Titanic" so it can be queried through spark.sql().
titanic_df.createOrReplaceTempView("Titanic")

In [31]:
# Query the "Titanic" temp view with SQL and return the first five rows.
spark.sql("select * from Titanic limit 5")

passenger_id,pclass,name,sex,age,sibsp,parch,ticket,fare,cabin,embarked,boat,body,home.dest,survived
1216,3,"Smyth, Miss. Julia",female,,0,0,335432,7.7333,,Q,13.0,,,1
699,3,"Cacic, Mr. Luka",male,38.0,0,0,315089,8.6625,,S,,,Croatia,0
1267,3,"Van Impe, Mrs. Je...",female,30.0,1,1,345773,24.15,,S,,,,0
449,2,"Hocking, Mrs. Eli...",female,54.0,1,3,29105,23.0,,S,4.0,,"Cornwall / Akron, OH",1
576,2,"Veal, Mr. James",male,40.0,0,0,28221,13.0,,S,,,"Barre, Co Washing...",0
