In [3]:
%pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=f2bee5723cb4cbfb4511b5f91de02cef193429f072b43732ca3f2c120f52e0a0
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [4]:
# Create (or reuse) a Spark session

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("MyApp").getOrCreate()

In [5]:
# Upload the CSV file from the local machine (Colab only)
from google.colab import files

uploaded_files = files.upload()

Saving exampleData (1).csv to exampleData (1).csv


In [10]:
# Read the CSV into a static (batch) DataFrame, inferring column types
df = spark.read.csv("exampleData (1).csv", header=True, inferSchema=True)
df.show(vertical=True)

-RECORD 0--------------------------------
 _c0                 | 14                
 dateTime            | 01-jan-1990 00:00 
 indicator_rain      | 0                 
 precipitation       | 0.3               
 indicator_temp      | 0                 
 air_temperature     | 9.1               
 indicator_wetb      | 0                 
 wetb                | 9.0               
 dewpt               | 8.9               
 vappr               | 11.4              
 relative_humidity   | 99                
 msl                 | 1006.7            
 indicator_wdsp      | 2                 
 wind_speed          | 7                 
 indicator_wddir     | 2                 
 wind_from_direction | 190               
-RECORD 1--------------------------------
 _c0                 | 15                
 dateTime            | 01-jan-1990 01:00 
 indicator_rain      | 0                 
 precipitation       | 0.2               
 indicator_temp      | 0                 
 air_temperature     | 8.2        

In [7]:
# Register a temporary SQL view and keep the inferred schema for the streaming reader
df.createOrReplaceTempView("weatherConditions")
staticSchema = df.schema

In [8]:
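# Treat the CSV path as a streaming source. File sources need an explicit schema,
# and maxFilesPerTrigger=1 limits each micro-batch to one new file.
streamingDF = spark.readStream \
    .schema(staticSchema) \
    .option("maxFilesPerTrigger", 1) \
    .option("header", "true") \
    .csv("exampleData (1).csv")
print(streamingDF)  # shows only the schema: nothing is read until a query starts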

DataFrame[_c0: int, dateTime: string, indicator_rain: int, precipitation: string, indicator_temp: int, air_temperature: string, indicator_wetb: int, wetb: string, dewpt: string, vappr: string, relative_humidity: string, msl: string, indicator_wdsp: int, wind_speed: string, indicator_wddir: int, wind_from_direction: int]
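With `.option("maxFilesPerTrigger", 1)`, each micro-batch picks up at most one new file from the source path. The toy function below is plain Python, not Spark code, and only models that grouping; the file names are hypothetical, and it assumes the files are already listed in the order the source would process them. With the single input file used here, the whole stream arrives in one micro-batch.

```python
from typing import List


def plan_micro_batches(new_files: List[str], max_files_per_trigger: int) -> List[List[str]]:
    """Toy model: group new files into consecutive micro-batches of at most N files."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    return [
        new_files[i:i + max_files_per_trigger]
        for i in range(0, len(new_files), max_files_per_trigger)
    ]


# Hypothetical file names: a streaming file source usually watches a directory.
files = ["part-0.csv", "part-1.csv", "part-2.csv"]
print(plan_micro_batches(files, 1))  # [['part-0.csv'], ['part-1.csv'], ['part-2.csv']]
print(plan_micro_batches(files, 2))  # [['part-0.csv', 'part-1.csv'], ['part-2.csv']]
```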


In [13]:

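from pyspark.sql import functions as F

# air_temperature was inferred as a string (see the schema above), so cast it explicitly
filteredDF = streamingDF.filter(F.col('air_temperature').cast('double') < 5)
print(filteredDF)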


DataFrame[_c0: int, dateTime: string, indicator_rain: int, precipitation: string, indicator_temp: int, air_temperature: string, indicator_wetb: int, wetb: string, dewpt: string, vappr: string, relative_humidity: string, msl: string, indicator_wdsp: int, wind_speed: string, indicator_wddir: int, wind_from_direction: int]
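The printed schema lists `air_temperature` as a string, so comparing it with 5 relies on Spark's implicit type coercion; an explicit `F.col('air_temperature').cast('double') < 5` states the intent. The toy model below is plain Python, not Spark, and sketches what that cast-then-filter does under Spark's default non-ANSI settings: text that doesn't parse becomes null, and a comparison with null never passes the filter. The sample values are made up.

```python
from typing import Iterable, List, Optional


def to_double_or_none(value: Optional[str]) -> Optional[float]:
    """Toy version of a non-ANSI string-to-double cast: unparseable text becomes None (null)."""
    if value is None:
        return None
    try:
        return float(value)  # float() also accepts surrounding whitespace
    except ValueError:
        return None


def filter_below(values: Iterable[Optional[str]], threshold: float) -> List[float]:
    """Keep parsed values below threshold; None never passes, like a null SQL comparison."""
    kept: List[float] = []
    for raw in values:
        number = to_double_or_none(raw)
        if number is not None and number < threshold:
            kept.append(number)
    return kept


# Made-up readings, including a blank one that cannot be parsed.
print(filter_below(["9.1", "4.5", " ", "-1.0"], 5))  # [4.5, -1.0]
```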


In [14]:

import time

# Write matching rows to an in-memory table named after the query
query = filteredDF.writeStream.format("memory") \
    .queryName("weatherQuery") \
    .outputMode("append") \
    .start()

# Let the query run for 10 minutes
time.sleep(600)

# Inspect what has reached the memory sink, then stop the query
spark.sql("SELECT * FROM weatherQuery").show()
query.stop()
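In append mode, each micro-batch adds only its new result rows to the sink, and rows written earlier are never rewritten. The class below is a hypothetical, plain-Python stand-in for the memory sink (a growing list), not Spark's implementation; the rows are made up.

```python
from typing import Dict, List

Row = Dict[str, object]


class ToyMemorySink:
    """Toy stand-in for a memory sink: a table that only grows as micro-batches arrive."""

    def __init__(self) -> None:
        self.table: List[Row] = []

    def add_batch(self, new_rows: List[Row]) -> None:
        # Append output mode: write only this batch's new rows;
        # rows from earlier batches are left untouched.
        self.table.extend(new_rows)


sink = ToyMemorySink()
sink.add_batch([{"reading": "a", "air_temperature": 4.5}])
sink.add_batch([])  # a batch with no matching rows adds nothing
sink.add_batch([{"reading": "b", "air_temperature": 3.9}])
print(len(sink.table))  # 2
```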


In [15]:
# Write matching rows as CSV every 10 seconds; "path" is an output directory
# (Spark writes part files inside it), even though it is named output.csv
query = filteredDF.writeStream.format("csv") \
    .option("header", "true") \
    .queryName("weatherQuery") \
    .outputMode("append") \
    .option("checkpointLocation", "file:///Users/erraf/OneDrive/Desktop/ayman/checkpoint") \
    .option("path", "file:///Users/erraf/OneDrive/Desktop/ayman/output.csv") \
    .trigger(processingTime="10 seconds") \
    .start()

# Let the query run for 10 minutes, then stop it
time.sleep(600)
query.stop()
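With `trigger(processingTime="10 seconds")`, a micro-batch that finishes early waits for the next interval boundary, while one that overruns the interval is followed immediately by the next. The function below is a simplified, plain-Python model of that schedule, not Spark code; it assumes new data is always available (Spark skips a batch when there is none) and measures boundaries as multiples of the interval from time 0.

```python
from typing import List


def batch_start_times(durations: List[float], interval: float) -> List[float]:
    """Toy model of start times (in seconds) for micro-batches under a fixed-interval trigger."""
    starts: List[float] = []
    t = 0.0
    for duration in durations:
        starts.append(t)
        finished = t + duration
        next_boundary = (t // interval + 1) * interval  # next multiple of the interval
        # Wait for the boundary if the batch was fast; otherwise start right away.
        t = max(finished, next_boundary)
    return starts


# Hypothetical batch durations: 4 s, 13 s (overruns the 10 s interval), 2 s.
print(batch_start_times([4, 13, 2], 10))  # [0.0, 10.0, 23.0]
```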



In [None]:
# Same CSV query, with Windows drive-letter paths
query = filteredDF.writeStream.format("csv") \
    .option("header", "true") \
    .queryName("weatherQuery") \
    .outputMode("append") \
    .option("checkpointLocation", "file:///C:/Users/erraf/OneDrive/Desktop/ayman/checkpoint") \
    .option("path", "file:///C:/Users/erraf/OneDrive/Desktop/ayman/output.csv") \
    .trigger(processingTime="10 seconds") \
    .start()

# Let the query run for 10 minutes, then stop it
time.sleep(600)
query.stop()






In [None]:
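# Process all currently available data in a single micro-batch, write Parquet, then stop
query = filteredDF.writeStream.format("parquet") \
    .queryName("weatherQuery") \
    .outputMode("append") \
    .trigger(once=True) \
    .option("path", "file:///C:/Users/erraf/OneDrive/Desktop/Project Big data") \
    .option("checkpointLocation", "file:///C:/Users/erraf/OneDrive/Desktop/checkpoint") \
    .start()

query.awaitTermination()  # returns once the single batch has finished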
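`trigger(once=True)` processes everything available in one micro-batch and then stops. Newer Spark releases (3.3 and later) also offer `trigger(availableNow=True)`, which likewise stops when the available data is done but splits it into several micro-batches according to source options such as `maxFilesPerTrigger`. The function below is a plain-Python toy of that difference, not Spark code; the mode strings and file names are hypothetical.

```python
from typing import List


def plan_run(pending_files: List[str], mode: str, max_files_per_trigger: int = 1) -> List[List[str]]:
    """Toy model of the micro-batches used by a run-to-completion trigger.

    "once": all pending files in a single micro-batch (rate limit ignored).
    "availableNow": the same files, split by max_files_per_trigger.
    """
    if not pending_files:
        return []
    if mode == "once":
        return [list(pending_files)]
    if mode == "availableNow":
        return [
            pending_files[i:i + max_files_per_trigger]
            for i in range(0, len(pending_files), max_files_per_trigger)
        ]
    raise ValueError(f"unknown mode: {mode}")


print(plan_run(["f0", "f1", "f2"], "once"))             # [['f0', 'f1', 'f2']]
print(plan_run(["f0", "f1", "f2"], "availableNow", 1))  # [['f0'], ['f1'], ['f2']]
```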







In [None]:
'''
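Q: Try to view the results using print() or show(). See what you get.
A: As the cells above show, print(streamingDF) only prints the schema. Calling show() on a streaming DataFrame raises an AnalysisException, because queries with streaming sources must be executed with writeStream.start(); to see rows, write them to a sink (e.g. the memory sink) and query that table.
Q: How can we know if the stream has loaded the data?
A: Start the query and check query.status, query.lastProgress (which reports numInputRows for the last batch) or query.recentProgress, or query the memory table with spark.sql("SELECT * FROM weatherQuery").
Q: Try to read a response from an API or a JSON file, or try simple scraping.
A: Already answered.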

Q. Which sink is used here?
A. The sink used here is "memory". The streaming DataFrame filteredDF is written to an in-memory table named after the query (weatherQuery), which can be read with spark.sql("SELECT * FROM weatherQuery"). This sink keeps all output in the driver's memory, so it is meant for debugging on small data.

Q. Which output mode is used here?
A. The output mode used here is "append". Only the rows added to the result since the last trigger are written to the sink; rows that were already written are never updated.

Q. Which trigger is used here?
A. The code doesn't specify a trigger, so the default is used: micro-batch mode, where each micro-batch starts as soon as the previous one finishes (equivalent to processingTime="0 seconds").
Q: Try to modify these to see the difference.
A: As the cells above show, we changed the sink to other formats, "csv" and "parquet". With either format, the output of the streaming query is written to CSV or Parquet files, respectively, in the output directory instead of the in-memory table.

We also specified different triggers, trigger(processingTime="10 seconds") and trigger(once=True), to see how query execution is driven by a fixed interval or by a single run over the available data.


2. Is there any difference between the methods and operations provided for streaming through SparkSession (Structured Streaming) and through StreamingContext?

SparkSession is the entry point for Structured Streaming and provides a unified interface for batch and streaming data: streaming DataFrames/Datasets are created with spark.readStream and transformed with the same DataFrame API used for batch data.
StreamingContext belongs to the older Spark Streaming API. It works with DStreams (sequences of RDDs) and provides RDD-style transformations such as map, reduceByKey and window.
In general, SparkSession is the preferred choice: Spark Streaming (DStreams) is a legacy API that no longer receives updates, while Structured Streaming adds query optimization, event-time processing, and end-to-end exactly-once guarantees when used with replayable sources, supported sinks and checkpointing.
1. What have you learned today?
The streaming DataFrame filteredDF can be written to different sinks: an in-memory table ("memory") or CSV and Parquet files.

The output mode used here is "append": only rows added to the result since the last trigger are written to the sink.

By specifying a trigger, we control the timing and frequency of data processing in a streaming application. The right trigger depends on the use case: the latency we need, the processing interval, and the nature of the data source.

To see the difference, we can modify these settings: change the sink to another format such as "parquet" or "csv", or specify a different trigger such as trigger(processingTime="10 seconds") or trigger(once=True).

By experimenting with these modifications, we can observe how the data is written to different sinks and how query execution is driven by the chosen interval or condition.



'''
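On the question of how to tell whether the stream has loaded data: in PySpark, a running `StreamingQuery` exposes `lastProgress` (a dict, or `None` before the first batch completes) and `recentProgress` (a list of such dicts), and each report includes fields such as `batchId` and `numInputRows`. The helper below is a hypothetical sketch that totals `numInputRows`; with a live query you would pass it `query.recentProgress`, which keeps only a bounded number of recent reports. The sample reports are made up and contain only the two fields the helper reads.

```python
from typing import Any, Dict, Iterable, Optional


def rows_loaded(progress_reports: Iterable[Optional[Dict[str, Any]]]) -> int:
    """Hypothetical helper: sum numInputRows over progress reports (None entries are skipped)."""
    total = 0
    for report in progress_reports:
        if report:  # lastProgress is None until the first micro-batch completes
            total += int(report.get("numInputRows", 0))
    return total


reports = [{"batchId": 0, "numInputRows": 14}, {"batchId": 1, "numInputRows": 0}]
print(rows_loaded(reports))  # 14
```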
