# Подготовка данных

## Считаем данные в исходном txt формате как csv

In [1]:
!pip install findspark

Defaulting to user installation because normal site-packages is not writeable
Defaulting to user installation because normal site-packages is not writeable


Перезапустить kernel после установки если пакеты поставились заново!

In [2]:
import findspark
findspark.init()

### Список файлов для обработки

In [3]:
!hdfs dfs -ls

Found 1 items
drwxr-xr-x   - ubuntu hadoop          0 2025-02-15 12:30 data


In [4]:
!hdfs dfs -ls data

Found 40 items
-rw-r--r--   1 ubuntu hadoop 2807409271 2025-02-15 12:25 data/2019-08-22.txt
-rw-r--r--   1 ubuntu hadoop 2854479008 2025-02-15 12:06 data/2019-09-21.txt
-rw-r--r--   1 ubuntu hadoop 2895460543 2025-02-15 12:11 data/2019-10-21.txt
-rw-r--r--   1 ubuntu hadoop 2939120942 2025-02-15 12:09 data/2019-11-20.txt
-rw-r--r--   1 ubuntu hadoop 2995462277 2025-02-15 12:14 data/2019-12-20.txt
-rw-r--r--   1 ubuntu hadoop 2994906767 2025-02-15 12:26 data/2020-01-19.txt
-rw-r--r--   1 ubuntu hadoop 2995431240 2025-02-15 12:18 data/2020-02-18.txt
-rw-r--r--   1 ubuntu hadoop 2995176166 2025-02-15 12:19 data/2020-03-19.txt
-rw-r--r--   1 ubuntu hadoop 2996034632 2025-02-15 12:28 data/2020-04-18.txt
-rw-r--r--   1 ubuntu hadoop 2995666965 2025-02-15 12:23 data/2020-05-18.txt
-rw-r--r--   1 ubuntu hadoop 2994699401 2025-02-15 12:18 data/2020-06-17.txt
-rw-r--r--   1 ubuntu hadoop 2995810010 2025-02-15 12:08 data/2020-07-17.txt
-rw-r--r--   1 ubuntu hadoop 2995995152 2025-02-

Создадим SparkSession

In [5]:
from pyspark.sql import SparkSession

spark = (
    SparkSession
        .builder
        .appName("Convert from txt to parquet")
        .getOrCreate()
)

In [6]:
from pyspark.sql.types import StructType, StructField, LongType, TimestampType, IntegerType, DoubleType, BooleanType

schema = StructType([
    StructField("transaction_id", LongType(), True),
    StructField("tx_datetime", TimestampType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("terminal_id", IntegerType(), True),
    StructField("tx_amount", DoubleType(), True),
    StructField("tx_time_seconds", LongType(), True),
    StructField("tx_time_days", IntegerType(), True),
    StructField("tx_fraud", IntegerType(), True),
    StructField("tx_fraud_scenario", IntegerType(), True)
])

# Просмотр данных в файле
Загрузим данные из одного файла.

In [8]:
file_path = "data/2022-09-05.txt"
whole_data = "data/"

df_single_file = spark.read.csv(
    file_path,
    header=False,  
    comment="#",  # comment character
    schema=schema, 
    sep=",",       # separator (comma in this case)
    mode="PERMISSIVE" # Handles lines with more or fewer columns.
)

In [9]:
df_single_file.show(5)     # Show the data

+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|transaction_id|        tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+--------------+-------------------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|    1738801610|2022-09-05 14:50:42|          0|        934|    27.64|       95957442|        1110|       0|                0|
|    1738801611|2022-09-05 12:36:51|          0|        612|    23.99|       95949411|        1110|       0|                0|
|    1738801612|2022-09-05 03:59:07|          0|        753|    62.75|       95918347|        1110|       1|                2|
|    1738801613|2022-09-05 15:56:48|          1|        981|    74.39|       95961408|        1110|       1|                2|
|    1738801614|2022-09-05 21:16:44|          2|        245|    50.04|       95980604|        1110|       0|   

## Конвертация в формат parquet
Читаем весь датасет

In [10]:
df_all_txt = spark.read.csv(
    file_path,
    header=False,
    comment="#",  # comment character
    schema=schema,
    sep=",",       # separator (comma in this case)
    mode="PERMISSIVE" # Handles lines with more or fewer columns.
)

Конвертируем в формат parquet и сохраняем на диск

In [12]:
(
    df_all_txt
        .repartition(10)
        .write
        .mode("overwrite")
        .parquet("data_convert/data_raw.parquet")
)