# Spark Structured Streaming ve Delta Tables

Spark, *Spark Structured Streaming* aracılığıyla veri akışı desteği sağlar ve bu desteği, akış verilerinin hedefleri (*havuzları*) veya *kaynakları* olabilecek *delta tabloları* aracılığıyla genişletir.

Bu demo çalışmasında, cihazlardan gelen simüle edilmiş durum mesajlarından oluşan bir JSON dosyası klasöründen veri akışını almak için Spark'ı kullanacaksınız. Gerçek bir senaryoda veriler Kafka kuyruğu veya Azure Event Hub gibi başka bir gerçek zamanlı kaynaktan gelebilir.

## Gelen veri akışı için bir klasör oluşturun

1. Bu not defterinin Spark havuzunuza eklendiğinden emin olun (yukarıdaki **Attach to** açılır listesini kullanarak).
2. Simüle edilen cihaz verilerinin yazılacağı **data** adlı bir klasör oluşturmak için aşağıdaki cell'i çalıştırın.

    > **Not**: Spark havuzunun başlatılması gerektiğinden ilk cell'in çalışması biraz zaman alabilir.


In [None]:
from notebookutils import mssparkutils

# Create a folder
inputPath = '/data/'
mssparkutils.fs.mkdirs(inputPath)

## Bir veri streamingi sorgulamak için Spark Structured Streaming kullanın

1. Cihazın adını ve durumunu içeren bir JSON şemasına göre klasördeki verileri okuyan bir stream veri framework oluşturmak için aşağıdaki cell'i çalıştırın.

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Create a stream that reads data from the folder, using a JSON schema
jsonSchema = StructType([
  StructField("device", StringType(), False),
  StructField("status", StringType(), False)
])

fileDF = spark.readStream.schema(jsonSchema).option("maxFilesPerTrigger", 1).json(inputPath)



2. Yukarıdaki cell'in tamamlanmasını bekleyin.
3. Streaming dataframe oluşturulduğunda, verileri toplamak ve sonuçları bir çıkış streaminge yazmak için bir dönüştürme sorgusu uygulayabilirsiniz. Gelen stream cihaz verilerindeki hatalara göre filtrelemek ve cihaz başına hata sayısını saymak için aşağıdaki kodu çalıştırın.

In [None]:
countDF = fileDF.filter("status == 'error'").groupBy("device").count()
query = countDF.writeStream.format("memory").queryName("counts").outputMode("complete").start()
print('Streaming query started.')

4. Sorgu çıktısı bir bellek içi tabloya aktarılır. Bu tabloyu sorgulamak ve cihaz başına hata sayısını görüntülemek amacıyla SQL'i kullanmak için aşağıdaki cell'i çalıştırın.

In [None]:
%%sql
select * from counts


5. Henüz herhangi bir cihaz durum verisi yazmadığımız için sorgunun veri döndürmediğini unutmayın.
6. Birkaç simüle edilmiş cihazdan bazı durum olayı verilerini yazarak bunu düzeltelim.

In [None]:
device_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"ok"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + "data.txt", device_data, True)

7. Toplu hata sayılarını görmek için SQL sorgusunu yeniden çalıştırın (sorgu hala veri döndürmüyorsa, birkaç saniye bekleyin ve tekrar deneyin!) Cihaz 1 için bir hata ve cihaz 2 için iki hata olmalıdır.

In [None]:
%%sql
select * from counts


8. Hata sayısını not ederek sonuçları gözden geçirin. Daha sonra daha fazla cihaz verisi yazmak için aşağıdaki kodu çalıştırın.

In [None]:
more_data = '''{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"ok"}
{"device":"Dev1","status":"error"}
{"device":"Dev2","status":"error"}
{"device":"Dev1","status":"ok"}'''

mssparkutils.fs.put(inputPath + "more-data.txt", more_data, True)

9. Toplamalara yansıyan yeni durum olaylarını görmek için SQL sorgusunu yeniden çalıştırın (gerekirse birkaç saniye bekleyin). Artık cihaz 1 için iki hata ve cihaz 2 için üç hata olmalıdır.

In [None]:
%%sql
select * from counts


## Delta tablosu oluşturma

Azure Synapse Analytics, işlemlere, sürüm oluşturmaya ve diğer yararlı özelliklere destek eklemek için Spark Structured Streaming'i temel alan Linux Foundation *Delta Lake* mimarisini destekler.

Özellikle, streaming verileri için hedef (veya *havuz*) olarak veya downstream  streaming sorguları için stream verilerinin *kaynağı* olarak *delta tabloları* oluşturabilirsiniz.

Bunu keşfetmek için, daha önce oluşturduğumuz **data** klasörünü temel alan stream veri framework'ü, dosya sistemindeki bir konuma giden yolu kullanarak tanımlayacağımız yeni bir delta tablosuna yazacağız.

1. Klasör verilerini bir delta tablosuna aktarmak için aşağıdaki cell'i çalıştırın.

In [None]:
delta_table_path = inputPath + 'deltatable'
stream = fileDF.writeStream.format("delta").option("checkpointLocation", inputPath + 'checkpoint').start(delta_table_path)

2. Şimdi, kendisine aktarılan verileri görmek amacıyla delta tablosunu sorgulamak için sonraki cell'i çalıştırın. Sorgu ilk başta hiçbir veri döndürmezse, birkaç saniye bekleyin ve hücreyi yeniden çalıştırın).

In [None]:
df = spark.read.format("delta").load(delta_table_path)
display(df)

Delta tabloları, zamanda önceki bir noktaya ait verileri görüntülemek için *time travel* adlı özelliği kullanmanızı sağlar.

4. Dosya verilerinden stream'e alınan ilk mikro veri kümesini almak için aşağıdaki sorguyu çalıştırın.

In [None]:
df = spark.read.format("delta").option("versionAsOf", 0).load(delta_table_path)
display(df)

5. Artık Spark Structured Streaming ve delta tablolarını incelemeyi tamamladığınıza göre, veri akışını durdurun ve bu alıştırmada kullanılan dosyaları temizleyin.

In [None]:
stream.stop()
query.stop()
print("Stream stopped")
mssparkutils.fs.rm(inputPath, True)