In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (date_format, format_number,
                                   hour, year,month, 
                                   dayofyear,weekofyear, dayofmonth)

In [2]:
spark = SparkSession.builder.appName("date").getOrCreate()

In [3]:
df = spark.read.csv('appl_stock.csv', header=True, inferSchema=True)

In [4]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [5]:
df.show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+---------+------------------+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
|2010-01-11|212.79999700000002|        213.000002|      

In [6]:
df.select(dayofmonth(df['Date'])).show()

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [7]:
df.select(hour(df['Date'])).show()

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [8]:
df.select(dayofyear(df['Date'])).show()

+---------------+
|dayofyear(Date)|
+---------------+
|              4|
|              5|
|              6|
|              7|
|              8|
|             11|
|             12|
|             13|
|             14|
|             15|
|             19|
|             20|
|             21|
|             22|
|             25|
|             26|
|             27|
|             28|
|             29|
|             32|
+---------------+
only showing top 20 rows



In [9]:
df.select(month(df['Date'])).show()

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          2|
+-----------+
only showing top 20 rows



In [10]:
df.select(year(df['Date'])).show()

+----------+
|year(Date)|
+----------+
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
|      2010|
+----------+
only showing top 20 rows



In [11]:
df.withColumn('Year', year(df['Date'])).show()

+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|      Date|              Open|              High|               Low|             Close|   Volume|         Adj Close|Year|
+----------+------------------+------------------+------------------+------------------+---------+------------------+----+
|2010-01-04|        213.429998|        214.499996|212.38000099999996|        214.009998|123432400|         27.727039|2010|
|2010-01-05|        214.599998|        215.589994|        213.249994|        214.379993|150476200|27.774976000000002|2010|
|2010-01-06|        214.379993|            215.23|        210.750004|        210.969995|138040000|27.333178000000004|2010|
|2010-01-07|            211.75|        212.000006|        209.050005|            210.58|119282800|          27.28265|2010|
|2010-01-08|        210.299994|        212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|2010|
|2010-01-11|212.

In [12]:
newdf = df.withColumn('Year', year(df['Date']))

In [13]:
newdf.groupby('Year').mean()['Year','avg(Close)'].show()

+----+------------------+
|Year|        avg(Close)|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+



In [14]:
results = newdf.groupby('Year').agg({'Open':'mean','Close':'mean'})

In [16]:
results.show()

+----+------------------+------------------+
|Year|        avg(Close)|         avg(Open)|
+----+------------------+------------------+
|2015|120.03999980555547|120.17575393253965|
|2013| 472.6348802857143| 473.1281355634922|
|2014| 295.4023416507935| 295.1426195357143|
|2012| 576.0497195640002|     576.652720788|
|2016|104.60400786904763|104.50777772619044|
|2010| 259.8424600000002| 259.9576190992064|
|2011|364.00432532142867|364.06142773412705|
+----+------------------+------------------+



In [17]:
results = results.withColumnRenamed('avg(Open)','Mean open')

In [18]:
results.show()

+----+------------------+------------------+
|Year|        avg(Close)|         Mean open|
+----+------------------+------------------+
|2015|120.03999980555547|120.17575393253965|
|2013| 472.6348802857143| 473.1281355634922|
|2014| 295.4023416507935| 295.1426195357143|
|2012| 576.0497195640002|     576.652720788|
|2016|104.60400786904763|104.50777772619044|
|2010| 259.8424600000002| 259.9576190992064|
|2011|364.00432532142867|364.06142773412705|
+----+------------------+------------------+



In [21]:
resutls = results.withColumnRenamed('avg(Close)','Mean close')

In [23]:
type(resutls)

pyspark.sql.dataframe.DataFrame

In [25]:
results.select('Year',format_number('Mean open',2).alias('Mean')).show()

+----+------+
|Year|  Mean|
+----+------+
|2015|120.18|
|2013|473.13|
|2014|295.14|
|2012|576.65|
|2016|104.51|
|2010|259.96|
|2011|364.06|
+----+------+

