In [1]:
import findspark
findspark.init()

import pyspark

In [2]:
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/21 20:06:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
data = spark.read.csv("walmart_stock.csv", header=True,
                      inferSchema=True)

In [4]:
data.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

In [5]:
data.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



In [6]:
for row in data.head(5):
    print(row)
    print('\n')
data.show(3)

Row(Date=datetime.date(2012, 1, 3), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996)


Row(Date=datetime.date(2012, 1, 4), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475)


Row(Date=datetime.date(2012, 1, 5), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539)


Row(Date=datetime.date(2012, 1, 6), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922)


Row(Date=datetime.date(2012, 1, 9), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)


+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+

In [7]:
data.describe().show()

25/07/21 20:06:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

In [8]:
from pyspark.sql import functions as F

In [9]:
hv_ratio_df = data.withColumn("HV Ratio", data["High"]/data["Volume"])
hv_ratio_df.select("HV Ratio").show(5)

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
+--------------------+
only showing top 5 rows



In [16]:
max_high_value = data.agg(F.max("High")).collect()[0][0]
max_high_value

90.970001

In [21]:
peak_high_date_df = data.filter(F.col("High") == max_high_value).select("Date")
peak_high_date_df.show()

+----------+
|      Date|
+----------+
|2015-01-13|
+----------+



In [10]:
data.select(F.min("High")).show()

+---------+
|min(High)|
+---------+
|57.060001|
+---------+



In [11]:
data.select(F.mean("Close")).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



In [12]:
data.select(F.max("Volume"), F.min("Volume")).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



In [63]:
num_days = data.filter("Close < 60").count()
num_days

81

In [65]:
num_days_80 = data.filter("High > 80").count()
percentage_80 = num_days_80/data.count()*100
percentage_80

9.141494435612083

In [72]:
data.corr("High", "Volume")

-0.3384326061737161

In [74]:
data.agg(F.corr("High", data.Volume)).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+



In [77]:
data.groupBy(F.year(data["Date"])).agg(F.max("High")).show()

+----------+---------+
|year(Date)|max(High)|
+----------+---------+
|      2015|90.970001|
|      2013|81.370003|
|      2014|88.089996|
|      2012|77.599998|
|      2016|75.190002|
+----------+---------+



In [86]:
data.groupBy(F.month(data["Date"])).agg(F.mean("Close")).show()

+-----------+-----------------+
|month(Date)|       avg(Close)|
+-----------+-----------------+
|         12|72.84792478301885|
|          1|71.44801958415842|
|          6| 72.4953774245283|
|          3|71.77794377570092|
|          5|72.30971688679247|
|          9|72.18411785294116|
|          4|72.97361900952382|
|          8|73.02981855454546|
|          7|74.43971943925233|
|         10|71.57854545454543|
|         11| 72.1110893069307|
|          2|  71.306804443299|
+-----------+-----------------+

