# pyspark tutorial




In [None]:
!pip install pyspark



In [None]:
import pyspark

In [None]:
import pandas as pd

# Load the sample CSV with pandas first, as a point of comparison for Spark.
df = pd.read_csv(filepath_or_buffer='/content/drive/MyDrive/Data/test.csv')

In [None]:
# Preview the frame; head() bounds the display to the first rows instead of
# dumping the whole DataFrame (all 4 rows of this sample still fit).
df.head()

Unnamed: 0,Name,Age
0,Shubham,24
1,Deep,22
2,Shinde,80
3,Patil,78


In [None]:
# The CSV header is 'Name ' (with a trailing space), so the lookup key must
# include that space.
df['Name ']

Unnamed: 0,Name
0,Shubham
1,Deep
2,Shinde
3,Patil


In [None]:
from pyspark.sql import SparkSession

In [None]:
# Build the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName('Practise')
    .getOrCreate()
)


In [None]:
# Evaluate the session object so the notebook renders its representation.
spark

In [None]:
# Read the CSV with the reader's default options (no header/schema options set).
df_pyspark = spark.read.csv(path='/content/drive/MyDrive/Data/test.csv')

In [None]:
# With default read options the header line is treated as a data row and the
# columns get placeholder names (_c0, _c1).
df_pyspark.show()

+-------+---+
|    _c0|_c1|
+-------+---+
|  Name |Age|
|Shubham| 24|
|   Deep| 22|
| Shinde| 80|
|  Patil| 78|
+-------+---+



In [None]:
# Use the first line as column names (replacing the _c0/_c1 placeholders) and
# let Spark infer each column's type from the data.
df_pyspark = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('/content/drive/MyDrive/Data/test.csv')
)
df_pyspark.show()

+-------+---+
|  Name |Age|
+-------+---+
|Shubham| 24|
|   Deep| 22|
| Shinde| 80|
|  Patil| 78|
+-------+---+



In [None]:
# Print each column's name, type and nullability.
df_pyspark.printSchema()

root
 |-- Name : string (nullable = true)
 |-- Age: integer (nullable = true)



In [None]:
# Check the Python type of the object returned by the Spark CSV reader.
type(df_pyspark)

In [None]:

# List the column names; note that 'Name ' carries a trailing space from the CSV header.
df_pyspark.columns

['Name ', 'Age']

In [None]:
df_pyspark.head(3) # first 3 rows, returned as a Python list of Row objects

[Row(Name ='Shubham', Age=24),
 Row(Name ='Deep', Age=22),
 Row(Name ='Shinde', Age=80)]

In [None]:
# select() returns a new DataFrame holding only the requested column;
# show() prints it as a table.
df_pyspark.select('Name ').show()

+-------+
|  Name |
+-------+
|Shubham|
|   Deep|
| Shinde|
|  Patil|
+-------+



In [None]:
# Select several columns at once by passing a list of names.
# NOTE: the column is actually called 'Age'; 'age' still matches because Spark
# resolves column names case-insensitively by default, and the output header
# echoes the spelling used here.
df_pyspark.select(['Name ','age']).show()

+-------+---+
|  Name |age|
+-------+---+
|Shubham| 24|
|   Deep| 22|
| Shinde| 80|
|  Patil| 78|
+-------+---+



In [None]:
df_pyspark['Name '] # indexing by name yields a Column expression, not the column's data

Column<'Name '>

# Checking data types of each column

In [None]:
# List of (column name, type name) pairs.
df_pyspark.dtypes

[('Name ', 'string'), ('Age', 'int')]

# check describe functionality

In [None]:
# Summary statistics per column; mean and stddev come out as NULL for the
# string column, while min/max for it are still reported.
df_pyspark.describe().show()

+-------+-------+-----------------+
|summary|  Name |              Age|
+-------+-------+-----------------+
|  count|      4|                4|
|   mean|   NULL|             51.0|
| stddev|   NULL|32.35222815613581|
|    min|   Deep|               22|
|    max|Shubham|               80|
+-------+-------+-----------------+



# adding columns in dataframe

In [None]:
# Derive a new 'Experience' column from 'Age'; withColumn returns a new
# DataFrame, so the result is assigned back to the same name.
df_pyspark = df_pyspark.withColumn(
    colName='Experience',
    col=df_pyspark['Age'] + 3,
)

# Removing columns in dataframe

In [None]:
# Display the frame, which now includes the derived 'Experience' column.
df_pyspark.show()

+-------+---+----------+
|  Name |Age|Experience|
+-------+---+----------+
|Shubham| 24|        27|
|   Deep| 22|        25|
| Shinde| 80|        83|
|  Patil| 78|        81|
+-------+---+----------+



In [None]:
# drop() returns a new DataFrame without the column; reassign to keep the result.
df_pyspark = df_pyspark.drop('Experience')

In [None]:
# Confirm that 'Experience' is gone.
df_pyspark.show()

+-------+---+
|  Name |Age|
+-------+---+
|Shubham| 24|
|   Deep| 22|
| Shinde| 80|
|  Patil| 78|
+-------+---+



# Renaming the columns


In [None]:
# Rename 'Age' to 'AGE'; the renamed frame is a new DataFrame, so reassign it.
df_pyspark = df_pyspark.withColumnRenamed(existing='Age', new='AGE')

In [None]:
# Confirm the column header now reads 'AGE'.
df_pyspark.show()

+-------+---+
|  Name |AGE|
+-------+---+
|Shubham| 24|
|   Deep| 22|
| Shinde| 80|
|  Patil| 78|
+-------+---+



# DAY-3 Handling missing data

## Pyspark Handling Missing Values
## Dropping Columns
## Dropping Rows
## Various Parameters of the Dropping Functionality
## Handling Missing Values by Mean, Median and Mode


In [None]:

# Load the employee salary data: first line is the header, column types are inferred.
df2_pyspark = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('/content/drive/MyDrive/Data/emp_sal_data.csv')
)

In [None]:
# Display the raw data; missing entries appear as NULL.
df2_pyspark.show()

+------+----+----+------+
|  Name| Age| Exp|Salary|
+------+----+----+------+
|  Tony|  53|  10| 80000|
| Samay|  26|   2| 30000|
| Virat|  33|   7| 72000|
|  Yuzi|  28|   3| 40000|
|Jorden|  31|   5| 55000|
|   Ved|  35|   6| 63000|
| Jacob|NULL|NULL| 75000|
|  NULL|  34|   5|  NULL|
|  NULL|  32|NULL|  NULL|
+------+----+----+------+



In [None]:
# Drop a column. The result is only displayed, not assigned, so df2_pyspark
# itself still has the 'Name' column afterwards.
df2_pyspark.drop('Name').show()

+----+----+------+
| Age| Exp|Salary|
+----+----+------+
|  53|  10| 80000|
|  26|   2| 30000|
|  33|   7| 72000|
|  28|   3| 40000|
|  31|   5| 55000|
|  35|   6| 63000|
|NULL|NULL| 75000|
|  34|   5|  NULL|
|  32|NULL|  NULL|
+----+----+------+



In [None]:
# With no arguments, na.drop() removes every row that contains at least one NULL.
df2_pyspark.na.drop().show()

+------+---+---+------+
|  Name|Age|Exp|Salary|
+------+---+---+------+
|  Tony| 53| 10| 80000|
| Samay| 26|  2| 30000|
| Virat| 33|  7| 72000|
|  Yuzi| 28|  3| 40000|
|Jorden| 31|  5| 55000|
|   Ved| 35|  6| 63000|
+------+---+---+------+



In [None]:
# how="all" drops a row only when every column in it is NULL;
# how="any" (the default) drops a row as soon as any column is NULL.
# No row here is entirely NULL, so nothing is removed.
df2_pyspark.na.drop(how = "all").show()

+------+----+----+------+
|  Name| Age| Exp|Salary|
+------+----+----+------+
|  Tony|  53|  10| 80000|
| Samay|  26|   2| 30000|
| Virat|  33|   7| 72000|
|  Yuzi|  28|   3| 40000|
|Jorden|  31|   5| 55000|
|   Ved|  35|   6| 63000|
| Jacob|NULL|NULL| 75000|
|  NULL|  34|   5|  NULL|
|  NULL|  32|NULL|  NULL|
+------+----+----+------+



In [None]:
# thresh=N keeps only the rows that have at least N non-null values.
# thresh takes precedence over `how`, so `how` is left out here rather than
# being passed and silently ignored.
df2_pyspark.na.drop(thresh = 2).show()

+------+----+----+------+
|  Name| Age| Exp|Salary|
+------+----+----+------+
|  Tony|  53|  10| 80000|
| Samay|  26|   2| 30000|
| Virat|  33|   7| 72000|
|  Yuzi|  28|   3| 40000|
|Jorden|  31|   5| 55000|
|   Ved|  35|   6| 63000|
| Jacob|NULL|NULL| 75000|
|  NULL|  34|   5|  NULL|
+------+----+----+------+



In [None]:
# subset restricts the NULL check to the listed columns: only rows whose
# 'Exp' value is NULL are removed; NULLs in other columns are ignored.
df2_pyspark.na.drop(how="any", subset=['Exp']).show()

+------+---+---+------+
|  Name|Age|Exp|Salary|
+------+---+---+------+
|  Tony| 53| 10| 80000|
| Samay| 26|  2| 30000|
| Virat| 33|  7| 72000|
|  Yuzi| 28|  3| 40000|
|Jorden| 31|  5| 55000|
|   Ved| 35|  6| 63000|
|  NULL| 34|  5|  NULL|
+------+---+---+------+



# filling the missing values


In [None]:
# Fill missing values with na.fill. A string replacement is applied only to
# string columns, so here just 'Name' is filled; NULLs in the numeric columns
# are left as they are.
df2_pyspark.na.fill('Missing Values').show()

+--------------+----+----+------+
|          Name| Age| Exp|Salary|
+--------------+----+----+------+
|          Tony|  53|  10| 80000|
|         Samay|  26|   2| 30000|
|         Virat|  33|   7| 72000|
|          Yuzi|  28|   3| 40000|
|        Jorden|  31|   5| 55000|
|           Ved|  35|   6| 63000|
|         Jacob|NULL|NULL| 75000|
|Missing Values|  34|   5|  NULL|
|Missing Values|  32|NULL|  NULL|
+--------------+----+----+------+



In [None]:
# Fill missing values only in specific columns. 'Exp' and 'Age' are numeric,
# and na.fill skips columns whose type does not match the replacement value,
# so a string replacement would leave their NULLs untouched. Use a numeric
# value so the listed columns are actually filled.
df2_pyspark.na.fill(0, subset=['Exp', 'Age']).show()

+------+----+----+------+
|  Name| Age| Exp|Salary|
+------+----+----+------+
|  Tony|  53|  10| 80000|
| Samay|  26|   2| 30000|
| Virat|  33|   7| 72000|
|  Yuzi|  28|   3| 40000|
|Jorden|  31|   5| 55000|
|   Ved|  35|   6| 63000|
| Jacob|NULL|NULL| 75000|
|  NULL|  34|   5|  NULL|
|  NULL|  32|NULL|  NULL|
+------+----+----+------+



In [None]:
# Replace NULLs in specific numeric columns with a per-column statistic.
from pyspark.ml.feature import Imputer

# One list drives both inputCols and outputCols so the two stay in step.
impute_cols = ['Age', 'Exp', 'Salary']
imputer = Imputer(
    strategy="mean",  # "median" and "mode" are the other options
    inputCols=impute_cols,
    outputCols=[f"{c}_imputed" for c in impute_cols],
)

In [None]:
# fit() learns the fill value for each input column; transform() adds the
# *_imputed columns next to the originals, which are left unchanged.
imputer.fit(df2_pyspark).transform(df2_pyspark).show()

+------+----+----+------+-----------+-----------+--------------+
|  Name| Age| Exp|Salary|Age_imputed|Exp_imputed|Salary_imputed|
+------+----+----+------+-----------+-----------+--------------+
|  Tony|  53|  10| 80000|         53|         10|         80000|
| Samay|  26|   2| 30000|         26|          2|         30000|
| Virat|  33|   7| 72000|         33|          7|         72000|
|  Yuzi|  28|   3| 40000|         28|          3|         40000|
|Jorden|  31|   5| 55000|         31|          5|         55000|
|   Ved|  35|   6| 63000|         35|          6|         63000|
| Jacob|NULL|NULL| 75000|         34|          5|         75000|
|  NULL|  34|   5|  NULL|         34|          5|         59285|
|  NULL|  32|NULL|  NULL|         32|          5|         59285|
+------+----+----+------+-----------+-----------+--------------+



# Pyspark Data frames - filter operations


In [None]:
# Get the people whose salary is at most 60000 (the condition is <=, so 60000
# itself would be included). Rows with a NULL salary do not match.
df2_pyspark.filter("Salary<=60000").show()


+------+---+---+------+
|  Name|Age|Exp|Salary|
+------+---+---+------+
| Samay| 26|  2| 30000|
|  Yuzi| 28|  3| 40000|
|Jorden| 31|  5| 55000|
+------+---+---+------+



In [None]:
# Show only the Name and Age columns for people whose salary is at most 60000.
df2_pyspark.filter("Salary<=60000").select(["Name","Age"]).show()

+------+---+
|  Name|Age|
+------+---+
| Samay| 26|
|  Yuzi| 28|
|Jorden| 31|
+------+---+



In [None]:
# Combine conditions with | (or); each comparison needs its own parentheses.
# A row is kept when either condition holds.
df2_pyspark.where(
    (df2_pyspark['Salary'] >= 65000) | (df2_pyspark['Age'] >= 35)
).show()

+-----+----+----+------+
| Name| Age| Exp|Salary|
+-----+----+----+------+
| Tony|  53|  10| 80000|
|Virat|  33|   7| 72000|
|  Ved|  35|   6| 63000|
|Jacob|NULL|NULL| 75000|
+-----+----+----+------+



# Group by And aggregate functions

In [None]:
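# The `spark` session created earlier in this notebook is reused here.
# To create one from scratch instead:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName('Practice').getOrCreate()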

In [None]:
# Load the name/department/salary data used for the group-by examples.
pyspark_3 = spark.read.csv(
    path="/content/drive/MyDrive/Data/Book1.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Display the data: one row per (Name, Departments) pair with its salary.
pyspark_3.show()

+---------+------------+------+
|     Name| Departments|salary|
+---------+------------+------+
|    Krish|Data Science| 10000|
|    Krish|         IOT|  5000|
|   Mahesh|    Big Data|  4000|
|    Krish|    Big Data|  4000|
|   Mahesh|Data Science|  3000|
|Sudhanshu|Data Science| 20000|
|Sudhanshu|         IOT| 10000|
|Sudhanshu|    Big Data|  5000|
|    Sunny|Data Science| 10000|
|    Sunny|    Big Data|  2000|
+---------+------------+------+



In [None]:
# Print the inferred schema; salary is the only numeric column.
pyspark_3.printSchema()

root
 |-- Name: string (nullable = true)
 |-- Departments: string (nullable = true)
 |-- salary: integer (nullable = true)



In [None]:
# Group by name and total the numeric columns (here: salary).
# show() only prints and returns None, so its result is not assigned; binding
# it to a name such as `sum` would also shadow Python's built-in sum().
pyspark_3.groupBy('Name').sum().show()

+---------+-----------+
|     Name|sum(salary)|
+---------+-----------+
|Sudhanshu|      35000|
|    Sunny|      12000|
|    Krish|      19000|
|   Mahesh|       7000|
+---------+-----------+



In [None]:
# Group by name to find each person's maximum salary.
pyspark_3.groupBy('Name').max().show()

+---------+-----------+
|     Name|max(salary)|
+---------+-----------+
|Sudhanshu|      20000|
|    Sunny|      10000|
|    Krish|      10000|
|   Mahesh|       4000|
+---------+-----------+



In [None]:
# Group by department to find the total salary paid per department.
pyspark_3.groupBy('Departments').sum().show()

+------------+-----------+
| Departments|sum(salary)|
+------------+-----------+
|         IOT|      15000|
|    Big Data|      15000|
|Data Science|      43000|
+------------+-----------+



In [None]:

# Find the mean salary per department.
pyspark_3.groupBy('Departments').mean().show()

+------------+-----------+
| Departments|avg(salary)|
+------------+-----------+
|         IOT|     7500.0|
|    Big Data|     3750.0|
|Data Science|    10750.0|
+------------+-----------+



In [None]:
# Count the rows (one per employee) in each department.
pyspark_3.groupBy('Departments').count().show()

+------------+-----+
| Departments|count|
+------------+-----+
|         IOT|    2|
|    Big Data|    4|
|Data Science|    4|
+------------+-----+



# Using aggregate functions


In [None]:
# agg() without a groupBy aggregates over the whole DataFrame; the dict maps
# a column name to the aggregate function to apply to it.
pyspark_3.agg({'salary':'sum'}).show()

+-----------+
|sum(salary)|
+-----------+
|      73000|
+-----------+

