# Spark Streaming

In [None]:
import json
import os
import sys
import ast
import yaml
import logging

import yaml
import datetime as dt
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# sc = SparkContext()
# sc.setLogLevel('INFO')
logger = logging.getLogger('reddit_streaming')

spark_host = "spark-master" 
kafka_host = "kafka" 
subreddit = "aws"
spark_version = "3.3.2"
hadoop_version = "3.3.4"
delta_version = "2.3.0"
postgres_version = "9.4.1212"
# aws_client = ast.literal_eval(secretmanager_client.get_secret_value(SecretId="AWS_ACCESS_KEY_ID")["SecretString"])["AWS_ACCESS_KEY_ID"]
# aws_secret = ast.literal_eval(secretmanager_client.get_secret_value(SecretId="AWS_SECRET_ACCESS_KEY")["SecretString"])["AWS_SECRET_ACCESS_KEY"]
extra_jar_list = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{spark_version},org.apache.hadoop:hadoop-common:{hadoop_version},org.apache.hadoop:hadoop-aws:{hadoop_version},org.apache.hadoop:hadoop-client:{hadoop_version},io.delta:delta-core_2.12:{delta_version},org.postgresql:postgresql:{postgres_version}"
bucket = "reddit-streaming-stevenhurwitt-2"

import os
import sys

# Set Java environment variables
os.environ['JAVA_HOME'] = '/usr/local/openjdk-8'
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
os.environ['SPARK_LOCAL_IP'] = 'localhost'
os.environ['SPARK_LOCAL_DIRS'] = '/opt/workspace/tmp/spark'
os.environ['SPARK_LOG_DIR'] = '/opt/workspace/events'


## functions

In [4]:

def read_files():
    """
    initializes spark session using config.yaml and creds.json files.
    """

    base = os.getcwd()
    creds_path_container = os.path.join(base, "creds.json")

    creds_dir = "/".join(base.split("/")[:-3])
    creds_path = os.path.join(base, "creds.json")

    try:
        with open(creds_path, "r") as f:
            creds = json.load(f)
            f.close()

    except FileNotFoundError:
        # print("couldn't find: {}.".format(creds_path))
        try:
            with open(creds_path_container, "r") as f:
                creds = json.load(f)
                f.close()

        except FileNotFoundError:
            with open("/opt/workspace//redditStreaming/creds.json", "r") as f:
                creds = json.load(f)
                f.close()

    except:
        print("failed to find creds.json.")
        sys.exit()

    try:
        with open("config.yaml", "r") as f:
            config = yaml.safe_load(f)
            # print("read config file.")
            f.close()

    except:
        print("failed to find config.yaml, exiting now.")
        sys.exit()

    return(creds, config)

def init_spark(subreddit, index):
    """
    initialize spark given config and credential's files

    returns: spark, sparkContext (sc)
    raises: Exception if Spark session creation fails
    """
    creds, config = read_files()
    spark_host = config["spark_host"]
    extra_jar_list = config["extra_jar_list"]

    # Set Java specific configurations
    # os.environ['PYSPARK_PYTHON'] = sys.executable
    # os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
    
    
    # initialize spark session
    try:
        spark = SparkSession.builder.appName(f"reddit_{subreddit}") \
                    .master("local[*]") \
                    .config("spark.driver.host", "localhost") \
                    .config("spark.driver.bindAddress", "0.0.0.0") \
                    .config("spark.scheduler.mode", "FAIR") \
                    .config("spark.scheduler.allocation.file", "file:///opt/workspace/redditStreaming/fairscheduler.xml") \
                    .config("spark.driver.memory", "4g") \
                    .config("spark.executor.memory", "4g") \
                    .config("spark.executor.cores", "2") \
                    .config("spark.streaming.concurrentJobs", "8") \
                    .config("spark.local.dir", "/opt/workspace/tmp/spark") \
                    .config("spark.worker.dir", "/opt/workspace/tmp/executor/{}/".format(subreddit)) \
                    .config("spark.eventLog.enabled", "true") \
                    .config("spark.eventLog.dir", "file:///opt/workspace/events") \
                    .config("spark.sql.debug.maxToStringFields", 1000) \
                    .config("spark.jars.packages", extra_jar_list) \
                    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
                    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
                    .enableHiveSupport() \
                    .getOrCreate()

                    # .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
                    

        sc = spark.sparkContext
        sc.setLogLevel('WARN')
        sc.setLocalProperty("spark.scheduler.pool", "pool{}".format(str(index)))
        
        print("Created Spark session successfully")
        return spark, sc

    except Exception as e:
        print(f"Failed to create Spark session: {str(e)}")
        raise
    
def read_kafka_stream(spark, sc, subreddit):
    """
    reads streaming data from kafka producer

    params: spark, sc
    returns: df
    """
    creds, config = read_files()
    kafka_host = config["kafka_host"]

    # define schema for payload data
    payload_schema = StructType([
        StructField("approved_at_utc", FloatType(), True),
        StructField("subreddit", StringType(), False),
        StructField("selftext", StringType(), False),
        StructField("author_fullname", StringType(), False),
        StructField("saved", BooleanType(), False),
        StructField("mod_reason_title", StringType(), True),
        StructField("gilded", IntegerType(), False),
        StructField("clicked", BooleanType(), False),
        StructField("title", StringType(), False),
        StructField("subreddit_name_prefixed", StringType(), False),
        StructField("hidden", BooleanType(), False),
        StructField("pwls", IntegerType(), False),
        StructField("link_flair_css_class", StringType(), False),
        StructField("downs", IntegerType(), False),
        StructField("thumbnail_height", IntegerType(), True),
        StructField("top_awarded_type", StringType(), True),
        StructField("hide_score", BooleanType(), False),
        StructField("name", StringType(), False),
        StructField("quarantine", BooleanType(), False),
        StructField("link_flair_text_color", StringType(), True),
        StructField("upvote_ratio", FloatType(), False),
        StructField("author_flair_background_color", StringType(), True),
        StructField("ups", IntegerType(), False),
        StructField("total_awards_received", IntegerType(), False),
        StructField("thumbnail_width", IntegerType(), True),
        StructField("author_flair_template_id", StringType(), True),
        StructField("is_original_content", BooleanType(), False),
        StructField("secure_media", StringType(), True),
        StructField("is_reddit_media_domain", BooleanType(), False),
        StructField("is_meta", BooleanType(), False),
        StructField("category", StringType(), True),
        StructField("link_flair_text", StringType(), True),
        StructField("can_mod_post", BooleanType(), False),
        StructField("score", IntegerType(), False),
        StructField("approved_by", StringType(), True),
        StructField("is_created_from_ads_ui", BooleanType(), False),
        StructField("author_premium", BooleanType(), False),
        StructField("thumbnail", StringType(), True),
        StructField("edited", BooleanType(), False),
        StructField("author_flair_css_class", StringType(), True),
        StructField("post_hint", StringType(), False),
        StructField("content_categories", StringType(), True),
        StructField("is_self", BooleanType(), False),
        StructField("subreddit_type", StringType(), False),
        StructField("created", FloatType(), False),
        StructField("link_flair_type", StringType(), True),
        StructField("wls", IntegerType(), False),
        StructField("removed_by_category", StringType(), True),
        StructField("banned_by", StringType(), True),
        StructField("author_flair_type", StringType(), True),
        StructField("domain", StringType(), True),
        StructField("allow_live_comments", BooleanType(), False),
        StructField("selftext_html", StringType(), True),
        StructField("likes", IntegerType(), True),
        StructField("suggested_sort", StringType(), True),
        StructField("banned_at_utc", FloatType(), True),
        StructField("url_overridden_by_dest", StringType(), True),
        StructField("view_count", IntegerType(), True),
        StructField("archived", BooleanType(), False),
        StructField("no_follow", BooleanType(), False),
        StructField("is_crosspostable", BooleanType(), False),
        StructField("pinned", BooleanType(), False),
        StructField("over_18", BooleanType(), False),
        StructField("media_only", BooleanType(), False),
        StructField("link_flair_template_id", StringType(), True),
        StructField("can_gild", BooleanType(), False),
        StructField("spoiler", BooleanType(), False),
        StructField("locked", BooleanType(), False),
        StructField("author_flair_text", StringType(), True),
        StructField("visited", BooleanType(), False),
        StructField("removed_by", StringType(), True),
        StructField("mod_note", StringType(), True),
        StructField("distinguished", StringType(), True),
        StructField("subreddit_id", StringType(), False),
        StructField("author_is_blocked", BooleanType(), False),
        StructField("mod_reason_by", StringType(), True),
        StructField("num_reports", IntegerType(), True),
        StructField("removal_reason", StringType(), True),
        StructField("link_flair_background_color", StringType(), True),
        StructField("id", StringType(), False),
        StructField("is_robot_indexable", BooleanType(), False),
        StructField("report_reasons", StringType(), True),
        StructField("author", StringType(), False),
        StructField("discussion_type", StringType(), True),
        StructField("num_comments", IntegerType(), False),
        StructField("send_replies", BooleanType(), False),
        StructField("whitelist_status", StringType(), False),
        StructField("contest_mode", BooleanType(), False),
        StructField("author_patreon_flair", BooleanType(), False),
        StructField("author_flair_text_color", StringType(), True),
        StructField("permalink", StringType(), False),
        StructField("parent_whitelist_status", StringType(), False),
        StructField("stickied", BooleanType(), False),
        StructField("url", StringType(), False),
        StructField("subreddit_subscribers", IntegerType(), False),
        StructField("created_utc", FloatType(), False),
        StructField("num_crossposts", IntegerType(), False),
        StructField("media", StringType(), True),
        StructField("is_video", BooleanType(), False),
    ])

    # read json from kafka and select all columns
    df = spark \
            .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", "{}:9092".format(kafka_host)) \
                .option("subscribe", "reddit_" + subreddit) \
                .option("startingOffsets", "latest") \
                .option("failOnDataLoss", "false") \
                .load() \
                .selectExpr("CAST(value AS STRING) as json") \
                .select(from_json(col("json"), payload_schema).alias("data")) \
                .select("data.*") 

    return(df)

def write_stream(df, subreddit):
    """
    writes streaming data to s3 data lake

    params: df
    """

    creds, config = read_files()

    bucket = config["bucket"]
    # logger.info("bucket: {}".format(bucket))
    logger.info("subreddit: {}".format(subreddit))
    write_path = f"s3a://{bucket}/{subreddit}"
    logger.info("write path: {}".format(write_path))

    # write subset of df to console
    df.withColumn("created_utc", col("created_utc").cast("timestamp")) \
        .select("subreddit", "title", "score", "created_utc") \
        .writeStream \
        .trigger(processingTime='180 seconds') \
        .option("truncate", "true") \
        .option("checkpointLocation", "file:///opt/workspace/checkpoints/{}_console".format(subreddit)) \
        .outputMode("update") \
        .format("console") \
        .queryName(subreddit + "_console") \
        .start()

    # write to s3 delta
    df.writeStream \
        .trigger(processingTime="180 seconds") \
        .format("delta") \
        .option("path", write_path) \
        .option("checkpointLocation", "file:///opt/workspace/checkpoints/{}".format(subreddit)) \
        .option("header", True) \
        .outputMode("append") \
        .queryName(subreddit + "_delta") \
        .start(f"/opt/workspace/data/{subreddit}")
    

## main

In [5]:
spark = None
streams = []

try:
    creds, config = read_files()
    subreddit_list = config["subreddit"]
    extra_jar_list = config["extra_jar_list"]

    # Initialize Delta Lake (add this before creating SparkSession)
    builder = SparkSession.builder \
        .appName("reddit_streaming") \
        .config("spark.jars.packages", extra_jar_list) \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

    # Validate configuration
    if not subreddit_list or not isinstance(subreddit_list, list):
        raise ValueError("Invalid subreddit configuration")
        
    for i, s in enumerate(subreddit_list):
        try:
            print(f"Initializing Spark for subreddit: {s}")
            spark, sc = init_spark(s, i)
            stage_df = read_kafka_stream(spark, sc, s)
            write_stream(stage_df, s)
            streams.append(stream)
            
        except Exception as e:
            print(f"Error processing subreddit {s}: {str(e)}")
            continue

    if not streams:
        raise Exception("No streams were successfully created")

    print("All streams initialized, awaiting termination...")
    spark.streams.awaitAnyTermination()

    # Only await termination if at least one stream was created
    if 'spark' in locals():
        spark.streams.awaitAnyTermination()
    else:
        print("No streams were successfully created")
        sys.exit(1)
            
except KeyboardInterrupt:
    print("\nShutting down gracefully...")
    for stream in streams:
        stream.stop()
    if 'spark' in locals():
        spark.stop()
    sys.exit(0)

except Exception as e:
    print(f"Fatal error: {str(e)}")
    if 'spark' in locals():
        spark.stop()
    sys.exit(1)

finally:
    for stream in streams:
        try:
            stream.stop()
        except:
            pass
    if spark:
        try:
            spark.stop()
        except:
            pass

Initializing Spark for subreddit: aws
Failed to create Spark session: Java gateway process exited before sending its port number
Error processing subreddit aws: Java gateway process exited before sending its port number
Initializing Spark for subreddit: bikinibottomtwitter


/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 71: /usr/lib/jvm/java-11-openjdk-amd64/bin/java: No such file or directory
/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 96: CMD: bad array subscript
/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 71: /usr/lib/jvm/java-11-openjdk-amd64/bin/java: No such file or directory
/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 96: CMD: bad array subscript


Failed to create Spark session: Java gateway process exited before sending its port number
Error processing subreddit bikinibottomtwitter: Java gateway process exited before sending its port number
Initializing Spark for subreddit: BlackPeopleTwitter
Failed to create Spark session: Java gateway process exited before sending its port number
Error processing subreddit BlackPeopleTwitter: Java gateway process exited before sending its port number
Initializing Spark for subreddit: WhitePeopleTwitter


/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 71: /usr/lib/jvm/java-11-openjdk-amd64/bin/java: No such file or directory
/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 96: CMD: bad array subscript
/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 71: /usr/lib/jvm/java-11-openjdk-amd64/bin/java: No such file or directory
/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 96: CMD: bad array subscript


Failed to create Spark session: Java gateway process exited before sending its port number
Error processing subreddit WhitePeopleTwitter: Java gateway process exited before sending its port number
Initializing Spark for subreddit: news
Failed to create Spark session: Java gateway process exited before sending its port number
Error processing subreddit news: Java gateway process exited before sending its port number
Initializing Spark for subreddit: ProgrammerHumor


/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 71: /usr/lib/jvm/java-11-openjdk-amd64/bin/java: No such file or directory
/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 96: CMD: bad array subscript
/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 71: /usr/lib/jvm/java-11-openjdk-amd64/bin/java: No such file or directory
/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 96: CMD: bad array subscript


Failed to create Spark session: Java gateway process exited before sending its port number
Error processing subreddit ProgrammerHumor: Java gateway process exited before sending its port number
Initializing Spark for subreddit: technology
Failed to create Spark session: Java gateway process exited before sending its port number
Error processing subreddit technology: Java gateway process exited before sending its port number
Initializing Spark for subreddit: worldsnews


/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 71: /usr/lib/jvm/java-11-openjdk-amd64/bin/java: No such file or directory
/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 96: CMD: bad array subscript
/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 71: /usr/lib/jvm/java-11-openjdk-amd64/bin/java: No such file or directory
/usr/local/lib/python3.7/dist-packages/pyspark/bin/spark-class: line 96: CMD: bad array subscript


Failed to create Spark session: Java gateway process exited before sending its port number
Error processing subreddit worldsnews: Java gateway process exited before sending its port number
Fatal error: No streams were successfully created


AttributeError: 'NoneType' object has no attribute 'stop'

In [9]:
creds, config = read_files()
spark_host = config["spark_host"]
extra_jar_list = config["extra_jar_list"]

# Get list of local JARs
jar_dir = "/opt/workspace/jars"
local_jars = ",".join([os.path.join(jar_dir, f) for f in os.listdir(jar_dir) if f.endswith('.jar')])


spark = SparkSession.builder.appName(f"reddit_{subreddit}") \
                    .master("local[*]") \
                    .config("spark.driver.host", "localhost") \
                    .config("spark.driver.bindAddress", "0.0.0.0") \
                    .config("spark.scheduler.mode", "FAIR") \
                    .config("spark.scheduler.allocation.file", "file:///opt/workspace/redditStreaming/fairscheduler.xml") \
                    .config("spark.driver.memory", "4g") \
                    .config("spark.executor.memory", "4g") \
                    .config("spark.executor.cores", "2") \
                    .config("spark.streaming.concurrentJobs", "8") \
                    .config("spark.local.dir", "/opt/workspace/tmp/spark") \
                    .config("spark.worker.dir", "/opt/workspace/tmp/executor/{}/".format(subreddit)) \
                    .config("spark.eventLog.enabled", "true") \
                    .config("spark.eventLog.dir", "file:///opt/workspace/events") \
                    .config("spark.sql.debug.maxToStringFields", 1000) \
                    .config("spark.jars", local_jars) \
                    .config("spark.driver.extraClassPath", local_jars) \
                    .config("spark.executor.extraClassPath", local_jars) \
                    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
                    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
                    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
                    .enableHiveSupport() \
                    .getOrCreate()

                    # .config("spark.jars.packages", extra_jar_list) \

sc = spark.sparkContext
sc.setLogLevel('WARN')
sc.setLocalProperty("spark.scheduler.pool", "pool{}".format(str(1)))
        

:: loading settings :: url = jar:file:/usr/local/lib/python3.7/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.hadoop#hadoop-common added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
org.apache.hadoop#hadoop-client added as a dependency
io.delta#delta-core_2.12 added as a dependency
org.postgresql#postgresql added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e402f1b5-a4c7-483e-9411-98e978cf86be;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.2 in local-m2-cache
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.2 in local-m2-cache
	found org.apache.kafka#kafka-clients;2.8.1 in local-m2-cache
	found org.lz4#lz4-java;1.8.0 in local-m2-cache
	found org.xerial.snappy#snappy-java;1.1.8.4 in local-m2-cache
	found org.slf4j#slf4j-api;1.7.32 in local-m2-cache
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in local-m2-cache
	fo

RuntimeError: Java gateway process exited before sending its port number

In [30]:
def test_kafka_connection(kafka_host):
    """Test Kafka connectivity"""
    try:
        # Try reading from Kafka to verify connection
        test_df = spark.read \
            .format("kafka") \
            .option("kafka.bootstrap.servers", f"{kafka_host}:9092") \
            .option("subscribe", "test") \
            .option("failOnDataLoss", "false") \
            .load()
        return True
    except Exception as e:
        print(f"Failed to connect to Kafka: {str(e)}")
        return False

# Test connection before proceeding
if not test_kafka_connection(kafka_host):
    raise Exception("Failed to connect to Kafka")

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.7/dist-packages/py4j/clientserver.py", line 540, in send_command
    "Error while sending or receiving", e, proto.ERROR_ON_RECEIVE)
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Failed to connect to Kafka: An error occurred while calling o886.read


Exception: Failed to connect to Kafka

In [31]:
creds, config = read_files()
kafka_host = config["kafka_host"]

# define schema for payload data
payload_schema = StructType([
    StructField("approved_at_utc", FloatType(), True),
    StructField("subreddit", StringType(), False),
    StructField("selftext", StringType(), False),
    StructField("author_fullname", StringType(), False),
    StructField("saved", BooleanType(), False),
    StructField("mod_reason_title", StringType(), True),
    StructField("gilded", IntegerType(), False),
    StructField("clicked", BooleanType(), False),
    StructField("title", StringType(), False),
    StructField("subreddit_name_prefixed", StringType(), False),
    StructField("hidden", BooleanType(), False),
    StructField("pwls", IntegerType(), False),
    StructField("link_flair_css_class", StringType(), False),
    StructField("downs", IntegerType(), False),
    StructField("thumbnail_height", IntegerType(), True),
    StructField("top_awarded_type", StringType(), True),
    StructField("hide_score", BooleanType(), False),
    StructField("name", StringType(), False),
    StructField("quarantine", BooleanType(), False),
    StructField("link_flair_text_color", StringType(), True),
    StructField("upvote_ratio", FloatType(), False),
    StructField("author_flair_background_color", StringType(), True),
    StructField("ups", IntegerType(), False),
    StructField("total_awards_received", IntegerType(), False),
    StructField("thumbnail_width", IntegerType(), True),
    StructField("author_flair_template_id", StringType(), True),
    StructField("is_original_content", BooleanType(), False),
    StructField("secure_media", StringType(), True),
    StructField("is_reddit_media_domain", BooleanType(), False),
    StructField("is_meta", BooleanType(), False),
    StructField("category", StringType(), True),
    StructField("link_flair_text", StringType(), True),
    StructField("can_mod_post", BooleanType(), False),
    StructField("score", IntegerType(), False),
    StructField("approved_by", StringType(), True),
    StructField("is_created_from_ads_ui", BooleanType(), False),
    StructField("author_premium", BooleanType(), False),
    StructField("thumbnail", StringType(), True),
    StructField("edited", BooleanType(), False),
    StructField("author_flair_css_class", StringType(), True),
    StructField("post_hint", StringType(), False),
    StructField("content_categories", StringType(), True),
    StructField("is_self", BooleanType(), False),
    StructField("subreddit_type", StringType(), False),
    StructField("created", FloatType(), False),
    StructField("link_flair_type", StringType(), True),
    StructField("wls", IntegerType(), False),
    StructField("removed_by_category", StringType(), True),
    StructField("banned_by", StringType(), True),
    StructField("author_flair_type", StringType(), True),
    StructField("domain", StringType(), True),
    StructField("allow_live_comments", BooleanType(), False),
    StructField("selftext_html", StringType(), True),
    StructField("likes", IntegerType(), True),
    StructField("suggested_sort", StringType(), True),
    StructField("banned_at_utc", FloatType(), True),
    StructField("url_overridden_by_dest", StringType(), True),
    StructField("view_count", IntegerType(), True),
    StructField("archived", BooleanType(), False),
    StructField("no_follow", BooleanType(), False),
    StructField("is_crosspostable", BooleanType(), False),
    StructField("pinned", BooleanType(), False),
    StructField("over_18", BooleanType(), False),
    StructField("media_only", BooleanType(), False),
    StructField("link_flair_template_id", StringType(), True),
    StructField("can_gild", BooleanType(), False),
    StructField("spoiler", BooleanType(), False),
    StructField("locked", BooleanType(), False),
    StructField("author_flair_text", StringType(), True),
    StructField("visited", BooleanType(), False),
    StructField("removed_by", StringType(), True),
    StructField("mod_note", StringType(), True),
    StructField("distinguished", StringType(), True),
    StructField("subreddit_id", StringType(), False),
    StructField("author_is_blocked", BooleanType(), False),
    StructField("mod_reason_by", StringType(), True),
    StructField("num_reports", IntegerType(), True),
    StructField("removal_reason", StringType(), True),
    StructField("link_flair_background_color", StringType(), True),
    StructField("id", StringType(), False),
    StructField("is_robot_indexable", BooleanType(), False),
    StructField("report_reasons", StringType(), True),
    StructField("author", StringType(), False),
    StructField("discussion_type", StringType(), True),
    StructField("num_comments", IntegerType(), False),
    StructField("send_replies", BooleanType(), False),
    StructField("whitelist_status", StringType(), False),
    StructField("contest_mode", BooleanType(), False),
    StructField("author_patreon_flair", BooleanType(), False),
    StructField("author_flair_text_color", StringType(), True),
    StructField("permalink", StringType(), False),
    StructField("parent_whitelist_status", StringType(), False),
    StructField("stickied", BooleanType(), False),
    StructField("url", StringType(), False),
    StructField("subreddit_subscribers", IntegerType(), False),
    StructField("created_utc", FloatType(), False),
    StructField("num_crossposts", IntegerType(), False),
    StructField("media", StringType(), True),
    StructField("is_video", BooleanType(), False),
])

# read json from kafka and select all columns
df = spark \
        .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "{}:9092".format(kafka_host)) \
            .option("subscribe", "reddit_" + subreddit) \
            .option("startingOffsets", "latest") \
            .option("failOnDataLoss", "false") \
            .load() \
            .selectExpr("CAST(value AS STRING) as json") \
            .select(from_json(col("json"), payload_schema).alias("data")) \
            .select("data.*") 

ConnectionRefusedError: [Errno 111] Connection refused

In [21]:
spark = SparkSession.builder \
    .appName("test") \
    .master("local[*]") \
    .config("spark.driver.host", "localhost") \
    .getOrCreate()

In [22]:
df = spark.createDataFrame([(1, "test")], ["id", "value"])
df.show()

Py4JJavaError: An error occurred while calling o904.showString.
: org.apache.spark.SparkException: Cannot find catalog plugin class for catalog 'spark_catalog': org.apache.spark.sql.delta.catalog.DeltaCatalog
	at org.apache.spark.sql.errors.QueryExecutionErrors$.catalogPluginClassNotFoundForCatalogError(QueryExecutionErrors.scala:1638)
	at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:65)
	at org.apache.spark.sql.connector.catalog.CatalogManager.loadV2SessionCatalog(CatalogManager.scala:67)
	at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$2(CatalogManager.scala:86)
	at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
	at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$v2SessionCatalog$1(CatalogManager.scala:86)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.connector.catalog.CatalogManager.v2SessionCatalog(CatalogManager.scala:85)
	at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:51)
	at org.apache.spark.sql.connector.catalog.CatalogManager.currentCatalog(CatalogManager.scala:122)
	at org.apache.spark.sql.connector.catalog.CatalogManager.currentNamespace(CatalogManager.scala:93)
	at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:109)
	at org.apache.spark.sql.catalyst.optimizer.ReplaceCurrentLike.apply(finishAnalysis.scala:106)
	at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.$anonfun$apply$1(Optimizer.scala:294)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:294)
	at org.apache.spark.sql.catalyst.optimizer.Optimizer$FinishAnalysis$.apply(Optimizer.scala:276)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
	at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
	at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
	at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:126)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:122)
	at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:118)
	at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:136)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:154)
	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:151)
	at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:204)
	at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:249)
	at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:218)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.delta.catalog.DeltaCatalog
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:55)
	... 65 more
