# Spark DataFrames Project Exercise

#### Use the `walmart_stock.csv` file to Answer and complete the  required tasks

In [None]:
### do on colab

In [None]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


#### Start a simple Spark Session

In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Check if a SparkContext exists
try:
    sc = SparkContext.getOrCreate()
except Exception as e:
    print(e)

# Create a SparkSession using the existing SparkContext
spark = SparkSession.builder.appName("YourAppName").getOrCreate()

# Now you can use the 'sc' and 'spark' variables for your Spark application


#### Load the Walmart Stock CSV File, have Spark infer the data types.

In [None]:
from google.colab import files
files.upload()

In [None]:
import pandas as pd

data = pd.read_csv('walmartstock.csv')
data.head()

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
0,2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619235
1,2012-01-04,60.209999,60.349998,59.470001,59.709999,9593300,52.078475
2,2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
3,2012-01-06,59.419998,59.450001,58.869999,59.0,8069400,51.45922
4,2012-01-09,59.029999,59.549999,58.919998,59.18,6679300,51.616215


In [None]:
#Spark Dataframe
dataspark = spark.createDataFrame(data)
dataspark.show()

  for column, series in pdf.iteritems():


+----------+---------+---------+---------+---------+--------+------------------+
|      Date|     Open|     High|      Low|    Close|  Volume|         Adj Close|
+----------+---------+---------+---------+---------+--------+------------------+
|2012-01-03|59.970001|61.060001|59.869999|60.330002|12668800|         52.619235|
|2012-01-04|60.209999|60.349998|59.470001|59.709999| 9593300|         52.078475|
|2012-01-05|59.349998|59.619999|58.369999|59.419998|12768200|         51.825539|
|2012-01-06|59.419998|59.450001|58.869999|     59.0| 8069400|          51.45922|
|2012-01-09|59.029999|59.549999|58.919998|    59.18| 6679300|         51.616215|
|2012-01-10|    59.43|59.709999|    58.98|59.040001| 6907300|         51.494109|
|2012-01-11|59.060001|59.529999|59.040001|59.400002| 6365600|         51.808098|
|2012-01-12|59.790001|     60.0|59.400002|     59.5| 7236400|         51.895316|
|2012-01-13|    59.18|59.610001|59.009998|59.540001| 7729300|         51.930204|
|2012-01-17|59.869999|60.110

In [None]:
#Have Spark infer the data types
print('Data type', type(dataspark))

Data type <class 'pyspark.sql.dataframe.DataFrame'>


#### What are the column names?

In [None]:
list(data.columns)

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

#### What does the Schema look like?

In [None]:
dataspark.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- Adj Close: double (nullable = true)



#### Print out the first 5 columns.

In [None]:
dataspark.rdd.zipWithIndex().take(5)

[(Row(Date='2012-01-03', Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619235),
  0),
 (Row(Date='2012-01-04', Open=60.209999, High=60.349998, Low=59.470001, Close=59.709999, Volume=9593300, Adj Close=52.078475),
  1),
 (Row(Date='2012-01-05', Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539),
  2),
 (Row(Date='2012-01-06', Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922),
  3),
 (Row(Date='2012-01-09', Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215),
  4)]

#### Use `describe()` to learn about the DataFrame.

In [None]:
dataspark.describe().show()

+-------+----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|             Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|             1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      null|72.35785375357703|72.83938807631154|71.91860095945944|72.38844998012719|8222093.481717011| 67.2388384872814|
| stddev|      null|6.768090244708284|6.768186808159222|6.744075756255509|6.756859163732981|  4519780.8431556|6.722609449996835|
|    min|2012-01-03|        56.389999|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|2016-12-30|        90.800003|        90.970001|            89.25|        90.470001|     

## Some advanced Question
#### We can see that there are too many decimal places for `mean` and `stddev` in the `describe()` dataframe. Format the numbers to just show up to two decimal places. Pay careful attention to the datatypes that `.describe()` returns, we didn't cover how to do this exact formatting, but we covered something very similar. [Check this link for a hint](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.Column.cast.html)


In [None]:
# First, let check out the schema
dataspark.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- Adj Close: double (nullable = true)



In [None]:
dataspark.describe().show()

+-------+----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|      Date|             Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+----------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|      1258|             1258|             1258|             1258|             1258|             1258|             1258|
|   mean|      null|72.35785375357703|72.83938807631154|71.91860095945944|72.38844998012719|8222093.481717011| 67.2388384872814|
| stddev|      null|6.768090244708284|6.768186808159222|6.744075756255509|6.756859163732981|  4519780.8431556|6.722609449996835|
|    min|2012-01-03|        56.389999|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|2016-12-30|        90.800003|        90.970001|            89.25|        90.470001|     

#### Create a new dataframe with a column called HV Ratio that is the ratio of the High Price versus volume of stock traded for a day.

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Assuming you have a SparkSession named 'spark' and a DataFrame named 'dataspark'
result_df = dataspark.select("Date", (col("High") / col("Volume")).alias("HV Ratio"))
result_df.show()

+----------+--------------------+
|      Date|            HV Ratio|
+----------+--------------------+
|2012-01-03|4.819714653321546E-6|
|2012-01-04|6.290848613094555E-6|
|2012-01-05|4.669412994783916E-6|
|2012-01-06|7.367338463826307E-6|
|2012-01-09|8.915604778943901E-6|
|2012-01-10|8.644477436914568E-6|
|2012-01-11|9.351828421515645E-6|
|2012-01-12| 8.29141562102703E-6|
|2012-01-13|7.712212102001474E-6|
|2012-01-17|7.071764823529411...|
|2012-01-18|1.015495466386981E-5|
|2012-01-19|6.576354146362592...|
|2012-01-20| 5.90145296180676E-6|
|2012-01-23|8.547679455011844E-6|
|2012-01-24|8.420709512685392E-6|
|2012-01-25|1.041448341728929...|
|2012-01-26|8.316075414862431E-6|
|2012-01-27|9.721183814992126E-6|
|2012-01-30|8.029436027707578E-6|
|2012-01-31|6.307432259386365E-6|
+----------+--------------------+
only showing top 20 rows



#### What day had the Peak High in Price?

In [None]:
peak_high_row = dataspark.orderBy(col("High").desc()).first()

print("Peak High Price Row:")
print(peak_high_row)

Peak High Price Row:
Row(Date='2015-01-13', Open=90.800003, High=90.970001, Low=88.93, Close=89.309998, Volume=8215400, Adj Close=83.825448)


#### What is the mean of the Close column?

In [None]:
close_mean = dataspark.select('Close').rdd.map(lambda x: x[0]).mean()
print('Close Coumn mean: ',close_mean)

Close Coumn mean:  72.38844998012719


#### What is the max and min of the Volume column?

In [None]:
max_volume = dataspark.select('Volume').rdd.map(lambda x: x[0]).max()
print(max_volume)

80898100


In [None]:
min_volume = dataspark.select('Volume').rdd.map(lambda x: x[0]).min()
print(min_volume)

2094900


#### How many days was the Close lower than 60 dollars?

---



In [None]:
days_below_60_count = dataspark.filter(col("Close") < 60).count()
print(days_below_60_count)

81


#### What percentage of the time was the High greater than 80 dollars ?
#### In other words, (Number of Days High>80)/(Total Days in the dataset)

In [None]:
total_days = dataspark.count()
high_above_80_days = dataspark.filter(col("High") > 80).count()
percentage_high_above_80 = (high_above_80_days / total_days) * 100

In [None]:
print(percentage_high_above_80)

9.141494435612083


#### What is the Pearson correlation between High and Volume?

1.   List item
2.   List item


#### [Hint](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/sql/DataFrameStatFunctions.html)

In [None]:
correlation = dataspark.corr("High", "Volume")

In [None]:
print(correlation)

-0.3384326061737167


#### What is the max High per year?

In [None]:
from pyspark.sql.functions import year, max, col


In [None]:
max_high_per_year = dataspark.withColumn("Year", year(col("Date"))).groupBy("Year").agg(max("High").alias("MaxHigh"))
print(max_high_per_year.show())

+----+---------+
|Year|  MaxHigh|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+

None


#### What is the average Close for each Calendar Month?
#### In other words, across all the years, what is the average Close price for Jan,Feb, Mar, etc... Your result will have a value for each of these months.

In [None]:
from pyspark.sql.functions import month, avg, col
avg_close_per_month = dataspark.withColumn("Month", month(col("Date"))).groupBy("Month").agg(avg("Close").alias("AvgClose"))
print(avg_close_per_month.show())

+-----+-----------------+
|Month|         AvgClose|
+-----+-----------------+
|   12|72.84792478301885|
|    1|71.44801958415842|
|    6|72.49537742452831|
|    3|71.77794377570092|
|    5|72.30971688679246|
|    9|72.18411785294116|
|    4|72.97361900952382|
|    8|73.02981855454544|
|    7|74.43971943925233|
|   10|71.57854545454545|
|   11|72.11108930693068|
|    2|71.30680444329897|
+-----+-----------------+

None
