In [0]:
# Notebook parameters exposed as Databricks widgets; their values are read back
# with dbutils.widgets.get in a later cell.
# NOTE(review): the default catalog name looks like a personal/demo catalog --
# override it via the widget when running in another workspace.
dbutils.widgets.dropdown('reset_all_data', 'false', ['true', 'false'], 'Reset all data')
dbutils.widgets.text('catalogName',  'maxkoehler_demos' , 'Catalog Name')
dbutils.widgets.text('dbName',  'demand_db' , 'Database Name')

In [0]:
# Progress marker; paired with the "Ending ..." print in the last cell.
print("Starting ./_resources/01-data-generator")

In [0]:
# Pull the widget values into Python variables (names and read order unchanged).
catalogName, dbName = (dbutils.widgets.get(widget) for widget in ('catalogName', 'dbName'))
# The dropdown yields the strings 'true'/'false'; turn it into a bool.
# NOTE(review): reset_all_data is not referenced again in this notebook section
# (all tables below are written with mode "overwrite" regardless) -- TODO confirm
# whether it was meant to trigger a cleanup.
reset_all_data = (dbutils.widgets.get('reset_all_data') == 'true')

In [0]:
# Echo the effective parameters, one per line.
print(catalogName, dbName, reset_all_data, sep="\n")

In [0]:
# Create the catalog if needed and make it the session default.
# The name is backtick-quoted (embedded backticks doubled) so identifiers that
# contain characters such as '-' are accepted by the SQL parser.
quoted_catalog_name = "`" + catalogName.replace("`", "``") + "`"
spark.sql(f"CREATE CATALOG IF NOT EXISTS {quoted_catalog_name}")
spark.sql(f"USE CATALOG {quoted_catalog_name}")

In [0]:
# Create the schema (database) in the current catalog if needed and switch to it,
# so the unqualified table names used later (e.g. "bom") are written here.
# The name is backtick-quoted (embedded backticks doubled) so identifiers that
# contain characters such as '-' are accepted by the SQL parser.
quoted_db_name = "`" + dbName.replace("`", "``") + "`"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {quoted_db_name}")
spark.sql(f"USE {quoted_db_name}")

# Hierarchical Time Series Generator
This section of the notebook simulates hierarchical time series data: weekly demand series for several SKUs under each product.

## Simulate demand series data

In [0]:
#################################################
# Python Packages
#################################################
import pandas as pd
import numpy as np
import datetime

from dateutil.relativedelta import relativedelta
from dateutil import rrule

import os
import string
import random

from pyspark.sql.functions import pandas_udf, PandasUDFType, concat_ws
from pyspark.sql.types import StructType,StructField, StringType, DateType

In [0]:
#################################################
# Parameters
#################################################
# n: number of SKUs simulated per product; ts_length_in_years: length of every
# simulated weekly time series, in years.
n, ts_length_in_years = 3, 3

In [0]:
#################################################
# Create a Product Table
#################################################

# One row per product with the 3-letter prefix used to build its SKU ids.
data = [
  ("Lidar", "LID"),
  ("Camera", "CAM"),
  ("Long Range Radar", "LRR"),
  ("Short Range Radar", "SRR"),
]

# Both columns are nullable strings.
schema = StructType([StructField(field_name, StringType(), True) for field_name in ("Product", "SKU_Prefix")])

product_identifier_lookup = spark.createDataFrame(data=data, schema=schema)

display(product_identifier_lookup)

In [0]:
#################################################
# Create a product hierarchy by simulating SKUs for each product
#################################################

# Output schema of id_sequence_generator: the generated SKU_Postfix column
# followed by the lookup table's own columns (Product, SKU_Prefix).
product_hierarchy_schema = StructType([StructField("SKU_Postfix", StringType(), True), *product_identifier_lookup.schema.fields])

# Helper: random identifier string
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    """Return `size` characters, each drawn independently with random.choice from `chars`."""
    return ''.join([random.choice(chars) for _ in range(size)])

# Grouped-map function (used with applyInPandas): simulate unique SKU postfixes for one product
def id_sequence_generator(pdf):
  """Return one row per simulated SKU postfix for the product group in `pdf`.

  Reseeds Python's global `random` with 123 on every call, so every product
  receives the same postfix set; SKU ids stay unique because the prefix differs.
  Draws distinct 6-character ids until the global `n` is reached (at least one
  id is always drawn, even if n <= 0).

  Returns a DataFrame with columns SKU_Postfix, Product, SKU_Prefix, where the
  last two repeat the first row's values from `pdf`.
  """
  random.seed(123)
  postfixes = {id_generator()}
  while len(postfixes) < n:
    postfixes.add(id_generator())

  return pd.DataFrame({"SKU_Postfix": list(postfixes)}).assign(
    Product=pdf["Product"].iloc[0],
    SKU_Prefix=pdf["SKU_Prefix"].iloc[0],
  )

# Run the grouped-map function once per product, then build SKU ids as "<SKU_Prefix>_<SKU_Postfix>"
product_hierarchy = (
  product_identifier_lookup
  .groupby("SKU_Prefix", "Product")
  .applyInPandas(id_sequence_generator, schema=product_hierarchy_schema)
  .withColumn("SKU", concat_ws("_", "SKU_Prefix", "SKU_Postfix"))
  .select("Product", "SKU")
)

# Sanity check: exactly n SKUs per product
assert product_hierarchy.count() == n * product_identifier_lookup.count(), "Number of rows in final table contradicts with input parameters"

display(product_hierarchy)

In [0]:
#################################################
# Create a Pandas DataFrame with common dates for ALL time series
#################################################
# End date: the Monday of the previous week (time of day carried over from today())
end_date = datetime.datetime.today()
end_date = end_date - datetime.timedelta(days=end_date.weekday() + 7)

# Start date: a whole number of weeks earlier, so it is a Monday as well
start_date = end_date + relativedelta(weeks=-52 * ts_length_in_years)

# One row per week from start_date through end_date (inclusive), as datetime.date values
date_range = pd.DataFrame(
  {"Date": [occurrence.date() for occurrence in rrule.rrule(rrule.WEEKLY, dtstart=start_date, until=end_date)]}
)

display(date_range)

In [0]:
#################################################
# Add a Christmas demand-correction factor to the common weekly calendar
#################################################
# Reuse the calendar built in the previous cell instead of recomputing it: the
# computation was an exact duplicate, and a second today() call could produce a
# different range if the cells run on different days. Keeping only `Date` makes
# this cell safe to re-run.
date_range = date_range[["Date"]]

# Derive X-mas correction factor for demand from the ISO week number
date_range = date_range.assign(Week = pd.DatetimeIndex(date_range['Date']).isocalendar().week.tolist())

# Weeks 51 and 52+ dampen demand, weeks 1-4 boost it; every other week keeps 1.0
conditions_xmas = [
      date_range.Week == 51,
      date_range.Week >= 52,
      date_range.Week == 1,
      date_range.Week == 2,
      date_range.Week == 3,
      date_range.Week == 4
    ]

choices_xmas = [
  0.85,
  0.8,
  1.1,
  1.15,
  1.1,
  1.05
]

date_range = date_range.assign(Factor_XMas = np.select(conditions_xmas, choices_xmas, default= 1.0))

display(date_range)

In [0]:
#################################################
# Enhance the product table with parameters for simulating time series
#################################################

# Draw one random set of ARMA parameters per product and attach it to the
# product lookup and product hierarchy tables.
from  pyspark.sql.types import FloatType, ArrayType, IntegerType
import pyspark.sql.functions as F
from pyspark.sql.window import Window


# Define schema for new columns
arma_schema = StructType(
  [
    StructField("Variance_RN", FloatType(), True),
    StructField("Offset_RN", FloatType(), True),
    StructField("AR_Pars_RN", ArrayType(FloatType()), True),
    StructField("MA_Pars_RN", ArrayType(FloatType()), True)
  ]
)

# Generate random numbers for the ARMA process.
# The order of the draws below fixes the random stream under seed 123.
np.random.seed(123)
n_ = product_identifier_lookup.count()
variance_random_number = list(abs(np.random.normal(100, 50, n_)))
offset_random_number = list(np.maximum(abs(np.random.normal(10000, 5000, n_)), 4000))  # level clipped below at 4000
ar_length_random_number = np.random.choice(list(range(1,4)), n_)  # AR order in 1..3
ar_parameters_random_number = [np.random.uniform(low=0.1, high=0.9, size=x) for x in ar_length_random_number]
ma_length_random_number = np.random.choice(list(range(1,4)), n_)  # MA order in 1..3
ma_parameters_random_number = [np.random.uniform(low=0.1, high=0.9, size=x) for x in ma_length_random_number]


# Collect in a dataframe (columns kept in arma_schema order) and number the
# parameter sets 1..n_ by ascending Offset_RN. Previously this numbering was a
# Spark row_number() ordered by Offset_RN, but offsets clipped to exactly 4000
# tie, and row_number() over tied keys is not deterministic. A stable mergesort
# keeps generation order among ties; whenever the old ordering was unambiguous,
# the resulting product <-> parameter pairing is unchanged.
pdf_helper = (
  pd.DataFrame(variance_random_number, columns=['Variance_RN'])
  .assign(Offset_RN=offset_random_number)
  .assign(AR_Pars_RN=ar_parameters_random_number)
  .assign(MA_Pars_RN=ma_parameters_random_number)
  .sort_values("Offset_RN", kind="mergesort")
  .reset_index(drop=True)
)
pdf_helper = pdf_helper.assign(row_id=np.arange(1, len(pdf_helper) + 1))

# Append column-wise: parameter set k goes to the k-th product in alphabetical order
spark_df_helper = spark.createDataFrame(
  pdf_helper,
  schema=StructType(arma_schema.fields + [StructField("row_id", IntegerType(), True)]),
)
product_identifier_lookup_with_row_ids = product_identifier_lookup.withColumn(
  "row_id", F.row_number().over(Window().partitionBy().orderBy("Product"))
  # product names in the lookup table are unique, so this numbering is deterministic
  )
product_identifier_lookup_extended = product_identifier_lookup_with_row_ids.join(
  spark_df_helper, "row_id"
  ).drop("row_id")
assert product_identifier_lookup_extended.count() == product_identifier_lookup.count(), "Ambiguous number of rows after join"

product_identifier_lookup = product_identifier_lookup_with_row_ids.drop("row_id")
product_hierarchy_extended = product_hierarchy.join(product_identifier_lookup_extended.drop("SKU_Prefix"), ["Product"], how = "inner")

# Show the products together with the parameters drawn for them
display(product_identifier_lookup_extended)

In [0]:
# Preview every SKU joined with the ARMA parameters drawn for its product.
display(product_hierarchy_extended)

In [0]:
import statsmodels.api as sm
import matplotlib.pyplot as plt

from pyspark.sql.functions import col

from pyspark.sql.functions import row_number, sqrt, round

#################################################
# Generate an individual time series for each Product-SKU combination
#################################################

# function to generate an ARMA process
def generate_arma(arparams, maparams, var, offset, number_of_points, plot, seed=123):
  """Simulate one ARMA sample path with statsmodels and shift it by a constant level.

  Parameters
  ----------
  arparams, maparams : array-like of float
      Lag coefficients without the zero lag; a leading 1 is prepended to each
      before calling ``sm.tsa.arma_generate_sample``.
      NOTE(review): statsmodels expects the AR *lag polynomial*, so conventional
      AR coefficients phi are usually passed as ``np.r_[1, -phi]``. With the
      positive sign used here, an AR(1) value of 0.5 gives
      y_t = -0.5 * y_{t-1} + e_t. Kept as-is to preserve the simulated data --
      confirm this is intended.
  var : float
      Forwarded as ``scale``, which statsmodels documents as the standard
      deviation of the noise (not its variance).
  offset : float
      Constant added to every simulated point.
  number_of_points : int
      Length of the returned series (3000 burn-in samples are discarded first).
  plot : bool
      If true, also plot the series with matplotlib.
  seed : int, optional
      Seed applied to NumPy's global RNG before sampling. Defaults to 123, the
      value that was previously hard-coded. Must be in [0, 2**32 - 1].

  Returns
  -------
  numpy.ndarray with ``number_of_points`` values.
  """
  # Seeds the *global* NumPy RNG; arma_generate_sample draws its noise from
  # np.random by default.
  np.random.seed(seed)
  ar = np.r_[1, arparams]
  ma = np.r_[1, maparams]
  y = sm.tsa.arma_generate_sample(ar, ma, number_of_points, scale=var, burnin= 3000) + offset

  if plot:
    x = np.arange(1, len(y) +1)
    plt.plot(x, y, color ="red")
    plt.show()

  return(y)


#Schema for output dataframe
sku_ts_schema = StructType(  product_hierarchy.schema.fields + 
                    [
                      StructField("Date", DateType(), True),
                      StructField("Demand", FloatType(), True),
                      StructField("Factor_XMas", FloatType(), True),                     
                      StructField("Row_Number", FloatType(), True) 
                    ])


def _sku_seed(sku):
  """Return a deterministic 32-bit seed for an SKU id.

  CRC-32 is used instead of hash(), whose value for strings can differ between
  Python processes (hash randomization). This gives every worker the same seed
  for the same SKU.
  """
  import zlib
  return zlib.crc32(str(sku).encode("utf-8"))


def time_series_generator_pandas_udf(pdf):
  """Grouped-map function: simulate the weekly demand series of one (Product, SKU) group.

  Only the first row of ``pdf`` is read: Product, SKU and the product's ARMA
  parameters (AR_Pars_RN, MA_Pars_RN, Variance_RN, Offset_RN). The series is
  laid over the global ``date_range`` calendar and carries its Factor_XMas column.

  Every SKU gets its own deterministic seed. All SKUs of a product share the
  product's ARMA parameters, so a single constant seed would make their demand
  series identical.

  Returns a DataFrame with columns Product, SKU, Date, Demand, Factor_XMas and
  Row_Number (0-based position in the calendar), matching ``sku_ts_schema``.
  """
  sku = pdf.SKU.iloc[0]
  demand = generate_arma(arparams = pdf.AR_Pars_RN.iloc[0],
                         maparams = pdf.MA_Pars_RN.iloc[0],
                         var = pdf.Variance_RN.iloc[0],
                         offset = pdf.Offset_RN.iloc[0],
                         number_of_points = date_range.shape[0],
                         plot = False,
                         seed = _sku_seed(sku))

  out_df = date_range.assign(
    Demand = demand,
    Product = pdf.Product.iloc[0],
    SKU = sku
  )[["Product", "SKU", "Date", "Demand", "Factor_XMas"]]

  # assign() builds a new frame instead of writing a column into a column-subset slice
  return out_df.assign(Row_Number = range(len(out_df)))

# Simulate one series per (Product, SKU), apply the Christmas factor and round
# to whole units in a single projection.
demand_df = (
  product_hierarchy_extended
  .groupby("Product", "SKU")
  .applyInPandas(time_series_generator_pandas_udf, schema=sku_ts_schema)
  .select(
    "Product",
    "SKU",
    "Date",
    F.round(F.col("Demand") * F.col("Factor_XMas")).alias("Demand"),
  )
)


display(demand_df)

In [0]:
#spark.sql("DROP TABLE IF EXISTS part_level_demand")

In [0]:
# Persist the simulated demand in the current catalog/schema, replacing any previous run
demand_df.write.saveAsTable("part_level_demand", mode="overwrite")

# Simulate BoM Data
This section of the notebook simulates bill-of-materials (BoM) data: for each SKU, a small tree of randomly named materials that feed into it.

## Simulate data

In [0]:
import string
import networkx as nx
import random
import numpy as np
import os

In [0]:
def id_generator(size=6, chars=string.ascii_uppercase + string.digits):
    """Return `size` characters drawn one at a time with random.choice from `chars`.

    Same helper as in the demand-simulation section (presumably redefined so this
    section can run on its own).
    """
    drawn = []
    for _ in range(size):
        drawn.append(random.choice(chars))
    return ''.join(drawn)

def generate_random_strings(n):
  """Return a set of `n` distinct random 5-character strings (at least one, even if n <= 0).

  Reseeds Python's global `random` with 123 first, so the set *contents* are
  reproducible.
  NOTE(review): callers pop() from the returned set. Set order for strings
  follows hash randomization, so the pop order may differ between interpreter
  sessions unless PYTHONHASHSEED is fixed -- confirm whether the BoM must be
  reproducible.
  """
  random.seed(123)
  codes = {id_generator(size=5)}
  while len(codes) < n:
    codes.add(id_generator(size=5))
  return codes

In [0]:
def extend_one_step(node_from_):
  """Attach 2-4 new material numbers as children of `node_from_`.

  The child count comes from random.randint(2, 4). That many numbers are popped
  from the module-level `random_mat_numbers` pool, which is mutated.

  Returns
  -------
  (edges, children)
      edges: list of (child, node_from_) tuples; children: the new nodes in pop order.
  """
  child_count = random.randint(2, 4)
  children = [random_mat_numbers.pop() for _ in range(child_count)]
  edges = [(child, node_from_) for child in children]
  return edges, children

In [0]:
def extend_one_level(node_list_to_be_extended, level, sku):
  """Grow the BoM tree below `sku` by one level.

  Level 1 pops a single head material from the module-level `random_mat_numbers`
  pool and links it directly to `sku`; `node_list_to_be_extended` is ignored.
  Any deeper level expands at most the first three nodes of
  `node_list_to_be_extended` with extend_one_step.

  Returns
  -------
  (edges, frontier)
      edges: (child, parent) tuples created at this level; frontier: the nodes
      created at this level, in creation order.
  """
  print(f"""In  'extend_one_level': level={level} and sku = {sku}  """)

  if level == 1:
    head_node = random_mat_numbers.pop()
    return [(head_node, sku)], [head_node]

  edges, frontier = [], []
  for parent in node_list_to_be_extended[:3]:
    step_edges, step_children = extend_one_step(parent)
    edges.extend(step_edges)
    frontier.extend(step_children)
  return edges, frontier

In [0]:
# Pool of unique 5-character material numbers. BoM tree nodes are popped from
# this set, so it must hold at least as many entries as the trees have nodes.
random_mat_numbers = generate_random_strings(1_000_000)

In [0]:
# Create a list of all SKUs
#demand_df = spark.read.table(f"part_level_demand")
#all_skus = demand_df.select('SKU').distinct().rdd.flatMap(lambda x: x).collect()

In [0]:
%python
# Create a sorted list of all SKUs. distinct() gives no ordering guarantee, and
# the order in which SKUs are processed below decides which random draws and
# material numbers each SKU's BoM tree receives.
demand_df = spark.read.table("part_level_demand")
all_skus = sorted(row['SKU'] for row in demand_df.select('SKU').distinct().collect())
all_skus

In [0]:
# Generate edges: grow a `depth`-level material tree below every SKU
depth = 3
edge_list = [ ]

for sku in all_skus:
  frontier = [ ]
  for level_ in range(1, depth + 1):
    level_edges, frontier = extend_one_level(frontier, level=level_, sku=sku)
    edge_list += level_edges

In [0]:
# Define the graph: each directed edge runs from a component material to the
# material (or SKU) it feeds into.
g = nx.DiGraph(edge_list)

In [0]:
# Assign a quantity to every edge: edges that feed directly into an SKU get
# quantity 1, all other edges a random quantity in {1, 2, 3}.
edge_df = nx.to_pandas_edgelist(g)
# SKU edges are identified by membership in all_skus, not by string length.
# The old `len == 10` test only held for the current 3-char prefix + "_" +
# 6-char postfix format. The random draw still covers every row, so the NumPy
# random stream is consumed exactly as before.
edge_df = edge_df.assign(
  qty = np.where(edge_df.target.isin(all_skus), 1, np.random.randint(1, 4, size=edge_df.shape[0]))
)

In [0]:
from pyspark.sql.functions import split, col

import re

# Distinct SKU prefixes (the part before "_"), e.g. "LID", "CAM"; sorted for a stable display.
sku_list = sorted(
  row.SKU
  for row in demand_df.withColumn("SKU", split(col("SKU"), "_")[0]).select("SKU").distinct().collect()
)
# Regex matching SKU ids: one of the prefixes, then "_". The alternation must be
# grouped. Ungrouped, "A|B_.*" means "A" OR "B_.*", so with str.match a material
# number that merely starts with a prefix (e.g. "LIDX3") was treated as an SKU.
# re.escape guards against regex metacharacters in prefixes.
search_expression = "(?:" + "|".join(re.escape(prefix) for prefix in sku_list) + ")_.*"
search_expression

In [0]:
# Map each top-level material to the SKU it feeds into: keep the SKU edges only
final_mat_number_to_sku_mapper = (
  edge_df
  .loc[edge_df.target.str.match(search_expression), ["source", "target"]]
  .rename(columns={"source": "final_mat_number", "target": "sku"})
)
final_mat_number_to_sku_mapper

In [0]:
#Create the final mat number to sku mapper
#final_mat_number_to_sku_mapper = edge_df[edge_df.target.str.match(search_expression)][["source","target"]]
#final_mat_number_to_sku_mapper = final_mat_number_to_sku_mapper.rename(columns={"source": "final_mat_number", "target": "sku"} )

In [0]:
# Create BoM: every material-to-material edge (SKU edges excluded), with quantities
bom = (
  edge_df
  .loc[~edge_df.target.str.match(search_expression)]
  .rename(columns={"source": "material_in", "target": "material_out"})
)
bom

In [0]:
# Convert the pandas results into Spark DataFrames so they can be saved as tables
final_mat_number_to_sku_mapper_df = spark.createDataFrame(final_mat_number_to_sku_mapper)
bom_df = spark.createDataFrame(bom)

## Register tables in database

In [0]:
# Persist the BoM edges, replacing any previous run
bom_df.write.saveAsTable("bom", mode="overwrite")

In [0]:
# Persist the material-to-SKU mapping, replacing any previous run
final_mat_number_to_sku_mapper_df.write.saveAsTable("sku_mapper", mode="overwrite")

In [0]:
# Show the persisted sku_mapper table
display(spark.table("sku_mapper"))

In [0]:
# Show the persisted bom table
display(spark.table("bom"))

In [0]:
# Progress marker; paired with the "Starting ..." print in the first cells.
print("Ending ./_resources/01-data-generator")