In [3]:
spark.conf.set("spark.sql.shuffle.partitions", 5)

# json 데이터 경로(해당 경로에서 json 파일들 읽어오기)
static = spark.read.json("./data/activity-data")

streaming = spark\
    .readStream\
    .schema(static.schema)\
    .option("maxFilesPerTrigger", 10)\
    .json("./data/activity-data")

streaming.printSchema()



root
 |-- Arrival_Time: long (nullable = true)
 |-- Creation_Time: long (nullable = true)
 |-- Device: string (nullable = true)
 |-- Index: long (nullable = true)
 |-- Model: string (nullable = true)
 |-- User: string (nullable = true)
 |-- gt: string (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)



                                                                                

In [4]:
# createion time 컬럼 사용
withEventTime = streaming.selectExpr("*", "cast(cast(Creation_Time as double)/1000000000 as timestamp) as event_time")

In [4]:
# 텀블링 윈도우(10분 단위)
from pyspark.sql.functions import window, col

# 결과를 메모리 싱크에 저장
withEventTime.groupBy(window(col("event_time"), "10 minutes")).count()\
    .writeStream\
    .queryName("pyevents_per_window")\
    .format("memory")\
    .outputMode("complete")\
    .start()

21/11/13 10:45:58 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/50/34qx4w51577d2hlff47g2fp00000gn/T/temporary-fa515dbb-5571-46da-96fb-e9950c7348eb. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x7ff24d6f06a0>

                                                                                

In [8]:
spark.sql("SELECT * FROM pyevents_per_window").show()

+--------------------+------+
|              window| count|
+--------------------+------+
|{2015-02-24 20:50...|150773|
|{2015-02-24 22:00...|133323|
|{2015-02-23 21:30...|100853|
|{2015-02-23 19:20...| 99178|
|{2015-02-24 21:30...|125679|
|{2015-02-24 22:10...|105494|
|{2015-02-23 19:30...|100443|
|{2015-02-23 19:40...| 88681|
|{2015-02-23 22:20...|106075|
|{2015-02-22 09:40...|    35|
|{2015-02-24 20:20...|113768|
|{2015-02-24 21:20...|133623|
|{2015-02-24 23:00...|150225|
|{2015-02-24 23:10...|169064|
|{2015-02-24 22:40...|132243|
|{2015-02-24 22:50...| 96023|
|{2015-02-23 23:30...| 94669|
|{2015-02-23 22:40...|167565|
|{2015-02-24 21:00...|200133|
|{2015-02-23 21:20...|106291|
+--------------------+------+
only showing top 20 rows



In [9]:
# 집계 수행
from pyspark.sql.functions import window, col

withEventTime.groupBy(window(col("event_time"), "10 minutes"), "User").count()\
    .writeStream\
    .queryName("pyevents_per_window")\
    .format("memory")\
    .outputMode("complete")\
    .start()

21/11/13 10:52:52 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/50/34qx4w51577d2hlff47g2fp00000gn/T/temporary-c4a56013-47ce-4d2c-9836-45b05f6793ca. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


IllegalArgumentException: Cannot start query with name pyevents_per_window as a query with that name is already active in this SparkSession

In [10]:
# 증분 수행
from pyspark.sql.functions import window, col

withEventTime.groupBy(window(col("event_time"), "10 minutes", "5 minutes")).count()\
    .writeStream\
    .queryName("events_per_window")\
    .format("memory")\
    .outputMode("complete")\
    .start()

21/11/13 10:55:35 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/50/34qx4w51577d2hlff47g2fp00000gn/T/temporary-e5014a3d-0701-4172-9742-e5ff44e3e8a4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x7ff24da9a160>

                                                                                

In [11]:
spark.sql("SELECT * FROM events_per_window").show()

+--------------------+------+
|              window| count|
+--------------------+------+
|{2015-02-23 23:15...|107668|
|{2015-02-24 20:50...|150773|
|{2015-02-24 22:00...|133323|
|{2015-02-22 09:35...|    35|
|{2015-02-23 21:30...|100853|
|{2015-02-23 19:20...| 99178|
|{2015-02-23 22:25...| 91684|
|{2015-02-24 23:25...|203945|
|{2015-02-23 21:55...|113953|
|{2015-02-22 09:40...|    35|
|{2015-02-23 21:35...| 91221|
|{2015-02-23 22:05...|205912|
|{2015-02-24 20:20...|113768|
|{2015-02-24 22:35...|174863|
|{2015-02-24 23:00...|150225|
|{2015-02-24 21:30...|125679|
|{2015-02-24 22:10...|105494|
|{2015-02-23 19:30...|100443|
|{2015-02-24 20:45...|138413|
|{2015-02-23 19:40...| 88681|
+--------------------+------+
only showing top 20 rows



In [13]:
# 30분 지연 예제
from pyspark.sql.functions import window, col

withEventTime\
    .withWatermark("event_time", "30 minutes")\
    .groupBy(window(col("event_time"), "10 minutes", "5 minutes"))\
    .count()\
    .writeStream\
    .queryName("pyevents_per_window")\
    .format("memory")\
    .outputMode("complete")\
    .start()

21/11/13 11:09:28 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/50/34qx4w51577d2hlff47g2fp00000gn/T/temporary-cc0c2ad5-700f-43b8-a154-c132447e3375. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


IllegalArgumentException: Cannot start query with name pyevents_per_window as a query with that name is already active in this SparkSession

In [5]:
from pyspark.sql.functions import expr

withEventTime\
    .withWatermark("event_time", "5 seconds")\
    .dropDuplicates(["User", "event_time"])\
    .groupBy("User")\
    .count()\
    .writeStream\
    .queryName("pydeduplicated")\
    .format("memory")\
    .outputMode("complete")\
    .start()

21/11/13 17:43:58 WARN StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/50/34qx4w51577d2hlff47g2fp00000gn/T/temporary-d77257a6-411a-4e51-9a97-9a79aabed165. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.


<pyspark.sql.streaming.StreamingQuery at 0x7fd86e24ea60>

                                                                                

In [7]:
streaming = spark\
    .readStream\
    .schema(static.schema)\
    .option("maxFilesPerTrigger", 10)\
    .json("./data/activity-data")\
    .groupBy("gt")\
    .count()
query = streaming\
    .writeStream\
    .outputMode("complete")\
    .option("checkpointLocation", "/some/python/location")\
    .queryName("test_python_stream")\
    .format("memory")\
    .start()

21/11/13 19:53:46 ERROR StreamMetadata: Error writing stream metadata StreamMetadata(c9cd173f-01d4-484c-8b38-0b7841d661f6) to file:/some/python/location/metadata
java.io.FileNotFoundException: File file:/some/python/location does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:666)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:987)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:656)
	at org.apache.hadoop.fs.DelegateToFileSystem.getFileStatus(DelegateToFileSystem.java:126)
	at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:91)
	at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:353)
	at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
	at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:605)
	at org.apache.hadoop.fs.FileContext$3.next(FileContext.

Py4JJavaError: An error occurred while calling o85.start.
: java.io.FileNotFoundException: File file:/some/python/location does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:666)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:987)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:656)
	at org.apache.hadoop.fs.DelegateToFileSystem.getFileStatus(DelegateToFileSystem.java:126)
	at org.apache.hadoop.fs.DelegateToFileSystem.createInternal(DelegateToFileSystem.java:91)
	at org.apache.hadoop.fs.ChecksumFs$ChecksumFSOutputSummer.<init>(ChecksumFs.java:353)
	at org.apache.hadoop.fs.ChecksumFs.createInternal(ChecksumFs.java:400)
	at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:605)
	at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:696)
	at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:692)
	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
	at org.apache.hadoop.fs.FileContext.create(FileContext.java:698)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:316)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:322)
	at org.apache.spark.sql.execution.streaming.StreamMetadata$.write(StreamMetadata.scala:78)
	at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$streamMetadata$1(StreamExecution.scala:176)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.streaming.StreamExecution.<init>(StreamExecution.scala:174)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.<init>(MicroBatchExecution.scala:50)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:317)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:359)
	at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:466)
	at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:401)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:301)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [9]:
test_python_stream".status


SyntaxError: EOL while scanning string literal (749746331.py, line 1)