In [1]:
# !pip install pydoop
# !pip install boto3
# !pip install pyspark==3.0.2
# !pip install findspark
# !pip install sh

# Imports

In [2]:
import sh
import boto3
import findspark

import pydoop.hdfs as hdfs

from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create Spark Session

In [3]:
findspark.init()
# findspark.find()

In [4]:
spark = (SparkSession.builder
         .config(conf=SparkConf())
         .appName("local")
         .getOrCreate())

spark.conf.set('spark.sql.repl.eagerEval.enabled', True) 

schema = StructType([StructField("transaction_id", LongType(), True),
                     StructField("tx_datetime", TimestampType(), True),
                     StructField("customer_id", LongType(), True),
                     StructField("terminal_id", LongType(), True),
                     StructField("tx_amount", DoubleType(), True),
                     StructField("tx_time_seconds", LongType(), True),
                     StructField("tx_time_days", LongType(), True),
                     StructField("tx_fraud", LongType(), True),
                     StructField("tx_fraud_scenario", LongType(), True)])

# File Process

### Init vars

In [5]:
!hdfs getconf -confKey fs.defaultFS # получить хост hdfs

hdfs://rc1a-dataproc-m-r02wxfzuv2ttjl1t.mdb.yandexcloud.net


In [6]:
hdfs_host = 'hdfs://rc1a-dataproc-m-r02wxfzuv2ttjl1t.mdb.yandexcloud.net'
path = '/user/ubuntu/'
target_s3 = 's3a://mlops-bucket-parquet-20231030/'

### Files list

In [7]:
!hdfs dfs -ls

Found 41 items
drwxr-xr-x   - ubuntu hadoop          0 2023-11-06 20:20 .sparkStaging
-rw-r--r--   1 ubuntu hadoop 2807409271 2023-10-31 10:04 2019-08-22.txt
-rw-r--r--   1 ubuntu hadoop 2854479008 2023-10-31 10:05 2019-09-21.txt
-rw-r--r--   1 ubuntu hadoop 2895460543 2023-10-31 10:05 2019-10-21.txt
-rw-r--r--   1 ubuntu hadoop 2939120942 2023-10-31 10:04 2019-11-20.txt
-rw-r--r--   1 ubuntu hadoop 2995462277 2023-10-31 10:02 2019-12-20.txt
-rw-r--r--   1 ubuntu hadoop 2994906767 2023-10-31 10:09 2020-01-19.txt
-rw-r--r--   1 ubuntu hadoop 2995431240 2023-10-31 10:01 2020-02-18.txt
-rw-r--r--   1 ubuntu hadoop 2995176166 2023-10-31 10:02 2020-03-19.txt
-rw-r--r--   1 ubuntu hadoop 2996034632 2023-10-31 10:02 2020-04-18.txt
-rw-r--r--   1 ubuntu hadoop 2995666965 2023-10-31 10:06 2020-05-18.txt
-rw-r--r--   1 ubuntu hadoop 2994699401 2023-10-31 10:02 2020-06-17.txt
-rw-r--r--   1 ubuntu hadoop 2995810010 2023-10-31 10:02 2020-07-17.txt
-rw-r--r--   1 ubuntu hadoop 2995995

In [8]:
hdfsdir = './'

In [9]:
filelist = [ line.rsplit(None,1)[-1] for line in sh.hdfs('dfs','-ls',hdfsdir).split('\n') if len(line.rsplit(None,1))][1:]
filelist = [f for f in filelist if f.endswith('.txt')]

In [10]:
len(filelist)

40

## One File Pipeline

In [49]:
filename = filelist[0]
filename

'2019-08-22.txt'

In [50]:
df = spark.read.schema(schema)\
                .option("comment", "#")\
                .option("header", False)\
                .csv(f'{hdfs_host}{path}{filename}')

In [51]:
df

transaction_id,tx_datetime,customer_id,terminal_id,tx_amount,tx_time_seconds,tx_time_days,tx_fraud,tx_fraud_scenario
0,2019-08-22 06:51:03,0,711,70.91,24663,0,0,0
1,2019-08-22 05:10:37,0,0,90.55,18637,0,0,0
2,2019-08-22 19:05:33,0,753,35.38,68733,0,0,0
3,2019-08-22 07:21:33,0,0,80.41,26493,0,0,0
4,2019-08-22 09:06:17,1,981,102.83,32777,0,0,0
5,2019-08-22 18:41:25,3,205,34.2,67285,0,0,0
6,2019-08-22 03:12:21,3,0,47.2,11541,0,0,0
7,2019-08-22 22:36:40,6,809,139.39,81400,0,0,0
8,2019-08-22 17:23:29,7,184,87.24,62609,0,0,0
9,2019-08-22 21:09:37,8,931,61.7,76177,0,0,0


In [52]:
begin_size = df.count()
print('Начальных размер DF: ', begin_size)

46988418

In [53]:
# Количество пропущенных значений по столбцам
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns if c != 'tx_datetime']).show()

+--------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|transaction_id|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+--------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|             0|          0|          0|        0|              0|           0|       0|                0|
+--------------+-----------+-----------+---------+---------------+------------+--------+-----------------+



In [54]:
# удаление строк с пустыми записями
df = df.na.drop("any")

In [59]:
# Удаление дубликатов по id транзакции
df = df.dropDuplicates(['transaction_id'])

In [61]:
end_size = df.count()
print('Итоговый размер DF: ', end_size)

In [69]:
print('Дельта изменения количества записей: ', begin_size - end_size)

Дельта изменения количества записей:  281


In [72]:
df.write.parquet(f"{target_s3}{filename}.parquet", mode="overwrite")
# df.write.parquet(f'{hdfs_host}{path}data_parquet/2022-11-04.parquet', mode="overwrite")

## Process ALl files

In [11]:
import boto3

In [12]:
KEY_ID = 'YCAJEvTr4KCBuJY-36m1fcaqj'
SECRET = 'YCPenldqUFHEzPrRb5SunkP-YRo0QPfJakAzPyt4'

In [13]:
def check_file_exist(filename):
    '''
    Check for file existance.
    
    NOW IS NOT WORKING WELL
    '''
    
    session = boto3.session.Session(aws_access_key_id=KEY_ID, aws_secret_access_key=SECRET)
    s3 = session.client(
        service_name='s3',
        endpoint_url='https://storage.yandexcloud.net'
    )
    filelist_s3 = [key['Key'].split('.')[0] for key in s3.list_objects(Bucket='mlops-bucket-parquet-20231030')['Contents']]
    if filename in filelist_s3:
        return True
    else:
        return False

In [None]:
begin_size_all = 0
end_size_all = 0

for filename in filelist:
    print('Filename: ', filename)
    
    if check_file_exist(filename):
        print(filename, ' already exist')
        print()
        continue
        
    df = spark.read.schema(schema)\
                .option("comment", "#")\
                .option("header", False)\
                .csv(f'{hdfs_host}{path}{filename}')
    begin_size = df.count()
    print('Начальных размер DF: ', begin_size)
    df = df.na.drop("any")
    df = df.dropDuplicates(['transaction_id'])
    end_size = df.count()
    print('Итоговый размер DF: ', end_size)
    print('Дельта изменения количества записей: ', begin_size - end_size)
    
    df.write.parquet(f"{target_s3}{filename}.parquet", mode="overwrite")
    print('Файл сохранен.')
    begin_size_all += begin_size
    end_size_all += end_size
    print()
    
print('Finish!')
print('Всего очищенных записей: ', begin_size_all - end_size_all)

Filename:  2019-08-22.txt
Начальных размер DF:  46988418
Итоговый размер DF:  46988137
Дельта изменения количества записей:  281
Файл сохранен.

Filename:  2019-09-21.txt
Начальных размер DF:  46994586
Итоговый размер DF:  46993705
Дельта изменения количества записей:  881
Файл сохранен.

Filename:  2019-10-21.txt
Начальных размер DF:  46994432
Итоговый размер DF:  46993895
Дельта изменения количества записей:  537
Файл сохранен.

Filename:  2019-11-20.txt
Начальных размер DF:  46992239
Итоговый размер DF:  46990123
Дельта изменения количества записей:  2116
Файл сохранен.

Filename:  2019-12-20.txt
Начальных размер DF:  46994937
Итоговый размер DF:  46993787
Дельта изменения количества записей:  1150


In [25]:
# !hadoop fs -mkdir ./data_parquet
# !hadoop dfs -rm -r hdfs://rc1a-dataproc-m-r02wxfzuv2ttjl1t.mdb.yandexcloud.net/user/ubuntu/data_parquet

In [4]:
# in_bucket = ['2019-08-22.parquet',
#              '2019-09-21.parquet', 
#              '2019-10-21.parquet', 
#              '2019-11-20.parquet', 
#              '2019-12-20.parquet', 
#              '2020-01-19.parquet', 
#              '2020-02-18.parquet', 
#              '2020-03-19.parquet', 
#              '2020-04-18.parquet', 
#              '2020-05-18.parquet',
#              '2020-06-17.parquet',
#             '2020-07-17.parquet',
#             '2020-08-16.parquet',
#             '2020-09-15.parquet',
#             '2020-10-15.parquet',
#             '2020-11-14.parquet',
#             '2020-12-14.parquet', 
#             '2021-01-13.parquet', 
#             '2021-02-12.parquet', 
#             '2021-03-14.parquet', 
#             '2021-04-13.parquet', 
#             '2021-05-13.parquet', 
#             '2021-06-12.parquet', 
#             '2021-07-12.parquet', 
#             '2021-08-11.parquet',
#             '2021-09-10.parquet', ]

In [15]:
begin_size_all = 0
end_size_all = 0

for filename in filelist:
    print('Filename: ', filename)
    filename = filename.split('.')[0]
    
    if filename in [i.split('.')[0] for i in in_bucket]:
        print(filename, ' already exist')
        print()
        continue
        
    df = spark.read.schema(schema)\
                .option("comment", "#")\
                .option("header", False)\
                .csv(f'{hdfs_host}{path}{filename}.txt')
    begin_size = df.count()
    print('Начальных размер DF: ', begin_size)
    df = df.na.drop("any") # очистка пустых значений
    df = df.dropDuplicates(['transaction_id']) # очистка дублей
    end_size = df.count()
    print('Итоговый размер DF: ', end_size)
    print('Дельта изменения количества записей: ', begin_size - end_size)
    
    df.write.parquet(f"{target_s3}{filename}.parquet", mode="overwrite")
#     df.write.parquet(f'{hdfs_host}{path}data_parquet/{filename}.parquet', mode="overwrite")
    print('Файл сохранен.')
    begin_size_all += begin_size
    end_size_all += end_size
    print()
    
print('Finish!')
print('Всего очищенных записей: ', begin_size_all - end_size_all)

Filename:  2019-08-22.txt
2019-08-22  already exist

Filename:  2019-09-21.txt
2019-09-21  already exist

Filename:  2019-10-21.txt
2019-10-21  already exist

Filename:  2019-11-20.txt
2019-11-20  already exist

Filename:  2019-12-20.txt
2019-12-20  already exist

Filename:  2020-01-19.txt
2020-01-19  already exist

Filename:  2020-02-18.txt
2020-02-18  already exist

Filename:  2020-03-19.txt
2020-03-19  already exist

Filename:  2020-04-18.txt
2020-04-18  already exist

Filename:  2020-05-18.txt
2020-05-18  already exist

Filename:  2020-06-17.txt
2020-06-17  already exist

Filename:  2020-07-17.txt
2020-07-17  already exist

Filename:  2020-08-16.txt
2020-08-16  already exist

Filename:  2020-09-15.txt
2020-09-15  already exist

Filename:  2020-10-15.txt
2020-10-15  already exist

Filename:  2020-11-14.txt
2020-11-14  already exist

Filename:  2020-12-14.txt
2020-12-14  already exist

Filename:  2021-01-13.txt
2021-01-13  already exist

Filename:  2021-02-12.txt
2021-02-12  already 

In [2]:
delta = 281 + 881 + 537 + 2116 + 1150 + 148 + 596 + 3344 + 317 + 278 + 1726 + 827 + 93 + 3481 + 2590 + 2259 + 402 + 160 + 2296 + 91 + 1752 + 825 + 799 + 566 + 2205 + 1071 + 247 + 2149 + 96 + 1699 + 1532 + 616 + 751 + 520 + 3906 + 371 + 309 + 571 + 687 + 2398
delta

46643

In [3]:
print('Количество удаленных записей: ', delta)

Количество удаленных записей:  46643
