# Spark DataFrames Project

Let's get some quick practice with Spark DataFrame tools.

We will answer some basic questions about stock market data, in this case Microsoft stock from 2010 to 2019.

We will learn:

- How to start a Spark Session
- How to load data
- How to access some information from data
- How to extract some descriptive statistics
- How to use SQL syntax and SQL functions in Python/PySpark to answer our questions
- How to work with some time functions

Start a simple Spark Session

In [1]:
import findspark
findspark.init('/home/lucasabarbano/spark-2.4.5-bin-hadoop2.7')
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('DataFrame-Project').getOrCreate()

Load the Microsoft stock CSV file and have Spark infer the data types.

In [2]:
df = spark.read.csv('msft_stock.csv', inferSchema=True, header=True)

What are the column names?

In [3]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Adj Close', 'Volume']

What does the Schema look like?

In [4]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



Print out the first 5 rows.

In [5]:
for row in df.head(5):
    print(row)
    print('\n')

#df.head(5)

Row(Date=datetime.datetime(2010, 1, 4, 0, 0), Open=30.620001, High=31.1, Low=30.59, Close=30.950001, Adj Close=24.294369, Volume=38409100)


Row(Date=datetime.datetime(2010, 1, 5, 0, 0), Open=30.85, High=31.1, Low=30.639999, Close=30.959999, Adj Close=24.302216, Volume=49749600)


Row(Date=datetime.datetime(2010, 1, 6, 0, 0), Open=30.879999, High=31.08, Low=30.52, Close=30.77, Adj Close=24.15307, Volume=58182400)


Row(Date=datetime.datetime(2010, 1, 7, 0, 0), Open=30.629999, High=30.700001, Low=30.190001, Close=30.450001, Adj Close=23.901886, Volume=50559700)


Row(Date=datetime.datetime(2010, 1, 8, 0, 0), Open=30.280001, High=30.879999, Low=30.24, Close=30.66, Adj Close=24.066734, Volume=51197400)




Descriptive statistics about the DataFrame.

In [7]:
from pyspark.sql.functions import format_number

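# describe() returns its statistics as strings, so each column is cast before formatting.
# Caution: 'float' is 32-bit, so large Volume values lose precision; 'double' keeps them exact.
df_describe = df.describe()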

df_describe.select(df_describe["summary"],
                  format_number(df_describe["Open"].cast('float'),2).alias('Open'),
                  format_number(df_describe["High"].cast('float'),2).alias('High'),
                  format_number(df_describe["Low"].cast('float'),2).alias('Low'),
                  format_number(df_describe["Close"].cast('float'),2).alias('Close'),
                  format_number(df_describe["Adj Close"].cast('float'),2).alias('Adj Close'),
                  format_number(df_describe["Volume"].cast('float'),0).alias('Volume')).show()

+-------+--------+--------+--------+--------+---------+-----------+
|summary|    Open|    High|     Low|   Close|Adj Close|     Volume|
+-------+--------+--------+--------+--------+---------+-----------+
|  count|2,516.00|2,516.00|2,516.00|2,516.00| 2,516.00|      2,516|
|   mean|   56.31|   56.77|   55.82|   56.32|    52.34| 39,926,492|
| stddev|   33.87|   34.11|   33.57|   33.86|    35.11| 22,991,730|
|    min|   23.09|   23.32|   22.73|   23.01|    18.23|  7,425,600|
|    max|  159.45|  159.55|  158.22|  158.96|   158.53|319,317,888|
+-------+--------+--------+--------+--------+---------+-----------+
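Note that the `Volume` maximum in this table prints as 319,317,888, while the `MAX(Volume)` query later in this notebook reports 319317900. A likely explanation is the cast to `'float'`, a 32-bit type that cannot represent every integer of that size. The following is a minimal sketch in plain Python (no Spark needed) that rounds the value to 32-bit precision with the standard `struct` module. The helper name `to_float32` is made up for this illustration.

```python
import struct

def to_float32(value):
    """Round a 64-bit Python float to the nearest 32-bit float."""
    return struct.unpack('f', struct.pack('f', value))[0]

# max(Volume) as reported by the SQL query in this notebook
max_volume = 319317900

rounded = to_float32(float(max_volume))

print(int(rounded))               # 319317888
print(int(rounded) - max_volume)  # -12
```

Between 2^28 and 2^29, neighbouring 32-bit floats are 32 apart, so 319317900 is rounded to the nearest multiple of 32. Casting to `'double'` instead of `'float'` should avoid this rounding.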



Create a new column, HV Ratio: the ratio of the High price to the Volume of stock traded on that day.

In [12]:
df.withColumn('HV Ratio', df['High']/df['Volume']).select('HV Ratio').show()

+--------------------+
|            HV Ratio|
+--------------------+
|8.097039503659289E-7|
|6.251306543168187E-7|
|5.341821581784181E-7|
|6.072029897329296E-7|
|6.031556094645432E-7|
|4.473875967752023E-7|
|4.612203222170132...|
|5.884678049109682E-7|
|4.918699122700192E-7|
|3.909241527056856...|
|6.707360275851999E-7|
| 5.64089025424115E-7|
|4.203226989315429...|
|2.960650892214665E-7|
|4.680226594922128E-7|
|4.479298438322987E-7|
|4.663054441395163E-7|
|2.541831377958484...|
|1.543154957617393...|
|3.314283187344279...|
+--------------------+
only showing top 20 rows
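As a quick sanity check, the first two values can be reproduced by hand. The sketch below uses plain Python (no Spark) with the `High` and `Volume` values copied from the first two rows printed earlier; the variable names are only for illustration.

```python
# High and Volume of the first two rows, copied from the df.head(5) output above
rows = [
    {"High": 31.1, "Volume": 38409100},
    {"High": 31.1, "Volume": 49749600},
]

# Same arithmetic as df['High'] / df['Volume'], one row at a time
hv_ratios = [row["High"] / row["Volume"] for row in rows]

for ratio in hv_ratios:
    print(ratio)  # roughly 8.097e-07 and 6.251e-07
```

The results agree with the first two rows of the `HV Ratio` column shown above.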



What day had the Peak High in Price?

In [13]:
# Sort by High in descending order, take the top row, and read its first field (Date)
df.orderBy(df['High'].desc()).head(1)[0][0]

datetime.datetime(2019, 12, 27, 0, 0)

What is the mean of the Close column?

In [14]:
df.createOrReplaceTempView('microsoft')

# using sql syntax
spark.sql('''
          SELECT AVG(Close) FROM microsoft  
          ''').show()

# using sql functions
#from pyspark.sql.functions import mean
#df.select(mean("Close")).show()

+-----------------+
|       avg(Close)|
+-----------------+
|56.32216619634341|
+-----------------+



What is the max and min of the Volume column?

In [17]:
# using sql syntax
spark.sql('''
          SELECT MIN(Volume),MAX(Volume) FROM microsoft
            ''').show()

# using sql functions
#from pyspark.sql.functions import max,min
#df.select(min("Volume"),max("Volume")).show()

+-----------+-----------+
|min(Volume)|max(Volume)|
+-----------+-----------+
|    7425600|  319317900|
+-----------+-----------+



How many days was the Close lower than 80 dollars?

In [19]:
# using sql syntax
spark.sql('''
          SELECT COUNT(Close) FROM microsoft
          WHERE Close < 80
          ''').show()

# using filter
#df.filter("Close < 80").count()
#df.filter(df['Close'] < 80).count()

+------------+
|count(Close)|
+------------+
|        1969|
+------------+



What percentage of the time was the High greater than 100 dollars?

In [21]:
# using spark.sql
# COUNT(*) counts rows; each collect() returns a list with a single Row
df_high100 = spark.sql('''
                       SELECT COUNT(*) FROM microsoft
                       WHERE High > 100
                       ''').collect()

df_totaldays = spark.sql('''
                         SELECT COUNT(*) FROM microsoft
                         ''').collect()

print('The High price was greater than 100 dollars {:0.2f}% of the time'.
      format(df_high100[0][0] / df_totaldays[0][0]*100))

# An easier way to answer this question
#df.filter(df["High"]>100).count()/df.count()*100

The High price was greater than 100 dollars 15.70% of the time


What is the Pearson correlation between High and Volume?

In [28]:
from pyspark.sql.functions import corr
df.select(corr("High","Volume")).show()

+--------------------+
|  corr(High, Volume)|
+--------------------+
|-0.44237956137895296|
+--------------------+
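To make clear what a Pearson correlation measures, here is a minimal sketch of the textbook formula in plain Python: the covariance of the two series divided by the product of their standard deviations. The function name `pearson` and the toy values are made up for illustration and are not taken from the Microsoft data.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sums of co-deviations and squared deviations (the 1/n factors cancel out)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Made-up toy values: as "high" goes up, "volume" goes down in a straight line
highs = [1.0, 2.0, 3.0, 4.0]
volumes = [8.0, 6.0, 4.0, 2.0]

r = pearson(highs, volumes)
print(r)  # -1.0
```

A value of -1 means a perfect negative linear relationship. The result of about -0.44 above suggests a moderate negative one: higher prices tended to come with lower trading volume.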



What is the max High per year?

In [30]:
from pyspark.sql.functions import year

df_year = df.withColumn('Year', year(df['Date']))

df_year.groupBy('Year').agg({'High':'max'}).orderBy('Year').show()

+----+----------+
|Year| max(High)|
+----+----------+
|2010|     31.58|
|2011| 29.459999|
|2012| 32.950001|
|2013|     38.98|
|2014| 50.049999|
|2015| 56.849998|
|2016| 64.099998|
|2017|      87.5|
|2018|    116.18|
|2019|159.550003|
+----+----------+
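Conceptually, this cell extracts the year from each date, groups the rows by that year, and keeps the largest `High` in each group. The plain-Python sketch below mirrors those steps on a few made-up rows (not taken from the Microsoft data); it only illustrates the idea and is not how Spark executes the query.

```python
from collections import defaultdict
from datetime import datetime

# Made-up toy rows of (Date, High); not taken from the Microsoft data
rows = [
    (datetime(2010, 3, 1), 10.0),
    (datetime(2010, 9, 1), 12.5),
    (datetime(2011, 1, 5), 11.0),
    (datetime(2011, 6, 7), 9.0),
]

# Step 1 and 2: derive the year from each date and group the High values by it
highs_by_year = defaultdict(list)
for date, high in rows:
    highs_by_year[date.year].append(high)

# Step 3: aggregate each group with max, ordered by year
max_high_per_year = {y: max(h) for y, h in sorted(highs_by_year.items())}
print(max_high_per_year)  # {2010: 12.5, 2011: 11.0}
```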



What is the average Close for each Calendar Month?

In [31]:
from pyspark.sql.functions import month
df_month = df.withColumn('Month', month(df['Date']))

df_month.groupBy('Month').agg({'Close':'avg'}).orderBy('Month').show()

+-----+------------------+
|Month|        avg(Close)|
+-----+------------------+
|    1| 50.95534645544554|
|    2| 50.82932285416666|
|    3|   51.890321206422|
|    4| 53.75874399516908|
|    5|55.348591511737084|
|    6| 55.00586861032864|
|    7| 57.55094798104266|
|    8|  57.9727803542601|
|    9|57.987290802955684|
|   10| 60.25800005909088|
|   11|  61.7215610878049|
|   12| 61.99248818181819|
+-----+------------------+



## Thank you!