In [1]:
import pyspark


In [2]:
from pyspark.sql import SparkSession


In [3]:
spark = SparkSession.builder \
      .master("local[1]") \
      .appName("beula.sam") \
      .getOrCreate() 

In [5]:
df= spark.read.csv(r"C:\Users\kotayaswanth\Downloads\timberland_stock.csv", header = True)
df.show(10)

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000000004|
|2012-01-10|             59.43|59.709998999999996|             5

In [6]:
from pyspark.sql.functions import year, month, dayofmonth


df = df.withColumn("year", year(df.Date))
df = df.withColumn("month", month(df.Date))
df = df.withColumn("day", dayofmonth(df.Date))


df.show(10)

+----------+------------------+------------------+------------------+------------------+--------+------------------+----+-----+---+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|year|month|day|
+----------+------------------+------------------+------------------+------------------+--------+------------------+----+-----+---+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|2012|    1|  3|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|2012|    1|  4|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|2012|    1|  5|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|2012|    1|  6|
|2012-01-09|         59.029999|         59.549999|         58.919998|       

In [7]:
df.createOrReplaceTempView("timber")

In [8]:
df2 = spark.sql("select Date,High from timber where High=(select max(High) from timber)")

In [9]:
df2.show()

+----------+---------+
|      Date|     High|
+----------+---------+
|2015-01-13|90.970001|
+----------+---------+



In [10]:
mean_close = spark.sql("select mean(Close) as Mean_Close from timber")
mean_close.show()

+-----------------+
|       Mean_Close|
+-----------------+
|72.38844998012726|
+-----------------+



In [11]:
min_volume = spark.sql("select min(Volume) as Min_Volume from timber")
min_volume.show()

+----------+
|Min_Volume|
+----------+
|  10010500|
+----------+



In [13]:
max_volume = spark.sql("select max(Volume) as Max_Volume from timber")
max_volume.show()

+----------+
|Max_Volume|
+----------+
|   9994400|
+----------+



In [14]:
close60dollars = spark.sql("select Date as Close_ls_60 from timber where Close<60.00")
close60dollars.createOrReplaceTempView("close60dollars")
count60close = spark.sql("select count(*) as No_of_days_is60 from close60dollars")
count60close.show()

+---------------+
|No_of_days_is60|
+---------------+
|             81|
+---------------+



In [15]:
pearson_corr = spark.sql("SELECT corr(High, Volume) AS pearson_corr from timber")
pearson_corr.show()

+-------------------+
|       pearson_corr|
+-------------------+
|-0.3384326061737161|
+-------------------+



In [16]:
max_high_per_year = spark.sql("select year, max(High) as max_high from timber group by year")
max_high_per_year.show()

+----+---------+
|year| max_high|
+----+---------+
|2012|77.599998|
|2013|81.370003|
|2014|88.089996|
|2015|90.970001|
|2016|75.190002|
+----+---------+



In [17]:
avg_close_per_month = spark.sql("select month, avg(Close) as avg_close_monthly from timber group by month order by month asc")
avg_close_per_month.show()

+-----+-----------------+
|month|avg_close_monthly|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

