In [1]:
!pip install faker


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [26]:
import pyspark
from faker import Faker
from pyspark.sql import SparkSession, DataFrame, functions as F, types as t
from pyspark.sql.window import Window
import random
from datetime import datetime, timedelta, date

# Build (or reuse, if one is already running in this kernel) a local
# single-threaded Spark session with Hive support enabled.
spark = (
    SparkSession.builder
    .master("local[1]")
    .appName("pyspark-test01")
    .config("spark.sql.warehouse.dir", "spark-warehouse")
    .enableHiveSupport()
    .getOrCreate()
)
# Sanity check: list the databases visible through the catalog.
print(spark.catalog.listDatabases())

# Faker instance shared by the UDFs below; seed both Faker and the stdlib RNG.
fake = Faker()
Faker.seed(42)
random.seed(42)


24/04/01 11:12:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


[Database(name='default', catalog='spark_catalog', description='Default Hive database', locationUri='file:/workspaces/devcontainer-universal/spark-warehouse')]


In [27]:
@F.udf(t.IntegerType())
def f_int(min:int, max:int) -> int:
    """Spark UDF: random integer drawn with Faker's ``random_int(min, max)``.

    Both arguments are passed as columns (e.g. ``F.lit(1)``); the result is an
    IntegerType column.
    """
    return fake.random_int(min=min, max=max)

# Spark assumes UDFs are deterministic and may therefore collapse duplicate
# calls or evaluate the function more often than written. This one returns a
# fresh random value per call, so tell the optimizer.
f_int = f_int.asNondeterministic()

@F.udf(t.LongType())
def f_intUnique(digits:int) -> int:
    """Spark UDF: random number of up to ``digits`` digits from ``fake.unique``.

    The return type is declared explicitly: a bare ``@F.udf`` defaults to
    StringType, which would turn the integer into its string form. LongType is
    used so larger ``digits`` values are not limited to the 32-bit range.

    NOTE(review): uniqueness is tracked by the ``fake`` object that executes the
    call; presumably each Python worker holds its own copy, so values are only
    guaranteed unique per worker - confirm if more than one worker is used.
    """
    return fake.unique.random_number(digits=digits)

# Random output: mark as nondeterministic so Spark does not collapse or
# re-evaluate calls during optimization.
f_intUnique = f_intUnique.asNondeterministic()

@F.udf
def f_choise(list:list, weights:list) -> str:
    """Spark UDF: pick one element of ``list`` at random, biased by ``weights``.

    Uses ``random.choices`` and returns its single pick. The UDF has the
    default StringType return type, so the result is a string column.
    NOTE(review): both arguments are expected to arrive as array columns
    (e.g. built with ``F.array``) of equal length - confirm against callers.
    """
    return random.choices(list,weights=weights)[0]

# Random output: mark as nondeterministic so Spark does not collapse or
# re-evaluate calls during optimization.
f_choise = f_choise.asNondeterministic()

@F.udf(t.BooleanType())
def f_bool() -> bool:
    """Spark UDF: random boolean from ``fake.boolean()``.

    The return type is declared explicitly: a bare ``@F.udf`` defaults to
    StringType, which would yield the strings 'true'/'false' instead of a
    boolean column.
    """
    return fake.boolean()

# Random output: mark as nondeterministic so Spark does not collapse or
# re-evaluate calls during optimization.
f_bool = f_bool.asNondeterministic()

@F.udf
def f_GUID() -> str:
    """Spark UDF: random UUID4 from ``fake.uuid4()`` as a string column."""
    return fake.uuid4()

# Random output: mark as nondeterministic so Spark does not collapse or
# re-evaluate calls during optimization.
f_GUID = f_GUID.asNondeterministic()

@F.udf
def f_hash() -> str:
    """Spark UDF: random MD5 value from ``fake.md5(raw_output=False)`` as a string column."""
    return fake.md5(raw_output=False)

# Random output: mark as nondeterministic so Spark does not collapse or
# re-evaluate calls during optimization.
f_hash = f_hash.asNondeterministic()

@F.udf(t.DateType())
def f_date(start:str='-3y', end:str='now') -> date:
    """Spark UDF: random date between ``start`` and ``end`` via ``fake.date_between``.

    ``start`` / ``end`` are forwarded unchanged as ``start_date`` / ``end_date``;
    the defaults are '-3y' and 'now'.

    The return type is declared as DateType: with the default StringType a
    returned ``datetime.date`` is stringified on the JVM side and shows up as a
    ``java.util.GregorianCalendar[...]`` dump rather than a date.
    """
    return fake.date_between(start_date=start, end_date=end)

# Random output: mark as nondeterministic so Spark does not collapse or
# re-evaluate calls during optimization.
f_date = f_date.asNondeterministic()


In [33]:
# Smoke test for the f_int UDF: attach a random 1..2 value to every consumer key.
consumers = spark.read.table('consumer_dimension').select('Consumer_Key')
# The count used to be computed and silently discarded (only the last
# expression of a cell is displayed); print it so the Spark job has a purpose.
print(consumers.count())

tmp_df = consumers.withColumn('random_int', f_int(F.lit(1),F.lit(2)))
tmp_df.show()

+------------+----------+
|Consumer_Key|random_int|
+------------+----------+
|        1701|         1|
|       23735|         1|
|       35873|         2|
|       43076|         1|
|       43219|         1|
|       48349|         1|
|       50710|         1|
|       52774|         1|
|       71603|         2|
|       79415|         1|
|       79470|         1|
|       81071|         1|
|      101757|         1|
|      103119|         1|
|      109751|         1|
|      120461|         1|
|      127543|         2|
|      131637|         1|
|      133033|         2|
|      139259|         2|
+------------+----------+
only showing top 20 rows



In [45]:
# Dimension key columns; reused when selecting from consumer_dimension.
groupBy_columns = [
    'Division_Code_Key',
    'Region_Code_Key',
    'Affiliate_Code_Key',
    'Market_Code_Key',
]

# Total buyers: will be joined to the consumer dimension on the
# Consumer Master Registry GUID.
trans_head = spark.read.table('transaction_header').select('Source_Consumer_Key')
master_reg = spark.read.table('master_registry').select(
    'Source_Consumer_Key',
    'Consumer_Master_Registry_Global_Unique_Identifier',
)
# Left join: every master-registry row is kept, and rows without a matching
# transaction header get NULL on the transaction side. Both
# Source_Consumer_Key columns remain in the result.
total_buyers = master_reg.join(
    trans_head,
    on=master_reg.Source_Consumer_Key == trans_head.Source_Consumer_Key,
    how='left',
)

In [None]:
# Lifecycle: status code per consumer, restricted to rows whose
# current-record indicator equals 1.
lifecycle_all = spark.read.table('consumer_lifecycle')
lifecycle = (
    lifecycle_all
    # Filter first so the indicator column is visibly in scope, then project.
    .filter(F.col('Consumer_Lifecycle_Status_Current_Record_Indicator') == 1)
    .select('Consumer_Key', 'Consumer_Lifecycle_Status_Code_Key')
)
del lifecycle_all  # helper name only; keep the notebook namespace unchanged

In [None]:
# Load the consumer-address CSV extract, derive the columns named like the
# target table's, drop the source-only columns, and append the result to the
# `consumer_address` table.
csv_consumer_address = (
    spark.read.csv('src/address_2024_03_29.csv', header=True, inferSchema=True, quote='"')
    # A stray bare column name that sat here in the middle of the chain was a
    # syntax error and has been removed.
    # NOTE(review): bigint casts for 'Consumer_Address_Key' and
    # 'Consumer_Address_Country_Code_Key' were commented out in the original;
    # re-add them if inferSchema does not produce the table's types.
    .withColumn('Consumer_Address_Last_Update_Source_System_Code_Key', F.col('Consumer_Address_Last_Update_Source_System_Key').cast('bigint'))
    .withColumn('Consumer_Address_Postal_Opt_In_Opt_Out_Update_Timestamp', F.col('Consumer_Address_Postal_Opt_In_Opt_Out_Update_Date'))
    .withColumn('Consumer_Address_Record_Valid_To_Timestamp', F.col('Consumer_Address_Record_Valid_To_Date'))
    .withColumn('Consumer_Address_Source_Last_Update_Timestamp', F.col('Consumer_Address_Source_Last_Update_Date'))
    # NOTE(review): the next two columns are filled with the constant 1; for the
    # *_Timestamp column this looks like a placeholder - confirm.
    .withColumn('Consumer_Address_Last_Postal_Opt_In_Opt_Out_Update_Source_System_Code_Key', F.lit(1))
    .withColumn('Consumer_Address_Record_Valid_From_Timestamp', F.lit(1))
    # Remove the source-only columns that were copied/renamed above (plus 'HUI').
    .drop(
        'HUI',
        'Consumer_Address_Record_Valid_To_Date',
        'Consumer_Address_Source_Last_Update_Date',
        'Consumer_Address_Last_Postal_Opt_In_Opt_Out_Update_Source_System_Key',
        'Consumer_Address_Last_Update_Source_System_Key',
        'Consumer_Address_Postal_Opt_In_Opt_Out_Update_Date',
    )
)
csv_consumer_address.show()
# insertInto matches columns by POSITION, not by name, and appends by default:
# the DataFrame's column order must match the table's, and re-running this cell
# inserts the same rows again.
# NOTE(review): verify the column order against the consumer_address schema.
csv_consumer_address.write.insertInto('consumer_address')


In [None]:
# Loyalty: the full consumer_loyalty table, no projection or filter applied.
loyalty = spark.read.table(tableName='consumer_loyalty')

In [74]:
# Contactables: opt-in/opt-out indicators per channel, current records only
# (current-record indicator == 1), left-joined onto the consumer dimension.
phone = (
    spark.read.table('consumer_phone')
    .filter(F.col('Consumer_Phone_Current_Record_Indicator') == 1)
    .select('Consumer_Key', 'Consumer_Phone_SMS_Opt_In_Opt_Out_Indicator')
)
email = (
    spark.read.table('consumer_email')
    .filter(F.col('Consumer_Email_Current_Record_Indicator') == 1)
    .select('Consumer_Key', 'Consumer_Email_Opt_In_Opt_Out_Indicator')
)
# Unlike phone/email, the address projection also keeps its indicator column.
address = (
    spark.read.table('consumer_address')
    .filter(F.col('Consumer_Address_Current_Record_Indicator') == 1)
    .select(
        'Consumer_Key',
        'Consumer_Address_Postal_Opt_In_Opt_Out_Indicator',
        'Consumer_Address_Current_Record_Indicator',
    )
)
consumers = spark.read.table('consumer_dimension').select('Consumer_Key', *groupBy_columns)

# After each left join the right-hand Consumer_Key is dropped, so only the
# consumer dimension's key column remains.
contactable = (
    consumers
    .join(phone, on=consumers.Consumer_Key == phone.Consumer_Key, how='left')
    .drop(phone.Consumer_Key)
    .join(email, on=consumers.Consumer_Key == email.Consumer_Key, how='left')
    .drop(email.Consumer_Key)
    .join(address, on=consumers.Consumer_Key == address.Consumer_Key, how='left')
    .drop(address.Consumer_Key)
)
contactable.show()


+------------+-----------------+---------------+------------------+---------------+-------------------------------------------+---------------------------------------+------------------------------------------------+-----------------------------------------+
|Consumer_Key|Division_Code_Key|Region_Code_Key|Affiliate_Code_Key|Market_Code_Key|Consumer_Phone_SMS_Opt_In_Opt_Out_Indicator|Consumer_Email_Opt_In_Opt_Out_Indicator|Consumer_Address_Postal_Opt_In_Opt_Out_Indicator|Consumer_Address_Current_Record_Indicator|
+------------+-----------------+---------------+------------------+---------------+-------------------------------------------+---------------------------------------+------------------------------------------------+-----------------------------------------+
|        1701|                1|             10|            886963|              1|                                          0|                                   NULL|                                            NULL|       