## Content:

This notebook contains
- how to install PySpark for Google colab
- basic operation using PySpark dataframe
- selecting rows and columns
- missing value handling
- group by, aggregation functions

## Setting up PySpark

In [1]:
# Downloading Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# Downloading Apache Spark with Hadoop
# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

In [3]:
import findspark
findspark.init()

In [4]:
# creating spark session 
from pyspark.sql import SparkSession
spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [5]:
spark

## PySpark Dataframe

In [6]:
df_ps = spark.read.csv("/content/drive/MyDrive/Colab Notebooks/life_exp_data.csv")

In [7]:
df_ps.show()

+-----------+----+----------+----------------+---------------+-------------+-------+--------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+----------+--------------------+-------------------+--------------------+---------+
|        _c0| _c1|       _c2|             _c3|            _c4|          _c5|    _c6|                 _c7|        _c8|     _c9| _c10|              _c11| _c12|             _c13|       _c14|     _c15|       _c16|      _c17|                _c18|               _c19|                _c20|     _c21|
+-----------+----+----------+----------------+---------------+-------------+-------+--------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+----------+--------------------+-------------------+--------------------+---------+
|    Country|Year|    Status|Life expectancy |Adult Mortality|infant deaths|Alcohol|percentage expend...|Hepatitis B|Meas

In [8]:
df_ps1 = spark.read.option('header', 'true').csv('/content/drive/MyDrive/Colab Notebooks/life_exp_data.csv')
df_ps1.show()

+-----------+----+----------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+----------+---------------------+-------------------+-------------------------------+---------+
|    Country|Year|    Status|Life expectancy |Adult Mortality|infant deaths|Alcohol|percentage expenditure|Hepatitis B|Measles | BMI |under-five deaths |Polio|Total expenditure|Diphtheria | HIV/AIDS|        GDP|Population| thinness  1-19 years| thinness 5-9 years|Income composition of resources|Schooling|
+-----------+----+----------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+----------+---------------------+-------------------+-------------------------------+---------+
|Afghanistan|2015|Developing|              65|            263|           62|   

In [9]:
type(df_ps1)

pyspark.sql.dataframe.DataFrame

In [10]:
df_ps1.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Life expectancy : string (nullable = true)
 |-- Adult Mortality: string (nullable = true)
 |-- infant deaths: string (nullable = true)
 |-- Alcohol: string (nullable = true)
 |-- percentage expenditure: string (nullable = true)
 |-- Hepatitis B: string (nullable = true)
 |-- Measles : string (nullable = true)
 |--  BMI : string (nullable = true)
 |-- under-five deaths : string (nullable = true)
 |-- Polio: string (nullable = true)
 |-- Total expenditure: string (nullable = true)
 |-- Diphtheria : string (nullable = true)
 |--  HIV/AIDS: string (nullable = true)
 |-- GDP: string (nullable = true)
 |-- Population: string (nullable = true)
 |--  thinness  1-19 years: string (nullable = true)
 |--  thinness 5-9 years: string (nullable = true)
 |-- Income composition of resources: string (nullable = true)
 |-- Schooling: string (nullable = true)



- printSchema looks similar to pandas_dataframe.info()
- also the numerical columns are shown as string. will use inferSchema argument for this.

In [11]:
df_ps2 = spark.read.option('header', 'true').csv('/content/drive/MyDrive/Colab Notebooks/life_exp_data.csv', inferSchema=True)
df_ps2.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- Life expectancy : double (nullable = true)
 |-- Adult Mortality: integer (nullable = true)
 |-- infant deaths: integer (nullable = true)
 |-- Alcohol: double (nullable = true)
 |-- percentage expenditure: double (nullable = true)
 |-- Hepatitis B: integer (nullable = true)
 |-- Measles : integer (nullable = true)
 |--  BMI : double (nullable = true)
 |-- under-five deaths : integer (nullable = true)
 |-- Polio: integer (nullable = true)
 |-- Total expenditure: double (nullable = true)
 |-- Diphtheria : integer (nullable = true)
 |--  HIV/AIDS: double (nullable = true)
 |-- GDP: double (nullable = true)
 |-- Population: double (nullable = true)
 |--  thinness  1-19 years: double (nullable = true)
 |--  thinness 5-9 years: double (nullable = true)
 |-- Income composition of resources: double (nullable = true)
 |-- Schooling: double (nullable = true)



So now the columns are having different datatypes other than just string.

In [12]:
# alternate way of reading the csv with desired datatypes of the columns

file_path = '/content/drive/MyDrive/Colab Notebooks/life_exp_data.csv'
df_ps3 = spark.read.csv(file_path, header=True, inferSchema=True)
df_ps3.printSchema()

root
 |-- Country: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Status: string (nullable = true)
 |-- Life expectancy : double (nullable = true)
 |-- Adult Mortality: integer (nullable = true)
 |-- infant deaths: integer (nullable = true)
 |-- Alcohol: double (nullable = true)
 |-- percentage expenditure: double (nullable = true)
 |-- Hepatitis B: integer (nullable = true)
 |-- Measles : integer (nullable = true)
 |--  BMI : double (nullable = true)
 |-- under-five deaths : integer (nullable = true)
 |-- Polio: integer (nullable = true)
 |-- Total expenditure: double (nullable = true)
 |-- Diphtheria : integer (nullable = true)
 |--  HIV/AIDS: double (nullable = true)
 |-- GDP: double (nullable = true)
 |-- Population: double (nullable = true)
 |--  thinness  1-19 years: double (nullable = true)
 |--  thinness 5-9 years: double (nullable = true)
 |-- Income composition of resources: double (nullable = true)
 |-- Schooling: double (nullable = true)



In [13]:
# printing the columns

print(df_ps3.columns)

['Country', 'Year', 'Status', 'Life expectancy ', 'Adult Mortality', 'infant deaths', 'Alcohol', 'percentage expenditure', 'Hepatitis B', 'Measles ', ' BMI ', 'under-five deaths ', 'Polio', 'Total expenditure', 'Diphtheria ', ' HIV/AIDS', 'GDP', 'Population', ' thinness  1-19 years', ' thinness 5-9 years', 'Income composition of resources', 'Schooling']


In [14]:
df_ps3.head(5)

[Row(Country='Afghanistan', Year=2015, Status='Developing', Life expectancy =65.0, Adult Mortality=263, infant deaths=62, Alcohol=0.01, percentage expenditure=71.27962362, Hepatitis B=65, Measles =1154,  BMI =19.1, under-five deaths =83, Polio=6, Total expenditure=8.16, Diphtheria =65,  HIV/AIDS=0.1, GDP=584.25921, Population=33736494.0,  thinness  1-19 years=17.2,  thinness 5-9 years=17.3, Income composition of resources=0.479, Schooling=10.1),
 Row(Country='Afghanistan', Year=2014, Status='Developing', Life expectancy =59.9, Adult Mortality=271, infant deaths=64, Alcohol=0.01, percentage expenditure=73.52358168, Hepatitis B=62, Measles =492,  BMI =18.6, under-five deaths =86, Polio=58, Total expenditure=8.18, Diphtheria =62,  HIV/AIDS=0.1, GDP=612.696514, Population=327582.0,  thinness  1-19 years=17.5,  thinness 5-9 years=17.5, Income composition of resources=0.476, Schooling=10.0),
 Row(Country='Afghanistan', Year=2013, Status='Developing', Life expectancy =59.9, Adult Mortality=26

In [15]:
# printing single column

print(df_ps3.select('Life expectancy '))
print("type : ", type(df_ps3.select('Life expectancy ')))

DataFrame[Life expectancy : double]
type :  <class 'pyspark.sql.dataframe.DataFrame'>


In [16]:
df_ps3.select('Life expectancy ').show()

+----------------+
|Life expectancy |
+----------------+
|            65.0|
|            59.9|
|            59.9|
|            59.5|
|            59.2|
|            58.8|
|            58.6|
|            58.1|
|            57.5|
|            57.3|
|            57.3|
|            57.0|
|            56.7|
|            56.2|
|            55.3|
|            54.8|
|            77.8|
|            77.5|
|            77.2|
|            76.9|
+----------------+
only showing top 20 rows



In [17]:
# printing multiple columns

print(df_ps3.select(['Status','Life expectancy ']))
print("type : ", type(df_ps3.select(['Status','Life expectancy '])))
print("head : ", df_ps3.select(['Status','Life expectancy ']).head(5))
print("---------------------------------------------------------------")
df_ps3.select(['Status','Life expectancy ']).show()

DataFrame[Status: string, Life expectancy : double]
type :  <class 'pyspark.sql.dataframe.DataFrame'>
head :  [Row(Status='Developing', Life expectancy =65.0), Row(Status='Developing', Life expectancy =59.9), Row(Status='Developing', Life expectancy =59.9), Row(Status='Developing', Life expectancy =59.5), Row(Status='Developing', Life expectancy =59.2)]
---------------------------------------------------------------
+----------+----------------+
|    Status|Life expectancy |
+----------+----------------+
|Developing|            65.0|
|Developing|            59.9|
|Developing|            59.9|
|Developing|            59.5|
|Developing|            59.2|
|Developing|            58.8|
|Developing|            58.6|
|Developing|            58.1|
|Developing|            57.5|
|Developing|            57.3|
|Developing|            57.3|
|Developing|            57.0|
|Developing|            56.7|
|Developing|            56.2|
|Developing|            55.3|
|Developing|            54.8|
|Developin

In [18]:
# print(df_ps3['Status'])
# print(type(df_ps3['Status']))
# print(df_ps3['Status'].show()) # does not work

In [19]:
# Checking datatypes of the columns

df_ps3.dtypes

[('Country', 'string'),
 ('Year', 'int'),
 ('Status', 'string'),
 ('Life expectancy ', 'double'),
 ('Adult Mortality', 'int'),
 ('infant deaths', 'int'),
 ('Alcohol', 'double'),
 ('percentage expenditure', 'double'),
 ('Hepatitis B', 'int'),
 ('Measles ', 'int'),
 (' BMI ', 'double'),
 ('under-five deaths ', 'int'),
 ('Polio', 'int'),
 ('Total expenditure', 'double'),
 ('Diphtheria ', 'int'),
 (' HIV/AIDS', 'double'),
 ('GDP', 'double'),
 ('Population', 'double'),
 (' thinness  1-19 years', 'double'),
 (' thinness 5-9 years', 'double'),
 ('Income composition of resources', 'double'),
 ('Schooling', 'double')]

In [20]:
df_ps3.describe().show()

+-------+-----------+------------------+----------+-----------------+------------------+------------------+------------------+----------------------+------------------+------------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+--------------------+---------------------+-------------------+-------------------------------+------------------+
|summary|    Country|              Year|    Status| Life expectancy |   Adult Mortality|     infant deaths|           Alcohol|percentage expenditure|       Hepatitis B|          Measles |              BMI |under-five deaths |             Polio| Total expenditure|      Diphtheria |          HIV/AIDS|               GDP|          Population| thinness  1-19 years| thinness 5-9 years|Income composition of resources|         Schooling|
+-------+-----------+------------------+----------+-----------------+------------------+------------------+------------------+------

In [21]:
## adding columns

df_ps4 = df_ps3.withColumn('new_column_name', df_ps3['Year']+2)
df_ps4.show()

+-----------+----+----------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+-----------+---------------------+-------------------+-------------------------------+---------+---------------+
|    Country|Year|    Status|Life expectancy |Adult Mortality|infant deaths|Alcohol|percentage expenditure|Hepatitis B|Measles | BMI |under-five deaths |Polio|Total expenditure|Diphtheria | HIV/AIDS|        GDP| Population| thinness  1-19 years| thinness 5-9 years|Income composition of resources|Schooling|new_column_name|
+-----------+----+----------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+-----------+---------------------+-------------------+-------------------------------+---------+---------------+
|Afghanistan|2015|Developing

In [22]:
df_ps4.columns

['Country',
 'Year',
 'Status',
 'Life expectancy ',
 'Adult Mortality',
 'infant deaths',
 'Alcohol',
 'percentage expenditure',
 'Hepatitis B',
 'Measles ',
 ' BMI ',
 'under-five deaths ',
 'Polio',
 'Total expenditure',
 'Diphtheria ',
 ' HIV/AIDS',
 'GDP',
 'Population',
 ' thinness  1-19 years',
 ' thinness 5-9 years',
 'Income composition of resources',
 'Schooling',
 'new_column_name']

In [23]:
## dropping columns

df_ps5 = df_ps4.drop('new_column_name')
df_ps5.columns

['Country',
 'Year',
 'Status',
 'Life expectancy ',
 'Adult Mortality',
 'infant deaths',
 'Alcohol',
 'percentage expenditure',
 'Hepatitis B',
 'Measles ',
 ' BMI ',
 'under-five deaths ',
 'Polio',
 'Total expenditure',
 'Diphtheria ',
 ' HIV/AIDS',
 'GDP',
 'Population',
 ' thinness  1-19 years',
 ' thinness 5-9 years',
 'Income composition of resources',
 'Schooling']

In [24]:
## renaming columns

df_ps6 = df_ps5.withColumnRenamed('Country', 'Countries')
print("old column names : ", df_ps5.columns)
print("new column names : ", df_ps6.columns)

old column names :  ['Country', 'Year', 'Status', 'Life expectancy ', 'Adult Mortality', 'infant deaths', 'Alcohol', 'percentage expenditure', 'Hepatitis B', 'Measles ', ' BMI ', 'under-five deaths ', 'Polio', 'Total expenditure', 'Diphtheria ', ' HIV/AIDS', 'GDP', 'Population', ' thinness  1-19 years', ' thinness 5-9 years', 'Income composition of resources', 'Schooling']
new column names :  ['Countries', 'Year', 'Status', 'Life expectancy ', 'Adult Mortality', 'infant deaths', 'Alcohol', 'percentage expenditure', 'Hepatitis B', 'Measles ', ' BMI ', 'under-five deaths ', 'Polio', 'Total expenditure', 'Diphtheria ', ' HIV/AIDS', 'GDP', 'Population', ' thinness  1-19 years', ' thinness 5-9 years', 'Income composition of resources', 'Schooling']


In [25]:
## Checking shape of Pyspark dataframe        
## there is function similar to shape, however we can use the following workaround

# workaround 1
import pyspark
def spark_shape(self):
  return (self.count(), len(self.columns))

print("With user-defined function : ", spark_shape(df_ps3))

# workaround 2
print("With Pandas : ", df_ps3.toPandas().shape)

With user-defined function :  (2938, 22)
With Pandas :  (2938, 22)


### Dropping rows containing missing values

In [26]:
print("Size of original dataframe")
print(spark_shape(df_ps6))
print("Size of the dataframe, If rows containing one or more na's are dropped")
print(spark_shape(df_ps6.na.drop()))
print("Size of the dataframe, If rows containing only na's are dropped")
print(spark_shape(df_ps6.na.drop(how='all')))
print("Size of the dataframe, If rows containing less than 20 not-null values are dropped")
print(spark_shape(df_ps6.na.drop(how='any', thresh=20)))

Size of original dataframe
(2938, 22)
Size of the dataframe, If rows containing one or more na's are dropped
(1649, 22)
Size of the dataframe, If rows containing only na's are dropped
(2938, 22)
Size of the dataframe, If rows containing less than 20 not-null values are dropped
(2666, 22)


### Finding number of missing values in each column

In [27]:
# finding count of null, none, NaN of all columns
from pyspark.sql.functions import col, isnan, when, count
df_ps6.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_ps6.columns]).show()

+---------+----+------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+---+----------+---------------------+-------------------+-------------------------------+---------+
|Countries|Year|Status|Life expectancy |Adult Mortality|infant deaths|Alcohol|percentage expenditure|Hepatitis B|Measles | BMI |under-five deaths |Polio|Total expenditure|Diphtheria | HIV/AIDS|GDP|Population| thinness  1-19 years| thinness 5-9 years|Income composition of resources|Schooling|
+---------+----+------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+---+----------+---------------------+-------------------+-------------------------------+---------+
|        0|   0|     0|              10|             10|            0|    194|                     0|        553|       0

In [28]:
# dropping rows where a particular column has na's
spark_shape(df_ps6.na.drop(how='any', subset=['Life expectancy ']))

(2928, 22)

In [29]:
# filling the missing values for specific column

df_ps7 = df_ps6.na.fill(0, 'Life expectancy ')
df_ps7.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_ps7.columns]).show()


+---------+----+------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+---+----------+---------------------+-------------------+-------------------------------+---------+
|Countries|Year|Status|Life expectancy |Adult Mortality|infant deaths|Alcohol|percentage expenditure|Hepatitis B|Measles | BMI |under-five deaths |Polio|Total expenditure|Diphtheria | HIV/AIDS|GDP|Population| thinness  1-19 years| thinness 5-9 years|Income composition of resources|Schooling|
+---------+----+------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+---+----------+---------------------+-------------------+-------------------------------+---------+
|        0|   0|     0|               0|             10|            0|    194|                     0|        553|       0

In [30]:
# filling the missing values for specific columns

df_ps8 = df_ps6.na.fill(0, ['Life expectancy ', 'Adult Mortality'])
df_ps8.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_ps8.columns]).show()


+---------+----+------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+---+----------+---------------------+-------------------+-------------------------------+---------+
|Countries|Year|Status|Life expectancy |Adult Mortality|infant deaths|Alcohol|percentage expenditure|Hepatitis B|Measles | BMI |under-five deaths |Polio|Total expenditure|Diphtheria | HIV/AIDS|GDP|Population| thinness  1-19 years| thinness 5-9 years|Income composition of resources|Schooling|
+---------+----+------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+---+----------+---------------------+-------------------+-------------------------------+---------+
|        0|   0|     0|               0|              0|            0|    194|                     0|        553|       0

In [31]:
# imputing mean value 

from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols = ['Alcohol', 'Hepatitis B'],
    outputCols = ["{}_new".format(c) for c in ['Alcohol', 'Hepatitis B']]).setStrategy('mean')
# we can use median as well

In [32]:
df_ps9 = imputer.fit(df_ps8).transform(df_ps8)
print("Checking missing value count after imputing missing values")
df_ps9.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_ps9.columns]).show()

Checking missing value count after imputing missing values
+---------+----+------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+---+----------+---------------------+-------------------+-------------------------------+---------+-----------+---------------+
|Countries|Year|Status|Life expectancy |Adult Mortality|infant deaths|Alcohol|percentage expenditure|Hepatitis B|Measles | BMI |under-five deaths |Polio|Total expenditure|Diphtheria | HIV/AIDS|GDP|Population| thinness  1-19 years| thinness 5-9 years|Income composition of resources|Schooling|Alcohol_new|Hepatitis B_new|
+---------+----+------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+---+----------+---------------------+-------------------+-------------------------------+---------+-------

## Filtering rows based on conditions

In [36]:
# filtering rows based on condition

df_ps9.filter('Year > 2005').show()

+-----------+----+----------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+-----------+---------------------+-------------------+-------------------------------+---------+-----------+---------------+
|  Countries|Year|    Status|Life expectancy |Adult Mortality|infant deaths|Alcohol|percentage expenditure|Hepatitis B|Measles | BMI |under-five deaths |Polio|Total expenditure|Diphtheria | HIV/AIDS|        GDP| Population| thinness  1-19 years| thinness 5-9 years|Income composition of resources|Schooling|Alcohol_new|Hepatitis B_new|
+-----------+----+----------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+-----------+---------------------+-------------------+-------------------------------+---------+-----------+---------

In [37]:
# column name with leading or trailing blank space did not work for this syntax

# df_ps9.filter('Year > 2005').show()

In [38]:
# Showing specific columns

df_ps9.filter('Year > 2005').select(['Countries', 'Year']).show()

+-----------+----+
|  Countries|Year|
+-----------+----+
|Afghanistan|2015|
|Afghanistan|2014|
|Afghanistan|2013|
|Afghanistan|2012|
|Afghanistan|2011|
|Afghanistan|2010|
|Afghanistan|2009|
|Afghanistan|2008|
|Afghanistan|2007|
|Afghanistan|2006|
|    Albania|2015|
|    Albania|2014|
|    Albania|2013|
|    Albania|2012|
|    Albania|2011|
|    Albania|2010|
|    Albania|2009|
|    Albania|2008|
|    Albania|2007|
|    Albania|2006|
+-----------+----+
only showing top 20 rows



In [39]:
# alternative way of filtering

df_ps9.filter(df_ps9['Year'] > 2005).show()

+-----------+----+----------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+-----------+---------------------+-------------------+-------------------------------+---------+-----------+---------------+
|  Countries|Year|    Status|Life expectancy |Adult Mortality|infant deaths|Alcohol|percentage expenditure|Hepatitis B|Measles | BMI |under-five deaths |Polio|Total expenditure|Diphtheria | HIV/AIDS|        GDP| Population| thinness  1-19 years| thinness 5-9 years|Income composition of resources|Schooling|Alcohol_new|Hepatitis B_new|
+-----------+----+----------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+-----------+---------------------+-------------------+-------------------------------+---------+-----------+---------

In [40]:
# this works for column names with leading and trailing spaces

df_ps9.filter(df_ps9['Life expectancy '] > 40).show()

+-----------+----+----------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+-----------+---------------------+-------------------+-------------------------------+---------+-----------+---------------+
|  Countries|Year|    Status|Life expectancy |Adult Mortality|infant deaths|Alcohol|percentage expenditure|Hepatitis B|Measles | BMI |under-five deaths |Polio|Total expenditure|Diphtheria | HIV/AIDS|        GDP| Population| thinness  1-19 years| thinness 5-9 years|Income composition of resources|Schooling|Alcohol_new|Hepatitis B_new|
+-----------+----+----------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+-----------+---------------------+-------------------+-------------------------------+---------+-----------+---------

In [41]:
# multiple conditions

df_ps9.filter((df_ps9['Year'] > 2005) &
              (df_ps9['Life expectancy '] > 40)).show()

+-----------+----+----------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+-----------+---------------------+-------------------+-------------------------------+---------+-----------+---------------+
|  Countries|Year|    Status|Life expectancy |Adult Mortality|infant deaths|Alcohol|percentage expenditure|Hepatitis B|Measles | BMI |under-five deaths |Polio|Total expenditure|Diphtheria | HIV/AIDS|        GDP| Population| thinness  1-19 years| thinness 5-9 years|Income composition of resources|Schooling|Alcohol_new|Hepatitis B_new|
+-----------+----+----------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+-----------+---------------------+-------------------+-------------------------------+---------+-----------+---------

In [42]:

df_ps9.filter((df_ps9['Year'] > 2005) |
              (df_ps9['Life expectancy '] > 40)).show()

+-----------+----+----------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+-----------+---------------------+-------------------+-------------------------------+---------+-----------+---------------+
|  Countries|Year|    Status|Life expectancy |Adult Mortality|infant deaths|Alcohol|percentage expenditure|Hepatitis B|Measles | BMI |under-five deaths |Polio|Total expenditure|Diphtheria | HIV/AIDS|        GDP| Population| thinness  1-19 years| thinness 5-9 years|Income composition of resources|Schooling|Alcohol_new|Hepatitis B_new|
+-----------+----+----------+----------------+---------------+-------------+-------+----------------------+-----------+--------+-----+------------------+-----+-----------------+-----------+---------+-----------+-----------+---------------------+-------------------+-------------------------------+---------+-----------+---------

In [44]:
# inverse condition

df_ps9.filter(~(df_ps9['Year'] > 2005)).select(['Countries', 'Year']).show()

+-----------+----+
|  Countries|Year|
+-----------+----+
|Afghanistan|2005|
|Afghanistan|2004|
|Afghanistan|2003|
|Afghanistan|2002|
|Afghanistan|2001|
|Afghanistan|2000|
|    Albania|2005|
|    Albania|2004|
|    Albania|2003|
|    Albania|2002|
|    Albania|2001|
|    Albania|2000|
|    Algeria|2005|
|    Algeria|2004|
|    Algeria|2003|
|    Algeria|2002|
|    Algeria|2001|
|    Algeria|2000|
|     Angola|2005|
|     Angola|2004|
+-----------+----+
only showing top 20 rows



## Group by and Aggregation

In [47]:
# Group by and aggregation

df_ps9.groupBy('Countries').mean('Life expectancy ').show()

+--------------------+---------------------+
|           Countries|avg(Life expectancy )|
+--------------------+---------------------+
|       Côte d'Ivoire|              50.3875|
|                Chad|              50.3875|
|Micronesia (Feder...|                 68.2|
|            Paraguay|              73.1125|
|               Yemen|   63.862500000000004|
|             Senegal|             62.56875|
|          Cabo Verde|             72.51875|
|              Sweden|             82.51875|
|            Kiribati|    65.14999999999999|
|   Republic of Korea|              80.4875|
|              Guyana|              65.6375|
|             Eritrea|              60.6875|
|         Philippines|    67.57499999999999|
|            Djibouti|             60.75625|
|               Tonga|    72.53124999999999|
|            Malaysia|    73.75625000000001|
|           Singapore|    81.47500000000001|
|                Fiji|              68.7125|
|              Turkey|              73.9125|
|         

In [48]:
df_ps9.groupBy('Status').mean('Life expectancy ').show()

+----------+---------------------+
|    Status|avg(Life expectancy )|
+----------+---------------------+
| Developed|    79.19785156249996|
|Developing|    66.83483099752688|
+----------+---------------------+



In [49]:
df_ps9.groupBy('Status').count().show()

+----------+-----+
|    Status|count|
+----------+-----+
| Developed|  512|
|Developing| 2426|
+----------+-----+



In [50]:
df_ps9.agg(({"Life expectancy ": "max"})).show()

+---------------------+
|max(Life expectancy )|
+---------------------+
|                 89.0|
+---------------------+



In [52]:
# this is giving every countries maximum Life expectancy

df_ps9.groupBy('Countries').max('Life expectancy ').show()

+--------------------+---------------------+
|           Countries|max(Life expectancy )|
+--------------------+---------------------+
|       Côte d'Ivoire|                 54.0|
|                Chad|                 57.0|
|Micronesia (Feder...|                 69.4|
|            Paraguay|                 79.0|
|               Yemen|                 68.0|
|             Senegal|                 66.7|
|          Cabo Verde|                 77.0|
|              Sweden|                 89.0|
|            Kiribati|                 66.3|
|   Republic of Korea|                 87.0|
|              Guyana|                 66.3|
|             Eritrea|                 67.0|
|         Philippines|                 68.5|
|            Djibouti|                 69.0|
|               Tonga|                 73.5|
|            Malaysia|                 75.0|
|           Singapore|                 87.0|
|                Fiji|                 69.9|
|              Turkey|                 78.0|
|         