In [None]:
""" 
Planning a celebration is a balancing act of preparing just 
enough food to go around without being stuck eating the same leftovers 
for the next week. The key is anticipating how many guests will come. Grupo Bimbo must 
weigh similar considerations as it strives to meet daily consumer demand for fresh bakery products 
on the shelves of over 1 million stores along its 45,000 routes across Mexico.

Currently, daily inventory calculations are performed by direct delivery sales 
employees who must single-handedly predict the forces of supply, demand, and hunger based on 
their personal experiences with each store. With some breads carrying a one week shelf life, 
the acceptable margin for error is small.

In this competition, Grupo Bimbo invites Kagglers to develop a model to accurately 
forecast inventory demand based on historical sales data. Doing so will make sure consumers 
of its over 100 bakery products aren’t staring at empty shelves, while also reducing the 
amount spent on refunds to store owners with surplus product unfit for sale.

Things to note:

* There may be products in the test set that don't exist in the train set. 
        This is the expected behavior of inventory data, since there are new products being sold all the time. 
        Your model should be able to accommodate this.

* There are duplicate Cliente_ID's in cliente_tabla, which means one Cliente_ID 
        may have multiple NombreCliente that are very similar. This is due to the NombreCliente 
        being noisy and not standardized in the raw data, so it is up to you to decide how to clean up 
        and use this information. 
        
* The adjusted demand (Demanda_uni_equil) is always >= 0 since demand should be 
        either 0 or a positive value. The reason that Venta_uni_hoy - Dev_uni_proxima sometimes 
        has negative values is that the returns records sometimes carry over a few weeks.

Data fields

* Semana — Week number (From Thursday to Wednesday)
* Agencia_ID — Sales Depot ID
* Canal_ID — Sales Channel ID
* Ruta_SAK — Route ID (Several routes = Sales Depot)
* Cliente_ID — Client ID
* NombreCliente — Client name
* Producto_ID — Product ID
* NombreProducto — Product Name
* Venta_uni_hoy — Sales unit this week (integer)
* Venta_hoy — Sales this week (unit: pesos)
* Dev_uni_proxima — Returns unit next week (integer)
* Dev_proxima — Returns next week (unit: pesos)
* Demanda_uni_equil — Adjusted Demand (integer) (This is the target you will predict)

"""

In [None]:
# Count the elements with the label 'SIN NOMBRE' & 'NO IDENTIFICADO'
#df_TrainSpk0.filter(df_TrainSpk0.NombreCliente.isin(['SIN NOMBRE', 'NO IDENTIFICADO'])).count()

# Erase all the elements with the label 'SIN NOMBRE' & 'NO IDENTIFICADO'
#df_ClientSpk = df_ClientSpk.filter(df_ClientSpk.NombreCliente != 'SIN NOMBRE')
#df_ClientSpk = df_ClientSpk.filter(df_ClientSpk.NombreCliente != 'NO IDENTIFICADO')
#df_TrainSpk01 = df_TrainSpk.groupby(['Semana', 'Producto_ID']).agg({"Venta_hoy"}).collect()
#df_TrainSpk01 = df_TrainSpk.describe()

# Sales/devolution dataframe
#df_SD = df_TrainSpk.select(["Semana", "Dev_uni_proxima", "Venta_uni_hoy"])

#df_AvgDevProductWeek = df_DevProductWeek.groupby(["Semana"]).agg({"Dev_uni_proxima":"avg"}).collect()
#df_DevProductWeek = df_DevProductWeek.groupby(["Semana"]).agg({"Dev_uni_proxima":"sum"}).collect()
#df_ProductsWeek = df_ProductsWeek.groupby(["Semana"]).count().collect()

#df_DevWeek = df_SD.groupby(["Semana"]).agg({"Dev_uni_proxima":"sum"}).collect()
#df_SalWeek = df_SD.groupby(["Semana"]).agg({"Venta_uni_hoy": "sum"}).collect()

#df_PercDevolutions = df_TopAgencies.rdd.map(lambda x: (float(x.Devolucion_unidades)/(x.Devolucion_unidades + x.Venta_unidades)) * 100).collect()

#df_ASG00 = df_TrainSpk02.groupby(["Semana","Agencia_ID"]).agg({"Venta_uni_hoy": "sum"}).collect()
#df_ASG01 = df_AS.groupby(["Agencia_ID"]).agg({"Venta_uni_hoy": "sum"}).collect()

#pd999 = pd_TopAgencies.assign(Perc_Devolutions = lambda x: x.Devolucion_unidades/(x.Devolucion_unidades + x.Venta_unidades) * 100)

#pd_TrainSpk01.plot(x = 'Semana', y = 'Total Devolutions(units)', kind = 'bar', figsize = [15, 5], title = 'Dev Products by Week')
#pd_TrainSpk01.plot(x = 'Semana', y = 'Total Sales(units)', kind = 'bar', figsize = [15, 5], title = 'Total Products by Week')
#pd_TrainSpk01.plot(x = 'Semana', y = 'Total Sales(pesos)', kind = 'bar', figsize = [20, 5], title = 'Total Sales by Week')

#df_ASG01 = spark.createDataFrame(df_ASG01)
#df_ADG01 = df_ASG01.selectExpr("Agencia_ID as Agencia")

In [None]:
# The code was removed by DSX for sharing.

In [None]:
# Print the schema for each data frame

print("\nClient Schema:")
df_ClientSpk.printSchema()

print("\nProduct Schema:")
df_ProductSpk.printSchema()

print("\nTown Schema:")
df_TownSpk.printSchema()

print("\nTest Schema:")
df_TestSpk.printSchema()

print("\nTrain Schema:")
df_TrainSpk.printSchema()

In [None]:
# Set the conditions for the union 
CondClient = [df_TrainSpk.Cliente_ID == df_ClientSpk.Cliente_ID]
CondProduct = [df_TrainSpk.Producto_ID == df_ProductSpk.Producto_ID]
CondLocation = [df_TrainSpk.Agencia_ID == df_TownSpk.Agencia_ID]

# Do the join with the above conditions

df_TrainSpk0 = df_TrainSpk.join(df_ClientSpk, CondClient).drop(df_ClientSpk.Cliente_ID)
df_TrainSpk0 = df_TrainSpk0.join(df_ProductSpk, CondProduct).drop(df_ProductSpk.Producto_ID)
df_TrainSpk0 = df_TrainSpk0.join(df_TownSpk, CondLocation).drop(df_TownSpk.Agencia_ID)

In [None]:
# Show the result from the above union

df_TrainSpk0.take(1)

In [None]:
# Group the df by semana at the final we'll have the total prodocut sold by week and the total devolutions by week

df_TrainSpk01 = df_TrainSpk0.select(["Semana", "Dev_uni_proxima","Dev_proxima", "Venta_uni_hoy", "Venta_hoy"])
df_TrainSpk01 = df_TrainSpk01.groupby(["Semana"]).agg({"Dev_uni_proxima":"sum", "Dev_proxima" : "sum", "Venta_uni_hoy": "sum", "Venta_hoy": "sum"}).collect()

In [None]:
# In order to make a good visualization, we going to convert the Spark df to a Pandas df

pd_TrainSpk01 = pd.DataFrame(df_TrainSpk01)
print(pd_TrainSpk01)

In [None]:
pd_TrainSpk01.columns = ['Semana', 'Devoluciones(pesos)', 'Venta(unidades)', 'Ventas(pesos)', 'Devolucion(unidades)']

In [None]:
print("**********************************************")
print(pd_TrainSpk01)

In [None]:
pd_TrainSpk01 = pd_TrainSpk01.sort_values(by = ['Semana'], ascending = True)
print(pd_TrainSpk01)

In [None]:
# ****************************************************************************************************
# The next is the summary sales(units, pesos) and devolutions(units, pesos & % of devolution) by week

pd_TrainSpk01.plot(x = 'Semana', kind = 'bar', figsize = [30, 18], subplots = True)

In [None]:
# ********************************************************************************************************
# ********************************************************************************************************
# ********************************************************************************************************
# In order to know the best agencies we will define the df_AS = dataframe Agency Sales

df_AS = df_TrainSpk0

In [None]:
# df_ASG = dataframe Agency Sales Group, data grouped by Agencia with the total of sales by units and pesos and the devolutions by units

df_ASG01 = df_AS.groupby(["Agencia_ID"]).agg({"Venta_uni_hoy": "sum", "Dev_uni_proxima": "sum", "Venta_hoy": "sum", "Dev_proxima": "sum"}).collect()

In [None]:
pd_ASG01 = pd.DataFrame(df_ASG01)

In [None]:
pd_ASG01.columns = ['Agencia', 'Devolucion_pesos', 'Venta_unidades','Venta_pesos', 'Devolucion_unidades']

In [None]:
# Sow the pandas dataframe with the columns renamed

pd_ASG01

In [None]:
# pd_ASG02: pandas dataframe sorted by sales - units
# pd_ASG03: pandas dataframe sorted by sales - money

pd_ASG02 = pd_ASG01.sort_values(by = ['Venta_unidades'], ascending = False)
pd_ASG03 = pd_ASG01.sort_values(by = ['Venta_pesos'], ascending = False)

In [None]:
# Plot the sorted pandas data frames

pd_ASG02.plot(x = 'Agencia', y = 'Venta_unidades', kind = 'bar', figsize = [20, 7], title = 'Venta por agencia - unidades')
pd_ASG03.plot(x = 'Agencia', y = 'Venta_pesos', kind = 'bar', figsize = [20, 7], title = 'Venta por agencia - pesos')

In [None]:
# Following the 80/20 rule
# The 20 % of the agencies will generate the 80 % of the sales - units & money

TotalAgency = len(pd_ASG01)
HundredPercentage = 1
TwentyPercentage = 0.2

TopAgencies = int((TwentyPercentage * TotalAgency)/HundredPercentage)

pd_TopAgenciesUnits = pd_ASG02.head(TopAgencies)
pd_TopAgenciesMoney = pd_ASG03.head(TopAgencies)

print("Total agencies: " + str(TotalAgency))
print("20 % top agencies: " + str(TopAgencies))

In [None]:
# In the two graphics we can see:
# - Top agencies by sales(units)
# - Top agencies by sales(pesos)

pd_TopAgenciesUnits.plot(x = 'Agencia', y = 'Venta_unidades', kind = 'bar', figsize = [20, 7], title = 'Agencias con mas ventas - unidades')
pd_TopAgenciesMoney.plot(x = 'Agencia', y = 'Venta_pesos', kind = 'bar', figsize = [20, 7], title = 'Agencias con mas ventas - pesos')

In [None]:
df_TopAgenciesUnits = spark.createDataFrame(pd_TopAgenciesUnits)
df_TopAgenciesMoney = spark.createDataFrame(pd_TopAgenciesMoney)

In [None]:
df_TopAgenciesUnits.show()
df_TopAgenciesMoney.show()

In [None]:
# **********************************************************************************************************************************
ParametersUnits = (df_TopAgenciesUnits.Devolucion_unidades/(df_TopAgenciesUnits.Venta_unidades + df_TopAgenciesUnits.Devolucion_unidades))*100
df_TopAgenciesUnits = df_TopAgenciesUnits.withColumn("Porc_Devolucion_unidades", ParametersUnits)

# **********************************************************************************************************************************

ParametersMoney = (df_TopAgenciesMoney.Devolucion_pesos/(df_TopAgenciesMoney.Venta_pesos + df_TopAgenciesMoney.Devolucion_pesos))*100
df_TopAgenciesMoney = df_TopAgenciesMoney.withColumn("Porc_Devolucion_pesos", ParametersMoney)

In [None]:
df_TopAgenciesUnits.show()
df_TopAgenciesMoney.show()

In [None]:
pd_TopAgenciesUnits = df_TopAgenciesUnits.toPandas()
pd_TopAgenciesMoney = df_TopAgenciesMoney.toPandas()

In [None]:
pd_TopAgenciesUnits.head(3)

In [None]:
pd_TopAgenciesMoney.head(3)

In [None]:
# ********************************************************************************************************
# ********************************************************************************************************
# Top agencies in sales - units
# Plot by agency the sales in units, the sales in pesos, the devolutions in units and the devolutions in %
# ********************************************************************************************************
# ********************************************************************************************************

pd_TopAgenciesUnits.plot.bar(x = 'Agencia', subplots = True, sort_columns=False, figsize = [30, 20])

In [None]:
# ********************************************************************************************************
# ********************************************************************************************************
# Top agencies in sales - pesos
# Plot by agency the sales in units, the sales in pesos, the devolutions in units and the devolutions in %
# ********************************************************************************************************
# ********************************************************************************************************

pd_TopAgenciesMoney.plot.bar(x = 'Agencia', subplots = True, sort_columns=False, figsize = [30, 20])

In [None]:
# Set the conditions for the union 

CondLocationUnits = [df_TopAgenciesUnits.Agencia == df_TownSpk.Agencia_ID]
CondLocationMoney = [df_TopAgenciesMoney.Agencia == df_TownSpk.Agencia_ID]

# Do the union with the above conditions

df_TopAgenciesUnits = df_TopAgenciesUnits.join(df_TownSpk, CondLocationUnits).drop(df_TownSpk.Agencia_ID)
df_TopAgenciesMoney = df_TopAgenciesMoney.join(df_TownSpk, CondLocationMoney).drop(df_TownSpk.Agencia_ID)

In [None]:
df_TopAgenciesUnits.show()
df_TopAgenciesMoney.show()

In [None]:
df_TopAgenciesUnitsTown = df_TopAgenciesUnits.groupby(["State"]).agg({"Venta_unidades":"sum", "Devolucion_unidades": "sum", "Agencia":"count"}).collect()
df_TopAgenciesMoneyTown = df_TopAgenciesMoney.groupby(["State"]).agg({"Venta_pesos":"sum", "Devolucion_pesos": "sum", "Agencia":"count"}).collect()

In [None]:
pd_TopAgenciesUnitsTown = pd.DataFrame(df_TopAgenciesUnitsTown)
pd_TopAgenciesMoneyTown = pd.DataFrame(df_TopAgenciesMoneyTown)

In [None]:
print(pd_TopAgenciesUnitsTown.head(3))
print(pd_TopAgenciesMoneyTown.head(3))

In [None]:
pd_TopAgenciesUnitsTown.columns = ['Estado', 'No Agencias','Venta_unidades', 'Devolucion_unidades']
pd_TopAgenciesUnitsTown = pd_TopAgenciesUnitsTown.sort_values(by = ['Venta_unidades'], ascending = False)

pd_TopAgenciesMoneyTown.columns = ['Estado', 'No Agencias','Devolucion_pesos', 'Venta_pesos']
pd_TopAgenciesMoneyTown = pd_TopAgenciesMoneyTown.sort_values(by = ['Venta_pesos'], ascending = False)

In [None]:
df_TopAgenciesUnitsTown = spark.createDataFrame(pd_TopAgenciesUnitsTown)
df_TopAgenciesMoneyTown = spark.createDataFrame(pd_TopAgenciesMoneyTown)

In [None]:
df_TopAgenciesUnitsTown.show()
df_TopAgenciesMoneyTown.show()

In [None]:
ParametersUnits_ = (df_TopAgenciesUnitsTown.Devolucion_unidades/(df_TopAgenciesUnitsTown.Venta_unidades + df_TopAgenciesUnitsTown.Devolucion_unidades))*100
df_TopAgenciesUnitsTown = df_TopAgenciesUnitsTown.withColumn("Porc_Devolucion_unidades", ParametersUnits_)

ParametersMoney_ = (df_TopAgenciesMoneyTown.Devolucion_pesos/(df_TopAgenciesMoneyTown.Venta_pesos + df_TopAgenciesMoneyTown.Devolucion_pesos))*100
df_TopAgenciesMoneyTown = df_TopAgenciesMoneyTown.withColumn("Porc_Devolutions_pesos", ParametersMoney_)

In [None]:
df_TopAgenciesUnitsTown.show()
df_TopAgenciesMoneyTown.show()

In [None]:
pd_TopAgenciesUnitsTown = df_TopAgenciesUnitsTown.toPandas()
pd_TopAgenciesMoneyTown = df_TopAgenciesMoneyTown.toPandas()

In [None]:
pd_TopAgenciesUnitsTown = pd_TopAgenciesUnitsTown.sort_values(by = ['Venta_unidades'], ascending = False)
pd_TopAgenciesMoneyTown = pd_TopAgenciesMoneyTown.sort_values(by = ['Venta_pesos'], ascending = False)

In [None]:
# ********************************************************************************************************
# ********************************************************************************************************
# Plot by state the numer of agencies, the sales in units, the sales in pesos, the devolutions in units 
# and the devolutions in %
# ********************************************************************************************************
# ********************************************************************************************************

pd_TopAgenciesUnitsTown.plot.bar(x = 'Estado', subplots = True, figsize = [30, 20])

In [None]:
# ********************************************************************************************************
# ********************************************************************************************************
# Plot by state the numer of agencies the sales in units, the sales in pesos, the devolutions in units 
# and the devolutions in %
# ********************************************************************************************************
# ********************************************************************************************************

pd_TopAgenciesMoneyTown.plot.bar(x = 'Estado', subplots = True, figsize = [30, 20])

In [None]:
df_CanalID = df_TrainSpk0

In [None]:
# df_CanalId0: dataframe grouped by Agencia_ID & Canal_ID
# df_CanalId1: dataframe grouped by Canal_ID

df_CanalID0 = df_CanalID.groupby(["Agencia_ID","Canal_ID"]).agg({"Venta_uni_hoy":"sum", "Venta_hoy":"sum","Dev_uni_proxima":"sum", "Dev_proxima":"sum"}).collect()
df_CanalID1 = df_CanalID.groupby(["Canal_ID"]).agg({"Venta_uni_hoy":"sum", "Venta_hoy":"sum", "Dev_uni_proxima":"sum", "Dev_proxima":"sum"}).collect()

In [None]:
df_CanalID0 = spark.createDataFrame(df_CanalID0)
df_CanalID1 = spark.createDataFrame(df_CanalID1)

In [None]:
pd_CanalID1 = df_CanalID1.toPandas()

In [None]:
pd_CanalID1 = pd_CanalID1.sort_values(by = ['sum(Venta_hoy)'], ascending = False)

In [None]:
# Plot:
# - Devolutions by channel (units & pesos)
# - Sales by channel (units & pesos)
# the most important channel is the 1 & second the place is for the 2

pd_CanalID1.plot.bar(x = 'Canal_ID', subplots = True, figsize = [30, 20])

In [None]:
# df_CanalID2: data frame 2 canal Id 
# We're selecting the two most important channels according to sales in pesos

CondChannels = df_CanalID0.Canal_ID <= 2

df_CanalID2 = df_CanalID0.where(CondChannels).collect()

In [None]:
df_CanalID2 = spark.createDataFrame(df_CanalID2)

In [None]:
TopAgenciesbyMoney = df_TopAgenciesMoney.select(["Agencia"])
TopAgenciesbyUnits = df_TopAgenciesMoney.select(["Agencia"])

In [None]:
# Conditions for the join
# 00 Top agencies in relation with the sales in mxn (pesos)
# 01 Top agencies in relation with the sales in units

TopAgenciesCanalId_Cond00 = [df_CanalID2.Agencia_ID == TopAgenciesbyMoney.Agencia]
TopAgenciesCanalId_Cond01 = [df_CanalID2.Agencia_ID == TopAgenciesbyUnits.Agencia]

# Execute the join with the above conditions
# df_CanalID3 --> top agencies by money
# df_CanalID4 --> top agencies by units

df_CanalID3 = df_CanalID2.join(TopAgenciesbyMoney, TopAgenciesCanalId_Cond00).drop(TopAgenciesbyMoney.Agencia)
df_CanalID4 = df_CanalID2.join(TopAgenciesbyUnits, TopAgenciesCanalId_Cond01).drop(TopAgenciesbyUnits.Agencia)

In [None]:
# Conditions for the join
# 00 Top agencies in relation with the sales in mxn (pesos)
# 01 Top agencies in relation with the sales in units

TopAgenciesCanalId_Cond00 = [df_CanalID2.Agencia_ID == TopAgenciesbyMoney.Agencia]
TopAgenciesCanalId_Cond01 = [df_CanalID2.Agencia_ID == TopAgenciesbyUnits.Agencia]

# Execute the join with the above conditions
# df_CanalID3 --> top agencies by money
# df_CanalID4 --> top agencies by units

df_CanalID3 = df_CanalID2.join(TopAgenciesbyMoney, TopAgenciesCanalId_Cond00).drop(TopAgenciesbyMoney.Agencia)
df_CanalID4 = df_CanalID2.join(TopAgenciesbyUnits, TopAgenciesCanalId_Cond01).drop(TopAgenciesbyUnits.Agencia)

In [None]:
# Create the pandas dataframe in order to plot them

pd_CanalID3 = df_CanalID3.toPandas()
pd_CanalID4 = df_CanalID4.toPandas()

In [None]:
# df_CanalID3 --> top agencies by money with channels 1 or 2

pd_CanalID3 = pd_CanalID3.sort_values(by = ['Agencia_ID'], ascending = False)

In [None]:
# df_CanalID4 --> top agencies by units with channels 1 or 2
# At this point we know the top agencies with channel 1 or 2 and the toal sales(unit & money) by channel

pd_CanalID4 = pd_CanalID4.sort_values(by = ['Agencia_ID'], ascending = False)

In [None]:
# By we'll continue with the product analysis in order to know the top products 
# Later we'll come back to our top agencies in order to know the top products in the top agencies 
# and check if the top products match with the products of the top agencies

In [None]:
# The first step si to preprocess the df products in order to know the brand, short name and pieces
# Is easier do it with pandas than Spark

pd_ProductSpk = df_ProductSpk.toPandas()

In [None]:
pd_ProductSpk['ShortName'] = pd_ProductSpk.NombreProducto.str.extract('^(\D*)').astype('string')
pd_ProductSpk['Brand'] = pd_ProductSpk.NombreProducto.str.extract('^.+\s(\D+) \d+$').astype('string')
pd_ProductSpk['Pieces'] =  pd_ProductSpk.NombreProducto.str.extract('(\d+)p ').astype('float')

In [None]:
pd_ProductSpk.head()

In [None]:
# df_Products: souce dataframe for products

df_Products = spark.createDataFrame(pd_ProductSpk)

In [None]:
df_Products1 = df_Products.groupby(["ShortName"]).count().collect()

In [None]:
df_Products1 = spark.createDataFrame(df_Products1)

In [None]:
df_Products1.show()