In [1]:
!pip install findspark

Defaulting to user installation because normal site-packages is not writeable
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [1]:
# Make pyspark importable: findspark must run before the first pyspark import
# in the next cell. spark_home=None (the default) lets findspark locate Spark.
import findspark

findspark.init(spark_home=None)

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Single SparkSession shared by every cell below. getOrCreate() returns the
# already-active session when this cell is re-executed.
spark = SparkSession.builder.appName("OTUS").getOrCreate()

In [3]:
# Create the HDFS target directory for the raw data (-p: create parents, no error if it exists).
!hdfs dfs -mkdir -p /user/ubuntu/data

In [4]:
# Copy the raw transactions file from the s3a:// source bucket into HDFS.
# NOTE(review): later cells use the relative path `data/...`, which resolves to
# /user/<current user>/data — this matches /user/ubuntu/data only when running as `ubuntu`.
!hadoop distcp s3a://otus-mlops-source-data/2019-08-22.txt /user/ubuntu/data/2019-08-22.txt

2025-12-10 13:07:04,417 INFO tools.DistCp: Input Options: DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, ignoreFailures=false, overwrite=false, append=false, useDiff=false, useRdiff=false, fromSnapshot=null, toSnapshot=null, skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, mapBandwidth=0.0, copyStrategy='uniformsize', preserveStatus=[], atomicWorkPath=null, logPath=null, sourceFileListing=null, sourcePaths=[s3a://otus-mlops-source-data/2019-08-22.txt], targetPath=/user/ubuntu/data/2019-08-22.txt, filtersFile='null', blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, directWrite=false}, sourcePaths=[s3a://otus-mlops-source-data/2019-08-22.txt], targetPathExists=false, preserveRawXattrsfalse
2025-12-10 13:07:04,526 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2025-12-10 13:07:04,669 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2025-12-10 13:07:04,669 INFO impl.MetricsSystemImp

In [5]:
# Verify the copy: list the HDFS data directory (relative to the user's HDFS home).
!hdfs dfs -ls data

Found 1 items
-rw-r--r--   1 ubuntu hadoop 2807409271 2025-12-10 13:07 data/2019-08-22.txt


In [6]:
# Load the raw transactions file from HDFS and parse it into typed columns.
# To read straight from Object Storage instead of HDFS, point file_path at
#   s3a://oa-otus-bucket-b1gfn58hulkq64deu28q/2019-08-22.txt
file_path = "data/2019-08-22.txt"


def _skip_file_header(partition_index, rows):
    """Drop the first record of partition 0, i.e. the header line of the file.

    For a single input file the first partition starts at the beginning of the
    file, so only its first line is the header; all other partitions pass
    through unchanged. (With several input files only the first file's header
    is dropped — the same as the previous zipWithIndex-based version.)
    Unlike zipWithIndex(), which runs an extra Spark job over the whole file
    just to count every partition, this is applied lazily in the same pass.
    """
    rows = iter(rows)
    if partition_index == 0:
        next(rows, None)  # default avoids StopIteration on an empty partition
    return rows


# Read the text and skip the first (header) line.
lines = spark.sparkContext.textFile(file_path)
indexed = lines.mapPartitionsWithIndex(_skip_file_header)

# Wrap each line in a 1-tuple (required: createDataFrame cannot infer a
# schema from bare strings).
rdd_of_tuples = indexed.map(lambda line: (line,))

df_clean = spark.createDataFrame(rdd_of_tuples, ["value"])

# Parse: split each line on commas and cast every field to its type.
# NOTE(review): failed casts presumably yield NULL (non-ANSI mode); the quality
# checks rely on that. "tranaction_id" is misspelled but kept as-is because
# other cells refer to the column by this name.
df = df_clean.select(split(col("value"), ",").alias("cols")).select(
    col("cols")[0].cast("long").alias("tranaction_id"),
    col("cols")[1].cast("string").alias("tx_datetime"),
    col("cols")[2].cast("long").alias("customer_id"),
    col("cols")[3].cast("long").alias("terminal_id"),
    col("cols")[4].cast("double").alias("tx_amount"),
    col("cols")[5].cast("long").alias("tx_time_seconds"),
    col("cols")[6].cast("long").alias("tx_time_days"),
    col("cols")[7].cast("int").alias("tx_fraud"),
    col("cols")[8].cast("int").alias("tx_fraud_scenario")
)

# Cache: the parsed frame is scanned repeatedly by the data-quality checks.
df.cache()
print(f"–ó–∞–≥—Ä—É–∂–µ–Ω–æ {df.count():,} —Å—Ç—Ä–æ–∫")

Загружено 46,988,418 строк


In [7]:
# Data-quality report for `df`. Every row-level count and the summary
# statistics are computed in ONE aggregation pass over the cached DataFrame
# instead of one Spark job per check; only the duplicate-ID check needs its own
# groupBy.
# NOTE: `sum`, `min`, `max` and `count` here are pyspark.sql.functions (brought
# in by the star import), not the Python builtins.
print("üîç –ê–Ω–∞–ª–∏–∑ –∫–∞—á–µ—Å—Ç–≤–∞ –¥–∞–Ω–Ω—ã—Ö...\n")


def _count_if(condition):
    """Count rows where `condition` is TRUE.

    NULL is treated as not matching, exactly like df.filter(condition).count(),
    and the result is 0 (not NULL) on an empty frame.
    """
    return count(when(condition, 1))


# 2. Logical rules: description -> condition that marks a violating row.
logical_rules = {
    "tranaction_id < 0": col("tranaction_id") < 0,
    "tx_amount <= 0": col("tx_amount") <= 0,
    "tx_time_seconds < 0": col("tx_time_seconds") < 0,
    "tx_time_days < 0": col("tx_time_days") < 0,
    "tx_fraud not in (0,1)": ~col("tx_fraud").isin([0, 1]),
    "tx_fraud_scenario < 0": col("tx_fraud_scenario") < 0,
}

# 3. Timestamps that do not match the expected format parse to NULL.
df_parsed = df.withColumn(
    "parsed_dt",
    to_timestamp(col("tx_datetime"), "yyyy-MM-dd HH:mm:ss")
)

report = df_parsed.agg(
    # 1. NULLs per column (presumably the result of failed casts at load time).
    *[_count_if(col(c).isNull()).alias(f"null_{i}") for i, c in enumerate(df.columns)],
    *[_count_if(cond).alias(f"rule_{i}") for i, cond in enumerate(logical_rules.values())],
    _count_if(col("parsed_dt").isNull()).alias("invalid_dates"),
    # 4. Contradiction: flagged as fraud but without a fraud scenario.
    _count_if((col("tx_fraud") == 1) & (col("tx_fraud_scenario") == 0)).alias("bad_fraud"),
    # 6. Summary statistics.
    count(lit(1)).alias("total_rows"),
    sum("tx_fraud").alias("fraud_count"),
    countDistinct("customer_id").alias("customers"),
    countDistinct("terminal_id").alias("terminals"),
    min("tx_amount").alias("min_amt"),
    max("tx_amount").alias("max_amt"),
).collect()[0]

null_counts = {c: report[f"null_{i}"] for i, c in enumerate(df.columns)}
print("‚ùó –ü—Ä–æ–ø—É—â–µ–Ω–Ω—ã–µ –∏–ª–∏ –Ω–µ–∫–æ—Ä—Ä–µ–∫—Ç–Ω—ã–µ –∑–Ω–∞—á–µ–Ω–∏—è (–ø–æ—Å–ª–µ cast):")
for col_name in df.columns:
    cnt = null_counts[col_name]
    if cnt > 0:
        print(f"  ‚Ä¢ {col_name}: {cnt:,}")

checks = {desc: report[f"rule_{i}"] for i, desc in enumerate(logical_rules)}
print("\n‚ùó –õ–æ–≥–∏—á–µ—Å–∫–∏–µ –Ω–∞—Ä—É—à–µ–Ω–∏—è:")
for desc, cnt in checks.items():
    if cnt > 0:
        print(f"  ‚Ä¢ {desc}: {cnt:,}")

invalid_dates = report["invalid_dates"]
if invalid_dates > 0:
    print(f"  ‚Ä¢ –ù–µ–∫–æ—Ä—Ä–µ–∫—Ç–Ω—ã–µ –¥–∞—Ç—ã: {invalid_dates:,}")

bad_fraud = report["bad_fraud"]
if bad_fraud > 0:
    print(f"  ‚Ä¢ tx_fraud=1, –Ω–æ tx_fraud_scenario=0: {bad_fraud:,}")

# 5. Duplicates: number of tranaction_id values that occur more than once.
duplicates = df.groupBy("tranaction_id").count().filter(col("count") > 1).count()
if duplicates > 0:
    print(f"  ‚Ä¢ –î—É–±–ª–∏—Ä—É—é—â–∏—Ö—Å—è ID —Ç—Ä–∞–Ω–∑–∞–∫—Ü–∏–π: {duplicates:,}")

# 6. Statistics. Aggregates such as sum/min/max are NULL (None) on an empty
# frame, so guard them instead of crashing on int(None) / format(None).
stats = report
print("\nüìä –°—Ç–∞—Ç–∏—Å—Ç–∏–∫–∞:")
print(f"  –í—Å–µ–≥–æ —Ç—Ä–∞–Ω–∑–∞–∫—Ü–∏–π: {stats['total_rows']:,}")
print(f"  –ú–æ—à–µ–Ω–Ω–∏—á–µ—Å–∫–∏—Ö: {int(stats['fraud_count'] or 0):,}")
print(f"  –£–Ω–∏–∫–∞–ª—å–Ω—ã—Ö –∫–ª–∏–µ–Ω—Ç–æ–≤: {stats['customers']:,}")
print(f"  –£–Ω–∏–∫–∞–ª—å–Ω—ã—Ö —Ç–µ—Ä–º–∏–Ω–∞–ª–æ–≤: {stats['terminals']:,}")
if stats["min_amt"] is not None:
    print(f"  –°—É–º–º–∞: –æ—Ç {stats['min_amt']:.2f} –¥–æ {stats['max_amt']:.2f}")

🔍 Анализ качества данных...

❗ Пропущенные или некорректные значения (после cast):

❗ Логические нарушения:
  • tx_amount <= 0: 884
  • Некорректные даты: 100
  • Дублирующихся ID транзакций: 181

📊 Статистика:
  Всего транзакций: 46,988,418
  Мошеннических: 2,527,005
  Уникальных клиентов: 988,545
  Уникальных терминалов: 1,006
  Сумма: от 0.00 до 3773.34


In [8]:
# Collect rows that failed validation and export them for inspection.
# A row is "bad" if a key field failed to parse (NULL after cast), the amount
# is not positive, the fraud label is not 0/1, or tx_datetime is not a valid
# "yyyy-MM-dd HH:mm:ss" timestamp. The explicit isNull() checks matter: in
# Spark, `NULL <= 0` and `NOT (NULL IN (0, 1))` evaluate to NULL and filter()
# drops NULL rows, so unparseable values would otherwise slip through.
# NOTE(review): rows with a duplicated tranaction_id are not exported here.
BAD_ROWS_BUCKET = "oa-otus-bucket-b1gfn58hulkq64deu28q"  # replace with your bucket
bad_rows_path = f"s3a://{BAD_ROWS_BUCKET}/bad_transactions"

bad_rows = df.filter(
    col("tranaction_id").isNull()
    | col("tx_amount").isNull()
    | (col("tx_amount") <= 0)
    | col("tx_fraud").isNull()
    | ~col("tx_fraud").isin([0, 1])
    | to_timestamp(col("tx_datetime"), "yyyy-MM-dd HH:mm:ss").isNull()
)

# Save to Object Storage as CSV with a header row, replacing any previous output.
bad_rows.write.mode("overwrite").option("header", "true").csv(bad_rows_path)