#### Setup

In [1]:
import pyspark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
7,application_1640569960358_0010,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
%%configure -f
{
"conf" :
{
"spark.serializer" : "org.apache.spark.serializer.KryoSerializer",
"spark.sql.legacy.parquet.int96RebaseModeInRead" : "CORRECTED",
"spark.sql.legacy.parquet.datetimeRebaseModeInWrite" : "CORRECTED",
"spark.sql.legacy.parquet.datetimeRebaseModeInRead" : "CORRECTED",
"spark.sql.legacy.timeParserPolicy" : "LEGACY"
}
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
8,application_1640569960358_0011,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
6,application_1640569960358_0009,pyspark,busy,Link,Link,,
8,application_1640569960358_0011,pyspark,idle,Link,Link,,✔


In [3]:
import time

from datetime import datetime, timedelta
from functools import reduce

from pyspark import SparkConf, StorageLevel
from pyspark.sql import SparkSession, HiveContext, Window
import pyspark.sql.functions as F

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Utils

In [4]:
def to_uri(bucket, key):
    """
    Transforms bucket & key strings into S3 URI

    Args:
        bucket (string): name of the S3 bucket
        key (string): S3 key

    Returns:
        object (string): URI format
    """
    return 's3://{}/{}'.format(bucket, key)


def spark_read_parquet_s3(spark, bucket, path):
    """
    Read parquet file(s) hosted on a S3 bucket, load and return as spark dataframe

    Args:
        spark (SparkSession): spark app
        bucket (string): S3 bucket
        path (string): full path to the parquet directory or file within the S3 bucket

    Returns:
        (SparkDataframe): data loaded
    """
    return spark.read.parquet(to_uri(bucket, path))


def spark_write_parquet_s3(df, bucket, dir_path, repartition=10, mode='overwrite'):
    """
    Write a in-memory SparkDataframe to parquet files on a S3 bucket

    Args:
        df (SparkDataframe): the data to save
        bucket (string): S3 bucket
        dir_path (string): full path to the parquet directory within the S3 bucket
        repartition (int): number of partitions files to write
        mode (string): writing mode
    """
    df.repartition(repartition).write.parquet(to_uri(bucket, dir_path), mode=mode)
    

def spark_write_parquet_s3_coal(df, bucket, dir_path, mode='overwrite'):
    df.coalesce(10).write.parquet(to_uri(bucket, dir_path), mode=mode)
    
    
def get_timer(starting_time):
    """
    Displays the time that has elapsed between the input timer and the current time.

    Args:
        starting_time (timecode): timecode from Python 'time' package
    """
    end_time = time.time()
    minutes, seconds = divmod(int(end_time - starting_time), 60)
    print("{} minute(s) {} second(s)".format(int(minutes), seconds))


def union_all(l_df):
    """
    Apply union function on all spark dataframes in l_df

    """
    return reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), l_df)


def date_to_week_id(date):
    """
    Turn a date to Decathlon week id
    Args:
        date (str, pd.Timestamp or pd.Series): the date or pandas column of dates
    Returns:
        (int): the week id

    """
    day_of_week = date.strftime("%w")
    date = date if (day_of_week != '0') else date + timedelta(days=1)
    return int(str(date.isocalendar()[0]) + str(date.isocalendar()[1]).zfill(2))


def get_current_week_id():
    """
    Return current week id (international standard ISO 8601 - first day of week
    is Sunday, with format 'YYYYWW', as integer

    """
    return date_to_week_id(datetime.today())


def get_shift_n_week(week_id, nb_weeks):
    """
    Return input week_id shifted by nb_weeks (could be negative)

    """
    shifted_date = datetime.strptime(str(week_id) + '1', '%G%V%u') + timedelta(weeks=nb_weeks)
    ret_week_id = date_to_week_id(shifted_date)
    return ret_week_id

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Fetch data & Processing

In [5]:
bucket_refined = 'fcst-workspace/forecast-cn/fcst-refined-demand-forecast-dev'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
#df_tree = spark_read_parquet_s3(spark,bucket_refined,'global/model_week_tree')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
# ref_model_pool
ref_model_pool = spark_read_parquet_s3(spark,bucket_refined,'test_data/cold_start_dev/ref_model_pool_202138.parquet')
ref_model_pool = ref_model_pool.selectExpr('model_id','family_id','dsm_code','product_nature_id','average_price')
ref_model_pool.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+---------+--------+-----------------+------------------+
|model_id|family_id|dsm_code|product_nature_id|     average_price|
+--------+---------+--------+-----------------+------------------+
|  720719|     3174|    3322|            25715|20.000315014540725|
|  765283|     3462|    null|                0| 19.23076923076923|
+--------+---------+--------+-----------------+------------------+
only showing top 2 rows

In [8]:
ref_model_pool_2 = ref_model_pool.selectExpr('model_id as model_id_dp','family_id as family_id_dp',
                                             'dsm_code as dsm_code_dp','product_nature_id as product_nature_id_dp',
                                             'average_price as average_price_dp')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# create ref_model_base with random sampling
ref_model_base = ref_model_pool.alias('df1').join(ref_model_pool_2.alias('df2'),
                                                  F.col('df1.model_id') != F.col('df2.model_id_dp'),how='left')
#.filter('df1.product_nature_id == df2.product_nature_id_dp')
ref_model_base = ref_model_base.sample(withReplacement=True, fraction=0.05, seed=3)
ref_model_base.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+---------+--------+-----------------+------------------+-----------+------------+-----------+--------------------+------------------+
|model_id|family_id|dsm_code|product_nature_id|     average_price|model_id_dp|family_id_dp|dsm_code_dp|product_nature_id_dp|  average_price_dp|
+--------+---------+--------+-----------------+------------------+-----------+------------+-----------+--------------------+------------------+
|  720719|     3174|    3322|            25715|20.000315014540725|     864028|        3180|     181371|               25216|152.00243646907234|
|  720719|     3174|    3322|            25715|20.000315014540725|     962789|       34905|     326724|               25014| 61.18427534146343|
+--------+---------+--------+-----------------+------------------+-----------+------------+-----------+--------------------+------------------+
only showing top 2 rows

In [10]:
ref_model_base.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

3464843

In [11]:
# features = ['family_id','sub_department_id','department_id','univers_id','product_nature_id','sports']
ref_model_base = ref_model_base\
.withColumn('is_same_family',F.expr('case when family_id = family_id_dp then 1 else 0 end as is_same_family'))\
.withColumn('is_same_dsm',F.expr('case when dsm_code = dsm_code_dp then 1 else 0 end as is_same_dsm'))\
.withColumn('is_same_product_nature',F.expr('case when product_nature_id = product_nature_id_dp then 1 else 0 end as is_same_product_nature'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# fetch transaction data
ref_model_transaction = spark_read_parquet_s3(spark,bucket_refined,'test_data/cold_start_dev/ref_model_transaction_202138.parquet')
ref_model_transaction = ref_model_transaction.select('model_id','week_id','sales_quantity')\
#.join(ref_model_pool.select('model_id','family_id','dsm_code','product_nature_id'),on='model_id',how='left')
ref_model_transaction.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-------+--------------+
|model_id|week_id|sales_quantity|
+--------+-------+--------------+
|  720719| 202031|          63.0|
|  720719| 202032|          78.0|
+--------+-------+--------------+
only showing top 2 rows

In [13]:
# calculating similarity
ref_model_base_tran = ref_model_base.join(ref_model_transaction, on='model_id', how='left')

ref_model_base_tran = ref_model_base_tran.alias('df1')\
.join(ref_model_transaction.selectExpr('model_id','week_id','sales_quantity as sales_quantity_dp').alias('df2'),
      [F.col('df1.model_id_dp') == F.col('df2.model_id'),F.col('df1.week_id') == F.col('df2.week_id')],how='left')

ref_model_base_tran.show(2)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+---------+--------+-----------------+------------------+-----------+------------+-----------+--------------------+------------------+--------------+-----------+----------------------+-------+--------------+--------+-------+-----------------+
|model_id|family_id|dsm_code|product_nature_id|     average_price|model_id_dp|family_id_dp|dsm_code_dp|product_nature_id_dp|  average_price_dp|is_same_family|is_same_dsm|is_same_product_nature|week_id|sales_quantity|model_id|week_id|sales_quantity_dp|
+--------+---------+--------+-----------------+------------------+-----------+------------+-----------+--------------------+------------------+--------------+-----------+----------------------+-------+--------------+--------+-------+-----------------+
|  720719|     3174|    3322|            25715|20.000315014540725|     864028|        3180|     181371|               25216|152.00243646907234|             0|          0|                     0| 202137|          41.0|  864028| 202137|           

In [14]:
ref_model_base_tran = ref_model_base_tran.withColumn('difference',ref_model_base_tran.sales_quantity - ref_model_base_tran.sales_quantity_dp)

ref_model_distance = ref_model_base_tran.groupby(['df1.model_id','model_id_dp']).agg(F.sqrt(F.sum('difference')**2).alias('model_euc_distance'))
family_model_distance = ref_model_base_tran.groupby(['df1.family_id','family_id_dp']).agg(F.sqrt(F.sum('difference')**2).alias('family_euc_distance'))
dsm_model_distance = ref_model_base_tran.groupby(['df1.dsm_code','dsm_code_dp']).agg(F.sqrt(F.sum('difference')**2).alias('dsm_euc_distance'))
prod_nature_model_distance = ref_model_base_tran.groupby(['df1.product_nature_id','product_nature_id_dp']).agg(F.sqrt(F.sum('difference')**2).alias('prod_nature_euc_distance'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
ref_model_distance.show(2)
ref_model_distance.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-----------+------------------+
|model_id|model_id_dp|model_euc_distance|
+--------+-----------+------------------+
|  765283|    8355692|           22661.0|
|  863881|    8554577|             915.0|
+--------+-----------+------------------+
only showing top 2 rows

<bound method DataFrame.count of DataFrame[model_id: bigint, model_id_dp: bigint, model_euc_distance: double]>

In [18]:
family_model_distance.show(2)
family_model_distance.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+------------+-------------------+
|family_id|family_id_dp|family_euc_distance|
+---------+------------+-------------------+
|     1165|        2250|           214378.0|
|    11956|       34374|             4921.0|
+---------+------------+-------------------+
only showing top 2 rows

498976

In [21]:
dsm_model_distance.show(2)
dsm_model_distance.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+-----------+----------------+
|dsm_code|dsm_code_dp|dsm_euc_distance|
+--------+-----------+----------------+
|  326551|     169253|        341059.0|
|  326558|      12895|           535.0|
+--------+-----------+----------------+
only showing top 2 rows

3091053

In [22]:
prod_nature_model_distance.show(2)
prod_nature_model_distance.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+--------------------+------------------------+
|product_nature_id|product_nature_id_dp|prod_nature_euc_distance|
+-----------------+--------------------+------------------------+
|            25216|               11797|               1333183.0|
|              262|               10492|                 23511.0|
+-----------------+--------------------+------------------------+
only showing top 2 rows

340692

In [None]:
'''
ref_model_base = ref_model_base.alias('df1').join(ref_model_distance.alias('df2'),
                                                  [F.col('df1.model_id') == F.col('df2.model_id'),F.col('df1.model_id_dp') == F.col('df2.model_id_dp')],how='left')\
.join(family_model_distance.alias('df3'),
      [F.col('df1.family_id') == F.col('df3.family_id'),F.col('df1.family_id_dp') == F.col('df3.family_id_dp')],how='left')\
.join(dsm_model_distance.alias('df4'),
      [F.col('df1.dsm_code') == F.col('df4.dsm_code'),F.col('df1.dsm_code_dp') == F.col('df4.dsm_code_dp')],how='left')\
.join(prod_nature_model_distance.alias('df5'),
      [F.col('df1.prduct_nature_id') == F.col('df5.prduct_nature_id'),F.col('df1.prduct_nature_id_dp') == F.col('df5.prduct_nature_id_dp')],how='left')
'''

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
ref_model_base = ref_model_base.selectExpr('model_id','model_id_dp','is_same_family','is_same_dsm','is_same_product_nature',
                                           'average_price','average_price_dp')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# Save to S3
spark_write_parquet_s3(ref_model_base, 'fcst-workspace',
                       'forecast-cn/fcst-refined-demand-forecast-dev/test_data/cold_start_dev/ref_model_base',
                       repartition=10, mode='overwrite')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
spark_write_parquet_s3(ref_model_distance, 'fcst-workspace',
                       'forecast-cn/fcst-refined-demand-forecast-dev/test_data/cold_start_dev/ref_model_distance',
                       repartition=10, mode='overwrite')

spark_write_parquet_s3(family_model_distance, 'fcst-workspace',
                       'forecast-cn/fcst-refined-demand-forecast-dev/test_data/cold_start_dev/family_model_distance',
                       repartition=10, mode='overwrite')

spark_write_parquet_s3(dsm_model_distance, 'fcst-workspace',
                       'forecast-cn/fcst-refined-demand-forecast-dev/test_data/cold_start_dev/dsm_model_distance',
                       repartition=10, mode='overwrite')

spark_write_parquet_s3(prod_nature_model_distance, 'fcst-workspace',
                       'forecast-cn/fcst-refined-demand-forecast-dev/test_data/cold_start_dev/prod_nature_model_distance',
                       repartition=10, mode='overwrite')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [93]:
'''
spark_write_parquet_s3_coal(ref_model_base, 'fcst-workspace',
                            'forecast-cn/fcst-refined-demand-forecast-dev/test_data/cold_start_dev/ref_model_base_tran',
                            mode='overwrite')
'''

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Interrupted by user


In [None]:
'''
# Fetch sales data
cutoff = 202138
# Shutdown reconstructed sales
reconstructed_sales_lockdowns = read_multipart_parquet_s3('fcst-workspace/forecast-cn/fcst-refined-demand-forecast-dev',
                                           'global/reconstructed_sales_lockdowns.parquet', prefix_filename ='part-')
# Sales refining
df_sales = read_multipart_parquet_s3('fcst-workspace/forecast-cn/fcst-refined-demand-forecast-dev',
                                     'global/model_week_sales', prefix_filename ='part-')
df_sales = df_sales.groupby(by=['model_id','week_id','date'],as_index=False).agg({'sales_quantity':'sum'})
df_sales = df_sales[df_sales['week_id'] < cutoff]
df_sales = apply_lockdowns_reconstruction(df_sales, reconstructed_sales_lockdowns)
df_weekly_sales = df_sales

# filter out the sales in the latest 60 weeks
df_weekly_sales = df_weekly_sales[df_weekly_sales['week_id'].apply(lambda x:get_week_id(x,60)) >= 202138]

ref_model_base = ref_model_pool.merge(df_weekly_sales[['model_id','week_id','sales_quantity']], on='model_id', how='left')
#ref_model_base.to_csv('result/ref_model_base.csv',index=False)
write_df_to_parquet_on_s3(ref_model_base, 'fcst-workspace', 
                          'forecast-cn/fcst-refined-demand-forecast-dev/test_data/cold_start_dev/ref_model_transaction_202138.parquet')
'''