## Spark DataFrame Basic Operations

### Basic operations: reading, loading, and viewing the data

* **pyspark.sql.SparkSession** => Main entry point for DataFrame and SQL functionality.
* **class builder** => Builder for SparkSession.
* **appName** => Sets a name for the application, which will be shown in the Spark web UI.
* **getOrCreate** => Gets an existing SparkSession or, if there is no existing one, creates a new one based on the options set in this builder.

In [1]:
import findspark
findspark.init('C:\\Users\\pradn\\Desktop\\spark\\spark-2.4.3-bin-hadoop2.7')

import pyspark 

#starting the spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Basics').getOrCreate()

#reading the json file
df = spark.read.json('people.json')

#printing the dataframe
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



**printSchema()** => Prints out the schema in the tree format.

In [2]:
#finding the schema of the dataframe
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



**df.columns** => Returns all column names as a list.

In [3]:
#dataframe columns
df.columns

['age', 'name']

**describe** => Computes statistics for numeric and string columns. This includes count, mean, stddev, min, and max. If no columns are given, this function computes statistics for all numerical or string columns.

In [4]:
#displaying the statistical summary of the numeric and string columns
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+
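
The `stddev` row is the sample standard deviation (divisor n - 1), computed over the non-null ages only. As a quick sanity check, here is a minimal plain-Python sketch that uses the standard library rather than Spark and assumes the two non-null ages shown above:

```python
import math
import statistics

# Non-null ages from people.json as displayed above (the null age is skipped).
ages = [30, 19]

count = len(ages)                      # corresponds to the 'count' row
mean_age = statistics.mean(ages)       # corresponds to the 'mean' row
sample_std = statistics.stdev(ages)    # sample standard deviation, divisor n - 1

# For two values the sample variance is (5.5**2 + 5.5**2) / 1 = 60.5.
assert math.isclose(sample_std, math.sqrt(60.5))
print(count, mean_age, sample_std)
```

The printed values should agree with the `count`, `mean`, and `stddev` rows for `age`, up to floating-point rounding.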



**class pyspark.sql.types.StructField(name, dataType, nullable=True, metadata=None)** => A field in StructType.

**Parameters:**	
* name – string, name of the field.
* dataType – DataType of the field.
* nullable – boolean, whether the field can be null (None) or not.
* metadata – a dict from string to simple type that can be serialized to JSON automatically.


In [5]:
#exploring schema
from pyspark.sql.types import (StructField, StringType, IntegerType, StructType)

In [6]:
data_schema = [StructField('age', IntegerType(), True),
              StructField('name', StringType(), True)]

In [7]:
final_structure = StructType(fields = data_schema)

In [8]:
df = spark.read.json('people.json', schema = final_structure)
df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



In [9]:
#selecting columns from the dataframe
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



In [10]:
type(df['age'])

pyspark.sql.column.Column

In [11]:
type(df.select('age'))

pyspark.sql.dataframe.DataFrame

In [12]:
#display the first 2 rows of a dataframe
df.head(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

In [13]:
df.head(2)[0]

Row(age=None, name='Michael')

In [14]:
type(df.head(2)[0])

pyspark.sql.types.Row

In [15]:
df.select('age', 'name').show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



**withColumn(colName, col)** => Returns a new DataFrame by adding a column or replacing the existing column that has the same name.

In [16]:
#creating a copy of a column
df.withColumn('newage', df['age']).show()

+----+-------+------+
| age|   name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    30|
|  19| Justin|    19|
+----+-------+------+



In [17]:
#creating a new column
df.withColumn('double_age', df['age'] * 2).show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



**withColumnRenamed(existing, new)** => Returns a new DataFrame by renaming an existing column. This is a no-op if schema doesn’t contain the given column name.

**Parameters:**	
* existing – string, name of the existing column to rename.
* new – string, new name of the column.

In [18]:
#renaming a column
df.withColumnRenamed('age', 'new_age').show()

+-------+-------+
|new_age|   name|
+-------+-------+
|   null|Michael|
|     30|   Andy|
|     19| Justin|
+-------+-------+



**createOrReplaceTempView(name)** => Creates or replaces a local temporary view with this DataFrame.
The lifetime of this temporary view is tied to the SparkSession that was used to create this DataFrame.

In [19]:
#register as sql temporary view
df.createOrReplaceTempView('people')

In [20]:
results = spark.sql("SELECT * from people")

In [21]:
results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



**where(condition)** => where() is an alias for filter(). The same condition can also be written as a WHERE clause in spark.sql(), as in the query below.

In [22]:
new_results = spark.sql("SELECT * FROM people WHERE age = 30")
new_results.show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



### How to filter data once you grab it

In [23]:
#getOrCreate() returns the already running session, so spark1 refers to the same session as spark
spark1 = SparkSession.builder.appName('ops').getOrCreate()

In [24]:
df1 = spark.read.csv('appl_stock.csv', inferSchema=True, header=True)
df1.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [25]:
df1.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



**filter(condition)** => Filters rows using the given condition.

In [26]:
#grab all the rows that have a closing price of less than $500
df1.filter('Close < 500').show(5)

+-------------------+----------+----------+------------------+------------------+---------+------------------+
|               Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
only showing top 5 rows

In [27]:
#show selected columns only
df1.filter('Close < 500').select(['Open', 'Close']).show(5)

+----------+------------------+
|      Open|             Close|
+----------+------------------+
|213.429998|        214.009998|
|214.599998|        214.379993|
|214.379993|        210.969995|
|    211.75|            210.58|
|210.299994|211.98000499999998|
+----------+------------------+
only showing top 5 rows



In [28]:
#access the columns using the dataframe syntax
df1.filter(df1['Close'] < 500).show(5)

+-------------------+----------+----------+------------------+------------------+---------+------------------+
|               Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+-------------------+----------+----------+------------------+------------------+---------+------------------+
only showing top 5 rows

In [29]:
#accessing by dataframe
df1.filter(df1['Close'] < 500).select(df1['Volume']).show(5)

+---------+
|   Volume|
+---------+
|123432400|
|150476200|
|138040000|
|119282800|
|111902700|
+---------+
only showing top 5 rows



In [30]:
#filtering rows whose closing price is less than 200 and opening price is more than 200
#displaying at most 5 rows with their close and open prices
df1.filter((df1['Close'] < 200) & (df1['Open'] > 200)).select(['Close', 'Open']).show(5)

+----------+------------------+
|     Close|              Open|
+----------+------------------+
|    197.75|206.78000600000001|
|199.289995|        204.930004|
|192.060003|        201.079996|
+----------+------------------+
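
The parentheses around each comparison matter because, in Python, `&` binds more tightly than `<` and `>`. The sketch below shows the effect with plain integers standing in for one row's values (hypothetical numbers, not taken from the dataset); with Column objects the unparenthesised form is likewise not parsed as intended and typically ends in an error rather than a correct filter.

```python
# Plain integers stand in for one row's Close and Open values (hypothetical).
close, open_ = 197, 206

# With parentheses: each comparison is evaluated first, then combined.
with_parens = (close < 200) & (open_ > 200)

# Without parentheses: & binds tighter than < and >, so Python reads this
# as the chained comparison close < (200 & open_) > 200.
without_parens = close < 200 & open_ > 200

print(with_parens, without_parens)
```

The two expressions disagree even though they look alike, which is why every comparison inside `filter()` is wrapped in its own parentheses.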



In [31]:
#filtering rows whose closing price is less than 200 and opening price is NOT more than 200
#displaying at most 5 rows with their close and open prices
df1.filter((df1['Close'] < 200) & ~(df1['Open'] > 200)).select(['Close', 'Open']).show(5)

+----------+------------------+
|     Close|              Open|
+----------+------------------+
|194.729998|192.36999699999998|
|195.859997|        195.909998|
|199.229994|        195.169994|
|192.050003|        196.730003|
|195.460001|192.63000300000002|
+----------+------------------+
only showing top 5 rows



In [32]:
#date when the stock low price was 197.16
df1.filter(df1['Low'] == 197.16).show()

+-------------------+------------------+----------+------+------+---------+---------+
|               Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+-------------------+------------------+----------+------+------+---------+---------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+-------------------+------------------+----------+------+------+---------+---------+



**collect()** => Returns all the records as a list of Row.

In [33]:
#using collect() instead of show()
df1.filter(df1['Low'] == 197.16).collect()

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [34]:
#collecting the data and storing it in result
result = df1.filter(df1['Low'] == 197.16).collect()

In [35]:
#displaying the result
result

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [36]:
#displaying the first row of the result. (in this case we just have one result)
result[0]

Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)

In [37]:
#storing the first row of result in a variable called row.
row = result[0]

**asDict(recursive=False)** => Returns the Row as a dict.

**Parameters:**
* recursive – converts nested Rows to dicts as well (default: False).

In [38]:
#storing the column names and values in the form of key-value pairs in the dictionary
row.asDict()

{'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

In [39]:
#accessing the values of the dictionary using its keys
row.asDict()['Volume']

220441900

### Group by and aggregate functions

In [40]:
#reading the sales info csv file
df2 = spark.read.csv('sales_info.csv', inferSchema=True, header=True)

In [41]:
df2.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [42]:
df2.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



**groupBy(*cols)** => Groups the DataFrame using the specified columns, so we can run aggregation on them. See GroupedData for all the available aggregate functions.

groupby() is an alias for groupBy().

**Parameters:**	
* cols – list of columns to group by. Each element should be a column name (string) or an expression (Column).

In [43]:
#grouping the same companies
df2.groupBy('Company')

<pyspark.sql.group.GroupedData at 0x27227d0ea58>

In [44]:
#grouping the same companies and finding their mean
df2.groupBy('Company').mean()

DataFrame[Company: string, avg(Sales): double]

In [45]:
#displaying average sales of a company
df2.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+
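
For intuition, the per-company averages above can be reproduced in plain Python. This is only a sketch of the group-then-aggregate idea using the rows displayed earlier; it says nothing about how Spark executes the query internally.

```python
from collections import defaultdict

# (Company, Person, Sales) rows copied from the sales_info.csv output above.
rows = [
    ("GOOG", "Sam", 200.0), ("GOOG", "Charlie", 120.0), ("GOOG", "Frank", 340.0),
    ("MSFT", "Tina", 600.0), ("MSFT", "Amy", 124.0), ("MSFT", "Vanessa", 243.0),
    ("FB", "Carl", 870.0), ("FB", "Sarah", 350.0),
    ("APPL", "John", 250.0), ("APPL", "Linda", 130.0),
    ("APPL", "Mike", 750.0), ("APPL", "Chris", 350.0),
]

# Step 1 (groupBy): collect the Sales values that share a Company key.
groups = defaultdict(list)
for company, _person, sales in rows:
    groups[company].append(sales)

# Step 2 (mean): reduce each group to a single value.
avg_sales = {company: sum(values) / len(values) for company, values in groups.items()}
print(avg_sales)
```

Swapping the reduction in step 2 for `sum`, `min`, `max`, or `len` gives the plain-Python counterparts of the other grouped aggregates.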



In [46]:
#grouping the same companies and summing their sales
df2.groupBy('Company').sum().show()

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+



In [47]:
#finding the minimum sales of the company
df2.groupBy('Company').min().show()

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+



In [48]:
#finding the maximum sales of the company using groupby
df2.groupBy('Company').max().show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



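**agg(*exprs)** => Aggregate on the entire DataFrame without groups (shorthand for df.groupBy().agg()). Called on the GroupedData returned by groupBy(), agg() instead computes the aggregates per group.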

In [49]:
#finding the maximum sales of the company using agg
group_data = df2.groupBy('Company')
group_data.agg({'Sales':'max'}).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+



In [50]:
#counting the number of rows per company
df2.groupBy('Company').count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [51]:
#displaying sum of all sales in the dataframe
df2.agg({'Sales':'sum'}).show()

+----------+
|sum(Sales)|
+----------+
|    4327.0|
+----------+



In [52]:
#displaying the max sale (column names are case-insensitive by default, so 'sales' resolves to 'Sales')
df2.agg({'sales':'max'}).show()

+----------+
|max(sales)|
+----------+
|     870.0|
+----------+



In [53]:
#displaying the min sale
df2.agg({'Sales':'min'}).show()

+----------+
|min(Sales)|
+----------+
|     120.0|
+----------+



In [54]:
#importing functions from spark
from pyspark.sql.functions import countDistinct, avg, stddev

In [55]:
#displaying the number of distinct sales
df2.select(countDistinct('Sales')).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+
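
The count is 11 rather than 12 because the value 350.0 appears twice in the Sales column. A short plain-Python sketch of the same idea, using the values displayed earlier:

```python
# Sales values copied from the sales_info.csv output shown earlier.
sales = [200.0, 120.0, 340.0, 600.0, 124.0, 243.0,
         870.0, 350.0, 250.0, 130.0, 750.0, 350.0]

# A set keeps one copy of each value, so its size is the distinct count.
distinct_count = len(set(sales))
print(len(sales), distinct_count)
```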



In [56]:
#displaying average sales
df2.select(avg('Sales')).show()

+-----------------+
|       avg(Sales)|
+-----------------+
|360.5833333333333|
+-----------------+



In [57]:
#changing the name of the column
df2.select(avg('Sales').alias('Average Sales')).show()

+-----------------+
|    Average Sales|
+-----------------+
|360.5833333333333|
+-----------------+



In [58]:
#displaying the standard deviation of the sales
df2.select(stddev('Sales')).show()

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



In [59]:
#show() only prints and returns None, so store the DataFrame first and then display it
sales_std = df2.select(stddev('Sales'))
sales_std.show()

+------------------+
|stddev_samp(Sales)|
+------------------+
|250.08742410799007|
+------------------+



In [60]:
#importing the format_number function
from pyspark.sql.functions import format_number

In [61]:
#renaming stddev of sales column
sales_std = df2.select(stddev('Sales').alias('std'))

In [62]:
sales_std.show()

+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+



In [63]:
#formatting the number to 2 decimal places
sales_std.select(format_number('std', 2)).show()

+---------------------+
|format_number(std, 2)|
+---------------------+
|               250.09|
+---------------------+
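
Note that `format_number` produces a string column, so the result is text for display rather than a number for further arithmetic. A rough plain-Python analogue, assuming the value shown in the `std` column above, is the built-in `format` with a thousands separator and two decimals:

```python
std = 250.08742410799007  # value shown in the 'std' column above

# ',' adds thousands separators and '.2f' rounds to two decimal places,
# which approximates the '#,###,###.##' style used by format_number.
formatted = format(std, ",.2f")

print(formatted, type(formatted).__name__)
```

Because the result is text, any further calculation should use the unformatted column and apply the formatting as the last step.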



In [64]:
#renaming the column
sales_std.select(format_number('std', 2).alias('final_std')).show()

+---------+
|final_std|
+---------+
|   250.09|
+---------+



### Ordering and sorting

**orderBy(*cols, **kwargs)** => Returns a new DataFrame sorted by the specified column(s).

**Parameters:**	
* cols – list of Column or column names to sort by.
* ascending – boolean or list of boolean (default True). Sort ascending vs. descending. Specify list for multiple sort orders. If a list is specified, length of the list must equal length of the cols.
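
To illustrate what a list passed to `ascending` means, here is a plain-Python sketch of a two-column sort, roughly what a call such as `df2.orderBy(['Company', 'Sales'], ascending=[True, False])` would express (that call is an illustration and is not run in this notebook). The rows are a subset of the sales data shown earlier.

```python
# A few (Company, Person, Sales) rows from the sales data used in this notebook.
rows = [
    ("GOOG", "Sam", 200.0), ("MSFT", "Tina", 600.0), ("APPL", "Mike", 750.0),
    ("GOOG", "Frank", 340.0), ("APPL", "Linda", 130.0), ("MSFT", "Amy", 124.0),
]

# Company ascending, then Sales descending within each company.
# Negating the numeric key flips the order for that one column.
ordered = sorted(rows, key=lambda r: (r[0], -r[2]))

for row in ordered:
    print(row)
```

Each entry in the `ascending` list pairs with the column at the same position, which is why the two lists must have equal length.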

In [65]:
#ascending ordering
df2.orderBy(df2['Sales']).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [66]:
df2.orderBy(df2['Sales'].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



### Handling Null values in a dataframe

In [67]:
#reading a new dataframe
df3 = spark.read.csv('ContainsNull.csv', inferSchema=True, header=True)

In [68]:
df3.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



**dropna(how='any', thresh=None, subset=None)** => Returns a new DataFrame omitting rows with null values. DataFrame.dropna() and DataFrameNaFunctions.drop() are aliases of each other.

**Parameters:**	
* how – ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.
* thresh – int, default None. If specified, drop rows that have fewer than thresh non-null values. This overrides the how parameter.
* subset – optional list of column names to consider.
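
The interaction between `how` and `thresh` is easy to misread, so here is a plain-Python sketch of the documented rules applied to the four rows shown above. `drop_rows` is a hypothetical helper written for illustration; it is not part of PySpark and ignores the `subset` parameter.

```python
# Rows copied from the ContainsNull.csv output above; None stands for null.
rows = [
    ("emp1", "John", None),
    ("emp2", None, None),
    ("emp3", None, 345.0),
    ("emp4", "Cindy", 456.0),
]

def drop_rows(rows, how="any", thresh=None):
    """Hypothetical helper mirroring the documented how/thresh rules."""
    kept = []
    for row in rows:
        non_null = sum(value is not None for value in row)
        if thresh is not None:
            keep = non_null >= thresh      # thresh takes priority over how
        elif how == "any":
            keep = non_null == len(row)    # drop if any value is null
        else:  # how == "all"
            keep = non_null > 0            # drop only if every value is null
        if keep:
            kept.append(row)
    return kept

print([row[0] for row in drop_rows(rows, thresh=2)])
```

With `thresh=2`, a row survives when it has at least two non-null values, so only `emp2` (one non-null value) is dropped.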

In [69]:
#dropping rows that have null value
df3.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [70]:
#dropping rows that have fewer than 2 non-null values
df3.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [71]:
#dropping rows if there are any null values
df3.na.drop(how='any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [72]:
#dropping rows if all values are null
df3.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [73]:
#dropping rows where a particular column is null
#this removes the rows that have null values in the 'Sales' column
df3.na.drop(subset='Sales').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### Fill in the missing values

In [74]:
df3.printSchema()

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sales: double (nullable = true)



**fillna(value, subset=None)** => Replace null values, alias for na.fill(). DataFrame.fillna() and DataFrameNaFunctions.fill() are aliases of each other.

**Parameters:**	
* value – int, long, float, string, or dict. Value to replace null values with. If the value is a dict, then subset is ignored and value must be a mapping from column name (string) to replacement value. The replacement value must be an int, long, float, or string.
* subset – optional list of column names to consider. Columns specified in subset that do not have matching data type are ignored. For example, if value is a string, and subset contains a non-string column, then the non-string column is simply ignored.
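
The type-matching rule described above can be sketched in plain Python. `fill_nulls` is a hypothetical helper for illustration only; it models just two kinds of columns (string and numeric) and is not how PySpark implements `fillna`.

```python
# Rows and column types copied from the ContainsNull.csv output and schema above.
columns = [("Id", str), ("Name", str), ("Sales", float)]
rows = [
    ("emp1", "John", None),
    ("emp2", None, None),
    ("emp3", None, 345.0),
    ("emp4", "Cindy", 456.0),
]

def fill_nulls(rows, value, subset=None):
    """Hypothetical helper: fill None only where the column type fits the value."""
    value_is_text = isinstance(value, str)
    filled = []
    for row in rows:
        new_row = []
        for (name, col_type), cell in zip(columns, row):
            in_subset = subset is None or name in subset
            type_matches = (col_type is str) == value_is_text
            if cell is None and in_subset and type_matches:
                cell = col_type(value)   # e.g. 0 becomes 0.0 in a double column
            new_row.append(cell)
        filled.append(tuple(new_row))
    return filled

print(fill_nulls(rows, "Unknown"))
print(fill_nulls(rows, 0))
```

A string value leaves the numeric `Sales` column untouched, and a numeric value leaves the string `Name` column untouched, which matches the rule that mismatched columns are simply ignored.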

In [75]:
#a string fill value only replaces nulls in string columns
df3.na.fill('Unknown').show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|Unknown| null|
|emp3|Unknown|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



In [76]:
#a numeric fill value only replaces nulls in numeric columns (Sales is a double, so 0 becomes 0.0)
df3.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [77]:
#naming the target columns with subset makes the intent explicit
df3.na.fill('Unknown', subset=['Name']).show()
df3.na.fill(0, subset=['Sales']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|Unknown| null|
|emp3|Unknown|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### Fill values with the mean value

In [78]:
from pyspark.sql.functions import mean

In [79]:
mean_val = df3.select(mean(df3['Sales'])).collect()

In [80]:
mean_val

[Row(avg(Sales)=400.5)]

In [81]:
mean_val[0]

Row(avg(Sales)=400.5)

In [82]:
mean_val[0][0]

400.5
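
The mean is 400.5 because the average is taken over the non-null Sales values only: (345.0 + 456.0) / 2. A minimal plain-Python sketch of that calculation and of the subsequent fill, assuming the four Sales values shown earlier:

```python
# Sales column from ContainsNull.csv as shown earlier; None stands for null.
sales = [None, None, 345.0, 456.0]

# Skip nulls: they count toward neither the sum nor the divisor.
non_null = [value for value in sales if value is not None]
mean_sales = sum(non_null) / len(non_null)

# Replace each null with the mean of the non-null values.
filled = [mean_sales if value is None else value for value in sales]
print(mean_sales, filled)
```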

In [83]:
mean_sales = mean_val[0][0]

In [84]:
df3.na.fill(mean_sales,subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



OR, passing the subset as a positional argument:

In [85]:
df3.na.fill(mean_sales,['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [86]:
#everything in one line
df3.na.fill(df3.select(mean(df3['Sales'])).collect()[0][0], ['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



### Dates and Timestamps

In [87]:
#using df1
df1.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|    

In [88]:
df1.head(1)

[Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=213.429998, High=214.499996, Low=212.38000099999996, Close=214.009998, Volume=123432400, Adj Close=27.727039)]

In [89]:
df1.select(['Date', 'Open']).show()

+-------------------+------------------+
|               Date|              Open|
+-------------------+------------------+
|2010-01-04 00:00:00|        213.429998|
|2010-01-05 00:00:00|        214.599998|
|2010-01-06 00:00:00|        214.379993|
|2010-01-07 00:00:00|            211.75|
|2010-01-08 00:00:00|        210.299994|
|2010-01-11 00:00:00|212.79999700000002|
|2010-01-12 00:00:00|209.18999499999998|
|2010-01-13 00:00:00|        207.870005|
|2010-01-14 00:00:00|210.11000299999998|
|2010-01-15 00:00:00|210.92999500000002|
|2010-01-19 00:00:00|        208.330002|
|2010-01-20 00:00:00|        214.910006|
|2010-01-21 00:00:00|        212.079994|
|2010-01-22 00:00:00|206.78000600000001|
|2010-01-25 00:00:00|202.51000200000001|
|2010-01-26 00:00:00|205.95000100000001|
|2010-01-27 00:00:00|        206.849995|
|2010-01-28 00:00:00|        204.930004|
|2010-01-29 00:00:00|        201.079996|
|2010-02-01 00:00:00|192.36999699999998|
+-------------------+------------------+
only showing top 20 rows

In [90]:
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, weekofyear, month, year, format_number, date_format)

In [91]:
df1.select(dayofmonth(df1['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [92]:
df1.select(hour(df1['Date'])).show(5)

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 5 rows



In [93]:
df1.select(month(df1['Date'])).show()

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          2|
+-----------+
only showing top 20 rows



In [94]:
df1.select(dayofyear(df1['Date'])).show(5)

+---------------+
|dayofyear(Date)|
+---------------+
|              4|
|              5|
|              6|
|              7|
|              8|
+---------------+
only showing top 5 rows



In [95]:
df1.select(weekofyear(df1['Date'])).show(5)

+----------------+
|weekofyear(Date)|
+----------------+
|               1|
|               1|
|               1|
|               1|
|               1|
+----------------+
only showing top 5 rows
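
For comparison, the same fields can be read off a Python `datetime`. This is a sketch using the first Date value in the stock data; `weekofyear` follows ISO 8601 week numbering, which `isocalendar()` also uses.

```python
from datetime import datetime

# First Date value in the stock data shown above.
first_day = datetime(2010, 1, 4)

day_of_month = first_day.day                   # like dayofmonth
hour_of_day = first_day.hour                   # like hour
day_of_year = first_day.timetuple().tm_yday    # like dayofyear
week_of_year = first_day.isocalendar()[1]      # ISO 8601 week, like weekofyear
print(day_of_month, hour_of_day, day_of_year, week_of_year)
```

The hour is 0 for every row because the CSV holds dates without a time component, which Spark reads as midnight timestamps.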



In [96]:
df1.select(year(df1['Date'])).show(5)

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 5 rows



In [97]:
#average closing price per year
#df1.withColumn('Year', year(df1['Date'])).show()
newdf = df1.withColumn('Year', year(df1['Date']))

In [98]:
newdf.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+----+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.0600050000000

In [99]:
newdf.groupBy('Year').mean().show()

+----+------------------+------------------+------------------+------------------+--------------------+------------------+---------+
|Year|         avg(Open)|         avg(High)|          avg(Low)|        avg(Close)|         avg(Volume)|    avg(Adj Close)|avg(Year)|
+----+------------------+------------------+------------------+------------------+--------------------+------------------+---------+
|2015|120.17575393253965|121.24452385714291| 118.8630954325397|120.03999980555547|  5.18378869047619E7|115.96740080555561|   2015.0|
|2013| 473.1281355634922| 477.6389272301587|468.24710264682557| 472.6348802857143|          1.016087E8| 62.61798788492063|   2013.0|
|2014| 295.1426195357143|297.56103184523823| 292.9949599801587| 295.4023416507935| 6.315273055555555E7| 87.63583323809523|   2014.0|
|2012|     576.652720788| 581.8254008040001| 569.9211606079999| 576.0497195640002|       1.319642044E8| 74.81383696800002|   2012.0|
|2016|104.50777772619044| 105.4271825436508|103.69027771825397|104.60

In [100]:
newdf.groupby('Year').mean().select(['Year', 'avg(Close)']).show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+



In [101]:
result = newdf.groupBy('Year').mean().select(['Year', 'avg(Close)'])

In [102]:
result.withColumnRenamed('avg(Close)', 'Average Closing Price').show()

+----+---------------------+
|Year|Average Closing Price|
+----+---------------------+
|2015|   120.03999980555547|
|2013|    472.6348802857143|
|2014|    295.4023416507935|
|2012|    576.0497195640002|
|2016|   104.60400786904763|
|2010|    259.8424600000002|
|2011|   364.00432532142867|
+----+---------------------+



In [103]:
new = result.withColumnRenamed('avg(Close)', 'Average Closing Price')

**Column.alias(alias)** => Returns the column aliased with a new name. (DataFrame.alias(), by contrast, returns a new DataFrame with an alias set.)

In [104]:
new.select(['Year',format_number('Average Closing Price', 2).alias('Avg Closing Price')]).show()

+----+-----------------+
|Year|Avg Closing Price|
+----+-----------------+
|2015|           120.04|
|2013|           472.63|
|2014|           295.40|
|2012|           576.05|
|2016|           104.60|
|2010|           259.84|
|2011|           364.00|
+----+-----------------+

