
## Overview

This notebook shows you how to create and query a table or DataFrame from data that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is the Databricks File System, which allows you to store data for querying inside of Databricks. This notebook assumes that the files you would like to read are already in DBFS.

This notebook is written in **Python** so the default cell type is Python. However, you can use different languages by using the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

## Import Stock Data

In [0]:
# Function to load and process stock data
def load_and_process_data(file_location, infer_schema="false", first_row_is_header="false", delimiter=","):
    """
    Load a CSV file, take its column names from the second row read, and return the data rows.

    Every row read from the file is tagged with its position (0-based). The row at
    position 1 supplies the column names, rows at positions 0, 1 and 2 are dropped
    from the data, and the first column is renamed to 'Date'.

    Rows are selected by position rather than with ``limit(1)`` / ``subtract``:
    ``limit(1)`` is not guaranteed to return the header row, and ``subtract`` is a
    distinct set difference that would also collapse duplicate data rows.

    Args:
        file_location: Path of the CSV file to read.
        infer_schema: Value passed to the CSV reader's ``inferSchema`` option.
        first_row_is_header: Value passed to the CSV reader's ``header`` option.
        delimiter: Value passed to the CSV reader's ``sep`` option.

    Returns:
        A DataFrame holding the rows at position 3 and later, with columns named
        after the row at position 1 and the first column renamed to 'Date'.

    Raises:
        ValueError: If the file has no row at position 1 to use as column names.
    """
    # Read the CSV file
    raw_df = (
        spark.read.format("csv")
        .option("inferSchema", infer_schema)
        .option("header", first_row_is_header)
        .option("sep", delimiter)
        .load(file_location)
    )

    # Tag each row with its position: "_1" holds the row as a struct, "_2" its index
    indexed = raw_df.rdd.zipWithIndex().toDF().select("_1", "_2")

    # Fetch the header row by its index instead of relying on first()/limit(1) ordering
    header_row = indexed.filter(indexed["_2"] == 1).first()
    if header_row is None:
        raise ValueError(
            f"{file_location} has no second row to use as column names"
        )

    # Column names must be strings; fall back to a positional name for empty header cells
    new_columns = [
        str(value) if value is not None else f"_c{position}"
        for position, value in enumerate(header_row["_1"])
    ]

    # Keep only the data rows (everything after index 2) and apply the extracted names
    data_df = indexed.filter(indexed["_2"] > 2).select("_1.*").toDF(*new_columns)

    # Rename the first column dynamically to "Date"
    return data_df.withColumnRenamed(data_df.columns[0], "Date")

# Load and process each dataset (one CSV per metric, schema inference enabled)
df_volume, df_open, df_close, df_high, df_low = (
    load_and_process_data(csv_path, infer_schema="true")
    for csv_path in (
        "/FileStore/tables/Volume.csv",
        "/FileStore/tables/Open.csv",
        "/FileStore/tables/Close.csv",
        "/FileStore/tables/High.csv",
        "/FileStore/tables/Low.csv",
    )
)

# Preview the ten earliest rows (by "Date") of every processed DataFrame
for preview_source in (df_volume, df_open, df_close, df_high, df_low):
    display(preview_source.orderBy("Date").limit(10))

## Create temp view

In [0]:
# Names under which each DataFrame is exposed to the SQL cells
temp_table_volume = "Volume_csv"
temp_table_low = "Low_csv"
temp_table_high = "High_csv"
temp_table_open = "Open_csv"
temp_table_close = "Close_csv"

# Register (or replace) one temporary view per DataFrame
for source_df, view_name in (
    (df_volume, temp_table_volume),
    (df_low, temp_table_low),
    (df_high, temp_table_high),
    (df_open, temp_table_open),
    (df_close, temp_table_close),
):
    source_df.createOrReplaceTempView(view_name)

## Basic SQL Queries

In [0]:
%sql
-- 1: Get the latest closing price for all stocks
-- Resolve the most recent date once, then keep only the rows recorded on that date.
WITH latest AS (
    SELECT MAX(date) AS latest_date
    FROM Close_csv
)
SELECT c.*
FROM Close_csv AS c
JOIN latest ON c.date = latest.latest_date;



In [0]:
%sql
-- 2: Find the total trading volume for each stock over all time
-- Project the three ticker columns first, then add each one up across every row.
SELECT
    SUM(volumes.AAPL)  AS AAPL_total_volume,
    SUM(volumes.MSFT)  AS MSFT_total_volume,
    SUM(volumes.GOOGL) AS GOOGL_total_volume
FROM (
    SELECT AAPL, MSFT, GOOGL
    FROM Volume_csv
) AS volumes;

In [0]:
%sql
-- 3: Get the lowest opening price for all stocks
-- The price columns are loaded as strings (the header rows are read as data),
-- so cast to DOUBLE: MIN on strings compares text, e.g. '100.5' sorts below '99.1'.
SELECT 
    MIN(CAST(AAPL AS DOUBLE)) AS AAPL_min_open, 
    MIN(CAST(MSFT AS DOUBLE)) AS MSFT_min_open, 
    MIN(CAST(GOOGL AS DOUBLE)) AS GOOGL_min_open
FROM Open_csv;

In [0]:
%sql
-- 4: Get the highest closing price for all stocks
-- The price columns are loaded as strings (the header rows are read as data),
-- so cast to DOUBLE: MAX on strings compares text, e.g. '99.1' sorts above '100.5'.
SELECT 
    MAX(CAST(AAPL AS DOUBLE)) AS AAPL_max_close, 
    MAX(CAST(MSFT AS DOUBLE)) AS MSFT_max_close, 
    MAX(CAST(GOOGL AS DOUBLE)) AS GOOGL_max_close
FROM Close_csv;

In [0]:
%sql
-- 5: Find the average closing price for each stock in the last month
-- Narrow the table to the trailing 30 days (relative to today) first, then average each ticker.
WITH recent_closes AS (
    SELECT AAPL, MSFT, GOOGL
    FROM Close_csv
    WHERE date >= DATE_SUB(CURRENT_DATE(), 30)
)
SELECT
    AVG(AAPL)  AS AAPL_avg_close,
    AVG(MSFT)  AS MSFT_avg_close,
    AVG(GOOGL) AS GOOGL_avg_close
FROM recent_closes;

In [0]:
%sql
-- 6: Simple Query to Compare Close and Open Prices
-- Returns the 10 most recent days on which AAPL closed above its open.
-- The price columns are loaded as strings, so compare them as DOUBLE;
-- a plain string comparison would order '99.1' above '100.5'.
SELECT c.date, 
       c.AAPL, 
       o.AAPL AS Open_AAPL
FROM Close_csv c
JOIN Open_csv o ON c.date = o.date
WHERE CAST(c.AAPL AS DOUBLE) > CAST(o.AAPL AS DOUBLE)
ORDER BY c.date DESC
LIMIT 10;




In [0]:
%sql
-- 7: Find the number of trading days in the dataset
-- De-duplicate the dates first, then count the non-NULL ones.
SELECT COUNT(unique_dates.date) AS trading_days
FROM (
    SELECT DISTINCT date
    FROM Close_csv
) AS unique_dates;



In [0]:
%sql
-- 8: Retrieve stock prices for a specific date range
-- Both bounds are inclusive.
SELECT *
FROM Close_csv
WHERE date >= '2024-01-01'
  AND date <= '2024-02-01';



In [0]:
%sql
-- 9: Find the first recorded closing price for each stock
-- Resolve the earliest date once, then keep only the rows recorded on that date.
WITH earliest AS (
    SELECT MIN(date) AS first_date
    FROM Close_csv
)
SELECT c.*
FROM Close_csv AS c
JOIN earliest ON c.date = earliest.first_date;


In [0]:
%sql
-- 10: Find stocks with the highest trading volume on any single day
-- Reports the largest volume per date for each ticker, newest date first.
-- The volume columns are loaded as strings, so cast to DOUBLE: MAX on strings
-- compares text rather than magnitude.
-- NOTE(review): if the data has one row per date, each MAX is simply that day's value.
SELECT date, 
       MAX(CAST(AAPL AS DOUBLE)) AS max_AAPL_volume, 
       MAX(CAST(MSFT AS DOUBLE)) AS max_MSFT_volume, 
       MAX(CAST(GOOGL AS DOUBLE)) AS max_GOOGL_volume
FROM Volume_csv
GROUP BY date
ORDER BY date DESC;



## Medium SQL Queries

In [0]:
%sql
-- 11: Find the Average Closing Price for Each Stock Over the Last 30 Days
-- One labelled row per ticker. The rows already differ by their stock label,
-- so UNION ALL gives the same result as UNION without the extra de-duplication step.
SELECT 'AAPL' AS stock, AVG(CAST(AAPL AS DOUBLE)) AS avg_close
FROM Close_csv
WHERE date >= DATEADD(DAY, -30, CURRENT_DATE)
UNION ALL
SELECT 'MSFT' AS stock, AVG(CAST(MSFT AS DOUBLE)) AS avg_close
FROM Close_csv
WHERE date >= DATEADD(DAY, -30, CURRENT_DATE)
UNION ALL
SELECT 'GOOGL' AS stock, AVG(CAST(GOOGL AS DOUBLE)) AS avg_close
FROM Close_csv
WHERE date >= DATEADD(DAY, -30, CURRENT_DATE);



In [0]:
%sql
-- 12: Find Days Where AAPL Closed Above Its 30-Day Average
-- Compute the trailing 30-day average once, then compare every day's close against it.
WITH recent_avg AS (
    SELECT AVG(CAST(AAPL AS DOUBLE)) AS avg_close
    FROM Close_csv
    WHERE date >= DATEADD(DAY, -30, CURRENT_DATE)
)
SELECT c.date, c.AAPL
FROM Close_csv AS c
CROSS JOIN recent_avg AS r
WHERE CAST(c.AAPL AS DOUBLE) > r.avg_close
ORDER BY c.date DESC;



In [0]:
%sql
-- 13: Find the Highest Opening Price for Each Stock in the Last 60 Days
-- One labelled row per ticker. The rows already differ by their stock label,
-- so UNION ALL gives the same result as UNION without the extra de-duplication step.
SELECT 'AAPL' AS stock, MAX(CAST(AAPL AS DOUBLE)) AS max_open
FROM Open_csv
WHERE date >= DATEADD(DAY, -60, CURRENT_DATE)
UNION ALL
SELECT 'MSFT' AS stock, MAX(CAST(MSFT AS DOUBLE)) AS max_open
FROM Open_csv
WHERE date >= DATEADD(DAY, -60, CURRENT_DATE)
UNION ALL
SELECT 'GOOGL' AS stock, MAX(CAST(GOOGL AS DOUBLE)) AS max_open
FROM Open_csv
WHERE date >= DATEADD(DAY, -60, CURRENT_DATE);


In [0]:
%sql
-- 14: Find Stocks Where the Closing Price Increased for 3 Consecutive Days
-- Returns the first day of every run of three trading days on which AAPL's
-- close rose each day (day 1 < day 2 < day 3).
-- LEAD looks at the next rows in date order, so runs that span a weekend or
-- holiday are matched; a join on "date + 1 calendar day" would miss them.
WITH aapl_closes AS (
    SELECT date,
           AAPL,
           CAST(AAPL AS DOUBLE) AS close_price,
           LEAD(CAST(AAPL AS DOUBLE), 1) OVER (ORDER BY date) AS next_close,
           LEAD(CAST(AAPL AS DOUBLE), 2) OVER (ORDER BY date) AS second_next_close
    FROM Close_csv
)
SELECT date, AAPL
FROM aapl_closes
WHERE close_price < next_close
  AND next_close < second_next_close
ORDER BY date DESC;


In [0]:
%sql
-- 15: Find Days Where AAPL Had the Highest Trading Volume Among AAPL, MSFT, and GOOGL
-- Cast each ticker's volume once, then keep the dates where AAPL beats both others.
SELECT volumes.date
FROM (
    SELECT v.date                  AS date,
           CAST(v.AAPL AS DOUBLE)  AS aapl_volume,
           CAST(v.MSFT AS DOUBLE)  AS msft_volume,
           CAST(v.GOOGL AS DOUBLE) AS googl_volume
    FROM Volume_csv v
) AS volumes
WHERE volumes.aapl_volume > volumes.msft_volume
  AND volumes.aapl_volume > volumes.googl_volume;


In [0]:
%sql
-- 16: Find the Dates When AAPL and MSFT Both Closed Higher Than They Opened
-- Pair each day's close with its open, cast once, then filter on both tickers.
WITH paired AS (
    SELECT c.date                 AS date,
           c.AAPL                 AS AAPL,
           c.MSFT                 AS MSFT,
           CAST(c.AAPL AS DOUBLE) AS aapl_close,
           CAST(o.AAPL AS DOUBLE) AS aapl_open,
           CAST(c.MSFT AS DOUBLE) AS msft_close,
           CAST(o.MSFT AS DOUBLE) AS msft_open
    FROM Close_csv c
    INNER JOIN Open_csv o ON c.date = o.date
)
SELECT date, AAPL, MSFT
FROM paired
WHERE aapl_close > aapl_open
  AND msft_close > msft_open
ORDER BY date DESC;


In [0]:
%sql
-- 17: Find the Highest and Lowest Closing Price of AAPL in the Last 3 Months
-- Cast and filter in an inner query, then take both extremes from the numeric column.
SELECT
    MAX(recent.close_price) AS highest_close,
    MIN(recent.close_price) AS lowest_close
FROM (
    SELECT CAST(AAPL AS DOUBLE) AS close_price
    FROM Close_csv
    WHERE date >= DATEADD(MONTH, -3, CURRENT_DATE)
) AS recent;


In [0]:
%sql
-- 18: Find Stocks Where the Daily Fluctuation Was Greater Than 5%
-- Fluctuation is (high - low) / low * 100 for AAPL; it is computed once and reused
-- for both the filter and the ordering.
WITH daily_range AS (
    SELECT h.date                 AS date,
           h.AAPL                 AS high_raw,
           l.AAPL                 AS low_raw,
           CAST(h.AAPL AS DOUBLE) AS high_price,
           CAST(l.AAPL AS DOUBLE) AS low_price
    FROM High_csv h
    JOIN Low_csv l ON h.date = l.date
),
scored AS (
    SELECT date,
           high_raw,
           low_raw,
           ((high_price - low_price) / low_price) * 100 AS fluctuation_percent
    FROM daily_range
)
SELECT date,
       high_raw AS AAPL, low_raw AS AAPL,
       fluctuation_percent
FROM scored
WHERE fluctuation_percent > 5
ORDER BY fluctuation_percent DESC;


In [0]:
# 19: Find Stocks Where the Name Ends With 'L'
# Look up the registered view and read its column names
df_close = spark.table("Close_csv")
columns = df_close.columns

# Keep only the names whose last character is 'L'
columns_ending_with_L = list(filter(lambda name: name.endswith('L'), columns))

# Show the matching names
print(columns_ending_with_L)



In [0]:
%sql
-- 20: Find Days Where AAPL's Trading Volume Was Above Its 50-Day Average
-- Compute the trailing 50-day average once, then compare every day's volume against it.
WITH recent_avg AS (
    SELECT AVG(CAST(AAPL AS DOUBLE)) AS avg_volume
    FROM Volume_csv
    WHERE date >= DATEADD(DAY, -50, CURRENT_DATE)
)
SELECT v.date, v.AAPL
FROM Volume_csv AS v
CROSS JOIN recent_avg AS r
WHERE CAST(v.AAPL AS DOUBLE) > r.avg_volume
ORDER BY v.date DESC;


## Advanced SQL Queries

In [0]:
%sql
-- 21: Find AAPL's day-over-day price difference
-- An inner query pairs each close with the previous row's close (in date order);
-- the outer query drops the first row, which has no predecessor, and subtracts.
SELECT paired.date,
       paired.close_price,
       (paired.close_price - paired.prev_close) AS price_change
FROM (
    SELECT date,
           CAST(AAPL AS DOUBLE) AS close_price,
           LAG(CAST(AAPL AS DOUBLE), 1) OVER (ORDER BY date) AS prev_close
    FROM Close_csv
) AS paired
WHERE paired.prev_close IS NOT NULL
ORDER BY paired.date DESC;


In [0]:
%sql
-- 22: CASE Statement to Classify Stocks Based on Daily Fluctuation
-- The high-minus-low spread is computed once per day, then bucketed:
-- above 10 is high, 5 through 10 is medium, everything else is low.
WITH spreads AS (
    SELECT h.date AS date,
           h.AAPL AS high,
           l.AAPL AS low,
           CAST(h.AAPL AS DOUBLE) - CAST(l.AAPL AS DOUBLE) AS spread
    FROM High_csv h
    JOIN Low_csv l ON h.date = l.date
)
SELECT date, high, low,
       CASE
           WHEN spread > 10 THEN 'High Volatility'
           WHEN spread >= 5 AND spread <= 10 THEN 'Medium Volatility'
           ELSE 'Low Volatility'
       END AS volatility_category
FROM spreads
ORDER BY date DESC;




In [0]:
%sql
-- 23: Calculate the 7-Day Moving Average of AAPL's Closing Price
-- The window spans the current row plus the six rows before it, in date order.
SELECT c.date,
       c.AAPL AS close_price,
       AVG(CAST(c.AAPL AS DOUBLE)) OVER trailing_7_rows AS moving_avg_7d
FROM Close_csv c
WINDOW trailing_7_rows AS (ORDER BY c.date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW)
ORDER BY c.date DESC;



In [0]:
# 24: Creating a UDF
# Import necessary PySpark functions
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Define Python function for categorization
def categorize_stock_movement(change):
    """
    Classify a price change as a significant increase, significant decrease, or stable.

    Args:
        change: The change to classify; anything ``float()`` accepts.

    Returns:
        'Significant Increase' when the change is above 15,
        'Significant Decrease' when it is below -15,
        'Stable' otherwise,
        or 'Invalid Data' when the input cannot be converted to a float
        (including None, which is how SQL NULLs reach a Python UDF).
    """
    try:
        change = float(change)  # Ensure that the input is treated as a float
    except (TypeError, ValueError):
        # float(None) raises TypeError and float("abc") raises ValueError;
        # both mean the value is unusable, so neither should crash the UDF.
        return 'Invalid Data'

    if change > 15:
        return 'Significant Increase'
    elif change < -15:
        return 'Significant Decrease'
    else:
        return 'Stable'

# Convert Python function to Spark UDF (for use with the DataFrame API)
categorize_udf = udf(categorize_stock_movement, StringType())

# Register the UDF so it can be used in SQL
spark.udf.register("CategorizeStockMovement", categorize_stock_movement, StringType())



In [0]:
%sql
-- 25: Calling a UDF
-- CategorizeStockMovement classifies a price *change*, so pass it the day-over-day
-- difference in AAPL's close instead of the raw closing price (which would label
-- almost every row by its price level rather than its movement).
-- Rows without a computable change (the first day, or a missing price) are skipped.
WITH aapl_changes AS (
    SELECT date,
           AAPL,
           CAST(AAPL AS DOUBLE)
               - LAG(CAST(AAPL AS DOUBLE), 1) OVER (ORDER BY date) AS price_change
    FROM Close_csv
)
SELECT date, AAPL, CategorizeStockMovement(price_change) AS Movement_Category
FROM aapl_changes
WHERE price_change IS NOT NULL;

