# Hands-on example with Spark Streaming

This notebook uses the Kaggle fraud detection dataset. The goal is to count, as a stream, the number of transactions that reach each recipient (nameDest).

#### Data import

In [1]:
import findspark
findspark.init()

In [2]:
#%load_ext nb_black
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

In [3]:
df = spark.read.csv("data/fraud_detection.csv", 
                    header=True, 
                    inferSchema=True)

In [4]:
df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

In [5]:
df = df.drop("isFraud", "isFlaggedFraud")

In [6]:
df.show(2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 2 rows



#### Input Data Streaming

Each step represents a unit of time: one step is one hour. In a real deployment, a process would run every hour and collect all the transactions from that hour.

In [7]:
df.groupBy("step").count().show(3)

+----+-----+
|step|count|
+----+-----+
| 148|   12|
| 463|   10|
| 471| 2620|
+----+-----+
only showing top 3 rows



We can simulate that hourly process by filtering on each step and saving each result to a separate file.

In [8]:
%%time
steps = df.select("step").distinct().collect()

# Write one CSV file per step to simulate hourly batches.
for row in steps:
    _df = df.where(F.col("step") == row["step"])
    # coalesce(1) writes each step's DataFrame to a single file
    _df.coalesce(1).write.mode("append").option("header", "true").csv("data/fraud")

Wall time: 0 ns



In [9]:
!ls data/fraud

In [10]:
part = spark.read.csv(
    "data/fraud/part-00000-897a9dd3-832b-4e43-bcdc-c0009cfec4f0-c000.csv",
    header=True,
    inferSchema=True,
)

In [11]:
part.groupBy("step").count().show()

+----+-----+
|step|count|
+----+-----+
|  34|30904|
+----+-----+



#### Streaming Processing

Let's create a streaming version of this input: we will read the files one by one, as if they arrived as a stream.

In [12]:
dataSchema = part.schema

In [13]:
dataSchema

StructType(List(StructField(step,IntegerType,true),StructField(type,StringType,true),StructField(amount,DoubleType,true),StructField(nameOrig,StringType,true),StructField(oldbalanceOrg,DoubleType,true),StructField(newbalanceOrig,DoubleType,true),StructField(nameDest,StringType,true),StructField(oldbalanceDest,DoubleType,true),StructField(newbalanceDest,DoubleType,true)))
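Inferring the schema from one part file ties the notebook to a generated file name that changes on every run. As an alternative sketch, the schema can be written out by hand as a DDL-formatted string, which **readStream.schema** also accepts. The names FIELDS and schema_ddl below are illustrative; the column names and types are copied from the dataSchema output above.

```python
# Column names and types copied from the dataSchema output above.
FIELDS = [
    ("step", "INT"),
    ("type", "STRING"),
    ("amount", "DOUBLE"),
    ("nameOrig", "STRING"),
    ("oldbalanceOrg", "DOUBLE"),
    ("newbalanceOrig", "DOUBLE"),
    ("nameDest", "STRING"),
    ("oldbalanceDest", "DOUBLE"),
    ("newbalanceDest", "DOUBLE"),
]

# Backticks quote each column name so that words such as "type"
# are always treated as identifiers.
schema_ddl = ", ".join(f"`{name}` {dtype}" for name, dtype in FIELDS)
print(schema_ddl)

# Assumed usage, in place of part.schema:
# streaming = spark.readStream.schema(schema_ddl).csv("data/fraud/")
```

With a hand-written schema, the notebook no longer depends on reading a specific part file first.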

**maxFilesPerTrigger** sets the maximum number of new files Spark reads from the folder in each trigger, which controls how quickly the stream works through the files.
In this example, we limit the stream to one file per trigger.

In [14]:
streaming = (
    spark.readStream.schema(dataSchema)
    # the files were written with a header row, so skip it when reading
    .option("header", True)
    .option("maxFilesPerTrigger", 1)
    .csv("data/fraud/")
)
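To picture what one file per trigger means, here is a plain-Python sketch that splits a file list into micro-batches. It is a simplified model, not Spark's implementation: the real file source also tracks which files it has already seen and discovers new ones over time. The function name micro_batches and the file names are hypothetical.

```python
from typing import Iterator, List


def micro_batches(files: List[str], max_files_per_trigger: int) -> Iterator[List[str]]:
    """Yield the files each trigger would pick up, at most max_files_per_trigger at a time."""
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    for start in range(0, len(files), max_files_per_trigger):
        yield files[start:start + max_files_per_trigger]


# Hypothetical file names; real part files carry a generated UUID.
files = [f"part-{i}.csv" for i in range(5)]

# With a limit of 1, every trigger processes exactly one file.
for trigger, batch in enumerate(micro_batches(files, 1)):
    print(trigger, batch)
```

Raising the limit makes each trigger process more files, so the same folder is consumed in fewer triggers.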

Let's set up a transformation. The nameDest column is the ID of the transaction's recipient, so we group by it and count the rows in each group.

In [15]:
dest_count = streaming.groupBy("nameDest").count().orderBy(F.desc("count"))

Now that we have our transformation, we need to specify an output sink for the results. For this example, we write the results to an in-memory table.

We also need to define how Spark emits the results. Here we use complete output mode, set with **.outputMode("complete")**, which rewrites the full set of recipients and their counts after each trigger.
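As a rough plain-Python model of complete output mode, the sketch below keeps running counts across micro-batches and emits the whole table after each one. For contrast, it also models update mode, which emits only the rows that changed. The batches and recipient IDs are made up for illustration, and the functions are not part of Spark.

```python
from collections import Counter

# Hypothetical recipient IDs, one list per micro-batch (one file per trigger).
batches = [
    ["C1", "C2", "C1"],
    ["C2", "C3"],
    ["C1"],
]


def complete_mode(batches):
    """After every batch, emit the whole result table, highest count first."""
    totals = Counter()
    for batch in batches:
        totals.update(batch)
        yield sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))


def update_mode(batches):
    """After every batch, emit only the rows whose count changed in that batch."""
    totals = Counter()
    for batch in batches:
        totals.update(batch)
        yield sorted((name, totals[name]) for name in set(batch))


for table in complete_mode(batches):
    print(table)
```

Because complete mode always has the full table available, it is the mode that allows the query to sort the aggregated counts.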

We don't call **activityQuery.awaitTermination()** here. That call blocks until the query stops, and a standalone application needs it to keep the driver process alive while the stream is active. In an interactive notebook the driver stays alive anyway, so we leave it out to keep the cell from blocking.

In [16]:
activityQuery = (
    dest_count.writeStream.queryName("dest_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

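# In a standalone application, block here until the query stops:
# activityQuery.awaitTermination()

import time

# Poll the in-memory table while the stream fills it. The nameDest != 'nameDest'
# filter guards against CSV header rows that were read as data.
for x in range(50):
    _df = spark.sql(
        "SELECT * FROM dest_counts WHERE nameDest != 'nameDest' AND count >= 2"
    )
    if _df.count() > 0:
        _df.show(10)
    time.sleep(0.5)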

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
+-----------+-----+

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C319921943|    2|
| C803352127|    2|
|C1887077333|    2|
| C763794011|    2|
| C488343370|    2|
+-----------+-----+

Check whether the stream is active:

In [17]:
spark.streams.active[0].isActive

True

In [18]:
activityQuery.status

{'message': 'Processing new data',
 'isDataAvailable': True,
 'isTriggerActive': True}
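The status dictionary can also be used to decide when the stream has caught up. The helper below is a small sketch, not a Spark API: is_idle is a hypothetical name, and the second status value is an assumed example of what the query reports once every file has been processed.

```python
def is_idle(status: dict) -> bool:
    """Return True when the query reports no pending data and no running trigger."""
    return not status["isDataAvailable"] and not status["isTriggerActive"]


# Status copied from the cell output above: the query is still working.
busy = {
    "message": "Processing new data",
    "isDataAvailable": True,
    "isTriggerActive": True,
}

# Assumed status once every file has been processed.
drained = {
    "message": "Waiting for data to arrive",
    "isDataAvailable": False,
    "isTriggerActive": False,
}

print(is_idle(busy), is_idle(drained))

# Assumed usage in the notebook:
# while not is_idle(activityQuery.status):
#     time.sleep(0.5)
```

A trigger can be briefly active while Spark checks for new files, so a single idle reading is a hint rather than a guarantee.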

To stop the stream, call activityQuery.stop(). This also resets the query so it can be started again when testing.

In [19]:
activityQuery.stop()