In [1]:
# Start a simple Spark session.
# Locate the Spark installation: honour the SPARK_HOME environment variable
# when it is set so the notebook is not tied to one machine, and fall back to
# the original local install path otherwise.
import os

import findspark
findspark.init(os.environ.get('SPARK_HOME', '/home/paul/spark-3.0.0-bin-hadoop2.7'))
# pyspark is imported only after findspark.init() has run.
import pyspark
from pyspark.sql import SparkSession

In [2]:
# Build (or reuse) the session registered under the application name 'sol'.
session_builder = SparkSession.builder.appName('sol')
spark = session_builder.getOrCreate()

In [3]:
# Read the Walmart stock CSV: the first line is treated as the header and
# Spark infers each column's data type.
csv_path = 'Downloads/WMT.csv'
df = spark.read.csv(csv_path, header=True, inferSchema=True)

In [4]:
#List the column names of the DataFrame
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']

In [5]:
#Print the schema: column names, inferred types and nullability
df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [6]:
# Print the first 5 rows, each followed by blank lines for readability.
first_rows = df.head(5)
for record in first_rows:
    print(record)
    print('\n')


Row(Date='2019-07-12', Open=114.089996, High=114.769997, Low=113.620003, Close=114.599998, Adj Close=112.534279, Volume=3743300)


Row(Date='2019-07-15', Open=114.669998, High=115.080002, Low=114.449997, Close=114.980003, Adj Close=112.907433, Volume=3346000)


Row(Date='2019-07-16', Open=115.330002, High=115.489998, Low=114.040001, Close=114.760002, Adj Close=112.691399, Volume=3488200)


Row(Date='2019-07-17', Open=114.809998, High=115.169998, Low=114.199997, Close=114.599998, Adj Close=112.534279, Volume=2688500)


Row(Date='2019-07-18', Open=114.349998, High=114.779999, Low=113.739998, Close=114.720001, Adj Close=112.652122, Volume=3224800)




In [7]:
#Summary statistics (count, mean, stddev, min, max) for every column
df.describe().show()

+-------+----------+------------------+------------------+------------------+------------------+------------------+-----------------+
|summary|      Date|              Open|              High|               Low|             Close|         Adj Close|           Volume|
+-------+----------+------------------+------------------+------------------+------------------+------------------+-----------------+
|  count|       252|               252|               252|               252|               252|               252|              252|
|   mean|      null|118.08813497222226|119.29162696031744|116.91523788888895|118.12956356349204|117.10961042063497|7487978.968253968|
| stddev|      null| 5.058526579992194| 5.007457713071321| 5.029074692710633| 4.966992425100021| 5.322205020724095|4370560.229845061|
|    min|2019-07-12|        105.199997|        106.839996|             102.0|        104.050003|        103.137947|          2227400|
|    max|2020-07-10|        132.389999|        133.380005|    

In [8]:
#Inspect the schema of the describe() result: every statistic comes back as a string column
df.describe().printSchema()

root
 |-- summary: string (nullable = true)
 |-- Date: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Adj Close: string (nullable = true)
 |-- Volume: string (nullable = true)



In [9]:
from pyspark.sql.functions import format_number

In [10]:
# Keep the summary-statistics DataFrame so its columns can be cast and formatted.
result = df.describe()

In [11]:
# describe() yields string columns, so cast before formatting:
# price columns are shown with two decimals, Volume as a whole number.
price_columns = [
    format_number(result[name].cast('float'), 2).alias(name)
    for name in ('Open', 'High', 'Low', 'Close')
]
volume_column = result['Volume'].cast('int').alias('Volume')
result.select(result['summary'], *price_columns, volume_column).show()

+-------+------+------+------+------+--------+
|summary|  Open|  High|   Low| Close|  Volume|
+-------+------+------+------+------+--------+
|  count|252.00|252.00|252.00|252.00|     252|
|   mean|118.09|119.29|116.92|118.13| 7487978|
| stddev|  5.06|  5.01|  5.03|  4.97| 4370560|
|    min|105.20|106.84|102.00|104.05| 2227400|
|    max|132.39|133.38|129.76|132.33|31152700|
+-------+------+------+------+------+--------+



In [12]:
# Derive a new DataFrame with an "HV Ratio" column: High divided by Volume.
hv_ratio = df['High'] / df['Volume']
df2 = df.withColumn("HV Ratio", hv_ratio)
df2.select('HV Ratio').show()

+--------------------+
|            HV Ratio|
+--------------------+
|3.066011193332087E-5|
|3.439330603705917E-5|
|3.310876612579554E-5|
|4.283801301841175E-5|
|3.559290467625899...|
|3.037431130311092E-5|
|2.531382054063627...|
|1.967258645858677...|
|2.700069446771838...|
|2.918083837465922...|
|2.396701120744343...|
|3.015534164308973...|
|3.703215669478123...|
|1.808639028866743E-5|
|1.504627739771965...|
|1.947071596513868...|
|1.426684280052840...|
|1.536970469186477...|
|1.492615222916809...|
|2.488112564008778...|
+--------------------+
only showing top 20 rows



In [13]:
#Which day had the peak High price?
#Sort by High descending, take the top row, and read its date by column name
#so the lookup does not depend on 'Date' being the first column.
df.orderBy(df['High'].desc()).head(1)[0]['Date']


'2020-04-20'

In [14]:
# What is the mean of the Close column?
from pyspark.sql.functions import mean
close_mean = mean("Close")
df.select(close_mean).show()

+------------------+
|        avg(Close)|
+------------------+
|118.12956356349204|
+------------------+



In [15]:
#What are the max and min of the Volume column?
#Import the Spark aggregates under aliases so they do not rebind Python's
#built-in max()/min() for the rest of the notebook. The resulting column
#names are derived from the Spark expression, not from the Python alias.
from pyspark.sql.functions import max as spark_max, min as spark_min
df.select(spark_max("Volume"), spark_min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   31152700|    2227400|
+-----------+-----------+



In [16]:
#How many days was the Close higher than 100$?
df.filter('Close>100').count()

252

In [17]:
# What percentage of the time was High lower than 120$?
# In other words: (number of days with High < 120) / (total days in dataset) * 100
days_below_120 = df.filter(df['High'] < 120).count()
total_days = df.count()
(days_below_120 / total_days) * 100

63.095238095238095

In [18]:
# What is the Pearson correlation between High and Volume?
from pyspark.sql.functions import corr
high_volume_corr = corr('High', 'Volume')
df.select(high_volume_corr).show()

+------------------+
|corr(High, Volume)|
+------------------+
|0.2719334383137132|
+------------------+



In [19]:
# What is the max High per year?
# Step 1: add a Year column extracted from the Date column.
from pyspark.sql.functions import year
year_column = year('Date')
yeardf = df.withColumn("Year", year_column)

In [20]:
#Aggregate only the High column: a bare .max() would compute the maximum of
#every numeric column (including a meaningless max(Year)) although only
#max(High) is used afterwards.
max_df = yeardf.groupBy('Year').max('High')

In [21]:
#Sort by Year so the displayed row order is deterministic, consistent with
#the per-month table which is ordered by Month.
max_df.select('Year','max(High)').orderBy('Year').show()

+----+----------+
|Year| max(High)|
+----+----------+
|2019|125.379997|
|2020|133.380005|
+----+----------+



In [22]:
#What is the average Close for each calendar month?
from pyspark.sql.functions import month

In [23]:
# Add a Month column extracted from the Date column.
month_column = month('Date')
monthdf = df.withColumn('Month', month_column)

In [26]:
# Keep only Month and Close, then average the numeric columns within each month.
month_close = monthdf.select('Month', 'Close')
monthavgs = month_close.groupBy('Month').mean()

In [27]:
# Show the average Close per calendar month, ordered from January to December.
monthly_close = monthavgs.select('Month', 'avg(Close)')
monthly_close.orderBy('Month').show()

+-----+------------------+
|Month|        avg(Close)|
+-----+------------------+
|    1|116.21095204761906|
|    2|115.63631521052632|
|    3|114.41727345454545|
|    4|125.78809571428572|
|    5|124.07350000000001|
|    6|120.50636331818183|
|    7|116.76333247619047|
|    8|110.29409072727275|
|    9|116.83600119999998|
|   10|118.59434769565217|
|   11|119.31650085000001|
|   12|119.55095242857145|
+-----+------------------+

