# Spark DataFrame Basics

Code demonstrating the basics of working with Spark DataFrames in PySpark.

Spark DataFrames are the workhorse and the main way of working with Spark from Python since Spark 2.0. DataFrames act as powerful versions of tables, with rows and columns, and handle large datasets easily. The shift to DataFrames provides many advantages:

* A much simpler syntax
* The ability to use SQL directly on the DataFrame
* Operations are automatically distributed across the cluster (DataFrames are built on top of RDDs)

If you've used R or the pandas library in Python, you are probably already familiar with the concept of DataFrames. Spark DataFrames expand on a lot of these concepts, so you can transfer that knowledge easily once you understand their simple syntax. Remember that the main advantage of Spark DataFrames over those tools is that Spark can distribute data across many machines. This lets it handle huge datasets that would never fit on a single computer. That comes at the slight cost of some "peculiar" syntax choices, but after this course you will feel very comfortable with all those topics!

## Creating a DataFrame

First, import SparkSession:

In [1]:
from pyspark.sql import SparkSession

Now start the Spark session:

In [2]:
spark = SparkSession.builder.appName("Basics").getOrCreate()

Now load the data. This time the dataset is a JSON file:

In [3]:
df = spark.read.json('resources/people.json')
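By default, `spark.read.json` expects JSON Lines: one complete JSON object per line, rather than a single JSON array. A file holding one pretty-printed document needs the `multiLine=True` option instead. If you don't have `resources/people.json`, the sketch below writes a file in that layout.

A few assumptions apply:
* The records are reconstructed from the `df.show()` output that follows; they are not copied from the original file.
* `people_json_lines` and `write_people_json` are hypothetical helper names.

```python
import json
import os


def people_json_lines():
    """Return one JSON object per line for the three example records.

    Hypothetical: reconstructed from the df.show() output in this notebook,
    not copied from the original resources/people.json.
    """
    records = [
        {"name": "Michael"},          # no "age" key, so Spark reads age as null
        {"age": 30, "name": "Andy"},
        {"age": 19, "name": "Justin"},
    ]
    return [json.dumps(record) for record in records]


def write_people_json(path="resources/people.json"):
    """Write the records as JSON Lines, the layout spark.read.json expects by default."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        f.write("\n".join(people_json_lines()) + "\n")
```

Calling `write_people_json()` creates `resources/people.json` relative to the current working directory. Run it from the folder the notebook reads from.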

Show the data

In [4]:
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



To view the schema (each column's name, data type and nullability):

In [5]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



To list the column names:

In [6]:
df.columns

['age', 'name']

To see a statistical summary of the DataFrame's columns:

In [7]:
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+
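This summary is easy to reproduce by hand, which shows what `describe()` does with the null in `age`:
* The null is left out, so `count` is 2.
* The mean is (30 + 19) / 2 = 24.5.
* The `stddev` row is the sample standard deviation, which divides by n - 1.
* For the string column `name`, `min` and `max` are lexicographic.

Here is a plain-Python check (not Spark code), with None standing in for null:

```python
import statistics

# Values of the "age" and "name" columns from the output above; None stands in for null.
ages = [None, 30, 19]
names = ["Michael", "Andy", "Justin"]

# describe() ignores nulls, so only the non-null ages take part in the statistics.
non_null = [a for a in ages if a is not None]

summary = {
    "count": len(non_null),                # 2, not 3: the null is not counted
    "mean": statistics.mean(non_null),     # (30 + 19) / 2
    "stddev": statistics.stdev(non_null),  # sample standard deviation (divides by n - 1)
    "min": min(non_null),
    "max": max(non_null),
}

# For a string column, min and max follow lexicographic order.
name_min, name_max = min(names), max(names)

print(summary)
print(name_min, name_max)
```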



We can also specify the schema explicitly when reading data, instead of letting Spark infer it. First, import the type classes:

In [8]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType

Next, create the list of StructFields. Each `StructField` takes:

* `name`: string, the name of the field.
* `dataType`: the `DataType` of the field (for example `IntegerType()` or `StringType()`).
* `nullable`: boolean, whether the field can be null (None).

In [9]:
data_schema = [StructField("age", IntegerType(), True), StructField("name", StringType(), True)]

final_struc = StructType(fields=data_schema)

In [10]:
# Now pass this schema while reading the data

df = spark.read.json('resources/people.json', schema=final_struc)

df.printSchema()

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)



## Grabbing Data from a Spark DataFrame

Indexing a DataFrame by column name returns a Column object, an expression that refers to the column rather than its data:

In [11]:
df['age']

Column<b'age'>

To check the type of the object:

In [13]:
type(df['age'])

pyspark.sql.column.Column

To select a particular column, use select(). It returns a new DataFrame, displayed with its column name and data type:

In [14]:
df.select('age')

DataFrame[age: int]

To display the values of that column (show() prints up to 20 rows by default):

In [15]:
df.select('age').show()

+----+
| age|
+----+
|null|
|  30|
|  19|
+----+



To return the first two rows of the DataFrame as a list of Row objects:

In [16]:
df.head(2)

[Row(age=None, name='Michael'), Row(age=30, name='Andy')]

To select multiple columns and display their values:

In [17]:
df.select(['age', 'name']).show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



## Creating new columns

Add a new column that is a simple copy of an existing one. withColumn returns a new DataFrame with the added column; the original df is not modified:

In [18]:
df.withColumn('newage', df['age']).show()

+----+-------+------+
| age|   name|newage|
+----+-------+------+
|null|Michael|  null|
|  30|   Andy|    30|
|  19| Justin|    19|
+----+-------+------+



To compute the new column's values from an existing column:

In [20]:
df.withColumn('double_age', df['age']*2).show()

+----+-------+----------+
| age|   name|double_age|
+----+-------+----------+
|null|Michael|      null|
|  30|   Andy|        60|
|  19| Justin|        38|
+----+-------+----------+



To rename a column:

In [21]:
df.withColumnRenamed('age', 'new_age').show()

+-------+-------+
|new_age|   name|
+-------+-------+
|   null|Michael|
|     30|   Andy|
|     19| Justin|
+-------+-------+



## Using SQL on Spark DataFrames

To run SQL queries directly, we first need to register the DataFrame as a temporary view. A view is essentially a name that points to the Spark DataFrame. A temporary view lasts only as long as the current SparkSession.

In [22]:
df.createOrReplaceTempView("people")

In [23]:
sql_results = spark.sql("SELECT * FROM people")

sql_results.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [24]:
spark.sql("SELECT * FROM people WHERE age=30").show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
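Michael does not appear above, and he would not appear for `age != 30` either. Comparing null with anything gives *unknown*, and `WHERE` keeps only rows whose condition is true. The plain-Python model below mimics that three-valued logic; None stands in for null, and the helper names are hypothetical.

To select the null rows themselves, use `age IS NULL` in SQL or `df['age'].isNull()` in the DataFrame API.

```python
# Plain-Python model of SQL three-valued logic; None stands in for null/unknown.

def sql_eq(a, b):
    """SQL '=': unknown (None) when either side is null."""
    if a is None or b is None:
        return None
    return a == b


def sql_not(x):
    """SQL NOT: NOT unknown is still unknown."""
    return None if x is None else not x


people = [
    {"age": None, "name": "Michael"},
    {"age": 30, "name": "Andy"},
    {"age": 19, "name": "Justin"},
]

# WHERE keeps a row only when the condition is true, never when it is unknown.
kept = [row for row in people if sql_eq(row["age"], 30) is True]             # WHERE age = 30
negated = [row for row in people if sql_not(sql_eq(row["age"], 30)) is True]  # WHERE NOT (age = 30)

print([row["name"] for row in kept], [row["name"] for row in negated])
```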



## Basic Operations on Spark DataFrames

Now read a CSV file, using the first row as the header and letting Spark infer the column types:

In [25]:
df = spark.read.csv('resources/appl_stock.csv', inferSchema=True, header=True)

In [26]:
# Now we will check the schema

df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



## Filtering Data

A large part of working with DataFrames is the ability to quickly filter data based on conditions. Spark DataFrames are built on top of the Spark SQL platform. This means that if you already know SQL, you can quickly and easily grab the data you need with SQL-style expressions, or with the DataFrame methods.

In [27]:
# First, filter with a SQL-style string expression

df.filter("Close<500").show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|...

From the filtered set, if we need to see only a specific column

In [31]:
df.filter("Close<500").select('Open').show()

+------------------+
|              Open|
+------------------+
|        213.429998|
|        214.599998|
|        214.379993|
|            211.75|
|        210.299994|
|212.79999700000002|
|209.18999499999998|
|        207.870005|
|210.11000299999998|
|210.92999500000002|
|        208.330002|
|        214.910006|
|        212.079994|
|206.78000600000001|
|202.51000200000001|
|205.95000100000001|
|        206.849995|
|        204.930004|
|        201.079996|
|192.36999699999998|
+------------------+
only showing top 20 rows



In [32]:
# We can extract multiple columns using SQL with select()

df.filter("Close < 500").select(['Open', 'Close']).show()

+------------------+------------------+
|              Open|             Close|
+------------------+------------------+
|        213.429998|        214.009998|
|        214.599998|        214.379993|
|        214.379993|        210.969995|
|            211.75|            210.58|
|        210.299994|211.98000499999998|
|212.79999700000002|210.11000299999998|
|209.18999499999998|        207.720001|
|        207.870005|        210.650002|
|210.11000299999998|            209.43|
|210.92999500000002|            205.93|
|        208.330002|        215.039995|
|        214.910006|            211.73|
|        212.079994|        208.069996|
|206.78000600000001|            197.75|
|202.51000200000001|        203.070002|
|205.95000100000001|        205.940001|
|        206.849995|        207.880005|
|        204.930004|        199.289995|
|        201.079996|        192.060003|
|192.36999699999998|        194.729998|
+------------------+------------------+
only showing top 20 rows



We can also build conditions with Python comparison operators on Column objects:

In [33]:
df.filter(df['Close'] < 200).show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|        207.499996|            197.16|            197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|        205.500004|        198.699995|        199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|        202.199995|        190.250002|        192.060003|311488100|         24.883208|
|2010-02-01 00:00:00|192.36999699999998|             196.0|191.29999899999999|        194.729998|187469100|         25.229131|
|2010-02-02 00:00:00|        195.909998|        196.319994|193.37999299999998|        195.859997|174585600|25.3...

To combine conditions, use & (and), | (or) and ~ (not), wrapping each condition in parentheses:

In [34]:
df.filter((df['Close'] < 200) & (df['Open'] > 200)).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+
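The parentheses around each condition are required. In Python, `&` binds more tightly than `<` and `>`. Without parentheses, the expression is read as a chained comparison. Evaluating that chain asks Python for the truth value of a Column, which PySpark rejects with a `ValueError`. For the same reason, the keywords `and`, `or` and `not` cannot be used on Columns.

The standard-library `ast` module shows how Python parses both versions. The strings are only parsed, never evaluated, so no DataFrame is needed:

```python
import ast

# Without parentheses, & binds more tightly than < and >, so Python reads
#     df['Close'] < 200 & df['Open'] > 200
# as the chained comparison
#     df['Close'] < (200 & df['Open']) > 200
unparenthesized = ast.parse("df['Close'] < 200 & df['Open'] > 200", mode="eval").body

# With parentheses, both comparisons are built first and then combined with &.
parenthesized = ast.parse("(df['Close'] < 200) & (df['Open'] > 200)", mode="eval").body

print(type(unparenthesized).__name__)  # Compare: one chained comparison
print(type(parenthesized).__name__)    # BinOp: an & of two comparisons
```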



In [35]:
df.filter((df['Close'] < 200) | (df['Open'] > 200)).show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|...

Combining & with the ~ (not) operator:

In [36]:
df.filter((df['Close'] < 200) & ~(df['Open'] < 200)).show()

+-------------------+------------------+----------+----------+----------+---------+------------------+
|               Date|              Open|      High|       Low|     Close|   Volume|         Adj Close|
+-------------------+------------------+----------+----------+----------+---------+------------------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|    197.16|    197.75|220441900|         25.620401|
|2010-01-28 00:00:00|        204.930004|205.500004|198.699995|199.289995|293375600|25.819922000000002|
|2010-01-29 00:00:00|        201.079996|202.199995|190.250002|192.060003|311488100|         24.883208|
+-------------------+------------------+----------+----------+----------+---------+------------------+



In [37]:
df.filter(df['Low'] == 197.16).show()

+-------------------+------------------+----------+------+------+---------+---------+
|               Date|              Open|      High|   Low| Close|   Volume|Adj Close|
+-------------------+------------------+----------+------+------+---------+---------+
|2010-01-22 00:00:00|206.78000600000001|207.499996|197.16|197.75|220441900|25.620401|
+-------------------+------------------+----------+------+------+---------+---------+



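Now collect these results as Python objects. collect() brings all matching rows back to the driver as a list, so use it only on results small enough to fit in memory: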

In [38]:
df.filter(df['Low'] == 197.16).collect()

[Row(Date=datetime.datetime(2010, 1, 22, 0, 0), Open=206.78000600000001, High=207.499996, Low=197.16, Close=197.75, Volume=220441900, Adj Close=25.620401)]

In [42]:
result = df.filter(df['Low'] == 197.16).collect()
type(result)

list

The above returns a list of Row objects. To see the type of the first object in the list:

In [43]:
type(result[0])

pyspark.sql.types.Row

Rows can be converted into dictionaries

In [44]:
row = result[0]
row.asDict()

{'Date': datetime.datetime(2010, 1, 22, 0, 0),
 'Open': 206.78000600000001,
 'High': 207.499996,
 'Low': 197.16,
 'Close': 197.75,
 'Volume': 220441900,
 'Adj Close': 25.620401}

We can iterate over a Row to print its individual items (column values):

In [45]:
for item in result[0]:
    print(item)

2010-01-22 00:00:00
206.78000600000001
207.499996
197.16
197.75
220441900
25.620401


## GroupBy and Aggregate Functions

Let's learn how to use GroupBy and aggregate methods on a DataFrame. GroupBy groups rows together based on some column value. For example, you could group sales data by the day the sale occurred, or group repeat-customer data by the customer's name. Once you've performed the GroupBy operation, you can apply an aggregate function to that data. An aggregate function combines multiple rows of data into a single output, such as the sum of the inputs or the number of inputs.

In [46]:
# Now read another dataset

df = spark.read.csv('resources/sales_info.csv', inferSchema=True, header=True)

df.printSchema()

root
 |-- Company: string (nullable = true)
 |-- Person: string (nullable = true)
 |-- Sales: double (nullable = true)



In [47]:
# Check the data

df.show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|    Sam|200.0|
|   GOOG|Charlie|120.0|
|   GOOG|  Frank|340.0|
|   MSFT|   Tina|600.0|
|   MSFT|    Amy|124.0|
|   MSFT|Vanessa|243.0|
|     FB|   Carl|870.0|
|     FB|  Sarah|350.0|
|   APPL|   John|250.0|
|   APPL|  Linda|130.0|
|   APPL|   Mike|750.0|
|   APPL|  Chris|350.0|
+-------+-------+-----+



In [48]:
# Group by company. This returns a GroupedData object, on which various aggregation methods can be called

df.groupBy('Company').mean().show()

+-------+-----------------+
|Company|       avg(Sales)|
+-------+-----------------+
|   APPL|            370.0|
|   GOOG|            220.0|
|     FB|            610.0|
|   MSFT|322.3333333333333|
+-------+-----------------+



In [49]:
# We can get count of values
df.groupBy('Company').count().show()

+-------+-----+
|Company|count|
+-------+-----+
|   APPL|    4|
|   GOOG|    3|
|     FB|    2|
|   MSFT|    3|
+-------+-----+



In [51]:
# Max, Min and Sum
df.groupBy('Company').max().show()
df.groupBy('Company').min().show()
df.groupBy('Company').sum().show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+

+-------+----------+
|Company|min(Sales)|
+-------+----------+
|   APPL|     130.0|
|   GOOG|     120.0|
|     FB|     350.0|
|   MSFT|     124.0|
+-------+----------+

+-------+----------+
|Company|sum(Sales)|
+-------+----------+
|   APPL|    1480.0|
|   GOOG|     660.0|
|     FB|    1220.0|
|   MSFT|     967.0|
+-------+----------+
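To see what `groupBy` followed by an aggregate computes, here is a plain-Python model (not Spark code) of the same split-then-reduce steps. It runs over the rows shown by `df.show()`.

Spark does not guarantee the order of the grouped rows, which is why APPL, GOOG, FB and MSFT come back unsorted above. Add `.orderBy('Company')` if you need a stable order; the model simply sorts the groups.

```python
from collections import defaultdict
from statistics import mean

# The rows of sales_info.csv as shown by df.show() above: (Company, Person, Sales).
sales = [
    ("GOOG", "Sam", 200.0), ("GOOG", "Charlie", 120.0), ("GOOG", "Frank", 340.0),
    ("MSFT", "Tina", 600.0), ("MSFT", "Amy", 124.0), ("MSFT", "Vanessa", 243.0),
    ("FB", "Carl", 870.0), ("FB", "Sarah", 350.0),
    ("APPL", "John", 250.0), ("APPL", "Linda", 130.0),
    ("APPL", "Mike", 750.0), ("APPL", "Chris", 350.0),
]

# Step 1 (groupBy): collect the Sales values belonging to each Company.
groups = defaultdict(list)
for company, _person, amount in sales:
    groups[company].append(amount)

# Step 2 (aggregate): reduce each group to one value per aggregate function.
summary = {
    company: {
        "count": len(values),
        "avg": mean(values),
        "max": max(values),
        "min": min(values),
        "sum": sum(values),
    }
    for company, values in sorted(groups.items())  # sorted: Spark gives no order guarantee
}

for company, stats in summary.items():
    print(company, stats)
```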



Check out this link for more info on other methods:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark-sql-module

Not every aggregation needs a groupBy call. You can call the generalized .agg() method directly on the DataFrame, which aggregates across all rows of the specified column. It accepts a single aggregation, or several aggregations at once using dictionary notation ({column: function}).

For example:

In [52]:
# Max sales across everything
df.agg({'Sales': 'max'}).show()

+----------+
|max(Sales)|
+----------+
|     870.0|
+----------+



In [53]:
# We could have done this on the groupby object as well

df.groupBy('Company').agg({'Sales': 'max'}).show()

+-------+----------+
|Company|max(Sales)|
+-------+----------+
|   APPL|     750.0|
|   GOOG|     340.0|
|     FB|     870.0|
|   MSFT|     600.0|
+-------+----------+
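One caveat with the dictionary notation: a Python dict holds each key only once. You therefore cannot request two aggregations of the same column this way, because the later entry silently replaces the earlier one. This plain-Python check shows it:

```python
# A dict literal keeps only the last value for a repeated key, so this request for
# two aggregations of Sales collapses to a single one before Spark ever sees it.
requested = {'Sales': 'max', 'Sales': 'min'}

print(requested)  # {'Sales': 'min'}
```

To get several aggregates of one column, pass Column expressions instead, for example `df.agg(F.max('Sales'), F.min('Sales'))` after `from pyspark.sql import functions as F`.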



## Functions

There are a variety of functions you can import from pyspark.sql.functions. Check out the documentation for the full list available:
http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions

In [63]:
from pyspark.sql.functions import countDistinct, avg, stddev, format_number

In [56]:
df.select(countDistinct("Sales")).show()
df.select(countDistinct("Company")).show()

+---------------------+
|count(DISTINCT Sales)|
+---------------------+
|                   11|
+---------------------+

+-----------------------+
|count(DISTINCT Company)|
+-----------------------+
|                      4|
+-----------------------+
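`countDistinct` counts unique non-null values. There are 12 sales rows but only 11 distinct amounts, because 350.0 appears twice (Sarah and Chris). Here is a plain-Python equivalent over the values shown earlier:

```python
# Sales and Company values from sales_info.csv, as shown by df.show() above.
sales = [200.0, 120.0, 340.0, 600.0, 124.0, 243.0, 870.0, 350.0, 250.0, 130.0, 750.0, 350.0]
companies = ["GOOG"] * 3 + ["MSFT"] * 3 + ["FB"] * 2 + ["APPL"] * 4

# countDistinct behaves like len(set(...)) over the non-null values.
distinct_sales = len({s for s in sales if s is not None})
distinct_companies = len({c for c in companies if c is not None})

print(distinct_sales, distinct_companies)  # 11 4
```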



In [57]:
# To rename the output column, use alias

df.select(countDistinct("Sales").alias('Distinct Sales')).show()

+--------------+
|Distinct Sales|
+--------------+
|            11|
+--------------+



In [59]:
df.select(avg('Sales')).show()
df.select(stddev('Sales').alias('std')).show()

+-----------------+
|       avg(Sales)|
+-----------------+
|360.5833333333333|
+-----------------+

+------------------+
|               std|
+------------------+
|250.08742410799007|
+------------------+
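`stddev` is the sample standard deviation (an alias of `stddev_samp`); `stddev_pop` divides by n instead. Recomputing both in plain Python from the 12 Sales values confirms which one Spark reported.

The f-string at the end is only a rough analogue of `format_number`, which likewise returns a string rather than a number.

```python
import statistics

# Sales column of sales_info.csv as shown by df.show() above.
sales = [200.0, 120.0, 340.0, 600.0, 124.0, 243.0, 870.0, 350.0, 250.0, 130.0, 750.0, 350.0]

avg_sales = statistics.mean(sales)         # what avg('Sales') computes
sample_std = statistics.stdev(sales)       # divides by n - 1, like stddev / stddev_samp
population_std = statistics.pstdev(sales)  # divides by n, like stddev_pop

# Rough analogue of format_number(..., 2): a string with 2 decimals and thousands separators.
formatted = f"{sample_std:,.2f}"

print(avg_sales, sample_std, population_std, formatted)
```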



In [66]:
# To round to a given number of decimal places (format_number returns a string)

df.select(stddev('Sales').alias('std')).select(format_number('std', 2)).show()

+---------------------+
|format_number(std, 2)|
+---------------------+
|               250.09|
+---------------------+



## Order By

In [67]:
# orderBy sorts in ascending order by default

df.orderBy('Sales').show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|   GOOG|Charlie|120.0|
|   MSFT|    Amy|124.0|
|   APPL|  Linda|130.0|
|   GOOG|    Sam|200.0|
|   MSFT|Vanessa|243.0|
|   APPL|   John|250.0|
|   GOOG|  Frank|340.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   MSFT|   Tina|600.0|
|   APPL|   Mike|750.0|
|     FB|   Carl|870.0|
+-------+-------+-----+



In [68]:
# For descending order, call desc() on the column itself
df.orderBy(df["Sales"].desc()).show()

+-------+-------+-----+
|Company| Person|Sales|
+-------+-------+-----+
|     FB|   Carl|870.0|
|   APPL|   Mike|750.0|
|   MSFT|   Tina|600.0|
|     FB|  Sarah|350.0|
|   APPL|  Chris|350.0|
|   GOOG|  Frank|340.0|
|   APPL|   John|250.0|
|   MSFT|Vanessa|243.0|
|   GOOG|    Sam|200.0|
|   APPL|  Linda|130.0|
|   MSFT|    Amy|124.0|
|   GOOG|Charlie|120.0|
+-------+-------+-----+



## Dealing with Missing Data

Data sources are often incomplete, which means you will have missing data. You have 3 basic options for handling it, and you will have to decide which approach is right for your case:

* Just keep the missing data points.
* Drop the missing data points (including the entire row).
* Fill them in with some other value.

Let's cover examples of each of these methods!

In [69]:
# read the data frame

df = spark.read.csv("resources/ContainsNull.csv",header=True,inferSchema=True)

In [70]:
df.show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



Missing values are shown as null.

### Drop the missing data

You can use the .na functions for missing data. The drop command has the following parameters:

    df.na.drop(how='any', thresh=None, subset=None)
    
    * param how: 'any' or 'all'.
    
        If 'any', drop a row if it contains any nulls.
        If 'all', drop a row only if all its values are null.
    
    * param thresh: int, default None
    
        If specified, drop rows that have fewer than `thresh` non-null values.
        This overrides the `how` parameter.
        
    * param subset: 
        optional list of column names to consider.

In [71]:
# drop any row that contains missing data
df.na.drop().show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [72]:
# Keep only rows that have at least 2 non-null values
df.na.drop(thresh=2).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [74]:
# Drop rows that have a missing value in a specific column
df.na.drop(subset=['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+



In [75]:
# Drop a row if any of its values is missing (the default)
df.na.drop(how='any').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp4|Cindy|456.0|
+----+-----+-----+



In [76]:
# Drop a row only if all of its values are missing
df.na.drop(how='all').show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John| null|
|emp2| null| null|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+
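The `how`/`thresh`/`subset` rules can be summarised in one procedure. Count the non-null values in the considered columns, and keep the row if that count reaches a threshold. The threshold is:
* every considered column for `how='any'`;
* one column for `how='all'`;
* `thresh`, when it is given.

The plain-Python model below encodes those rules and reproduces each result above. `na_drop` is a hypothetical helper, not Spark's implementation.

```python
# Rows of ContainsNull.csv as shown above; None stands in for null.
rows = [
    {"Id": "emp1", "Name": "John", "Sales": None},
    {"Id": "emp2", "Name": None, "Sales": None},
    {"Id": "emp3", "Name": None, "Sales": 345.0},
    {"Id": "emp4", "Name": "Cindy", "Sales": 456.0},
]


def na_drop(rows, how="any", thresh=None, subset=None):
    """Plain-Python model of the documented df.na.drop(how, thresh, subset) rules."""
    if not rows:
        return []
    columns = subset if subset is not None else list(rows[0].keys())
    if thresh is None:
        # how='any' needs every considered value to be non-null; how='all' needs just one.
        thresh = len(columns) if how == "any" else 1
    # A row survives when it has at least `thresh` non-null values in the considered columns.
    return [row for row in rows if sum(row[c] is not None for c in columns) >= thresh]


def ids(rows):
    return [row["Id"] for row in rows]


print(ids(na_drop(rows)))                    # ['emp4']
print(ids(na_drop(rows, thresh=2)))          # ['emp1', 'emp3', 'emp4']
print(ids(na_drop(rows, subset=["Sales"])))  # ['emp3', 'emp4']
print(ids(na_drop(rows, how="all")))         # ['emp1', 'emp2', 'emp3', 'emp4']
```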



### Fill the missing values

We can also fill missing values with new values. If your nulls are spread across columns of different data types, Spark is smart enough to match up the types. A string fill value only fills string columns, and a numeric one only fills numeric columns. For example:

In [77]:
df.na.fill('NEW VALUE').show()

+----+---------+-----+
|  Id|     Name|Sales|
+----+---------+-----+
|emp1|     John| null|
|emp2|NEW VALUE| null|
|emp3|NEW VALUE|345.0|
|emp4|    Cindy|456.0|
+----+---------+-----+



In [78]:
df.na.fill(0).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|  0.0|
|emp2| null|  0.0|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+
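The type matching can be modelled in plain Python. The fill value is written only into null cells of columns whose type matches it. A numeric value is also converted to the column type, which is why `fill(0)` produced `0.0` in the double `Sales` column.

The sketch below is a hypothetical model (`na_fill` is not a Spark function) covering strings and numbers only. In PySpark, you can also fill different columns in one call by passing a dict, e.g. `df.na.fill({'Name': 'No Name', 'Sales': 0})`.

```python
# Column types and rows of ContainsNull.csv as shown above; None stands in for null.
schema = {"Id": str, "Name": str, "Sales": float}
rows = [
    {"Id": "emp1", "Name": "John", "Sales": None},
    {"Id": "emp2", "Name": None, "Sales": None},
    {"Id": "emp3", "Name": None, "Sales": 345.0},
    {"Id": "emp4", "Name": "Cindy", "Sales": 456.0},
]


def na_fill(rows, value, subset=None):
    """Plain-Python model of df.na.fill(value, subset) for string and numeric values."""
    columns = subset if subset is not None else list(schema)
    if isinstance(value, str):
        # A string value only touches string columns.
        targets = [c for c in columns if schema[c] is str]
    elif isinstance(value, (int, float)) and not isinstance(value, bool):
        # A numeric value only touches numeric columns and is cast to their type (0 -> 0.0).
        targets = [c for c in columns if schema[c] in (int, float)]
    else:
        raise TypeError("this sketch only models str, int and float fill values")
    filled = []
    for row in rows:
        new_row = dict(row)  # leave the input rows unchanged, like Spark's new DataFrame
        for c in targets:
            if new_row[c] is None:
                new_row[c] = schema[c](value)
        filled.append(new_row)
    return filled


print([r["Name"] for r in na_fill(rows, "NEW VALUE")])
print([r["Sales"] for r in na_fill(rows, 0)])
```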



Usually you should specify which columns to fill with the subset parameter:

In [79]:
df.na.fill('No Name',subset=['Name']).show()

+----+-------+-----+
|  Id|   Name|Sales|
+----+-------+-----+
|emp1|   John| null|
|emp2|No Name| null|
|emp3|No Name|345.0|
|emp4|  Cindy|456.0|
+----+-------+-----+



A very common practice is to fill values with the mean value for the column, for example:

In [83]:
from pyspark.sql.functions import mean

mean_val = df.select(mean(df['Sales'])).collect()

# collect() returns a list of Row objects, so [0][0] is the first field of the first row
mean_val[0][0]

400.5

In [84]:
df.na.fill(mean_val[0][0], ['Sales']).show()

+----+-----+-----+
|  Id| Name|Sales|
+----+-----+-----+
|emp1| John|400.5|
|emp2| null|400.5|
|emp3| null|345.0|
|emp4|Cindy|456.0|
+----+-----+-----+
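The mean used above is 400.5 because `mean()` skips nulls and averages only 345.0 and 456.0. Had the nulls been filled with 0 first, the average would drop to 200.25. Here is a plain-Python check of both numbers and of the fill step:

```python
from statistics import mean

# Sales column of ContainsNull.csv as shown above; None stands in for null.
sales = [None, None, 345.0, 456.0]

# Like Spark's mean(), average only the non-null values.
non_null = [s for s in sales if s is not None]
mean_val = mean(non_null)

# Replace each null with that mean, as df.na.fill(mean_val, ['Sales']) does.
filled = [mean_val if s is None else s for s in sales]

# For contrast: counting the nulls as 0 would drag the average down.
mean_if_nulls_were_zero = mean([0.0 if s is None else s for s in sales])

print(mean_val, filled, mean_if_nulls_were_zero)
```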



## Dates and Timestamps

You will often find yourself working with time and date information. Let's walk through some ways to deal with it!

In [85]:
# read the data

df = spark.read.csv("resources/appl_stock.csv",header=True,inferSchema=True)

In [86]:
df.show()

+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|               Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+-------------------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:00|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:00|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:00|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:00|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:00|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|...

Let's walk through how to extract parts of the timestamp data:

In [87]:
from pyspark.sql.functions import format_number, dayofmonth, hour, dayofyear, month, year, weekofyear, date_format

In [91]:
df.select(dayofmonth(df['Date'])).show()
print(f"Showing hour")
df.select(hour(df['Date'])).show()
print(f"Showing day of year")
df.select(dayofyear(df['Date'])).show()
print(f"Showing month")
df.select(month(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows

Showing hour
+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows

Showing day of year
+---------------+
|dayofyear(Date)|
+---------------+
|              4|
|              5|
|              6|
|              7|
|              8|
...

For example, let's say we wanted to know the average closing price per year. That's easy with a groupBy and the year() function:

In [94]:
newdf = df.withColumn("Year", year(df['Date']))

In [100]:
newdf.groupBy('Year').agg({'Close': 'avg'}).show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+
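Python's `datetime` exposes the same parts of a timestamp, which is handy for sanity-checking the results of `dayofmonth`, `hour`, `dayofyear`, `month`, `year` and `weekofyear`. The sketch below maps each Spark function to a rough `datetime` counterpart. It uses the first date in the appl_stock data and the twentieth (2010-02-01, matching the `dayofmonth` value 1 shown earlier).

The mapping is an illustration, not how Spark computes these values, and `timestamp_parts` is a hypothetical helper. Spark documents `weekofyear` as the ISO 8601 week number, which is what `isocalendar()` returns.

```python
from datetime import datetime


def timestamp_parts(ts):
    """Rough datetime counterparts of Spark's date-part functions."""
    return {
        "dayofmonth": ts.day,
        "hour": ts.hour,
        "dayofyear": ts.timetuple().tm_yday,
        "month": ts.month,
        "year": ts.year,
        "weekofyear": ts.isocalendar()[1],  # ISO 8601 week number
    }


# First and twentieth Date values of the appl_stock data shown above.
first = timestamp_parts(datetime(2010, 1, 4))
twentieth = timestamp_parts(datetime(2010, 2, 1))

print(first)
print(twentieth)
```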

