## Step 0: Load libraries

In [0]:
pip install pyspark

In [0]:
pip install pandas

## Step 1: Load Data

In [0]:
# Disabled alternative source: load the store master data from Azure SQL over
# JDBC instead of the CSV export. Kept for reference only; nothing here runs.
#
# The password is read from a Databricks secret scope instead of being written
# in the notebook source. The scope and key names below are placeholders.
# TODO: replace them with the real ones and rotate the password that was
# previously committed in this cell.
#
# driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
# url = "jdbc:sqlserver://projectbibda.database.windows.net;DatabaseName=Project"
# table = "dbo.Anagrafica_Store"
# user = "projectbibda"
# password = dbutils.secrets.get(scope="<secret-scope>", key="<sql-password-key>")
#
# Client = (
#     spark.read.format("jdbc")
#     .option("driver", driver)
#     .option("url", url)
#     .option("dbtable", table)
#     .option("user", user)
#     .option("password", password)
#     .load()
# )
#
# Client.show()

## File Vendite : Sales_DG.csv

In [0]:
from pyspark.sql.types import *

# Location of the tab-separated sales extract. This is the path that is
# actually read; it is now passed to the reader instead of being repeated.
file_location = "dbfs:/FileStore/tables/DG/Sales_DG.csv"
delimiter = "\t"

# 'date' and 'sales' are read as plain strings here; they are parsed into a
# date and a float later, when sales_view is built.
schema = StructType([
    StructField('customer', StringType(), True),
    StructField('date', StringType(), True),
    StructField('ticket', StringType(), True),
    StructField('model', StringType(), True),
    StructField('qty', FloatType(), True),
    StructField('sales', StringType(), True),
])

# Read the sales file into a DataFrame using the explicit schema above.
sales_csv = spark.read.csv(
    file_location,
    header=True,
    schema=schema,
    sep=delimiter,
)

# Make the DataFrame queryable as a temporary view.
sales_csv.createOrReplaceTempView('sales_csv')

# Show a sample of the data.
sales_csv.show(5)

In [0]:
# Print the column names and types of the raw sales DataFrame.
sales_csv.printSchema()

In [0]:
from pyspark.sql.functions import *

# Rebuild an ISO date string from the raw 'date' text: characters 7-8 are used
# as the two-digit year (prefixed with "20"), 4-5 as the month, 1-2 as the day.
iso_day = concat(
    lit("20"), substring('date', 7, 2), lit("-"),
    substring('date', 4, 2), lit("-"),
    substring('date', 1, 2),
)

# 'day' is the DateType version of the raw 'date' column.
sales_view = sales_csv.withColumn('day', to_date(iso_day, 'yyyy-MM-dd'))

# 'sales' uses "." as thousands separator and "," as decimal mark: strip the
# dots, turn the comma into a point, then cast the text to a number.
# NOTE(review): float is single precision; consider double/decimal for amounts.
sales_view = sales_view.withColumn(
    'sales',
    regexp_replace(regexp_replace('sales', '\\.', ''), ',', '.').cast("float"),
)

# Remove blanks from the ticket before slicing it.
sales_view = sales_view.withColumn('ticket', regexp_replace('ticket', ' ', ''))

# First two characters of the cleaned ticket. Spark's substring is 1-based,
# so the start position is 1 (position 0 only works by leniency).
ticket_prefix = substring('ticket', 1, 2)

# Derive the 6-character store code from the ticket: it starts at character 2
# for the "10" prefix and at character 3 for the "1V"/"1O"/"1R" prefixes.
# Any other prefix (or a missing ticket) maps to the placeholder "000000".
sales_view = sales_view.withColumn(
    'store',
    when(ticket_prefix == "10", substring('ticket', 2, 6))
    .when(ticket_prefix.isin("1V", "1O", "1R"), substring('ticket', 3, 6))
    .otherwise("000000"),
)

# Drop the raw columns that have been replaced by 'day' and 'store'.
columns_to_drop = ['ticket', 'date']
sales_view = sales_view.drop(*columns_to_drop)

# Make the cleaned sales queryable as a temporary view.
sales_view.createOrReplaceTempView('sales_view')

# Show a sample of the data.
sales_view.show(5)


In [0]:
# Print the column names and types of the cleaned sales DataFrame.
sales_view.printSchema()

In [0]:
# Check sales by store.
# The query below is disabled: it is wrapped in a string literal, so this cell
# does not run it. To use it, move the %sql magic and the query to a new cell.

"""

%sql
 
SELECT distinct store, sum(sales)
FROM sales_view
Group by store

"""

## File Clienti : Anagrafica_Customer.csv

In [0]:
# Location of the comma-separated customer master data. This is the path that
# is actually read; it is now passed to the reader instead of being repeated.
file_location = "dbfs:/FileStore/tables/DG/Anagrafica_Customer.csv"
delimiter = ","

# Every customer attribute is read as a nullable string.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        'customer_crm',
        'cust_nationality',
        'cust_country',
        'cust_birth_date',
        'cust_gender',
        'cust_generation',
    )
])

# Read the customer file into a DataFrame using the explicit schema above.
customer = spark.read.csv(
    file_location,
    header=True,
    schema=schema,
    sep=delimiter,
)

# Make the DataFrame queryable as a temporary view.
customer.createOrReplaceTempView('customer')

# Show a sample of the data.
customer.show(5)

## File Negozi : Anagrafica_Store.csv

In [0]:
# Location of the comma-separated store master data. This is the path that is
# actually read; it is now passed to the reader instead of being repeated.
file_location = "dbfs:/FileStore/tables/DG/Anagrafica_Store.csv"
delimiter = ","

# Every store attribute is read as a nullable string.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        'store_cod',
        'store_desc',
        'store_region_cod',
        'store_region_desc',
        'store_subregion_cod',
        'store_subregion_desc',
        'store_country_cod',
        'store_country_desc',
        'store_city',
        'store_status',
        'store_tipo',
        'store_target',
    )
])

# Read the store file into a DataFrame using the explicit schema above.
store = spark.read.csv(
    file_location,
    header=True,
    schema=schema,
    sep=delimiter,
)

# Make the DataFrame queryable as a temporary view.
store.createOrReplaceTempView('store')

# Show a sample of the data.
store.show(5)


## File Prodotti : Anagrafica_Prodotto.csv

In [0]:
# Location of the semicolon-separated product master data. This is the path
# that is actually read; it is now passed to the reader instead of repeated.
file_location = "dbfs:/FileStore/tables/DG/Anagrafica_Prodotto.csv"
delimiter = ";"

# Every product attribute is read as a nullable string.
schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in (
        'prod_brand_desc',
        'prod_brand_code',
        'prod_retail_line_group',
        'prod_line_desc',
        'prod_line_code',
        'prod_commercial_class_desc',
        'prod_commercial_class_code',
        'prod_model_desc',
        'prod_model_desc_2',
        'prod_model_desc_3',
        'prod_model_code',
        'prod_color_desc',
        'prod_color_code',
        'prod_model_part_color_code',
        'prod_article_desc',
    )
])

# Read the product file into a DataFrame using the explicit schema above.
product = spark.read.csv(
    file_location,
    header=True,
    schema=schema,
    sep=delimiter,
)

# Replace double quotes in the article description with underscores.
product = product.withColumn(
    'prod_article_desc',
    regexp_replace('prod_article_desc', '"', '_'),
)

# Relabel the commercial class "FORMALE" as "SHOES FORMALE"; every other
# value is kept unchanged.
product = product.withColumn(
    'prod_commercial_class_desc',
    when(product.prod_commercial_class_desc == "FORMALE", "SHOES FORMALE")
    .otherwise(product.prod_commercial_class_desc),
)

# Make the DataFrame queryable as a temporary view.
product.createOrReplaceTempView('product')

# Show a sample of the data.
product.show(5)

### costruisco id scontrino : negozio-cliente-giorno

In [0]:
from pyspark.sql import SQLContext
# Build the denormalised sales table by joining the temp views registered
# earlier in this notebook:
#   * Ticket_Row = store || customer || day || model  (one sales line)
#   * Ticket     = store || customer || day           (one receipt / basket)
# Stores and customers are left-joined, so sales without a match are kept;
# products are inner-joined, so sales of unknown models are dropped.
SQL_Transaction = (
    "SELECT store||customer||day||model as Ticket_Row, "
    "store||customer||day as Ticket, * "
    "FROM sales_view "
    "left join store on store = store_cod "
    "left join customer on customer = customer_crm "
    "inner join product on model = prod_model_part_color_code"
)

# Run the query on the session the views were registered with. SQLContext is
# deprecated, and `spark` is already the entry point used for the reads.
sales_dg = spark.sql(SQL_Transaction)

#sales_dg.show(5)

In [0]:
# Print the column names and types of the joined sales DataFrame.
sales_dg.printSchema()

In [0]:
# Persist the joined sales as a managed table, replacing any previous version.
# The former "delimiter", "decimal" and "encoding" options were CSV settings
# that do not apply to a table write (and the encoding name contained an
# en dash instead of a hyphen), so they have been removed.
sales_dg.write.mode("overwrite").saveAsTable("olap_sales_table")

In [0]:
%sql

-- List the distinct commercial classes present in the saved table.
select distinct prod_commercial_class_desc from olap_sales_table;
-- Count the rows of the saved table.
select count(*) from olap_sales_table

count(1)
5824001


In [0]:
# Export the joined sales as tab-separated text with a header row.
# coalesce(1) collapses the data to one partition before writing. Spark writes
# a directory at this path, so "sales_dg.csv" is a folder name.
# The built-in "csv" source replaces the legacy "com.databricks.spark.csv" alias.
(
    sales_dg.coalesce(1)
    .write.mode("overwrite")
    .format("csv")
    .option("delimiter", "\t")
    .option("header", "true")
    .save("dbfs:/FileStore/df/sales_dg.csv")
)

In [0]:
#dbutils.fs.rm("dbfs:/FileStore/tables/Anagrafica_Prodotto.csv")

## Step 2: Market Basket Analysis

In [0]:
import pandas as pd

In [0]:
from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark.sql import functions as F

from pyspark.sql.types import StructType,StructField 
from pyspark.sql.types import StringType, IntegerType, ArrayType

from pyspark.ml.fpm import FPGrowth

#### unità: scontrino (cliente-giorno-negozio) , item : classe commerciale

In [0]:
# Group: one row per ticket with the set of distinct commercial classes in it.
groups_class = sales_dg.groupby(['Ticket']).agg(
    F.collect_set('prod_commercial_class_desc').alias('items')
)

# Add the basket size, then keep a second version without single-item tickets.
# F.size is used explicitly instead of relying on a wildcard import.
groups_class = groups_class.withColumn('n_items', F.size('items'))
groups_class_no_singoli = groups_class.filter(F.col('n_items') > 1)

# Save both versions as JSON. The former "delimiter" and "header" options are
# CSV settings with no meaning for the JSON writer, so they have been removed.
groups_class.coalesce(1).write.mode("overwrite").format("json").save("dbfs:/FileStore/df/item_by_commercial_class.json")
groups_class_no_singoli.coalesce(1).write.mode("overwrite").format("json").save("dbfs:/FileStore/df/item_by_commercial_class_no1.json")

##### unità: scontrino (cliente-giorno-negozio) , item : model desc 2 (prodotto riclassificato)

In [0]:
# Group: one row per ticket with the set of distinct prod_model_desc_2 values.
groups_model_2 = sales_dg.groupby(['Ticket']).agg(
    F.collect_set('prod_model_desc_2').alias('items')
)

# Add the basket size, then keep a second version without single-item tickets.
# F.size is used explicitly instead of relying on a wildcard import.
groups_model_2 = groups_model_2.withColumn('n_items', F.size('items'))
groups_model_2_no_singoli = groups_model_2.filter(F.col('n_items') > 1)

# Save both versions as JSON. The former "delimiter" and "header" options are
# CSV settings with no meaning for the JSON writer, so they have been removed.
groups_model_2.coalesce(1).write.mode("overwrite").format("json").save("dbfs:/FileStore/df/item_by_model_desc_2.json")
groups_model_2_no_singoli.coalesce(1).write.mode("overwrite").format("json").save("dbfs:/FileStore/df/item_by_model_desc_2_no1.json")

##### unità: scontrino (cliente-giorno-negozio) , item : model desc 3 (prodotto riclassificato + MAN/WOM/BABY/)

In [0]:
# Group: one row per ticket with the set of distinct prod_model_desc_3 values.
groups_model_3 = sales_dg.groupby(['Ticket']).agg(
    F.collect_set('prod_model_desc_3').alias('items')
)

# Add the basket size, then keep a second version without single-item tickets.
# F.size is used explicitly instead of relying on a wildcard import.
groups_model_3 = groups_model_3.withColumn('n_items', F.size('items'))
groups_model_3_no_singoli = groups_model_3.filter(F.col('n_items') > 1)

# Save both versions as JSON. The former "delimiter" and "header" options are
# CSV settings with no meaning for the JSON writer, so they have been removed.
groups_model_3.coalesce(1).write.mode("overwrite").format("json").save("dbfs:/FileStore/df/item_by_model_desc_3.json")
groups_model_3_no_singoli.coalesce(1).write.mode("overwrite").format("json").save("dbfs:/FileStore/df/item_by_model_desc_3_no1.json")

### MBA 1 : item è la classe commerciale

In [0]:
# Only the value of `groups` is changed here, so the rest of the MBA code
# stays the same for every item definition.
groups = groups_class
groups_no_singoli = groups_class_no_singoli

In [0]:
# One FP-Growth estimator is enough: both fits use the same thresholds.
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.001, minConfidence=0.01)

# Association rules mined on every ticket.
# TODO: also save these outputs as CSV.
regole = fpGrowth.fit(groups).associationRules

#### rules on the filtered dataset: tickets with more than one item ####

model = fpGrowth.fit(groups_no_singoli)

# Frequent itemsets of the filtered fit (display is disabled).
fp = model.freqItemsets
#fp.sort(fp.freq.desc()).show(n=20)

# TODO: also save these outputs as CSV.
regole_no_singoli = model.associationRules
#regole.sort(regole.support.desc()).show()


### salvo output regole create MBA 1

In [0]:
# Save the association rules of MBA 1 (item = commercial class) as JSON, for
# all tickets and for multi-item tickets only. The former "delimiter" and
# "header" options are CSV settings with no meaning for the JSON writer, so
# they have been removed.
regole.coalesce(1).write.mode("overwrite").format("json").save("dbfs:/FileStore/df/regole_commercial_class.json")
regole_no_singoli.coalesce(1).write.mode("overwrite").format("json").save("dbfs:/FileStore/df/regole_commercial_class_no1.json")

In [0]:
#COMANDI PER MOSTRARE A VIDEO I RISULTATI (disattivati per ora)

#regole.sort(regole.support.desc()).show()
#model.associationRules.show()
#model.transform(groups).show()

### MBA 2 : item è il model_desc_2 (prodotto riclassificato)

In [0]:
# Only the value of `groups` is changed here, so the rest of the MBA code
# stays the same for every item definition.
groups = groups_model_2
groups_no_singoli = groups_model_2_no_singoli

In [0]:
# One FP-Growth estimator is enough: both fits use the same thresholds.
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.001, minConfidence=0.01)

# Association rules mined on every ticket.
# TODO: also save these outputs as CSV.
regole = fpGrowth.fit(groups).associationRules
#regole.sort(regole.support.desc()).show()

#### rules on the filtered dataset: tickets with more than one item ####

model = fpGrowth.fit(groups_no_singoli)

# Frequent itemsets of the filtered fit (display is disabled).
fp = model.freqItemsets
#fp.sort(fp.freq.desc()).show(n=20)

# TODO: also save these outputs as CSV.
regole_no_singoli = model.associationRules
#regole.sort(regole.support.desc()).show()

### salvo output regole create MBA 2

In [0]:
# Save the association rules of MBA 2 (item = prod_model_desc_2) as JSON, for
# all tickets and for multi-item tickets only. The former "delimiter" and
# "header" options are CSV settings with no meaning for the JSON writer, so
# they have been removed.
regole.coalesce(1).write.mode("overwrite").format("json").save("dbfs:/FileStore/df/regole_model_desc_2.json")
regole_no_singoli.coalesce(1).write.mode("overwrite").format("json").save("dbfs:/FileStore/df/regole_model_desc_2_no1.json")

### MBA 3 : item è il model_desc_3 (prodotto riclassificato + WOM/MAN/BABY)

In [0]:
# Only the value of `groups` is changed here, so the rest of the MBA code
# stays the same for every item definition.
groups = groups_model_3 
groups_no_singoli = groups_model_3_no_singoli

In [0]:
# One FP-Growth estimator is enough: both fits use the same thresholds.
fpGrowth = FPGrowth(itemsCol="items", minSupport=0.001, minConfidence=0.01)

# Association rules mined on every ticket.
# TODO: also save these outputs as CSV.
regole = fpGrowth.fit(groups).associationRules
#regole.sort(regole.support.desc()).show()

#### rules on the filtered dataset: tickets with more than one item ####

model = fpGrowth.fit(groups_no_singoli)

# Frequent itemsets of the filtered fit (display is disabled).
fp = model.freqItemsets
#fp.sort(fp.freq.desc()).show(n=20)

# TODO: also save these outputs as CSV.
regole_no_singoli = model.associationRules
#regole.sort(regole.support.desc()).show()

### salvo output regole create MBA 3

In [0]:
# Save the association rules of MBA 3 (item = prod_model_desc_3) as JSON, for
# all tickets and for multi-item tickets only. The former "delimiter" and
# "header" options are CSV settings with no meaning for the JSON writer, so
# they have been removed.
regole.coalesce(1).write.mode("overwrite").format("json").save("dbfs:/FileStore/df/regole_model_desc_3.json")
regole_no_singoli.coalesce(1).write.mode("overwrite").format("json").save("dbfs:/FileStore/df/regole_model_desc_3_no1.json")