In [1]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=28203dc2ff8e0bb968e44dbb964673c1ab0657a20398f64832f1b7cd909118f8
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [3]:
# import required libraries
from pyspark.sql.types import *

In [4]:
# Create the schema

schemaStreamedfile =  StructType([
    StructField("ID", IntegerType(), True),
    StructField("Date", DateType(), True),
    StructField("Open", FloatType(), True),
    StructField("High", FloatType(), True),
    StructField("Low", FloatType(), True),
    StructField("Close", FloatType(), True),
    StructField("Adj Close", FloatType(), True),
    StructField("Volume", IntegerType(), True),

    # ID|Date|Open|High|Low|Close|Adj Close|Volume
])

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [32]:
# Create a streaming DataFrame that connects to a streaming file source

dfStockReader = spark.readStream.format("csv") \
                .schema(schemaStreamedfile) \
                .option("header", "true") \
                .load("/content/InputStream")

### Make sure the dataframe is streaming the files from the folder

In [33]:
dfStockReader.isStreaming

True

### Create a stream writer into memory and specify the query name "stock:

In [38]:
streamWriter = dfStockReader.writeStream \
    .format("memory").queryName("stock").outputMode("append")

### Start the write stream and make sure it works (read all columns from the table)

In [39]:
firstQuery = streamWriter.start()

In [40]:
# with no uploaded files
spark.sql("SELECT * FROM stock").show(100)

+---+----+----+----+---+-----+---------+------+
| ID|Date|Open|High|Low|Close|Adj Close|Volume|
+---+----+----+----+---+-----+---------+------+
+---+----+----+----+---+-----+---------+------+



In [41]:
# upload KOSPI_STOCK_0 file

spark.sql("SELECT * FROM stock").show(100)

+---+----------+-------+-------+-------+-------+---------+------+
| ID|      Date|   Open|   High|    Low|  Close|Adj Close|Volume|
+---+----------+-------+-------+-------+-------+---------+------+
|  0|2000-01-04|22817.9|25696.8|22817.9|24879.3| 23510.88|108745|
|  1|2000-01-05|24523.9|26229.9|23670.9|24417.3|23074.295|175990|
|  2|2000-01-06|24381.7|24666.1|22746.8|22817.9|21562.865| 71746|
|  3|2000-01-07|22036.0|24879.3|22036.0|23884.2|22570.514|120984|
|  4|2000-01-10|24879.3|25519.1|23813.1|24061.9| 22738.44|151371|
|  5|2000-01-11|24168.5|25021.5|23955.2|24239.6|22906.365| 95943|
|  6|2000-01-12|24168.5|24452.8|23457.6|23670.9|22368.947| 61899|
|  7|2000-01-13|23670.9|24132.9|23102.2|23244.4|21965.906| 57538|
|  8|2000-01-14|23457.6|24168.5|22746.8|23244.4|21965.906| 84267|
|  9|2000-01-17|22533.6|23457.6|22533.6|23457.6|22167.377| 67807|
| 10|2000-01-18|23457.6|23742.0|22746.8|23422.1|22133.832| 27995|
| 11|2000-01-19|22817.9|23173.3|22036.0|22036.0| 20823.97| 44173|
| 12|2000-

In [42]:
# upload the rest file

spark.sql("SELECT * FROM stock").show(100)

+---+----------+-------+-------+-------+-------+---------+------+
| ID|      Date|   Open|   High|    Low|  Close|Adj Close|Volume|
+---+----------+-------+-------+-------+-------+---------+------+
|  0|2000-01-04|22817.9|25696.8|22817.9|24879.3| 23510.88|108745|
|  1|2000-01-05|24523.9|26229.9|23670.9|24417.3|23074.295|175990|
|  2|2000-01-06|24381.7|24666.1|22746.8|22817.9|21562.865| 71746|
|  3|2000-01-07|22036.0|24879.3|22036.0|23884.2|22570.514|120984|
|  4|2000-01-10|24879.3|25519.1|23813.1|24061.9| 22738.44|151371|
|  5|2000-01-11|24168.5|25021.5|23955.2|24239.6|22906.365| 95943|
|  6|2000-01-12|24168.5|24452.8|23457.6|23670.9|22368.947| 61899|
|  7|2000-01-13|23670.9|24132.9|23102.2|23244.4|21965.906| 57538|
|  8|2000-01-14|23457.6|24168.5|22746.8|23244.4|21965.906| 84267|
|  9|2000-01-17|22533.6|23457.6|22533.6|23457.6|22167.377| 67807|
| 10|2000-01-18|23457.6|23742.0|22746.8|23422.1|22133.832| 27995|
| 11|2000-01-19|22817.9|23173.3|22036.0|22036.0| 20823.97| 44173|
| 12|2000-

In [43]:
spark.sql("SELECT count(id) FROM stock").show()

+---------+
|count(id)|
+---------+
|      160|
+---------+



### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [44]:
dfStockProcess = dfStockReader.dropna(how = "all")


In [45]:
# check nulls

spark.sql("SELECT * FROM stock WHERE id IS NULL").show()

+---+----+----+----+---+-----+---------+------+
| ID|Date|Open|High|Low|Close|Adj Close|Volume|
+---+----+----+----+---+-----+---------+------+
+---+----+----+----+---+-----+---------+------+



In [46]:
from pyspark.sql.functions import col

In [47]:
# create diff column --> difference between high and low columns

dfStockProcess2 = dfStockProcess.withColumn("diff", col("high") - col("low"))

In [48]:
firstQuery.stop()

### Create a new write stream using the new generated dataframe and call the generate table "modified_data"

In [49]:
streamWriterModified = dfStockProcess2.writeStream \
    .format("memory") \
    .queryName("modified_data") \
    .outputMode("append")


In [50]:
secondQuery = streamWriterModified.start()

In [51]:
spark.sql("SELECT * FROM modified_data").show(200)

+---+----------+-------+-------+-------+-------+---------+------+---------+
| ID|      Date|   Open|   High|    Low|  Close|Adj Close|Volume|     diff|
+---+----------+-------+-------+-------+-------+---------+------+---------+
|120|2000-06-20|22817.9|23102.2|21680.6|22320.3|21092.633| 34466|1421.5996|
|121|2000-06-21|21893.8|22675.7|21680.6|22675.7|21428.484| 68651| 995.0996|
|122|2000-06-22|23386.6|23386.6|22462.5|23031.1|21764.336| 97209| 924.0996|
|123|2000-06-23|22107.1|24097.4|22107.1|22889.0|21630.053|199483|1990.3008|
|124|2000-06-26|23102.2|24168.5|22569.1|24026.3|22704.797|121969|1599.4004|
|125|2000-06-27|24026.3|25519.1|23742.0|24026.3|22704.797|113809|1777.0996|
|126|2000-06-28|23884.2|24666.1|23884.2|24666.1|23309.408| 86236| 781.9004|
|127|2000-06-29|25234.7|25234.7|23919.7|24239.6|22906.365| 45299|   1315.0|
|128|2000-06-30|24523.9|25092.6|23742.0|24879.3| 23510.88| 76670|1350.5996|
|129|2000-07-03|24239.6|25590.2|24239.6|25092.6| 23712.45| 63306|1350.5996|
|130|2000-07

In [52]:
secondQuery.stop()

### Write the generated data into files instead of the memory.

In [53]:
streamWriterModified3 = dfStockProcess2.writeStream.format("csv"). \
    outputMode("append"). \
    option("checkpointLocation", "/content/checkpoint"). \
    option("path", "/content/OutputStream")

In [54]:
thirdQuery = streamWriterModified3.start()

### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [55]:
thirdQuery.stop()

In [56]:
finalSchema =  """ID int,
                  Date date,
                  Open float,
                  High float,
                  Low float,
                  Close float,
                  Adj float,
                  Volume int,
                  diff float"""

In [57]:
dfFinal = spark.read.csv("/content/OutputStream", header = False,schema = finalSchema)

In [58]:
dfFinal.show()

+---+----------+-------+-------+-------+-------+---------+------+---------+
| ID|      Date|   Open|   High|    Low|  Close|      Adj|Volume|     diff|
+---+----------+-------+-------+-------+-------+---------+------+---------+
|120|2000-06-20|22817.9|23102.2|21680.6|22320.3|21092.633| 34466|1421.5996|
|121|2000-06-21|21893.8|22675.7|21680.6|22675.7|21428.484| 68651| 995.0996|
|122|2000-06-22|23386.6|23386.6|22462.5|23031.1|21764.336| 97209| 924.0996|
|123|2000-06-23|22107.1|24097.4|22107.1|22889.0|21630.053|199483|1990.3008|
|124|2000-06-26|23102.2|24168.5|22569.1|24026.3|22704.797|121969|1599.4004|
|125|2000-06-27|24026.3|25519.1|23742.0|24026.3|22704.797|113809|1777.0996|
|126|2000-06-28|23884.2|24666.1|23884.2|24666.1|23309.408| 86236| 781.9004|
|127|2000-06-29|25234.7|25234.7|23919.7|24239.6|22906.365| 45299|   1315.0|
|128|2000-06-30|24523.9|25092.6|23742.0|24879.3| 23510.88| 76670|1350.5996|
|129|2000-07-03|24239.6|25590.2|24239.6|25092.6| 23712.45| 63306|1350.5996|
|130|2000-07

### Sort the dataframe based on the ID

In [59]:
dfFinalSorted = dfFinal.sort('ID')
dfFinalSorted.show()

+---+----------+-------+-------+-------+-------+---------+------+---------+
| ID|      Date|   Open|   High|    Low|  Close|      Adj|Volume|     diff|
+---+----------+-------+-------+-------+-------+---------+------+---------+
|  0|2000-01-04|22817.9|25696.8|22817.9|24879.3| 23510.88|108745|2878.9004|
|  1|2000-01-05|24523.9|26229.9|23670.9|24417.3|23074.295|175990|   2559.0|
|  2|2000-01-06|24381.7|24666.1|22746.8|22817.9|21562.865| 71746|1919.2988|
|  3|2000-01-07|22036.0|24879.3|22036.0|23884.2|22570.514|120984|2843.3008|
|  4|2000-01-10|24879.3|25519.1|23813.1|24061.9| 22738.44|151371|   1706.0|
|  5|2000-01-11|24168.5|25021.5|23955.2|24239.6|22906.365| 95943|1066.3008|
|  6|2000-01-12|24168.5|24452.8|23457.6|23670.9|22368.947| 61899| 995.2012|
|  7|2000-01-13|23670.9|24132.9|23102.2|23244.4|21965.906| 57538|1030.7012|
|  8|2000-01-14|23457.6|24168.5|22746.8|23244.4|21965.906| 84267|1421.6992|
|  9|2000-01-17|22533.6|23457.6|22533.6|23457.6|22167.377| 67807|    924.0|
| 10|2000-01