In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext

spark = SparkSession.builder \
    .appName("Streaming") \
    .getOrCreate()

sc = spark.sparkContext
print(sc) 

24/09/02 15:15:15 WARN Utils: Your hostname, DESKTOP-26AECPL resolves to a loopback address: 127.0.1.1; using 192.168.220.1 instead (on interface eth1)
24/09/02 15:15:15 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/02 15:15:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


<SparkContext master=local[*] appName=Streaming>


In [2]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [3]:
from pyspark.sql.types import *

schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Date", DateType(), True),
    StructField("Open", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Low", DoubleType(), True),
    StructField("Close", DoubleType(), True),
    StructField("Adj Close", DoubleType(), True),
    StructField("Volume", IntegerType(), True)
])

24/09/02 15:15:41 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [4]:
streaming_df = spark.readStream \
    .format("csv") \
    .option("header", "False") \
    .schema(schema) \
    .load("/home/codebind/spark/kospi")  

### Make sure the dataframe is streaming the files from the folder

In [5]:
streaming_df.isStreaming

True

### Create a stream writer into memory and specify the query name "stock:

In [6]:
query = streaming_df.writeStream \
    .format("memory") \
    .queryName("stock") \
    .outputMode("append") \
    .start()

24/09/02 15:15:55 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-7c6c251d-43a8-4ac0-9ab4-7244eea8dc92. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/09/02 15:15:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


### Start the write stream and make sure it works (read all columns from the table)

In [7]:
stock_df = spark.sql("SELECT * FROM stock")
stock_df

[Stage 0:>                                                          (0 + 4) / 4]

DataFrame[ID: int, Date: date, Open: double, High: double, Low: double, Close: double, Adj Close: double, Volume: int]

                                                                                

In [8]:
query.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}

In [9]:
stock_df.show(100)

+----+----------+------------+------------+------------+------------+------------+------+
|  ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+----+----------+------------+------------+------------+------------+------------+------+
|NULL|      NULL|        NULL|        NULL|        NULL|        NULL|        NULL|  NULL|
| 240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|
| 241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791|
| 242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656|
| 243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964|
| 244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|
| 245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|
| 246|2000-12-13|27651.599609|     29286.5|27651.599609|28469.099609|26903.234375|270385|
| 247|2000

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [10]:
from pyspark.sql.functions import col

filtered_df = streaming_df.withColumn("diff", col("High") - col("Low")).dropna(how='all')

In [11]:
filtered_df.isStreaming

True

### Create a new write stream using the new generated dataframe and call the generate table "modified_data"

In [12]:
query.stop()

In [13]:
query = filtered_df.writeStream \
    .format("memory") \
    .outputMode("append") \
    .queryName("modified_data") \
    .start()

24/09/02 15:16:27 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-3bc0e055-28f4-4a97-9e78-f241e54c96c7. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/09/02 15:16:27 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


### Write the generated data into files instead of the memory. 

In [14]:
query = filtered_df.writeStream\
    .outputMode("append")\
    .format("csv") \
    .option("path", "/home/codebind/spark/kospi_updated") \
    .option("checkpointLocation", "/home/codebind/spark/kospi_updated/checkpoint") \ #rename in every run
    .start()

24/09/02 15:16:32 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [15]:
query.stop()

### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [16]:
schema = StructType([
    StructField("ID", IntegerType(), True),
    StructField("Date", StringType(), True),
    StructField("Open", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Low", DoubleType(), True),
    StructField("Close", DoubleType(), True),
    StructField("Adj Close", DoubleType(), True),
    StructField("Volume", LongType(), True),
    StructField("diff", DoubleType(), True)  
])

In [18]:
finalDF = spark.read.format("csv") \
    .schema(schema) \
    .load("/home/codebind/spark/kospi_updated") #Static Query 

finalDF.show(100)

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|  995.201172000001|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791| 710.8007819999984|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656| 533.1015620000035|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964| 1066.298827999999|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|1208.4003909999992|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|  639.798827999999|
|246|2000-12-13|27651.599609

### Sort the dataframe based on the ID

In [19]:
finalDFSorted = finalDF.sort('ID')
finalDFSorted.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5

24/09/02 18:20:12 WARN FileStreamSource: Listed 11 file(s) in 10799115 ms
24/09/03 08:10:28 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 49787786 ms exceeds timeout 120000 ms
24/09/03 08:10:28 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10000 milliseconds]. This timeout is controlled by spark.executor.heartbeatInterval
	at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
	at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.e