## PYSPARK

In [1]:
#To create a session:
from pyspark.sql import SparkSession


from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Permission').getOrCreate()

In [2]:
spark

In [3]:
#Import required libraries:
import pyspark
import pandas as pd

In [4]:
#Read dataframe in pandas:
df = pd.read_csv("Alcohol.csv")


#To check the datatypes and non null count:
df.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 193 entries, 0 to 192
Data columns (total 6 columns):
 #   Column                        Non-Null Count  Dtype  
---  ------                        --------------  -----  
 0   country                       193 non-null    object 
 1   beer_servings                 193 non-null    int64  
 2   spirit_servings               193 non-null    int64  
 3   wine_servings                 193 non-null    int64  
 4   total_litres_of_pure_alcohol  193 non-null    float64
 5   continent                     170 non-null    object 
dtypes: float64(1), int64(3), object(2)
memory usage: 9.2+ KB


In [5]:
#To read the csv file using spark:
dfspark =spark.read.csv("Alcohol.csv")
dfspark.head()

Row(_c0='country', _c1='beer_servings', _c2='spirit_servings', _c3='wine_servings', _c4='total_litres_of_pure_alcohol', _c5='continent')

In [6]:
#To read the csv using spark and assigning header names also:
dfspark = spark.read.option('header','true').csv("Alcohol.csv", inferSchema=True)

In [22]:
#To read the csv
dfspark =spark.read.csv("Alcohol.csv", header=True, inferSchema=True)
dfspark.head(5)

[Row(country='Afghanistan', beer_servings=0, spirit_servings=0, wine_servings=0, total_litres_of_pure_alcohol=0.0, continent='AS'),
 Row(country='Albania', beer_servings=89, spirit_servings=132, wine_servings=54, total_litres_of_pure_alcohol=4.9, continent='EU'),
 Row(country='Algeria', beer_servings=25, spirit_servings=0, wine_servings=14, total_litres_of_pure_alcohol=0.7, continent='AF'),
 Row(country='Andorra', beer_servings=245, spirit_servings=138, wine_servings=312, total_litres_of_pure_alcohol=12.4, continent='EU'),
 Row(country='Angola', beer_servings=217, spirit_servings=57, wine_servings=45, total_litres_of_pure_alcohol=5.9, continent='AF')]

In [8]:
#To check the datatype of dataframe
type(dfspark)

pyspark.sql.dataframe.DataFrame

In [9]:
#Check the schema
dfspark.printSchema()

root
 |-- country: string (nullable = true)
 |-- beer_servings: integer (nullable = true)
 |-- spirit_servings: integer (nullable = true)
 |-- wine_servings: integer (nullable = true)
 |-- total_litres_of_pure_alcohol: double (nullable = true)
 |-- continent: string (nullable = true)



# PySpark basic operations:
    1. To select all the columns 
    2. To select multiple columns
    3. To check the data types of columns
    4. To add the column and drop the column
    5. Describe functionality
    6. Column rename

In [10]:
dfspark.columns

['country',
 'beer_servings',
 'spirit_servings',
 'wine_servings',
 'total_litres_of_pure_alcohol',
 'continent']

In [11]:
#To check the dataframe
dfspark.show()

+-----------------+-------------+---------------+-------------+----------------------------+---------+
|          country|beer_servings|spirit_servings|wine_servings|total_litres_of_pure_alcohol|continent|
+-----------------+-------------+---------------+-------------+----------------------------+---------+
|      Afghanistan|            0|              0|            0|                         0.0|       AS|
|          Albania|           89|            132|           54|                         4.9|       EU|
|          Algeria|           25|              0|           14|                         0.7|       AF|
|          Andorra|          245|            138|          312|                        12.4|       EU|
|           Angola|          217|             57|           45|                         5.9|       AF|
|Antigua & Barbuda|          102|            128|           45|                         4.9|       NA|
|        Argentina|          193|             25|          221|          

In [12]:
#To select the particular column 
dfspark.select('country').show() #By default it shows first 20 rows

+-----------------+
|          country|
+-----------------+
|      Afghanistan|
|          Albania|
|          Algeria|
|          Andorra|
|           Angola|
|Antigua & Barbuda|
|        Argentina|
|          Armenia|
|        Australia|
|          Austria|
|       Azerbaijan|
|          Bahamas|
|          Bahrain|
|       Bangladesh|
|         Barbados|
|          Belarus|
|          Belgium|
|           Belize|
|            Benin|
|           Bhutan|
+-----------------+
only showing top 20 rows



In [13]:
#To select the multiple columns with first 10 rows
dfspark.select(['country','beer_servings']).show(10)

+-----------------+-------------+
|          country|beer_servings|
+-----------------+-------------+
|      Afghanistan|            0|
|          Albania|           89|
|          Algeria|           25|
|          Andorra|          245|
|           Angola|          217|
|Antigua & Barbuda|          102|
|        Argentina|          193|
|          Armenia|           21|
|        Australia|          261|
|          Austria|          279|
+-----------------+-------------+
only showing top 10 rows



In [14]:
#To select the multiple columns with first 10 rows
dfspark.select(['country','beer_servings','spirit_servings']).show(10)

+-----------------+-------------+---------------+
|          country|beer_servings|spirit_servings|
+-----------------+-------------+---------------+
|      Afghanistan|            0|              0|
|          Albania|           89|            132|
|          Algeria|           25|              0|
|          Andorra|          245|            138|
|           Angola|          217|             57|
|Antigua & Barbuda|          102|            128|
|        Argentina|          193|             25|
|          Armenia|           21|            179|
|        Australia|          261|             72|
|          Austria|          279|             75|
+-----------------+-------------+---------------+
only showing top 10 rows



In [15]:
#To check dtypes of all the columns:
dfspark.dtypes

[('country', 'string'),
 ('beer_servings', 'int'),
 ('spirit_servings', 'int'),
 ('wine_servings', 'int'),
 ('total_litres_of_pure_alcohol', 'double'),
 ('continent', 'string')]

In [16]:
#Describe function
dfspark.describe().show()

+-------+-----------+------------------+-----------------+-----------------+----------------------------+---------+
|summary|    country|     beer_servings|  spirit_servings|    wine_servings|total_litres_of_pure_alcohol|continent|
+-------+-----------+------------------+-----------------+-----------------+----------------------------+---------+
|  count|        193|               193|              193|              193|                         193|      193|
|   mean|       null|106.16062176165804|80.99481865284974|49.45077720207254|           4.717098445595855|     null|
| stddev|       null| 101.1431025393134|88.28431210968618|79.69759845763012|           3.773298164356082|     null|
|    min|Afghanistan|                 0|                0|                0|                         0.0|       AF|
|    max|   Zimbabwe|               376|              438|              370|                        14.4|       SA|
+-------+-----------+------------------+-----------------+--------------

In [17]:
dfspark.show()

+-----------------+-------------+---------------+-------------+----------------------------+---------+
|          country|beer_servings|spirit_servings|wine_servings|total_litres_of_pure_alcohol|continent|
+-----------------+-------------+---------------+-------------+----------------------------+---------+
|      Afghanistan|            0|              0|            0|                         0.0|       AS|
|          Albania|           89|            132|           54|                         4.9|       EU|
|          Algeria|           25|              0|           14|                         0.7|       AF|
|          Andorra|          245|            138|          312|                        12.4|       EU|
|           Angola|          217|             57|           45|                         5.9|       AF|
|Antigua & Barbuda|          102|            128|           45|                         4.9|       NA|
|        Argentina|          193|             25|          221|          

In [18]:
#Adding column in dataframe:
dfspark.withColumn('wine_servings_after',dfspark['wine_servings']+10).show()



+-----------------+-------------+---------------+-------------+----------------------------+---------+-------------------+
|          country|beer_servings|spirit_servings|wine_servings|total_litres_of_pure_alcohol|continent|wine_servings_after|
+-----------------+-------------+---------------+-------------+----------------------------+---------+-------------------+
|      Afghanistan|            0|              0|            0|                         0.0|       AS|                 10|
|          Albania|           89|            132|           54|                         4.9|       EU|                 64|
|          Algeria|           25|              0|           14|                         0.7|       AF|                 24|
|          Andorra|          245|            138|          312|                        12.4|       EU|                322|
|           Angola|          217|             57|           45|                         5.9|       AF|                 55|
|Antigua & Barbu

In [19]:
dfspark.show()
#As we can see that new column is not added to the original dataframe 
#Becuase there is no inplace function in spark

+-----------------+-------------+---------------+-------------+----------------------------+---------+
|          country|beer_servings|spirit_servings|wine_servings|total_litres_of_pure_alcohol|continent|
+-----------------+-------------+---------------+-------------+----------------------------+---------+
|      Afghanistan|            0|              0|            0|                         0.0|       AS|
|          Albania|           89|            132|           54|                         4.9|       EU|
|          Algeria|           25|              0|           14|                         0.7|       AF|
|          Andorra|          245|            138|          312|                        12.4|       EU|
|           Angola|          217|             57|           45|                         5.9|       AF|
|Antigua & Barbuda|          102|            128|           45|                         4.9|       NA|
|        Argentina|          193|             25|          221|          

In [20]:
#There is no inplace function in Pyspark. 
#In order to reflect the output in original dataframe we have to assign it to a new variable.
dfspark = dfspark.withColumn('wine_servings_after',dfspark['wine_servings']+10).show()

+-----------------+-------------+---------------+-------------+----------------------------+---------+-------------------+
|          country|beer_servings|spirit_servings|wine_servings|total_litres_of_pure_alcohol|continent|wine_servings_after|
+-----------------+-------------+---------------+-------------+----------------------------+---------+-------------------+
|      Afghanistan|            0|              0|            0|                         0.0|       AS|                 10|
|          Albania|           89|            132|           54|                         4.9|       EU|                 64|
|          Algeria|           25|              0|           14|                         0.7|       AF|                 24|
|          Andorra|          245|            138|          312|                        12.4|       EU|                322|
|           Angola|          217|             57|           45|                         5.9|       AF|                 55|
|Antigua & Barbu

In [23]:
#Drop the column
dfspark = dfspark.drop("wine_servings_after")

In [24]:
dfspark.show()

+-----------------+-------------+---------------+-------------+----------------------------+---------+
|          country|beer_servings|spirit_servings|wine_servings|total_litres_of_pure_alcohol|continent|
+-----------------+-------------+---------------+-------------+----------------------------+---------+
|      Afghanistan|            0|              0|            0|                         0.0|       AS|
|          Albania|           89|            132|           54|                         4.9|       EU|
|          Algeria|           25|              0|           14|                         0.7|       AF|
|          Andorra|          245|            138|          312|                        12.4|       EU|
|           Angola|          217|             57|           45|                         5.9|       AF|
|Antigua & Barbuda|          102|            128|           45|                         4.9|       NA|
|        Argentina|          193|             25|          221|          

In [25]:
#Rename the column
dfspark = dfspark.withColumnRenamed('country','Country')
dfspark.show()

+-----------------+-------------+---------------+-------------+----------------------------+---------+
|          Country|beer_servings|spirit_servings|wine_servings|total_litres_of_pure_alcohol|continent|
+-----------------+-------------+---------------+-------------+----------------------------+---------+
|      Afghanistan|            0|              0|            0|                         0.0|       AS|
|          Albania|           89|            132|           54|                         4.9|       EU|
|          Algeria|           25|              0|           14|                         0.7|       AF|
|          Andorra|          245|            138|          312|                        12.4|       EU|
|           Angola|          217|             57|           45|                         5.9|       AF|
|Antigua & Barbuda|          102|            128|           45|                         4.9|       NA|
|        Argentina|          193|             25|          221|          

# PySpark handling missing values:
1. Dropping columns
2. Dropping rows
3. Various parameters involved in dropping
4. Missing values imputations with Mean, Median and Mode

In [26]:
spark  = SparkSession.builder.appName('New').getOrCreate()
spark

In [27]:
df_pyspark=spark.read.csv("Book1.csv",header=True, inferSchema=True)
df_pyspark.show()

+-------+----+------+---------+
|   Name| Age|Salary|     City|
+-------+----+------+---------+
|  Rahul|  26| 30000|Bangalore|
|Abhinav|  26| 34000|     Pune|
|   Sonu|  23| 34000|   Mumbai|
|  Akash|  24| 45000|     null|
|  Rohit|null| 35000|   Shimla|
|   null|  23|  null|   Manali|
| Sanjay|  30| 56000|   Mysore|
|   null|  21| 53000|    Hubli|
|  Suman|  28| 45000|   Mumbai|
|  Akash|null| 23000|Hyderabad|
|   null|null|  null|     null|
| Aayush|  30| 38000|    Solan|
+-------+----+------+---------+



In [28]:
#To drop a column on temporary basis:
df_pyspark.drop('City').show()

+-------+----+------+
|   Name| Age|Salary|
+-------+----+------+
|  Rahul|  26| 30000|
|Abhinav|  26| 34000|
|   Sonu|  23| 34000|
|  Akash|  24| 45000|
|  Rohit|null| 35000|
|   null|  23|  null|
| Sanjay|  30| 56000|
|   null|  21| 53000|
|  Suman|  28| 45000|
|  Akash|null| 23000|
|   null|null|  null|
| Aayush|  30| 38000|
+-------+----+------+



In [29]:
#To drop null values
df_pyspark.na.drop().show()

#If we will not assign anything in drop() then 
#It will drop all the null values whereever it is (null) present at any row or index position

+-------+---+------+---------+
|   Name|Age|Salary|     City|
+-------+---+------+---------+
|  Rahul| 26| 30000|Bangalore|
|Abhinav| 26| 34000|     Pune|
|   Sonu| 23| 34000|   Mumbai|
| Sanjay| 30| 56000|   Mysore|
|  Suman| 28| 45000|   Mumbai|
| Aayush| 30| 38000|    Solan|
+-------+---+------+---------+



In [30]:
#Drop the null values using how='any' inside drop function
df_pyspark.na.drop(how='any').show() 

#It will drop all the rows wherever there is a null value in a row.
#And by default how='any' in drop function

+-------+---+------+---------+
|   Name|Age|Salary|     City|
+-------+---+------+---------+
|  Rahul| 26| 30000|Bangalore|
|Abhinav| 26| 34000|     Pune|
|   Sonu| 23| 34000|   Mumbai|
| Sanjay| 30| 56000|   Mysore|
|  Suman| 28| 45000|   Mumbai|
| Aayush| 30| 38000|    Solan|
+-------+---+------+---------+



In [31]:
#Drop the null values using how='all' inside drop function
df_pyspark.na.drop(how='all').show() 

#It will delete the rows whenever all the values under the columns in a row are null.

+-------+----+------+---------+
|   Name| Age|Salary|     City|
+-------+----+------+---------+
|  Rahul|  26| 30000|Bangalore|
|Abhinav|  26| 34000|     Pune|
|   Sonu|  23| 34000|   Mumbai|
|  Akash|  24| 45000|     null|
|  Rohit|null| 35000|   Shimla|
|   null|  23|  null|   Manali|
| Sanjay|  30| 56000|   Mysore|
|   null|  21| 53000|    Hubli|
|  Suman|  28| 45000|   Mumbai|
|  Akash|null| 23000|Hyderabad|
| Aayush|  30| 38000|    Solan|
+-------+----+------+---------+



In [32]:
#drop(Thresh=3)
df_pyspark.na.drop(how='all', thresh=3).show()

#Thresh=3 means atleast 3 non null values will be there. 
#If condition not satisfies then drop() will drop that particular row.

+-------+----+------+---------+
|   Name| Age|Salary|     City|
+-------+----+------+---------+
|  Rahul|  26| 30000|Bangalore|
|Abhinav|  26| 34000|     Pune|
|   Sonu|  23| 34000|   Mumbai|
|  Akash|  24| 45000|     null|
|  Rohit|null| 35000|   Shimla|
| Sanjay|  30| 56000|   Mysore|
|   null|  21| 53000|    Hubli|
|  Suman|  28| 45000|   Mumbai|
|  Akash|null| 23000|Hyderabad|
| Aayush|  30| 38000|    Solan|
+-------+----+------+---------+



In [33]:
#drop(subset='Age')
df_pyspark.na.drop(subset='Age').show()

#It will drop all the rows wherever the age values are null.

+-------+---+------+---------+
|   Name|Age|Salary|     City|
+-------+---+------+---------+
|  Rahul| 26| 30000|Bangalore|
|Abhinav| 26| 34000|     Pune|
|   Sonu| 23| 34000|   Mumbai|
|  Akash| 24| 45000|     null|
|   null| 23|  null|   Manali|
| Sanjay| 30| 56000|   Mysore|
|   null| 21| 53000|    Hubli|
|  Suman| 28| 45000|   Mumbai|
| Aayush| 30| 38000|    Solan|
+-------+---+------+---------+



In [34]:
#Filling the missing values with 0 in Age and Salary column:
df_pyspark.na.fill(0,['Salary','Age']).show()

+-------+---+------+---------+
|   Name|Age|Salary|     City|
+-------+---+------+---------+
|  Rahul| 26| 30000|Bangalore|
|Abhinav| 26| 34000|     Pune|
|   Sonu| 23| 34000|   Mumbai|
|  Akash| 24| 45000|     null|
|  Rohit|  0| 35000|   Shimla|
|   null| 23|     0|   Manali|
| Sanjay| 30| 56000|   Mysore|
|   null| 21| 53000|    Hubli|
|  Suman| 28| 45000|   Mumbai|
|  Akash|  0| 23000|Hyderabad|
|   null|  0|     0|     null|
| Aayush| 30| 38000|    Solan|
+-------+---+------+---------+



In [35]:
#Filling the missing values with 'Missing' in Name column:
df_pyspark.na.fill('Missing',['Name','City']).show()

+-------+----+------+---------+
|   Name| Age|Salary|     City|
+-------+----+------+---------+
|  Rahul|  26| 30000|Bangalore|
|Abhinav|  26| 34000|     Pune|
|   Sonu|  23| 34000|   Mumbai|
|  Akash|  24| 45000|  Missing|
|  Rohit|null| 35000|   Shimla|
|Missing|  23|  null|   Manali|
| Sanjay|  30| 56000|   Mysore|
|Missing|  21| 53000|    Hubli|
|  Suman|  28| 45000|   Mumbai|
|  Akash|null| 23000|Hyderabad|
|Missing|null|  null|  Missing|
| Aayush|  30| 38000|    Solan|
+-------+----+------+---------+



In [36]:
#Filling the missing values with mean using imputer:
from pyspark.ml.feature import Imputer

imputer = Imputer(inputCols=['Age','Salary'],
                 outputCols=['Age','Salary']).setStrategy("mean")


imputer.fit(df_pyspark).transform(df_pyspark).show()

+-------+---+------+---------+
|   Name|Age|Salary|     City|
+-------+---+------+---------+
|  Rahul| 26| 30000|Bangalore|
|Abhinav| 26| 34000|     Pune|
|   Sonu| 23| 34000|   Mumbai|
|  Akash| 24| 45000|     null|
|  Rohit| 25| 35000|   Shimla|
|   null| 23| 39300|   Manali|
| Sanjay| 30| 56000|   Mysore|
|   null| 21| 53000|    Hubli|
|  Suman| 28| 45000|   Mumbai|
|  Akash| 25| 23000|Hyderabad|
|   null| 25| 39300|     null|
| Aayush| 30| 38000|    Solan|
+-------+---+------+---------+



In [37]:
#Filling the missing values with median using imputer:


imputer = Imputer(inputCols=['Age','Salary'],
                 outputCols=['Age','Salary']).setStrategy("median")


imputer.fit(df_pyspark).transform(df_pyspark).show()

+-------+---+------+---------+
|   Name|Age|Salary|     City|
+-------+---+------+---------+
|  Rahul| 26| 30000|Bangalore|
|Abhinav| 26| 34000|     Pune|
|   Sonu| 23| 34000|   Mumbai|
|  Akash| 24| 45000|     null|
|  Rohit| 26| 35000|   Shimla|
|   null| 23| 35000|   Manali|
| Sanjay| 30| 56000|   Mysore|
|   null| 21| 53000|    Hubli|
|  Suman| 28| 45000|   Mumbai|
|  Akash| 26| 23000|Hyderabad|
|   null| 26| 35000|     null|
| Aayush| 30| 38000|    Solan|
+-------+---+------+---------+



# PySpark dataframe
1. Filter options
2. &,|,==
3. ~

In [39]:
df_pyspark.show()

+-------+----+------+---------+
|   Name| Age|Salary|     City|
+-------+----+------+---------+
|  Rahul|  26| 30000|Bangalore|
|Abhinav|  26| 34000|     Pune|
|   Sonu|  23| 34000|   Mumbai|
|  Akash|  24| 45000|     null|
|  Rohit|null| 35000|   Shimla|
|   null|  23|  null|   Manali|
| Sanjay|  30| 56000|   Mysore|
|   null|  21| 53000|    Hubli|
|  Suman|  28| 45000|   Mumbai|
|  Akash|null| 23000|Hyderabad|
|   null|null|  null|     null|
| Aayush|  30| 38000|    Solan|
+-------+----+------+---------+



In [44]:
#Select records where name is Rahul
df_pyspark.where(df_pyspark['Name']=='Rahul').show()

+-----+---+------+---------+
| Name|Age|Salary|     City|
+-----+---+------+---------+
|Rahul| 26| 30000|Bangalore|
+-----+---+------+---------+



In [47]:
#Select the records where salary greater than 35000
df_pyspark.where(df_pyspark['Salary']>=35000).show()

+------+----+------+------+
|  Name| Age|Salary|  City|
+------+----+------+------+
| Akash|  24| 45000|  null|
| Rohit|null| 35000|Shimla|
|Sanjay|  30| 56000|Mysore|
|  null|  21| 53000| Hubli|
| Suman|  28| 45000|Mumbai|
|Aayush|  30| 38000| Solan|
+------+----+------+------+



In [49]:
#Select the records where salary greater than 35000
df_pyspark.filter(df_pyspark['Salary']>=35000).show()


#We can use where and filter to filter out the records from the columns based on the conditions

+------+----+------+------+
|  Name| Age|Salary|  City|
+------+----+------+------+
| Akash|  24| 45000|  null|
| Rohit|null| 35000|Shimla|
|Sanjay|  30| 56000|Mysore|
|  null|  21| 53000| Hubli|
| Suman|  28| 45000|Mumbai|
|Aayush|  30| 38000| Solan|
+------+----+------+------+



In [55]:
#Select the name and age of the employees where salary:

df_pyspark.filter(df_pyspark['Salary']>=35000).select(['Name','Age']).show()

+------+----+
|  Name| Age|
+------+----+
| Akash|  24|
| Rohit|null|
|Sanjay|  30|
|  null|  21|
| Suman|  28|
|Aayush|  30|
+------+----+



In [54]:
#Select the records where Salary more than 35000 and Age more than 25: 

df_pyspark.filter((df_pyspark['Salary']>=35000) & (df_pyspark['Age']>25)).show()

+------+---+------+------+
|  Name|Age|Salary|  City|
+------+---+------+------+
|Sanjay| 30| 56000|Mysore|
| Suman| 28| 45000|Mumbai|
|Aayush| 30| 38000| Solan|
+------+---+------+------+



In [58]:
# ~ filter operation:
df_pyspark.filter(~(df_pyspark['Salary']>=35000)).show()

+-------+----+------+---------+
|   Name| Age|Salary|     City|
+-------+----+------+---------+
|  Rahul|  26| 30000|Bangalore|
|Abhinav|  26| 34000|     Pune|
|   Sonu|  23| 34000|   Mumbai|
|  Akash|null| 23000|Hyderabad|
+-------+----+------+---------+



# PySpark
1. GroupBy
2. Aggregate functions

In [64]:
#To create a spark session:
spark = SparkSession.builder.appName('Data').getOrCreate()
spark

In [63]:
#To read a csv file:
sale_pyspark = spark.read.csv('Sale.csv', header=True, inferSchema=True)
sale_pyspark.show()

+---------+-----+-----+
|     City|State| Sale|
+---------+-----+-----+
|   Shimla|   HP|82690|
|   Mumbai|   MH|64217|
|Bangalore|   KA|10671|
|   Mumbai|   MH|89811|
|   Mumbai|   MH| 8583|
|   Shimla|   HP|88233|
|    Solan|   HP| 5546|
|   Shimla|   HP|33440|
|    Solan|   HP|51400|
|Bangalore|   KA|60855|
|Bangalore|   KA|94763|
|Bangalore|   KA|31957|
|   Shimla|   HP|39926|
|Hyderabad|   TS|77598|
|  Chennai|   TN|84379|
|  Chennai|   TN|93683|
|    Solan|   HP|18894|
|    Solan|   HP|78674|
|   Mumbai|   MH|14280|
|   Shimla|   HP|43004|
+---------+-----+-----+
only showing top 20 rows



In [66]:
#To check schema
sale_pyspark.printSchema()

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Sale: integer (nullable = true)



In [71]:
#Group by city to check maximum sale city wise:
sale_pyspark.groupBy(['City']).max().show()

+---------+---------+
|     City|max(Sale)|
+---------+---------+
|Bangalore|    97480|
|  Chennai|    93683|
|   Shimla|    88233|
|   Mumbai|    89811|
|     Pune|    63951|
|    Delhi|    76151|
|    Solan|    78674|
|Hyderabad|    77598|
+---------+---------+



In [72]:
#Group by city to check maximum sale state wise:
sale_pyspark.groupBy(['State']).max().show()

+-----+---------+
|State|max(Sale)|
+-----+---------+
|   HP|    88233|
|   DL|    76151|
|   TS|    77598|
|   TN|    93683|
|   MH|    89811|
|   KA|    97480|
+-----+---------+



In [73]:
#Group by city to check summation of sale city wise:
sale_pyspark.groupBy(['City']).sum().show()

+---------+---------+
|     City|sum(Sale)|
+---------+---------+
|Bangalore|   380215|
|  Chennai|   326005|
|   Shimla|   287293|
|   Mumbai|   176891|
|     Pune|   139120|
|    Delhi|   139530|
|    Solan|   154514|
|Hyderabad|   156403|
+---------+---------+



In [78]:
#Group by state to check count of city in each state:
sale_pyspark.groupBy(['State']).count().show()

+-----+-----+
|State|count|
+-----+-----+
|   HP|    9|
|   DL|    3|
|   TS|    3|
|   TN|    4|
|   MH|    8|
|   KA|    6|
+-----+-----+



In [86]:
#Aggregate function to check total sale
sale_pyspark.agg({'Sale':'sum'}).show()

+---------+
|sum(Sale)|
+---------+
|  1759971|
+---------+

