In [None]:
import os
import socket
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# S3
aws_access_key = "aws_access_key"
aws_secret_key = "aws_secret_key"
s3_bucket = "datasets"
s3_endpoint_url = "private"
 
APACHE_MASTER_IP = socket.gethostbyname("apache-spark-master-0.apache-spark-headless.apache-spark.svc.cluster.local")
APACHE_MASTER_URL = f"spark://{APACHE_MASTER_IP}:7077"
POD_IP = os.environ["MY_POD_IP"]

SPARK_APP_NAME = f"spark-{os.environ['HOSTNAME']}"  
JARS = """/nfs/env/lib/python3.8/site-packages/pyspark/jars/clickhouse-native-jdbc-shaded-2.6.5.jar, 
/nfs/env/lib/python3.8/site-packages/pyspark/jars/hadoop-aws-3.3.4.jar,
/nfs/env/lib/python3.8/site-packages/pyspark/jars/aws-java-sdk-bundle-1.12.433.jar,
/nfs/env/lib/python3.8/site-packages/pyspark/jars/postgresql-42.7.4.jar
"""

MEM = "512m"  
CORES = 1  

spark = SparkSession. \
    builder. \
    appName(SPARK_APP_NAME). \
    master("local"). \
    config("spark.executor.memory", MEM). \
    config("spark.jars", JARS). \
    config("spark.executor.cores", CORES). \
    config("spark.hadoop.fs.s3a.endpoint", s3_endpoint_url). \
    config("spark.hadoop.fs.s3a.access.key", aws_access_key). \
    config("spark.hadoop.fs.s3a.secret.key", aws_secret_key). \
    config("fs.s3a.endpoint", s3_endpoint_url). \
    config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"). \
    config("spark.hadoop.fs.s3a.path.style.access", True). \
    config("spark.hadoop.fs.s3a.committer.name", "directory"). \
    config("spark.hadoop.fs.s3a.aws.credentials.provider",
           "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"). \
    config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false"). \
    getOrCreate()



25/08/06 14:31:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
#Проверка качестава данных
def check(df, show_examples=True, example_limit=5):
    total_rows = df.count()
    print(f"\n Общее количество строк в DataFrame: {total_rows}\n")

    for column in df.columns:
        unique_count = df.select(countDistinct(col(column))).collect()[0][0]
        null_count = df.filter(col(column).isNull()).count()
        empty_count = df.filter(
            (col(column).isNotNull()) & (col(column).cast(StringType()) == "")
        ).count()

        null_pct = (null_count / total_rows * 100) if total_rows else 0
        empty_pct = (empty_count / total_rows * 100) if total_rows else 0

        # Основная строка статистики
        print(f"{column:<20} | уник: {unique_count:<5} | nulls: {null_count:<4} ({null_pct:>4.1f}%) | пустых: {empty_count:<4} ({empty_pct:>4.1f}%)", end="")

        # Примеры значений в одной строке
        if show_examples:
            examples = df.select(column).distinct().limit(example_limit).collect()
            values = [str(row[0]) for row in examples]
            joined = ', '.join(values)
            print(f" | Примеры: {joined}")
        else:
            print()

    print("\n Анализ завершён.\n")

In [None]:
# Читаем данные
users_df = spark.read.csv(
    "s3a://datasets_2/users.csv",
    header=True,
    inferSchema=True
)


transactions_df = spark.read.csv(
    "s3a://datasets_2/transactions.csv",
    header=True,
    inferSchema=True
)

25/08/06 14:31:47 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


                                                                                

In [4]:
# выбираем нужные столбцы
users_clean = users_df.select(
    "user_id",
    "email",
    "registration_date"
)

# выбираем нужные столбцы
transactions_clean = transactions_df.select(
    "transaction_id",
    "user_id",
    "transaction_date",
    "amount",
    "status",
    "payment_method",
    "currency"
)

In [5]:
check(users_df,show_examples=False)
check(transactions_df,show_examples=False)


 Общее количество строк в DataFrame: 1000

user_id              | уник: 1000  | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%)
email                | уник: 1000  | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%)
registration_date    | уник: 1000  | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%)
phone_number         | уник: 949   | nulls: 51   ( 5.1%) | пустых: 0    ( 0.0%)
user_agent           | уник: 958   | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%)
ip_address           | уник: 1000  | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%)
last_login_date      | уник: 902   | nulls: 98   ( 9.8%) | пустых: 0    ( 0.0%)

 Анализ завершён.


 Общее количество строк в DataFrame: 5000

transaction_id       | уник: 5000  | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%)
user_id              | уник: 989   | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%)
transaction_date     | уник: 5000  | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%)
amount               | уник: 5000  | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%)
status      

In [6]:
transactions_clean = transactions_clean.filter(col('status') == 'completed')
transactions_clean = transactions_clean.withColumn("currency",lower(col("currency")))\
                                     .withColumn("payment_method",lower(col("payment_method")))\
                                     .withColumn("payment_method",regexp_replace(col("payment_method"), "[ _]", ""))\
                                     .withColumn('amount', when((col('amount').isNull()), 0).otherwise(col('amount')))\
                                     .withColumn('status', when((col('status').isNull()), 'failed').otherwise(col('status')))  
check(transactions_clean)


 Общее количество строк в DataFrame: 4225

transaction_id       | уник: 4225  | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%) | Примеры: 148, 463, 833, 1238, 1342
user_id              | уник: 975   | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%) | Примеры: 148, 833, 471, 496, 463
transaction_date     | уник: 4225  | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%) | Примеры: 2024-01-24 14:22:46.699853, 2024-10-26 22:58:03.130009, 2024-05-05 07:27:29.844058, 2024-01-15 00:54:22.913274, 2024-09-01 07:39:00.271930
amount               | уник: 4225  | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%) | Примеры: 114.43160754491899, 80.9796371294577, 100.23005435485695, 184.21742580624905, 127.9313266377257
status               | уник: 1     | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%) | Примеры: completed
payment_method       | уник: 3     | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%) | Примеры: paypal, banktransfer, card
currency             | уник: 3     | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%) | Пример

In [7]:
# Объединяем таблицы и считаем метрики
user_transactions = users_clean.join(
    transactions_clean,
    "user_id",
    "left"
)

In [8]:
user_metrics = user_transactions.groupBy("user_id").agg(
    count("transaction_id").alias("transaction_count"),
    sum("amount").alias("total_amount"),
    count(when(col("payment_method").isin(["card"]), 1)).alias("card_payments_count"),
    count(when(col("payment_method").isin(["paypal"]), 1)).alias("paypal_payments_count"),
    count(when(col("currency").isin(["usd"]), 1)).alias("usd_payments_count"),
    count(when(col("currency").isin(["eur"]), 1)).alias("eur_payments_count")
)

In [9]:
# Фильтр только пользователей с транзакциями
user_metrics = user_metrics.filter(col('transaction_count') > 1)

In [10]:
# Добавляем дополнительную информацию о пользователях
final_df = user_metrics.join(
    users_clean,
    "user_id"
).select(
    "user_id",
    "email",
    "registration_date",
    "transaction_count",
    "total_amount",
    "card_payments_count",
    "paypal_payments_count",
    "usd_payments_count",
    "eur_payments_count"
)

In [13]:
# Проверки качества данных
duplicates = final_df.groupBy("user_id").count().filter(col("count") > 1).count()
null_amounts = final_df.filter(col("total_amount").isNull()).count()

print(f"Found {duplicates} duplicate user_ids")
print(f"Found {null_amounts} users with null total_amount")

check(final_df)

Found 0 duplicate user_ids
Found 0 users with null total_amount

 Общее количество строк в DataFrame: 913

user_id              | уник: 913   | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%) | Примеры: 148, 463, 471, 496, 833
email                | уник: 913   | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%) | Примеры: mitchellpeters@example.com, williamsmatthew@example.org, joshua35@example.org, LONGANDREA@hernandez-ward.biz, stricklandfrank@example.com
registration_date    | уник: 913   | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%) | Примеры: 2024-01-21 12:41:54.468182, 2023-07-14 20:08:01.656777, 2024-10-27 16:54:59.562310, 2024-04-05 15:53:40.828740, 2023-12-13 23:54:24.752388
transaction_count    | уник: 11    | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%) | Примеры: 7, 6, 9, 5, 10
total_amount         | уник: 913   | nulls: 0    ( 0.0%) | пустых: 0    ( 0.0%) | Примеры: 531.738579897763, 421.64959818552154, 391.28061268096957, 850.3654022149947, 437.5471795783169
card_payments_count  | уник

In [14]:
spark.stop()