In [None]:
!pip install pyspark



In [None]:
import pyspark
from pyspark import SparkConf, SparkContext
import collections

conf = SparkConf().setMaster("local").setAppName("count")
sc = SparkContext.getOrCreate(conf=conf)

In [None]:
#Connect tới drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

spark = SparkSession.builder.getOrCreate()

Ý tưởng Spark Streaming : phân tích là kiểm tra các dữ liệu về giao dịch được thực hiện trong một ngân hàng có phải là giao dịch gian lận hay không.

Ý tưởng thuật toán: thay vì load lại toàn bộ data về lịch sử giao dịch thì sẽ chia thành các luồng dữ liệu riêng biệt, xử lý chúng và cuối cùng cho ra kết quả

# Data lấy từ kaggle
https://www.kaggle.com/arjunjoshua/predicting-fraud-in-financial-payment-services/data

In [None]:
df = spark.read.csv("drive/My Drive/Colab Notebooks/payment_data.csv", header=True, inferSchema=True)

In [None]:
df.columns

['step',
 'type',
 'amount',
 'nameOrig',
 'oldbalanceOrg',
 'newbalanceOrig',
 'nameDest',
 'oldbalanceDest',
 'newbalanceDest',
 'isFraud',
 'isFlaggedFraud']

In [None]:
# Loại bỏ 2 cột cuối
df = df.drop("isFraud", "isFlaggedFraud")

In [None]:
df.show(3)

+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|step|    type| amount|   nameOrig|oldbalanceOrg|newbalanceOrig|   nameDest|oldbalanceDest|newbalanceDest|
+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
|   1| PAYMENT|9839.64|C1231006815|     170136.0|     160296.36|M1979787155|           0.0|           0.0|
|   1| PAYMENT|1864.28|C1666544295|      21249.0|      19384.72|M2044282225|           0.0|           0.0|
|   1|TRANSFER|  181.0|C1305486145|        181.0|           0.0| C553264065|           0.0|           0.0|
+----+--------+-------+-----------+-------------+--------------+-----------+--------------+--------------+
only showing top 3 rows



In [None]:
df.groupBy("step").count().show(3)

+----+-----+
|step|count|
+----+-----+
| 148|   12|
|  31|   12|
|  85|   14|
+----+-----+
only showing top 3 rows



-------------------------------------------------------------------------------

Select mỗi Step, mỗi tầng dữ liệu giao dịch sẽ tương ứng với mỗi Step, thực hiện lưu trữ datafram vào từng file.csv riêng biệt tương ứng trong drive/My Drive/Colab Notebooks/payment_data (cứ mỗi Step là sẽ xuất ra một file.csv)

In [None]:
%%time
steps = df.select("step").distinct().collect()
for step in steps[:]:
    _df = df.where(f"step = {step[0]}")
    _df.coalesce(1).write.mode("append").option("header", "true").csv("drive/My Drive/Colab Notebooks/payment_data")

CPU times: user 43.3 s, sys: 4.45 s, total: 47.8 s
Wall time: 2h 23min 54s


-------------------------------------------------------------------------------------------------

Sau khi xuất được các file dữ liệu giao dịch theo từng Step, chọn một file bất kì 

In [None]:
part = spark.read.csv(
    "drive/My Drive/Colab Notebooks/payment_data/part-00000-0a7b6e13-1b71-4dc2-9162-ac161c018b5a-c000.csv",
    header=True,
    inferSchema=True,
)

In [None]:
part.groupBy("step").count().show()

+----+-----+
|step|count|
+----+-----+
| 123|   52|
+----+-----+



In [None]:
dataSchema = part.schema

In [None]:
dataSchema

StructType(List(StructField(step,IntegerType,true),StructField(type,StringType,true),StructField(amount,DoubleType,true),StructField(nameOrig,StringType,true),StructField(oldbalanceOrg,DoubleType,true),StructField(newbalanceOrig,DoubleType,true),StructField(nameDest,StringType,true),StructField(oldbalanceDest,DoubleType,true),StructField(newbalanceDest,DoubleType,true)))

maxFilesPerTrigger cho phép chúng ta kiểm soát tốc độ Spark sẽ đọc tất cả các file trong thư mục "payment_data". Trong trường hợp này, chúng ta sẽ giới hạn lưu lượng của luồng ở một tệp cho mỗi việc kích hoạt.

In [None]:
streaming = (
    spark.readStream.schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .csv("drive/My Drive/Colab Notebooks/payment_data")
)

In [None]:
dest_count = streaming.groupBy("nameDest").count().orderBy(F.desc("count"))

Bây giờ chúng ta đã có tranformation, chúng ta cần hướng tới một đầu ra cho các kết quả. Trong trường hợp này, chúng ta sẽ ghi kết quả vào bộ nhớ.

Chúng ta cũng cần xác định cách Spark_Streaming sẽ xuất ra dữ liệu đó. Ở đây, chúng ta sẽ sử dụng output hoàn chỉnh (code lại tất cả các luồng cùng với số lượng của chúng sau mỗi lần kích hoạt).

In [None]:
activityQuery = (
    dest_count.writeStream.queryName("dest_counts")
    .format("memory")
    .outputMode("complete")
    .start()
)

import time

for x in range(50):
    _df = spark.sql(
        "SELECT * FROM dest_counts WHERE nameDest != 'nameDest' AND count >= 2"
    )
    if _df.count() > 0:
        _df.show(10)
    time.sleep(0.5)

+-----------+-----+
|   nameDest|count|
+-----------+-----+
|  C59741756|    4|
|C1505584626|    4|
| C392045706|    4|
|C2051223495|    4|
|C1415086760|    4|
| C442014622|    4|
|C1124405901|    4|
| C452438968|    3|
|C1182908789|    3|
| C972000700|    3|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
|  C59741756|    4|
|C1505584626|    4|
| C392045706|    4|
|C2051223495|    4|
|C1415086760|    4|
| C442014622|    4|
|C1124405901|    4|
| C972000700|    3|
| C452438968|    3|
|C1182908789|    3|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C225852166|    5|
| C957956212|    5|
|C1364882847|    5|
|C1920581720|    5|
| C451765861|    5|
| C234654339|    5|
|C1930186237|    4|
|C2031896350|    4|
| C947126193|    4|
|C1015650621|    4|
+-----------+-----+
only showing top 10 rows

+-----------+-----+
|   nameDest|count|
+-----------+-----+
| C225852166|    5|
| 

In [None]:
spark.streams.active[0].isActive

True

In [None]:
activityQuery.status

{'isDataAvailable': True,
 'isTriggerActive': True,
 'message': 'Processing new data'}

In [None]:
activityQuery.stop()