## 2-Python Spark SQL and DataFrame  (Advanced Example)
### This notebook is designed to work on your local computer.
### Note: Install pyspark package via Anaconda before running this code.

In [1]:
from pyspark.sql import SparkSession
# Create the SparkSession for this notebook (or reuse an existing one) under the app name 'ops'.
spark = SparkSession.builder.appName('ops').getOrCreate()

In [114]:
# Print the version string of the Python interpreter running this kernel
import sys
print(sys.version)

3.7.3 (default, Apr 24 2019, 15:29:51) [MSC v.1915 64 bit (AMD64)]


In [115]:
# Print the Spark version of the active session
print(spark.version)

2.4.0


## Example 1 of 3 (appl_stock.csv)

### 1-Read Data

In [116]:
# Load the stock CSV: the first line is the header and column types are inferred from the data
df = spark.read.options(header=True, inferSchema=True).csv('appl_stock.csv')

### 2-Inspect Data

In [117]:
# Print the inferred schema as a tree: column name, data type and nullability
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [118]:
# List of (column name, data type) tuples for df
df.dtypes

[('Date', 'timestamp'),
 ('Open', 'double'),
 ('High', 'double'),
 ('Low', 'double'),
 ('Close', 'double'),
 ('Volume', 'int'),
 ('Adj Close', 'double')]

In [119]:
# Column names only, as a Python list of strings
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [120]:
# Print the first 3 rows as a text table
df.show(3)

+-------------------+----------+----------+------------------+----------+---------+------------------+
|               Date|      Open|      High|               Low|     Close|   Volume|         Adj Close|
+-------------------+----------+----------+------------------+----------+---------+------------------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|214.379993|    215.23|        210.750004|210.969995|138040000|27.333178000000004|
+-------------------+----------+----------+------------------+----------+---------+------------------+
only showing top 3 rows



In [121]:
# Return the first 3 rows to the driver as a list of Row objects
df.take(3)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date=datetime.datetime(2010, 1, 5, 0, 0), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002),
 Row(Date=datetime.datetime(2010, 1, 6, 0, 0), Open=214.379993, High=215.23, Low=210.750004, Close=210.969995, Volume=138040000, Adj Close=27.333178000000004)]

In [122]:
# head(n) returns the first n rows as a list of Row objects
df.head(3)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039),
 Row(Date=datetime.datetime(2010, 1, 5, 0, 0), Open=214.599998, High=215.589994, Low=213.249994, Close=214.379993, Volume=150476200, Adj Close=27.774976000000002),
 Row(Date=datetime.datetime(2010, 1, 6, 0, 0), Open=214.379993, High=215.23, Low=210.750004, Close=210.969995, Volume=138040000, Adj Close=27.333178000000004)]

In [123]:
# Preview the first 3 rows as a pandas DataFrame.
# limit(3) runs before toPandas(), so only 3 rows are collected to the driver
# instead of converting the whole dataset and then discarding all but 3 rows.
df.limit(3).toPandas()

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
0,2010-01-04,213.429998,214.499996,212.380001,214.009998,123432400,27.727039
1,2010-01-05,214.599998,215.589994,213.249994,214.379993,150476200,27.774976
2,2010-01-06,214.379993,215.23,210.750004,210.969995,138040000,27.333178


In [124]:
# Last 3 rows via pandas. NOTE: toPandas() collects the entire DataFrame to the driver,
# so this is only appropriate for data that fits in local memory.
df.toPandas().tail(3)

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
1759,2016-12-28,117.519997,118.019997,116.199997,116.760002,20905900,116.255965
1760,2016-12-29,116.449997,117.110001,116.400002,116.730003,15039500,116.226096
1761,2016-12-30,116.650002,117.199997,115.43,115.82,30586300,115.32002


In [125]:
# Return the first row of the DataFrame as a single Row object
df.first()

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)

In [126]:
# Descriptive statistics (count, mean, stddev, min, max) for each column, shown as a table
df.describe().show()

+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|summary|              Open|              High|               Low|            Close|             Volume|         Adj Close|
+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|  count|              1762|              1762|              1762|             1762|               1762|              1762|
|   mean| 313.0763111589103| 315.9112880164581| 309.8282405079457|312.9270656379113|9.422577587968218E7| 75.00174115607275|
| stddev|185.29946803981522|186.89817686485767|183.38391664371008|185.1471036170943|6.020518776592709E7| 28.57492972179906|
|    min|              90.0|         90.699997|         89.470001|        90.279999|           11475900|         24.881912|
|    max|        702.409988|        705.070023|        699.569977|       702.100021|          470249500|127.96609099999999|
+-------

In [127]:
# Pandas way: describe() on the converted frame also reports the 25%/50%/75% quartiles
df.toPandas().describe()

Unnamed: 0,Open,High,Low,Close,Volume,Adj Close
count,1762.0,1762.0,1762.0,1762.0,1762.0,1762.0
mean,313.076311,315.911288,309.828241,312.927066,94225780.0,75.001741
std,185.299468,186.898177,183.383917,185.147104,60205190.0,28.57493
min,90.0,90.699997,89.470001,90.279999,11475900.0,24.881912
25%,115.222498,116.362499,114.0025,115.190002,49174780.0,50.28854
50%,318.230007,320.600008,316.545002,318.240007,80503850.0,72.983145
75%,470.880016,478.110008,467.972513,472.592512,121081600.0,100.207243
max,702.409988,705.070023,699.569977,702.100021,470249500.0,127.966091


### 3-Filter Data

In [128]:
# Records whose closing price is below 500, keeping only the High and Low columns
# Option-1: the condition is written as a SQL expression string
df.where('close < 500').select('High', 'Low').show()

+------------------+------------------+
|              High|               Low|
+------------------+------------------+
|        214.499996|212.38000099999996|
|        215.589994|        213.249994|
|            215.23|        210.750004|
|        212.000006|        209.050005|
|        212.000006|209.06000500000002|
|        213.000002|        208.450005|
|209.76999500000002|        206.419998|
|210.92999500000002|        204.099998|
|210.45999700000002|        209.020004|
|211.59999700000003|        205.869999|
|215.18999900000003|        207.240004|
|        215.549994|        209.500002|
|213.30999599999998|        207.210003|
|        207.499996|            197.16|
|        204.699999|        200.190002|
|        213.710005|        202.580004|
|            210.58|        199.530001|
|        205.500004|        198.699995|
|        202.199995|        190.250002|
|             196.0|191.29999899999999|
+------------------+------------------+
only showing top 20 rows



In [129]:
# Option-2: the same condition built as a Column expression
df.where(df['close'] < 500).select('High', 'Low').show()

+------------------+------------------+
|              High|               Low|
+------------------+------------------+
|        214.499996|212.38000099999996|
|        215.589994|        213.249994|
|            215.23|        210.750004|
|        212.000006|        209.050005|
|        212.000006|209.06000500000002|
|        213.000002|        208.450005|
|209.76999500000002|        206.419998|
|210.92999500000002|        204.099998|
|210.45999700000002|        209.020004|
|211.59999700000003|        205.869999|
|215.18999900000003|        207.240004|
|        215.549994|        209.500002|
|213.30999599999998|        207.210003|
|        207.499996|            197.16|
|        204.699999|        200.190002|
|        213.710005|        202.580004|
|            210.58|        199.530001|
|        205.500004|        198.699995|
|        202.199995|        190.250002|
|             196.0|191.29999899999999|
+------------------+------------------+
only showing top 20 rows



In [130]:
# show all rows
# df.filter(df['close'] < 500).select(['High', 'Low']).show(df.count())

In [131]:
# List all records with Open greater than 200 and Close NOT less than 200 (i.e. Close >= 200).
# `&` combines Column conditions and `~` negates one; each condition needs its own parentheses.
df.filter((df['Open'] > 200) & ~(df['Close'] < 200)).show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [132]:
# List all records whose Low value is exactly 197.16
df.filter(df['Low'] == 197.16).show()

+-------------------+------------------+----------+------+------+---------+---------+
|               Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+-------------------+------------------+----------+------+------+---------+---------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+-------------------+------------------+----------+------+------+---------+---------+



In [133]:
# collect() is an action: it returns every row of the DataFrame to the driver as a Python list of Row objects.
# Use it only after a filter (or similar operation) has reduced the data to a sufficiently small subset.

df.filter(df['Low'] == 197.16).collect()

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [134]:
# Keep the collected rows (a Python list of Row objects) for further inspection
result = df.filter(df['Low'] == 197.16).collect()

In [135]:
# Display the collected list of rows
result

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [136]:
# Get a single cell value: take the first collected Row, then read its 'Volume' field
row = result[0]
row['Volume']  # equivalent: row.asDict()['Volume']

220441900

### Dates and Timestamps

In [137]:
# Project only the Date and Open columns
df.select('Date', 'Open').show()

+-------------------+------------------+
|               Date|              Open|
+-------------------+------------------+
|2010-01-04 00:00:00|        213.429998|
|2010-01-05 00:00:00|        214.599998|
|2010-01-06 00:00:00|        214.379993|
|2010-01-07 00:00:00|            211.75|
|2010-01-08 00:00:00|        210.299994|
|2010-01-11 00:00:00|212.79999700000002|
|2010-01-12 00:00:00|209.18999499999998|
|2010-01-13 00:00:00|        207.870005|
|2010-01-14 00:00:00|210.11000299999998|
|2010-01-15 00:00:00|210.92999500000002|
|2010-01-19 00:00:00|        208.330002|
|2010-01-20 00:00:00|        214.910006|
|2010-01-21 00:00:00|        212.079994|
|2010-01-22 00:00:00|206.78000600000001|
|2010-01-25 00:00:00|202.51000200000001|
|2010-01-26 00:00:00|205.95000100000001|
|2010-01-27 00:00:00|        206.849995|
|2010-01-28 00:00:00|        204.930004|
|2010-01-29 00:00:00|        201.079996|
|2010-02-01 00:00:00|192.36999699999998|
+-------------------+------------------+
only showing top

In [139]:
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, month,
                                  year, weekofyear, format_number, date_format)

In [140]:
# Day of the month extracted from the Date timestamp
df.select(dayofmonth(df['Date'])).show(5)

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
+----------------+
only showing top 5 rows



In [141]:
# Hour component of the Date timestamp (0 in every row shown, since the times are midnight)
df.select(hour(df['Date'])).show(5)

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 5 rows



In [142]:
# Month number extracted from the Date timestamp
df.select(month(df['Date'])).show(5)

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
+-----------+
only showing top 5 rows



In [143]:
# Year extracted from the Date timestamp
df.select(year(df['Date'])).show(5)

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 5 rows



#### Average closing price grouped by year

In [157]:
# Add a 'Year' column derived from the Date timestamp; df itself is left unchanged
new_df = df.withColumn('Year', year(df['Date']))

In [158]:
# Check that the new Year column was appended
new_df.show(5)

+-------------------+----------+----------+------------------+------------------+---------+------------------+----+
|               Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|Year|
+-------------------+----------+----------+------------------+------------------+---------+------------------+----+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06 00:00:00|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07 00:00:00|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08 00:00:00|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|2010|
+-------------------+----------+----------+------------------+----------

In [159]:
# Mean closing price per year; the dict form of agg() names the output column 'avg(Close)'
result = new_df.groupBy('Year').agg({'Close': 'mean'})
result.show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+



In [160]:
# Give the aggregate column a readable name
new_result = result.withColumnRenamed('avg(Close)', 'Average Closing Price')
new_result.show()

+----+---------------------+
|Year|Average Closing Price|
+----+---------------------+
|2015|   120.03999980555547|
|2013|    472.6348802857143|
|2014|    295.4023416507935|
|2012|    576.0497195640002|
|2016|   104.60400786904763|
|2010|    259.8424600000002|
|2011|   364.00432532142867|
+----+---------------------+



In [161]:
# format_number(col, 2) rounds to 2 decimal places and returns the value as a formatted string;
# alias() keeps the original column name for display
new_result.select(['Year', format_number('Average Closing Price', 2).alias('Average Closing Price')]).show()

+----+---------------------+
|Year|Average Closing Price|
+----+---------------------+
|2015|               120.04|
|2013|               472.63|
|2014|               295.40|
|2012|               576.05|
|2016|               104.60|
|2010|               259.84|
|2011|               364.00|
+----+---------------------+



## Example 2 of 3 (sales_info.csv)

In [52]:
# Load the sales CSV (header row present, column types inferred) and preview 3 rows.
# Note: this rebinds `df`, which previously held the stock data.
df = spark.read.options(header=True, inferSchema=True).csv('sales_info.csv')
df.show(n=3)

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
+-------+-------+-----+
only showing top 3 rows



In [53]:
# Inspect the inferred column types of the sales data
df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



### Groupby and Aggregate

#### Option-1

In [54]:
# groupBy() alone returns a GroupedData object; an aggregation must follow to get a DataFrame back
df.groupBy('Company')

<pyspark.sql.group.GroupedData at 0x1da24022400>

In [55]:
# Mean of the numeric columns per company (Sales is the only numeric column here)
df.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [56]:
# Total Sales per company
df.groupBy('Company').sum().show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [58]:
# Smallest Sales value per company
df.groupBy('Company').min().show()

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+



In [59]:
# Number of rows per company
df.groupBy('Company').count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [60]:
# Aggregate over the whole DataFrame (no grouping): minimum of Sales
df.agg({'Sales': 'min'}).show()

+----------+
|min(Sales)|
+----------+
|     120.0|
+----------+



In [62]:
# Aggregate over the whole DataFrame (no grouping): sum of Sales
df.agg({'Sales': 'sum'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



#### Option-2

In [63]:
# Create the GroupedData object once so different aggregations can be applied to it
group_data = df.groupBy('Company')

In [64]:
# Dict form of agg(): {column name: aggregate function name}, applied per company
group_data.agg({'Sales': 'sum'}).show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [67]:
from pyspark.sql import functions as F
# Number of distinct Sales values in the whole DataFrame
df.select(F.countDistinct('Sales')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+



In [69]:
# Overall average of Sales, with a readable column name via alias()
df.select(F.avg('Sales').alias('Average Sales')).show()

+-----------------+
|    Average Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [70]:
# Number of distinct companies
df.select(F.countDistinct('Company')).show()

+-----------------------+
|count(DISTINCT Company)|
+-----------------------+
|                      4|
+-----------------------+



In [71]:
# Standard deviation of Sales; the resulting column is named stddev_samp(Sales), i.e. the sample standard deviation
df.select(F.stddev('Sales')).show()

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



In [72]:
# Same statistic with the column renamed to 'std'
df.select(F.stddev('Sales').alias('std')).show()

+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+



In [73]:
from pyspark.sql.functions import format_number
# Keep the one-row DataFrame holding the standard deviation of Sales under the column name 'std'
sales_std = df.select(F.stddev('Sales').alias('std'))

In [75]:
# Unformatted value, full floating-point precision
sales_std.show()

+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+



In [76]:
# Round to 2 decimal places for display (format_number returns a string column)
sales_std.select(format_number('std', 2).alias('std')).show()

+------+
|   std|
+------+
|250.09|
+------+



### Sort and orderBy

In [77]:
# Display the sales data in its original order (show() prints up to 20 rows by default; this table has 12)
df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [80]:
# Order rows by Sales, highest first
df.orderBy('Sales', ascending=False).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



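#### On a DataFrame, `sort()` and `orderBy()` are aliases and both perform a global sort; use `sortWithinPartitions()` to sort inside each partition (the SQL `SORT BY` vs `ORDER BY` distinction)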

In [82]:
# Sort rows by Sales, highest first, using sort()
df.sort('Sales', ascending=False).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



In [36]:
# Descending order expressed on the Column itself via desc()
df.orderBy(df['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



## Example 3 of 3 (ContainsNull.csv)

### Missing Value

In [83]:
# Load the CSV containing missing values (header row present, column types inferred) and display it.
# Note: this rebinds `df`, which previously held the sales data.
df = spark.read.options(inferSchema=True, header=True).csv('ContainsNull.csv')
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [88]:
# Drop records with one or more missing values (the default, how='any')
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [85]:
# thresh=2 keeps only rows that have at least 2 non-null values.
# With the 3 columns here, that drops records with two or more missing values.
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [89]:
# Drop a record only if ALL of its values are missing (no row qualifies here, since Id is always set)
df.na.drop(how = 'all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [90]:
# Drop records whose Sales value is missing; nulls in other columns are ignored
df.na.drop(subset="Sales").show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [91]:
# Drop records whose Name value is missing; nulls in other columns are ignored
df.na.drop(subset='Name').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp4|Cindy|456.0|
+----+-----+-----+



In [92]:
# Check column types: which columns are filled depends on the type of the fill value
df.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



In [94]:
# Fill numerical features with 0 (a numeric fill value only applies to numeric columns; Name stays null)
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [96]:
# Fill categorical (string) features with "unknown"; nulls in the numeric Sales column are left untouched
df.na.fill('unknown').show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|unknown| null|
|emp3|unknown|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [97]:
# Fill missing values in one specific column (Name) with a specific value ('no name')
df.na.fill('no name', subset = 'Name').show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|no name| null|
|emp3|no name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [98]:
# Mean of Sales computed over the non-null values only, returned as a one-row list
df.select(F.mean('Sales')).collect()

[Row(avg(Sales)=400.5)]

In [99]:
# Same aggregate, passing a Column object instead of the column name
df.select(F.mean(df['Sales'])).collect()

[Row(avg(Sales)=400.5)]

#### Replace missing Sales values with the mean(Sales)

In [109]:
# Option-1: step by step. First collect the mean as a one-element list of Row objects
mean_val = df.select(F.mean(df['Sales'])).collect()

In [110]:
# First field of the first (only) Row: the mean as a plain Python number
mean_val[0][0]

400.5

In [111]:
# Keep the scalar mean under a descriptive name
mean_sales = mean_val[0][0]

In [112]:
# Replace nulls in the Sales column only with the computed mean
df.na.fill(mean_sales, subset='Sales').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [53]:
# Option-2: Combine all steps
# Compute mean(Sales) and fill the Sales column with it in a single expression.
# F.mean is used because a bare `mean` is never imported in this notebook and would
# raise NameError on a fresh kernel; subset='Sales' matches Option-1.
df.na.fill(df.select(F.mean(df['Sales'])).collect()[0][0], subset='Sales').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [162]:
# Stop the SparkSession (this shuts down the underlying SparkContext)
spark.stop()