# ДЗ №3. Очистка данных по транзакциям 

## Подготовка к работе 

### Проверка hostname 

In [1]:
! hostname

rc1a-dataproc-m-rig0u7i3lwc2dxan.mdb.yandexcloud.net


Отлично, я на Spark-кластере!

### Импорты 

In [2]:
import functools
import io
import os

import boto3
import pandas as pd
import matplotlib.pyplot as plt

In [3]:
import findspark
findspark.init()

In [4]:
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, min, mean
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, FloatType, ByteType
from pyspark_dist_explore import hist

## Загрузка данных

### Артефакты объектного хранилища с исходными данными 

In [5]:
S3FS_URL = 'https://storage.yandexcloud.net'
BUCKET_NAME = 'otus-mlops-bucket'
FRAUD_DATA_DIR = 'fraud_data/'
FRAUD_DATA_EXT = '.txt'

### Подключение к объектному хранилищу с исходными данными (s3, код не используется)

In [6]:
# session = boto3.Session(
#     aws_access_key_id=(os.environ['BUCKET_RAW_DATA_ACCESS_ID']),
#     aws_secret_access_key=(os.environ['BUCKET_RAW_DATA_ACCESS_KEY']),
#     region_name='ru-central1',
# )
# s3 = session.client('s3', endpoint_url=S3FS_URL)

In [7]:
# filenames = []
# for metainfo in s3.list_objects(Bucket=BUCKET_NAME)['Contents']:
#     name = metainfo['Key']
#     if name.startswith(FRAUD_DATA_DIR) and name.endswith(FRAUD_DATA_EXT):
#         filenames.append(name)

In [8]:
# filenames

### Формирование датафрейма с данными (s3, код не используется)

Данных очень много. Для проведения анализа, выработки подхода к очистке данных и написания соответствующего скрипта достаточно будет загрузить только часть данных. Возьмём первый и последний по времени файлы. В каждом из них возьмём по 10 млн. строк.

In [9]:
# filenames = sorted(filenames)
# filenames_to_use = [filenames[0], filenames[-1]]

In [10]:
# %%time
# data = pd.DataFrame()
# for filename in filenames_to_use:
#     print(f'Get the file {filename} ...')
#     obj = s3.get_object(Bucket=BUCKET_NAME, Key=filename)    
#     df = pd.read_csv(io.BytesIO(obj['Body'].read()), nrows=int(1e4), delimiter='[|,]', index_col=0, engine='python')
# #     df = pd.read_csv(io.BytesIO(obj['Body'].read()), nrows=int(1e7), delimiter='[|,]', index_col=0, engine='python')
#     data = pd.concat([data, df])
#     print('    ... data successfully added!')
#     del df

In [11]:
# data.info()

### Загрузка данных из HDFS

В связи с тем, что с объектного хранилища получать данные было долго, они были перенесены на HDFS Spark-кластера.

In [12]:
!hdfs dfs -ls /user/ubuntu/fraud_data

Found 40 items
-rw-r--r--   1 ubuntu hadoop 2807409271 2024-02-11 20:27 /user/ubuntu/fraud_data/2019-08-22.txt
-rw-r--r--   1 ubuntu hadoop 2854479008 2024-02-11 20:23 /user/ubuntu/fraud_data/2019-09-21.txt
-rw-r--r--   1 ubuntu hadoop 2895460543 2024-02-11 20:24 /user/ubuntu/fraud_data/2019-10-21.txt
-rw-r--r--   1 ubuntu hadoop 2939120942 2024-02-11 20:25 /user/ubuntu/fraud_data/2019-11-20.txt
-rw-r--r--   1 ubuntu hadoop 2995462277 2024-02-11 20:28 /user/ubuntu/fraud_data/2019-12-20.txt
-rw-r--r--   1 ubuntu hadoop 2994906767 2024-02-11 20:28 /user/ubuntu/fraud_data/2020-01-19.txt
-rw-r--r--   1 ubuntu hadoop 2995431240 2024-02-11 20:26 /user/ubuntu/fraud_data/2020-02-18.txt
-rw-r--r--   1 ubuntu hadoop 2995176166 2024-02-11 20:25 /user/ubuntu/fraud_data/2020-03-19.txt
-rw-r--r--   1 ubuntu hadoop 2996034632 2024-02-11 20:25 /user/ubuntu/fraud_data/2020-04-18.txt
-rw-r--r--   1 ubuntu hadoop 2995666965 2024-02-11 20:27 /user/ubuntu/fraud_data/2020-05-18.txt
-rw-r--r--   1

### Артефакты HDFS с исходными данными 

In [13]:
SPARK_CLUSTER_URL = 'hdfs://rc1a-dataproc-m-rig0u7i3lwc2dxan.mdb.yandexcloud.net'
FRAUD_DATA_DIR = '/user/ubuntu/fraud_data'

Анализ данных для чистки проведём на примере одного файла

In [14]:
filename_example = '2019-08-22.txt'
path_to_example = f'{SPARK_CLUSTER_URL}{FRAUD_DATA_DIR}/{filename_example}'

Перенесём его на мастер-ноду, чтобы повысить скорость доступа (сначала было в ячейке).

In [15]:
path_to_local_example = f'fraud_data/{filename_example}'

In [16]:
# !sudo hdfs dfs -get '/user/ubuntu/fraud_data/2019-08-22.txt' '~/fraud_data/2019-08-22.txt'

In [17]:
path_to_local_example

'fraud_data/2019-08-22.txt'

In [18]:
%%time
spark = SparkSession.builder.appName('mlops') \
            .config(conf=SparkConf()) \
            .config('spark.sql.execution.arrow.pyspark.enabled', True) \
            .config('spark.driver.memory','8G') \
            .getOrCreate()

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2024-02-16 22:01:26,700 WARN util.Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
2024-02-16 22:01:32,975 WARN util.Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.
2024-02-16 22:01:32,985 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpo

CPU times: user 28.3 ms, sys: 8.1 ms, total: 36.4 ms
Wall time: 16.2 s


Захардкодим список столбцов и их типов для pyspark (есть проблемы с различием разделителей для имён столбцов и для величин).

In [19]:
data_cols = {
    'tx_id': IntegerType,
    'tx_datetime': TimestampType,
    'customer_id': IntegerType,
    'terminal_id': IntegerType,
    'tx_amount': FloatType,
    'tx_time_in_seconds': IntegerType,
    'tx_time_in_days': IntegerType,
    'tx_is_fraud': ByteType,
    'tx_fraud_scenario': ByteType,
}

Соответствующая схема

In [20]:
schema = StructType([StructField(col_name, col_type(), True) for col_name, col_type in data_cols.items()])
schema

StructType(List(StructField(tx_id,IntegerType,true),StructField(tx_datetime,TimestampType,true),StructField(customer_id,IntegerType,true),StructField(terminal_id,IntegerType,true),StructField(tx_amount,FloatType,true),StructField(tx_time_in_seconds,IntegerType,true),StructField(tx_time_in_days,IntegerType,true),StructField(tx_is_fraud,ByteType,true),StructField(tx_fraud_scenario,ByteType,true)))

Загрузим данные файла

In [21]:
data = spark.read.csv(path_to_local_example, schema=schema, header=False, sep=',', comment='#')

In [22]:
data.show(15)

[Stage 0:>                                                          (0 + 1) / 1]

+-----+-------------------+-----------+-----------+---------+------------------+---------------+-----------+-----------------+
|tx_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_in_seconds|tx_time_in_days|tx_is_fraud|tx_fraud_scenario|
+-----+-------------------+-----------+-----------+---------+------------------+---------------+-----------+-----------------+
|    0|2019-08-22 06:51:03|          0|        711|    70.91|             24663|              0|          0|                0|
|    1|2019-08-22 05:10:37|          0|          0|    90.55|             18637|              0|          0|                0|
|    2|2019-08-22 19:05:33|          0|        753|    35.38|             68733|              0|          0|                0|
|    3|2019-08-22 07:21:33|          0|          0|    80.41|             26493|              0|          0|                0|
|    4|2019-08-22 09:06:17|          1|        981|   102.83|             32777|              0|          0|   

                                                                                

## Анализ данных на качество 

Общая информация по всем столбцам

In [23]:
%%time
data.describe().show()



+-------+--------------------+------------------+------------------+------------------+------------------+----------------+-------------------+-------------------+
|summary|               tx_id|       customer_id|       terminal_id|         tx_amount|tx_time_in_seconds| tx_time_in_days|        tx_is_fraud|  tx_fraud_scenario|
+-------+--------------------+------------------+------------------+------------------+------------------+----------------+-------------------+-------------------+
|  count|            46988418|          46988418|          46988418|          46988418|          46988418|        46988418|           46988418|           46988418|
|   mean|2.3494115552178305E7|500433.83151635365|26597.231571533222| 54.23395999449788|1296054.5003670265|14.5006346883183|0.05377931642644364|0.10841507794537794|
| stddev|1.3564333711807257E7| 288539.1737685039|1528137.6620335374|41.250335143800804|  748049.508734945|8.65541588439431|0.22558169835808656| 0.4568780500029874|
|    min|       

                                                                                

In [24]:
%%time
data.count()



CPU times: user 12.7 ms, sys: 7.35 ms, total: 20 ms
Wall time: 15.3 s


                                                                                

46988418

Мощность множества идентификаторов транзакций `tx_id` (46988236) меньше числа записей, то есть какие-то из них точно повторяются. Возможно, какие-то записи являются полными дубликатами. Удалим их, и посмотрим, что можно сделать с оставшимися записями, если у них одинаковые идентификаторы.

Также видно, что в поле `customer_id` есть отрицательные значения. Их смысл неясен, поэтому тоже стоит удалить такие записи как записи без доверия (однако, если их много, можно бы и оставить, так как по сути в классификации они будут либо в закодированном виде, либо являться метками графа).

Ещё в поле `tx_amount` есть нулевые значения. Так как мы работаем с денежными переводами, нам не интересны нулевые переводы (по кранайней мере в первой постановке, однако это может оказаться полезным, если это, например, неудачная попытка отправки средств).

Напишем декоратор для функций удаления строк с проблемами.

In [25]:
def count_decorator(drop_func):
    @functools.wraps(drop_func)
    def wrapper(*args, **kwargs):
        data_ = args[0]
        n1 = data_.count()
        print(f'Number of original data rows: {n1}')
        data_ = drop_func(data_)
        n2 = data_.count()
        print(f'Number of data rows after deleting duplicated data: {n2}')
        n3 = n1 - n2
        print(f'Number of duplicate rows: {n3}')
        drop_info_ = {
            'rows_before': n1,
            'rows_after': n2,
            'diff': n3,
        }
        return data_, drop_info_
    return wrapper

Декоратор решил тут не использовать для экономии времени вычислений. Использую его только в скрипте.

### Удаление полных дубликатов 

In [26]:
# @count_decorator
def drop_duplicates(data_):
    return data_.dropDuplicates()

In [27]:
%%time
data = drop_duplicates(data)
data.count()



CPU times: user 119 ms, sys: 22.8 ms, total: 142 ms
Wall time: 5min 3s


                                                                                

46988237

Помимо полных дуликатов стоит посмотреть, нет ли дубликатов по признаковому пространству (одинаковые признаки, разные метки класса). Полностью удалим такие записи.

### Удаление дубликатов по признаковому пространству

In [28]:
# @count_decorator
def drop_feature_duplicates(data_):
    return data_.groupBy([col_ for col_ in data_.columns if col_ != 'tx_is_fraud']).count().where('count = 1')

In [29]:
%%time
data = drop_feature_duplicates(data)
data.count()



CPU times: user 191 ms, sys: 58.7 ms, total: 250 ms
Wall time: 5min 18s


                                                                                

46988237

Таких строк не оказалось. Ну и хорошо, добавлять это в скрипт не будем.

### Удаление отрицательных индентификаторов клиентов

In [30]:
# @count_decorator
def drop_negative_customers(data_):
    return data_.filter(data_['customer_id'] >= 0)

In [31]:
%%time
data = drop_negative_customers(data)
data.count()



CPU times: user 210 ms, sys: 35.2 ms, total: 245 ms
Wall time: 5min 42s


                                                                                

46988173

### Удаление строк с нулевым объёмом перевода

In [32]:
# @count_decorator
def drop_nonpositive_amount(data_):
    return data_.filter(data_['tx_amount'] > 0)

In [33]:
%%time
data = drop_nonpositive_amount(data)
data.count()



CPU times: user 179 ms, sys: 32.3 ms, total: 211 ms
Wall time: 5min 22s


                                                                                

46987289

Проведём также аналитику каждого столбца в отдельности. Напишем небольшую функцию анализа столбца с подсчётом числа вхождений каждого уникального значения в столбце и сортировкой в обе стороны.

In [34]:
def value_counts(data_, col_):
    count = data_.groupBy(col_).count().orderBy('count', ascending=False)
    print(count.head(10))
    print(count.tail(10))

### tx_datetime

Проверим, нет ли плохих значений в столбце со временем транзакции без подсчёта уникальных значений.

In [35]:
%%time
data.agg(min("tx_datetime"), max("tx_datetime")).show()



+-------------------+-------------------+
|   min(tx_datetime)|   max(tx_datetime)|
+-------------------+-------------------+
|2019-08-22 00:00:00|2019-09-20 23:59:59|
+-------------------+-------------------+

CPU times: user 135 ms, sys: 24.4 ms, total: 160 ms
Wall time: 3min 8s




Кажется, всё в порядке.

###  tx_id

Согласно `describe()` выше у столбца `tx_id` 

In [36]:
%%time
value_counts(data, 'tx_id')

                                                                                

[Row(tx_id=28051526, count=1), Row(tx_id=30114151, count=1), Row(tx_id=43881607, count=1), Row(tx_id=44528945, count=1), Row(tx_id=15189129, count=1), Row(tx_id=21311391, count=1), Row(tx_id=11730797, count=1), Row(tx_id=34877848, count=1), Row(tx_id=35220158, count=1), Row(tx_id=19153704, count=1)]


[Stage 41:>                                                         (0 + 1) / 1]

[Row(tx_id=35351914, count=1), Row(tx_id=26117107, count=1), Row(tx_id=43481098, count=1), Row(tx_id=17735241, count=1), Row(tx_id=12438871, count=1), Row(tx_id=13743141, count=1), Row(tx_id=20813020, count=1), Row(tx_id=22214544, count=1), Row(tx_id=2432003, count=1), Row(tx_id=4004245, count=1)]
CPU times: user 690 ms, sys: 98.2 ms, total: 788 ms
Wall time: 12min 59s


                                                                                

Проблемых значений столбца `tx_id` не видно, одинаковых идентификаторов не осталось.

### customer_id

In [37]:
%%time
value_counts(data, 'customer_id')

                                                                                

[Row(customer_id=918024, count=121), Row(customer_id=857766, count=120), Row(customer_id=330952, count=119), Row(customer_id=399172, count=118), Row(customer_id=208556, count=117), Row(customer_id=368394, count=117), Row(customer_id=308035, count=117), Row(customer_id=704848, count=117), Row(customer_id=800861, count=117), Row(customer_id=725932, count=116)]


                                                                                

[Row(customer_id=358480, count=1), Row(customer_id=518803, count=1), Row(customer_id=101683, count=1), Row(customer_id=818605, count=1), Row(customer_id=823556, count=1), Row(customer_id=745373, count=1), Row(customer_id=763147, count=1), Row(customer_id=682106, count=1), Row(customer_id=927901, count=1), Row(customer_id=518115, count=1)]
CPU times: user 510 ms, sys: 113 ms, total: 623 ms
Wall time: 11min 23s


Проблемых значений столбца `customer_id` не видно.

### terminal_id

In [38]:
%%time
value_counts(data, 'terminal_id')

                                                                                

[Row(terminal_id=0, count=2041628), Row(terminal_id=145, count=1298651), Row(terminal_id=81, count=774903), Row(terminal_id=97, count=139789), Row(terminal_id=937, count=89374), Row(terminal_id=969, count=85823), Row(terminal_id=979, count=80430), Row(terminal_id=993, count=79982), Row(terminal_id=939, count=79151), Row(terminal_id=980, count=78898)]




[Row(terminal_id=392, count=18393), Row(terminal_id=348, count=16978), Row(terminal_id=299, count=16677), Row(terminal_id=223, count=15332), Row(terminal_id=89518096, count=13696), Row(terminal_id=21861, count=1552), Row(terminal_id=22225, count=333), Row(terminal_id=618230, count=77), Row(terminal_id=436002, count=2), Row(terminal_id=22129, count=1)]
CPU times: user 369 ms, sys: 44.6 ms, total: 413 ms
Wall time: 8min 5s


                                                                                

Подозрительно, что самое частое значение у терминала – `0`. Возможно, это ошибка обработки номера терминала. С другой стороны, это может быть просто первый и самый популярный терминал (специально устанавливали там, где будет большой спрос).  
Также через некоторые терминалы осуществляется заметно мало транзакций, может быть какой-то сбой, а соовтетственно, нерелевантные данные.  
Обе ситуации никак исправлять не будем, однако, в реальной задаче надо бы было уточнить этот момент у коллег.

### tx_amount

In [39]:
%%time
value_counts(data, 'tx_amount')

                                                                                

[Row(tx_amount=9.59000015258789, count=6575), Row(tx_amount=8.960000038146973, count=6553), Row(tx_amount=9.390000343322754, count=6540), Row(tx_amount=12.020000457763672, count=6506), Row(tx_amount=8.699999809265137, count=6488), Row(tx_amount=10.369999885559082, count=6480), Row(tx_amount=8.84000015258789, count=6477), Row(tx_amount=9.300000190734863, count=6473), Row(tx_amount=9.859999656677246, count=6473), Row(tx_amount=8.949999809265137, count=6459)]


                                                                                

[Row(tx_amount=476.3299865722656, count=1), Row(tx_amount=473.07000732421875, count=1), Row(tx_amount=567.5, count=1), Row(tx_amount=719.739990234375, count=1), Row(tx_amount=611.1099853515625, count=1), Row(tx_amount=671.510009765625, count=1), Row(tx_amount=392.3500061035156, count=1), Row(tx_amount=294.82000732421875, count=1), Row(tx_amount=885.8300170898438, count=1), Row(tx_amount=613.0499877929688, count=1)]
CPU times: user 307 ms, sys: 51.5 ms, total: 358 ms
Wall time: 7min 32s


Проблемых значений столбца `tx_amount` не видно.

### tx_time_in_seconds и tx_time_in_days

Поля `tx_time_in_seconds` и `tx_time_in_days` не вызывают вопросов, поэтому отдельно смотреть на них не будем.

### tx_is_fraud

In [40]:
%%time
value_counts(data, 'tx_is_fraud')

AnalysisException: cannot resolve '`tx_is_fraud`' given input columns: [count, customer_id, terminal_id, tx_amount, tx_datetime, tx_fraud_scenario, tx_id, tx_time_in_days, tx_time_in_seconds];;
'Aggregate ['tx_is_fraud], ['tx_is_fraud, count(1) AS count#893L]
+- Filter (tx_amount#4 > cast(0 as float))
   +- Filter (customer_id#2 >= 0)
      +- Filter (count#630L = cast(1 as bigint))
         +- Aggregate [tx_id#0, tx_datetime#1, customer_id#2, terminal_id#3, tx_amount#4, tx_time_in_seconds#5, tx_time_in_days#6, tx_fraud_scenario#8], [tx_id#0, tx_datetime#1, customer_id#2, terminal_id#3, tx_amount#4, tx_time_in_seconds#5, tx_time_in_days#6, tx_fraud_scenario#8, count(1) AS count#630L]
            +- Deduplicate [customer_id#2, tx_time_in_seconds#5, tx_amount#4, tx_is_fraud#7, tx_time_in_days#6, terminal_id#3, tx_datetime#1, tx_id#0, tx_fraud_scenario#8]
               +- Relation[tx_id#0,tx_datetime#1,customer_id#2,terminal_id#3,tx_amount#4,tx_time_in_seconds#5,tx_time_in_days#6,tx_is_fraud#7,tx_fraud_scenario#8] csv


Ошибка возникла из-за фильтрации дубликатов по признаковому пространству: выполнялась группировка без столбца `tx_is_fraud`. Не страшно, так как это тестовый скрипт. В итоговом скрипте указанной фильтрации не будет. Тем не менее, проблемых значений столбца `tx_is_fraud` не было, когда эта часть кода отрабатывала – перезапускать не буду.

### tx_fraud_scenario

In [41]:
%%time
value_counts(data, 'tx_fraud_scenario')

                                                                                

[Row(tx_fraud_scenario=0, count=44460337), Row(tx_fraud_scenario=2, count=2435404), Row(tx_fraud_scenario=3, count=65895), Row(tx_fraud_scenario=1, count=25653)]


                                                                                

[Row(tx_fraud_scenario=0, count=44460337), Row(tx_fraud_scenario=2, count=2435404), Row(tx_fraud_scenario=3, count=65895), Row(tx_fraud_scenario=1, count=25653)]
CPU times: user 314 ms, sys: 55.8 ms, total: 370 ms
Wall time: 7min 29s


Проблемых значений столбца `tx_fraud_scenario` не видно.

## Итог 

Таким образом, чистка данных будет заключаться в следующем:
1. Удаление полных дубликатов
1. Удаление отрицательных индентификаторов клиентов
1. Удаление строк с нулевым объёмом перевода