In [51]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# **Create the PySpark Session**

In [52]:
from pyspark.sql import SparkSession

**Let's create a session and give the app a name**

In [53]:
# Build a session named 'Spark_Beginners'; getOrCreate() hands back an
# already-running session instead of starting a second one.
spark = (
    SparkSession.builder
    .appName('Spark_Beginners')
    .getOrCreate()
)

**Let's Check the session name**

In [54]:
# Evaluate the session object as the cell's last expression so the notebook displays it.
spark

**Read the dataframe using spark**

In [55]:
# Load the CSV with the reader's default options (no header handling, no type inference).
df = spark.read.format('csv').load('PK COVID-19.csv')

In [56]:
# Print the first 20 rows with long cells truncated (the defaults, spelled out).
# Without the header option the CSV's header line appears as the first data row
# and the columns get the generated names _c0.._c6.
df.show(n=20, truncate=True)

+---------+-----+------+---------+--------------------+--------------------+---------+
|      _c0|  _c1|   _c2|      _c3|                 _c4|                 _c5|      _c6|
+---------+-----+------+---------+--------------------+--------------------+---------+
|     Date|Cases|Deaths|Recovered|      Travel_history|            Province|     City|
|2/26/2020|    1|     0|        0|               China|Islamabad Capital...|Islamabad|
|2/26/2020|    2|     0|        0|         Iran/Taftan|               Sindh|  Karachi|
|2/29/2020|    1|     0|        0|               China|Islamabad Capital...|Islamabad|
|2/29/2020|    1|     0|        0|         Iran/Taftan|               Sindh|  Karachi|
| 3/2/2020|    1|     0|        0|         Iran/Taftan|    Gilgit-Baltistan|   Gilgit|
| 3/6/2020|    0|     0|        1|             Unknown|               Sindh|  Karachi|
| 3/7/2020|    1|     0|        0|         Iran/Taftan|               Sindh|  Karachi|
| 3/9/2020|    6|     0|        0|         

**Let's use the first row of the CSV as the column names**

In [57]:
# Re-read the file, this time treating its first line as the column names.
df = spark.read.csv('PK COVID-19.csv', header=True)

In [58]:
# Print the first 20 rows (long cells truncated) to confirm the header row is now used for column names.
df.show(n=20, truncate=True)

+---------+-----+------+---------+--------------------+--------------------+---------+
|     Date|Cases|Deaths|Recovered|      Travel_history|            Province|     City|
+---------+-----+------+---------+--------------------+--------------------+---------+
|2/26/2020|    1|     0|        0|               China|Islamabad Capital...|Islamabad|
|2/26/2020|    2|     0|        0|         Iran/Taftan|               Sindh|  Karachi|
|2/29/2020|    1|     0|        0|               China|Islamabad Capital...|Islamabad|
|2/29/2020|    1|     0|        0|         Iran/Taftan|               Sindh|  Karachi|
| 3/2/2020|    1|     0|        0|         Iran/Taftan|    Gilgit-Baltistan|   Gilgit|
| 3/6/2020|    0|     0|        1|             Unknown|               Sindh|  Karachi|
| 3/7/2020|    1|     0|        0|         Iran/Taftan|               Sindh|  Karachi|
| 3/9/2020|    6|     0|        0|               Syria|               Sindh|  Karachi|
| 3/9/2020|    3|     0|        0|         

**Print the Schema of df**

In [59]:
# Print each column's name, type and nullability as a tree.
# The frame was read without schema inference, so every column is reported as string.
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Cases: string (nullable = true)
 |-- Deaths: string (nullable = true)
 |-- Recovered: string (nullable = true)
 |-- Travel_history: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- City: string (nullable = true)



**Let's use `inferSchema` so that Spark infers the column data types**

In [60]:
# Re-read the file with the header row as column names and let Spark infer each column's type.
df = spark.read.csv(
    'PK COVID-19.csv',
    header=True,
    inferSchema=True,
)

In [61]:
# Print the schema again to check which types were inferred for each column.
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Cases: integer (nullable = true)
 |-- Deaths: integer (nullable = true)
 |-- Recovered: integer (nullable = true)
 |-- Travel_history: string (nullable = true)
 |-- Province: string (nullable = true)
 |-- City: string (nullable = true)



**Let's see the column names**

In [62]:
# List the column names of df; as the cell's last expression, the list is displayed as its result.
df.columns

['Date', 'Cases', 'Deaths', 'Recovered', 'Travel_history', 'Province', 'City']

**See the Python type of the dataframe object**

In [63]:
# Show the Python class of df itself (this is not the per-column data types).
type(df)

pyspark.sql.dataframe.DataFrame

**Let's Select only one column 'City' from spark df**

In [64]:
# Project only the 'City' column and print its first rows.
(
    df
    .select('City')
    .show()
)

+---------+
|     City|
+---------+
|Islamabad|
|  Karachi|
|Islamabad|
|  Karachi|
|   Gilgit|
|  Karachi|
|  Karachi|
|  Karachi|
|  Karachi|
|  Karachi|
|Hyderabad|
|   Quetta|
|   Skardu|
|  Karachi|
|   Gilgit|
|Islamabad|
|  Karachi|
|   Taftan|
|   Sukkur|
|  Karachi|
+---------+
only showing top 20 rows



**Let's Select Multiple columns 'Cases' & 'City' from spark df**

In [65]:
# Project the 'Cases' and 'City' columns (passed as separate arguments) and print the first rows.
df.select('Cases', 'City').show()

+-----+---------+
|Cases|     City|
+-----+---------+
|    1|Islamabad|
|    2|  Karachi|
|    1|Islamabad|
|    1|  Karachi|
|    1|   Gilgit|
|    0|  Karachi|
|    1|  Karachi|
|    6|  Karachi|
|    3|  Karachi|
|    1|  Karachi|
|    1|Hyderabad|
|    1|   Quetta|
|    1|   Skardu|
|    0|  Karachi|
|    1|   Gilgit|
|    1|Islamabad|
|    1|  Karachi|
|    7|   Taftan|
|   13|   Sukkur|
|    4|  Karachi|
+-----+---------+
only showing top 20 rows



**Let's add a column to the Spark dataframe**

In [66]:
# Add a derived column holding twice the value of 'Cases'; the result is reassigned to df.
df = df.withColumn(
    'Cases After Multiply by 2',
    df.Cases * 2,
)

In [67]:
# Print the first 20 rows (long cells truncated) to see the newly added column.
df.show(n=20, truncate=True)

+---------+-----+------+---------+--------------------+--------------------+---------+-------------------------+
|     Date|Cases|Deaths|Recovered|      Travel_history|            Province|     City|Cases After Multiply by 2|
+---------+-----+------+---------+--------------------+--------------------+---------+-------------------------+
|2/26/2020|    1|     0|        0|               China|Islamabad Capital...|Islamabad|                        2|
|2/26/2020|    2|     0|        0|         Iran/Taftan|               Sindh|  Karachi|                        4|
|2/29/2020|    1|     0|        0|               China|Islamabad Capital...|Islamabad|                        2|
|2/29/2020|    1|     0|        0|         Iran/Taftan|               Sindh|  Karachi|                        2|
| 3/2/2020|    1|     0|        0|         Iran/Taftan|    Gilgit-Baltistan|   Gilgit|                        2|
| 3/6/2020|    0|     0|        1|             Unknown|               Sindh|  Karachi|          

**Let's Drop the Column from spark dataframe**

In [68]:
# Remove the derived column added earlier; the result of drop is reassigned to df
# so that later cells see the frame without it.
df = df.drop('Cases After Multiply by 2')

In [69]:
# Print the first 20 rows (long cells truncated) to confirm the derived column is gone.
df.show(n=20, truncate=True)

+---------+-----+------+---------+--------------------+--------------------+---------+
|     Date|Cases|Deaths|Recovered|      Travel_history|            Province|     City|
+---------+-----+------+---------+--------------------+--------------------+---------+
|2/26/2020|    1|     0|        0|               China|Islamabad Capital...|Islamabad|
|2/26/2020|    2|     0|        0|         Iran/Taftan|               Sindh|  Karachi|
|2/29/2020|    1|     0|        0|               China|Islamabad Capital...|Islamabad|
|2/29/2020|    1|     0|        0|         Iran/Taftan|               Sindh|  Karachi|
| 3/2/2020|    1|     0|        0|         Iran/Taftan|    Gilgit-Baltistan|   Gilgit|
| 3/6/2020|    0|     0|        1|             Unknown|               Sindh|  Karachi|
| 3/7/2020|    1|     0|        0|         Iran/Taftan|               Sindh|  Karachi|
| 3/9/2020|    6|     0|        0|               Syria|               Sindh|  Karachi|
| 3/9/2020|    3|     0|        0|         

**Let's rename a column**

In [72]:
# Show df with 'City' renamed to 'city'. The renamed frame is only displayed,
# not assigned, so df itself still has the 'City' column afterwards.
(
    df
    .withColumnRenamed('City', 'city')
    .show()
)

+---------+-----+------+---------+--------------------+--------------------+---------+
|     Date|Cases|Deaths|Recovered|      Travel_history|            Province|     city|
+---------+-----+------+---------+--------------------+--------------------+---------+
|2/26/2020|    1|     0|        0|               China|Islamabad Capital...|Islamabad|
|2/26/2020|    2|     0|        0|         Iran/Taftan|               Sindh|  Karachi|
|2/29/2020|    1|     0|        0|               China|Islamabad Capital...|Islamabad|
|2/29/2020|    1|     0|        0|         Iran/Taftan|               Sindh|  Karachi|
| 3/2/2020|    1|     0|        0|         Iran/Taftan|    Gilgit-Baltistan|   Gilgit|
| 3/6/2020|    0|     0|        1|             Unknown|               Sindh|  Karachi|
| 3/7/2020|    1|     0|        0|         Iran/Taftan|               Sindh|  Karachi|
| 3/9/2020|    6|     0|        0|               Syria|               Sindh|  Karachi|
| 3/9/2020|    3|     0|        0|         