Merchants CSV - Exploratory Data Analysis

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("MyApp") \
    .master("local[*]") \
    .config("spark.sql.warehouse.dir", "/tmp") \
    .config("spark.hadoop.security.authentication", "simple") \
    .config("spark.ui.showConsoleProgress", "false") \
    .getOrCreate()

csv_file_path = "merchants-subset.csv"

df = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(csv_file_path)


df_unique_merchants = df.dropDuplicates(["merchant_id"])
total_merchants = df_unique_merchants.count()

total_distinct_merchants = df_unique_merchants.select("merchant_id").distinct().count()
print(f"Total number of merchants: {total_merchants}")
print(f"Number of distinct merchants: {total_distinct_merchants}")
df.printSchema()


print("Duplicate merchants analysis: ")
df_unique_merchants.groupBy("merchant_id").count().filter("count > 1").show()
print("Inspect four same merchant ids: ")
df_unique_merchants.filter(df_unique_merchants["merchant_id"] == "M_ID_ef233cff26").show()
df_unique_merchants.filter(df_unique_merchants["merchant_id"] == "M_ID_992a180b15").show()
df_unique_merchants.filter(df_unique_merchants["merchant_id"] == "M_ID_a8767b29ef").show() 




Total number of merchants: 334633
Number of distinct merchants: 334633
root
 |-- merchant_name: string (nullable = true)
 |-- merchant_id: string (nullable = true)
 |-- merchant_group_id: integer (nullable = true)
 |-- merchant_category_id: integer (nullable = true)
 |-- subsector_id: integer (nullable = true)
 |-- numerical_1: double (nullable = true)
 |-- numerical_2: double (nullable = true)
 |-- most_recent_sales_range: string (nullable = true)
 |-- most_recent_purchases_range: string (nullable = true)
 |-- avg_sales_lag3: double (nullable = true)
 |-- avg_purchases_lag3: string (nullable = true)
 |-- active_months_lag3: integer (nullable = true)
 |-- avg_sales_lag6: double (nullable = true)
 |-- avg_purchases_lag6: string (nullable = true)
 |-- active_months_lag6: integer (nullable = true)
 |-- avg_sales_lag12: double (nullable = true)
 |-- avg_purchases_lag12: string (nullable = true)
 |-- active_months_lag12: integer (nullable = true)
 |-- city_id: integer (nullable = true)
 |--

Historical trancastions for merchants PARQUET - Exploratory Data Analysis

In [6]:
from pyspark.sql.functions import col, sum
from pyspark.sql import functions as F

def generate_id_for_null_values(df):
    return df.withColumn(
            "merchant_id",
            F.when(F.col("merchant_id").isNull(), F.concat(F.lit("G_ID_"), F.expr("substring(cast(monotonically_increasing_id() as string), -10, 10)")))
            .otherwise(F.col("merchant_id"))
        )

parquet_file_path = "historical-transactions.snappy.parquet"

df = spark.read.parquet(parquet_file_path)
df.show(10)
print(df.count())
df.printSchema()
print("Null values per column:")
null_counts = df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns])

null_counts.show()

df = generate_id_for_null_values(df)

print("Merchant transcations with merchant_id null: ")
df.filter(F.col("installments") == 9).show()

+---------------+---------------+-------+------------+--------+--------------------+---------------+---------+-------------------+--------+------------+---------------+
|authorized_flag|    customer_id|city_id|installments|category|merchant_category_id|    merchant_id|month_lag|      purchase_date|state_id|subsector_id|purchase_amount|
+---------------+---------------+-------+------------+--------+--------------------+---------------+---------+-------------------+--------+------------+---------------+
|              N|C_ID_2730b57487|     63|           3|       C|                 456|M_ID_fe83cec110|       -9|2017-05-19 16:01:59|       9|          21|       18970.06|
|              Y|C_ID_95429da153|     38|           3|       C|                 884|M_ID_9f11c207c1|       -2|2017-12-07 22:33:34|       7|          27|       14904.52|
|              Y|C_ID_cdc7680c7f|    160|           1|       B|                 278|M_ID_ac78c7d82a|       -3|2017-11-17 00:00:00|      21|          37|   

Joined Merchant CSV and Historical Transactions - EDA

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql import functions as F

def merge_columns(col1, col2):
    return F.when(F.col(col1).isNotNull() & (F.col(col1) != -1), F.col(col1)) \
            .otherwise(F.when(F.col(col2).isNotNull() & (F.col(col2) != -1), F.col(col2))
            .otherwise(F.lit(-1)))


parquet_file_path = "historical-transactions.snappy.parquet"
csv_file_path = "merchants-subset.csv"


df_historical_merchants_transactions = spark.read.parquet(parquet_file_path).withColumnRenamed("merchant_id", "ht_merchant_id") \
                                                                            .withColumnRenamed("city_id", "ht_city_id") \
                                                                            .withColumnRenamed("state_id", "ht_state_id")
df_merchants = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(csv_file_path)


joined_df = df_historical_merchants_transactions.join(df_merchants, df_historical_merchants_transactions["ht_merchant_id"] == df_merchants["merchant_id"], "left")
#joined_df = joined_df.filter( (F.col("merchant_id") == 'M_ID_15a61b0913') & (F.col("purchase_date").like("2018-01%")))

df_merged = joined_df.withColumn("final_city_id", merge_columns("ht_city_id", "city_id")) \
                     .withColumn("final_state_id", merge_columns("ht_state_id", "state_id"))

# Show the result

df_merged = df_merged.filter(F.col("final_state_id") == -1)
df_merged.show()
df_merged.count()


distinct_state_count = df.select("final_city_id").distinct().count()

# Print the result
print(f"Number of distinct states: {distinct_state_count}")





+---------------+---------------+----------+------------+--------+--------------------+---------------+---------+-------------------+-----------+------------+---------------+-------------------+---------------+-----------------+--------------------+------------+-----------+-----------+-----------------------+---------------------------+--------------+------------------+------------------+--------------+------------------+------------------+---------------+-------------------+-------------------+-------+--------+-------------+--------------+
|authorized_flag|    customer_id|ht_city_id|installments|category|merchant_category_id| ht_merchant_id|month_lag|      purchase_date|ht_state_id|subsector_id|purchase_amount|      merchant_name|    merchant_id|merchant_group_id|merchant_category_id|subsector_id|numerical_1|numerical_2|most_recent_sales_range|most_recent_purchases_range|avg_sales_lag3|avg_purchases_lag3|active_months_lag3|avg_sales_lag6|avg_purchases_lag6|active_months_lag6|avg_sales

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `final_city_id` cannot be resolved. Did you mean one of the following? [`city_id`, `state_id`, `merchant_id`, `subsector_id`, `category`].;
'Project ['final_city_id]
+- Relation [authorized_flag#1115,customer_id#1116,city_id#1117L,installments#1118L,category#1119,merchant_category_id#1120L,merchant_id#1121,month_lag#1122L,purchase_date#1123,state_id#1124L,subsector_id#1125L,purchase_amount#1126] parquet
