In [1]:
#Installation of PySpark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [2]:
#Checking the PySpark installation
import pyspark

In [3]:
#Reading a data table with Pandas library
import pandas as pd
pd.read_csv('/content/drive/MyDrive/Pyspark-With-Python-main/test1.csv')

Unnamed: 0,Name,age,Experience,Salary
0,Krish,31,10,30000
1,Sudhanshu,30,8,25000
2,Sunny,29,4,20000
3,Paul,24,3,20000
4,Harsha,21,1,15000
5,Shubham,23,2,18000


In [4]:
#Creating a PySpark session
from pyspark.sql import SparkSession

In [5]:
#Creating a variable
spark = SparkSession.builder.appName('Practise').getOrCreate()

In [6]:
#Starting the PySpark session
spark

In [7]:
#Reading a data table with Spark
df_pyspark = spark.read.csv('/content/drive/MyDrive/Pyspark-With-Python-main/test1.csv')


In [8]:
#What is in df_pyspark?
df_pyspark.show()

+---------+---+----------+------+
|      _c0|_c1|       _c2|   _c3|
+---------+---+----------+------+
|     Name|age|Experience|Salary|
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [9]:
#Another way to see the table content
spark.read.option('header','true').csv('/content/drive/MyDrive/Pyspark-With-Python-main/test1.csv').show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [10]:
df_pyspark2 = spark.read.option('header','true').csv('/content/drive/MyDrive/Pyspark-With-Python-main/test1.csv')

In [11]:
#What is the type of df_pyspark2?
type(df_pyspark2)

pyspark.sql.dataframe.DataFrame

In [12]:
#Reading the three first lines of df_pyspark2
df_pyspark.head(3)

[Row(_c0='Name', _c1='age', _c2='Experience', _c3='Salary'),
 Row(_c0='Krish', _c1='31', _c2='10', _c3='30000'),
 Row(_c0='Sudhanshu', _c1='30', _c2='8', _c3='25000')]

In [13]:
#Getting information of columns table
df_pyspark2.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: string (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Salary: string (nullable = true)



In [14]:
df_pyspark2.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [15]:
#Including the option inferSchema
df_pyspark2 = spark.read.option('header','true').csv('/content/drive/MyDrive/Pyspark-With-Python-main/test1.csv',inferSchema=True)

In [16]:
df_pyspark2.printSchema()

root
 |-- Name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- Experience: integer (nullable = true)
 |-- Salary: integer (nullable = true)



In [17]:
#Including the options about schema and header
df_pyspark2 = spark.read.csv('/content/drive/MyDrive/Pyspark-With-Python-main/test1.csv',inferSchema=True,header=True)

In [18]:
df_pyspark2.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [19]:
#Checking the data type. DataFrame is a kind of data structure.
type(df_pyspark2)

pyspark.sql.dataframe.DataFrame

In [20]:
#Columns names
df_pyspark2.columns

['Name', 'age', 'Experience', 'Salary']

In [21]:
#Selecting one column
df_pyspark2.select('Name').show()


+---------+
|     Name|
+---------+
|    Krish|
|Sudhanshu|
|    Sunny|
|     Paul|
|   Harsha|
|  Shubham|
+---------+



In [22]:
#Selecting multiple columns
df_pyspark2.select(['Name', 'Experience']).show()

+---------+----------+
|     Name|Experience|
+---------+----------+
|    Krish|        10|
|Sudhanshu|         8|
|    Sunny|         4|
|     Paul|         3|
|   Harsha|         1|
|  Shubham|         2|
+---------+----------+



In [23]:
# What is 'Name'?
df_pyspark2['Name']

Column<'Name'>

In [24]:
#What is the data types columns?
df_pyspark2.dtypes

[('Name', 'string'), ('age', 'int'), ('Experience', 'int'), ('Salary', 'int')]

In [25]:
#Describe options
df_pyspark2.describe().show()

+-------+------+------------------+-----------------+------------------+
|summary|  Name|               age|       Experience|            Salary|
+-------+------+------------------+-----------------+------------------+
|  count|     6|                 6|                6|                 6|
|   mean|  null|26.333333333333332|4.666666666666667|21333.333333333332|
| stddev|  null| 4.179314138308661|3.559026084010437| 5354.126134736337|
|    min|Harsha|                21|                1|             15000|
|    max| Sunny|                31|               10|             30000|
+-------+------+------------------+-----------------+------------------+



In [26]:
#Adding column in data frame
df_pyspark2.withColumn('Experience after 2 years', df_pyspark2['Experience']+2).show()

+---------+---+----------+------+------------------------+
|     Name|age|Experience|Salary|Experience after 2 years|
+---------+---+----------+------+------------------------+
|    Krish| 31|        10| 30000|                      12|
|Sudhanshu| 30|         8| 25000|                      10|
|    Sunny| 29|         4| 20000|                       6|
|     Paul| 24|         3| 20000|                       5|
|   Harsha| 21|         1| 15000|                       3|
|  Shubham| 23|         2| 18000|                       4|
+---------+---+----------+------+------------------------+



In [27]:
df_pyspark3 = df_pyspark2.withColumn('Experience after 2 years', df_pyspark2['Experience']+2)

In [28]:
df_pyspark3.show()

+---------+---+----------+------+------------------------+
|     Name|age|Experience|Salary|Experience after 2 years|
+---------+---+----------+------+------------------------+
|    Krish| 31|        10| 30000|                      12|
|Sudhanshu| 30|         8| 25000|                      10|
|    Sunny| 29|         4| 20000|                       6|
|     Paul| 24|         3| 20000|                       5|
|   Harsha| 21|         1| 15000|                       3|
|  Shubham| 23|         2| 18000|                       4|
+---------+---+----------+------+------------------------+



In [29]:
#Drop the columns
df_pyspark3.drop('Experience after 2 years').show()


+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [30]:
df_pyspark3.show(0)

+----+---+----------+------+------------------------+
|Name|age|Experience|Salary|Experience after 2 years|
+----+---+----------+------+------------------------+
+----+---+----------+------+------------------------+
only showing top 0 rows



In [31]:
#Rename the columns
df_pyspark2.withColumnRenamed('Name','New Name').show()


+---------+---+----------+------+
| New Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [32]:
df_pyspark4 = spark.read.csv('/content/drive/MyDrive/Pyspark-With-Python-main/test2.csv', header=True,inferSchema=True)

In [33]:
df_pyspark4.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [34]:
#Dropping lines with null values
df_pyspark4.na.drop().show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [35]:
#Dropping lines with the argument how
#how == all
#In this case the line was not dropped because there is one not null value. The argument how="all" is not satifed.
df_pyspark4.na.drop(how="all").show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



In [36]:
#Dropping lines with the argument how
#how == any
#Now, as how="any", the lines with at least one null value were dropped.
df_pyspark4.na.drop(how="any").show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [37]:
#Dropping lines with the argument threshold
#Here the last line was dropped because the thresold parameter was setted to 2,
#it means at least two non null values should be present.
df_pyspark4.na.drop(how="any", thresh=2).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
+---------+----+----------+------+



In [38]:
df_pyspark4.na.drop(thresh=2).show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
+---------+----+----------+------+



In [39]:
#Dropping lines with the argument subset
#In this case, lines with at least one null value in column "Experience" were deleted.
df_pyspark4.na.drop(how="any", subset=['Experience']).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
|     null| 34|        10| 38000|
+---------+---+----------+------+



In [40]:
#Filling missing values
#Here only null strings values were filled with "Missing values".
df_pyspark4.na.fill('Missing values').show()

+--------------+----+----------+------+
|          Name| age|Experience|Salary|
+--------------+----+----------+------+
|         Krish|  31|        10| 30000|
|     Sudhanshu|  30|         8| 25000|
|         Sunny|  29|         4| 20000|
|          Paul|  24|         3| 20000|
|        Harsha|  21|         1| 15000|
|       Shubham|  23|         2| 18000|
|        Mahesh|null|      null| 40000|
|Missing values|  34|        10| 38000|
|Missing values|  36|      null|  null|
+--------------+----+----------+------+



In [41]:
#Filling missing values
#Here only null integers values were filled with 123456.
df_pyspark4.na.fill(123456).show()

+---------+------+----------+------+
|     Name|   age|Experience|Salary|
+---------+------+----------+------+
|    Krish|    31|        10| 30000|
|Sudhanshu|    30|         8| 25000|
|    Sunny|    29|         4| 20000|
|     Paul|    24|         3| 20000|
|   Harsha|    21|         1| 15000|
|  Shubham|    23|         2| 18000|
|   Mahesh|123456|    123456| 40000|
|     null|    34|        10| 38000|
|     null|    36|    123456|123456|
+---------+------+----------+------+



In [42]:
#Filling missing values
#Only the especified columns were filled with 789456.
df_pyspark4.na.fill(987456, ['Experience','age']).show()

+---------+------+----------+------+
|     Name|   age|Experience|Salary|
+---------+------+----------+------+
|    Krish|    31|        10| 30000|
|Sudhanshu|    30|         8| 25000|
|    Sunny|    29|         4| 20000|
|     Paul|    24|         3| 20000|
|   Harsha|    21|         1| 15000|
|  Shubham|    23|         2| 18000|
|   Mahesh|987456|    987456| 40000|
|     null|    34|        10| 38000|
|     null|    36|    987456|  null|
+---------+------+----------+------+



In [43]:
df_pyspark4.show()

+---------+----+----------+------+
|     Name| age|Experience|Salary|
+---------+----+----------+------+
|    Krish|  31|        10| 30000|
|Sudhanshu|  30|         8| 25000|
|    Sunny|  29|         4| 20000|
|     Paul|  24|         3| 20000|
|   Harsha|  21|         1| 15000|
|  Shubham|  23|         2| 18000|
|   Mahesh|null|      null| 40000|
|     null|  34|        10| 38000|
|     null|  36|      null|  null|
+---------+----+----------+------+



How to filling null values with average behaviour of the column?

In [44]:
#by avearge
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['age', 'Experience', 'Salary'], 
    outputCols=["{}_imputed".format(c) for c in ['age', 'Experience', 'Salary']]
    ).setStrategy("mean")

In [45]:
imputer.fit(df_pyspark4).transform(df_pyspark4).show()

+---------+----+----------+------+-----------+------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experience_imputed|Salary_imputed|
+---------+----+----------+------+-----------+------------------+--------------+
|    Krish|  31|        10| 30000|         31|                10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                 8|         25000|
|    Sunny|  29|         4| 20000|         29|                 4|         20000|
|     Paul|  24|         3| 20000|         24|                 3|         20000|
|   Harsha|  21|         1| 15000|         21|                 1|         15000|
|  Shubham|  23|         2| 18000|         23|                 2|         18000|
|   Mahesh|null|      null| 40000|         28|                 5|         40000|
|     null|  34|        10| 38000|         34|                10|         38000|
|     null|  36|      null|  null|         36|                 5|         25750|
+---------+----+----------+-

In [46]:
#by median
from pyspark.ml.feature import Imputer

imputer = Imputer(
    inputCols=['age', 'Experience', 'Salary'],
    outputCols=["{}_imputed" .format(c) for c in ['age', 'Experieence', 'Salary']]
    ).setStrategy("median")

Filter Operaton

In [47]:
df_pyspark2.show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [48]:
# Salray of the people less than ou equal to 20000
df_pyspark2.filter("Salary<=20000").show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [49]:
df_pyspark2.filter("Salary<=20000").select(['Name','age']).show()

+-------+---+
|   Name|age|
+-------+---+
|  Sunny| 29|
|   Paul| 24|
| Harsha| 21|
|Shubham| 23|
+-------+---+



In [50]:
#& End
df_pyspark2.filter((df_pyspark2['Salary']<=20000) &
                   (df_pyspark2['Salary']>=15000)).show()

+-------+---+----------+------+
|   Name|age|Experience|Salary|
+-------+---+----------+------+
|  Sunny| 29|         4| 20000|
|   Paul| 24|         3| 20000|
| Harsha| 21|         1| 15000|
|Shubham| 23|         2| 18000|
+-------+---+----------+------+



In [51]:
#| Or
df_pyspark2.filter((df_pyspark2['Salary']<=20000) |
                   (df_pyspark2['Salary']>=15000)).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
|    Sunny| 29|         4| 20000|
|     Paul| 24|         3| 20000|
|   Harsha| 21|         1| 15000|
|  Shubham| 23|         2| 18000|
+---------+---+----------+------+



In [52]:
#What are the out of conditions options?
df_pyspark2.filter(~(df_pyspark2['Salary']<=20000)).show()

+---------+---+----------+------+
|     Name|age|Experience|Salary|
+---------+---+----------+------+
|    Krish| 31|        10| 30000|
|Sudhanshu| 30|         8| 25000|
+---------+---+----------+------+



In [53]:
df_pyspark5 = spark.read.csv('/content/drive/MyDrive/Pyspark-With-Python-main/test3.csv', header=True, inferSchema=True)
df_pyspark5.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [54]:
#Checking the Schema
df_pyspark5.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



In [55]:
imputer.fit(df_pyspark4).transform(df_pyspark4).show()

+---------+----+----------+------+-----------+-------------------+--------------+
|     Name| age|Experience|Salary|age_imputed|Experieence_imputed|Salary_imputed|
+---------+----+----------+------+-----------+-------------------+--------------+
|    Krish|  31|        10| 30000|         31|                 10|         30000|
|Sudhanshu|  30|         8| 25000|         30|                  8|         25000|
|    Sunny|  29|         4| 20000|         29|                  4|         20000|
|     Paul|  24|         3| 20000|         24|                  3|         20000|
|   Harsha|  21|         1| 15000|         21|                  1|         15000|
|  Shubham|  23|         2| 18000|         23|                  2|         18000|
|   Mahesh|null|      null| 40000|         29|                  4|         40000|
|     null|  34|        10| 38000|         34|                 10|         38000|
|     null|  36|      null|  null|         36|                  4|         20000|
+---------+----+

In [56]:
#Grouping by name and total salary

In [57]:
df_pyspark5.groupBy('Name').sum('Salary').show()

+---------+-----------+
|     Name|sum(Salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [58]:
# Grouping by departments and maximum salary
df_pyspark5.groupBy('Departments').sum().show()

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [59]:
#Grouping by name and maximum salary
df_pyspark5.groupBy('Name').max().show()

+---------+-----------+
|     Name|max(salary)|
+---------+-----------+
|Sudhanshu|      20000|
|    Sunny|      10000|
|    Krish|      10000|
|   Mahesh|       4000|
+---------+-----------+



In [60]:
#Average salary per departament
df_pyspark5.groupBy('Departments').mean().show()

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [61]:
How many workers per departments?
df_pyspark5.groupBy('Departments').count().show()

Object `departments` not found.
+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



In [62]:
#Salary sum
df_pyspark5.agg({'Salary':'sum'}).show()

+-----------+
|sum(Salary)|
+-----------+
|      73000|
+-----------+

