In [41]:
# Import necessary libraries

from pyspark.sql import SparkSession

In [42]:
# Create a Spark session

spark = SparkSession.builder.appName("pyspark-jupyter").getOrCreate()

In [43]:
# Define the path to the data file within the "sparkdata" volume

data_path_stock_prices = "/sparkdata/stock/stock_prices.csv"
data_path_stock_list = "/sparkdata/stock/stock_list.csv"

In [44]:
# Load the data into a PySpark DataFrame

df_stock_prices = spark.read.csv(data_path_stock_prices, header=True, inferSchema=True)
df_stock_list = spark.read.csv(data_path_stock_list, header=True, inferSchema=True)

In [45]:
# Register the DataFrame as a temporary SQL table

df_stock_prices.createOrReplaceTempView("stock_prices")
df_stock_list.createOrReplaceTempView("stock_list")

In [46]:
# Perform an SQL query to select 15 recent stock records based on the date

stock_query = spark.sql("""
    -- Select specific columns from stock_prices and stock_list tables
    SELECT 
        stock_prices.Date,            -- Date of the stock record
        stock_prices.RowId,           -- Row ID from the stock_prices table
        stock_list.Name,              -- Stock name from the stock_list table
        stock_list.`Section/Products`,-- Section or Products column from stock_list table (escaped with backticks)
        stock_prices.Open,            -- Opening price of the stock
        stock_prices.High,            -- Highest price during the trading session
        stock_prices.Low,             -- Lowest price during the trading session
        stock_prices.Close,           -- Closing price of the stock
        stock_prices.Volume           -- Volume of stocks traded
    FROM stock_prices 
    -- Perform an INNER JOIN on the SecuritiesCode column to link stock_prices and stock_list
    INNER JOIN stock_list 
    ON stock_prices.SecuritiesCode = stock_list.SecuritiesCode
    -- Retrieve the most recent 15 stock records
    ORDER BY stock_prices.Date DESC  -- Sort the results by date in descending order (most recent first)
    LIMIT 15                         -- Limit the result to 15 rows
""")

In [47]:
# Show the result of the query

stock_query.show()

+----------+-------------+--------------------+--------------------+------+------+------+------+-------+
|      Date|        RowId|                Name|    Section/Products|  Open|  High|   Low| Close| Volume|
+----------+-------------+--------------------+--------------------+------+------+------+------+-------+
|2021-12-03|20211203_1301|    KYOKUYO CO.,LTD.|First Section (Do...|2983.0|2983.0|2965.0|2982.0|  11400|
|2021-12-03|20211203_1431|   Lib Work Co.,Ltd.|  Mothers (Domestic)| 813.0| 830.0| 809.0| 830.0|  37300|
|2021-12-03|20211203_1332|Nippon Suisan Kai...|First Section (Do...| 579.0| 585.0| 570.0| 585.0|1195500|
|2021-12-03|20211203_1333|Maruha Nichiro Co...|First Section (Do...|2333.0|2358.0|2315.0|2358.0| 103200|
|2021-12-03|20211203_1375|YUKIGUNI MAITAKE ...|First Section (Do...|1222.0|1236.0|1218.0|1235.0|  69900|
|2021-12-03|20211203_1376|KANEKO SEEDS CO.,...|First Section (Do...|1370.0|1370.0|1321.0|1332.0|   5000|
|2021-12-03|20211203_1377|SAKATA SEED CORPO...|First Se