### step 1. configuration and lib imports

In [0]:
# Register the two text widgets that parameterize this notebook.
# Both are created with the literal string "url" as their default value.
for widget_name in ("csv_url", "par_url"):
    dbutils.widgets.text(widget_name, "url")

In [0]:
# Read the current widget values: csv_file comes from "csv_url",
# parquet_file from "par_url". Both are later passed to requests.get().
csv_file, parquet_file = (
    dbutils.widgets.get(widget_name) for widget_name in ("csv_url", "par_url")
)

In [0]:
import pandas
import requests
from io import StringIO, BytesIO

from pyspark.sql.functions import lit, col, when, to_timestamp, monotonically_increasing_id, countDistinct, first, month, hour


### step 2. fetching files from online storage 
_(files can't be imported directly on the free tier of Databricks, so they are downloaded over HTTP as an alternative)_

In [0]:
# Download both source files over HTTP.
#
# raise_for_status() aborts the notebook on a 4xx/5xx answer; without it the
# body of an error page would be handed to pandas in the next cell and parsed
# as if it were data.
# The timeout is (connect seconds, read seconds): it bounds how long a single
# connect / socket read may stall, not the total download time.
REQUEST_TIMEOUT = (10, 300)

csv_response = requests.get(csv_file, timeout=REQUEST_TIMEOUT)
csv_response.raise_for_status()

par_response = requests.get(parquet_file, timeout=REQUEST_TIMEOUT)
par_response.raise_for_status()

In [0]:
# Parse the downloaded payloads with pandas:
#  * the CSV from the decoded response text (comma separated, first row is the header)
#  * the parquet file from the raw response bytes
csv_buffer = StringIO(csv_response.text)
parquet_buffer = BytesIO(par_response.content)

pandas_csv = pandas.read_csv(csv_buffer, sep=',', header=0)
pandas_parquet = pandas.read_parquet(parquet_buffer)

### step 3. Creating databricks dataframe

In [0]:
# Turn the pandas frame parsed from the CSV into a Spark DataFrame.
df_csv = spark.createDataFrame(data=pandas_csv)

In [0]:
# Anonymizing merchant names (requested in the Data Dictionary):
# the 'merchant_name' column is set to the same constant for every row.
anonymous_name = lit('Merchant')
df_csv = df_csv.withColumn('merchant_name', anonymous_name)

In [0]:
# Turn the pandas frame parsed from the parquet file into a Spark DataFrame.
df_par = spark.createDataFrame(data=pandas_parquet)

In [0]:
# display(df_csv.limit(10))

In [0]:
# display(df_par.limit(10))

### step 4. CSV data quality checks and data cleaning

In [0]:
# checking for duplicates
duplicate_rows_csv = df_csv.groupBy(df_csv.merchant_id).count().filter(col("count") > 1).orderBy(col("count").desc())
# display(duplicate_rows_csv.limit(10))

In [0]:
# Left-join every CSV row to the list of duplicated merchant ids and tag it in
# "quailty_status": no match on the right side -> "clean", otherwise "duplicate".
# The two helper columns contributed by duplicate_rows_csv (its merchant_id and
# "count") are removed again after tagging.
same_merchant = df_csv.merchant_id == duplicate_rows_csv.merchant_id
quality_flag = when(duplicate_rows_csv.merchant_id.isNull(), lit("clean")).otherwise(lit("duplicate"))

cleaned_df_csv = (
    df_csv.join(duplicate_rows_csv, same_merchant, 'left')
    .withColumn("quailty_status", quality_flag)
    .drop(duplicate_rows_csv.merchant_id)
    .drop("count")
)
# display(cleaned_df_csv.limit(10))

#### step 4.1. unifying duplicated records

In [0]:
# Only the rows that were tagged as belonging to a duplicated merchant_id.
dupl_df = cleaned_df_csv.filter(col("quailty_status") == "duplicate")

In [0]:
# Collapse every duplicated merchant_id into a single row.
#
# Rule applied per column: when all non-null values inside the group agree,
# keep that value; otherwise write the column's fallback ("Unknown" or -1).
#
# countDistinct() ignores NULLs but a plain first() does not, so for a group
# such as (NULL, 5) the distinct count is 1 while first() may still return
# NULL. first(..., ignorenulls=True) returns the agreed non-null value instead.
_FALLBACK_UNKNOWN = "Unknown"
_FALLBACK_MINUS_ONE = -1

# (column name, fallback) pairs. The order of this list is the column order of
# compressed_dupl_df (after the grouping key merchant_id) - keep it stable.
_unified_columns = [
    ("merchant_name", _FALLBACK_UNKNOWN),
    ("merchant_group_id", _FALLBACK_MINUS_ONE),
    ("merchant_category_id", _FALLBACK_MINUS_ONE),
    ("subsector_id", _FALLBACK_MINUS_ONE),
    ("numerical_1", _FALLBACK_MINUS_ONE),
    ("numerical_2", _FALLBACK_MINUS_ONE),
    ("most_recent_sales_range", _FALLBACK_UNKNOWN),
    ("most_recent_purchases_range", _FALLBACK_UNKNOWN),
    ("avg_sales_lag3", _FALLBACK_MINUS_ONE),
    ("avg_purchases_lag3", _FALLBACK_MINUS_ONE),
    ("active_months_lag3", _FALLBACK_MINUS_ONE),
    ("avg_sales_lag6", _FALLBACK_MINUS_ONE),
    ("avg_purchases_lag6", _FALLBACK_MINUS_ONE),
    ("active_months_lag6", _FALLBACK_MINUS_ONE),
    ("avg_sales_lag12", _FALLBACK_MINUS_ONE),
    ("avg_purchases_lag12", _FALLBACK_MINUS_ONE),
    ("active_months_lag12", _FALLBACK_MINUS_ONE),
    ("city_id", _FALLBACK_MINUS_ONE),
    ("state_id", _FALLBACK_MINUS_ONE),
    ("quailty_status", _FALLBACK_UNKNOWN),
]

compressed_dupl_df = dupl_df.groupBy("merchant_id").agg(*[
    when(countDistinct(column_name) == 1, first(column_name, ignorenulls=True))
    .otherwise(lit(fallback))
    .alias(column_name)
    for column_name, fallback in _unified_columns
])

In [0]:
# display(compressed_dupl_df.limit(10))

In [0]:
# Re-assemble the merchant table: one unified row per duplicated merchant_id
# plus every row that was tagged "clean".
#
# unionByName() pairs the columns by name. A plain union() pairs them by
# position, which silently mixes values between columns whenever the CSV
# column order differs from the order produced by the agg() that builds
# compressed_dupl_df.
final_cleaned_df_csv = compressed_dupl_df.unionByName(
    cleaned_df_csv.where("quailty_status = 'clean'")
)

In [0]:
# Prefix these four merchant-side columns with "csv_" so they stay
# distinguishable from the identically named columns of the transaction data
# once both frames are joined.
for shared_name in ("city_id", "merchant_category_id", "state_id", "subsector_id"):
    final_cleaned_df_csv = final_cleaned_df_csv.withColumnRenamed(shared_name, f"csv_{shared_name}")

In [0]:
# display(final_cleaned_df_csv.where("quailty_status = 'duplicate'"))
# ALL average sales and purchases columns have data quality issues

### step 5. parquet data quality checks

In [0]:
# checking consistency
# display(df_par.select("authorized_flag").distinct())
# display(df_par.select("category").distinct())
# display(df_par.selectExpr("max(state_id)"))

In [0]:
# Clean the transaction (parquet) data:
#  * category      - NULL values are replaced by "Unknown"
#  * purchase_date - converted with to_timestamp using the pattern yyyy-MM-dd HH:mm:ss
#  * id            - generated row id: monotonically_increasing_id() shifted by 1
category_filled = when(col("category").isNull(), lit("Unknown")).otherwise(col("category"))
purchase_ts = to_timestamp(col("purchase_date"), "yyyy-MM-dd HH:mm:ss")
row_id = monotonically_increasing_id() + 1

cleaned_df_par = (
    df_par
    .withColumn("category", category_filled)
    .withColumn("purchase_date", purchase_ts)
    .withColumn("id", row_id)
)

In [0]:
# display(cleaned_df_par.limit(10))

In [0]:
# display(cleaned_df_par.selectExpr("max(purchase_date)"))

In [0]:
# print(cleaned_df_par.count())

### step 6. Joining tables

In [0]:
# Attach the merchant attributes to every transaction. The left join keeps
# transactions that have no matching merchant row; the merchant-side copy of
# the join key is dropped so only one merchant_id column remains.
merchant_key_matches = [cleaned_df_par.merchant_id == final_cleaned_df_csv.merchant_id]

main_df = (
    cleaned_df_par
    .join(final_cleaned_df_csv, merchant_key_matches, 'left')
    .drop(final_cleaned_df_csv.merchant_id)
)

In [0]:
# display(main_df.limit(10))

In [0]:
%skip
# Expose main_df under the view name used by the %sql check cells of this notebook.
# The cell starts with the %skip magic, as do the %sql cells that query the view.
main_df.createOrReplaceTempView("merchants_transaction_cleaned_data")

In [0]:
%skip
%sql

-- Consistency check: up to 20 rows where the merchant CSV carries a usable
-- city id (not NULL and not -1) that differs from the transaction's city_id.
SELECT merchant_id, city_id, csv_city_id
FROM merchants_transaction_cleaned_data
WHERE csv_city_id is not NULL
AND csv_city_id <> -1
--AND city_id = -1
AND csv_city_id <> city_id
LIMIT 20

In [0]:
%skip
%sql

-- Consistency check: up to 20 rows where the merchant CSV carries a usable
-- merchant category id (not NULL and not -1) that differs from the
-- transaction's merchant_category_id.
SELECT merchant_id, merchant_category_id, csv_merchant_category_id
FROM merchants_transaction_cleaned_data
WHERE csv_merchant_category_id is not NULL
AND csv_merchant_category_id <> -1
AND csv_merchant_category_id <> merchant_category_id
LIMIT 20

#### step 6.1. cleaning inconsistent data (city_id, state_id, subsector_id and merchant_category_id)

In [0]:
def _reconcile(par_name, csv_name):
    """Build the column expression that merges a transaction-side id with its CSV-side twin.

    A value counts as usable when it is neither NULL nor -1. The branches are
    evaluated in this order:
      1. both values are equal             -> that value
      2. transaction value missing, CSV usable -> CSV value
      3. CSV value missing, transaction usable -> transaction value
      4. anything else (e.g. two different usable values) -> -1

    Args:
        par_name: name of the column coming from the transaction data.
        csv_name: name of the matching "csv_"-prefixed merchant column.

    Returns:
        A pyspark Column holding the reconciled value.
    """
    par_value, csv_value = col(par_name), col(csv_name)

    par_missing = (par_value.isNull()) | (par_value == -1)
    par_usable = (par_value.isNotNull()) & (par_value != -1)
    csv_missing = (csv_value.isNull()) | (csv_value == -1)
    csv_usable = (csv_value.isNotNull()) & (csv_value != -1)

    return (
        when(par_value == csv_value, par_value)
        .when(par_missing & csv_usable, csv_value)
        .when(csv_missing & par_usable, par_value)
        .otherwise(-1)
    )


# One calc_<name> column per id that exists on both sides of the join;
# the tuple order is the order in which the columns are appended.
final_main_df = main_df
for base_name in ("city_id", "merchant_category_id", "state_id", "subsector_id"):
    final_main_df = final_main_df.withColumn(
        f"calc_{base_name}", _reconcile(base_name, f"csv_{base_name}")
    )

#### step 6.2 adding needed columns for presentation notebook

In [0]:
# Derive the month and hour of each purchase from purchase_date, then remove
# the raw id columns of both sources (their reconciled calc_* versions remain).
# Columns that are not needed are dropped here due to performance constraints
# of the free serverless cluster.
final_main_df = (
    final_main_df
    .withColumn("purchase_month", month(col("purchase_date")))
    .withColumn("purchase_hour", hour(col("purchase_date")))
    .drop(
        "city_id", "csv_city_id",
        "merchant_category_id", "csv_merchant_category_id",
        "state_id", "csv_state_id",
        "subsector_id", "csv_subsector_id",
    )
)

In [0]:
# checking if the join caused duplication of records - number of records: 7 274 367
# print(final_main_df.count())

In [0]:
# display(final_main_df.limit(10))

### step 7. Saving clean dataframe in catalog

In [0]:
# Persist the fully prepared frame as a Delta table.
#
# final_main_df - not main_df - is the frame that carries the reconciled
# calc_* id columns and the purchase_month / purchase_hour columns built in
# steps 6.1 and 6.2; saving main_df would throw all of that work away.
# overwriteSchema lets the overwrite replace a table that was written earlier
# with a different set of columns instead of failing on a schema mismatch.
final_main_df.write \
    .mode("overwrite") \
    .format("delta") \
    .option("overwriteSchema", "true") \
    .saveAsTable("default.merchants_transaction_clean_data")

In [0]:
# End the notebook run and pass this status text to dbutils.notebook.exit().
# The text is kept exactly as it was, since a caller may match on it.
exit_message = "Table 'default.merchants_transaction_clean_data' created succefully"
dbutils.notebook.exit(exit_message)