In [1]:
# Mount Google Drive so the KOSPI input CSVs (and the stream output) are reachable.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    DateType,
    DoubleType,
    FloatType,
    IntegerType,
    LongType,
    StringType,
    StructField,
    StructType,
)

# getOrCreate() reuses a session already present in the kernel instead of building a second one.
spark = (
    SparkSession.builder
    .appName('Sparkteaming.com')
    .getOrCreate()
)


In [2]:
from IPython.display import HTML, display

# Keep wide Spark tables on a single line instead of letting <pre> output wrap.
NO_WRAP_CSS = "<style>pre { white-space: pre !important; }</style>"
display(HTML(NO_WRAP_CSS))

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [3]:
# Column layout of the streamed KOSPI CSV files, in file order (all fields nullable).
KOSPI_COLUMNS = [
    ('ID', IntegerType()),
    ('Date', DateType()),
    ('Open', DoubleType()),
    ('High', DoubleType()),
    ('Low', DoubleType()),
    ('Close', DoubleType()),
    ('Adj Close', DoubleType()),
    ('Volume', LongType()),
]
schema = StructType([StructField(col_name, col_type) for col_name, col_type in KOSPI_COLUMNS])

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [4]:
# Streaming CSV source: every file dropped into the kospi folder is parsed with `schema`.
# Streaming file sources need an explicit schema; the first line of each file is a header.
KOSPI_INPUT_DIR = "/content/drive/MyDrive/Data (1)/Data/kospi"
df_schema = (
    spark.readStream
    .format("csv")
    .schema(schema)
    .option('header', 'true')
    .load(KOSPI_INPUT_DIR)
)

### Make sure the dataframe is streaming the files from the folder

In [5]:
# Sanity check: a DataFrame built with spark.readStream is expected to report True here.
df_schema.isStreaming

True

In [6]:
# Print the column names/types taken from `schema` to confirm the reader picked it up.
df_schema.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: long (nullable = true)



### Create a stream writer into memory and specify the query name "stock":

In [7]:
# In-memory sink: every 3-second micro-batch is appended to the queryable table "stock".
writer = (
    df_schema.writeStream
    .queryName('stock')
    .format('memory')
    .outputMode('append')
    .trigger(processingTime='3 seconds')
)

### Start the write stream and make sure it works (read all columns from the table)

In [8]:
# Start the "stock" memory-sink query; `q` is the handle used below to check status and stop it.
q=writer.start()

In [9]:
# True while the streaming query is running.
q.isActive

True

In [12]:
# Read everything the "stock" memory sink has collected so far.
# DataFrame.show() prints and returns None, so keep the DataFrame in df_q and
# display it in a separate call (chaining .show() would bind df_q to None).
df_q = spark.sql("select * from stock")
df_q.show(100)

+---+----------+------------+------------+------------+------------+------------+------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+---+----------+------------+------------+------------+------------+------------+------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|
|246|2000-12-13|27651.599609|     29286.5|27651.599609|28469.099609|26903.234375|270385|
|247|2000-12-14|28469.099609|29784.099609|28291.300781|28362.400391| 26802.40625|256317|
|248|2000-12-15|28362

+----+----------+------------+------------+------------+------------+------------+------+
|  ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|
+----+----------+------------+------------+------------+------------+------------+------+
|null|      null|        null|        null|        null|        null|        null|  null|
| 120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466|
| 121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651|
| 122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209|
| 123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483|
| 124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969|
| 125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809|
| 126|2000-06-28|23884.199219|24666.099609|23884.199219|24666.099609|23309.408203| 86236|
| 127|2000

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between the High and Low columns

In [13]:
# Drop rows in which every column is null (the all-null row visible in the output above),
# then add "diff" = High - Low for each remaining row.
df = df_schema.dropna(how='all')
df_diff = df.withColumn('diff', df['High'] - df['Low'])



In [14]:
# Confirm the derived "diff" column was appended after the original columns.
df_diff.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- diff: double (nullable = true)



In [15]:
# Stop the raw "stock" query before streaming the cleaned frame.
q.stop()

### Create a new write stream using the newly generated dataframe and call the generated table "modified_data"

In [16]:

# In-memory sink for the cleaned frame (with "diff"), exposed as table "modified_data".
writer2 = (
    df_diff.writeStream
    .queryName('modified_data')
    .format('memory')
    .outputMode('append')
    .trigger(processingTime='3 seconds')
)

In [17]:
# Start the "modified_data" memory-sink query.
q2=writer2.start()

In [18]:
# True while the "modified_data" query is running.
q2.isActive

True

In [19]:
# Read the cleaned rows collected by the "modified_data" memory sink.
# show() returns None, so keep the DataFrame and display it separately.
df_q2 = spark.sql("select * from modified_data")
df_q2.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|  995.201172000001|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791| 710.8007819999984|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656| 533.1015620000035|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964| 1066.298827999999|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|1208.4003909999992|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|  639.798827999999|
|246|2000-12-13|27651.599609

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466|1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809|1777.0996090000008|
|126|2000-06-28|23884.199219

### Write the generated data to files instead of to memory.

In [20]:
# File sink: append each 2-second micro-batch of df_diff as CSV part files under outStream.
# header=true matters because outStream is later read back with header=true. The CSV
# writer emits no header line by default, and in that case the reader would discard the
# first data row of every part file.
# NOTE(review): 'chkpoint1' is a relative path (not on Drive), so the checkpoint does not
# live next to the output — confirm this is intended.
writer3 = (
    df_diff.writeStream
    .format('csv')
    .outputMode('append')
    .option('header', 'true')
    .option('path', '/content/drive/MyDrive/Data (1)/Data/outStream')
    .option('checkpointLocation', 'chkpoint1')
    .trigger(processingTime='2 seconds')
)


In [21]:
# Start the CSV file-sink query.
q3=writer3.start()

### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [22]:
# Let the file sink commit every batch currently available before stopping.
# Stopping immediately after start() can leave outStream without any committed
# files, so the batch read below would come back empty.
q3.processAllAvailable()
q3.stop()

In [23]:
# Schema of the CSV files written by the file sink: the original KOSPI columns
# plus the derived "diff" (High - Low) column.
# Bound only to schema2 so that the 8-column `schema` used by the streaming reader
# is not overwritten (a chained `schema2 = schema = ...` would rebind it).
schema2 = StructType(
    [
        StructField('ID', IntegerType()),
        StructField('Date', DateType()),
        StructField('Open', DoubleType()),
        StructField('High', DoubleType()),
        StructField('Low', DoubleType()),
        StructField('Close', DoubleType()),
        StructField('Adj Close', DoubleType()),
        StructField('Volume', LongType()),
        StructField('diff', DoubleType())]
)

In [24]:
# Batch (non-streaming) read of the CSV part files produced by the file sink.
OUT_STREAM_DIR = "/content/drive/MyDrive/Data (1)/Data/outStream"
df_schema3 = spark.read.csv(OUT_STREAM_DIR, schema=schema2, header=True)

In [25]:
# Confirm the read-back frame uses schema2, including "diff".
df_schema3.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Adj Close: double (nullable = true)
 |-- Volume: long (nullable = true)
 |-- diff: double (nullable = true)



In [28]:
# Second in-memory sink over the same cleaned frame, exposed as table "modified_data2".
writer4 = (
    df_diff.writeStream
    .queryName('modified_data2')
    .format('memory')
    .outputMode('append')
    .trigger(processingTime='3 seconds')
)

In [29]:
# Start the "modified_data2" memory-sink query.
q4=writer4.start()

In [30]:
# Read the rows collected by the "modified_data2" memory sink.
# show() returns None, so keep the DataFrame and display it separately.
df_q4 = spark.sql("select * from modified_data2")
df_q4.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|  995.201172000001|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791| 710.8007819999984|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656| 533.1015620000035|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964| 1066.298827999999|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|1208.4003909999992|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|  639.798827999999|
|246|2000-12-13|27651.599609

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466|1421.5996099999975|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651| 995.0996099999975|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209| 924.0996090000008|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483|1990.3007819999984|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969|1599.4003909999992|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809|1777.0996090000008|
|126|2000-06-28|23884.199219

In [None]:
# Stop the streaming queries that are still running. q2 ("modified_data") is
# otherwise never stopped and keeps triggering every 3 seconds.
# stop() on an already-stopped query is a no-op.
# NOTE(review): the sort cells below still query modified_data; memory-sink tables
# are expected to remain queryable after stop() — confirm on this Spark version.
q4.stop()
q2.stop()

### Sort the dataframe based on the ID

In [21]:
# SQL version of the sort: all rows of the in-memory table ordered by ID.
SORTED_BY_ID_SQL = "select * from modified_data order by ID"
sorted_Data = spark.sql(SORTED_BY_ID_SQL)

In [22]:
# Print the leading rows of the ID-ordered result.
sorted_Data.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5

In [None]:
# DataFrame-API counterpart of the SQL sort, applied to the batch DataFrame read
# back from the outStream CSV files. (No `finalDF` exists in this notebook, so
# referencing it would raise NameError on a fresh kernel.)
finalDFSorted = df_schema3.sort('ID')
finalDFSorted.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|       Close|   Adj Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5