In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

In [3]:
spark = SparkSession.builder.getOrCreate()

Burada Spark Streaming için bir DataFrame oluşturuyoruz. Bu DataFrame ile soket bağlantısından veri okuyacağız. <br><br>
Sık kullanılan bazı metotlar:  
`.readStream` - Bu komut, bir streaming kaynağından real-time veri okuma işlemini başlatır.  
`.format()` - Bu metot, real-time verilerin hangi formatla geleceğini belirler.  
`.option()` - Bu metot, DataFrame ayarlarında değişiklikler başlatır.  
`.load()` - Bu metot, streaming kaynağını yükler.

In [5]:
lines = spark \
        .readStream \
        .format('socket') \
        .option("host","localhost") \
        .option("port",9999) \
        .load()

24/12/21 18:14:35 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.


`.format('socket')` - Burada veri formatını soket olarak belirledik.  
`.option("host", "localhost")` - Burada soketin host adresini localhost olarak belirledik.  
`.option("port", 9999)` - Burada soketin bağlanacağı port numarasını 9999 olarak belirledik.  

Burada soketten alacağımız satırlardaki kelimeleri ayıklamak için bir DataFrame oluşturuyoruz.<br><br>

> Burada önemli bir nokta var. `readStream` kullanılarak bir soketten veri okunduğunda Spark her bir satırı bir string olarak ele alır. Bu ele aldığı stringlerle default biçimde oluşturuduğu DataFrame ise tek sütunludur. Spark o sütuna `value` adını verir. Bunu nedenle `lines` olarak nitelediğimiz DataFrame aslında tek sütunlu bir tablodur.

In [6]:
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

`split(lines.value, " ")` - `lines` DataFrame'inin `values` columnunu boşluklarından parçalıyoruz.  
`alias("word")` - Her bir parçaya `"word"` adını veriyoruz.  
`explode()` - Tüm parçaları explode ediyoruz.

In [7]:
wordCounts = words.groupBy("word").count()

Burada okumuş olduğumuz streaminge (`readStream`) çeşitli işlemler (`words`+`wordCounts`) uyguladıktan sonra artık işlenmiş olan veriyi streaminge yazmaya (`writeStream`) başlıyoruz.  

`wordCounts` - Yukarıdaki dümdüz batch-data analiz işlemleri.  
`writeStream` - İşlenmiş veriyi stream olarak yazıyoruz.  
`outputMode()` - Output modunu belirliyoruz.  
`format()` - Outputun formatını belirliyoruz.  
`start()` - Yazdığımız streami başlatıyoruz.  

In [None]:
query = wordCounts \
        .writeStream \
        .outputMode("complete") \
        .format("console") \
        .start()

query.awaitTermination()

`outputMode("complete")` - Çıktı modunu "complete" olarak ayarladık, böylece her mikro-batch işleminde tüm sonuçlar gösterilecek.  
`format("console")` - Çıktı formatını "console" olarak ayarladık, böylece output terminalde gösterilecek.<br><br>

`awaitTermination()` - Streaming sorgusunu sürekli çalışır halde tutar.