
## Input Sources

Spark Structured Streaming can ingest data from several types of input sources and process it in near real time:

**Rate (for Testing)**: Automatically generates rows with two columns, `timestamp` and `value`, at a configurable rate. It is generally used for testing and benchmarking.

**Socket (for Testing)**: Reads UTF-8 text from a socket connection to the specified host and port. Use it only for testing, because it does not provide end-to-end fault-tolerance guarantees.

**File**: Reads files written to a directory as a stream of data, picking up new files as they appear. Supported formats include text, CSV, JSON, ORC, and Parquet.

**Kafka**: Reads data from Apache Kafka® and is compatible with Kafka broker versions 0.10.0 or higher.
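
Spark's documentation asks that files be placed into a watched directory atomically (for example, with a move or rename), so the file source never reads a half-written file. The sketch below is a plain-Python test helper, not part of Spark, that writes a CSV file into a separate staging directory and then renames it into the watched one. The function name `drop_csv_file` and the staging-directory layout are hypothetical, and the rename is only atomic when both directories are on the same filesystem.

```python
import csv
import os
import tempfile
import uuid


def drop_csv_file(rows, watch_dir, staging_dir):
    """Hypothetical helper: write `rows` as CSV, then publish the file with one rename.

    os.replace() is atomic only when staging_dir and watch_dir are on the
    same filesystem, so a streaming reader never sees a partial file.
    """
    os.makedirs(watch_dir, exist_ok=True)
    os.makedirs(staging_dir, exist_ok=True)

    # Write the complete file somewhere the streaming query is NOT watching.
    fd, tmp_path = tempfile.mkstemp(suffix=".csv", dir=staging_dir)
    with os.fdopen(fd, "w", newline="") as fh:
        csv.writer(fh).writerows(rows)

    # Move it into the watched directory under a unique name.
    final_path = os.path.join(watch_dir, f"batch-{uuid.uuid4().hex}.csv")
    os.replace(tmp_path, final_path)
    return final_path
```

A query could then read the directory with something like `spark.readStream.schema(schema).option("header", True).csv(watch_dir)`; file-based streaming sources require an explicit schema by default.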

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = (
    SparkSession.builder
    .master("local")
    .appName("SocketSource")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")
```

```python
# Host and port of the socket server to read from.
# Spark connects as a client, so a server must already be listening there.
HOST = "127.0.0.1"
PORT = 9999

# Create a streaming DataFrame by reading text lines from the socket.
# Each line becomes one row in a single string column named "value".
initDF = (
    spark.readStream
    .format("socket")
    .option("host", HOST)
    .option("port", PORT)
    .load()
)

# Check whether the DataFrame is streaming.
print(f"Streaming DataFrame : {initDF.isStreaming}")
```

We use the `socket` format to read data from a socket at 127.0.0.1:9999. Any free port works, as long as a server is already listening on it (for example, one started with `nc -lk 9999`) before the query starts.
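
If `nc` is not available, a small Python server can play the same role. The sketch below is a hypothetical stand-in built on the standard-library `socketserver` module: it sends a fixed list of newline-terminated lines to each client that connects, then closes that connection. The names `make_feeder` and `_ReusableServer` are illustrative, not part of Spark.

```python
import socketserver
import time


class _ReusableServer(socketserver.ThreadingTCPServer):
    allow_reuse_address = True  # allow quick restarts on the same port
    daemon_threads = True       # don't block interpreter exit on handler threads


def make_feeder(lines, host="127.0.0.1", port=9999, delay=0.0):
    """Hypothetical stand-in for `nc -lk 9999`.

    Returns a TCP server that sends `lines` to every client that connects.
    Pass port=0 to let the OS choose a free port.
    """

    class LineFeeder(socketserver.BaseRequestHandler):
        def handle(self):
            for line in lines:
                # The socket source reads UTF-8 text split on newlines.
                self.request.sendall((line + "\n").encode("utf-8"))
                time.sleep(delay)  # optional pause so lines span micro-batches

    return _ReusableServer((host, port), LineFeeder)
```

To use it, start `threading.Thread(target=server.serve_forever, daemon=True).start()` before calling `.start()` on the streaming query, and call `server.shutdown()` when finished. Unlike `nc -lk`, this server closes the connection after the last line, so the query simply stops receiving new data.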

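```python
# Word count on the streaming DataFrame: split each line on spaces,
# explode the resulting array into one row per word, then count per word.
wordCount = (
    initDF
    .select(f.explode(f.split(f.col("value"), " ")).alias("words"))
    .groupBy("words")
    .count()
)

# Print the schema of the DataFrame.
# printSchema() prints directly and returns None, so it is not wrapped in print().
print("Schema of DataFrame wordCount:")
wordCount.printSchema()
```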
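
To make the transformation concrete, here is a plain-Python model (not Spark code) of what `split` + `explode` + `groupBy("words").count()` compute over the lines received so far. The function name `word_counts` is hypothetical. Splitting on a single space keeps empty strings between repeated spaces, so input such as `"a  b"` produces an empty-string "word" as well.

```python
from collections import Counter


def word_counts(lines):
    """Plain-Python model of explode(split(value, " ")) + groupBy("words").count()."""
    counts = Counter()
    for line in lines:
        # str.split(" ") keeps empty strings between repeated spaces,
        # so "a  b" yields ["a", "", "b"] rather than ["a", "b"].
        counts.update(line.split(" "))
    return dict(counts)


print(word_counts(["hello spark", "hello streaming"]))
# → {'hello': 2, 'spark': 1, 'streaming': 1}
```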

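```python
# Start the query: after each micro-batch, print the words whose counts changed.
# awaitTermination() blocks until the query is stopped or fails.
query = (
    wordCount.writeStream
    .outputMode("update")
    .option("truncate", False)
    .format("console")
    .start()
)
query.awaitTermination()
```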
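
With `outputMode("update")`, each micro-batch prints only the rows whose running count changed in that batch, while `"complete"` reprints the whole result table every time. The plain-Python model below illustrates that difference across two micro-batches; it is a sketch of the observable output only, not of how Spark stores state, and the name `run_batches` is hypothetical.

```python
from collections import Counter


def run_batches(batches, mode="update"):
    """Model the console sink's output for a running word count.

    Each element of `batches` is the list of lines received in one micro-batch.
    Returns, per batch, the printed rows as a sorted list of (word, count).
    """
    if mode not in ("update", "complete"):
        raise ValueError(f"unsupported mode: {mode}")
    state = Counter()  # running counts kept across micro-batches
    outputs = []
    for lines in batches:
        batch_words = Counter(w for line in lines for w in line.split(" "))
        state.update(batch_words)
        if mode == "update":
            # Only words seen in this batch are emitted, with their new totals.
            rows = {w: state[w] for w in batch_words}
        else:
            # Complete mode emits the entire result table.
            rows = dict(state)
        outputs.append(sorted(rows.items()))
    return outputs


batches = [["hello spark"], ["hello streaming"]]
print(run_batches(batches, "update"))
# → [[('hello', 1), ('spark', 1)], [('hello', 2), ('streaming', 1)]]
print(run_batches(batches, "complete"))
# → [[('hello', 1), ('spark', 1)], [('hello', 2), ('spark', 1), ('streaming', 1)]]
```

`"append"` mode is not an option for this query: Spark rejects append output for streaming aggregations that have no watermark.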