In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [6]:
spark = SparkSession.builder.appName("sparkdev-stream-json demo").master("local[*]").getOrCreate()

In [7]:
input_schema = StructType([
    StructField(name="registration_dttm", dataType=StringType(), nullable=True),
    StructField(name="id", dataType=IntegerType(), nullable=True),
    StructField(name="first_name", dataType=StringType(), nullable=True),
    StructField(name="last_name", dataType=StringType(), nullable=True),
    StructField(name="email", dataType=StringType(), nullable=True),
    StructField(name="gender", dataType=StringType(), nullable=True),
    StructField(name="ip_address", dataType=StringType(), nullable=True),
    StructField(name="cc", dataType=StringType(), nullable=True),
    StructField(name="country", dataType=StringType(), nullable=True),
    StructField(name="birthdate", dataType=StringType(), nullable=True),
    StructField(name="salary", dataType=DoubleType(), nullable=True),
    StructField(name="title", dataType=StringType(), nullable=True),
    StructField(name="comments", dataType=StringType(), nullable=True)
])

In [8]:
stream_df = spark.readStream.format("json").schema(input_schema).option("path", "input_data/json").load()

In [9]:
stream_count_df = stream_df.groupBy(col("country")).count().orderBy("count", ascending=False).limit(10)

In [10]:
write_stream_query = stream_count_df \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("checkpointLocation", "streaming-checkpoint-loc-json") \
    .trigger(processingTime="10 second") \
    .start()

23/05/17 16:29:02 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


In [11]:
write_stream_query.awaitTermination()

print("Streaming Application Completed.")

                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------+-----+
|    country|count|
+-----------+-----+
|      China|  189|
|  Indonesia|   97|
|     Russia|   62|
|Philippines|   45|
|   Portugal|   38|
|     Brazil|   38|
|     France|   37|
|     Poland|   35|
|     Sweden|   25|
|      Japan|   20|
+-----------+-----+

23/05/17 16:29:47 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 27419 milliseconds


                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+-------------+-----+
|      country|count|
+-------------+-----+
|        China|  362|
|    Indonesia|  186|
|       Russia|  123|
|  Philippines|   91|
|     Portugal|   79|
|       Poland|   76|
|       Brazil|   71|
|       France|   70|
|       Sweden|   60|
|United States|   40|
+-------------+-----+

23/05/17 16:30:46 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 16223 milliseconds


KeyboardInterrupt: 