## Data
The data is hosted on AWS and made available by Databricks at `/databricks-datasets/structured-streaming/events/`

In [0]:
%fs ls /databricks-datasets/structured-streaming/events/

path,name,size,modificationTime
dbfs:/databricks-datasets/structured-streaming/events/file-0.json,file-0.json,72530,1469673865000
dbfs:/databricks-datasets/structured-streaming/events/file-1.json,file-1.json,72961,1469673866000
dbfs:/databricks-datasets/structured-streaming/events/file-10.json,file-10.json,73025,1469673878000
dbfs:/databricks-datasets/structured-streaming/events/file-11.json,file-11.json,72999,1469673879000
dbfs:/databricks-datasets/structured-streaming/events/file-12.json,file-12.json,72987,1469673880000
dbfs:/databricks-datasets/structured-streaming/events/file-13.json,file-13.json,73006,1469673881000
dbfs:/databricks-datasets/structured-streaming/events/file-14.json,file-14.json,73003,1469673882000
dbfs:/databricks-datasets/structured-streaming/events/file-15.json,file-15.json,73007,1469673883000
dbfs:/databricks-datasets/structured-streaming/events/file-16.json,file-16.json,72978,1469673885000
dbfs:/databricks-datasets/structured-streaming/events/file-17.json,file-17.json,73008,1469673886000


In [0]:
#%fs head /databricks-datasets/structured-streaming/events/file-0.json
df = spark.read.json("/databricks-datasets/structured-streaming/events/file-0.json")
df.show(10)

+------+----------+
|action|      time|
+------+----------+
|  Open|1469501107|
|  Open|1469501147|
|  Open|1469501202|
|  Open|1469501219|
|  Open|1469501225|
|  Open|1469501234|
|  Open|1469501245|
|  Open|1469501246|
|  Open|1469501248|
|  Open|1469501256|
+------+----------+
only showing top 10 rows
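Each file uses the JSON Lines format (one JSON object per line), which is what `spark.read.json` expects by default. When no schema is supplied, Spark infers one by scanning the records. The inferred fields appear to come back sorted by name, which would explain why `action` is listed before `time`. The pure-Python sketch below imitates that inference on a few sample lines modeled on the rows above. The samples and the `infer_fields` helper are hypothetical, and the sketch is only a rough approximation: real inference also widens numeric types, handles nested structs, and so on.

```python
import json

# Hypothetical sample lines modeled on the rows shown above (JSON Lines: one object per line).
SAMPLE = """{"time": 1469501107, "action": "Open"}
{"time": 1469501147, "action": "Open"}
{"time": 1469501202, "action": "Close"}"""

def infer_fields(lines):
    """Return (field, python type name) pairs sorted by field name.

    A rough stand-in for schema inference: it takes the union of the keys seen
    in all records and keeps the type of the first non-null value for each key.
    """
    types = {}
    for line in lines:
        if not line.strip():
            continue  # skip blank lines
        record = json.loads(line)
        for key, value in record.items():
            if value is not None and key not in types:
                types[key] = type(value).__name__
    return sorted(types.items())

fields = infer_fields(SAMPLE.splitlines())
print(fields)  # [('action', 'str'), ('time', 'int')]
```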



* Create a separate folder 'streamDir' and copy some of the files into it. You can use dbutils....
* Copy the remaining files while the stream is running
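The setup splits the dataset into two parts. The first batch is copied before the stream starts, and the rest is copied while the stream runs. The sketch below reproduces that split using local temporary directories in place of DBFS, so that it runs anywhere. `stage_files` is a hypothetical helper. The `file-{i}.json` names follow the listing above.

```python
import os
import shutil
import tempfile

def stage_files(src_dir, dst_dir, names):
    """Copy the given file names from src_dir into dst_dir; return dst_dir's sorted contents."""
    os.makedirs(dst_dir, exist_ok=True)
    for name in names:
        shutil.copy(os.path.join(src_dir, name), os.path.join(dst_dir, name))
    return sorted(os.listdir(dst_dir))

# Throwaway source folder with 20 tiny JSON Lines files (stand-in for the dataset).
src = tempfile.mkdtemp()
for i in range(20):
    with open(os.path.join(src, f"file-{i}.json"), "w") as fh:
        fh.write('{"time": 1469501107, "action": "Open"}\n')

stream_dir = os.path.join(tempfile.mkdtemp(), "streamDir")

# First batch: the first 10 files, staged before the stream starts.
first_batch = [f"file-{i}.json" for i in range(10)]
staged = stage_files(src, stream_dir, first_batch)
print(len(staged))  # 10
```

In the notebook, the `dbutils.fs.cp` loop over `range(10)` plays the same role.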

In [0]:
# Note: dbutils.fs treats "/dbfs/FileStore/..." as a DBFS path, so this creates dbfs:/dbfs/FileStore/streamDir/
file_path = "/dbfs/FileStore/streamDir/"

# Per-user names, so several users can run the notebook without clashing
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_autoloader_test"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_autoloader_test"

dbutils.fs.mkdirs(file_path)

# Remove leftovers from earlier runs (dbutils.fs.rm returns False when there was nothing to delete)
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)
dbutils.fs.rm(f"dbfs:/user/hive/warehouse/{table_name}", True)


Out[17]: False

In [0]:
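# dbutils.fs resolves this path as dbfs:/dbfs/FileStore/streamDir/ (see the listing below),
# so every later step must refer to it through the same streamDir variable
streamDir = "/dbfs/FileStore/streamDir/"
dbutils.fs.mkdirs(streamDir)

# Stage the first 10 files now; the rest are copied while the stream is running
for i in range(10):
  dbutils.fs.cp(f"/databricks-datasets/structured-streaming/events/file-{i}.json", f"{streamDir}file-{i}.json")

display(dbutils.fs.ls(streamDir))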

file_path = streamDir

path,name,size,modificationTime
dbfs:/dbfs/FileStore/streamDir/file-0.json,file-0.json,72530,1744873809000
dbfs:/dbfs/FileStore/streamDir/file-1.json,file-1.json,72961,1744873809000
dbfs:/dbfs/FileStore/streamDir/file-2.json,file-2.json,73007,1744873810000
dbfs:/dbfs/FileStore/streamDir/file-3.json,file-3.json,72996,1744873810000
dbfs:/dbfs/FileStore/streamDir/file-4.json,file-4.json,72992,1744873811000
dbfs:/dbfs/FileStore/streamDir/file-5.json,file-5.json,72998,1744873811000
dbfs:/dbfs/FileStore/streamDir/file-6.json,file-6.json,72997,1744873812000
dbfs:/dbfs/FileStore/streamDir/file-7.json,file-7.json,73022,1744873812000
dbfs:/dbfs/FileStore/streamDir/file-8.json,file-8.json,72997,1744873813000
dbfs:/dbfs/FileStore/streamDir/file-9.json,file-9.json,72970,1744873813000


## Data analysis / static DataFrame
* Define a schema for the data and display the contents of the original folder

In [0]:
from pyspark.sql.types import StructType, StringType

input_path = "/databricks-datasets/structured-streaming/events/"

# Explicit schema: time holds epoch seconds (read as a string and cast to long later).
# The files appear to contain only time and action, so device comes back null.
json_schema = (StructType()
    .add("time", StringType())
    .add("action", StringType())
    .add("device", StringType()))

static_input_df = spark.read.schema(json_schema).json(input_path)

display(static_input_df.limit(10))


time,action,device
1469679568,Close,
1469679568,Close,
1469679569,Open,
1469679571,Close,
1469679571,Open,
1469679571,Open,
1469679572,Close,
1469679573,Close,
1469679575,Close,
1469679576,Open,



Count the "Open" and "Close" actions in one-hour windows (`window`), using the complete folder.

In [0]:
from pyspark.sql.functions import window, col, to_timestamp, from_unixtime

df_with_timestamp = static_input_df.withColumn(
    "event_time", to_timestamp(from_unixtime(col("time").cast("long")))
)

agg_df = (df_with_timestamp
          .groupBy(window(col("event_time"), "1 hour"), col("action"))
          .count()
          .orderBy("window")
         )

display(agg_df.limit(10))

window,action,count
"List(2016-07-26T02:00:00.000+0000, 2016-07-26T03:00:00.000+0000)",Open,179
"List(2016-07-26T02:00:00.000+0000, 2016-07-26T03:00:00.000+0000)",Close,11
"List(2016-07-26T03:00:00.000+0000, 2016-07-26T04:00:00.000+0000)",Open,1001
"List(2016-07-26T03:00:00.000+0000, 2016-07-26T04:00:00.000+0000)",Close,344
"List(2016-07-26T04:00:00.000+0000, 2016-07-26T05:00:00.000+0000)",Open,999
"List(2016-07-26T04:00:00.000+0000, 2016-07-26T05:00:00.000+0000)",Close,815
"List(2016-07-26T05:00:00.000+0000, 2016-07-26T06:00:00.000+0000)",Close,1003
"List(2016-07-26T05:00:00.000+0000, 2016-07-26T06:00:00.000+0000)",Open,1000
"List(2016-07-26T06:00:00.000+0000, 2016-07-26T07:00:00.000+0000)",Close,1011
"List(2016-07-26T06:00:00.000+0000, 2016-07-26T07:00:00.000+0000)",Open,993
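`window(col("event_time"), "1 hour")` assigns each event to a fixed, non-overlapping one-hour bucket (a tumbling window). Without a `startTime` argument, the buckets are aligned to 1970-01-01 00:00:00 UTC. An event's window therefore starts at its epoch timestamp rounded down to a multiple of 3600 seconds, which matches the `02:00` bucket shown for the first event of `file-0.json`. The pure-Python sketch below reproduces that grouping. The extra sample events are hypothetical, and `hourly_counts` is not part of any library.

```python
from collections import Counter
from datetime import datetime, timezone

WINDOW_SECONDS = 3600  # "1 hour"

def window_start(epoch_seconds, size=WINDOW_SECONDS):
    """Round an epoch timestamp down to the start of its tumbling window."""
    return epoch_seconds - epoch_seconds % size

def hourly_counts(events):
    """Count (window_start, action) pairs, like groupBy(window, action).count()."""
    return Counter((window_start(ts), action) for ts, action in events)

# The first timestamp is the first row of file-0.json; the others are made up.
events = [(1469501107, "Open"), (1469501147, "Open"), (1469501500, "Close"),
          (1469505000, "Open")]
counts = hourly_counts(events)

for (start, action), n in sorted(counts.items()):
    label = datetime.fromtimestamp(start, tz=timezone.utc).strftime("%Y-%m-%d %H:%M")
    print(label, action, n)
# 2016-07-26 02:00 Close 1
# 2016-07-26 02:00 Open 2
# 2016-07-26 03:00 Open 1
```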


Use SQL and show on a chart how many 'Open' and how many 'Close' actions there were.

In [0]:
agg_df.createOrReplaceTempView("static_counts")

In [0]:
%sql select action, sum(count) as total_count from static_counts group by action

action,total_count
Open,50000
Close,50000
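Summing the per-window counts by `action` gives the same totals as counting the raw events directly, because every event falls into exactly one tumbling window. Below is a small self-contained check of that property, using a hypothetical table shaped like `static_counts`.

```python
from collections import Counter

# Hypothetical (window_start, action) -> count table, shaped like static_counts.
window_counts = {
    (1469498400, "Open"): 2, (1469498400, "Close"): 1,
    (1469502000, "Open"): 1, (1469502000, "Close"): 3,
}

def totals_by_action(table):
    """Equivalent of: SELECT action, sum(count) FROM ... GROUP BY action."""
    totals = Counter()
    for (_window, action), n in table.items():
        totals[action] += n
    return dict(totals)

print(totals_by_action(window_counts))  # {'Open': 3, 'Close': 4}
```

With sliding windows (e.g. `window(col("event_time"), "1 hour", "15 minutes")`), each event falls into several overlapping windows, so the same sum would overcount.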


Use SQL to show how many actions there were in each day and hour, for example ('Jul-26 09:00').

In [0]:
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from static_counts order by time, action

action,time,count
Close,Jul-26 03:00,11
Open,Jul-26 03:00,179
Close,Jul-26 04:00,344
Open,Jul-26 04:00,1001
Close,Jul-26 05:00,815
Open,Jul-26 05:00,999
Close,Jul-26 06:00,1003
Open,Jul-26 06:00,1000
Close,Jul-26 07:00,1011
Open,Jul-26 07:00,993
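The query labels each bucket by `window.end`, so the 02:00 to 03:00 window appears as `Jul-26 03:00`. Using `window.start` would label it `Jul-26 02:00` instead. The Spark pattern `MMM-dd HH:mm` corresponds roughly to Python's `%b-%d %H:%M`. The sketch below shows both labels for the first window. `label` is a hypothetical helper, and it formats in UTC to match the `+0000` output above.

```python
from datetime import datetime, timezone

def label(epoch_seconds):
    """Format an epoch timestamp like date_format(..., "MMM-dd HH:mm"), in UTC."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime("%b-%d %H:%M")

start = 1469498400   # 2016-07-26 02:00 UTC, the first window above
end = start + 3600   # window.end is exclusive: it equals the next window's start

print(label(start))  # Jul-26 02:00
print(label(end))    # Jul-26 03:00
```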


## Stream Processing 
Now use a stream.
* Because you are streaming files that already exist, you need to simulate a regular stream. Hint: add the 'maxFilesPerTrigger' option.
* Read from 'streamDir', which holds only part of the files
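`maxFilesPerTrigger` caps how many new files the file source picks up per micro-batch. With a value of 1, the ten staged files are consumed one per trigger rather than all at once. The pure-Python sketch below imitates that scheduling. `plan_batches` is hypothetical. For simplicity it sorts by file name, whereas Spark's file source by default appears to order new files by modification time, oldest first.

```python
def plan_batches(seen, available, max_files_per_trigger):
    """Split not-yet-seen files into micro-batches of at most max_files_per_trigger."""
    new_files = [f for f in sorted(available) if f not in seen]
    return [new_files[i:i + max_files_per_trigger]
            for i in range(0, len(new_files), max_files_per_trigger)]

staged = [f"file-{i}.json" for i in range(10)]
batches = plan_batches(seen=set(), available=staged, max_files_per_trigger=1)
print(len(batches))  # 10
```

Files that arrive later become new batches only if they have not been seen yet. This is what makes the later copy step show up in the running query.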

In [0]:
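from pyspark.sql.functions import col, from_unixtime, to_timestamp, window

# Streaming source over streamDir; maxFilesPerTrigger=1 reads one file per micro-batch
# to simulate data arriving over time
streaming_input_df = (spark.readStream
    .schema(json_schema)               # file stream sources require a schema by default
    .option("maxFilesPerTrigger", 1)
    .json(streamDir)
)

stream_with_timestamp = streaming_input_df.withColumn(
    "event_time", to_timestamp(from_unixtime(col("time").cast("long")))
)

# Same one-hour tumbling-window count as the static version
stream_counts = (stream_with_timestamp
    .groupBy(window(col("event_time"), "1 hour"), col("action"))
    .count()
)

# The sink is configured and the query started in a later cell, after the shuffle
# partitions are set; starting a second query with the same queryName while one is
# active would fail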

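Check that `stream_counts` is a streaming DataFrame (`isStreaming` only describes the DataFrame; whether a query is actually running is reported by `query.isActive`)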

In [0]:
stream_counts.isStreaming

Out[20]: True

* Reduce the number of shuffle partitions to 4
* Now configure the sink and start the stream
* Use the 'memory' format
* Set 'outputMode' to 'complete'

In [0]:
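# Must be set before .start() to take effect for this query
spark.conf.set("spark.sql.shuffle.partitions", 4)

query = (stream_counts
    .writeStream
    .outputMode("complete")          # write the full aggregation table after every micro-batch
    .format("memory")                # in-memory table, suitable for small results and debugging
    .queryName("streaming_counts")   # name of the in-memory table queried with SQL below
    .start()
)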
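With `outputMode("complete")`, the sink receives the whole aggregation table after every micro-batch, not just the rows that changed. The `streaming_counts` table therefore always reflects the full current result. (An aggregation without a watermark cannot use `append` mode.) The sketch below models a running count across micro-batches and contrasts the complete output with the rows an update-style mode would emit. It is a plain-Python illustration, not Spark's state store, and the window value is hypothetical.

```python
from collections import Counter

def run_batches(batches):
    """Feed micro-batches of (window_start, action) keys into a running count.

    Returns, per trigger, the full table ("complete") and only the rows the
    batch touched ("update").
    """
    state = Counter()
    complete_outputs, update_outputs = [], []
    for batch in batches:
        batch_counts = Counter(batch)
        state.update(batch_counts)                                   # merge into running state
        complete_outputs.append(dict(state))                         # every row, every time
        update_outputs.append({k: state[k] for k in batch_counts})   # changed rows only
    return complete_outputs, update_outputs

w = 1469498400  # hypothetical window start
batches = [[(w, "Open"), (w, "Open")], [(w, "Close")]]
complete, update = run_batches(batches)
print(complete[-1])  # {(1469498400, 'Open'): 2, (1469498400, 'Close'): 1}
print(update[-1])    # {(1469498400, 'Close'): 1}
```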


`query` now runs in the background: it keeps reading new files and updating the counts. You can follow its progress in the streaming dashboard.

In [0]:
%scala
Thread.sleep(3000) // short pause so the first micro-batches have time to read files
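A fixed `Thread.sleep(3000)` may be too short or needlessly long, depending on how quickly the micro-batches run. An alternative is to poll a condition with a timeout, as in the generic pure-Python helper below (`wait_until` is hypothetical). In a PySpark notebook, the condition could check `query.recentProgress`, which lists progress reports for recent micro-batches. Another option is `query.processAllAvailable()`, which blocks until the currently available input has been processed and is mainly intended for tests.

```python
import time

def wait_until(condition, timeout=30.0, interval=0.5):
    """Call condition() every `interval` seconds until it is truthy or `timeout` elapses.

    Returns True if the condition was met, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if condition():
            return True
        time.sleep(interval)
    return bool(condition())  # one last check at the deadline

# Self-contained demo: the condition becomes true on the third poll.
calls = {"n": 0}
def ready():
    calls["n"] += 1
    return calls["n"] >= 3

print(wait_until(ready, timeout=5, interval=0.01))  # True
```

A possible (untested) use in this notebook would be `wait_until(lambda: len(query.recentProgress) > 0)`.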

* Use SQL to show the number of actions for each day and hour

In [0]:
%sql
SELECT * FROM streaming_counts
ORDER BY window

window,action,count
"List(2016-07-26T02:00:00.000+0000, 2016-07-26T03:00:00.000+0000)",Close,11
"List(2016-07-26T02:00:00.000+0000, 2016-07-26T03:00:00.000+0000)",Open,179
"List(2016-07-26T03:00:00.000+0000, 2016-07-26T04:00:00.000+0000)",Open,1001
"List(2016-07-26T03:00:00.000+0000, 2016-07-26T04:00:00.000+0000)",Close,344
"List(2016-07-26T04:00:00.000+0000, 2016-07-26T05:00:00.000+0000)",Open,999
"List(2016-07-26T04:00:00.000+0000, 2016-07-26T05:00:00.000+0000)",Close,815
"List(2016-07-26T05:00:00.000+0000, 2016-07-26T06:00:00.000+0000)",Close,1003
"List(2016-07-26T05:00:00.000+0000, 2016-07-26T06:00:00.000+0000)",Open,1000
"List(2016-07-26T06:00:00.000+0000, 2016-07-26T07:00:00.000+0000)",Open,993
"List(2016-07-26T06:00:00.000+0000, 2016-07-26T07:00:00.000+0000)",Close,1011


* The totals may not match the static results yet, because so far the stream has read only part of the files.
* Now copy the remaining files from the original folder into 'streamDir' and check whether the changes show up
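For the stream to see the new files, they must land in the directory it reads, i.e. the path held in `streamDir`. Only the files it has not read yet need to be copied. Below is a pure-Python sketch of that "copy what is missing" step, again with local temporary directories standing in for DBFS. `copy_missing` is a hypothetical helper.

```python
import os
import shutil
import tempfile

def copy_missing(src_dir, dst_dir):
    """Copy files from src_dir that are not yet in dst_dir; return the names copied."""
    present = set(os.listdir(dst_dir))
    copied = []
    for name in sorted(os.listdir(src_dir)):
        if name not in present:
            shutil.copy(os.path.join(src_dir, name), os.path.join(dst_dir, name))
            copied.append(name)
    return copied

src, dst = tempfile.mkdtemp(), tempfile.mkdtemp()
for i in range(5):
    with open(os.path.join(src, f"file-{i}.json"), "w") as fh:
        fh.write('{"time": 1469501107, "action": "Open"}\n')
shutil.copy(os.path.join(src, "file-0.json"), dst)  # pretend file-0 was staged earlier

new_files = copy_missing(src, dst)
print(new_files)  # ['file-1.json', 'file-2.json', 'file-3.json', 'file-4.json']
```

Skipping files that already exist also avoids rewriting ones the stream has already read.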


In [0]:
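# Copy only the files not yet staged, into the same directory the stream reads.
# streamDir resolves to dbfs:/dbfs/FileStore/streamDir/, not dbfs:/FileStore/streamDir/
source_dir = "dbfs:/databricks-datasets/structured-streaming/events/"
already_staged = {f.name for f in dbutils.fs.ls(streamDir)}
for f in dbutils.fs.ls(source_dir):
    if f.name not in already_staged:
        dbutils.fs.cp(f.path, f"{streamDir}{f.name}")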

In [0]:
%sql
SELECT * FROM streaming_counts
ORDER BY window

window,action,count
"List(2016-07-26T02:00:00.000+0000, 2016-07-26T03:00:00.000+0000)",Open,179
"List(2016-07-26T02:00:00.000+0000, 2016-07-26T03:00:00.000+0000)",Close,11
"List(2016-07-26T03:00:00.000+0000, 2016-07-26T04:00:00.000+0000)",Open,1001
"List(2016-07-26T03:00:00.000+0000, 2016-07-26T04:00:00.000+0000)",Close,344
"List(2016-07-26T04:00:00.000+0000, 2016-07-26T05:00:00.000+0000)",Close,815
"List(2016-07-26T04:00:00.000+0000, 2016-07-26T05:00:00.000+0000)",Open,999
"List(2016-07-26T05:00:00.000+0000, 2016-07-26T06:00:00.000+0000)",Close,1003
"List(2016-07-26T05:00:00.000+0000, 2016-07-26T06:00:00.000+0000)",Open,1000
"List(2016-07-26T06:00:00.000+0000, 2016-07-26T07:00:00.000+0000)",Open,993
"List(2016-07-26T06:00:00.000+0000, 2016-07-26T07:00:00.000+0000)",Close,1011


* Stop the stream

In [0]:
query.stop()