In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [36]:
# Loading the Data using Spark #inferSchema: it loads features such as PassengerId as integer as they appear as an int instead of loading them as strings
data=spark.read.csv("drive/MyDrive/train.csv",header=True,inferSchema=True)

In [7]:
data

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. ...",male,2.0,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Osc...",female,27.0,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nich...",female,14.0,1,0,237736,30.0708,,C


In [10]:
data.printSchema()#shows the columns, and their types

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [11]:
data.limit(10)#to show just 10 rows of data

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. ...",male,2.0,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Osc...",female,27.0,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nich...",female,14.0,1,0,237736,30.0708,,C


In [12]:
data.select('*')#select statement from SQL

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. ...",male,2.0,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Osc...",female,27.0,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nich...",female,14.0,1,0,237736,30.0708,,C


In [14]:
data.select('PassengerId').limit(5)#select statement from SQL

PassengerId
1
2
3
4
5


In [16]:
data.where(data.Age>25) #same as where in SQL

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
9,1,3,"Johnson, Mrs. Osc...",female,27.0,0,2,347742,11.1333,,S
12,1,1,"Bonnell, Miss. El...",female,58.0,0,0,113783,26.55,C103,S
14,0,3,"Andersson, Mr. An...",male,39.0,1,5,347082,31.275,,S
16,1,2,"Hewlett, Mrs. (Ma...",female,55.0,0,0,248706,16.0,,S
19,0,3,"Vander Planke, Mr...",female,31.0,1,0,345763,18.0,,S


In [37]:
data.where((data.Age>25) & (data.Survived==1) )

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
9,1,3,"Johnson, Mrs. Osc...",female,27.0,0,2,347742,11.1333,,S
12,1,1,"Bonnell, Miss. El...",female,58.0,0,0,113783,26.55,C103,S
16,1,2,"Hewlett, Mrs. (Ma...",female,55.0,0,0,248706,16.0,,S
22,1,2,"Beesley, Mr. Lawr...",male,34.0,0,0,248698,13.0,D56,S
24,1,1,"Sloper, Mr. Willi...",male,28.0,0,0,113788,35.5,A6,S
26,1,3,"Asplund, Mrs. Car...",female,38.0,1,5,347077,31.3875,,S
53,1,1,"Harper, Mrs. Henr...",female,49.0,1,0,PC 17572,76.7292,D33,C


In [21]:
#To get the average of Fare
data.agg({'Fare':'avg'})

avg(Fare)
32.2042079685746


In [24]:
data.groupBy('Pclass').agg({"Fare":"avg"})# To get the average for Each Class

Pclass,avg(Fare)
1,84.15468749999992
3,13.675550101832997
2,20.66218315217391


In [26]:
data.groupBy('Pclass').agg({"Fare":"avg"}).orderBy('Pclass')#order the average

Pclass,avg(Fare)
1,84.15468749999992
2,20.66218315217391
3,13.675550101832997


In [31]:
data.groupBy(data.Pclass).agg({'Fare':'max'})

Pclass,max(Fare)
1,512.3292
3,69.55
2,73.5


In [8]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf #user defined function

In [9]:
def round_value(x):
  return int(x)

In [10]:
round_value_udf=udf(round_value,IntegerType())#converts it into spark function udf

In [11]:
data.select(round_value_udf('Fare')).alias('Fare Rounded Down')

round_value(Fare)
7
71
7
53
8
8
51
21
11
30


In [12]:
data.createOrReplaceTempView("Titanic") #this is in order to run sql querries such as below

In [13]:
spark.sql('select * from Titanic')#now you can run normal SQL querries

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. ...",male,2.0,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Osc...",female,27.0,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nich...",female,14.0,1,0,237736,30.0708,,C


In [24]:
spark.sql('select * from Titanic where Pclass == 1')

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
12,1,1,"Bonnell, Miss. El...",female,58.0,0,0,113783,26.55,C103,S
24,1,1,"Sloper, Mr. Willi...",male,28.0,0,0,113788,35.5,A6,S
28,0,1,"Fortune, Mr. Char...",male,19.0,3,2,19950,263.0,C23 C25 C27,S
31,0,1,"Uruchurtu, Don. M...",male,40.0,0,0,PC 17601,27.7208,,C
32,1,1,"Spencer, Mrs. Wil...",female,,1,0,PC 17569,146.5208,B78,C
35,0,1,"Meyer, Mr. Edgar ...",male,28.0,1,0,PC 17604,82.1708,,C
36,0,1,"Holverson, Mr. Al...",male,42.0,1,0,113789,52.0,,S


In [34]:
spark.sql('select * from Titanic where Age > (select avg(age) from Titanic)')

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
12,1,1,"Bonnell, Miss. El...",female,58.0,0,0,113783,26.55,C103,S
14,0,3,"Andersson, Mr. An...",male,39.0,1,5,347082,31.275,,S
16,1,2,"Hewlett, Mrs. (Ma...",female,55.0,0,0,248706,16.0,,S
19,0,3,"Vander Planke, Mr...",female,31.0,1,0,345763,18.0,,S
21,0,2,"Fynney, Mr. Joseph J",male,35.0,0,0,239865,26.0,,S
22,1,2,"Beesley, Mr. Lawr...",male,34.0,0,0,248698,13.0,D56,S


In [35]:
spark.sql('select avg(age) from Titanic ')

avg(age)
29.69911764705882
