In [1]:
!pip install findspark
!pip install pyspark


In [2]:
# Spark bootstrap and the imports used throughout the notebook.
import findspark
# NOTE(review): findspark.init() runs before the pyspark imports below;
# presumably it locates the local Spark installation and puts its pyspark
# on sys.path, so keep this ordering.
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [None]:
# Create (or reuse, if one already exists) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("TransactionAnalytics")
    .getOrCreate()
)

In [45]:
# Positional names for the CSV columns, in file order (applied with toDF).
columns = [
    "transaction_id", "tx_datetime", "customer_id", "terminal_id",
    "tx_amount", "tx_time_seconds", "tx_time_days", "tx_fraud", "tx_fraud_scenario"
]

# Columns expected to hold numeric values: every column except the datetime.
# Derived from `columns` so the two lists cannot drift apart.
numeric_columns = [name for name in columns if name != "tx_datetime"]

In [55]:
# Load every CSV file under the data directory. No header option is set;
# lines beginning with "#" are skipped as comments, and the positional
# columns are renamed with the names from `columns`.
df = (
    spark.read
    .option("sep", ",")
    .option("comment", "#")
    .csv("/user/ubuntu/data/")
    .toDF(*columns)
)

                                                                                

In [60]:
# Total number of rows; reused later by the transaction_id uniqueness check.
(data_row_count := df.count())

                                                                                

1879794138

### Схема данных

In [15]:
# Print the schema; no explicit schema was given, so every column is a string.
df.printSchema()

root
 |-- transaction_id: string (nullable = true)
 |-- tx_datetime: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- terminal_id: string (nullable = true)
 |-- tx_amount: string (nullable = true)
 |-- tx_time_seconds: string (nullable = true)
 |-- tx_time_days: string (nullable = true)
 |-- tx_fraud: string (nullable = true)
 |-- tx_fraud_scenario: string (nullable = true)



### Проверка соответствия типов данных

In [40]:
# Class balance of the fraud label. groupBy gives no row-order guarantee, so
# sort for stable output (values are strings, i.e. lexicographic order).
df.groupBy(F.col("tx_fraud")).count().orderBy("tx_fraud").show()

[Stage 86:>                                                         (0 + 1) / 1]

+--------+----------+
|tx_fraud|     count|
+--------+----------+
|       0|1768455409|
|       1| 111338729|
+--------+----------+



                                                                                

In [41]:
# Distribution of fraud scenarios. groupBy gives no row-order guarantee, so
# sort for stable output (values are strings, i.e. lexicographic order).
df.groupBy(F.col("tx_fraud_scenario")).count().orderBy("tx_fraud_scenario").show()

[Stage 89:>                                                         (0 + 1) / 1]

+-----------------+----------+
|tx_fraud_scenario|     count|
+-----------------+----------+
|                3|   2608461|
|                0|1768455409|
|                1|   1018093|
|                2| 107712175|
+-----------------+----------+



                                                                                

In [57]:
%%time
wrong_data_type = dict()
for col in numeric_columns:
    wrong_data_type[col] = df.filter(F.col(col).cast("int").isNull()).count()



CPU times: user 3.3 s, sys: 690 ms, total: 3.99 s
Wall time: 41min 1s


                                                                                

In [62]:
# Report the per-column counts. Loop variables are named so they do not
# shadow `col`/`count` from pyspark.sql.functions (imported further down).
for column_name, n_rows in wrong_data_type.items():
    print(f'{column_name} has {n_rows} wrong data type rows')

transaction_id has 0 wrong data type rows
customer_id has 0 wrong data type rows
terminal_id has 40312 wrong data type rows
tx_amount has 0 wrong data type rows
tx_time_seconds has 0 wrong data type rows
tx_time_days has 0 wrong data type rows
tx_fraud has 0 wrong data type rows
tx_fraud_scenario has 0 wrong data type rows


In [58]:
# Count tx_datetime values that are missing or are not a full
# "YYYY-MM-DD HH:MM:SS" timestamp (the layout is assumed; TODO confirm against
# the raw files). The pattern is anchored at both ends, so a valid-looking
# date embedded in otherwise malformed text is rejected.
# Note: only field ranges are checked; e.g. 2019-02-30 would still pass.
DATETIME_PATTERN = (
    r'^\d{4}-(0[1-9]|1[012])-(0[1-9]|[12][0-9]|3[01])'
    r'[ T]([01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]$'
)
datetime_col = F.col("tx_datetime")
# Nulls are counted as wrong explicitly (rlike on null yields null).
dt_count = df.filter(datetime_col.isNull() | ~datetime_col.rlike(DATETIME_PATTERN)).count()

                                                                                

In [63]:
# Report the number of rows whose tx_datetime failed the format check.
print(f'Column tx_datetime has {dt_count} wrong type rows')

Column tx_datetime has 0 wrong type rows


### Проверка допустимых диапазонов значений

In [65]:
%%time
wrong_data_range = dict()
for col in numeric_columns:
    wrong_data_range[col] = df.filter(F.col(col) < 0).count()



CPU times: user 3.49 s, sys: 686 ms, total: 4.18 s
Wall time: 41min 47s


                                                                                

In [66]:
# Report the per-column counts. Loop variables are named so they do not
# shadow `col`/`count` from pyspark.sql.functions (imported further down).
for column_name, n_rows in wrong_data_range.items():
    print(f'{column_name} has {n_rows} wrong data range rows')

transaction_id has 0 wrong data range rows
customer_id has 14623 wrong data range rows
terminal_id has 0 wrong data range rows
tx_amount has 0 wrong data range rows
tx_time_seconds has 0 wrong data range rows
tx_time_days has 0 wrong data range rows
tx_fraud has 0 wrong data range rows
tx_fraud_scenario has 0 wrong data range rows


### Проверка ID транзакций на уникальность

In [61]:
# Compare distinct transaction ids with the total row count: the difference
# is the number of extra rows that repeat an already-seen transaction_id.
transaction_id_count = df.select("transaction_id").distinct().count()
print(transaction_id_count)
print(f'There are {data_row_count - transaction_id_count} not unique transaction id rows')



1879791585
There are 2553 not unique transaction id rows


                                                                                

### Проверка на NaN

In [11]:
from pyspark.sql.functions import isnan, when, count, col

In [14]:
%%time
df.select([count(when(isnan(c), c)).alias(c) for c in df.columns]).show()



+--------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|transaction_id|tx_datetime|customer_id|terminal_id|tx_amount|tx_time_seconds|tx_time_days|tx_fraud|tx_fraud_scenario|
+--------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+
|             0|          0|          0|          0|        0|              0|           0|       0|                0|
+--------------+-----------+-----------+-----------+---------+---------------+------------+--------+-----------------+

CPU times: user 1.01 s, sys: 246 ms, total: 1.26 s
Wall time: 27min 32s


                                                                                