
**PySpark Basic Introduction**

In [None]:
import pyspark

In [None]:
from pyspark.sql import SparkSession

In [None]:
# Create a Spark session (getOrCreate() reuses one if it already exists)
spark = SparkSession.builder \
        .appName('Practice') \
        .getOrCreate()

In [None]:
spark

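Because we are working on a local machine, Spark runs in local mode: the driver and the executors share a single process on one machine, so there is no real cluster. On a cloud deployment, the same code can distribute work across a cluster of many worker nodes.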

In [None]:
# Read a CSV file into a Spark DataFrame (no header option yet)
df_pyspark = spark.read.csv("/content/test1 - Sheet1.csv")

In [None]:
df_pyspark

DataFrame[_c0: string, _c1: string]

In [None]:
# Display the rows (show() prints up to 20 rows by default)
df_pyspark.show()

+------+---+
|   _c0|_c1|
+------+---+
|  Name|age|
|   raj| 26|
|  ibna| 27|
|raihan| 34|
|  sami| 56|
|ridwan| 22|
+------+---+



In [None]:
spark.read.option('header','true').csv('/content/test1 - Sheet1.csv').show()

+------+---+
|  Name|age|
+------+---+
|   raj| 26|
|  ibna| 27|
|raihan| 34|
|  sami| 56|
|ridwan| 22|
+------+---+



In [None]:
df_pyspark = spark.read.option('header','true').csv('/content/test1 - Sheet1.csv')

In [None]:
type(df_pyspark)

In [None]:
df_pyspark.head(3)

[Row(Name='raj', age='26'),
 Row(Name='ibna', age='27'),
 Row(Name='raihan', age='34')]

In [None]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)



**2nd Session** \
PySpark DataFrame \
Reading a dataset \
Checking the column data types (schema) \
Selecting and indexing columns \
Using describe(), similar to pandas \
Adding and dropping columns \
Renaming columns


In the previous session, `age` was read as a string even though it clearly holds integers. By default, PySpark's CSV reader reads every column as a string; passing `inferSchema=True` to `.csv()` tells Spark to scan the data and infer each column's type.

In [None]:
df_pyspark=spark.read.option('header','true').csv('/content/test1 - Sheet1.csv',inferSchema=True)

In [None]:
df_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
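Schema inference costs one extra pass over the data, during which Spark picks, for each column, a type that every non-null value fits. The sketch below is a simplified plain-Python model of that idea, assuming only three candidate types (integer, double, string); Spark's real inference also recognizes other types such as booleans and timestamps, so treat `infer_type` as a hypothetical illustration rather than Spark's algorithm.

```python
# Simplified model of CSV type inference (NOT Spark's implementation):
# a column gets the narrowest type that every non-null value can be parsed as.

def parses_as(value, kind):
    """Return True if the string `value` can be parsed by `kind` (int or float)."""
    try:
        kind(value)
        return True
    except ValueError:
        return False

def infer_type(values):
    """Infer 'integer', 'double' or 'string' for a list of raw CSV strings.

    None and empty strings are treated as nulls and ignored.
    """
    present = [v for v in values if v not in (None, "")]
    if present and all(parses_as(v, int) for v in present):
        return "integer"
    if present and all(parses_as(v, float) for v in present):
        return "double"
    return "string"

# The two columns of test1 - Sheet1.csv, as the raw strings the reader sees
names = ["raj", "ibna", "raihan", "sami", "ridwan"]
ages = ["26", "27", "34", "56", "22"]
print(infer_type(names))  # string
print(infer_type(ages))   # integer
```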



In [None]:
type(df_pyspark)

In [None]:
df_pyspark.columns

['Name', 'age']

In [None]:
df_pyspark.head(3)

[Row(Name='raj', age=26), Row(Name='ibna', age=27), Row(Name='raihan', age=34)]

In [None]:
#To select one column
df_pyspark.select('Name').show()

+------+
|  Name|
+------+
|   raj|
|  ibna|
|raihan|
|  sami|
|ridwan|
+------+



In [None]:
df_pyspark.dtypes

[('Name', 'string'), ('age', 'int')]

In [None]:
df_pyspark.describe()

DataFrame[summary: string, Name: string, age: string]

In [None]:
df_pyspark.describe().show()

+-------+----+------------------+
|summary|Name|               age|
+-------+----+------------------+
|  count|   5|                 5|
|   mean|NULL|              33.0|
| stddev|NULL|13.564659966250536|
|    min|ibna|                22|
|    max|sami|                56|
+-------+----+------------------+
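The `stddev` row above is the sample standard deviation, which divides by n - 1 rather than n. A quick check with Python's standard `statistics` module on the five ages reproduces both the `mean` and the `stddev` rows:

```python
import statistics

ages = [26, 27, 34, 56, 22]

print(statistics.mean(ages))    # 33, same as the mean row
print(statistics.stdev(ages))   # sample std dev (n - 1), about 13.5647, same as describe()
print(statistics.pstdev(ages))  # population std dev (n), smaller, NOT what describe() shows
```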



In [None]:
## Add a column (withColumn returns a new DataFrame, so assign it back)
df_pyspark=df_pyspark.withColumn('Age after 2 years',df_pyspark['Age']+2)

In [None]:
df_pyspark.show()

+------+---+-----------------+
|  Name|age|Age after 2 years|
+------+---+-----------------+
|   raj| 26|               28|
|  ibna| 27|               29|
|raihan| 34|               36|
|  sami| 56|               58|
|ridwan| 22|               24|
+------+---+-----------------+



In [None]:
## Dropping columns
df_pyspark=df_pyspark.drop('Age after 2 years')

In [None]:
df_pyspark.show()

+------+---+
|  Name|age|
+------+---+
|   raj| 26|
|  ibna| 27|
|raihan| 34|
|  sami| 56|
|ridwan| 22|
+------+---+



In [None]:
## Rename a column (returns a new DataFrame; df_pyspark itself is unchanged)
df_pyspark.withColumnRenamed('Name','New Name')

DataFrame[New Name: string, age: int]

In [None]:
df_pyspark.withColumnRenamed('Name','New Name').show()

+--------+---+
|New Name|age|
+--------+---+
|     raj| 26|
|    ibna| 27|
|  raihan| 34|
|    sami| 56|
|  ridwan| 22|
+--------+---+



**PySpark Handling Missing Values** \
Dropping columns \
Dropping rows \
Parameters of the drop functionality \
Filling missing values with the mean, median or mode

In [None]:
df2_pyspark=spark.read.csv("/content/test1 - Sheet1-2.csv",header=True,inferSchema=True)

In [None]:
df2_pyspark.show()

+------+----+-----------+------+
|  Name| age|experience |salary|
+------+----+-----------+------+
|   raj|  26|         10| 30000|
|  ibna|  27|          8| 25000|
|raihan|  34|          4| 20000|
|  sami|  56|          3| 20000|
|ridwan|  22|          1| 15000|
| nafis|  23|          2| 18000|
|  obhi|NULL|       NULL| 40000|
|  NULL|  34|         10| 38000|
|  NULL|  46|       NULL|  NULL|
+------+----+-----------+------+



In [None]:
## Drop a column (returns a new DataFrame; since it is not assigned back, df2_pyspark still has Name)
df2_pyspark.drop('Name')

DataFrame[age: int, experience : int, salary: int]

In [None]:
df2_pyspark.show()

+------+----+-----------+------+
|  Name| age|experience |salary|
+------+----+-----------+------+
|   raj|  26|         10| 30000|
|  ibna|  27|          8| 25000|
|raihan|  34|          4| 20000|
|  sami|  56|          3| 20000|
|ridwan|  22|          1| 15000|
| nafis|  23|          2| 18000|
|  obhi|NULL|       NULL| 40000|
|  NULL|  34|         10| 38000|
|  NULL|  46|       NULL|  NULL|
+------+----+-----------+------+



In [None]:
df2_pyspark.na.drop().show()

+------+---+-----------+------+
|  Name|age|experience |salary|
+------+---+-----------+------+
|   raj| 26|         10| 30000|
|  ibna| 27|          8| 25000|
|raihan| 34|          4| 20000|
|  sami| 56|          3| 20000|
|ridwan| 22|          1| 15000|
| nafis| 23|          2| 18000|
+------+---+-----------+------+



`na.drop()` takes a few parameters that control which rows are dropped: `how`, `thresh` and `subset`.

In [None]:
## how="any" (the default)
df2_pyspark.na.drop(how="any").show() # drops a row if any column is null
## how="all"
df2_pyspark.na.drop(how="all").show() # drops a row only if all columns are null

+------+---+-----------+------+
|  Name|age|experience |salary|
+------+---+-----------+------+
|   raj| 26|         10| 30000|
|  ibna| 27|          8| 25000|
|raihan| 34|          4| 20000|
|  sami| 56|          3| 20000|
|ridwan| 22|          1| 15000|
| nafis| 23|          2| 18000|
+------+---+-----------+------+

+------+----+-----------+------+
|  Name| age|experience |salary|
+------+----+-----------+------+
|   raj|  26|         10| 30000|
|  ibna|  27|          8| 25000|
|raihan|  34|          4| 20000|
|  sami|  56|          3| 20000|
|ridwan|  22|          1| 15000|
| nafis|  23|          2| 18000|
|  obhi|NULL|       NULL| 40000|
|  NULL|  34|         10| 38000|
|  NULL|  46|       NULL|  NULL|
+------+----+-----------+------+



In [None]:
##threshold
df2_pyspark.na.drop(how="any", thresh=2).show() # keeps rows with at least 2 non-null values (thresh overrides how)

+------+----+-----------+------+
|  Name| age|experience |salary|
+------+----+-----------+------+
|   raj|  26|         10| 30000|
|  ibna|  27|          8| 25000|
|raihan|  34|          4| 20000|
|  sami|  56|          3| 20000|
|ridwan|  22|          1| 15000|
| nafis|  23|          2| 18000|
|  obhi|NULL|       NULL| 40000|
|  NULL|  34|         10| 38000|
+------+----+-----------+------+



In [None]:
df2_pyspark.na.drop(how="any", subset=['experience ']).show() # drops a row only if 'experience ' (note the trailing space) is null

+------+---+-----------+------+
|  Name|age|experience |salary|
+------+---+-----------+------+
|   raj| 26|         10| 30000|
|  ibna| 27|          8| 25000|
|raihan| 34|          4| 20000|
|  sami| 56|          3| 20000|
|ridwan| 22|          1| 15000|
| nafis| 23|          2| 18000|
|  NULL| 34|         10| 38000|
+------+---+-----------+------+



In [None]:
df2_pyspark.na.drop(how="any", subset=['age']).show()

+------+---+-----------+------+
|  Name|age|experience |salary|
+------+---+-----------+------+
|   raj| 26|         10| 30000|
|  ibna| 27|          8| 25000|
|raihan| 34|          4| 20000|
|  sami| 56|          3| 20000|
|ridwan| 22|          1| 15000|
| nafis| 23|          2| 18000|
|  NULL| 34|         10| 38000|
|  NULL| 46|       NULL|  NULL|
+------+---+-----------+------+
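The `na.drop()` parameters used above follow documented rules: `how='any'` drops a row if any considered column is null, `how='all'` only if all of them are, `thresh` (when given) overrides `how` and keeps rows with at least that many non-null values, and `subset` limits which columns are considered. The plain-Python model below applies those rules to the same nine rows, with `None` standing for null; the `na_drop` helper is a hypothetical illustration, not Spark's code.

```python
# Plain-Python model of DataFrame.na.drop() semantics; None stands for null.
ROWS = [
    {"Name": "raj", "age": 26, "experience ": 10, "salary": 30000},
    {"Name": "ibna", "age": 27, "experience ": 8, "salary": 25000},
    {"Name": "raihan", "age": 34, "experience ": 4, "salary": 20000},
    {"Name": "sami", "age": 56, "experience ": 3, "salary": 20000},
    {"Name": "ridwan", "age": 22, "experience ": 1, "salary": 15000},
    {"Name": "nafis", "age": 23, "experience ": 2, "salary": 18000},
    {"Name": "obhi", "age": None, "experience ": None, "salary": 40000},
    {"Name": None, "age": 34, "experience ": 10, "salary": 38000},
    {"Name": None, "age": 46, "experience ": None, "salary": None},
]

def na_drop(rows, how="any", thresh=None, subset=None):
    """Keep rows that have at least `thresh` non-null values among `subset`."""
    cols = subset if subset is not None else list(rows[0])
    if thresh is None:
        # 'any' requires every column to be non-null; 'all' requires just one
        thresh = len(cols) if how == "any" else 1
    return [r for r in rows if sum(r[c] is not None for c in cols) >= thresh]

for kwargs in [{"how": "any"}, {"how": "all"}, {"thresh": 2},
               {"subset": ["experience "]}, {"subset": ["age"]}]:
    print(kwargs, "->", len(na_drop(ROWS, **kwargs)), "rows kept")
```

The printed counts (6, 9, 8, 7 and 8) match the row counts of the five tables above.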



In [None]:
## Fill missing values with a string
df2_pyspark.na.fill('Missing Values').show()

+--------------+----+-----------+------+
|          Name| age|experience |salary|
+--------------+----+-----------+------+
|           raj|  26|         10| 30000|
|          ibna|  27|          8| 25000|
|        raihan|  34|          4| 20000|
|          sami|  56|          3| 20000|
|        ridwan|  22|          1| 15000|
|         nafis|  23|          2| 18000|
|          obhi|NULL|       NULL| 40000|
|Missing Values|  34|         10| 38000|
|Missing Values|  46|       NULL|  NULL|
+--------------+----+-----------+------+



Only the `Name` column was filled: when `na.fill()` receives a string, it replaces nulls only in string-typed columns, so the integer columns `age`, `experience ` and `salary` keep their nulls.
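Put differently, the type of the fill value decides which columns are touched: a string fills string columns and a number fills numeric columns. Besides casting, `na.fill()` also accepts a dict mapping column names to replacement values (for example `df2_pyspark.na.fill({'Name': 'Missing Values', 'age': 0})`), which lets each column get a value of its own type. The plain-Python sketch below models that rule on two of the rows; `SCHEMA`, `column_accepts` and `na_fill` are hypothetical helpers, not Spark code, and only string and number fill values are modeled.

```python
# Hypothetical model of type-matched filling in na.fill(); None stands for null.
SCHEMA = {"Name": "string", "age": "integer", "salary": "integer"}
ROWS = [
    {"Name": "obhi", "age": None, "salary": 40000},
    {"Name": None, "age": 46, "salary": None},
]

def column_accepts(col_type, value):
    """A string fills string columns; a number fills numeric columns.

    Other value types (e.g. booleans) are not modeled and fill nothing here.
    """
    if isinstance(value, str):
        return col_type == "string"
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return col_type in ("integer", "double")
    return False

def na_fill(rows, schema, value):
    if isinstance(value, dict):
        targets = dict(value)  # explicit per-column replacement values
    else:
        targets = {c: value for c, t in schema.items() if column_accepts(t, value)}
    return [
        {c: (targets[c] if v is None and c in targets else v) for c, v in row.items()}
        for row in rows
    ]

print(na_fill(ROWS, SCHEMA, "Missing Values"))  # only Name is filled
print(na_fill(ROWS, SCHEMA, 0))                 # only age and salary are filled
print(na_fill(ROWS, SCHEMA, {"Name": "Missing Values", "age": 0}))  # salary stays null
```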


In [None]:
df2_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience : integer (nullable = true)
 |-- salary: integer (nullable = true)



In [None]:
# na.fill() with a string only touches string columns, so cast the integer columns to string first
from pyspark.sql.functions import col
df2_pyspark= df2_pyspark.withColumn("age",col("age").cast("string")) \
                        .withColumn("experience ",col("experience ").cast("string")) \
                        .withColumn("Salary",col("Salary").cast("string"))

In [None]:
df2_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- experience : string (nullable = true)
 |-- Salary: string (nullable = true)



In [None]:
df2_pyspark.na.fill('Missing Values').show()

+--------------+--------------+--------------+--------------+
|          Name|           age|   experience |        Salary|
+--------------+--------------+--------------+--------------+
|           raj|            26|            10|         30000|
|          ibna|            27|             8|         25000|
|        raihan|            34|             4|         20000|
|          sami|            56|             3|         20000|
|        ridwan|            22|             1|         15000|
|         nafis|            23|             2|         18000|
|          obhi|Missing Values|Missing Values|         40000|
|Missing Values|            34|            10|         38000|
|Missing Values|            46|Missing Values|Missing Values|
+--------------+--------------+--------------+--------------+



In [None]:
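# Re-read the file: df2_pyspark now holds string columns, but Imputer needs the original numeric ones
df3_pyspark=spark.read.csv("/content/test1 - Sheet1-2.csv",header=True,inferSchema=True)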

In [None]:
df3_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- experience : integer (nullable = true)
 |-- salary: integer (nullable = true)



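To fill nulls with a column's mean, median or mode, we use the `Imputer` estimator from `pyspark.ml.feature`: `fit()` computes one replacement value per input column, and `transform()` adds the filled columns under the names given in `outputCols`.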

In [None]:
from pyspark.ml.feature import Imputer

imputer= Imputer(
    inputCols=['age','experience ', 'salary'],
    outputCols=["{}_imputed".format(c) for c in ['age','experience ','salary']]
).setStrategy("mean")

In [None]:
# Fit the imputer (computes each input column's mean) and show the added *_imputed columns
imputer.fit(df3_pyspark).transform(df3_pyspark).show()

+------+----+-----------+------+-----------+-------------------+--------------+
|  Name| age|experience |salary|age_imputed|experience _imputed|salary_imputed|
+------+----+-----------+------+-----------+-------------------+--------------+
|   raj|  26|         10| 30000|         26|                 10|         30000|
|  ibna|  27|          8| 25000|         27|                  8|         25000|
|raihan|  34|          4| 20000|         34|                  4|         20000|
|  sami|  56|          3| 20000|         56|                  3|         20000|
|ridwan|  22|          1| 15000|         22|                  1|         15000|
| nafis|  23|          2| 18000|         23|                  2|         18000|
|  obhi|NULL|       NULL| 40000|         33|                  5|         40000|
|  NULL|  34|         10| 38000|         34|                 10|         38000|
|  NULL|  46|       NULL|  NULL|         46|                  5|         25750|
+------+----+-----------+------+-----------+-------------------+--------------+

In [None]:
from pyspark.ml.feature import Imputer

imputer= Imputer(
    inputCols=['age','experience ','salary'],
    outputCols=["{}_imputed".format(c) for c in ['age','experience ','salary']]
).setStrategy("median")

In [None]:
imputer.fit(df3_pyspark).transform(df3_pyspark).show()

+------+----+-----------+------+-----------+-------------------+--------------+
|  Name| age|experience |salary|age_imputed|experience _imputed|salary_imputed|
+------+----+-----------+------+-----------+-------------------+--------------+
|   raj|  26|         10| 30000|         26|                 10|         30000|
|  ibna|  27|          8| 25000|         27|                  8|         25000|
|raihan|  34|          4| 20000|         34|                  4|         20000|
|  sami|  56|          3| 20000|         56|                  3|         20000|
|ridwan|  22|          1| 15000|         22|                  1|         15000|
| nafis|  23|          2| 18000|         23|                  2|         18000|
|  obhi|NULL|       NULL| 40000|         27|                  4|         40000|
|  NULL|  34|         10| 38000|         34|                 10|         38000|
|  NULL|  46|       NULL|  NULL|         46|                  4|         20000|
+------+----+-----------+------+-----------+-------------------+--------------+

In [None]:
from pyspark.ml.feature import Imputer

imputer= Imputer(
    inputCols=['age','experience ','salary'],
    outputCols=["{}_imputed".format(c) for c in ['age','experience ','salary']]
).setStrategy("mode")

In [None]:
imputer.fit(df3_pyspark).transform(df3_pyspark).show()

+------+----+-----------+------+-----------+-------------------+--------------+
|  Name| age|experience |salary|age_imputed|experience _imputed|salary_imputed|
+------+----+-----------+------+-----------+-------------------+--------------+
|   raj|  26|         10| 30000|         26|                 10|         30000|
|  ibna|  27|          8| 25000|         27|                  8|         25000|
|raihan|  34|          4| 20000|         34|                  4|         20000|
|  sami|  56|          3| 20000|         56|                  3|         20000|
|ridwan|  22|          1| 15000|         22|                  1|         15000|
| nafis|  23|          2| 18000|         23|                  2|         18000|
|  obhi|NULL|       NULL| 40000|         34|                 10|         40000|
|  NULL|  34|         10| 38000|         34|                 10|         38000|
|  NULL|  46|       NULL|  NULL|         46|                 10|         20000|
+------+----+-----------+------+-----------+-------------------+--------------+
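To see where the imputed values in the three tables come from, the sketch below recomputes each column's replacement value from its non-null values with Python's `statistics` module. Two assumptions are inferred from the outputs above rather than from Spark's source: the mean is truncated back to the integer column type (33.5 shows up as 33), and Spark's median lands on the lower of the two middle values for these even-length columns (27 for `age`). Spark computes the median approximately, so on larger data it need not equal an exact median.

```python
import statistics

# Non-null values of each numeric column in test1 - Sheet1-2.csv
NON_NULL = {
    "age": [26, 27, 34, 56, 22, 23, 34, 46],
    "experience ": [10, 8, 4, 3, 1, 2, 10],
    "salary": [30000, 25000, 20000, 20000, 15000, 18000, 40000, 38000],
}

surrogates = {}
for name, values in NON_NULL.items():
    surrogates[name] = {
        # assumption: mean truncated back to the integer column type (33.5 -> 33)
        "mean": int(statistics.mean(values)),
        # assumption: lower middle value for even-length columns
        "median": statistics.median_low(values),
        # most frequent value (each column here has a single mode)
        "mode": statistics.mode(values),
    }
    print(repr(name), surrogates[name])
```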

**PySpark Dataframes** \
Filter operations \
Comparison and logical operators: ==, <=, >=, & (and), | (or) \
Negation with ~

**Filter Operations**

In [None]:
### People whose salary is less than or equal to 20000
df2_pyspark.filter("salary<=20000").show()

+------+---+-----------+------+
|  Name|age|experience |Salary|
+------+---+-----------+------+
|raihan| 34|          4| 20000|
|  sami| 56|          3| 20000|
|ridwan| 22|          1| 15000|
| nafis| 23|          2| 18000|
+------+---+-----------+------+



In [None]:
df2_pyspark.filter("salary<=20000").select(['name','age']).show()

+------+---+
|  name|age|
+------+---+
|raihan| 34|
|  sami| 56|
|ridwan| 22|
| nafis| 23|
+------+---+



In [None]:
# Two conditions combined with & (and); wrap each condition in parentheses
df2_pyspark.filter((df2_pyspark['salary'] <= 20000) & (df2_pyspark['salary'] >= 15000)).show()

+------+---+-----------+------+
|  Name|age|experience |Salary|
+------+---+-----------+------+
|raihan| 34|          4| 20000|
|  sami| 56|          3| 20000|
|ridwan| 22|          1| 15000|
| nafis| 23|          2| 18000|
+------+---+-----------+------+



In [None]:
# | (or): a row passes if either condition holds, so every non-null salary passes here
df2_pyspark.filter((df2_pyspark['salary'] <= 20000) | (df2_pyspark['salary'] >= 15000)).show()

+------+----+-----------+------+
|  Name| age|experience |Salary|
+------+----+-----------+------+
|   raj|  26|         10| 30000|
|  ibna|  27|          8| 25000|
|raihan|  34|          4| 20000|
|  sami|  56|          3| 20000|
|ridwan|  22|          1| 15000|
| nafis|  23|          2| 18000|
|  obhi|NULL|       NULL| 40000|
|  NULL|  34|         10| 38000|
+------+----+-----------+------+



In [None]:
df2_pyspark.filter(df2_pyspark['salary']<=20000).show()

+------+---+-----------+------+
|  Name|age|experience |Salary|
+------+---+-----------+------+
|raihan| 34|          4| 20000|
|  sami| 56|          3| 20000|
|ridwan| 22|          1| 15000|
| nafis| 23|          2| 18000|
+------+---+-----------+------+
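One detail of these filters is worth spelling out. A comparison against a null salary evaluates to null, and `filter()` keeps only rows where the condition is true, so the row with a missing salary never appears, not even in the `|` example. Negation with `~` behaves the same way: `df2_pyspark.filter(~(df2_pyspark['salary'] <= 20000))` should keep the salaries above 20000 and still drop the null row. The plain-Python sketch below models this three-valued logic with `None` for null; the `sql_*` and `keep` helpers are hypothetical illustrations.

```python
# Model of SQL three-valued logic used by filter(); None stands for NULL.

def sql_le(a, b):
    return None if a is None or b is None else a <= b

def sql_ge(a, b):
    return None if a is None or b is None else a >= b

def sql_or(p, q):
    if p is True or q is True:
        return True
    if p is None or q is None:
        return None
    return False

def sql_not(p):
    return None if p is None else not p

def keep(condition):
    """filter() keeps a row only when the condition is exactly True."""
    return condition is True

# (Name, salary) pairs from the table; two rows have a null Name
ROWS = [("raj", 30000), ("ibna", 25000), ("raihan", 20000), ("sami", 20000),
        ("ridwan", 15000), ("nafis", 18000), ("obhi", 40000),
        (None, 38000), (None, None)]

either = [r for r in ROWS if keep(sql_or(sql_le(r[1], 20000), sql_ge(r[1], 15000)))]
above = [r for r in ROWS if keep(sql_not(sql_le(r[1], 20000)))]
print(len(either), "rows pass the | filter")  # the null-salary row is dropped
print(above)  # salaries above 20000; the null-salary row is dropped here too
```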



**PySpark GroupBy And Aggregate Function**

In [None]:
df4_pyspark=spark.read.csv('/content/Sheet2 - Sheet1-2.csv', header=True, inferSchema=True)

In [None]:
df4_pyspark.show()

+------+-------------+------+
|  Name|   Department|Salary|
+------+-------------+------+
| Nakib|Data Science | 10000|
| Nakib|          IOT|  5000|
|  Ibna|     Big Data|  4000|
| Nakib|     Big Data|  4000|
|  Ibna|Data Science |  3000|
|Raihan|Data Science | 20000|
|Raihan|          IOT| 10000|
|Raihan|     Big Data|  5000|
|  Alif|Data Science | 10000|
|  Alif|     Big Data| 25000|
+------+-------------+------+



In [None]:
df4_pyspark.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
 |-- Salary: integer (nullable = true)



In [None]:
#groupby
# Total salary per person (sum() aggregates every numeric column)
df4_pyspark.groupBy('Name').sum().show()

+------+-----------+
|  Name|sum(Salary)|
+------+-----------+
|Raihan|      35000|
|  Alif|      35000|
|  Ibna|       7000|
| Nakib|      19000|
+------+-----------+



In [None]:
# Maximum salary per person
df4_pyspark.groupBy('Name').max().show()

+------+-----------+
|  Name|max(Salary)|
+------+-----------+
|Raihan|      20000|
|  Alif|      25000|
|  Ibna|       4000|
| Nakib|      10000|
+------+-----------+



In [None]:
# Minimum salary per person
df4_pyspark.groupBy('Name').min().show()

+------+-----------+
|  Name|min(Salary)|
+------+-----------+
|Raihan|       5000|
|  Alif|      10000|
|  Ibna|       3000|
| Nakib|       4000|
+------+-----------+
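`groupBy('Name')` splits the rows by name, and each aggregate then reduces the numeric columns of every group; `Salary` is the only numeric column here, which is why it is the only one aggregated. A plain-Python equivalent on the same ten rows (an illustration, not how Spark executes the job) makes the arithmetic easy to check:

```python
from collections import defaultdict

# (Name, Department, Salary) rows from Sheet2 - Sheet1-2.csv
ROWS = [
    ("Nakib", "Data Science ", 10000), ("Nakib", "IOT", 5000),
    ("Ibna", "Big Data", 4000), ("Nakib", "Big Data", 4000),
    ("Ibna", "Data Science ", 3000), ("Raihan", "Data Science ", 20000),
    ("Raihan", "IOT", 10000), ("Raihan", "Big Data", 5000),
    ("Alif", "Data Science ", 10000), ("Alif", "Big Data", 25000),
]

# Collect each person's salaries, like groupBy('Name')
by_name = defaultdict(list)
for name, _dept, salary in ROWS:
    by_name[name].append(salary)

# Reduce each group, like sum(), max() and min()
summary = {name: (sum(s), max(s), min(s)) for name, s in by_name.items()}
for name, (total, high, low) in summary.items():
    print(f"{name}: sum={total}, max={high}, min={low}")
```

In PySpark itself, the three separate calls can be combined into a single aggregation, e.g. `from pyspark.sql import functions as F` followed by `df4_pyspark.groupBy('Name').agg(F.sum('Salary'), F.max('Salary'), F.min('Salary')).show()`.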




In [None]:
### Total salary per department
df4_pyspark.groupBy('Department').sum().show()

+-------------+-----------+
|   Department|sum(Salary)|
+-------------+-----------+
|          IOT|      15000|
|     Big Data|      38000|
|Data Science |      43000|
+-------------+-----------+



In [None]:
df4_pyspark.groupBy('Department').mean().show()

+-------------+-----------+
|   Department|avg(Salary)|
+-------------+-----------+
|          IOT|     7500.0|
|     Big Data|     9500.0|
|Data Science |    10750.0|
+-------------+-----------+



In [None]:
df4_pyspark.groupBy('Department').count().show()

+-------------+-----+
|   Department|count|
+-------------+-----+
|          IOT|    2|
|     Big Data|    4|
|Data Science |    4|
+-------------+-----+



In [None]:
df4_pyspark.agg({'Salary': 'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      96000|
+-----------+
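`groupBy('Department').mean()` is the group's sum divided by its count (43000 / 4 = 10750.0 for Data Science), and `agg({'Salary': 'sum'})` without a `groupBy` treats the whole DataFrame as a single group. Note that the department value `'Data Science '` carries a trailing space in this CSV; grouping compares exact strings, so values differing only in whitespace would form separate groups. The plain-Python check below reproduces the department tables and the grand total:

```python
from collections import defaultdict

# (Name, Department, Salary) rows from Sheet2 - Sheet1-2.csv
ROWS = [
    ("Nakib", "Data Science ", 10000), ("Nakib", "IOT", 5000),
    ("Ibna", "Big Data", 4000), ("Nakib", "Big Data", 4000),
    ("Ibna", "Data Science ", 3000), ("Raihan", "Data Science ", 20000),
    ("Raihan", "IOT", 10000), ("Raihan", "Big Data", 5000),
    ("Alif", "Data Science ", 10000), ("Alif", "Big Data", 25000),
]

by_dept = defaultdict(list)
for _name, dept, salary in ROWS:
    by_dept[dept].append(salary)

# Per-department sum, mean (sum / count) and count
dept_stats = {d: (sum(s), sum(s) / len(s), len(s)) for d, s in by_dept.items()}
for dept, stats in dept_stats.items():
    print(repr(dept), stats)

# Like agg({'Salary': 'sum'}) with no groupBy: one group covering every row
total = sum(salary for _name, _dept, salary in ROWS)
print("total:", total)
```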

