In [1]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.types import StructType

In [2]:
# Obtain (or reuse) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("StructuredNetworkWordCount")
    .getOrCreate()
)
# Use a single shuffle partition for the aggregation below.
spark.conf.set("spark.sql.shuffle.partitions", 1)

# Streaming DataFrame of text lines read from the socket at 192.168.92.121:9999.
lines = (
    spark.readStream
    .format("socket")
    .option("host", "192.168.92.121")
    .option("port", 9999)
    .load()
)

# Break every line on single spaces and emit one row per resulting token.
words = lines.select(explode(split(lines["value"], " ")).alias("word"))

# Running count of occurrences per word.
wordCounts = words.groupBy("word").count()

# Start the query; "complete" mode writes the full counts table to the
# console sink on every trigger.
query = (
    wordCounts.writeStream
    .outputMode("complete")
    .format("console")
    .start()
)

# Wait here until the streaming query terminates.
query.awaitTermination()

In [None]:
# Schema applied to the CSV files read from the input directory.
# NOTE(review): "custoemrid" looks like a typo for "customerid"; it is kept
# unchanged here so the printed schema and any existing references to the
# column keep working -- confirm before renaming.
userSchema = (
    StructType()
    .add("orderid", "integer")
    .add("orderdate", "string")
    .add("custoemrid", "integer")
    .add("status", "string")
)

# Read the comma-separated CSV files in the "log/" directory as a stream.
csvDF = (
    spark.readStream
    .option("sep", ",")
    .schema(userSchema)
    .csv("log/")  # Equivalent to format("csv").load("log/")
)

# A bare `csvDF.isStreaming` expression in the middle of a cell displays
# nothing, so verify the property explicitly instead.
assert csvDF.isStreaming, "csvDF is expected to be a streaming DataFrame"

csvDF.printSchema()

df = csvDF.select("orderid", "orderdate")

# DataFrame transformations return new DataFrames rather than modifying `df`,
# so their results must be kept; previously they were discarded and the query
# below streamed the unfiltered, unaggregated projection.

# Keep only the orders whose orderid is greater than 1.
filteredOrders = df.select("orderid").where("orderid > 1")

# Running count of rows for each orderid.
orderCounts = filteredOrders.groupBy("orderid").count()

# Start the query that prints the running counts to the console; "update"
# mode emits only the rows whose count changed since the last trigger.
query = (
    orderCounts.writeStream
    .outputMode("update")
    .format("console")
    .start()
)

# Wait here until the streaming query terminates.
query.awaitTermination()


root
 |-- orderid: integer (nullable = true)
 |-- orderdate: string (nullable = true)
 |-- custoemrid: integer (nullable = true)
 |-- status: string (nullable = true)

