# Learn PySpark
https://youtu.be/_C8kWso4ne4?si=-FEu8RoK0MwmF8nT

PySpark Documentation:
https://spark.apache.org/docs/latest/api/python/index.html

In [1]:
# Install PySpark, pandas & numpy
# !pip3 install pyspark
# !pip3 install pandas
# !pip3 install numpy

In [2]:
import pyspark
import pandas as pd
import numpy as np

**Precondition:** A sample CSV file named "test1.csv", with Name, age, Experience and Salary columns, exists in the "test_data" folder under the current directory.

In [3]:
# Read csv file using pandas
df_pandas = pd.read_csv("test_data/test1.csv")
df_pandas

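|   | Name | age | Experience | Salary |
|---|------|-----|------------|--------|
| 0 | Krish | 31 | 10 | 30000 |
| 1 | Sudhanshu | 30 | 8 | 25000 |
| 2 | Sunny | 29 | 4 | 20000 |
| 3 | Paul | 24 | 3 | 20000 |
| 4 | Harsha | 21 | 1 | 15000 |
| 5 | Shubham | 23 | 2 | 18000 |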


In [4]:
type(df_pandas)

pandas.core.frame.DataFrame

## Start a Spark session

In [5]:
from pyspark.sql import SparkSession

In [6]:
spark = SparkSession.builder.appName("learn-pyspark").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/07 13:27:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
spark

### Read CSV without any options

**Note:**
- `inferSchema` not set: every column is read as a string.
- `header` not set: the header line is read as an ordinary data row, and the columns get default names `_c0`, `_c1`, etc.


In [8]:
df_pyspark = spark.read.csv("test_data/test1.csv")
df_pyspark

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string]

In [9]:
df_pyspark.show()

+---------+---+----------+------+
|      _c0|_c1|       _c2|   _c3|
+---------+---+----------+------+
|     Name|age|Experience|Salary|
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [10]:
type(df_pyspark)

pyspark.sql.dataframe.DataFrame

### Read CSV with the first row as header

In [11]:
df_pyspark = spark.read.option("header", "true").csv("test_data/test1.csv")

In [12]:
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [13]:
df_pyspark.head(3)

[Row(Name='Krish', age='31', Experience='10', Salary='30000'),
 Row(Name='Sudhanshu', age='30', Experience='8', Salary='25000'),
 Row(Name='Sunny', age='29', Experience='4', Salary='20000')]

In [14]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [15]:
df_pyspark.dtypes

[('Name', 'string'),
 ('age', 'string'),
 ('Experience', 'string'),
 ('Salary', 'string')]

### Read CSV with the first row as header and an inferred schema

In [16]:
# header and inferSchema can be set with .option() or passed as keyword arguments to .csv(); both forms are equivalent
df_pyspark = spark.read.option("header", "true").csv("test_data/test1.csv", inferSchema=True)

In [17]:
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [18]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



### Selecting columns

In [19]:
df_pyspark_name_column = df_pyspark.select("Name")

In [20]:
df_pyspark_name_column.show()

+---------+
|     Name|
+---------+
|    Krish|
|Sudhanshu|
|    Sunny|
|     Paul|
|   Harsha|
|  Shubham|
+---------+



In [21]:
type(df_pyspark_name_column)

pyspark.sql.dataframe.DataFrame

#### Selecting multiple columns

In [22]:
df_pyspark.select(["Name", "age"]).show()

+---------+---+
|     Name|age|
+---------+---+
|    Krish| 31|
|Sudhanshu| 30|
|    Sunny| 29|
|     Paul| 24|
|   Harsha| 21|
|  Shubham| 23|
+---------+---+



#### Selecting all columns

In [23]:
df_pyspark.select("*").show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



### Summary statistics with describe() (similar to pandas)

In [24]:
df_pyspark_describe = df_pyspark.describe()
df_pyspark_describe.show()

23/11/07 13:27:53 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  NULL|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  NULL| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+



### Add / Drop columns

#### Add column

In [25]:
# Not an in-place operation: withColumn returns a new DataFrame, so the result is reassigned to df_pyspark
df_pyspark = df_pyspark.withColumn("Exp after 2 yrs", df_pyspark["Experience"]+2)

In [26]:
df_pyspark.show()

+---------+---+----------+------+---------------+
|     Name|age|Experience|Salary|Exp after 2 yrs|
+---------+---+----------+------+---------------+
|    Krish| 31|        10| 30000|             12|
|Sudhanshu| 30|         8| 25000|             10|
|    Sunny| 29|         4| 20000|              6|
|     Paul| 24|         3| 20000|              5|
|   Harsha| 21|         1| 15000|              3|
|  Shubham| 23|         2| 18000|              4|
+---------+---+----------+------+---------------+



#### Drop column

In [27]:
# Not an in-place operation: drop returns a new DataFrame, so the result is reassigned to df_pyspark
df_pyspark = df_pyspark.drop("Exp after 2 yrs")

In [28]:
df_pyspark.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



### Rename columns

In [29]:
# Not an in-place operation: this only displays the renamed result; df_pyspark still has the "Name" column
df_pyspark.withColumnRenamed("Name", "Updated Name").show()

+------------+---+----------+------+
|Updated Name|age|Experience|Salary|
+------------+---+----------+------+
|       Krish| 31|        10| 30000|
|   Sudhanshu| 30|         8| 25000|
|       Sunny| 29|         4| 20000|
|        Paul| 24|         3| 20000|
|      Harsha| 21|         1| 15000|
|     Shubham| 23|         2| 18000|
+------------+---+----------+------+



- Dropping columns
- Dropping rows
- Parameters of the drop functions
- Handling missing values by mean