# Suspicious Purchase Amounts with Stateful Streaming

In this lab, you will see how to use Spark Streaming to identify suspicious activity from a stream of purchase data.

## Objectives

Use Spark Streaming's support for maintaining state to watch an incoming stream of transactions and detect anomalies.


## Stream's State

Spark Streaming can maintain arbitrary state for a stream and continuously update it as new data arrives.


## Transaction Anomalies

We'll define a transaction anomaly as a debit that exceeds the running average of all debits seen since streaming started by a defined factor.

To compute the average, we maintain a state defined by the tuple:

```python
(debitCount, debitTotal, debitAverage)
```

Why do we need three values as opposed to just the average?

Because recalculating the average correctly when new debits arrive requires knowing how many debits, and what debit total, the previous average was based on.
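A small worked example makes the point. It is a plain-Python sketch with made-up amounts (debits are negative, as in the rest of this lab), comparing an "average of averages" with the average recomputed from the count and total.

```python
# Two batches of debit amounts (hypothetical values).
batch1 = [-10.0, -20.0]
batch2 = [-60.0]

# Keeping only the averages loses the weights needed to combine batches.
avg1 = sum(batch1) / len(batch1)
avg2 = sum(batch2) / len(batch2)
naive = (avg1 + avg2) / 2          # average of averages

# Keeping the count and the total lets us recompute the exact average.
count = len(batch1) + len(batch2)
total = sum(batch1) + sum(batch2)
exact = total / count

print(naive)  # -37.5
print(exact)  # -30.0
```

The naive value is wrong because the first batch holds two debits and the second only one.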

The state will be updated by the following function:

```python
updateStateByKey(func)
```

This function returns a new DStream of states, where the state for each key is updated by applying the given function to the previous state of the key and the new values for that key.
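To make those semantics concrete, here is a minimal plain-Python sketch of what happens to the state in one batch interval. The helper `update_state_by_key` and the `running_sum` function are hypothetical stand-ins written for illustration, not Spark APIs; Spark performs the equivalent steps in a distributed fashion.

```python
from collections import defaultdict

def update_state_by_key(batch, states, update_func):
    """Emulate one batch interval of updateStateByKey (illustrative only).

    batch:  list of (key, value) pairs that arrived in this interval
    states: dict mapping key -> previous state
    """
    grouped = defaultdict(list)
    for key, value in batch:
        grouped[key].append(value)

    new_states = {}
    # Keys that already have state are updated even when no new values arrived.
    for key in set(states) | set(grouped):
        new_state = update_func(grouped.get(key, []), states.get(key))
        if new_state is not None:      # returning None drops the key
            new_states[key] = new_state
    return new_states

def running_sum(new_values, last_state):
    # last_state is None the first time a key is seen
    return (last_state or 0) + sum(new_values)

states = {}
states = update_state_by_key([("a", 1), ("b", 5), ("a", 2)], states, running_sum)
states = update_state_by_key([("a", 10)], states, running_sum)
print(sorted(states.items()))  # [('a', 13), ('b', 5)]
```

Note that key `"b"` keeps its state in the second interval even though no new value arrived for it.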

We'll see how it works in a bit.

Let's get started.

## Create Spark Session

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Python Streaming").getOrCreate()

## Create Streaming Context with Batch Interval Duration


In [None]:
from pyspark.streaming import StreamingContext

# the duration is in seconds
intervalDuration = 2

ssc = StreamingContext(spark.sparkContext, intervalDuration)

# checkpointing is required by updateStateByKey; the state is saved to this directory
ssc.checkpoint("checkpoint")

## Define "update state" Function

In [None]:
def updateFunc(new_values, last_state):
    """Fold this batch's (count, amount, amount) tuples into the running state."""
    # the state is None the first time a key is seen
    count, total, _ = (0, 0.0, 0.0) if last_state is None else last_state
    for new_value in new_values:
        count += new_value[0]
        total += new_value[1]
    # avoid dividing by zero while no debit has been seen yet
    average = 0.0 if count == 0 else total / count

    return (count, total, average)


## Create Socket Stream

In [None]:
hostname = "nc"
port = 9999

# create a DStream that will connect to hostname:port
lines = ssc.socketTextStream(hostname, port)

## Parse Transactions and Identify Debits

In [None]:
# parsing transactions
rawTxns = lines.map(lambda st: st.split(",")).map(lambda el: (el[0], el[1], float(el[2])))

# keeping purchases only (debits have negative amounts)
debitTxns = rawTxns.filter(lambda s: s[2] < 0)
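The same parse-and-filter logic can be tried on a few lines without Spark. This is a sketch only: the sample lines and the meaning of the first two fields are hypothetical, since the actual layout of `tx.csv` may differ. The code relies only on the third field being a numeric amount.

```python
def parse_txn(line):
    """Split a CSV line into (field0, field1, amount)."""
    el = line.split(",")
    return (el[0], el[1], float(el[2]))

# Hypothetical sample lines; the real tx.csv may use different fields.
sample = [
    "2024-01-05,Grocery Store,-42.50",
    "2024-01-06,Salary,1500.00",
    "2024-01-07,Electronics,-310.00",
]

raw = [parse_txn(line) for line in sample]
debits = [t for t in raw if t[2] < 0]   # negative amounts are debits

for t in debits:
    print(t)
# ('2024-01-05', 'Grocery Store', -42.5)
# ('2024-01-07', 'Electronics', -310.0)
```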

We need to add a key to each transaction so that it can be joined with the state that holds the average amount. Here every transaction gets the same constant key, `1`.

In a real application it would be more natural to use the account id as the key.

In [None]:
keyedTxns = debitTxns.map(lambda s: (1, s))

## Update Stream's State

In [None]:
# each debit contributes (count=1, amount, placeholder for the average)
amounts = debitTxns.map(lambda s: (1, (1, s[2], s[2])))
meanAmount = amounts.updateStateByKey(updateFunc)

meanAmount.pprint()

## Join Transactions and State

In [None]:
joinedTxns = keyedTxns.join(meanAmount)
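It helps to see the shape of a joined record before indexing into it. The following plain-Python sketch emulates the inner join for one batch using hypothetical values; it is not how Spark executes the join, only an illustration of the resulting `(key, (transaction, state))` structure.

```python
# One batch, emulated with plain lists (values are hypothetical).
keyed_txns = [(1, ("t1", "Coffee", -5.0)), (1, ("t2", "Laptop", -900.0))]
mean_amount = [(1, (2, -905.0, -452.5))]   # (key, (count, total, average))

# An inner join pairs every transaction with the state stored under the same key.
state_by_key = dict(mean_amount)
joined = [(k, (txn, state_by_key[k])) for k, txn in keyed_txns if k in state_by_key]

key, (txn, state) = joined[0]
print(txn[2])    # -5.0    (the transaction amount)
print(state[2])  # -452.5  (the running average)
```

So for a joined value `t`, `t[0][2]` is the transaction amount and `t[1][2]` is the running average.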

## Identify Suspicious Transactions

In [None]:
fraud_factor = 1.33

# amounts are negative, so "larger than average" means "less than" the scaled average
suspiciousTxns = joinedTxns.map(lambda v: v[1]).filter(lambda t: t[0][2] < t[1][2]*fraud_factor).map(lambda t: t[0])

suspiciousTxns.pprint()
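Because debits are stored as negative numbers, the comparison direction can look backwards at first. A short arithmetic sketch, with a hypothetical running average of -100.0, shows which amounts the filter keeps.

```python
fraud_factor = 1.33
average = -100.0                      # hypothetical running average of debits
threshold = average * fraud_factor    # roughly -133

debits = [-50.0, -120.0, -140.0, -500.0]

# A debit is suspicious when it is more negative than the scaled average.
suspicious = [d for d in debits if d < threshold]
print(suspicious)  # [-140.0, -500.0]
```

A debit of 120 is within 1.33 times the average and passes; debits of 140 and 500 are flagged.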

## Start Streaming

In [None]:
ssc.start()             # starting the computation
ssc.awaitTermination()  # waiting for the computation to terminate


### Simulate transaction source

We will be receiving transaction data on port 9999.  There's a convenient utility in most Unix-like systems called `nc` ("netcat") that will take data from `stdin` and pipe it to a designated socket.

> Note:  Windows systems should have a similar command called `ncat`.

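For our application, `nc` will listen on port `9999`. The notebook connects to the host named in `hostname`; if you run `nc` on the same machine as the notebook, set `hostname` to `"localhost"`.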

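Open a new terminal and start `nc` in listen mode. Spark's socket stream connects to the port as a client, so `nc` has to be the side that listens. On Linux and Mac, issue the command

``` 
nc -lk 9999
```

Some netcat variants expect the port after `-p` instead (`nc -l -p 9999`).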

Check your documentation if you're on Windows.

If there's an error, diagnose & correct it.  Otherwise, you should see the cursor on the next line, waiting for input on `stdin`.  Leave that process be for the moment; we're going to return to our program now and come back to it when we're ready to stream data.

Once the streaming context is running, copy a large batch of transaction lines from the `tx.csv` file and paste it into the `nc` terminal.
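If `nc` is not available, a few lines of Python can play the same role. The sketch below is an assumption-laden alternative, not part of the original lab: the function names `serve_lines` and `serve_in_background` are hypothetical, and it accepts a single client, sends every line, and then closes the connection.

```python
import socket
import threading

def serve_lines(lines, host="localhost", port=9999, ready=None):
    """Listen on host:port, accept one client, send each line, then close."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(1)
        if ready is not None:
            ready.set()                 # signal that the port is open
        conn, _ = server.accept()
        with conn:
            for line in lines:
                conn.sendall((line + "\n").encode("utf-8"))

def serve_in_background(lines, host="localhost", port=9999):
    """Run serve_lines in a daemon thread and wait until it is listening."""
    ready = threading.Event()
    thread = threading.Thread(
        target=serve_lines, args=(lines, host, port, ready), daemon=True
    )
    thread.start()
    ready.wait(timeout=5)
    return thread
```

A possible use is to read the file with `open("tx.csv").read().splitlines()` and pass the result to `serve_in_background` before calling `ssc.start()`. The `hostname` given to `socketTextStream` must then point at the machine running this sender.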

## Conclusion

In this lab, you saw how Spark Streaming's support for maintaining the stream state can be used to process continuously streamed data and handle it not only with ease, but also with nearly the same API and programming concepts as batch data!


## Complete Solution

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Python Streaming").getOrCreate()

from pyspark.streaming import StreamingContext

# the duration is in seconds
intervalDuration = 2

ssc = StreamingContext(spark.sparkContext, intervalDuration)

# checkpointing is required by updateStateByKey; the state is saved to this directory
ssc.checkpoint("checkpoint")

def updateFunc(new_values, last_state):
    """Fold this batch's (count, amount, amount) tuples into the running state."""
    # the state is None the first time a key is seen
    count, total, _ = (0, 0.0, 0.0) if last_state is None else last_state
    for new_value in new_values:
        count += new_value[0]
        total += new_value[1]
    # avoid dividing by zero while no debit has been seen yet
    average = 0.0 if count == 0 else total / count

    return (count, total, average)

hostname = "nc"
port = 9999

# create a DStream that will connect to hostname:port
lines = ssc.socketTextStream(hostname, port)

# parsing transactions
rawTxns = lines.map(lambda st: st.split(",")).map(lambda el: (el[0], el[1], float(el[2])))

# keeping purchases only (debits have negative amounts)
debitTxns = rawTxns.filter(lambda s: s[2] < 0)

# we need to add the key to transactions to be able to compare with the amount mean
# in the real application it would be more natural to use the account id as the key
keyedTxns = debitTxns.map(lambda s: (1, s))

# getting transaction amounts and updating the state holding amount mean
amounts = debitTxns.map(lambda s: (1, (1, s[2], s[2])))
meanAmount = amounts.updateStateByKey(updateFunc)
meanAmount.pprint()

# joining two streams with the purchase transactions and the mean
joinedTxns = keyedTxns.join(meanAmount)

fraud_factor = 1.33

# getting suspicious purchases (amounts are negative, hence the "<" comparison)
suspiciousTxns = joinedTxns.map(lambda v: v[1]).filter(lambda t: t[0][2] < t[1][2]*fraud_factor).map(lambda t: t[0])

suspiciousTxns.pprint()

ssc.start()             # starting the computation
ssc.awaitTermination()  # waiting for the computation to terminate