In [10]:
import findspark
findspark.init()
findspark.find()

import datetime
import logging
import subprocess

from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *


# Emit INFO-level records so the conversion helpers' progress messages are shown.
logging.basicConfig(level=logging.INFO)

# Spark application name, the HDFS host used to build hdfs:// URIs, and the
# source / destination directories on that file system.
app_name = "convert-csv-to-parquet-hdfs"
hdfs_host = "rc1a-dataproc-m-0phjwjdfohabk5n0.mdb.yandexcloud.net"
src_dir = "/fraud-data-src"
dest_dir = "/fraud-data-parquet"

# getOrCreate() returns the already-running session when there is one.
spark = SparkSession.builder.appName(app_name).getOrCreate()

# Eager evaluation lets a bare DataFrame expression render as a table in Jupyter.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

# Explicit column layout shared by the CSV reader and the parquet readers below.
# Every column is declared nullable.
schema = (
    StructType()
    .add("transaction_id", LongType(), True)
    .add("tx_datetime", TimestampType(), True)
    .add("customer_id", LongType(), True)
    .add("terminal_id", LongType(), True)
    .add("tx_amount", DoubleType(), True)
    .add("tx_time_seconds", LongType(), True)
    .add("tx_time_days", LongType(), True)
    .add("tx_fraud", LongType(), True)
    .add("tx_fraud_scenario", LongType(), True)
)


def convert_csv_to_parquet(spark_session, src_filepath, dest_filepath):
    """Read one CSV file from HDFS and write it back out as parquet.

    Both paths are given without scheme or host; the module-level ``hdfs_host``
    is prepended to build the full ``hdfs://`` URIs, and the module-level
    ``schema`` is applied when parsing the CSV.

    Args:
        spark_session: Spark session used for both the read and the write.
        src_filepath: HDFS path of the source CSV file (starting with ``/``).
        dest_filepath: HDFS path of the parquet output (starting with ``/``).
            Existing data at this location is overwritten.
    """
    # Fixed: the message used to print the source path twice instead of the destination.
    logging.info(f"Converting {src_filepath} to {dest_filepath}")
    src_filepath_hdfs = f"hdfs://{hdfs_host}{src_filepath}"
    logging.info(f"{src_filepath_hdfs=}")
    dest_filepath_hdfs = f"hdfs://{hdfs_host}{dest_filepath}"
    logging.info(f"{dest_filepath_hdfs=}")

    logging.info(f"Loading dataframe from {src_filepath_hdfs=}")
    # The input has no header row; "#" is set as the CSV comment character.
    df = (
        spark_session.read
        .schema(schema)
        .option("comment", "#")
        .option("header", False)
        .csv(src_filepath_hdfs)
    )

    logging.info(f"Writing dataframe to {dest_filepath_hdfs=}")
    # coalesce(1) collapses the data into a single partition before writing.
    df.coalesce(1).write.mode('overwrite').parquet(dest_filepath_hdfs)
    logging.info(f"Success. Converted {src_filepath} to {dest_filepath}")


def _parse_hdfs_ls(listing):
    """Return the path column of every entry line in ``hadoop fs -ls`` output."""
    paths = []
    for line in listing.splitlines():
        # Entry lines carry 8 whitespace-separated columns, the last being the
        # path; maxsplit keeps any spaces inside the path intact. Shorter lines
        # (such as the "Found N items" header) are skipped.
        columns = line.split(None, 7)
        if len(columns) == 8:
            paths.append(columns[7])
    return paths


def run_convert_csv_to_parquet():
    """Convert every entry listed under ``src_dir`` into parquet under ``dest_dir``.

    Each source path ``<src_dir>/<name>.<ext>`` is written to
    ``<dest_dir>/<name>.parquet`` through ``convert_csv_to_parquet`` using the
    module-level ``spark`` session. Nothing is converted when the listing is empty.

    Raises:
        subprocess.CalledProcessError: if ``hadoop fs -ls`` exits with a
            non-zero status.
    """
    # An argument list (no shell) avoids quoting problems, and a failing
    # `hadoop` command now raises instead of being hidden behind a pipeline.
    listing = subprocess.check_output(["hadoop", "fs", "-ls", src_dir]).decode("utf-8")
    src_filepaths = _parse_hdfs_ls(listing)

    if not src_filepaths:
        logging.warning(f"No files found in {src_dir}; nothing to convert")
        return

    for src_filepath in src_filepaths:
        dest_filepath = f"{dest_dir}/{Path(src_filepath).stem}.parquet"
        convert_csv_to_parquet(spark, src_filepath, dest_filepath)

In [11]:
# Read one converted parquet output back, applying the declared schema.
filepath = f"{dest_dir}/2020-03-19.parquet"
df = spark.read.format("parquet").schema(schema).load(filepath)

In [12]:
# Keep the transactions stamped on 2020-03-20 and count them.
# between() is inclusive on both ends, i.e. dt_from <= tx_datetime <= dt_to.
# NOTE(review): a timestamp with fractional seconds after 23:59:59 would fall
# outside this window — confirm the data has whole-second resolution.
dt_from = datetime.datetime(2020, 3, 20)
dt_to = datetime.datetime(2020, 3, 20, 23, 59, 59)
day_df = df.filter(df.tx_datetime.between(dt_from, dt_to))
count_day_df = day_df.count()
count_day_df

1565254

In [13]:
# One-row frame holding the number of distinct customers in the selected day.
count_customer_id = day_df.agg(countDistinct("customer_id"))
count_customer_id

count(DISTINCT customer_id)
751854


In [14]:
# One-row frame holding the number of distinct terminals in the selected day.
count_terminal_id = day_df.agg(countDistinct("terminal_id"))
count_terminal_id

count(DISTINCT terminal_id)
1003


In [15]:
# Average number of transactions per distinct customer in the selected day:
# row count of day_df divided by the single value held in count_customer_id.
day_df.count() / count_customer_id.first()[0]

2.081858977939866

In [14]:
# Load all converted outputs at once: the glob matches every *.parquet entry
# under dest_dir. This rebinds `df`, replacing the single-file frame loaded earlier.
dest_filepath = f"{dest_dir}/*.parquet"
df = spark.read.format("parquet").schema(schema).load(dest_filepath)

In [15]:
# Newest transaction timestamp in `df`.
# F.max is the Spark aggregate; naming it through the module alias avoids
# depending on the star import silently shadowing Python's builtin max().
max_tx_datetime = df.select(F.max(df.tx_datetime)).collect()[0][0]
max_tx_datetime

datetime.datetime(2022, 12, 3, 23, 59, 59)

In [16]:
# Start of the look-back window: `days` days before the newest transaction.
days = 90
lookback = datetime.timedelta(days=days)
start_date = max_tx_datetime - lookback
start_date

datetime.datetime(2022, 9, 4, 23, 59, 59)

In [17]:
# Transactions at or after the start of the look-back window.
last_days_df = df.where(df["tx_datetime"] >= start_date)

In [18]:
# Oldest transaction timestamp that survived the look-back filter.
# F.min is the Spark aggregate; naming it through the module alias avoids
# depending on the star import silently shadowing Python's builtin min().
last_days_min_tx_datetime = last_days_df.select(F.min(last_days_df.tx_datetime)).collect()[0][0]
last_days_min_tx_datetime

datetime.datetime(2022, 9, 4, 23, 59, 59)

In [12]:
# One-row frame holding the number of distinct customers in `df`.
# Rebinds count_customer_id, which an earlier cell computed from day_df.
# NOTE(review): this cell's execution count (12) is lower than that of the cell
# reloading `df` from all parquet files (14) — confirm which frame the recorded
# output was computed on.
count_customer_id = df.agg(countDistinct("customer_id"))
count_customer_id

count(DISTINCT customer_id)
996611


In [13]:
# One-row frame holding the number of distinct terminals in `df`.
# Rebinds count_terminal_id, which an earlier cell computed from day_df.
# NOTE(review): this cell's execution count (13) is lower than that of the cell
# reloading `df` from all parquet files (14) — confirm which frame the recorded
# output was computed on.
count_terminal_id = df.agg(countDistinct("terminal_id"))
count_terminal_id

count(DISTINCT terminal_id)
1007


In [19]:
# Stop the Spark session at the end of the notebook.
spark.stop()