#### https://sparkbyexamples.com/pyspark/python-no-module-named-pyspark-error/
#### https://spark.apache.org/docs/latest/sql-data-sources-json.html

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import array

In [14]:
# Reuse the running SparkSession if there is one, otherwise start a new one with this app name.
spark = (
    SparkSession.builder
    .appName("Spark_json_test1")
    .getOrCreate()
)

In [15]:
# Show the session object as the cell output, as a quick check that the session is up.
spark

### Load the JSON file of transactions

In [16]:
# TODO(review): absolute, machine-specific path — move to a configurable DATA_DIR before sharing.
# Raw string so every backslash in the Windows path is taken literally (no escape sequences,
# no invalid-escape SyntaxWarning, no accidental "\b" backspace).
TRANSACTIONS_PATH = r"C:\Anna\Data science\Big Data\Esselunga project\big-data\Transactions_example3.json"

# multiline=true: a single JSON record may span several lines of the file.
# The JSON source always infers its schema, so no "inferSchema" option is needed (that is a CSV option
# and would be silently ignored here).
multiline_df = (
    spark.read
    .option("multiline", "true")
    .json(TRANSACTIONS_PATH)
)
multiline_df.show()

+--------------------+-----------+------+--------------------+-------+
|           client_id|fidaty_card|  name|       products_list|surname|
+--------------------+-----------+------+--------------------+-------+
|b3667dab-fead-438...|       True|giulia|[{Rizzoli, filett...|bianchi|
|1422cc9e-5c31-436...|       True| mario|[{Taylors of Harr...|foscolo|
+--------------------+-----------+------+--------------------+-------+



In [17]:
# Inspect the inferred schema: one row per client, with the purchased items nested in the
# products_list array of structs (description, price, quantity, upc).
multiline_df.printSchema()

root
 |-- client_id: string (nullable = true)
 |-- fidaty_card: string (nullable = true)
 |-- name: string (nullable = true)
 |-- products_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |    |    |-- upc: string (nullable = true)
 |-- surname: string (nullable = true)



In [18]:
# Query test with the DataFrame API: project only the nested list of purchased products.
multiline_df.select(multiline_df["products_list"]).show()

+--------------------+
|       products_list|
+--------------------+
|[{Rizzoli, filett...|
|[{Taylors of Harr...|
+--------------------+



In [19]:
from pyspark.sql.functions import explode_outer

# One output row per element of products_list; each element is placed in a new "products_new" column
# appended after the existing ones.
# NOTE(review): explode_outer (unlike explode) also keeps clients whose products_list is null or empty,
# emitting products_new = null; downstream those rows would form a null-upc group. Confirm this is wanted.
exploding = multiline_df.select("*", explode_outer("products_list").alias("products_new"))
exploding.show()

+--------------------+-----------+------+--------------------+-------+--------------------+
|           client_id|fidaty_card|  name|       products_list|surname|        products_new|
+--------------------+-----------+------+--------------------+-------+--------------------+
|b3667dab-fead-438...|       True|giulia|[{Rizzoli, filett...|bianchi|{Rizzoli, filetti...|
|b3667dab-fead-438...|       True|giulia|[{Rizzoli, filett...|bianchi|{Barilla, mezze p...|
|b3667dab-fead-438...|       True|giulia|[{Rizzoli, filett...|bianchi|{Rizzoli, filetti...|
|1422cc9e-5c31-436...|       True| mario|[{Taylors of Harr...|foscolo|{Taylors of Harro...|
|1422cc9e-5c31-436...|       True| mario|[{Taylors of Harr...|foscolo|{Taylors of Harro...|
|1422cc9e-5c31-436...|       True| mario|[{Taylors of Harr...|foscolo|{Barilla, mezze p...|
+--------------------+-----------+------+--------------------+-------+--------------------+



In [20]:
# Check that products_new holds a single product struct per row, alongside the original columns.
exploding.printSchema()

root
 |-- client_id: string (nullable = true)
 |-- fidaty_card: string (nullable = true)
 |-- name: string (nullable = true)
 |-- products_list: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- price: double (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |    |    |-- upc: string (nullable = true)
 |-- surname: string (nullable = true)
 |-- products_new: struct (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- price: double (nullable = true)
 |    |-- quantity: long (nullable = true)
 |    |-- upc: string (nullable = true)



In [21]:
from pyspark.sql.types import StructType, StructField, StringType, NumericType, DoubleType, LongType

# Explicit schema for ONE product (an element of products_list), matching the types Spark inferred:
# price -> double, quantity -> long. NumericType is an abstract base class and is not a valid
# concrete column type, so it must not be used in a StructField.
# NOTE(review): this schema is not used anywhere below; it describes the element struct, not the
# top-level transaction record, so it cannot be passed to spark.read.schema() for the JSON file as-is.
schema = StructType([
    StructField("upc", StringType(), True),
    StructField("description", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("quantity", LongType(), True),
])


In [22]:
# Flatten the exploded struct so each product field (description, price, quantity, upc)
# becomes its own top-level column.
new_table = exploding.selectExpr("products_new.*")
new_table.show()

+--------------------+-----+--------+-----------------+
|         description|price|quantity|              upc|
+--------------------+-----+--------+-----------------+
|Rizzoli, filetti ...| 2.29|       2|2078-878573272326|
|Barilla, mezze pe...| 0.85|       3|2568-745573272456|
|Rizzoli, filetti ...| 2.29|       2|2078-878573272326|
|Taylors of Harrog...| 3.79|       3|2529-338608111118|
|Taylors of Harrog...| 3.79|       4|2529-338608111118|
|Barilla, mezze pe...| 0.85|       5|2568-745573272456|
+--------------------+-----+--------+-----------------+



#### Raggruppo per upc e sommo le quantità vendute. La quantità totale venduta di ciascun prodotto è la daily sales velocity, che entra poi nel calcolo del ROP (reorder point).

In [23]:
from pyspark.sql.functions import sum as spark_sum

# Total quantity sold per product across the loaded transactions = daily sales velocity.
# Aliasing inside agg() avoids depending on Spark's auto-generated "sum(quantity)" column name,
# which withColumnRenamed would silently ignore if it ever differed.
rop_indicators = (
    new_table
    .groupBy("upc", "description")
    .agg(spark_sum("quantity").alias("daily sales velocity"))
)
rop_indicators.printSchema()

root
 |-- upc: string (nullable = true)
 |-- description: string (nullable = true)
 |-- daily sales velocity: long (nullable = true)



#### Aggiungo una colonna per il ROP iniziale, una per lo stock level, una per il lead time (espresso in giorni) e una per la reorder quantity. Tutti questi valori sono fittizi e si possono poi sostituire con quelli presenti nell'inventory di ogni retailer (o del supplier, a seconda che si stia ricalcolando il ROP per il singolo retailer o per il supplier centrale).

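#### rand() genera un numero casuale nell'intervallo [0, 1), perciò l'ho moltiplicato per 10 (per 2 nel caso del lead time). Con floor arrotondo per difetto all'intero inferiore e infine sommo 1 per evitare valori pari a 0: si ottengono così interi tra 1 e 10 (tra 1 e 2 per il lead time).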

In [29]:
from pyspark.sql.functions import lit, floor, rand

# Fixed seed so the placeholder values are reproducible across runs. Spark's seeded rand() is only
# reproducible for the same data partitioning.
RANDOM_SEED = 42

# Placeholder inventory parameters: column name -> upper bound of the generated integer.
# Each column becomes a random integer in [1, upper_bound].
placeholder_upper_bounds = {
    "rop": 10,               # initial reorder point
    "stock level": 10,
    "lead time": 2,          # days
    "reorder quantity": 10,
}

for offset, (column_name, upper_bound) in enumerate(placeholder_upper_bounds.items()):
    # A distinct seed per column: identical seeds would give every column the same base sequence.
    # floor(rand() * upper_bound) is in [0, upper_bound - 1]; +1 shifts it to [1, upper_bound].
    rop_indicators = rop_indicators.withColumn(
        column_name, floor(rand(seed=RANDOM_SEED + offset) * upper_bound) + 1
    )
rop_indicators.show()

+-----------------+--------------------+--------------------+---+-----------+---------+----------------+
|              upc|         description|daily sales velocity|rop|stock level|lead time|reorder quantity|
+-----------------+--------------------+--------------------+---+-----------+---------+----------------+
|2568-745573272456|Barilla, mezze pe...|                   8| 10|          9|        2|               7|
|2529-338608111118|Taylors of Harrog...|                   7|  5|          7|        2|               3|
|2078-878573272326|Rizzoli, filetti ...|                   4|  2|          8|        2|               9|
+-----------------+--------------------+--------------------+---+-----------+---------+----------------+



#### Creo una colonna 'new rop' in cui inserisco il nuovo valore del rop (daily sales velocity × lead time) se esso si discosta di più del 10% (al rialzo o al ribasso) dal rop precedente, oppure il vecchio rop se tale scostamento è pari o inferiore al 10%.

In [32]:
from pyspark.sql.functions import when

# Candidate reorder point recomputed from the aggregated sales: daily sales velocity x lead time.
recomputed_rop = rop_indicators['daily sales velocity'] * rop_indicators['lead time']
current_rop = rop_indicators['rop']

# Adopt the recomputed value only when it is more than 10% above or below the current ROP;
# otherwise (deviation of 10% or less) keep the current ROP.
drifted = (recomputed_rop > 1.10 * current_rop) | (recomputed_rop < 0.90 * current_rop)
rop_indicators = rop_indicators.withColumn(
    "new rop",
    when(drifted, recomputed_rop).otherwise(current_rop),
)
rop_indicators.show()

+-----------------+--------------------+--------------------+-----------+---------+----------------+---+-------+
|              upc|         description|daily sales velocity|stock level|lead time|reorder quantity|rop|new rop|
+-----------------+--------------------+--------------------+-----------+---------+----------------+---+-------+
|2568-745573272456|Barilla, mezze pe...|                   8|          9|        2|               7| 25|     16|
|2529-338608111118|Taylors of Harrog...|                   7|          7|        2|               3| 21|     14|
|2078-878573272326|Rizzoli, filetti ...|                   4|          8|        2|               9| 16|      8|
+-----------------+--------------------+--------------------+-----------+---------+----------------+---+-------+



#### Elimino la vecchia colonna rop e rinomino la nuova in 'rop': così il ROP aggiornato diventa il punto di partenza dell'esecuzione successiva e lo script si può mettere in un ciclo eseguito periodicamente, una volta al giorno a fine giornata.

In [33]:
# Replace the previous ROP with the recomputed one, so the next periodic run starts from it.
# Guarded so that re-running this cell is a no-op: without the guard, a second run would drop
# "rop" while "new rop" no longer exists (withColumnRenamed ignores missing columns), silently
# deleting the reorder point.
if "new rop" in rop_indicators.columns:
    rop_indicators = rop_indicators.drop('rop').withColumnRenamed('new rop', 'rop')
rop_indicators.show()

+-----------------+--------------------+--------------------+-----------+---------+----------------+---+
|              upc|         description|daily sales velocity|stock level|lead time|reorder quantity|rop|
+-----------------+--------------------+--------------------+-----------+---------+----------------+---+
|2568-745573272456|Barilla, mezze pe...|                   8|          9|        2|               7| 16|
|2529-338608111118|Taylors of Harrog...|                   7|          7|        2|               3| 14|
|2078-878573272326|Rizzoli, filetti ...|                   4|          8|        2|               9|  8|
+-----------------+--------------------+--------------------+-----------+---------+----------------+---+

