# Spark Streaming 

## Create the folder structure in DBFS File System

In [0]:
dbutils.fs.mkdirs("/FileStore/tables/stream_read/")
dbutils.fs.mkdirs("/FileStore/tables/stream_checkpoint/")
dbutils.fs.mkdirs("/FileStore/tables/stream_write/")

Out[19]: True

## Define Schema for Streaming Data

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *

In [0]:
schema_defined = StructType([StructField('File',StringType(),True),
                             StructField('Shop',StringType(),True),
                             StructField('Sale_count',IntegerType(),True)
                            ])

## Read Streaming Data

In [0]:
df = spark.readStream.format("csv") \
    .schema(schema_defined) \
    .option("header",True) \
    .option("sep",";") \
    .load("/FileStore/tables/stream_read/")

df1 = df.groupBy("Shop").sum("Sale_count")    
display(df1)

Shop,sum(Sale_count)
Snapdeal,50.0
Myntra,100.0
Flipkart,250.0
Alibaba,
Amazon,500.0


## Write Streaming Data

In [0]:
df4 = df.writeStream.format("parquet") \
    .outputMode("append") \
    .option("path","/FileStore/tables/stream_write/") \
    .option("checkpointLocation","/FileStore/tables/stream_checkpoint/") \
    .start().awaitTermination()

## To Read Data File in Parquet Format

In [0]:
df5 = spark.read.parquet("/FileStore/tables/stream_write/part-0000-372418a9-e19a-4e75-9338-dd613a910413-c000.snappy.parquet/")
df5.display()

File,Shop,Sale_count
File2,Amazon,100.0
File2,Flipkart,50.0
File2,Myntra,20.0
File2,Snapdeal,10.0
File2,Alibaba,


## Remove the folder from DBFS File System

In [0]:
dbutils.fs.rm("/FileStore/tables/stream_read/", True)
dbutils.fs.rm("/FileStore/tables/stream_checkpoint/", True)
dbutils.fs.rm("/FileStore/tables/stream_write/", True)

Out[32]: True