In [1]:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("chapter21_1").getOrCreate()

staticDF = spark.read.format("csv").option("header", "true").load("retail-data/by-day/*.csv")
static_schema = staticDF.schema

print(static_schema)


streamingDF = spark.readStream.format("csv").option("header", "true").schema(static_schema).load("retail-data/by-day/*.csv")

purchaseByHourDF = streamingDF.selectExpr("CustomerID", "Quantity * UnitPrice as TotalCost", "InvoiceDate").groupBy("CustomerID").sum("TotalCost")


purchases_activity = purchaseByHourDF.writeStream.format("memory").queryName("customer_purchased").outputMode("complete").start()






StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))


In [2]:
spark.sql("select * from customer_purchased").show()

+----------+--------------+
|CustomerID|sum(TotalCost)|
+----------+--------------+
+----------+--------------+



In [3]:
static = spark.read.json("activity-data/")
dataSchema = static.schema

streaming = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
  .json("activity-data")

activityCounts = streaming.groupBy("gt").count()

activityQuery = activityCounts.writeStream.queryName("activitycounts")\
  .format("memory").outputMode("complete")\
  .start()




In [4]:
spark.sql("SELECT * FROM activitycounts").show()

+---+-----+
| gt|count|
+---+-----+
+---+-----+



In [5]:
from pyspark.sql.functions import expr, to_date, from_unixtime, to_timestamp

streaming.withColumn("stairs", expr("gt like '%stairs%'"))\
.where("stairs")\
.where("gt is not null")\
.select("gt", "model", "arrival_time", "creation_time")\
.writeStream.queryName("stairsactivity").format("memory")\
.outputMode("append").start()

<pyspark.sql.streaming.StreamingQuery at 0x7ff8a822bfa0>

In [6]:
spark.sql("select * from stairsactivity").show()

+---+-----+------------+-------------+
| gt|model|arrival_time|creation_time|
+---+-----+------------+-------------+
+---+-----+------------+-------------+



In [7]:
historicalAgg = static.groupBy("gt", "model").avg()

streaming.drop("Arrival_Time", "Creation_Time", "Index")\
.cube("gt", "model").avg()\
.join(historicalAgg, ["gt", "model"])\
.writeStream.queryName("historical_joined_data1").format("memory")\
.outputMode("complete").start()



<pyspark.sql.streaming.StreamingQuery at 0x7ff8a824c910>

In [8]:
print(static.schema)
spark.sql("select * from historical_joined_data1").show(10, False)

StructType(List(StructField(Arrival_Time,LongType,true),StructField(Creation_Time,LongType,true),StructField(Device,StringType,true),StructField(Index,LongType,true),StructField(Model,StringType,true),StructField(User,StringType,true),StructField(gt,StringType,true),StructField(x,DoubleType,true),StructField(y,DoubleType,true),StructField(z,DoubleType,true)))
+---+-----+------+------+------+-----------------+------------------+----------+------+------+------+
|gt |model|avg(x)|avg(y)|avg(z)|avg(Arrival_Time)|avg(Creation_Time)|avg(Index)|avg(x)|avg(y)|avg(z)|
+---+-----+------+------+------+-----------------+------------------+----------+------+------+------+
+---+-----+------+------+------+-----------------+------------------+----------+------+------+------+



In [26]:
# import os
# os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 pyspark-shell"

# from pyspark.sql import SparkSession
# from pyspark.sql.functions import explode
# from pyspark.sql.functions import split

# bootstrapServers = "localhost:9092"
# subscribeType = "subscribe"
# topics = "test"

# spark = SparkSession\
#     .builder\
#     .appName("StructuredKafkaWordCount")\
#     .getOrCreate()

# # Create DataSet representing the stream of input lines from kafka
# lines = spark\
#     .readStream\
#     .format("kafka")\
#     .option("kafka.bootstrap.servers", bootstrapServers)\
#     .option(subscribeType, topics)\
#     .load()\
#     .selectExpr("CAST(value AS STRING)")




The history saving thread hit an unexpected error (OperationalError('attempt to write a readonly database')).History will not be written to the database.


In [27]:
# kafkaDf = spark.readStream.format("kafka")\
# .option("kafka.bootstrap.servers", "127.0.0.1:9092")\
# .option("subscribe", "spark-topic")\
# .load()




In [33]:
from pyspark.sql.functions import window, col

withEventTime = streaming.selectExpr("*", "cast(cast(Creation_Time as double)/1000000000 as timestamp) as eventTime")

withEventTime.groupBy(window(col("eventTime"), "10 minutes"))\
.count()\
.writeStream.queryName("events_count").format("memory").outputMode("complete").start()



<pyspark.sql.streaming.StreamingQuery at 0x7ff8ac21b190>

In [35]:
spark.sql("select * from events_count").show(10, False)

+------------------------------------------+-----+
|window                                    |count|
+------------------------------------------+-----+
|{2015-02-23 15:40:00, 2015-02-23 15:50:00}|937  |
|{2015-02-24 18:30:00, 2015-02-24 18:40:00}|11610|
|{2015-02-23 16:40:00, 2015-02-23 16:50:00}|8084 |
|{2015-02-24 18:20:00, 2015-02-24 18:30:00}|18074|
|{2015-02-24 18:40:00, 2015-02-24 18:50:00}|9278 |
|{2015-02-23 17:40:00, 2015-02-23 17:50:00}|5101 |
|{2015-02-23 20:10:00, 2015-02-23 20:20:00}|6592 |
|{2015-02-24 20:00:00, 2015-02-24 20:10:00}|16117|
|{2015-02-23 18:00:00, 2015-02-23 18:10:00}|8822 |
|{2015-02-23 16:10:00, 2015-02-23 16:20:00}|7688 |
+------------------------------------------+-----+
only showing top 10 rows



In [39]:
withEventTime.groupBy(window(col("eventTime"), "10 minutes"), "User")\
.count()\
.writeStream.queryName("event_counts_per_user").format("memory").outputMode("complete").start()

<pyspark.sql.streaming.StreamingQuery at 0x7ff8ad199bb0>