<a href="https://colab.research.google.com/github/sherif17/PySpark-For-Big-Data/blob/main/Streaming_Pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True) # Property used to format output tables better
spark
sc = spark.sparkContext
sc

In [None]:
from IPython.display import display, HTML

# Inject a CSS rule for <pre> blocks into the notebook page so their whitespace is kept as-is.
_pre_css = "<style>pre { white-space: pre !important; }</style>"
display(HTML(_pre_css))

### Create the schema of the streamed files (check the column names and types from the CSV files)

In [None]:
# Unpack the uploaded archive; the recorded output shows it creating InputStream/KOSPI_STOCK_*.csv.
!unzip -u "/content/InputStream.zip"

Archive:  /content/InputStream.zip
  inflating: InputStream/KOSPI_STOCK_0.csv  
  inflating: InputStream/KOSPI_STOCK_1.csv  
  inflating: InputStream/KOSPI_STOCK_2.csv  
  inflating: InputStream/KOSPI_STOCK_3.csv  


In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType,IntegerType,DateType

# (column name, Spark type) pairs describing the stock CSV layout; every column is nullable.
_input_columns = [
    ('ID', IntegerType()),
    ('Date', DateType()),
    ('Open', DoubleType()),
    ('High', DoubleType()),
    ('Low', DoubleType()),
    ('Close', DoubleType()),
    ('dj Close', DoubleType()),
    ('Volume', IntegerType()),
]

# Schema handed to the streaming CSV reader.
my_schema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in _input_columns]
)

### Create the dataframe by reading the stream using format "csv" and the schema you created.

In [None]:
from pyspark.sql import SparkSession

# getOrCreate() hands back the already-running session when one exists.
spark = SparkSession.builder.appName("ReadCSV").getOrCreate()

# Streaming DataFrame over the CSV files in the input folder.
# The `inferSchema` option was dropped: it has no effect once an explicit schema
# (`my_schema`) is supplied, and only obscured which schema is actually in use.
df = (
    spark.readStream
    .format('csv')
    .option("header", "true")
    .schema(my_schema)
    .load('/content/InputStream/*.csv')
)


### Make sure the dataframe is streaming the files from the folder

In [None]:
# printSchema() only prints the column layout; isStreaming is the flag that
# actually confirms `df` is backed by a streaming source.
assert df.isStreaming, "df is not a streaming DataFrame"
df.printSchema()

root
 |-- ID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- dj Close: double (nullable = true)
 |-- Volume: integer (nullable = true)



### Create a stream writer into memory and specify the query name "stock":

In [None]:
# Sink configuration only: nothing runs until .start() is called on this writer.
# The query name doubles as the name of the in-memory table ("stock").
# `truncate` / `numRows` were removed: they are display options of the console
# sink and the memory sink does not use them.
writer = (
    df.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("stock")
)

### Start the write stream and make sure it works (read all columns from the table)

In [None]:
stock = writer.start()
# start() returns immediately. Block until the files already present in the input
# folder have been processed, so the queries on the "stock" table see the data
# even when the notebook is executed with "Run all".
stock.processAllAvailable()

In [None]:
# NOTE(review): leftover commented-out code. No variable named `query` exists yet at this
# point of the notebook (the running stream is held in `stock`); consider deleting this cell.
# query.stop()

In [None]:
# Inspect the running query's status dictionary; the recorded run shows False for this key.
stock.status['isDataAvailable']

False

In [None]:
# Query the in-memory table fed by the stream named "stock".
df2 = spark.sql("SELECT * FROM stock")
# show(0) prints only the header row, which is enough to confirm the table and its columns exist.
df2.show(0)

+---+----+----+----+---+-----+--------+------+
| ID|Date|Open|High|Low|Close|dj Close|Volume|
+---+----+----+----+---+-----+--------+------+
+---+----+----+----+---+-----+--------+------+
only showing top 0 rows



In [None]:
# Print up to 100 rows of the in-memory "stock" table.
df2.show(100)

+---+----------+------------+------------+------------+------------+------------+------+
| ID|      Date|        Open|        High|         Low|       Close|    dj Close|Volume|
+---+----------+------------+------------+------------+------------+------------+------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809|
|126|2000-06-28|23884.199219|24666.099609|23884.199219|24666.099609|23309.408203| 86236|
|127|2000-06-29|25234.699219|25234.699219|23919.699219|24239.599609|22906.365234| 45299|
|128|2000-06-30|24523

### Remove the first row from the data (hint: drop the rows where ALL values are null), then add a new column "diff", which is the difference between high and low columns

In [None]:
# Discard rows in which every column is null (DataFrame.na.drop is the same operation as dropna).
df3 = df.na.drop(how='all')

In [None]:
from pyspark.sql.functions import col
# Build on df3 (the stream with all-null rows removed), not on the raw df;
# otherwise the dropna step is silently discarded.
# "diff" is the per-row difference between the High and Low columns.
df4 = df3.withColumn("diff", col("High") - col("Low"))

### Create a new write stream using the newly generated dataframe and call the generated table "modified_data"

In [None]:
# Stop the first streaming query (the one feeding the in-memory "stock" table).
stock.stop()

In [None]:
# Memory-sink writer for the transformed stream; the resulting table is "modified_data".
# `truncate` / `numRows` were removed: they are display options of the console
# sink and the memory sink does not use them.
writer = (
    df4.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("modified_data")
)

In [None]:
# Start the stream configured in the previous cell and keep its handle in `query`.
query = writer.start()

### Write the generated data into files instead of the memory. 

In [None]:
# File-sink writer: appends the transformed stream as CSV files (with a header row)
# under "Output", using "checkpoint" as the checkpoint directory.
writer = (
    df4.writeStream
    .outputMode("append")
    .format("csv")
    .option("path", "Output")
    .option("checkpointLocation", "checkpoint")
    .option("header", "true")
)

In [None]:
# `query` is about to be rebound to the file-sink stream. Stop the in-memory
# "modified_data" stream first; otherwise its handle is lost and it keeps running.
for running_query in spark.streams.active:
    if running_query.name == "modified_data":
        running_query.stop()

query = writer.start()
# start() returns immediately. Wait until the available input has been written,
# so that stopping the query afterwards cannot leave the Output folder without data.
query.processAllAvailable()

### Stop the query. Now, try reading the generated files into a normal dataframe
- Create a schema and use it to read the data.
- Show the output.

In [None]:
# Stop the streaming query currently held in `query` before reading its output files back.
query.stop()

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType,IntegerType,DateType

# Schema for reading back the files written to "Output".
# Those files were produced from df4, so they carry the extra "diff" column; it is
# listed here so the column is not dropped when the files are read.
my_schema = StructType([
    StructField('ID',  IntegerType(), True),
    StructField('Date',DateType(), True),
    StructField('Open', DoubleType(), True),
    StructField('High', DoubleType(), True),
    StructField('Low', DoubleType(), True),
    StructField('Close', DoubleType(), True),
    StructField('dj Close', DoubleType(), True),
    StructField('Volume', IntegerType(), True),
    StructField('diff', DoubleType(), True)

])

In [None]:
from pyspark.sql import SparkSession

# getOrCreate() hands back the already-running session when one exists.
spark = SparkSession.builder.appName("ReadCSV").getOrCreate()

# Read the generated CSV files back using the schema defined above.
# The `inferSchema` option was dropped: it has no effect once an explicit schema is supplied.
# NOTE(review): the section heading asks for a normal (batch) DataFrame, yet this is
# still a streaming read and it rebinds `df`. Switching to spark.read would also
# require changing the cells below that call df.writeStream.
df = (
    spark.readStream
    .format('csv')
    .option("header", "true")
    .schema(my_schema)
    .load('Output/*.csv')
)

In [None]:
# Memory-sink writer; the resulting in-memory table is named "normal_dataframe".
# `truncate` / `numRows` were removed: they are display options of the console
# sink and the memory sink does not use them.
writer = (
    df.writeStream
    .outputMode("append")
    .format("memory")
    .queryName("normal_dataframe")
)

In [None]:
query = writer.start()
# start() returns immediately. Block until the files already available have been
# processed, so the SELECT on "normal_dataframe" in the next cell sees the data.
query.processAllAvailable()

In [None]:
# Query the in-memory table fed by the stream named "normal_dataframe".
df5 = spark.sql("SELECT * FROM normal_dataframe")
# show() with no argument prints at most the first 20 rows.
df5.show()

+---+----------+------------+------------+------------+------------+------------+------+
| ID|      Date|        Open|        High|         Low|       Close|    dj Close|Volume|
+---+----------+------------+------------+------------+------------+------------+------+
|120|2000-06-20|22817.900391|23102.199219|21680.599609|22320.300781|21092.632813| 34466|
|121|2000-06-21|21893.800781|22675.699219|21680.599609|22675.699219|21428.484375| 68651|
|122|2000-06-22|23386.599609|23386.599609|     22462.5|23031.099609|21764.335938| 97209|
|123|2000-06-23|22107.099609|24097.400391|22107.099609|     22889.0|21630.052734|199483|
|124|2000-06-26|23102.199219|     24168.5|22569.099609|24026.300781|22704.796875|121969|
|125|2000-06-27|24026.300781|25519.099609|     23742.0|24026.300781|22704.796875|113809|
|126|2000-06-28|23884.199219|24666.099609|23884.199219|24666.099609|23309.408203| 86236|
|127|2000-06-29|25234.699219|25234.699219|23919.699219|24239.599609|22906.365234| 45299|
|128|2000-06-30|24523

### Sort the dataframe based on the ID

In [None]:
# Order the rows by ascending ID (orderBy is an alias of sort) and print the first rows.
finalDFSorted = df5.orderBy('ID')
finalDFSorted.show()

+---+----------+------------+------------+------------+------------+------------+------+
| ID|      Date|        Open|        High|         Low|       Close|    dj Close|Volume|
+---+----------+------------+------------+------------+------------+------------+------+
|  0|2000-01-04|22817.900391|25696.800781|22817.900391|24879.300781|23510.880859|108745|
|  1|2000-01-05|24523.900391|26229.900391|23670.900391|24417.300781|23074.294922|175990|
|  2|2000-01-06|24381.699219|24666.099609|22746.800781|22817.900391|21562.865234| 71746|
|  3|2000-01-07|     22036.0|24879.300781|     22036.0|23884.199219|22570.513672|120984|
|  4|2000-01-10|24879.300781|25519.099609|23813.099609|24061.900391|22738.439453|151371|
|  5|2000-01-11|     24168.5|     25021.5|23955.199219|24239.599609|22906.365234| 95943|
|  6|2000-01-12|     24168.5|24452.800781|23457.599609|23670.900391|22368.947266| 61899|
|  7|2000-01-13|23670.900391|24132.900391|23102.199219|23244.400391| 21965.90625| 57538|
|  8|2000-01-14|23457