In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler
import random

import time
import os

# setup arguments
spark_v = '3.2.1'
scala_v= '2.12'
os.environ['PYSPARK_SUBMIT_ARGS'] = f'--packages org.apache.spark:spark-sql-kafka-0-10_{scala_v}:{spark_v} pyspark-shell'
kafka_topic_name = "iris"
kafka_bootstrap_servers = 'localhost:9092'

spark = SparkSession \
        .builder \
        .appName("Structured Streaming ") \
        .master("local[*]") \
        .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Construct a streaming DataFrame that reads from topic
flower_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest") \
        .load()
# get data and set all to sring
flower_df1 = flower_df.selectExpr("CAST(value AS STRING)", "timestamp")

# schema for data (like schema on filter db)
flower_schema_string = "order_id INT,sepal_length DOUBLE,sepal_length DOUBLE,sepal_length DOUBLE,sepal_length DOUBLE,species STRING"



flower_df2 = flower_df1 \
        .select(from_csv(col("value"), flower_schema_string) \
                .alias("flower"), "timestamp")


flower_df3 = flower_df2.select("flower.*", "timestamp")

    
flower_df3.createOrReplaceTempView("flower_find");
song_find_text = spark.sql("SELECT * FROM flower_find")
flower_agg_write_stream = song_find_text \
        .writeStream \
        .trigger(processingTime='5 seconds') \
        .outputMode("append") \
        .option("truncate", "false") \
        .format("memory") \
        .queryName("testedTable") \
        .start()

flower_agg_write_stream.awaitTermination(1)

IllegalArgumentException: Cannot start query with name testedTable as a query with that name is already active in this SparkSession

In [7]:
df = spark.sql("SELECT * FROM testedTable")
df.show(3)

+--------+------------+------------+------------+------------+-------+--------------------+
|order_id|sepal_length|sepal_length|sepal_length|sepal_length|species|           timestamp|
+--------+------------+------------+------------+------------+-------+--------------------+
|       1|         4.9|         4.9|         4.9|         4.9| setosa|2022-09-15 18:35:...|
|       2|         4.7|         4.7|         4.7|         4.7| setosa|2022-09-15 18:35:...|
|       3|         4.6|         4.6|         4.6|         4.6| setosa|2022-09-15 18:35:...|
+--------+------------+------------+------------+------------+-------+--------------------+
only showing top 3 rows



In [10]:
df.count()

91

In [13]:
df.count()

                                                                                

146

In [7]:
df.show(12)

+--------+------------+------------+------------+------------+-------+--------------------+
|order_id|sepal_length|sepal_length|sepal_length|sepal_length|species|           timestamp|
+--------+------------+------------+------------+------------+-------+--------------------+
|       0|         5.1|         5.1|         5.1|         5.1| setosa|2022-05-28 17:28:...|
|       1|         4.9|         4.9|         4.9|         4.9| setosa|2022-05-28 17:28:...|
|       2|         4.7|         4.7|         4.7|         4.7| setosa|2022-05-28 17:28:...|
|       3|         4.6|         4.6|         4.6|         4.6| setosa|2022-05-28 17:28:...|
|       4|         5.0|         5.0|         5.0|         5.0| setosa|2022-05-28 17:28:...|
|       5|         5.4|         5.4|         5.4|         5.4| setosa|2022-05-28 17:28:...|
|       6|         4.6|         4.6|         4.6|         4.6| setosa|2022-05-28 17:28:...|
|       7|         5.0|         5.0|         5.0|         5.0| setosa|2022-05-28


[Stage 26:>                                                         (0 + 1) / 1]

                                                                                

In [None]:
!/opt/spark/bin/spark-shell

In [None]:
spark  version 3.2.1
         
Using Scala version 2.13.5 (OpenJDK 64-Bit Server VM, Java 11.0.15)

In [None]:
!pyspark-shell