# Streaming Assignment

In [None]:
## Let's first install PySpark and import the libraries we need
!pip install pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import date, timedelta, datetime
import time



These two functions allow us to create (initialize) a CSV file and then update it every second. Let's start streaming by populating the CSV file with one second of data.
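The two functions themselves are not part of this section. Below is a minimal plain-Python sketch of the same idea. It assumes hypothetical helpers `init_csv`, `append_rows` and `stream_rows`, and a loop that writes one row per tick. None of these names come from the original.

```python
import csv
import time

# Column order taken from the Data Description of MSFT.csv
COLUMNS = ["Date", "Open", "High", "Low", "Close", "Adj Close", "Volume"]


def init_csv(path):
    """Hypothetical helper: create (or truncate) the file and write only the header."""
    with open(path, "w", newline="") as f:
        csv.writer(f).writerow(COLUMNS)


def append_rows(path, rows):
    """Hypothetical helper: append a batch of rows to the existing file."""
    with open(path, "a", newline="") as f:
        csv.writer(f).writerows(rows)


def stream_rows(path, rows, interval_s=1.0):
    """Initialize the file, then append one row per interval so it grows over time."""
    init_csv(path)
    for row in rows:
        append_rows(path, [row])
        time.sleep(interval_s)
```

If the file is meant to feed `spark.readStream`, be aware of how Spark's file source works. It picks up *new files* that appear in the watched directory, and it does not re-read data appended to a file it has already processed. Writing each second's batch to a new file is therefore the safer variant.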

In [None]:
## Let's start by setting up the imports and then create the Spark session
import warnings
warnings.filterwarnings('ignore')

from pyspark.sql import SparkSession

app_name = "Pyspark-sql"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .config("spark.ui.port","42229")\
        .getOrCreate()
sc = spark.sparkContext

In [None]:
**Data Description**

**Daily_stock_report**
- Date: trading date (dataType: TimestampType)
- Open: opening price of the day (dataType: FloatType)
- High: highest price of the day (dataType: FloatType)
- Low: lowest price of the day (dataType: FloatType)
- Close: closing price of the day (dataType: FloatType)
- Adj Close: adjusted closing price of the day (dataType: FloatType)
- Volume: number of shares traded during the day (dataType: IntegerType)
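The following plain-Python sketch illustrates how the schema above maps onto typed values when one CSV record is parsed. It assumes the raw file stores dates as `YYYY-MM-DD`; Spark displays these as `2017-02-22 00:00:00` after inferring a timestamp. Python's `float` is double precision, so it is only a loose stand-in for Spark's 32-bit `FloatType`. `PARSERS` and `parse_row` are hypothetical names.

```python
from datetime import datetime

# Python type per column, following the Data Description
# (TimestampType -> datetime, FloatType -> float, IntegerType -> int).
# The "%Y-%m-%d" date format is an assumption about the raw file.
PARSERS = {
    "Date": lambda s: datetime.strptime(s, "%Y-%m-%d"),
    "Open": float,
    "High": float,
    "Low": float,
    "Close": float,
    "Adj Close": float,
    "Volume": int,
}


def parse_row(record):
    """Convert one CSV record (a dict of strings keyed by column name) into typed values."""
    return {name: parse(record[name]) for name, parse in PARSERS.items()}
```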

In [136]:
df_streaming = spark.read.csv("MSFT.csv", header="true", inferSchema="true")

In [None]:
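# File-based streaming sources need an explicit schema; this DDL string follows the Data Description
raw_schema = "Date TIMESTAMP, Open FLOAT, High FLOAT, Low FLOAT, Close FLOAT, `Adj Close` FLOAT, Volume INT"

raw_events = spark \
    .readStream \
    .option('header', 'true') \
    .schema(raw_schema) \
    .csv('MSFT.csv')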
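`raw_events` is only a definition. Nothing is read until a sink is started, for example with `raw_events.writeStream.format("console").outputMode("append").start()`.

On each trigger, Spark's file source lists the watched path and processes only files it has not seen before, in order of modification time. The plain-Python sketch below mimics that bookkeeping with a hypothetical `poll_new_files` helper. It illustrates the idea only and is not how Spark is implemented.

```python
import os


def poll_new_files(directory, seen):
    """Return CSV files in `directory` not processed yet, oldest first, and mark them as seen."""
    candidates = [
        os.path.join(directory, name)
        for name in os.listdir(directory)
        if name.endswith(".csv")
    ]
    new = [p for p in candidates if p not in seen]
    # Spark's file source processes new files in order of modification time by default
    new.sort(key=os.path.getmtime)
    seen.update(new)
    return new
```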

In [137]:
df_streaming.createOrReplaceTempView("df_streaming")
spark.sql("SELECT High, Low, Volume, AVG(High) OVER () AS avg_high FROM df_streaming").show()

+---------+---------+--------+------------------+
|     High|      Low|  Volume|          avg_high|
+---------+---------+--------+------------------+
|64.389999|64.050003|19292700|162.64645748371714|
|64.730003|64.190002|20273100|162.64645748371714|
|64.800003|64.139999|21796800|162.64645748371714|
|64.540001|64.050003|15871500|162.64645748371714|
|64.199997|63.759998|23239800|162.64645748371714|
|64.989998|64.019997|26937500|162.64645748371714|
|    64.75|63.880001|24539600|162.64645748371714|
|64.279999|63.619999|18135900|162.64645748371714|
|64.559998|63.810001|18750300|162.64645748371714|
|64.779999|64.190002|18521000|162.64645748371714|
|65.080002|    64.25|21510900|162.64645748371714|
|65.199997|64.480003|19846800|162.64645748371714|
|65.260002|    64.75|19538200|162.64645748371714|
|65.190002|    64.57|20100000|162.64645748371714|
|64.550003|64.150002|14280200|162.64645748371714|
|64.919998|    64.25|24833800|162.64645748371714|
|64.760002|64.300003|20674300|162.64645748371714|
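`AVG(High) OVER ()` has an empty window specification, so the frame is the whole table. Every row therefore receives the same value: the overall mean of `High`, the same 162.646… that `select avg(High)` returns in a later cell. The plain-Python equivalent is sketched below with a hypothetical `with_global_avg` helper.

```python
def with_global_avg(rows, column, alias):
    """Attach the mean of `column` over all rows to every row, like AVG(col) OVER ()."""
    mean = sum(r[column] for r in rows) / len(rows)
    return [{**r, alias: mean} for r in rows]
```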


In [None]:
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql import functions as F
df_streaming = df_streaming.withColumn("Weekly_Avg", F.avg('High').over(Window.partitionBy("Date").orderBy("High").rangeBetween(-7, Window.currentRow)))
df_streaming.show(5)

+-------------------+---------+---------+---------+---------+---------+--------+---------+----------+
|               Date|     Open|     High|      Low|    Close|Adj Close|  Volume| Avg_high|Weekly_Avg|
+-------------------+---------+---------+---------+---------+---------+--------+---------+----------+
|2017-02-22 00:00:00|64.330002|64.389999|64.050003|64.360001|60.079075|19292700|64.389999| 64.389999|
|2017-02-23 00:00:00|64.419998|64.730003|64.190002|64.620003|60.321793|20273100|64.730003| 64.730003|
|2017-02-24 00:00:00|64.529999|64.800003|64.139999|64.620003|60.321793|21796800|64.800003| 64.800003|
|2017-02-27 00:00:00|64.540001|64.540001|64.050003|64.230003|59.957733|15871500|64.540001| 64.540001|
|2017-02-28 00:00:00|64.080002|64.199997|63.759998|    63.98| 59.72435|23239800|64.199997| 64.199997|
+-------------------+---------+---------+---------+---------+---------+--------+---------+----------+
only showing top 5 rows
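`Weekly_Avg` equals `High` in every row above for two reasons:
- `Window.partitionBy("Date")` puts each trading day in its own partition, so every frame holds a single row.
- `orderBy("High").rangeBetween(-7, ...)` measures the range in units of `High` (price), not days.

A calendar window would order by the date instead, for example `Window.orderBy(F.col("Date").cast("long")).rangeBetween(-7 * 86400, Window.currentRow)`. This works because casting a timestamp to `long` gives seconds since the epoch. Without `partitionBy`, Spark moves all rows to a single partition, which is acceptable for a single stock file of this size. The monthly and yearly cells follow the same pattern.

The plain-Python sketch below shows what such a date-ordered range frame computes, so the Spark result can be spot-checked on a few rows. `rolling_avg_by_days` is a hypothetical name.

```python
from datetime import timedelta


def rolling_avg_by_days(points, days):
    """For each (day, value), average the values whose day lies in [day - days, day].

    Mirrors a range frame ordered by date: `days` days PRECEDING AND CURRENT ROW.
    """
    points = sorted(points)
    result = []
    for current_day, _ in points:
        start = current_day - timedelta(days=days)
        window = [v for d, v in points if start <= d <= current_day]
        result.append((current_day, sum(window) / len(window)))
    return result
```

Because the frame is defined in days, gaps such as weekends simply mean fewer rows fall inside the window. A `ROWS BETWEEN` frame would instead always take a fixed number of trading days.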



In [None]:
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql import functions as F
df_streaming = df_streaming.withColumn("Monthly_Avg", F.avg('High').over(Window.partitionBy("Date").orderBy("High").rangeBetween(-30, Window.currentRow)))
df_streaming.show(5)

+-------------------+---------+---------+---------+---------+---------+--------+---------+----------+-----------+
|               Date|     Open|     High|      Low|    Close|Adj Close|  Volume| Avg_high|Weekly_Avg|Monthly_Avg|
+-------------------+---------+---------+---------+---------+---------+--------+---------+----------+-----------+
|2017-02-22 00:00:00|64.330002|64.389999|64.050003|64.360001|60.079075|19292700|64.389999| 64.389999|  64.389999|
|2017-02-23 00:00:00|64.419998|64.730003|64.190002|64.620003|60.321793|20273100|64.730003| 64.730003|  64.730003|
|2017-02-24 00:00:00|64.529999|64.800003|64.139999|64.620003|60.321793|21796800|64.800003| 64.800003|  64.800003|
|2017-02-27 00:00:00|64.540001|64.540001|64.050003|64.230003|59.957733|15871500|64.540001| 64.540001|  64.540001|
|2017-02-28 00:00:00|64.080002|64.199997|63.759998|    63.98| 59.72435|23239800|64.199997| 64.199997|  64.199997|
+-------------------+---------+---------+---------+---------+---------+--------+--------

In [None]:
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql import functions as F
df_streaming = df_streaming.withColumn("Yearly_Avg", F.avg('High').over(Window.partitionBy("Date").orderBy("High").rangeBetween(-365, Window.currentRow)))
df_streaming.show(5)

+-------------------+---------+---------+---------+---------+---------+--------+---------+----------+-----------+----------+
|               Date|     Open|     High|      Low|    Close|Adj Close|  Volume| Avg_high|Weekly_Avg|Monthly_Avg|Yearly_Avg|
+-------------------+---------+---------+---------+---------+---------+--------+---------+----------+-----------+----------+
|2017-02-22 00:00:00|64.330002|64.389999|64.050003|64.360001|60.079075|19292700|64.389999| 64.389999|  64.389999| 64.389999|
|2017-02-23 00:00:00|64.419998|64.730003|64.190002|64.620003|60.321793|20273100|64.730003| 64.730003|  64.730003| 64.730003|
|2017-02-24 00:00:00|64.529999|64.800003|64.139999|64.620003|60.321793|21796800|64.800003| 64.800003|  64.800003| 64.800003|
|2017-02-27 00:00:00|64.540001|64.540001|64.050003|64.230003|59.957733|15871500|64.540001| 64.540001|  64.540001| 64.540001|
|2017-02-28 00:00:00|64.080002|64.199997|63.759998|    63.98| 59.72435|23239800|64.199997| 64.199997|  64.199997| 64.199997|


In [None]:
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from pyspark.sql import functions as F
df_streaming = df_streaming.withColumn("Monthly_Avg", F.avg('High').over(Window.partitionBy("Date").orderBy("High").rangeBetween(-3, Window.currentRow)))
df_streaming.show(5)

+-------------------+---------+---------+---------+---------+---------+--------+---------+----------+-----------+----------+
|               Date|     Open|     High|      Low|    Close|Adj Close|  Volume| Avg_high|Weekly_Avg|Monthly_Avg|Yearly_Avg|
+-------------------+---------+---------+---------+---------+---------+--------+---------+----------+-----------+----------+
|2017-02-22 00:00:00|64.330002|64.389999|64.050003|64.360001|60.079075|19292700|64.389999| 64.389999|  64.389999| 64.389999|
|2017-02-23 00:00:00|64.419998|64.730003|64.190002|64.620003|60.321793|20273100|64.730003| 64.730003|  64.730003| 64.730003|
|2017-02-24 00:00:00|64.529999|64.800003|64.139999|64.620003|60.321793|21796800|64.800003| 64.800003|  64.800003| 64.800003|
|2017-02-27 00:00:00|64.540001|64.540001|64.050003|64.230003|59.957733|15871500|64.540001| 64.540001|  64.540001| 64.540001|
|2017-02-28 00:00:00|64.080002|64.199997|63.759998|    63.98| 59.72435|23239800|64.199997| 64.199997|  64.199997| 64.199997|


In [None]:
##### Count missing values (null, or NaN for numeric columns) in each column
from pyspark.sql import functions as F
cols_check = ["Date","Open","High","Low","Close","Adj Close"]
df_streaming.select(*[
    (
        F.count(F.when((F.isnan(c) | F.col(c).isNull()), c)) if t not in ("timestamp", "date")
        else F.count(F.when(F.col(c).isNull(), c))
    ).alias(c)
    for c, t in df_streaming.dtypes if c in cols_check
]).show()

+----+----+----+---+-----+---------+
|Date|Open|High|Low|Close|Adj Close|
+----+----+----+---+-----+---------+
|   0|   0|   0|  0|    0|        0|
+----+----+----+---+-----+---------+
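The cell branches on the column type because Spark's `isnan` is only defined for float and double columns. On a timestamp column such as `Date`, it fails with a data type mismatch, whereas `isNull` works on any type. A plain-Python version of the same per-column count is sketched below with a hypothetical `count_missing` helper; it treats both `None` and NaN as missing.

```python
import math


def count_missing(rows, columns):
    """Count None or NaN values per column; a missing key counts as None."""
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            v = row.get(c)
            if v is None or (isinstance(v, float) and math.isnan(v)):
                counts[c] += 1
    return counts
```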



In [None]:
df_streaming.createOrReplaceTempView("df_streaming")
spark.sql("select avg(Volume) from df_streaming").show()

+-------------------+
|        avg(Volume)|
+-------------------+
|2.894781755361398E7|
+-------------------+



In [None]:
df_streaming.createOrReplaceTempView("df_streaming")
spark.sql("select avg(High) from df_streaming").show()

+------------------+
|         avg(High)|
+------------------+
|162.64645748371714|
+------------------+



In [None]:
df_streaming.createOrReplaceTempView("df_streaming")
spark.sql("select avg(Low) from df_streaming").show()

+-----------------+
|         avg(Low)|
+-----------------+
|159.4894678467037|
+-----------------+
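The three averages can also come from a single query, `SELECT avg(Volume), avg(High), avg(Low) FROM df_streaming`, which scans the data once instead of three times. The same single-pass idea in plain Python is sketched below with a hypothetical `column_means` helper. Like SQL `AVG`, it ignores nulls.

```python
def column_means(rows, columns):
    """Compute the mean of several columns in one pass, skipping None like SQL AVG."""
    sums = {c: 0.0 for c in columns}
    counts = {c: 0 for c in columns}
    for row in rows:
        for c in columns:
            v = row.get(c)
            if v is not None:  # SQL AVG ignores NULLs
                sums[c] += v
                counts[c] += 1
    # AVG over zero non-null values is NULL in SQL; mirror that with None
    return {c: (sums[c] / counts[c] if counts[c] else None) for c in columns}
```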



In [None]:
df_streaming.createOrReplaceTempView("df_streaming")
spark.sql("SELECT Date,Close,(AVG(Close)OVER(ORDER BY Date ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), 2)AS rolling_average FROM df_streaming").show()

+-------------------+---------+--------------------+
|               Date|    Close|     rolling_average|
+-------------------+---------+--------------------+
|2017-02-22 00:00:00|64.360001|      {64.360001, 2}|
|2017-02-23 00:00:00|64.620003|      {64.490002, 2}|
|2017-02-24 00:00:00|64.620003|{64.5333356666666...|
|2017-02-27 00:00:00|64.230003|     {64.4575025, 2}|
|2017-02-28 00:00:00|    63.98|    {64.36250225, 2}|
|2017-03-01 00:00:00|64.940002|      {64.442502, 2}|
|2017-03-02 00:00:00|64.010002|    {64.29000175, 2}|
|2017-03-03 00:00:00|    64.25|      {64.295001, 2}|
|2017-03-06 00:00:00|64.269997|    {64.36750025, 2}|
|2017-03-07 00:00:00|64.400002|{64.2325002499999...|
|2017-03-08 00:00:00|64.989998|    {64.47749925, 2}|
|2017-03-09 00:00:00|64.730003|{64.5975000000000...|
|2017-03-10 00:00:00|    64.93|    {64.76250075, 2}|
|2017-03-13 00:00:00|64.709999|          {64.84, 2}|
|2017-03-14 00:00:00|64.410004|     {64.6950015, 2}|
|2017-03-15 00:00:00|    64.75|    {64.7000007
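The `rolling_average` column shows values such as `{64.360001, 2}` because `(AVG(Close) OVER (...), 2)` builds a two-field struct: the `ROUND` function name is missing. The intended expression is most likely `ROUND(AVG(Close) OVER (ORDER BY Date ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), 2) AS rolling_average`. Note that `3 PRECEDING AND CURRENT ROW` averages up to four rows.

The plain-Python sketch below reproduces the same row-based frame on the first four `Close` values from the output above. `rolling_avg_rows` is a hypothetical name.

```python
def rolling_avg_rows(values, preceding, ndigits=2):
    """Average each value with up to `preceding` earlier values (a ROWS frame), then round."""
    out = []
    for i in range(len(values)):
        frame = values[max(0, i - preceding): i + 1]
        out.append(round(sum(frame) / len(frame), ndigits))
    return out


closes = [64.360001, 64.620003, 64.620003, 64.230003]
print(rolling_avg_rows(closes, preceding=3))
```

Python's `round` rounds exact ties to even, while Spark's `ROUND` rounds half up, so the two can differ on exact ties.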

In [134]:
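### Store the output in CSV format (with a header row; overwrite the output of any previous run)
df_streaming.write.mode("overwrite").option("header", "true").csv("output_")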

In [None]:
Architecture of the pipeline:

Approach one: use Spark. The dataset is received continuously from Kafka through the spark.readStream method. We then run the basic queries from question 2 (the averages of the Low, High and Volume columns), calculate the rolling average with a Spark SQL query, and store the output as CSV.

Approach two: use an Airflow DAG. The input path FILE_PATH_INPUT holds the CSV file in GCP, and the output path FILE_PATH_OUTPUT stores the output. Inside the DAG, the flag_anomaly method runs after the rolling average of the stock price has been calculated. The send_report method then sends the report to the Slack API configured in the DAG code.

https://hooks.slack.com/services/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
FILE_PATH_INPUT = '/home/airflow/gcs/data/input/'
FILE_PATH_OUTPUT = '/home/airflow/gcs/data/output/' 
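The DAG code with `send_report` and `flag_anomaly` is not included here. As a hedged illustration only, `flag_anomaly` could flag days whose close deviates from the rolling average by more than some threshold. `send_report` could then POST a JSON message to the Slack incoming webhook, which accepts a body such as `{"text": "..."}`. The 5% threshold, the row layout (`Date`, `Close`, `rolling_average`), the `build_report` helper and the message format below are all assumptions, not the author's implementation.

```python
import json
import urllib.request


def flag_anomaly(rows, threshold=0.05):
    """Hypothetical rule: flag rows whose Close is more than `threshold` (relative) away from rolling_average."""
    flagged = []
    for row in rows:
        avg = row["rolling_average"]
        if avg and abs(row["Close"] - avg) / avg > threshold:
            flagged.append(row)
    return flagged


def build_report(flagged):
    """Format flagged rows as a Slack message payload (a JSON object with a "text" field)."""
    if not flagged:
        return {"text": "No anomalies in the rolling average report."}
    lines = [
        f"{r['Date']}: close {r['Close']:.2f} vs rolling avg {r['rolling_average']:.2f}"
        for r in flagged
    ]
    return {"text": "Anomalies detected:\n" + "\n".join(lines)}


def send_report(webhook_url, payload):
    """POST the payload as JSON to a Slack incoming webhook and return the HTTP status code."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),  # a body makes this a POST request
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request, timeout=10) as response:
        return response.status
```

In an Airflow DAG these functions would typically be wrapped in tasks (for example with a PythonOperator), with `flag_anomaly` running before `send_report`.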
