In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [3]:
#to discover the schema and data types before streaming
df_test = spark.read.csv('KOSPI_STOCK_0.csv',header=True,inferSchema=True)  
df_test.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



In [4]:
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType,
                               DoubleType)


recordSchema = StructType([
    StructField('', StringType(), True),  
    StructField('Date', StringType(), True),
    StructField('Open', DoubleType(), True),
    StructField('High', DoubleType(), True),
    StructField('Low', DoubleType(), True),
    StructField('Close', DoubleType(), True),
    StructField('Adj Close', DoubleType(), True),
    StructField('Volume', IntegerType(), True)
])

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [5]:
df_0 = spark.readStream.format("csv") \
    .schema(recordSchema) \
    .option("header", True) \
    .load("kospi/")

In [6]:
df = df_0.toDF("ID", "Date", "Open", "High", "Low", "Close", "Adj Close", "Volume")

### Make sure the dataframe is streaming the files from the folder

In [7]:
df.isStreaming

True

### Create a stream writer into memory and specify the query name "stock:

In [8]:
writer = df.writeStream\
    .outputMode("append") \
    .format("memory")  \
    .queryName("stock") 

### Start the write stream and make sure it works (read all columns from the table)

In [9]:
query = writer.start()

24/09/02 17:16:41 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-36f588ab-40af-48dd-a0a5-e22200575d22. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/09/02 17:16:41 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [10]:
spark.sql("SELECT * FROM stock").show()

+---+----+----+----+---+-----+---------+------+
| ID|Date|Open|High|Low|Close|Adj Close|Volume|
+---+----+----+----+---+-----+---------+------+
+---+----+----+----+---+-----+---------+------+



In [11]:
#query.awaitTermination()

In [22]:
# After waiting for some time
spark.sql("SELECT * FROM stock").show() #Order By ID ASC

+---+----------+------------+------------+------------+------------+------------+------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+---+----------+------------+------------+------------+------------+------------+------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|
|246|2000-12-13|27651.599609|     29286.5|27651.599609|28469.099609|26903.234375|270385|
|247|2000-12-14|28469.099609|29784.099609|28291.300781|28362.400391| 26802.40625|256317|
|248|2000-12-15|28362

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [13]:
from pyspark.sql import functions as F

In [14]:
df_P1 = df.dropna(how='all')

In [15]:
df_P2 = df_P1.withColumn("diff", F.col("high") - F.col("low"))

### Create a new write stream using the new generated dataframe and call the generate table "modified_data"

In [16]:
writer_2 = df_P2.writeStream\
    .outputMode("append") \
    .format("memory")  \
    .queryName("modified_data") 

In [17]:
query_2 = writer_2.start()

24/09/02 17:16:43 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-078c4624-b1a6-4457-8a14-375c6d985011. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/09/02 17:16:43 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [18]:
spark.sql("SELECT * FROM modified_data").show()

+---+----+----+----+---+-----+---------+------+----+
| ID|Date|Open|High|Low|Close|Adj Close|Volume|diff|
+---+----+----+----+---+-----+---------+------+----+
+---+----+----+----+---+-----+---------+------+----+



In [19]:
#query_2.awaitTermination()

In [23]:
#after waiting a while
spark.sql("SELECT * FROM modified_data").show() #ORDER BY ID ASC

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|  995.201172000001|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791| 710.8007819999984|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656| 533.1015620000035|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964| 1066.298827999999|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|1208.4003909999992|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|  639.798827999999|
|246|2000-12-13|27651.599609

### Write the generated data into files instead of the memory. 

In [24]:
writer_3 = df_P2.writeStream \
    .outputMode("append") \
    .format("csv") \
    .option("checkpointLocation", "chkpnt") \
    .option("path", "OutStream/") 

In [25]:
query_3 = writer_3.start()

24/09/02 17:17:09 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [26]:
query_3.stop()

In [27]:
newSchema = StructType([
    StructField('ID', StringType(), True),  
    StructField('Date', StringType(), True),
    StructField('Open', DoubleType(), True),
    StructField('High', DoubleType(), True),
    StructField('Low', DoubleType(), True),
    StructField('Close', DoubleType(), True),
    StructField('Adj Close', DoubleType(), True),
    StructField('Volume', IntegerType(), True),
    StructField('diff', DoubleType(), True)
])

In [28]:
df_2 = spark.read.schema(newSchema).csv("OutStream/")
df_2.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|  995.201172000001|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791| 710.8007819999984|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656| 533.1015620000035|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964| 1066.298827999999|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|1208.4003909999992|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|  639.798827999999|
|246|2000-12-13|27651.599609

### Sort the dataframe based on the ID

In [29]:
finalDFSorted = df_2.sort('ID')
finalDFSorted.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
| 10|2000-01-18|23457.599609|     23742.0|22746.800781|23422.099609|22133.832031| 27995| 995.1992189999983|
|100|2000-05-23|17557.699219|17913.099609|16775.800781|17557.699219|16591.984375| 39671| 1137.298827999999|
|101|2000-05-24|17664.300781|18481.800781|16846.900391|     17415.5|16457.609375| 57256|1634.9003900000025|
|102|2000-05-25|18339.599609|19157.099609|17628.800781|     18695.0|17666.732422|219319| 1528.298827999999|
|103|2000-05-26|     18695.0