In [49]:
import pyspark


In [50]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("CRUD_operations").getOrCreate()
spark

Reading the csv file 

In [51]:

df_pyspark = spark.read.option('header','true').csv(r"C:\Users\Leon\Downloads\covid\data\country_wise_latest.csv")
df_pyspark.show()

+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New cases|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|
+-------------------+---------+------+---------+------+---------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|      106|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...|
|            Albania|     4880|   144|     2745|  1991|      117|         6|           6

To print the schema

In [52]:
df_pyspark.printSchema()

root
 |-- Country/Region: string (nullable = true)
 |-- Confirmed: string (nullable = true)
 |-- Deaths: string (nullable = true)
 |-- Recovered: string (nullable = true)
 |-- Active: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)
 |-- Deaths / 100 Cases: string (nullable = true)
 |-- Recovered / 100 Cases: string (nullable = true)
 |-- Deaths / 100 Recovered: string (nullable = true)
 |-- Confirmed last week: string (nullable = true)
 |-- 1 week change: string (nullable = true)
 |-- 1 week % increase: string (nullable = true)
 |-- WHO Region: string (nullable = true)



print the columns

In [53]:
df_pyspark.columns

['Country/Region',
 'Confirmed',
 'Deaths',
 'Recovered',
 'Active',
 'New cases',
 'New deaths',
 'New recovered',
 'Deaths / 100 Cases',
 'Recovered / 100 Cases',
 'Deaths / 100 Recovered',
 'Confirmed last week',
 '1 week change',
 '1 week % increase',
 'WHO Region']

print first 3 rows

In [54]:
df_pyspark.head(3)

[Row(Country/Region='Afghanistan', Confirmed='36263', Deaths='1269', Recovered='25198', Active='9796', New cases='106', New deaths='10', New recovered='18', Deaths / 100 Cases='3.5', Recovered / 100 Cases='69.49', Deaths / 100 Recovered='5.04', Confirmed last week='35526', 1 week change='737', 1 week % increase='2.07', WHO Region='Eastern Mediterranean'),
 Row(Country/Region='Albania', Confirmed='4880', Deaths='144', Recovered='2745', Active='1991', New cases='117', New deaths='6', New recovered='63', Deaths / 100 Cases='2.95', Recovered / 100 Cases='56.25', Deaths / 100 Recovered='5.25', Confirmed last week='4171', 1 week change='709', 1 week % increase='17', WHO Region='Europe'),
 Row(Country/Region='Algeria', Confirmed='27973', Deaths='1163', Recovered='18837', Active='7973', New cases='616', New deaths='8', New recovered='749', Deaths / 100 Cases='4.16', Recovered / 100 Cases='67.34', Deaths / 100 Recovered='6.17', Confirmed last week='23691', 1 week change='4282', 1 week % increas

select column

In [55]:
df_pyspark.select(["Country/Region","Confirmed"]).show()

+-------------------+---------+
|     Country/Region|Confirmed|
+-------------------+---------+
|        Afghanistan|    36263|
|            Albania|     4880|
|            Algeria|    27973|
|            Andorra|      907|
|             Angola|      950|
|Antigua and Barbuda|       86|
|          Argentina|   167416|
|            Armenia|    37390|
|          Australia|    15303|
|            Austria|    20558|
|         Azerbaijan|    30446|
|            Bahamas|      382|
|            Bahrain|    39482|
|         Bangladesh|   226225|
|           Barbados|      110|
|            Belarus|    67251|
|            Belgium|    66428|
|             Belize|       48|
|              Benin|     1770|
|             Bhutan|       99|
+-------------------+---------+
only showing top 20 rows



Adding a column

In [56]:
from pyspark.sql.functions import col
df_pyspark = df_pyspark.withColumn('Total confirmed cases',col("Confirmed")+col("New cases"))
df_pyspark.printSchema()

root
 |-- Country/Region: string (nullable = true)
 |-- Confirmed: string (nullable = true)
 |-- Deaths: string (nullable = true)
 |-- Recovered: string (nullable = true)
 |-- Active: string (nullable = true)
 |-- New cases: string (nullable = true)
 |-- New deaths: string (nullable = true)
 |-- New recovered: string (nullable = true)
 |-- Deaths / 100 Cases: string (nullable = true)
 |-- Recovered / 100 Cases: string (nullable = true)
 |-- Deaths / 100 Recovered: string (nullable = true)
 |-- Confirmed last week: string (nullable = true)
 |-- 1 week change: string (nullable = true)
 |-- 1 week % increase: string (nullable = true)
 |-- WHO Region: string (nullable = true)
 |-- Total confirmed cases: double (nullable = true)



Drop a column

In [57]:
df_pyspark = df_pyspark.drop('New cases')
df_pyspark.show()

+-------------------+---------+------+---------+------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+---------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|Total confirmed cases|
+-------------------+---------+------+---------+------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+---------------------+
|        Afghanistan|    36263|  1269|    25198|  9796|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...|              36369.0|
|            Albania|     4880|   144|  

Rename a column

In [58]:
df_pyspark = df_pyspark.withColumnRenamed('Active','Active now')
df_pyspark.show()

+-------------------+---------+------+---------+----------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+---------------------+
|     Country/Region|Confirmed|Deaths|Recovered|Active now|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|          WHO Region|Total confirmed cases|
+-------------------+---------+------+---------+----------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+--------------------+---------------------+
|        Afghanistan|    36263|  1269|    25198|      9796|        10|           18|               3.5|                69.49|                  5.04|              35526|          737|             2.07|Eastern Mediterra...|              36369.0|
|            Albania|   

Select a particular row 

In [59]:
df_pyspark.where(df_pyspark["Country/Region"]=="India").show()

+--------------+---------+------+---------+----------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------+---------------------+
|Country/Region|Confirmed|Deaths|Recovered|Active now|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|     WHO Region|Total confirmed cases|
+--------------+---------+------+---------+----------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------+---------------------+
|         India|  1480073| 33408|   951166|    495499|       637|        33598|              2.26|                64.26|                  3.51|            1155338|       324735|            28.11|South-East Asia|            1524530.0|
+--------------+---------+------+---------+----------+----------

Delete a particular row

In [60]:
df_pyspark = df_pyspark.filter(df_pyspark["Country/Region"] !="India")
df_pyspark.where(df_pyspark["Country/Region"]=="India").show()

+--------------+---------+------+---------+----------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+----------+---------------------+
|Country/Region|Confirmed|Deaths|Recovered|Active now|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|WHO Region|Total confirmed cases|
+--------------+---------+------+---------+----------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+----------+---------------------+
+--------------+---------+------+---------+----------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+----------+---------------------+



Update a particular row

In [61]:
from pyspark.sql.functions import when,col

df_pyspark = df_pyspark.withColumn("Confirmed",when(col("Country/Region")=="Australia",989999))
df_pyspark.where(df_pyspark["Country/Region"]=="Australia").show()

+--------------+---------+------+---------+----------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------+---------------------+
|Country/Region|Confirmed|Deaths|Recovered|Active now|New deaths|New recovered|Deaths / 100 Cases|Recovered / 100 Cases|Deaths / 100 Recovered|Confirmed last week|1 week change|1 week % increase|     WHO Region|Total confirmed cases|
+--------------+---------+------+---------+----------+----------+-------------+------------------+---------------------+----------------------+-------------------+-------------+-----------------+---------------+---------------------+
|     Australia|   989999|   167|     9311|      5825|         6|          137|              1.09|                60.84|                  1.79|              12428|         2875|            23.13|Western Pacific|              15671.0|
+--------------+---------+------+---------+----------+----------

Drop rows with any null

In [63]:
df_pyspark = df_pyspark.dropna()


Drop if all columns are null:

In [64]:
df_pyspark = df_pyspark.dropna(how="all")

Fill null with a specific value:

In [65]:
df_pyspark= df_pyspark.fillna(0)

Drop Duplicates

In [66]:
df_pyspark=df_pyspark.dropDuplicates()