<a href="https://colab.research.google.com/github/shanvelc/NN/blob/main/M7_AST_07_Spark_Streaming_C.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Advanced Certification Program in Computational Data Science
## A program by IISc and TalentSprint
### Assignment 7: Spark Streaming


## Learning Objectives
At the end of the experiment, you will be able to:

* understand the Spark Structured Streaming pipeline and implement it using PySpark.
* understand a Structured Streaming query and the steps involved in defining one.

## Information

### Streaming data:
Streaming data is data generated continuously by sources such as applications, networking devices, server log files, website activity, banking transactions, and location data. Such data should be processed incrementally using stream processing techniques, without access to the complete dataset. In addition, concept drift may occur, meaning that the properties of the stream may change over time.
The term is usually used in the context of big data, where many different sources generate data at high speed.
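
To make "processed incrementally" concrete, here is a minimal plain-Python sketch that keeps a running mean while seeing one value at a time. It does not use Spark; the function name `running_mean` and the sample readings are made up for illustration.

```python
from typing import Iterable, Iterator


def running_mean(stream: Iterable[float]) -> Iterator[float]:
    """Yield the mean of all values seen so far, one value at a time."""
    count = 0
    total = 0.0
    for value in stream:
        # Only two numbers are kept as state; past values are never stored.
        count += 1
        total += value
        yield total / count


# Hypothetical temperature readings arriving one after another.
readings = [16.0, 18.0, 20.0, 22.0]
means = list(running_mean(readings))
print(means)
```

The state (`count` and `total`) stays the same size no matter how many values arrive, which is what allows a stream to be processed without holding all of the data.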

To know more about streaming data click [here](https://aws.amazon.com/streaming-data/).


### Examples of streaming data
* IoT and sensor data generated at regular intervals by vehicles, industrial equipment, and farm machinery can be monitored to check performance and detect defects in advance.
* Continuously changing stock market values can be tracked in real time by financial institutions.
* A media publisher streams billions of clickstream records from its online properties, aggregates and enriches the data with demographic information about users, and optimizes content placement on its site, delivering relevancy and a better experience to its audience.

### Setup Steps:

In [1]:
#@title Please enter your registration id to start: { run: "auto", display-mode: "form" }
Id = "2306024" #@param {type:"string"}

In [2]:
#@title Please enter your password (your registered phone number) to continue: { run: "auto", display-mode: "form" }
password = "9742221781" #@param {type:"string"}

In [3]:
#@title Run this cell to complete the setup for this Notebook
from IPython import get_ipython

ipython = get_ipython()

notebook= "M7_AST_07_Spark_Streaming_C" #name of the notebook

def setup():
#  ipython.magic("sx pip3 install torch")
    ipython.magic("sx wget https://cdn.iisc.talentsprint.com/CDS/Datasets/TempHumi.csv")
    from IPython.display import HTML, display
    display(HTML('<script src="https://dashboard.talentsprint.com/aiml/record_ip.html?traineeId={0}&recordId={1}"></script>'.format(getId(),submission_id)))
    print("Setup completed successfully")
    return

def submit_notebook():
    ipython.magic("notebook -e "+ notebook + ".ipynb")

    import requests, json, base64, datetime

    url = "https://dashboard.talentsprint.com/xp/app/save_notebook_attempts"
    if not submission_id:
      data = {"id" : getId(), "notebook" : notebook, "mobile" : getPassword()}
      r = requests.post(url, data = data)
      r = json.loads(r.text)

      if r["status"] == "Success":
          return r["record_id"]
      elif "err" in r:
        print(r["err"])
        return None
      else:
        print ("Something is wrong, the notebook will not be submitted for grading")
        return None

    elif getAnswer() and getComplexity() and getAdditional() and getConcepts() and getComments() and getMentorSupport():
      f = open(notebook + ".ipynb", "rb")
      file_hash = base64.b64encode(f.read())

      data = {"complexity" : Complexity, "additional" :Additional,
              "concepts" : Concepts, "record_id" : submission_id,
              "answer" : Answer, "id" : Id, "file_hash" : file_hash,
              "notebook" : notebook,
              "feedback_experiments_input" : Comments,
              "feedback_mentor_support": Mentor_support}
      r = requests.post(url, data = data)
      r = json.loads(r.text)
      if "err" in r:
        print(r["err"])
        return None
      else:
        print("Your submission is successful.")
        print("Ref Id:", submission_id)
        print("Date of submission: ", r["date"])
        print("Time of submission: ", r["time"])
        print("View your submissions: https://cds-iisc.talentsprint.com/notebook_submissions")
        #print("For any queries/discrepancies, please connect with mentors through the chat icon in LMS dashboard.")
        return submission_id
    else:
      return None


def getAdditional():
  try:
    if not Additional:
      raise NameError
    else:
      return Additional
  except NameError:
    print ("Please answer Additional Question")
    return None

def getComplexity():
  try:
    if not Complexity:
      raise NameError
    else:
      return Complexity
  except NameError:
    print ("Please answer Complexity Question")
    return None

def getConcepts():
  try:
    if not Concepts:
      raise NameError
    else:
      return Concepts
  except NameError:
    print ("Please answer Concepts Question")
    return None


# def getWalkthrough():
#   try:
#     if not Walkthrough:
#       raise NameError
#     else:
#       return Walkthrough
#   except NameError:
#     print ("Please answer Walkthrough Question")
#     return None

def getComments():
  try:
    if not Comments:
      raise NameError
    else:
      return Comments
  except NameError:
    print ("Please answer Comments Question")
    return None


def getMentorSupport():
  try:
    if not Mentor_support:
      raise NameError
    else:
      return Mentor_support
  except NameError:
    print ("Please answer Mentor support Question")
    return None

def getAnswer():
  try:
    if not Answer:
      raise NameError
    else:
      return Answer
  except NameError:
    print ("Please answer Question")
    return None


def getId():
  try:
    return Id if Id else None
  except NameError:
    return None

def getPassword():
  try:
    return password if password else None
  except NameError:
    return None

submission_id = None
### Setup
if getPassword() and getId():
  submission_id = submit_notebook()
  if submission_id:
    setup()
else:
  print ("Please complete Id and Password cells before running setup")



Setup completed successfully


#### Install PySpark

In [4]:
!pip -qq install pyspark

#### Importing required packages

In [5]:
import os
import time
import random
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

#### Starting the Spark Session

In [6]:
spark = SparkSession.builder.master("local[*]").getOrCreate()

#### Reading Data



In [7]:
df=spark.read.csv('TempHumi.csv',header=True,inferSchema=True)

In [8]:
df.columns

['Date', 'Temperatur (C)', 'Humidity (%)', 'Hour']

In [9]:
df.show(2)

+--------------------+--------------+------------+----+
|                Date|Temperatur (C)|Humidity (%)|Hour|
+--------------------+--------------+------------+----+
|12/12/2017 12:00:...|          16.1|      72.092|   0|
|12/12/2017 12:05:...|          16.0|      72.092|   0|
+--------------------+--------------+------------+----+
only showing top 2 rows



In [10]:
df.select('Date').dtypes # Checking data type of any column

[('Date', 'string')]

In [11]:
df.groupBy("Hour").count().show(24)

+----+-----+
|Hour|count|
+----+-----+
|  12|   12|
|  22|   12|
|   1|   12|
|  13|   12|
|   6|   12|
|  16|   12|
|   3|   11|
|  20|   12|
|   5|   12|
|  19|   12|
|  15|   12|
|   9|   12|
|  17|   12|
|   4|   13|
|   8|   12|
|  23|   12|
|   7|   12|
|  10|   12|
|  21|   12|
|  11|   12|
|  14|   12|
|   2|   12|
|   0|   12|
|  18|   12|
+----+-----+



Each hour acts as one step. We can now filter the data on each hour and save each filtered result to a separate file.

In [12]:
!mkdir -p HourFolder  # Create the folder in the current directory; -p avoids an error if it already exists

In [15]:
steps = df.select("Hour").distinct().collect()

In [16]:
print(steps)
print(type(steps))
print(steps[0])
print(steps[0][0])

[Row(Hour=12), Row(Hour=22), Row(Hour=1), Row(Hour=13), Row(Hour=6), Row(Hour=16), Row(Hour=3), Row(Hour=20), Row(Hour=5), Row(Hour=19), Row(Hour=15), Row(Hour=9), Row(Hour=17), Row(Hour=4), Row(Hour=8), Row(Hour=23), Row(Hour=7), Row(Hour=10), Row(Hour=21), Row(Hour=11), Row(Hour=14), Row(Hour=2), Row(Hour=0), Row(Hour=18)]
<class 'list'>
Row(Hour=12)
12


In [17]:
df_test = df.where(f"Hour={steps[0][0]}")
df_test.show()
type(df_test)

+--------------------+--------------+------------+----+
|                Date|Temperatur (C)|Humidity (%)|Hour|
+--------------------+--------------+------------+----+
|12/12/2017 12:00:...|     19.900002|   69.742004|  12|
|12/12/2017 12:05:...|         19.95|      68.892|  12|
|12/12/2017 12:10:...|         20.05|    69.04201|  12|
|12/12/2017 12:15:...|         20.05|      68.942|  12|
|12/12/2017 12:20:...|          20.3|      69.342|  12|
|12/12/2017 12:25:...|         20.35|      69.092|  12|
|12/12/2017 12:30:...|     20.400002|      68.842|  12|
|12/12/2017 12:35:...|         20.45|      68.642|  12|
|12/12/2017 12:40:...|          20.5|   68.742004|  12|
|12/12/2017 12:45:...|         20.55|      68.592|  12|
|12/12/2017 12:50:...|     20.599998|   68.242004|  12|
|12/12/2017 12:55:...|     20.599998|    68.29201|  12|
+--------------------+--------------+------------+----+



###  Creating the streaming version of this input
We now apply the steps above in a loop and write an individual `csv` file for each hour step. These files are saved in the folder shown below. We will use this folder as the source of incoming stream data and read the files one by one, as if they were arriving as a stream.

To know more about coalesce, click [here](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.coalesce.html)

In [18]:
for step in steps:
  df1 = df.where(f"Hour={step[0]}")
  df1.coalesce(1).write.csv(path='/content/HourFolder/Hourly_Record',header="true",mode="append")

In [19]:
Hourly_Record_list = [i for i in os.listdir("/content/HourFolder/Hourly_Record/") if i.endswith(".csv")]
Hourly_Record_list

['part-00000-6e6a1220-3844-4cdf-8f66-62d4ad39bb52-c000.csv',
 'part-00000-0246b67d-96b0-4f9d-8e7a-8999cef5207c-c000.csv',
 'part-00000-eccbc05a-a619-4b1f-a3c6-1332cd16bec4-c000.csv',
 'part-00000-d7da3bb0-0029-4ab8-9044-68256c6033c9-c000.csv',
 'part-00000-b049b1e1-cba2-4284-acc0-c76cd69b2137-c000.csv',
 'part-00000-58ad75cf-f383-497c-8309-a7cad12d04ef-c000.csv',
 'part-00000-b8091c0c-c0cf-4c8a-a320-8d9ddcef2301-c000.csv',
 'part-00000-71d794e9-3e26-4b33-8e8a-e5564545d172-c000.csv',
 'part-00000-4e5f51c8-0b7f-472e-aafe-e753c3f7e231-c000.csv',
 'part-00000-f3b589bf-062a-4823-83e4-d9bbb9bb4912-c000.csv',
 'part-00000-7d4816b7-b5df-4a8a-bb52-747aa54087d4-c000.csv',
 'part-00000-8e4ce6af-1f8c-41be-96e8-8915dfeb87e7-c000.csv',
 'part-00000-4d0cee61-a177-4e17-8ffd-c8bc4f924016-c000.csv',
 'part-00000-9eea29b4-e6ae-4dda-9454-b672a9a8bed8-c000.csv',
 'part-00000-d2a065cf-370c-439d-82fe-ddb329ec8aa7-c000.csv',
 'part-00000-3ec785aa-a2ac-48aa-83d4-0116185b6b9e-c000.csv',
 'part-00000-27210766-42

#### Checking any one file from the list above:

In [20]:
file_name = random.choice(Hourly_Record_list)  # pick one file at random
file_name

'part-00000-4496a62b-67c8-4ca3-9519-d0c39ed8a98d-c000.csv'

In [21]:
part =spark.read.csv("/content/HourFolder/Hourly_Record/"+file_name, header=True,inferSchema=True)
part.show()

+--------------------+--------------+------------+----+
|                Date|Temperatur (C)|Humidity (%)|Hour|
+--------------------+--------------+------------+----+
|12/12/2017 04:00:...|         18.35|      69.892|  16|
|12/12/2017 04:05:...|         18.25|      69.942|  16|
|12/12/2017 04:10:...|         18.25|      69.942|  16|
|12/12/2017 04:15:...|          18.2|      70.092|  16|
|12/12/2017 04:20:...|         18.15|   69.992004|  16|
|12/12/2017 04:25:...|         18.05|   69.992004|  16|
|12/12/2017 04:30:...|         17.95|      70.142|  16|
|12/12/2017 04:35:...|     17.900002|      70.442|  16|
|12/12/2017 04:40:...|     17.900002|      70.392|  16|
|12/12/2017 04:45:...|         17.85|      70.442|  16|
|12/12/2017 04:50:...|          17.8|      70.342|  16|
|12/12/2017 04:55:...|          17.7|      70.442|  16|
+--------------------+--------------+------------+----+



### The Programming Model of Structured Streaming
“Table” is a well-known concept that developers are familiar with when building batch applications. Structured Streaming extends this concept to streaming applications by treating a stream as an unbounded, continuously appended table, as illustrated in the figure below.

![data_stream_unbounded%20table.PNG](https://cdn.iisc.talentsprint.com/CDS/Images/data_stream_unbounded_table.PNG)

Every new record received in the data stream is like a new row being appended to the unbounded input table. This leads to a new stream processing model that is very similar to a batch processing model. We will express our streaming computation as a standard batch-like query as on a static table, and Spark runs it as an incremental query on the unbounded input table. Let’s understand this model in more detail. A query on the input will generate the “Result Table” as given in the figure below. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.
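
The claim that an incremental query gives the same answer as a batch query over the whole input table can be checked with a small plain-Python sketch. This is only a simulation of the idea, not how Spark is implemented; the class `IncrementalMeanByHour`, the function `batch_mean_by_hour`, and the sample rows are hypothetical.

```python
from collections import defaultdict


def batch_mean_by_hour(rows):
    """Batch-style query: mean temperature per hour over the whole table."""
    sums, counts = defaultdict(float), defaultdict(int)
    for hour, temp in rows:
        sums[hour] += temp
        counts[hour] += 1
    return {hour: sums[hour] / counts[hour] for hour in sums}


class IncrementalMeanByHour:
    """Keeps only per-hour (sum, count) state and updates it per micro-batch."""

    def __init__(self):
        self.sums = defaultdict(float)
        self.counts = defaultdict(int)

    def update(self, new_rows):
        for hour, temp in new_rows:
            self.sums[hour] += temp
            self.counts[hour] += 1
        return {hour: self.sums[hour] / self.counts[hour] for hour in self.sums}


# Hypothetical micro-batches of (hour, temperature) rows, one per trigger.
micro_batches = [
    [(0, 16.0), (0, 18.0)],
    [(1, 15.0), (0, 17.0)],
    [(1, 17.0)],
]

input_table = []  # stands in for the conceptually unbounded input table
state = IncrementalMeanByHour()
for batch in micro_batches:
    input_table.extend(batch)           # new rows are appended to the input table
    result_table = state.update(batch)  # result table updated from the new rows only
    # The incremental result matches a batch query over everything seen so far.
    assert result_table == batch_mean_by_hour(input_table)
    print(result_table)
```

The incremental version never rereads old rows, yet after every trigger its result table equals the one a batch query would produce.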

![spark%20stream.PNG](https://cdn.iisc.talentsprint.com/CDS/Images/spark_stream.PNG)

The “Output” is defined as what gets written out to the external storage. The output can be written in one of the following modes:

* Complete Mode - The entire updated Result Table is written to the external storage. It is up to the storage connector to decide how to handle the writing of the entire table.
* Append Mode - Only the new rows appended to the Result Table since the last trigger are written to the external storage. This is applicable only to queries where existing rows in the Result Table are not expected to change.
* Update Mode - Only the rows that were updated in the Result Table since the last trigger are written to the external storage (available since Spark 2.1.1). This differs from Complete Mode in that only the rows that have changed since the last trigger are output. If the query doesn’t contain aggregations, it is equivalent to Append Mode.

Note that each mode is applicable only to certain types of queries.
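
The difference between the three modes can be illustrated with a small plain-Python sketch that compares the Result Table before and after one trigger. It is a simplified model, not Spark's implementation; the function `rows_to_emit` and the sample values are hypothetical.

```python
def rows_to_emit(previous, current, mode):
    """Return the Result Table rows written to the sink for one trigger.

    `previous` and `current` map a key (here, an hour) to its value
    before and after the trigger.
    """
    if mode == "complete":
        # The whole Result Table is written every time.
        return dict(current)
    if mode == "update":
        # Only rows that are new or whose value changed.
        return {k: v for k, v in current.items()
                if k not in previous or previous[k] != v}
    if mode == "append":
        # Only rows that did not exist before this trigger.
        return {k: v for k, v in current.items() if k not in previous}
    raise ValueError(f"unknown output mode: {mode}")


# Hypothetical per-hour means before and after a trigger.
previous = {0: 16.5, 1: 15.0}
current = {0: 17.0, 1: 15.0, 2: 14.0}

for mode in ("complete", "update", "append"):
    print(mode, rows_to_emit(previous, current, mode))
```

In this model, append leaves out the changed row for hour 0, which shows why that mode suits only queries whose existing result rows never change.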

### Five Steps to Define a Streaming Query
We have prepared our data as if it were coming from a continuous streaming source. In this section, we will explore the steps involved in defining a streaming query.
#### Step 1: Define input sources
* The first step is to define a DataFrame from a streaming source. For streaming sources, we use `spark.readStream` to create a DataStreamReader.
* Apache Spark natively supports reading data streams from Apache Kafka and from all the file-based formats that DataFrameReader supports (Parquet, ORC, JSON, etc.).
* We can think of the folder containing the 24 csv files as playing the role of a Kafka source that feeds Spark Streaming.

In [22]:
part.schema # checking the part schema

StructType([StructField('Date', StringType(), True), StructField('Temperatur (C)', DoubleType(), True), StructField('Humidity (%)', DoubleType(), True), StructField('Hour', IntegerType(), True)])

In [23]:
streaming = (
    spark.readStream
    .schema(part.schema)               # streaming file sources need an explicit schema by default
    .option('header', True)            # the files were written with a header row, so skip it
    .option('maxFilesPerTrigger', 1)   # maximum number of new files considered in every trigger, here 1
    .csv('HourFolder/Hourly_Record/')  # file source: reads files written to this directory as a stream
)
# Supported file formats are text, CSV, JSON, ORC, and Parquet; here we use CSV.
# Files are processed in the order of file modification time.

In [24]:
type(streaming)
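
The effect of `maxFilesPerTrigger` and of modification-time ordering can be imitated without Spark. The sketch below is only a simulation of that behaviour: the helper `plan_micro_batches` and the file names are hypothetical, and Spark's actual file source is more involved.

```python
import os
import tempfile


def plan_micro_batches(directory, max_files_per_trigger=1):
    """Group the CSV files in `directory` into micro-batches, oldest first."""
    paths = [os.path.join(directory, name)
             for name in os.listdir(directory) if name.endswith(".csv")]
    paths.sort(key=os.path.getmtime)  # order by file modification time
    return [paths[i:i + max_files_per_trigger]
            for i in range(0, len(paths), max_files_per_trigger)]


def names(batches):
    """Strip directories so the plan is easy to read."""
    return [[os.path.basename(p) for p in batch] for batch in batches]


with tempfile.TemporaryDirectory() as folder:
    for offset, name in enumerate(["c.csv", "a.csv", "b.csv"]):
        path = os.path.join(folder, name)
        with open(path, "w") as handle:
            handle.write("Hour\n0\n")
        # Set explicit, increasing modification times so the order is deterministic.
        os.utime(path, (1_000_000 + offset, 1_000_000 + offset))

    batch_names = names(plan_micro_batches(folder, max_files_per_trigger=1))
    pair_names = names(plan_micro_batches(folder, max_files_per_trigger=2))

print(batch_names)
print(pair_names)
```

With a limit of 1, every file becomes its own micro-batch, in the order the files were written rather than in alphabetical order.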

#### Step 2: Set up the transformation


In [25]:
Hourly_Mean_Value = streaming.groupBy('Hour').mean("Temperatur (C)","Humidity (%)").orderBy(F.desc("Hour"))

* Hourly_Mean_Value is a streaming DataFrame (that is, a DataFrame on unbounded, streaming data). It represents the running mean that will be computed once the streaming query is started and the streaming input data is being continuously processed.
* Now that we have our transformation, we need to specify an output sink for the results. For this example, we write to a memory sink, which keeps the results in memory.
* We also need to define how Spark will output that data. In this example, we use the complete output mode, which rewrites all of the keys along with their mean values after every trigger.
* In this example, we do not call `awaitTermination()` on the query, because it is needed only to prevent the driver process from terminating while the stream is active. We omit it so that the query can be run interactively in a notebook.

#### Step 3: Define output sink and output mode

In [26]:
writer = (Hourly_Mean_Value.writeStream.queryName("Temp_Humi_Mean").format("memory").outputMode("complete").start())
# Memory sink (for debugging): the output is stored as an in-memory table.
# Both the Append and Complete output modes are supported.
# Use it with caution and only for debugging on low data volumes,
# because the entire output is collected and stored in the driver's memory.
# All the aggregates are kept in an in-memory table whose name is the query name, here "Temp_Humi_Mean".
# Note that we have to call start() to start the execution of the query.
# This returns a StreamingQuery object which is a handle to the continuously running execution.

#### Step 4: Specify processing details
* Triggering details: These indicate when to trigger the discovery and processing of newly available streaming data.
* Checkpoint location: This option is necessary for failure recovery in a real application.

For the sake of simplicity, we do not set these parameters here. With the default trigger, the streaming query processes data in micro-batches, and the next micro-batch is triggered as soon as the previous one has completed.
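
For reference, here is a hedged sketch of how both processing details could be set explicitly with the `trigger` and `option("checkpointLocation", ...)` methods of the stream writer. The function name `start_mean_query`, the 5-second interval, and the checkpoint directory are illustrative assumptions, not values from this notebook. The sketch also assumes that the memory sink in complete mode accepts a checkpoint location.

```python
def start_mean_query(aggregated_df, checkpoint_dir, interval="5 seconds"):
    """Start the in-memory query with an explicit trigger and checkpoint.

    `aggregated_df` is assumed to be a streaming DataFrame such as
    Hourly_Mean_Value; the interval and directory are illustrative only.
    """
    return (
        aggregated_df.writeStream
        .queryName("Temp_Humi_Mean")
        .format("memory")
        .outputMode("complete")
        .trigger(processingTime=interval)              # start a micro-batch every `interval`
        .option("checkpointLocation", checkpoint_dir)  # where progress and state are saved
        .start()
    )
```

In the notebook this would be called as `start_mean_query(Hourly_Mean_Value, "checkpoints/temp_humi_mean")`, after stopping any active query that already uses the name `Temp_Humi_Mean`.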


#### Step 5: Start the query
* Once everything has been specified, the final step is to start the query. This was already done in the cell above by calling `.start()`. The call `.queryName("Temp_Humi_Mean")` names the in-memory table, which is updated on every trigger.
* `.start()` returns a StreamingQuery object, which is a handle to the continuously running execution. We can use this object to manage the query.

In [27]:
for x in range(45):
  print('Query Result at time step : ',x)
  df_q = spark.sql("SELECT * FROM Temp_Humi_Mean")
  df_q.show(24)
  time.sleep(1)

Query Result at time step :  0
+----+-------------------+-----------------+
|Hour|avg(Temperatur (C))|avg(Humidity (%))|
+----+-------------------+-----------------+
+----+-------------------+-----------------+

Query Result at time step :  1
+----+-------------------+-----------------+
|Hour|avg(Temperatur (C))|avg(Humidity (%))|
+----+-------------------+-----------------+
+----+-------------------+-----------------+

Query Result at time step :  2
+----+-------------------+-----------------+
|Hour|avg(Temperatur (C))|avg(Humidity (%))|
+----+-------------------+-----------------+
+----+-------------------+-----------------+

Query Result at time step :  3
+----+-------------------+-----------------+
|Hour|avg(Temperatur (C))|avg(Humidity (%))|
+----+-------------------+-----------------+
+----+-------------------+-----------------+

Query Result at time step :  4
+----+-------------------+-----------------+
|Hour|avg(Temperatur (C))|avg(Humidity (%))|
+----+-------------------+-----

#### Check if the stream is active

In [28]:
spark.streams.active  # get the list of currently active streaming queries.

[<pyspark.sql.streaming.query.StreamingQuery at 0x7ccccdca26d0>]

In [29]:
spark.streams.active[0].isActive

True

In [30]:
writer.status
# Reports what the query is doing right now: whether a trigger is active, whether data is being processed, and so on.
# Will print something like the following.

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}

To turn off the stream, we run `writer.stop()`, which also resets the query for testing purposes.

In [31]:
writer.stop()  # stop the query.

### Please answer the questions below to complete the experiment:




In [32]:
# @title In the transformation step, how does the groupby function operate? { run: "auto", form-width: "500px", display-mode: "form" }
Answer = "It performs groupby on a table formed after appending the new file to the previous one at each time step or trigger" #@param ["", "It performs groupby on each file ingested at each time step", "It performs groupby at the end of the ingestion of all the files", "It performs groupby on a table formed after appending the new file to the previous one at each time step or trigger"]

In [33]:
#@title How was the experiment? { run: "auto", form-width: "500px", display-mode: "form" }
Complexity = "Good and Challenging for me" #@param ["","Too Simple, I am wasting time", "Good, But Not Challenging for me", "Good and Challenging for me", "Was Tough, but I did it", "Too Difficult for me"]


In [34]:
#@title If it was too easy, what more would you have liked to be added? If it was very difficult, what would you have liked to have been removed? { run: "auto", display-mode: "form" }
Additional = "na" #@param {type:"string"}


In [35]:
#@title Can you identify the concepts from the lecture which this experiment covered? { run: "auto", vertical-output: true, display-mode: "form" }
Concepts = "Yes" #@param ["","Yes", "No"]


In [36]:
#@title  Text and image description/explanation and code comments within the experiment: { run: "auto", vertical-output: true, display-mode: "form" }
Comments = "Very Useful" #@param ["","Very Useful", "Somewhat Useful", "Not Useful", "Didn't use"]


In [37]:
#@title Mentor Support: { run: "auto", vertical-output: true, display-mode: "form" }
Mentor_support = "Didn't use" #@param ["","Very Useful", "Somewhat Useful", "Not Useful", "Didn't use"]


In [38]:
#@title Run this cell to submit your notebook for grading { vertical-output: true }
try:
  if submission_id:
      return_id = submit_notebook()
      if return_id : submission_id = return_id
  else:
      print("Please complete the setup first.")
except NameError:
  print ("Please complete the setup first.")

Your submission is successful.
Ref Id: 8159
Date of submission:  09 Feb 2025
Time of submission:  21:01:17
View your submissions: https://cds-iisc.talentsprint.com/notebook_submissions
