<a href="https://colab.research.google.com/github/tvelichkovt/PySpark-DataFrame/blob/main/PySpark%20DataFrame%20SQL%20Basics.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

In [5]:
# Load the csv into a dataframe
from google.colab import files

# Download the dataset from here (https://raw.githubusercontent.com/datasciencedojo/datasets/master/titanic.csv) and keep it somewhere on your computer. Load the dataset into your Colab directory from your local system:
files.upload()
df = spark.read.csv("titanic.csv", header=True, inferSchema=True)

Saving titanic.csv to titanic.csv


In [6]:
!ls

sample_data		   spark-3.1.1-bin-hadoop3.2.tgz
spark-3.1.1-bin-hadoop3.2  titanic.csv


In [7]:
display(df)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. ...",male,2.0,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Osc...",female,27.0,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nich...",female,14.0,1,0,237736,30.0708,,C


In [8]:
df.show(2)

+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0| PC 17599|71.2833|  C85|       C|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
only showing top 2 rows



In [9]:
df.limit(2)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C


In [10]:
type(df)

pyspark.sql.dataframe.DataFrame

In [11]:
df.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [12]:
df.select('PassengerId', 'Survived').limit(5)

PassengerId,Survived
1,0
2,1
3,1
4,1
5,0


In [13]:
# Females with age more than 25yrs who survived and not from 1st class ordered by theor ticket price ascending

df.where((df.Age > 25) & (df.Survived == 1) & (df.Sex != "male") & (df.Pclass != 1)).orderBy(df.Fare).limit(5)

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
316,1,3,"Nilsson, Miss. He...",female,26.0,0,0,347470,7.8542,,S
217,1,3,"Honkanen, Miss. E...",female,27.0,0,0,STON/O2. 3101283,7.925,,S
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
798,1,3,"Osman, Mrs. Mara",female,31.0,0,0,349244,8.6833,,S
484,1,3,"Turkula, Mrs. (He...",female,63.0,0,0,4134,9.5875,,S


In [14]:
# Average price of the tickets

df.agg({'Fare':'avg'})

avg(Fare)
32.2042079685746


In [15]:
df.groupBy('Pclass').agg({'Fare':'avg'}).orderBy('Pclass', ascending=False)

Pclass,avg(Fare)
3,13.675550101832997
2,20.66218315217391
1,84.15468749999992


In [17]:
df.filter(df.Age > 25).agg({'Fare':'avg'})

avg(Fare)
37.61960169491524


In [18]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def round_float_down(x):
  return int(x)

round_float_down_udf = udf(round_float_down, IntegerType())

df.select('PassengerId', 'Fare', round_float_down_udf('Fare').alias('Fare Rounded Down')).limit(5)

PassengerId,Fare,Fare Rounded Down
1,7.25,7
2,71.2833,71
3,7.925,7
4,53.1,53
5,8.05,8


In [19]:
df.createOrReplaceTempView("Titanic")

In [20]:
spark.sql('select * from Titanic')

PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen ...",male,22.0,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. Joh...",female,38.0,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. ...",female,26.0,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Ja...",female,35.0,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. Willia...",male,35.0,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
7,0,1,"McCarthy, Mr. Tim...",male,54.0,0,0,17463,51.8625,E46,S
8,0,3,"Palsson, Master. ...",male,2.0,3,1,349909,21.075,,S
9,1,3,"Johnson, Mrs. Osc...",female,27.0,0,2,347742,11.1333,,S
10,1,2,"Nasser, Mrs. Nich...",female,14.0,1,0,237736,30.0708,,C


In [21]:
spark.sql(' select count(*) Survived_Women_3rdClass from Titanic where Sex="female" and Survived=1 and Pclass=3 ') 

Survived_Women_3rdClass
72
