# dim_source_store_sale

## Import libraries

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.functions import udf,upper,lit,monotonically_increasing_id, row_number,col
from pyspark.sql.types import StringType, LongType, DecimalType, TimestampType, DateType, BooleanType
import pandas as pd
import numpy as np
import datetime
import datetime
from pyspark.sql.utils import AnalysisException
from pyspark.sql.window import Window
from yody_function.deltalake import upsert_deltalake
from yody_function.support_function import last_modify_time, top_modify_time, print_log
from yody_function.bigquery import upsert_bigquery
from datetime import datetime, timedelta

## Load config

In [2]:
ENV = 'prod'
HDFS_MASTER = 'gs://yody-lakehouse'
HDFS_MASTER_BAKUP = 'gs://yody-backup'
SPARK_HOME = 'yarn'
DATA_SOURCE = '/lading-zone/'
DATA_STORE = f'/dwh/{ENV}/dim/'

## Initializing Spark Session

In [3]:
sparkSession = SparkSession\
.builder\
.appName("dim_soure_store_sale")\
.master(SPARK_HOME)\
.config("spark.cores.max", '2')\
.config("spark.executor.memory", '2g')\
.config("spark.sql.debug.maxToStringFields", 255)\
.config("spark.jars", "gs://yody-lakehouse/job/jar_file/delta-core_2.12-1.0.1.jar,gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.28.0.jar")\
.config("spark.sql.parquet.enableVectorizedReader", "false")\
.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/19 09:11:43 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
24/06/19 09:11:43 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
24/06/19 09:11:43 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
24/06/19 09:11:43 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


### Connect to HDFS

In [4]:
def apply_columns(df, columns, func=lambda x: F.col(x), naming_func=lambda x: x):
    for col in columns:
        if col not in df.columns:
            raise Exception(f"not found column {str(col)} in {df.columns}")
        else:
            df = df.withColumn(naming_func(col), func(col))
    return df

def begin_date_store(sparkSession, env, _type = "normal"):
    """
        type: ["normal", "incremental", "overwrite"]
    """
    path_save_table = f"gs://yody-lakehouse/staging/unicorn/{env}/begin_store_mapping"
    columns_key = ["store_id"]
        
    if(_type == 'normal'):
        pass
    else:
        df_ = sparkSession.read.parquet(HDFS_MASTER + f"/lading-zone/prod/prod_order_service/orders/*")\
            .filter("(status == 'finished') and (channel == 'POS')")\
            .filter("is_deleted == 0")\
            .groupby("store_id").agg(
                F.min("created_date").alias("begin_date")
            )\
            .withColumn("begin_date", 
                        F.when(F.col("store_id") == 192, "2022-06-07 04:07:08.0")
                        .when(F.col("store_id") == 207, "2022-11-15 13:13:02.0")
                        .otherwise(F.col("begin_date"))
            )
        
        if(_type == 'overwrite'):
            upsert_deltalake(
                sparkSession,
                df_upsert=df_,
                path_table=path_save_table,
                columns_key=columns_key,
                mode="overwrite"
            )
        if(_type == 'incremental'):
            df_old = sparkSession.read.format("delta").load(path_save_table)\
                .withColumnRenamed("begin_date", "begin_date_old")
            df_.printSchema()
            df_ = df_.join(df_old, ["store_id"], "left")\
                .filter("begin_date_old is null")\
                .select("store_id", "begin_date")\
                .cache()
            print_log(f"so luong cua hang moi {df_.count()}")
            
            df_.printSchema()
            upsert_deltalake(
                sparkSession = sparkSession,
                df_upsert=df_,
                path_table=path_save_table,
                columns_key=columns_key,
                mode="upsert"
            )
        
    return sparkSession.read.format("delta").load(path_save_table)

### read data

In [5]:
df_department = sparkSession\
.read\
.parquet(HDFS_MASTER + f"/lading-zone/{ENV}/{ENV}_account_service/department")

df_store = sparkSession\
.read\
.parquet(HDFS_MASTER + f"/lading-zone/{ENV}/{ENV}_core_service/store")

df_source = sparkSession\
.read\
.parquet(HDFS_MASTER + f"/lading-zone/{ENV}/{ENV}_core_service/source")

df_city = sparkSession\
.read\
.parquet(HDFS_MASTER + f"/lading-zone/{ENV}/{ENV}_content_service/city")

df_economic_zone = sparkSession\
.read\
.parquet(HDFS_MASTER + f"/lading-zone/{ENV}/{ENV}_core_service/economic_zone")

df_manager = sparkSession\
.read\
.option('header', 'true')\
.csv('gs://yody-lakehouse/lading-zone/google_sheet/manager_by_store/final.csv')

df_trainer = sparkSession\
.read\
.option('header', 'true')\
.csv('gs://yody-lakehouse/lading-zone/google_sheet/data_store/final.csv')

df_rsm = sparkSession\
.read\
.option('header', 'true')\
.csv('gs://yody-lakehouse/lading-zone/google_sheet/map_asm/final.csv')

24/06/19 09:12:12 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/06/19 09:12:27 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/06/19 09:12:42 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/06/19 09:12:57 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
24/06/19 09:13:12 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are regist

In [6]:
if(ENV == "dev"):
    df_fake = pd.read_csv("https://docs.google.com/spreadsheet/ccc?key=1owWynqISRn4p4zowOjSffI6kihV7wtx9Cxaf3ZLbhwg&output=csv")
    df_fake = sparkSession.createDataFrame(df_fake)
    df_source = df_source.drop(*["department_id"])\
        .join(df_fake, "id", "left")

In [7]:
# last
df_department = last_modify_time(df_department, "id", "updated_date").filter("is_deleted == 0")
df_store = last_modify_time(df_store, "id", "updated_date").filter("is_deleted == 0")
df_source = last_modify_time(df_source, "id", "updated_date").filter("is_deleted == 0")
df_city = last_modify_time(df_city, "id", "updated_date").filter("is_deleted == 0")
df_economic_zone = last_modify_time(df_economic_zone, "id", "updated_date").filter("is_deleted == 0")

# select
df_department = df_department.select("id", "parent_id", "name", 'updated_date', 'is_deleted')

#### begin_date_store

In [8]:
df_begin = begin_date_store(sparkSession, ENV, "incremental")

df_begin = df_begin\
.withColumn("source_store_key", F.concat(F.lit("ST"), F.col("store_id")))\
.withColumn("begin_date_07", F.col("begin_date") + F.expr("INTERVAL 7 HOURS"))\
.select("source_store_key", "begin_date_07")

                                                                                

root
 |-- store_id: long (nullable = true)
 |-- begin_date: string (nullable = true)



                                                                                

2024-06-19 09:39:35.258 INFO - so luong cua hang moi 0
root
 |-- store_id: long (nullable = true)
 |-- begin_date: string (nullable = true)

2024-06-19 09:39:35.489 INFO - upsert table


                                                                                

2024-06-19 09:39:55.095 INFO - vacuum - 0


                                                                                

Deleted 0 files and directories in a total of 1 directories.


#### manager_by_store

In [9]:
df_manager = df_manager.filter(F.col('TỈNH')!='TỈNH')
raw_cols = [i for i in df_manager.columns if i.strip().lower() in ('tỉnh', 'asm_name')]

df_manager = apply_columns(df_manager, 
                           columns=raw_cols,
                           func=F.upper,
                           naming_func=lambda x: x.strip().lower()
                          )
df_manager = df_manager\
.withColumn('city_name', F.upper(F.trim('tỉnh')))\
.withColumn('asm_name', F.upper(F.trim('asm_name')))\
.select('city_name', 'asm_name')

df_manager = df_manager.dropDuplicates()

In [10]:
# change region
df_rsm = df_rsm.filter(F.col('id').cast('int').isNotNull())
raw_cols = [i for i in df_rsm.columns if i.strip().lower() in ('id', 'rsm_name', 'region')]

df_rsm = df_rsm\
.withColumn('city_id', F.col('id').cast('int'))\
.withColumn('rsm_name', F.upper(F.trim('rsm_name')))\
.withColumn('region', F.when(F.lower('region')=='none', F.lit(None).cast('string')).otherwise(F.col('region')))\
.withColumn('region', F.upper(F.trim(F.col("region"))))\
.select('city_id', 'rsm_name', 'region')
df_rsm = df_rsm.dropDuplicates()

#### data_store

In [11]:
df_trainer = df_trainer.filter('CAST(STT AS INT) IS NOT NULL')
raw_cols = [i for i in df_trainer.columns if i.strip().lower() in ('store_name', 'trainer_code')]

df_trainer = apply_columns(df_trainer, 
                           columns=raw_cols,
                           func=lambda x: F.upper(x), 
                           naming_func=lambda x: x.strip().lower()
                          )

clean_string = lambda x: F.upper(F.trim(x))
df_trainer = df_trainer\
.withColumn('source_store_name', clean_string('store_name'))\
.withColumn('trainer_code', 
            F.when(F.expr('lower(trainer_code) = "none"'), F.lit(None).cast('string')).otherwise(F.col('trainer_code')))\
.withColumn('trainer_code', clean_string('trainer_code'))\
.select('source_store_name', 'trainer_code')

df_trainer = df_trainer.dropDuplicates()

## Transformations

### Preprocess on source data

#### Add `source_store_id`, `source_store_name`, `source_store_key`, `source_store_type`, and `type`

In [12]:
df_source = df_source\
.withColumnRenamed('id', 'source_store_id')\
.withColumnRenamed('name', 'source_store_name')\
.withColumn('source_store_key', F.concat(F.lit("SO"), F.col("source_store_id")))\
.withColumn('source_store_type', F.lit(''))\
.withColumn('type', F.lit('source'))\
.withColumn('source_store_code', F.col('code'))\
.withColumn('longitude', F.lit(None).cast(StringType()))

In [13]:
df_store = df_store\
.withColumnRenamed('id', 'source_store_id')\
.withColumnRenamed('name', 'source_store_name')\
.withColumn('source_store_key', F.concat(F.lit("ST"), F.col("source_store_id")))\
.withColumn('source_store_type', F.col('type'))\
.withColumn('type', F.lit('store'))\
.withColumn('source_store_code', F.col('code'))\
.withColumn('city_name', F.trim(F.upper('city_name')))\
.withColumn('ward_name', F.trim(F.upper('ward_name')))\
.withColumn('district_name', F.trim(F.upper('district_name')))

#### Add missing columns for df_source

- Add `square` column with default **0** value and convert to the **DecimalType(10, 2)** datatype.

- Add `city_id`, `rank_name`, and `is_saleable` columns with default **null** value.

- Add `status` based on the below condition:

    - If **is_active is True** then **status = 'active'**.
    
    - Otherwise **status = 'inactive'**.

In [14]:
df_source = df_source\
.withColumn('square', F.lit(0).cast(DecimalType(precision=10, scale=2)))\
.withColumn('city_id', F.lit(None).cast(LongType()))\
.withColumn('rank_name', F.lit(None).cast(StringType()))\
.withColumn('is_saleable', F.lit(None).cast(BooleanType()))\
.withColumn('latitude', F.lit(None).cast(StringType()))\
.withColumn('longtitude', F.lit(None).cast(StringType()))\
.withColumn('city_name', F.lit(None).cast(StringType()))\
.withColumn('ward_name', F.lit(None).cast(StringType()))\
.withColumn('district_name', F.lit(None).cast(StringType()))\
.withColumn('asm_name', F.lit(None).cast(StringType()))\
.withColumn('rsm_name', F.lit(None).cast(StringType()))\
.withColumn('trainer_code', F.lit(None).cast(StringType()))


df_source = df_source\
.withColumn('status', F.when(F.col('is_active')==F.lit(True), F.lit('active')).otherwise(F.lit('inactive')))

#### Correct `square` and `status` columns for df_store

In [15]:
df_store = df_store\
.withColumn('square', F.col('square').cast(DecimalType(precision=10, scale=2)))\
.withColumn('status', F.lower(F.col('status')))

df_store = df_store.fillna(0, subset=['square'])

#### Join to get df_city_region

In [16]:
df_economic_zone = df_economic_zone\
.withColumnRenamed('id', 'economic_zone_id')\
.withColumnRenamed('name', 'region')\
.withColumn('region', F.upper(F.trim(F.col("region"))))

df_city = df_city.withColumnRenamed('id', 'city_id')

# change region
df_city_region = df_city.join(df_rsm, ['city_id'], 'left')#(df_economic_zone, ["economic_zone_id"])
df_city_region = df_city_region\
.select('city_id', 'region')\
.filter("city_id is not null")\
.dropDuplicates()

#### Clean string field of 4 dataframes

- Replace string '\x00' by ''(empty string).

- Clean on the `name` for **df_department**, 
- Clean on the `source_store_name` for **df_source**,
- Clean on the `source_store_name` and `rank_name` **df_store**.
- Clean on the `region` for **df_city_region**.

In [17]:
clean_name_rule = lambda name: F.upper(F.regexp_replace(F.col(name), '\x00', ''))

df_department = df_department.withColumn('name', clean_name_rule('name'))

df_source = df_source.withColumn('source_store_name', clean_name_rule('source_store_name'))

df_store = df_store\
.withColumn('source_store_name', clean_name_rule('source_store_name'))\
.withColumn('rank_name', clean_name_rule('rank_name'))

df_city_region = df_city_region.withColumn('region', clean_name_rule('region'))

In [18]:
df_store = df_store\
.join(df_manager, ['city_name'], 'left')\
.join(df_trainer, ['source_store_name'], 'left')\
.join(df_rsm, ['city_id'], 'left')

mapping_sheet_cond = F.col('source_store_type') == 'store'
df_store = df_store\
.withColumn('asm_name', F.when(mapping_sheet_cond, F.col('asm_name')).otherwise(F.lit(None).cast('string')))\
.withColumn('rsm_name', F.when(mapping_sheet_cond, F.col('rsm_name')).otherwise(F.lit(None).cast('string')))\
.withColumn('trainer_code', F.when(mapping_sheet_cond, F.col('trainer_code')).otherwise(F.lit(None).cast('string')))

### Process target data

1. Union to the united form of df_source_store

2. Join with df_begin

In [19]:
union_selected_cols = [
    'source_store_key', 
    'source_store_id', 
    'source_store_code',
    'source_store_name', 
    'source_store_type',
    'department_id',
    'city_id',
    'type',
    'status',
    'is_saleable', 
    'rank_name',
    'square',
    'latitude',
    'longitude',
    'city_name',
    'ward_name', 
    'district_name',
    'is_deleted',
    'asm_name',
    'trainer_code',
    'rsm_name'
]

In [20]:
df_source = df_source.select(union_selected_cols)
df_store = df_store.select(union_selected_cols)

In [21]:
df_source_store_trans = df_source.union(df_store)
df_source_store_trans = df_source_store_trans.join(df_begin, "source_store_key", "left")
df_source_store_trans = df_source_store_trans\
.fillna("1900-01-01", subset=["begin_date_07"])\
.withColumn("begin_date_07", F.col("begin_date_07").cast(DateType()))

#### Create level dataframe indicated the company's schema

- Try to left join 6 times for finding 3 main levels of each department.

- Could not modify the below block due to implicited 6 dataframe's names.

In [22]:
df_lv = df_department.alias("d0")\
    .join(df_department.alias("d1"), col("d0.parent_id") == col("d1.id"), "left")\
    .join(df_department.alias("d2"), col("d1.parent_id") == col("d2.id"), "left")\
    .join(df_department.alias("d3"), col("d2.parent_id") == col("d3.id"), "left")\
    .join(df_department.alias("d4"), col("d3.parent_id") == col("d4.id"), "left")\
    .join(df_department.alias("d5"), col("d4.parent_id") == col("d5.id"), "left")\
    .join(df_department.alias("d6"), col("d5.parent_id") == col("d6.id"), "left")\
    .withColumn("department_lv1", F.when(col("d6.parent_id") == -1, col("d4.name"))
                                    .when(col("d5.parent_id") == -1, col("d3.name"))
                                    .when(col("d4.parent_id") == -1, col("d2.name"))
                                    .when(col("d3.parent_id") == -1, col("d1.name"))
                                    .when(col("d2.parent_id") == -1, col("d0.name"))
                                    .otherwise("")
    )\
    .withColumn("department_lv2", F.when(col("d6.parent_id") == -1, col("d3.name"))
                                    .when(col("d5.parent_id") == -1, col("d2.name"))
                                    .when(col("d4.parent_id") == -1, col("d1.name"))
                                    .when(col("d3.parent_id") == -1, col("d0.name"))
                                    .otherwise("")
    )\
    .withColumn("department_lv3", F.when(col("d6.parent_id") == -1, col("d2.name"))
                                    .when(col("d5.parent_id") == -1, col("d1.name"))
                                    .when(col("d4.parent_id") == -1, col("d0.name"))
                                    .otherwise("")
        )\
    .withColumn("department_lv4", F.when(col("d6.parent_id") == -1, col("d1.name"))
                                    .when(col("d5.parent_id") == -1, col("d0.name"))
                                    .otherwise("")
    )\
    .selectExpr("d0.id as department_id", "department_lv1", "department_lv2", "department_lv3", "department_lv4")

### Xử lý level

In [23]:
lv1_offline_cond = (F.col('department_lv1')=='HỆ THỐNG KINH DOANH OFFLINE') | (F.col('department_lv1')=='HT KD OFFLINE')
lv1_online_cond = (F.col('department_lv1')=='HỆ THỐNG KINH DOANH ONLINE') | (F.col('department_lv1')=='HT KD ONLINE')
online_cond = F.col('department_lv2') == 'ONLINE'
ecom_cond = F.col('department_lv2') == 'ECOM'
selected_cols = ["department_id", "department_lv1", "department_lv2", "department_lv3"]

df_lv_off = df_lv.filter(lv1_offline_cond)\
                 .withColumn('department_lv1', F.lit('KINH DOANH OFFLINE'))\
                 .select(selected_cols)

df_lv_onl = df_lv.filter(lv1_online_cond)\
                 .withColumn('department_lv1', F.when(online_cond, 'KINH DOANH ONLINE')\
                                                .when(ecom_cond, 'E - COMMERCE')\
                                                .otherwise(F.col('department_lv2')))\
                 .withColumn('department_lv2', F.col('department_lv3'))\
                 .withColumn('department_lv3', F.col('department_lv4'))\
                 .select(selected_cols)

df_lv_oth = df_lv.filter(~(lv1_online_cond | lv1_offline_cond))\
                 .select(selected_cols)

df_lv = df_lv_off.union(df_lv_onl).union(df_lv_oth)

#### Joing to get department levels and city_region data

In [24]:
df_source_store_trans = df_source_store_trans.join(df_lv, "department_id", "left")
df_source_store_trans = df_source_store_trans.join(df_city_region, "city_id", "left")

## Final output

### Upsert deltalake on HDFS

In [25]:
# HDFS_MASTER+DATA_STORE+"dim_source_store_sale"
selected_cols = ['source_store_key', 'source_store_id', 'source_store_code', 'type', 'department_id', "department_lv1", 
                 "department_lv2", "department_lv3", "source_store_name", "source_store_type", 'rank_name', "region", 
                 "status", 'is_saleable', 'square', "begin_date_07", "latitude", "longitude", 'city_name', 'ward_name', 
                 'district_name', "is_deleted", "asm_name", 'trainer_code', 'rsm_name']
dim_source_store_sale = df_source_store_trans.select(selected_cols).cache()

In [None]:
path_table = HDFS_MASTER+DATA_STORE+"dim_source_store_sale"
columns_key = ['source_store_key']

upsert_deltalake(sparkSession=sparkSession, 
                 df_upsert=dim_source_store_sale, 
                 path_table=path_table, 
                 columns_key=columns_key,
                 #mode='upsert',
#                  mode='overwrite', truncate=True,
#                  vacuum_time = None
                )

                                                                                

2024-06-19 09:41:14.503 INFO - upsert table


                                                                                

2024-06-19 09:42:08.921 INFO - vacuum - 0


                                                                                

Deleted 7 files and directories in a total of 1 directories.


### Upsert bigquery

In [None]:
json_info = {
    "project_id": "yody-data-platform",
    "dataset": f"{ENV}_yody_analytics",
    "table_name": "dim_source_store_sale"
}
columns_key = ['source_store_key']

In [None]:
upsert_bigquery(df_upsert = dim_source_store_sale, 
                json_info=json_info, 
                columns_key=columns_key, 
                truncate = True,
                mode='overwrite'
               )

2024-06-19 09:42:50.099 INFO - write overwrite table - prod_yody_analytics.dim_source_store_sale


                                                                                

True

## Stop Session

In [None]:
sparkSession.stop()