# **PySpark DataFrame Exercise (Walmart Stock)**

---

This notebook is a hands-on PySpark practice session using a historical Walmart stock dataset (`walmart_stock.csv`).

### Goals
- Load a CSV file into a Spark DataFrame with inferred schema
- Inspect schema and columns (`printSchema`, `columns`, `dtypes`)
- Compute descriptive statistics (`describe`)
- Answer common analytical questions using core DataFrame operations:
  - sorting (`orderBy`)
  - filtering (`filter`)
  - aggregations (`agg`, `groupBy`)
  - correlation (`stat.corr`, `corr`)
  - date-based grouping (year / month)

### Notes
- Spark DataFrames are immutable: transformations return new DataFrames.
- Most operations are lazy; actions like `show()` and `count()` trigger computation.
- For date-based analysis, the `Date` column should be cast to a proper date type if it is not inferred correctly.

---
### **Initial Setup and exploration**

#### 1. Start a simple Spark Session



In [None]:

from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("Exercise_1")
         .getOrCreate())

#### 2. Load the Walmart Stock CSV file and have Spark infer the data types.


In [None]:

df = spark.read.csv("walmart_stock.csv", header=True, inferSchema=True)
df.show(10)

#### 3. What are the column names?

In [None]:
df.dtypes # This prints the pair of column 'names' and data 'types'

In [None]:
df.columns # This prints only the names of the columns

In [None]:
for col in df.columns: # Prints one column name per line, which is easier to read
    print(col)

#### 4. What does the Schema look like?

In [None]:
df.printSchema()

#### 5. Print out the first 5 columns 

In [None]:
df.select(df.columns[:5]).show() # Shows the data of the first five columns (show() prints 20 rows by default)

In [None]:
print(df.columns[:5]) # This prints only the names of the columns in the slice

#### 6. Use describe() to learn about the DataFrame.

In [None]:
df.describe().show() # Descriptive statistics (count, mean, stddev, min, max) for the numeric and string columns.

In [None]:
df.describe(df.columns[:3]).show() # Pass column names to limit the summary to a slice of the column list.

---


### **SQL Type exploration**

#### 7. When was the highest value of High?



In [None]:
# Use select() and orderBy() when the rest of the row matters (here, the Date of the maximum). Sorting makes this more expensive than a plain aggregation.

df.select("Date", "High").orderBy(df.High.desc()).show(1) 

In [None]:
# When we only want the value, use agg(). No sort is needed, so it is cheaper.
df.agg({"High": "max"}).show() 

#### 8. What is the mean of the Close column?

In [None]:
from pyspark.sql.functions import mean, round

# The function style supports aliases and lets you combine several aggregations (min, max, etc.) in one call.
df.agg(round(mean("Close"), 2).alias("'Close' mean")).show()

In [None]:
# The dictionary style needs no function imports, but the result column gets an auto-generated name and cannot be rounded or aliased inline.
df.agg({"Close" : "mean"}).show()


#### 9. What are the max and min of the Volume column?

In [None]:
from pyspark.sql.functions import min, max, mean, round # Note: these shadow Python's built-in min, max and round.
# Use select() when you plan to derive new columns and reuse the result as a DataFrame.
df_MinMeanMax = df.select(min("Volume").alias("Min Volume"),
                          max("Volume").alias("Max Volume"),
                          round(mean("Volume"), 2).alias("Avg Volume")) # round() takes a column and the number of decimal places.

df_MinMeanMax.show()

In [None]:
# Use agg() when you only need the summary and do not plan to keep the result.
df.agg(min("Volume").alias("Min Volume"),
       max("Volume").alias("Max Volume")).show()

#### 10. How many days was the Close lower than 60 dollars?

In [None]:
# Use select first if you want to retrieve the rows after applying the filter.
df.select("Date", "Close").filter(df.Close < 60).show() 

In [None]:
# This style only cares about the number of records fulfilling the condition. Notice the use of count() instead of show()
df.filter(df.Close < 60).count()

#### 11. What percentage of the time was the High greater than 80 dollars?

In [None]:
# In other words: (number of days with High > 80) / (total days in the dataset)

# First create one variable for each part of the operation, similar to writing subqueries.
grt_80 = df.filter(df.High > 80).count()
total_days = df.count()

# Either save the result in a new variable or just apply the operation directly while printing.
print(f"Percentage of days where High was over 80 USD: {(grt_80 / total_days) * 100:.2f}%")

#### 12. What is the Pearson correlation between High and Volume?

In [None]:
# Either use the stat module's corr() method, or import the corr() function and use it inside agg().
# Both default to the Pearson correlation.

pearson_corr = df.stat.corr("High", "Volume")
print(f"Pearson correlation between High and Volume: {pearson_corr:.3f}")

In [None]:
from pyspark.sql.functions import corr
df.agg(round(corr("High", "Volume"), 3).alias("Corr High vs Volume")).show()
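
To make the number less of a black box, here is a plain-Python sketch of the Pearson coefficient itself: the covariance of the two series divided by the product of their standard deviations. The helper `pearson` and the sample lists are illustrative assumptions, not Spark's implementation.

```python
import math

def pearson(xs, ys):
    """Pearson correlation of two equally long numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # The 1/n factors cancel between numerator and denominator,
    # so plain sums of deviations are enough.
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)

# Hypothetical data: volume falls linearly as the price rises.
highs = [1.0, 2.0, 3.0, 4.0]
volumes = [8.0, 6.0, 4.0, 2.0]

r = pearson(highs, volumes)
print(f"Pearson correlation: {r:.3f}")  # -1.000
```

A value near -1 or 1 indicates a strong linear relationship, and a value near 0 indicates little linear relationship.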

#### 13. What is the max High per year?

In [None]:
# Import functions for extracting time-based features (day, week, month, year) from date columns in Spark DataFrames
from pyspark.sql.functions import (dayofmonth, hour,
                                   dayofyear, month,
                                   year, weekofyear,
                                   format_number, date_format)

df.groupBy(year("Date").alias("Year")).agg(max("High").alias("Max High")).show()

#### 14. What is the average Close for each Calendar Month?

In [None]:
# In other words: across all the years, what is the average Close price for Jan, Feb, Mar, etc.? The result has one value per calendar month.

from pyspark.sql.functions import avg # mean() is an alias of avg(); using avg() keeps the code close to SQL.

# Create two new columns from the existing Date column: the month's number (month()) and the month's name (date_format("Date", "MMMM")).
# The name is used for display, while the number lets us sort chronologically instead of alphabetically.
df_months = df.withColumn("MonthNum", month("Date"))\
              .withColumn("Month", date_format("Date", "MMMM"))

df_months.groupBy("MonthNum", "Month")\
    .agg(avg("Close").alias("AVG Close pr Month"))\
    .orderBy("MonthNum")\
    .select("Month", "AVG Close pr Month")\
    .show()

In [None]:
spark.stop()