# Spark Streaming

This notebook demonstrates how Spark Streaming (here, the Structured Streaming API) can be used to detect bank fraud.


Task: a bank wants to detect suspicious transactions automatically and needs a solution that analyzes incoming data as it arrives.


Data: https://www.kaggle.com/datasets/ealaxi/paysim1

In [15]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.appName("Fraud Detection").getOrCreate()
spark

In [17]:
project_dir = "<PROJECT_DIR>"
data_dir = f"{project_dir}/data"
outputs_dir = f"{data_dir}/outputs"
input_tables = f"{data_dir}/paysim/"
output_tables = f"{outputs_dir}/paysim/"

In [18]:
df = spark.read.csv(input_tables, header=True, inferSchema=True) 


In [19]:
df.write.parquet(f"{output_tables}/paysim.parquet")


In [20]:
df.printSchema()

root
 |-- step: integer (nullable = true)
 |-- type: string (nullable = true)
 |-- amount: double (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- oldbalanceOrg: double (nullable = true)
 |-- newbalanceOrig: double (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- oldbalanceDest: double (nullable = true)
 |-- newbalanceDest: double (nullable = true)
 |-- isFraud: integer (nullable = true)
 |-- isFlaggedFraud: integer (nullable = true)


In [21]:
df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

In [22]:
from pyspark.sql.functions import col

df.where(col("isFraud") == 1).show(5)

+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|   1|TRANSFER|  181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|      1|             0|
|   1|CASH_OUT|  181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|      1|             0|
|   1|TRANSFER| 2806.0|C1420196421|       2806.0|           0.0| C972765878|           0.0|           0.0|      1|             0|
|   1|CASH_OUT| 2806.0|C2101527076|       2806.0|           0.0|C1007251739|       26202.0|           0.0|      1|             0|
|   1|TRANSFER|20128.0| C137533655|      20128.0|           0.0|C1848415041|           0.0

In [23]:
from pyspark.sql.functions import col

df.where(col("isFlaggedFraud") == 1).show(5)

+----+--------+----------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
|step|    type|    amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|isFraud|isFlaggedFraud|
+----+--------+----------+-----------+-------------+--------------+-----------+--------------+--------------+-------+--------------+
| 212|TRANSFER|4953893.08| C728984460|   4953893.08|    4953893.08| C639921569|           0.0|           0.0|      1|             1|
| 250|TRANSFER|1343002.08|C1100582606|   1343002.08|    1343002.08|C1147517658|           0.0|           0.0|      1|             1|
| 279|TRANSFER| 536624.41|C1035541766|    536624.41|     536624.41|C1100697970|           0.0|           0.0|      1|             1|
| 387|TRANSFER|4892193.09| C908544136|   4892193.09|    4892193.09| C891140444|           0.0|           0.0|      1|             1|
| 425|TRANSFER|     1.0E7| C689608084|1.958504037E7| 1.958504037E7|C1


In [24]:
# Count the distinct senders (nameOrig)
df.select("nameOrig").distinct().count()

6353307

In [25]:
df.select("nameDest").distinct().count()

2722362

In [26]:
df.count()

6362620

In [27]:
# Total rows minus distinct senders (nameOrig): transactions beyond each sender's first one
6362620 - 6353307

9313
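The subtraction compares the total row count with the number of distinct senders, so the result is the number of transactions beyond each sender's first one. A toy illustration with hypothetical sender IDs (not PaySim data):

```python
# Hypothetical sender IDs, one entry per transaction (illustrative only)
senders = ["C1", "C2", "C1", "C3", "C2", "C1"]

total_rows = len(senders)             # plays the role of df.count()
distinct_senders = len(set(senders))  # plays the role of df.select("nameOrig").distinct().count()
repeat_transactions = total_rows - distinct_senders

print(total_rows, distinct_senders, repeat_transactions)  # 6 3 3
```

Here `C1` sends three times and `C2` twice, giving 2 + 1 = 3 repeat transactions, the same quantity the notebook computes as 9313 for the full dataset.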

In [28]:
# Drop the fraud label columns; we don't need them from here on
df = df.drop("isFraud", "isFlaggedFraud")
df.show(5)

+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|    type|  amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1| PAYMENT| 9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1| PAYMENT| 1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
|   1|TRANSFER|   181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|
|   1|CASH_OUT|   181.0| C840083671|        181.0|           0.0|  C38997010|       21182.0|           0.0|
|   1| PAYMENT|11668.14|C2048537720|      41554.0|      29885.86|M1230701703|           0.0|           0.0|
+----+--------+--------+-----------+-------------+--------------+-----------+--------------+--------------+


In [29]:
df.groupBy("type").count().show()

+--------+-------+
|    type|  count|
+--------+-------+
|TRANSFER| 532909|
| CASH_IN|1399284|
|CASH_OUT|2237500|
| PAYMENT|2151495|
|   DEBIT|  41432|
+--------+-------+



The `step` column maps to a unit of real-world time; here, 1 step is 1 hour.
We can therefore assume that, in our scenario, a separate job runs every hour and fetches all transactions from that time window.
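Under that assumption, a step can be turned into the hour window such an hourly job would fetch. The dataset only stores relative steps, so the start date below is hypothetical; steps start at 1 in the outputs above, so step 1 is mapped to the start hour.

```python
from datetime import datetime, timedelta

# Hypothetical simulation start; PaySim itself only provides the relative step number.
SIMULATION_START = datetime(2025, 1, 1, 0, 0)

def step_to_timestamp(step: int, start: datetime = SIMULATION_START) -> datetime:
    """Map a 1-based step (1 step = 1 hour) to the beginning of its hour."""
    if step < 1:
        raise ValueError("steps are 1-based")
    return start + timedelta(hours=step - 1)

def step_window(step: int, start: datetime = SIMULATION_START) -> tuple[datetime, datetime]:
    """Return the [begin, end) window an hourly job would read for this step."""
    begin = step_to_timestamp(step, start)
    return begin, begin + timedelta(hours=1)

print(step_to_timestamp(1))  # 2025-01-01 00:00:00
print(step_window(25))       # (datetime.datetime(2025, 1, 2, 0, 0), datetime.datetime(2025, 1, 2, 1, 0))
```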


In [ ]:
df.groupBy("step").count().show(3)

In [30]:
df.groupBy("step").count().orderBy(F.desc("count")).show()

+----+-----+
|step|count|
+----+-----+
|  19|51352|
|  18|49579|
| 187|49083|
| 235|47491|
| 307|46968|
| 163|46352|
| 139|46054|
| 403|45155|
|  43|45060|
| 355|44787|
|  15|44609|
| 186|43747|
| 306|43615|
|  17|43361|
| 259|43328|
|  16|42471|
| 379|41759|
|  14|41485|
|  42|41304|
| 354|40696|
+----+-----+



We can therefore emulate the output of that job by filtering the data for each step and saving every step to a separate file.
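A plain-Python sketch of that split, assuming the rows are dicts with a `step` key; the `step_<n>.csv` names and the temporary folder are hypothetical (Spark generates its own `part-…` file names). The steps are written in ascending order because the files are later replayed as a stream, and Spark's file source by default picks up files oldest first.

```python
import csv
import tempfile
from collections import defaultdict
from pathlib import Path

def write_rows_by_step(rows: list[dict], out_dir: Path) -> list[Path]:
    """Write one CSV file per step, in ascending step order; return the paths written."""
    by_step: dict[int, list[dict]] = defaultdict(list)
    for row in rows:
        by_step[row["step"]].append(row)

    written = []
    for step in sorted(by_step):                # ascending = chronological order
        path = out_dir / f"step_{step}.csv"     # hypothetical naming scheme
        with path.open("w", newline="") as fh:
            writer = csv.DictWriter(fh, fieldnames=list(by_step[step][0].keys()))
            writer.writeheader()
            writer.writerows(by_step[step])
        written.append(path)
    return written

# Hypothetical sample rows using a subset of the PaySim columns
sample = [
    {"step": 2, "type": "PAYMENT", "amount": 10.0},
    {"step": 1, "type": "TRANSFER", "amount": 181.0},
    {"step": 1, "type": "CASH_OUT", "amount": 181.0},
]

with tempfile.TemporaryDirectory() as tmp:
    paths = write_rows_by_step(sample, Path(tmp))
    names = [p.name for p in paths]
    line_counts = [len(p.read_text().splitlines()) for p in paths]  # header + data rows

print(names)        # ['step_1.csv', 'step_2.csv']
print(line_counts)  # [3, 2]
```

In Spark, the per-step loop used further down launches one job per step; `df.write.partitionBy("step")` would write all steps in a single pass instead, at the cost of a `step=<n>` sub-directory layout rather than one flat folder.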

In [31]:
df.select("step").distinct().count()

743

In [35]:
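# Sort the steps so the files are written, and later streamed, in chronological order
steps = df.select("step").distinct().orderBy("step").collect()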
len(steps)

743

In [36]:
# Write each step (one simulated hour) as a single CSV file into the same folder
for step in steps:
    step_df = df.where(df.step == step[0])
    step_df.coalesce(1).write.mode("append").option("header", "true").csv(f"{output_tables}/paysim_by_steps")


In [39]:
# The part-file name (UUID) differs on every run; use a file from your own output folder
part = spark.read.csv(f"{output_tables}/paysim_by_steps/part-00000-f31a50f0-d893-4565-809e-5d9cba0bef44-c000.csv", header=True, inferSchema=True)

In [40]:
part.groupBy("step").count().show()

+----+-----+
|step|count|
+----+-----+
| 135|27556|
+----+-----+


## Streaming version

Let's create a streaming version of this input.
We will read the files one at a time, as if each of them were part of a stream.

In [None]:
dataSchema = df.schema
dataSchema

`maxFilesPerTrigger` controls how quickly Spark works through all the files in the folder.
In this example, we limit the stream to one file per trigger.
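To make the effect of `maxFilesPerTrigger` concrete, here is a plain-Python simulation (not Spark's implementation): pending files are taken oldest first by modification time, and at most `max_files_per_trigger` of them form each micro-batch. The file names and timestamps are hypothetical.

```python
from typing import Iterator

def plan_micro_batches(files: dict[str, float], max_files_per_trigger: int) -> Iterator[list[str]]:
    """Yield lists of file names, oldest modification time first, at most N per trigger."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be >= 1")
    pending = sorted(files, key=lambda name: files[name])  # oldest first
    for i in range(0, len(pending), max_files_per_trigger):
        yield pending[i:i + max_files_per_trigger]

# Hypothetical files mapped to modification times (seconds since epoch)
files = {"part-b.csv": 200.0, "part-a.csv": 100.0, "part-c.csv": 300.0}

batches_one = list(plan_micro_batches(files, max_files_per_trigger=1))
batches_two = list(plan_micro_batches(files, max_files_per_trigger=2))
print(batches_one)  # [['part-a.csv'], ['part-b.csv'], ['part-c.csv']]
print(batches_two)  # [['part-a.csv', 'part-b.csv'], ['part-c.csv']]
```

With one file per step and a limit of 1, the stream advances by exactly one simulated hour per trigger, which is what the hourly-job scenario above calls for.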

In [None]:
streaming = (
    spark.readStream.schema(dataSchema)
    .option("header", "true")
    .option("maxFilesPerTrigger", 1)
    .csv(f"{output_tables}/paysim_by_steps")
)

Let's define the transformation.

The `nameDest` column identifies the recipient of a transaction; we count how many transactions each recipient receives:

In [None]:
dest_count = streaming.groupBy("nameDest").count().orderBy(F.desc("count"))
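Because `streaming` is unbounded, Spark keeps the per-`nameDest` counts as state and folds each new micro-batch into it rather than recounting all data. A plain-Python sketch of that idea with `collections.Counter` (hypothetical recipient IDs; this is not how Spark stores its state):

```python
from collections import Counter

def run_stateful_count(micro_batches: list[list[str]]) -> list[Counter]:
    """Fold each micro-batch of recipient IDs into a running count; return a snapshot per trigger."""
    state: Counter = Counter()
    snapshots = []
    for batch in micro_batches:
        state.update(batch)               # only the newly arrived rows are processed
        snapshots.append(Counter(state))  # copy, so later updates don't alter this snapshot
    return snapshots

# Hypothetical nameDest values arriving in three triggers
batches = [["M1", "C7"], ["C7"], ["C7", "M2", "M1"]]
snapshots = run_stateful_count(batches)
print(snapshots[-1].most_common(1))  # [('C7', 3)]
```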

Now that the transformation is defined, we need to specify where its results go (the sink).

In this example, we write the results to the memory sink, which keeps them in RAM as an in-memory table named after `queryName`.

We also need to specify how Spark outputs the data.

Here we use the complete output mode: after every trigger, all keys are rewritten together with their counts.
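The difference between output modes can be simulated without Spark. In complete mode every trigger re-emits the whole result table; in update mode (another Spark output mode, not used in this notebook) only the keys whose counts changed in that trigger are emitted. A plain-Python sketch with hypothetical data:

```python
from collections import Counter

def emitted_rows(micro_batches: list[list[str]], mode: str) -> list[dict[str, int]]:
    """Return what each trigger would write to the sink in 'complete' or 'update' mode."""
    if mode not in ("complete", "update"):
        raise ValueError(f"unsupported mode: {mode}")
    state: Counter = Counter()
    outputs = []
    for batch in micro_batches:
        state.update(batch)
        if mode == "complete":
            outputs.append(dict(state))  # the whole result table
        else:
            # only keys touched in this batch; dict.fromkeys keeps arrival order
            outputs.append({key: state[key] for key in dict.fromkeys(batch)})
    return outputs

batches = [["M1", "C7"], ["C7"], ["M2"]]
print(emitted_rows(batches, "complete"))
# [{'M1': 1, 'C7': 1}, {'M1': 1, 'C7': 2}, {'M1': 1, 'C7': 2, 'M2': 1}]
print(emitted_rows(batches, "update"))
# [{'M1': 1, 'C7': 1}, {'C7': 2}, {'M2': 1}]
```

Complete mode is also required here for a structural reason: Structured Streaming only supports sorting a streaming aggregation in complete mode, and `dest_count` ends with `orderBy`.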



In [None]:
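# A streaming DataFrame cannot be displayed with .show(): Spark raises an AnalysisException
# until the query is started with writeStream. Inspect its schema instead:
dest_count.printSchema()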

In [None]:
activityQuery = (
    dest_count.writeStream.queryName("dest_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

# Not needed in a notebook, where the query keeps running in the background
# activityQuery.awaitTermination(1)


In [None]:
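import time

count_threshold = 2  # flag recipients with at least this many transactions

# Poll the in-memory table "dest_counts" five times, every 0.5 s, while the stream runs
for x in range(5):
    # nameDest != 'nameDest' guards against a header line being read as data
    _df = spark.sql(f"SELECT * FROM dest_counts WHERE nameDest != 'nameDest' AND `count` >= {count_threshold}")
    if _df.count() > 0:
        print("####### x:", x)
        _df.show(10)
    time.sleep(0.5)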


Let's check whether the stream is active:

In [None]:
spark.streams.active[0].isActive

In [None]:
activityQuery.status

Finally, we can stop the stream:

In [None]:
activityQuery.stop()

In [None]:
spark.stop()