# Test PySpark & DataFrames

You will be asked some basic questions about some stock market data from the years 2012-2017. 

#### Use the stock.csv file to Answer and complete the  tasks below!

#### Start a Spark Session

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Operations").getOrCreate()

#### Load the Stock CSV File, have Spark infer the data types.

In [2]:
df = spark.read.csv('stock.csv',inferSchema=True,header=True)
df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



In [None]:
df.columns

#### List the 5th and 6th column names?

In [None]:
print(df.columns[4:6])

# or 
# print(df.select(['Close', 'Volume']

#### Print the Schema and change the inferred Volume type from integer to double

In [None]:
from pyspark.sql.types import (DoubleType, TimestampType)
df.printSchema()

In [None]:
# Column already provides cast method with DataType instance

df = df.withColumn("Volume", df["Volume"].cast("double"))
df = df.withColumn("Date", df["Date"].cast("timestamp"))

In [None]:
# here is the expected output

df.printSchema()

#### Print out the date and the open price of the first 5 rows as follows.
on 2012-01-03 00:00:00 , the open price was 59.97 $


on 2012-01-04 00:00:00 , the open price was 60.21 $


on 2012-01-05 00:00:00 , the open price was 59.35 $


on 2012-01-06 00:00:00 , the open price was 59.42 $


on 2012-01-09 00:00:00 , the open price was 59.03 $

In [None]:
for row in df.head(5):
  print(f'on {row["Date"]} , the open price was {round(row["Open"], 2)} $')

#### Use describe() to generate a summary DataFrame.

In [None]:
# here is the expected output

df.describe().show()

#### There are too many decimal places for mean and stddev in the describe() dataframe. Format the numbers to just show up to two decimal places. Pay careful attention to the datatypes that .describe() returns. [Check this link for a hint](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.Column.cast.html?highlight=cast)

In [None]:
from pyspark.sql.types import DecimalType
from pyspark.sql.functions import format_number

In [None]:
result = df.describe()

In [None]:
# here is the expected output

In [None]:
result.select(result['summary'],format_number(result['Open'].cast('float'),0).alias('Open'),
             format_number(result['High'].cast('float'),2).alias('High'),
             format_number(result['Low'].cast('float'),2).alias('Low'),
             format_number(result['Close'].cast('float'),2).alias('Close'),
             format_number(result['Volume'].cast('float'),2).alias('Volume')).show()

#### Create a new dataframe with 2 columns : 
##### HV Ratio that is the ratio of the High Price versus volume of stock traded for a day,  
##### and LV Ratio that is the ratio of the Low Price versus volume of stock traded for a day.

In [None]:
# here is the expected output

df_2 = df.withColumn('HV Ratio', df['High']/df['Volume'])
df_3 = df.withColumn('LV Ratio', df['Low']/df['Volume'])

df_2.select('HV Ratio').show(20)
df_3.select('LV Ratio').show(20)

#### What day had the Highest HV Ratio? and what day we had the lowest?

In [None]:
# here is the expected output
df_2.orderBy(df_2["HV Ratio"].desc()).head(1)[0][0]

In [None]:
df_2.orderBy(df_2["HV Ratio"]).head(1)[0][0]

#### What is the mean of the LV Ratio?

In [None]:
# here is the expected output

from pyspark.sql.functions import mean
df_3.select(mean('LV Ratio')).show()

#### How many days volume was greater than 9000000 and the Close was lower than 70$

In [None]:
result = filter(lambda x: x["Volume"] > 9000000 and x["Close"] < 70, df.collect())
len(list(result))

#### What percentage of the time was the difference between High and Low greater than 1 dollars ?

In [None]:
filtered_df_one_dollar = filter(lambda x: abs(x["High"] - x["Low"]) > 1, df.collect())
filtered_df_all_differences = filter(lambda x: x["High"] != x["Low"], df.collect())
more_than_one_dollar_difference_count = len(list(filtered_df_one_dollar))
all_difference_count = len(list(filtered_df_all_differences))
round(more_than_one_dollar_difference_count / (all_difference_count / 100),5)

#### What is the Pearson correlation between Close and Volume?
#### [Hint](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/api/pyspark.pandas.DataFrame.corr.html?highlight=corr)

In [None]:
# here is the expected output

from pyspark.sql.functions import corr
df.select(corr('Close', 'Volume')).show()

#### What is the max and min HV Ratio per year?

In [None]:
from pyspark.sql.functions import (year, max, min)

In [None]:
yeardf = df_2.withColumn("Year", year(df_2['Date']))
yeardf2 = df_2.withColumn("Year", year(df_2['Date']))

max_df = yeardf.groupBy('Year').max()
max_df2 = yeardf2.groupBy('Year').min()

# here is the expected output

max_df.select('Year', 'max(HV Ratio)').show()
max_df2.select('Year', 'min(HV Ratio)').show()

#### What is the highest Close for each Calendar Month?
#### In other words, across all the years, what is the highest Close price for Jan,Feb, Mar, etc... Your result will have a value for each of these months. 

In [None]:
from pyspark.sql.functions import month

In [None]:
monthdf = df.withColumn('Month', month('Date'))
month_high = monthdf.select(['Month', 'Close']).groupBy('Month').max()

# here is the expected output

month_high.select('Month', 'max(Close)').orderBy('Month').show()