In [1]:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
                        .appName("TestApp")
                        .config("spark.master", "local")
                        .getOrCreate()

// required for scala spark $ syntax
import spark.implicits._

Intitializing Scala interpreter ...

Spark Web UI available at http://74e4be6e0134:4044
SparkContext available as 'sc' (version = 3.2.1, master = local[*], app id = local-1651480830600)
SparkSession available as 'spark'


import org.apache.spark.sql.SparkSession
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@12fe4f09
import spark.implicits._


In [3]:
val df = spark.read.option("header", "true").option("inferSchema","true").csv("Net_11_16")

df: org.apache.spark.sql.DataFrame = [Date: string, Open: double ... 5 more fields]


In [4]:
df.columns

res0: Array[String] = Array(Date, Open, High, Low, Close, Volume, Adj Close)


In [12]:
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [6]:
df.describe().show()

+-------+----------+------------------+------------------+------------------+------------------+--------------------+------------------+
|summary|      Date|              Open|              High|               Low|             Close|              Volume|         Adj Close|
+-------+----------+------------------+------------------+------------------+------------------+--------------------+------------------+
|  count|      1259|              1259|              1259|              1259|              1259|                1259|              1259|
|   mean|      null|230.39351086656092|233.97320872915006|226.80127876251044|  230.522453845909|2.5634836060365368E7|55.610540036536875|
| stddev|      null|164.37456353264244| 165.9705082667129| 162.6506358235739|164.40918905512854| 2.306312683388607E7|35.186669331525486|
|    min|2011-10-24|         53.990001|         55.480001|             52.81|              53.8|             3531300|          7.685714|
|    max|2016-10-24|        708.900017|  

In [7]:
// HV Ratio computation
val df2 = df.withColumn("HV Ratio", df("High")/df("Volume"))

df2: org.apache.spark.sql.DataFrame = [Date: string, Open: double ... 6 more fields]


In [8]:
df.orderBy($"High".desc).show(1)

+----------+-----------------+----------+----------+----------+--------+------------------+
|      Date|             Open|      High|       Low|     Close|  Volume|         Adj Close|
+----------+-----------------+----------+----------+----------+--------+------------------+
|2015-07-13|686.6900019999999|716.159996|686.550026|707.610001|33205200|101.08714300000001|
+----------+-----------------+----------+----------+----------+--------+------------------+
only showing top 1 row



In [11]:
df.select(mean("Close")).show()

+----------------+
|      avg(Close)|
+----------------+
|230.522453845909|
+----------------+



In [13]:
// max and min of volume column
df.select(max("Volume")).show()

+-----------+
|max(Volume)|
+-----------+
|  315541800|
+-----------+



In [14]:
df.select(min("Volume")).show()

+-----------+
|min(Volume)|
+-----------+
|    3531300|
+-----------+



In [16]:
df.filter($"Close" < 600).show(4)

+----------+----------+------------------+----------+-----------------+---------+------------------+
|      Date|      Open|              High|       Low|            Close|   Volume|         Adj Close|
+----------+----------+------------------+----------+-----------------+---------+------------------+
|2011-10-24|119.100002|120.28000300000001|115.100004|       118.839996|120460200|         16.977142|
|2011-10-25| 74.899999|         79.390001| 74.249997|        77.370002|315541800|11.052857000000001|
|2011-10-26|     78.73|         81.420001| 75.399997|        79.400002|148733900|         11.342857|
|2011-10-27| 82.179998| 82.71999699999999| 79.249998|80.86000200000001| 71190000|11.551428999999999|
+----------+----------+------------------+----------+-----------------+---------+------------------+
only showing top 4 rows



In [17]:
df.filter($"Close" < 600).count()

res12: Long = 1218


In [21]:
// what precentage of time high was greater than $500
((df.filter($"High">500).count() * 1.0)/ df.count()) * 100

res16: Double = 4.924543288324067


In [22]:
// Pearson correlation between High & Volume
df.select(corr("High", "Volume")).show()

+--------------------+
|  corr(High, Volume)|
+--------------------+
|-0.20960233287942157|
+--------------------+



In [23]:
// max high per year 
val yeardf = df.withColumn("Year", year(df("Date")))
val yearmaxs = yeardf.select($"Year", $"High").groupBy("Year").max()

yeardf: org.apache.spark.sql.DataFrame = [Date: string, Open: double ... 6 more fields]
yearmaxs: org.apache.spark.sql.DataFrame = [Year: int, max(Year): int ... 1 more field]


In [25]:
yearmaxs.select($"Year", $"max(High)").show()

+----+------------------+
|Year|         max(High)|
+----+------------------+
|2015|        716.159996|
|2013|        389.159988|
|2014|        489.290024|
|2012|        133.429996|
|2016|129.28999299999998|
|2011|120.28000300000001|
+----+------------------+



In [28]:
// average close for each calender month
val monthdf = df.withColumn("Month", month(df("Date")))
val monthavgs = monthdf.select($"Month", $"Close").groupBy("Month").mean()
monthavgs.select($"Month", $"avg(Close)").orderBy("Month").show()

+-----+------------------+
|Month|        avg(Close)|
+-----+------------------+
|    1|212.22613874257422|
|    2| 254.1954634020619|
|    3| 249.5825228971963|
|    4|246.97514271428562|
|    5|264.37037614150944|
|    6| 295.1597153490566|
|    7|243.64747528037387|
|    8|195.25599892727263|
|    9|206.09598121568627|
|   10|205.93297300900903|
|   11| 194.3172275445545|
|   12| 199.3700942358491|
+-----+------------------+



monthdf: org.apache.spark.sql.DataFrame = [Date: string, Open: double ... 6 more fields]
monthavgs: org.apache.spark.sql.DataFrame = [Month: int, avg(Month): double ... 1 more field]
