In [0]:
spark

In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Structured Streaming").getOrCreate()

### 1. Create a DataSet.

In [0]:
green_taxiDF = spark.read.csv("/Volumes/azuredatabme/default/greentaxi/2021_Green_Taxi_Trip_Data.csv",inferSchema=True,header=True)

In [0]:
green_taxiDF1 = green_taxiDF.repartition(10).write.mode("append").option("header","true").parquet("/Volumes/azuredatabme/default/greentaxi")

In [0]:
print(green_taxiDF.schema)

StructType([StructField('VendorID', IntegerType(), True), StructField('lpep_pickup_datetime', StringType(), True), StructField('lpep_dropoff_datetime', StringType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('RatecodeID', IntegerType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('passenger_count', IntegerType(), True), StructField('trip_distance', DoubleType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('ehail_fee', StringType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('payment_type', IntegerType(), True), StructField('trip_type', IntegerType(), True), StructField('congestion_surcharge', Doubl

In [0]:
from pyspark.sql.functions import *
# create readStream
taxiReadStream = spark.readStream.format("parquet").schema(green_taxiDF.schema).option("header","true").load("/Volumes/azuredatabme/default/readstream")



In [0]:
from pyspark.sql.types import *
green_taxiDF = green_taxiDF.withColumn("lpep_pickup_datetime",to_timestamp(col("lpep_pickup_datetime"),"yyyy-MM-dd HH:mm:ss"))


##### Structured streaming transformation

In [0]:

# readStream transformation
taxitransformation = taxiReadStream.withWatermark("lpep_pickup_datetime", "10 minutes").groupBy("trip_type","lpep_pickup_datetime").sum("fare_amount")

In [0]:
taxiWriteStream = taxitransformation.writeStream.outputMode("append")\
.format("parquet")\
.option("path","/Volumes/azuredatabme/default/writestream/")\
.option("checkpointLocation","/Volumes/azuredatabme/default/checkpoint/")\
.start()

taxiWriteStream.awaitTermination()

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-1691572086355891>, line 5[0m
[1;32m      1[0m taxiWriteStream [38;5;241m=[39m taxitransformation[38;5;241m.[39mwriteStream[38;5;241m.[39moutputMode([38;5;124m"[39m[38;5;124mappend[39m[38;5;124m"[39m)\
[1;32m      2[0m [38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mparquet[39m[38;5;124m"[39m)\
[1;32m      3[0m [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mpath[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124m/Volumes/azuredatabme/default/writestream/[39m[38;5;124m"[39m)\
[1;32m      4[0m [38;5;241m.[39moption([38;5;124m"[39m[38;5;124mcheckpointlocation[39m[38;5;124m"[39m,[38;5;124m"[39m[38;5;124m/Volumes/azuredatabme/default/checkpoint/[39m[38;5;124m"[39m)\
[0;32m----> 5[0m [38;5;241m.[39mstart()
[1;32m      7[0m taxiWriteStream[38;5;24

In [0]:
from pyspark.sql.functions import *




# Step 1 : Create a ReadStream
taxiReadStream = spark.readStream.format("parquet").schema(green_taxiDF.schema).option("header","true").load("/Volumes/azuredatabme/default/readstream/")

# step 2 : Convert String Stype to timestamp
taxiReadStream = taxiReadStream.withColumn("lpep_pickup_datetime",to_timestamp(col("lpep_pickup_datetime"),"yyyy-MM-dd HH:mm:ss"))


# step 3 : Add a watermark and perform the aggregation (window of aggregate operation) 
# Use windowing based on event time column
taxitransformation = taxiReadStream\
    .withWatermark("lpep_pickup_datetime", "10 minutes")\
        .groupBy(window(col("lpep_pickup_datetime"), "10 minutes"))\
        .sum("fare_amount")


# step 4 : Write the aggregated data to parquet file
taxiWriteStream = taxitransformation.writeStream.outputMode("append")\
.format("parquet")\
.option("path","/Volumes/azuredatabme/default/writestream/")\
.option("checkpointLocation","/Volumes/azuredatabme/default/checkpoint/")\
.start()

taxiWriteStream.awaitTermination()

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:103)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:103)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:718)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:437)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:437)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecutio

In [0]:
from pyspark.sql.functions import *

# Step 1: Create a ReadStream
taxiReadStream = spark.readStream.format("parquet").schema(green_taxiDF.schema).option("header", "true").load("/Volumes/azuredatabme/default/readstream/")

# Step 2: Convert String Stype to timestamp
taxiReadStream = taxiReadStream.withColumn("lpep_pickup_datetime", to_timestamp(col("lpep_pickup_datetime"), "yyyy-MM-dd HH:mm:ss"))

# Step 3: Add a watermark and perform the aggregation (window of aggregate operation)
# Use windowing based on event time column
# taxitransformation = taxitransformation\
#     .withWatermark("lpep_pickup_datetime", "10 minutes")\
#     .groupBy(window(col("lpep_pickup_datetime"), "10 minutes"))\
#     .sum("fare_amount")

taxiReadStream = taxiReadStream.withWatermark("lpep_pickup_datetime", "10 minutes")
# create 
taxiReadStream.createOrReplaceTempView("taxitable")


query = spark.sql("""
                  select lpep_pickup_datetime, trip_type, count(*) from taxitable where fare_amount > 30 group by lpep_pickup_datetime , trip_type 
                  """)

# Step 4: Write the aggregated data to parquet file
taxiWriteStream = query.writeStream.outputMode("append")\
    .format("parquet")\
    .option("path", "/Volumes/azuredatabme/default/writestream1/")\
    .option("checkpointLocation", "/Volumes/azuredatabme/default/checkpoint1/")\
    .start()

taxiWriteStream.awaitTermination()



com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:103)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:103)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:718)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:437)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:437)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecutio

#### Template for streaming
* Step 1 --> Create a ReadStream
* Step 2 --> Covert the string to a timestamp
* Step 3 --> Add a watermark and perform the aggregation (window of aggregate functions)


In [0]:
writeddata = spark.read.parquet("/Volumes/azuredatabme/default/writestream1/part-00000-a55ec5ab-e5e0-4b7b-8c18-ffb7a6d4d540.c000.snappy.parquet")
writeddata.show()

+--------------------+---------+--------+
|lpep_pickup_datetime|trip_type|count(1)|
+--------------------+---------+--------+
+--------------------+---------+--------+



In [0]:
taxiWriteStream.stop()

In [0]:
dbutils.fs.rm("/Volumes/azuredatabme/default/checkpoint/", True)

True