# Очистка датасета

## Imports & inits

In [1]:
import os

import findspark


findspark.init()
findspark.find()


from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql import functions as F

In [None]:
# os.environ['PYSPARK_DRIVER_PYTHON'] = "python"
# os.environ['PYSPARK_PYTHON'] = "../.venv/bin/python"
# os.environ['PYSPARK_PYTHON'] = "/home/ubuntu/otus-hw/.venv/bin/python"

In [3]:
conf = (
    SparkConf().setMaster("yarn").setAppName("data_cleaning")
        .set("spark.executor.memory", "8g")
        .set("spark.driver.memory", "4g")
        .set("spark.sql.execution.arrow.pyspark.enabled", "true")
        .set("spark.executor.instances", 4)
        .set("spark.executor.cores", 2)
        .set('spark.sql.repl.eagerEval.enabled', True) 
)


spark = SparkSession.builder.config(conf=conf).getOrCreate()

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


## Get data & preproc

### HDFS

В данном моменте файлы уже загружены в HDFS из бакета

In [80]:
hdfsdir = '/user/data'
target_s3 = 's3a://hw3-data-cleaning/'
RESULT_BUCKET = "hw3-data-cleaning"
INPUT_BUCKET = "otus-hw-bucket"

In [81]:
filelist = [ line.rsplit(None,1)[-1] for line in sh.hdfs('dfs','-ls',hdfsdir).split('\n') if len(line.rsplit(None,1))][1:]
filelist = [f for f in filelist if f.endswith('.txt')]

In [91]:
filelist[:5]

['/user/data/2019-08-22.txt',
 '/user/data/2019-09-21.txt',
 '/user/data/2019-10-21.txt',
 '/user/data/2019-11-20.txt',
 '/user/data/2019-12-20.txt']

In [92]:
from pyspark.sql.types import *
schema = StructType([StructField("transaction_id", LongType(), True),
                     StructField("tx_datetime", TimestampType(), True),
                     StructField("customer_id", LongType(), True),
                     StructField("terminal_id", LongType(), True),
                     StructField("tx_amount", DoubleType(), True),
                     StructField("tx_time_seconds", LongType(), True),
                     StructField("tx_time_days", LongType(), True),
                     StructField("tx_fraud", LongType(), True),
                     StructField("tx_fraud_scenario", LongType(), True)])

In [93]:
import boto3

In [96]:
filename = filelist[0]
filename_clean = filelist[0].split('/')[-1].split(".")[0]

In [102]:
print("file name in hdfs:", filename)

file name in hdfs: /user/data/2019-08-22.txt


In [103]:
print("file name in to write:", filename_clean)

file name in to write: 2019-08-22


In [113]:
df = spark.read.schema(schema)\
                .option("comment", "#")\
                .option("header", False)\
                .csv(filename)

In [74]:
df.show()

+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|transaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|             0|2019-08-22 06:51:03|          0|        711|    70.91|          24663|           0|       0|                0|
|             1|2019-08-22 05:10:37|          0|          0|    90.55|          18637|           0|       0|                0|
|             2|2019-08-22 19:05:33|          0|        753|    35.38|          68733|           0|       0|                0|
|             3|2019-08-22 07:21:33|          0|          0|    80.41|          26493|           0|       0|                0|
|             4|2019-08-22 09:06:17|          1|        981|   102.83|          32777|           0|       0|   

### Preprocess

In [114]:
print("Количество до препроцессинга:", df.count())



Количество до препроцессинга: 46988418


                                                                                

In [115]:
df = df.na.drop("any") # очистка пустых значений
df = df.dropDuplicates(['transaction_id']) # очистка дублей

In [116]:
print("Количество после препроцессинга:", df.count())



Количество после препроцессинга: 46988137


                                                                                

## Запись в S3

In [110]:
filelist_s3 = list(set([
        key['Key'].split('.')[0]
        for key in s3.list_objects(Bucket=bucket)['Contents']
    ]))

In [111]:
filelist_s3

['2019-09-21', '2019-08-22']

In [109]:
def check_file_exist(filename: str, bucket: str = RESULT_BUCKET) -> bool:
    session = boto3.session.Session(aws_access_key_id=KEY_ID, aws_secret_access_key=SECRET)
    s3 = session.client(
        service_name='s3',
        endpoint_url='https://storage.yandexcloud.net'
    )
    filelist_s3 = list(set([
        key['Key'].split('.')[0]
        for key in s3.list_objects(Bucket=bucket)['Contents']
    ]))
    if filename in filelist_s3:
        return True
    else:
        return False

In [112]:
check_file_exist(filename_clean)

True

In [52]:
df.repartition(1).write.parquet(f"{target_s3}{fname_no_symbols}.parquet", mode="overwrite")

                                                                                