In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Streaming").getOrCreate()

22/06/19 16:52:12 WARN Utils: Your hostname, medo-HP-ZBook-15-G3 resolves to a loopback address: 127.0.1.1; using 10.175.240.203 instead (on interface wlp2s0)
22/06/19 16:52:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/06/19 16:52:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Create the schema of the streamed files (check the column names and types from the CSV files)

In [4]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType , DoubleType, DateType
recordSchema = StructType([StructField('ID', IntegerType(), True),
                           StructField('Date', DateType(), True),
                           StructField('Open', DoubleType(), True),
                           StructField('High', DoubleType(), True),
                           StructField('Low', DoubleType(), True),
                           StructField('Adj', DoubleType(), True),
                           StructField('Close', DoubleType(), True),
                           StructField('Volume', IntegerType(), True)])

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [5]:
df = spark.readStream.format("csv") \
    .schema(recordSchema) \
    .load("kospi/")

### Make sure the dataframe is streaming the files from the folder

In [6]:
df.isStreaming

True

In [7]:
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Adj: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



### Create a stream writer into memory and specify the query name "stock:

In [8]:
writer = df.writeStream.outputMode('append')\
            .format('memory')\
            .queryName('stock')

In [9]:
query = writer.start('stock')

22/06/19 16:52:19 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4726aacf-d6b9-4a4b-a434-031ab1ec5b5c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/06/19 16:52:19 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

### Start the write stream and make sure it works (read all columns from the table)

In [10]:
spark.sql('SELECT * FROM stock').show()

+----+----------+------------+------------+------------+------------+------------+------+
|  ID|      Date|        Open|        High|         Low|         Adj|       Close|Volume|
+----+----------+------------+------------+------------+------------+------------+------+
|null|      null|        null|        null|        null|        null|        null|  null|
| 240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|
| 241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791|
| 242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656|
| 243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964|
| 244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|
| 245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|
| 246|2000-12-13|27651.599609|     29286.5|27651.599609|28469.099609|26903.234375|270385|
| 247|2000

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [11]:
df2 =df.dropna(how='any')

### Create a new write stream using the new generated dataframe and call the generate table "modified_data"

In [12]:
# query.stop()

In [13]:
dd=df2['High'] - df2['Low']

In [14]:
dd

Column<'(High - Low)'>

In [15]:
modified_data = df2.withColumn('diff' ,dd)

In [16]:
writer = modified_data.writeStream.outputMode('append')\
            .format('memory')\
            .queryName('stock')

In [17]:
query.stop()

In [18]:
query = writer.start()

22/06/19 16:52:48 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9c80a2d5-a97c-4ebe-85ff-6787a8d0c814. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
22/06/19 16:52:48 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [19]:
spark.sql('select * from stock').show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|         Adj|       Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|  995.201172000001|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791| 710.8007819999984|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656| 533.1015620000035|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964| 1066.298827999999|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|1208.4003909999992|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|  639.798827999999|
|246|2000-12-13|27651.599609

In [20]:
modified_data.columns

['ID', 'Date', 'Open', 'High', 'Low', 'Adj', 'Close', 'Volume', 'diff']

+---+----+----+----+---+-----+---------+------+----+
| ID|Date|Open|High|Low|Close|Adj Close|Volume|diff|
+---+----+----+----+---+-----+---------+------+----+
+---+----+----+----+---+-----+---------+------+----+



In [21]:
df3 = spark.sql('select * from stock')

In [22]:
df3.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|         Adj|       Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|240|2000-12-05|26585.300781|27367.300781|26372.099609|27011.800781|25526.091797| 91019|  995.201172000001|
|241|2000-12-06|27011.800781|27509.400391|26798.599609|26869.699219|25391.804688|105791| 710.8007819999984|
|242|2000-12-07|27011.800781|27011.800781|26478.699219|26656.400391|25190.236328| 40656| 533.1015620000035|
|243|2000-12-08|26656.400391|27722.699219|26656.400391|27651.599609|26130.699219|149964| 1066.298827999999|
|244|2000-12-11|27687.099609|     28860.0|27651.599609|28078.099609|26533.740234|159671|1208.4003909999992|
|245|2000-12-12|28042.599609|28078.099609|27438.300781|27935.900391|26399.361328| 74560|  639.798827999999|
|246|2000-12-13|27651.599609

In [23]:
query.stop()

### Write the generated data into files instead of the memory. 

In [25]:
# df3.write.format("parquet").save('/parquej')
df3.write.format('parquet').save('gen2')

22/06/19 16:53:26 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

### Stop the query. Now, try reading the generated parquet files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [26]:
query.stop()

In [27]:
df5 = spark.read.parquet('gen' , header = True)

In [28]:
df5.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|         Adj|       Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
| 70|2000-04-11|23244.400391|     23351.0|22213.699219|22213.699219|20991.894531| 33341|1137.3007810000017|
| 71|2000-04-12|22249.199219|22391.400391|21751.599609|22178.099609|20958.253906| 38546| 639.8007819999984|
| 72|2000-04-13|22178.099609|22178.099609|22178.099609|22178.099609|20958.253906|     0|               0.0|
| 73|2000-04-14|21751.599609|21822.699219|21111.900391|21680.599609|20488.117188| 33622|  710.798827999999|
| 74|2000-04-17|     19903.5|20969.699219|     19050.5|19761.300781|18674.380859| 56272|1919.1992189999983|
| 75|2000-04-18|20472.099609|     21183.0|19619.099609|     19903.5|18808.763672| 62321|1563.9003909999992|
| 76|2000-04-19|20614.300781

### Sort the dataframe based on the ID

In [29]:
finalDFSorted = df5.sort('ID')
finalDFSorted.show()

+---+----------+------------+------------+------------+------------+------------+------+------------------+
| ID|      Date|        Open|        High|         Low|         Adj|       Close|Volume|              diff|
+---+----------+------------+------------+------------+------------+------------+------+------------------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|2878.9003900000025|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|            2559.0|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746| 1919.298827999999|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|2843.3007810000017|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|            1706.0|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|1066.3007810000017|
|  6|2000-01-12|     24168.5