# Spark - working with dates & timestamps

In [1]:
import os

import findspark

# Locate the Spark installation. Prefer the SPARK_HOME environment variable so
# the notebook is not tied to one machine's directory layout; fall back to the
# original install location when the variable is not set.
findspark.init(os.environ.get('SPARK_HOME', '/home/ubuntu/spark-2.1.1-bin-hadoop2.7'))

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Obtain the SparkSession used by every later cell, registered under the
# application name 'dates' (getOrCreate hands back an existing session if any).
spark = (
    SparkSession.builder
    .appName('dates')
    .getOrCreate()
)

In [4]:
# Load the stock-price CSV: the first row supplies the column names and Spark
# is asked to infer each column's type instead of reading everything as text.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('appl_stock.csv')
)

# Preview the first five rows.
df.show(n=5)

+--------------------+----------+----------+------------------+------------------+---------+------------------+
|                Date|      Open|      High|               Low|             Close|   Volume|         Adj Close|
+--------------------+----------+----------+------------------+------------------+---------+------------------+
|2010-01-04 00:00:...|213.429998|214.499996|212.38000099999996|        214.009998|123432400|         27.727039|
|2010-01-05 00:00:...|214.599998|215.589994|        213.249994|        214.379993|150476200|27.774976000000002|
|2010-01-06 00:00:...|214.379993|    215.23|        210.750004|        210.969995|138040000|27.333178000000004|
|2010-01-07 00:00:...|    211.75|212.000006|        209.050005|            210.58|119282800|          27.28265|
|2010-01-08 00:00:...|210.299994|212.000006|209.06000500000002|211.98000499999998|111902700|         27.464034|
+--------------------+----------+----------+------------------+------------------+---------+------------------+

In [5]:
# Print the inferred schema as a tree: one line per column giving its name,
# data type and nullability.
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [6]:
# Show summary statistics (count, mean, stddev, min, max) for the numeric
# columns; the timestamp column `Date` does not appear in the summary.
df.describe().show()

+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|summary|              Open|              High|               Low|            Close|             Volume|         Adj Close|
+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+
|  count|              1762|              1762|              1762|             1762|               1762|              1762|
|   mean| 313.0763111589103| 315.9112880164581| 309.8282405079457|312.9270656379113|9.422577587968218E7| 75.00174115607275|
| stddev|185.29946803981522|186.89817686485767|183.38391664371008|185.1471036170943|6.020518776592709E7| 28.57492972179906|
|    min|              90.0|         90.699997|         89.470001|        90.279999|           11475900|         24.881912|
|    max|        702.409988|        705.070023|        699.569977|       702.100021|          470249500|127.96609099999999|
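+-------+------------------+------------------+------------------+-----------------+-------------------+------------------+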

In [7]:
from pyspark.sql.functions import (dayofmonth, hour, dayofyear, 
                                   month, year, weekofyear, 
                                   format_number, date_format)

In [8]:
# Extract the day of the month from each row's Date timestamp and display it
# (show() prints the first 20 rows by default).
(
    df
    .select(dayofmonth(df.Date))
    .show()
)

+----------------+
|dayofmonth(Date)|
+----------------+
|               4|
|               5|
|               6|
|               7|
|               8|
|              11|
|              12|
|              13|
|              14|
|              15|
|              19|
|              20|
|              21|
|              22|
|              25|
|              26|
|              27|
|              28|
|              29|
|               1|
+----------------+
only showing top 20 rows



In [9]:
# Extract the hour component of each Date timestamp and display it
# (show() prints the first 20 rows by default).
(
    df
    .select(hour(df.Date))
    .show()
)

+----------+
|hour(Date)|
+----------+
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
|         0|
+----------+
only showing top 20 rows



In [10]:
# Extract the month number from each Date timestamp and display it
# (show() prints the first 20 rows by default).
(
    df
    .select(month(df.Date))
    .show()
)

+-----------+
|month(Date)|
+-----------+
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          1|
|          2|
+-----------+
only showing top 20 rows



In [20]:
# Derive a calendar-year column from the Date timestamp so rows can be grouped
# by year. `newdf` is reused by later cells.
newdf = df.withColumn('year', year(df['Date']))

# Average closing price per year. Passing 'Close' to mean() aggregates only
# that column instead of averaging every numeric column (including `year`
# itself) and discarding the rest; the result still has exactly the columns
# `year` and `avg(Close)`.
# Note: no ordering is applied here, so the row order of the output is not
# guaranteed to be chronological.
newdf.groupBy('year')\
    .mean('Close')\
    .show()

+----+------------------+
|year|        avg(Close)|
+----+------------------+
|2015|120.03999980555547|
|2013| 472.6348802857143|
|2014| 295.4023416507935|
|2012| 576.0497195640002|
|2016|104.60400786904763|
|2010| 259.8424600000002|
|2011|364.00432532142867|
+----+------------------+



In [31]:
from pyspark.sql.functions import mean, stddev, count, format_number

# Per-year summary of the closing price, sorted chronologically:
#   avg_close - mean of Close, formatted to 2 decimal places
#   sd_close  - standard deviation of Close, formatted to 2 decimal places
#   cnt_close - number of Close values in that year
# format_number() is used for display; it yields formatted text, not numbers.
(
    newdf
    .groupBy('year')
    .agg(
        format_number(mean('Close'), 2).alias('avg_close'),
        format_number(stddev('Close'), 2).alias('sd_close'),
        count('Close').alias('cnt_close'),
    )
    .orderBy('year')
    .show()
)

+----+---------+--------+---------+
|year|avg_close|sd_close|cnt_close|
+----+---------+--------+---------+
|2010|   259.84|   37.56|      252|
|2011|   364.00|   25.92|      252|
|2012|   576.05|   66.98|      250|
|2013|   472.63|   44.89|      252|
|2014|   295.40|  224.92|      252|
|2015|   120.04|    7.68|      252|
|2016|   104.60|    7.64|      252|
+----+---------+--------+---------+

