# In [None]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Maven coordinates resolved through "spark.jars.packages".
# The Kafka connector coordinate is assembled as group:artifact:version, with
# the Scala version embedded in the artifact name.
# NOTE(review): kafka-clients is pinned separately at 2.8.0 - confirm it is
# compatible with the connector version selected here.
scala_version = "2.12"
spark_version = "3.0.1"
packages = [
    ":".join(
        (
            "org.apache.spark",
            f"spark-sql-kafka-0-10_{scala_version}",
            spark_version,
        )
    ),
    "org.apache.kafka:kafka-clients:2.8.0",
]

# Kafka source settings: topic to subscribe to and the bootstrap broker.
kafka_topic_name_in = "Iowa_Liquor_Sales_COL"
kafka_bootstrap_servers = "localhost:29092"


# Build (or reuse) a SparkSession running in local mode ("local[*]").
# "spark.jars.packages" receives the comma-joined coordinates from `packages`;
# "spark.jars" names the PostgreSQL JDBC driver jar by relative path.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Iowa_Liquor_Sales_streamingData")
    .config("spark.jars.packages", ",".join(packages))
    .config("spark.jars", "postgresql-42.5.4.jar")
    .getOrCreate()
)

# Streaming DataFrame backed by the Kafka topic in `kafka_topic_name_in`,
# read through the brokers in `kafka_bootstrap_servers` and starting from the
# earliest offsets.
Spark_load = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", kafka_topic_name_in)
    .option("startingOffsets", "earliest")
    .load()
)

# DDL-style column list used to parse each CSV record, one column per line.
# The separators (some with a trailing space, some without) are kept exactly
# as they were so the resulting string is unchanged.
# NOTE(review): state_bottle_retail is INT while bottles_sold is FLOAT; the
# names suggest a price and a count, so these two types may be swapped -
# confirm against the producer and the target table before changing them.
schema = (
    "date TIMESTAMP,"
    "invoice_and_item_number STRING,"
    "store_name STRING, "
    "address STRING,"
    "city STRING, "
    "county STRING, "
    "category_name STRING, "
    "vendor_name STRING, "
    "item_number INT, "
    "item_description STRING, "
    "pack INT, "
    "bottle_volume_ml FLOAT,"
    "state_bottle_cost FLOAT, "
    "state_bottle_retail INT, "
    "bottles_sold FLOAT, "
    "sale_dollars FLOAT,"
    "volume_sold_liters FLOAT"
)
# CSV parsing options: fields are comma-separated.
options = {"sep": ","}

# Cast the Kafka key/value to strings, parse the CSV text in `value` with
# `schema` and `options`, then flatten the parsed struct into top-level
# columns.
trans_Df = (
    Spark_load
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .select(from_csv(col("value"), schema, options).alias("data"))
    .select("data.*")
)

# Single-column projection of `sale_dollars`; no aggregation is applied here
# despite the name.
sale_dollars_SUM = trans_Df.select("sale_dollars")
def _write_streaming(df, epoch_id) -> None:
    """Append one batch to the ``streaming.Iowa_COL_Streaming`` table over JDBC.

    Args:
        df: Object exposing a ``write`` attribute with the DataFrameWriter
            chain (``mode``/``format``/``option``/``save``).
        epoch_id: Batch identifier supplied by the caller; not used here.
    """
    jdbc_settings = (
        ("url", "jdbc:postgresql://localhost:5432/StreamingData"),
        ("driver", "org.postgresql.Driver"),
        ("dbtable", "streaming.Iowa_COL_Streaming"),
        ("user", "postgres"),
        ("password", "admin"),
    )
    writer = df.write.mode("append").format("jdbc")
    # Apply the connection settings in the same order as before.
    for key, value in jdbc_settings:
        writer = writer.option(key, value)
    writer.save()
print("----")
# Start the streaming query that hands each micro-batch of `trans_Df` to
# `_write_streaming`.
# NOTE(review): no checkpoint location is configured on this query - confirm
# whether replaying the topic after a restart is acceptable for the target
# table.
write1 = trans_Df.writeStream.foreachBatch(_write_streaming).start()
print("----")
def sum_dollar(df, epoch_id) -> None:
    """Append one batch to the ``streaming.total_spent`` table over JDBC.

    The rows are written as received; this function performs no summation
    itself.

    Args:
        df: Object exposing a ``write`` attribute with the DataFrameWriter
            chain (``mode``/``format``/``option``/``save``).
        epoch_id: Batch identifier supplied by the caller; not used here.
    """
    jdbc_settings = (
        ("url", "jdbc:postgresql://localhost:5432/StreamingData"),
        ("driver", "org.postgresql.Driver"),
        ("dbtable", "streaming.total_spent"),
        ("user", "postgres"),
        ("password", "admin"),
    )
    writer = df.write.mode("append").format("jdbc")
    # Apply the connection settings in the same order as before.
    for key, value in jdbc_settings:
        writer = writer.option(key, value)
    writer.save()
# Start the second streaming query: each micro-batch of `sale_dollars_SUM` is
# handed to `sum_dollar`.
write2 = sale_dollars_SUM.writeStream.foreachBatch(sum_dollar).start()

# Block until both queries have terminated.
# Waiting on write1 without a timeout would leave a failure of write2
# unnoticed for as long as write1 keeps running. Instead, poll every pending
# query with a short timeout: awaitTermination(timeout) returns True once the
# query has terminated and raises if it terminated with an exception, so a
# failure in either query surfaces promptly.
pending_queries = [write1, write2]
while pending_queries:
    pending_queries = [
        query for query in pending_queries if not query.awaitTermination(1)
    ]