Data source: the PaySim dataset on Kaggle — https://www.kaggle.com/ealaxi/paysim1

In [None]:
# Apache Spark runs on the JVM, so a Java 8 JDK must be installed first.
# Output is kept quiet (-qq) and stdout is discarded to keep the cell clean.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
# Mount Google Drive and unpack the Spark distribution stored there.
# NOTE(review): tar extracts into the current working directory (presumably
# /content on Colab) — confirm the resulting folder matches SPARK_HOME.
from google.colab import drive
drive.mount('/content/drive')
!tar xzf /content/drive/MyDrive/spark-3.3.0-bin-hadoop3.tgz

In [None]:
# Set up environment variables so PySpark can find the JDK and the Spark distribution
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# Must name the directory produced by extracting spark-3.3.0-bin-hadoop3.tgz;
# the earlier value (spark-3.2.0-bin-hadoop3.2) pointed at a folder that is never created.
os.environ["SPARK_HOME"] = "/content/spark-3.3.0-bin-hadoop3"

In [None]:
# Install findspark, which helps python locate the psyspark module files
!pip install -q findspark
import findspark
findspark.init()

In [None]:
# Finally, initialise a "SparkSession", the entry point that handles the computations.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("local")                   # run Spark locally inside this runtime
    .appName("Colab")
    .config("spark.ui.port", "4050")   # fixed port for the Spark UI
    .getOrCreate()                     # reuse an existing session if there is one
)

In [None]:
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [None]:
# Load the PaySim CSV: the first line supplies the column names and the
# column types are inferred from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/paysim.csv")
)

In [None]:
# Preview the first 10 rows, then print the column names and types.
# NOTE(review): the saved output under this cell shows created_at/text columns,
# which do not match the PaySim columns listed later — re-run the cell to refresh it.
df.show(10)
df.printSchema()

+-------------------+--------------------+
|         created_at|                text|
+-------------------+--------------------+
|2019-04-17 07:34:18|👍 on @YouTube: G...|
|2019-04-16 03:34:16|👍 on @YouTube: U...|
|2019-04-16 03:06:08|Liked on YouTube:...|
|2019-04-17 07:07:38|Liked on YouTube:...|
|2019-04-17 07:34:09|@MrLegenDarius un...|
|2019-04-17 07:33:17|Reddit Is Burstin...|
|2019-04-16 18:13:17|Reddit Is Burstin...|
|2019-04-17 03:48:38|10 'Game Of Thron...|
|2019-04-16 19:13:27|What Reddit's 'Ga...|
|2019-04-17 03:35:08|GAME OF THRONES S...|
+-------------------+--------------------+
only showing top 10 rows

root
 |-- created_at: string (nullable = true)
 |-- text: string (nullable = true)



141520

In [None]:
# List the column names of the loaded dataset.
df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

In [None]:
# Remove the isFraud and isFlaggedFraud columns; the cells that follow in this
# section only work with the remaining transaction fields.
df = df.drop("isFraud", "isFlaggedFraud")

In [None]:
# Show two rows to confirm the two columns are gone.
df.show(2)

+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|   type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1|PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1|PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
+----+-------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 2 rows



A step represents a unit of time in the real world; in this dataset, 1 step is 1 hour. For this example we can therefore assume that another job runs every hour and collects all the transactions from that time frame.

In [None]:
# Count the transactions in each step and list up to 24 steps in ascending order.
(
    df.groupBy("step")
    .count()
    .orderBy("step")
    .show(24)
)

+----+-----+
|step|count|
+----+-----+
|   1| 2708|
|   2| 1014|
|   3|  552|
|   4|  565|
|   5|  665|
|   6| 1660|
|   7| 6837|
|   8|21097|
|   9|37628|
|  10|35991|
|  11|37241|
|  12|36153|
|  13|37515|
|  14|41485|
|  15|44609|
|  16|42471|
|  17|43361|
|  18|49579|
|  19|51352|
|  20|25415|
+----+-----+



We can therefore save the output of that job by filtering on each step and saving it to a separate file.

In [None]:
#%%time
# Fetch the distinct steps in ascending order so the part files are written
# chronologically; collect() on an unsorted distinct() guarantees no ordering.
steps = df.select("step").distinct().orderBy("step").collect()
for index, step in enumerate(steps):
    _df = df.where(f"step = {step[0]}")
    # Overwrite on the first write so that re-running this cell does not stack
    # duplicate files on top of a previous run's output; append afterwards.
    write_mode = "overwrite" if index == 0 else "append"
    # by adding coalesce(1) we save the dataframe to one file
    _df.coalesce(1).write.mode(write_mode).option("header", "true").csv("data/paysim")

In [None]:
!cd data/paysim/ && ls

part-00000-093eb8e0-5350-41f6-b94f-5ebab1922665-c000.csv
part-00000-0e3adbee-ef2d-45b8-a912-282082949966-c000.csv
part-00000-0e5914d8-9d9b-46da-a0bd-e69673ca20c6-c000.csv
part-00000-1376f6b4-f6ca-4fa5-a145-41349d323616-c000.csv
part-00000-3620f102-52b3-48d1-be28-ff8914187197-c000.csv
part-00000-367c7653-08fb-45c2-ac64-352a9032e9eb-c000.csv
part-00000-3973430e-b4c5-4a15-be4c-c75f047728c2-c000.csv
part-00000-487d796e-0a7d-4f92-8223-7c8f5d6bbbbe-c000.csv
part-00000-57aace50-ba33-44d8-9eb5-7dc8132166fc-c000.csv
part-00000-7657973e-b861-4b81-8e41-9ce014023a22-c000.csv
part-00000-7b615707-9ffd-4adf-9c35-ff0484fed723-c000.csv
part-00000-8a6e86f3-f9da-433e-aa79-0088a5a2f91c-c000.csv
part-00000-b3503087-ad46-434d-a040-94d44b34bb14-c000.csv
part-00000-b8fa2b81-f560-4f95-8460-91fe71c54080-c000.csv
part-00000-c4735edc-b1b5-4681-b37f-557d865d2630-c000.csv
part-00000-d5c8fdde-9579-4317-8053-bc795a2a3289-c000.csv
part-00000-dc020f88-2d02-43a4-84ee-0f44748271b0-c000.csv
part-00000-ebc1c9d1-c680-40b9-9

In [None]:
import glob

# Part-file names embed a random UUID that changes on every write, so look one
# up at run time instead of hard-coding a name from an earlier run.
part_files = sorted(glob.glob("data/paysim/part-*.csv"))
if not part_files:
    raise FileNotFoundError(
        "No part files found in data/paysim/ - run the cell that writes them first."
    )

# Read a single part file as a sample; its inferred schema is reused further down.
part = spark.read.csv(
    part_files[0],
    header=True,
    inferSchema=True,
)

In [None]:
# Each part file was written from a single step; count rows per step to check.
part.groupBy("step").count().show()

+----+-----+
|step|count|
+----+-----+
|  18|49579|
+----+-----+



Let’s create a streaming version of this input: we'll read the files one at a time, as if they were arriving as a stream.

In [None]:
# Keep the schema inferred from the sample part file; it is handed to
# spark.readStream when the streaming DataFrame is defined.
dataSchema = part.schema

In [None]:
# Display the schema to check the column names and types.
dataSchema

StructType(List(StructField(step,IntegerType,true),StructField(type,StringType,true),StructField(amount,DoubleType,true),StructField(nameOrig,StringType,true),StructField(oldbalanceOrg,DoubleType,true),StructField(newbalanceOrig,DoubleType,true),StructField(nameDest,StringType,true),StructField(oldbalanceDest,DoubleType,true),StructField(newbalanceDest,DoubleType,true)))

*maxFilesPerTrigger* allows you to control how quickly Spark will read all of the files in the folder.
In this example we're limiting the flow of the stream to one file per trigger.


In [None]:
# Treat the folder of part files as a stream: every trigger picks up at most
# one new file, parsed with the schema taken from the sample part file.
streaming = (
    spark.readStream.schema(dataSchema)
    # The part files were written with a header line; without this option each
    # header would be read as a data row (e.g. a row with nameDest == 'nameDest').
    .option("header", True)
    .option("maxFilesPerTrigger", 1)
    .csv("data/paysim/")
)

Let's set up a transformation.

The nameDest column is the recipient ID of the transaction.

In [None]:
# Running count of transactions per recipient, most frequent recipients first.
dest_count = (
    streaming.groupBy("nameDest")
    .count()
    .orderBy(F.col("count").desc())
)

Now that we have our transformation, we need to specify an output sink for the results. For this example, we're going to write to a memory sink which keeps the results in memory.

We also need to define how Spark will output that data. In this example, we'll use the complete output mode (rewriting all of the keys along with their counts after every trigger).

In this example we won't call activityQuery.awaitTermination(), because it is only needed to keep the driver process from exiting while the stream is active.

So in order to be able to run this locally in a notebook we won't include it.

In [None]:
import time

QUERY_NAME = "dest_counts"   # also the name of the in-memory table that is queried below
POLL_COUNT = 50              # how many times the table is polled
POLL_INTERVAL_SECONDS = 0.5  # pause between polls

# Names of active queries must be unique, so re-running this cell while an
# earlier run is still streaming would fail; stop any such leftover query first.
for active_query in spark.streams.active:
    if active_query.name == QUERY_NAME:
        active_query.stop()

# Start the stream: results go to an in-memory table and the complete set of
# counts is rewritten after every trigger.
activityQuery = (
    dest_count.writeStream.queryName(QUERY_NAME)
    .format("memory")
    .outputMode("complete")
    .start()
)

# include this in production
# activityQuery.awaitTermination()

# Poll the in-memory table while the stream fills it, showing the top rows
# whenever there is something to show.
for poll in range(POLL_COUNT):
    _df = spark.sql(
        f"SELECT * FROM {QUERY_NAME} WHERE nameDest != 'nameDest' AND count >= 2"
    )
    # take(1) is enough to tell whether any row matches; no full count needed.
    if _df.take(1):
        _df.show(10)
    time.sleep(POLL_INTERVAL_SECONDS)

+-----------+-----+
|   nameDest|count|
+-----------+-----+
|C1590550415|   34|
| C985934102|   33|
| C564160838|   30|
|C2083562754|   25|
| C665576141|   23|
|C1286084959|   23|
| C401424608|   22|
|  C33524623|   22|
| C998351292|   22|
|C1023714065|   22|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C985934102|   36|
|C1590550415|   35|
| C564160838|   30|
|C2083562754|   27|
|C1789550256|   25|
| C998351292|   25|
| C665576141|   24|
| C451111351|   23|
|C1286084959|   23|
| C401424608|   22|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C985934102|   46|
|C1590550415|   44|
|C2083562754|   31|
| C665576141|   30|
|C1360767589|   30|
|C1789550256|   30|
| C564160838|   30|
| C451111351|   28|
|C1286084959|   26|
| C998351292|   25|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C985934102|   46|
|C

KeyboardInterrupt: ignored

Check if stream is active

In [None]:
# True if at least one streaming query is running. Unlike indexing active[0],
# this evaluates to False (instead of raising IndexError) when no stream is active.
any(query.isActive for query in spark.streams.active)

AttributeError: ignored

In [None]:
# Display the current status reported by the streaming query.
activityQuery.status

TypeError: ignored

If we want to turn off the stream, we run activityQuery.stop(), which resets the query for testing purposes.

In [None]:
# Stop the streaming query started earlier in the notebook.
activityQuery.stop()

Py4JError: ignored