# **Problem Statement**

#### Assist IFCO's Data Team in analyzing some business data. For this purpose, you have been provided with two files:

    1. orders.csv (which contains factual information regarding the orders received)
    2. invoicing_data.json (which contains invoicing information)

#### For this exercise, you can only use Python, PySpark or SQL (e.g. in dbt). Unit testing is essential for ensuring the reliability and correctness of your code. Please include appropriate unit tests for each task.

## Solution Approach.

**1. Ingest orders data (CSV) into a dataframe.**

**2. Transform and clean the order data and create a view, orders.**

**3. Ingest invoices data (JSON) into a data frame, and create a view, invoices.**

**4. Create an Order View (ORDER_VW)**

    1. Derive the contact_full_name and in case of data not available, use the placeholder "John Doe".
    2. The field for contact_address should adhere to the following information and format: "city name, postal code". 
    3. In the event that the city name is not available, the placeholder "Unknown" should be used. 
    4. Similarly, if the postal code is not known, the placeholder "UNK00" should be used.

**5. Create a Salesowners View (SALESOWNERS_VW). Create a normalized view of sales owners.** 

**6. Create a Salesowners Commission View (sales_owner_commission_vw)**

        - Identify the primary sales owner, Co-owner 1, and Co-owner 2 who have contributed to the acquisition.
        - Join Orders and Invoices based on the order ID, and get the invoiced value.
              * Assumption: VAT is not included in the calculation because the details are not clear.
        - Calculate the commissions based on the below procedure:
               1. Main Owner: 6% of the net invoiced value.
               2. Co-owner 1 (second in the list): 2.5% of the net invoiced value.
               3. Co-owner 2 (third in the list): 0.95% of the net invoiced value.
               4. The rest of the co-owners do not receive anything.
               

In [1]:
import pandas as pd

df = pd.read_csv ("/app/data/orders.csv", sep=';')
df.head()

Unnamed: 0,order_id,date,company_id,company_name,crate_type,contact_data,salesowners
0,f47ac10b-58cc-4372-a567-0e02b2c3d479,29.01.22,1e2b47e6-499e-41c6-91d3-09d12dddfbbd,Fresh Fruits Co,Plastic,"[{ ""contact_name"":""Curtis"", ""contact_surname"":...","Leonard Cohen, Luke Skywalker, Ammy Winehouse"
1,f47ac10b-58cc-4372-a567-0e02b2c3d480,21.02.22,0f05a8f1-2bdf-4be7-8c82-4c9b58f04898,Veggies Inc,Wood,"[{ ""contact_name"":""Maria"", ""contact_surname"":""...","Luke Skywalker, David Goliat, Leon Leonov"
2,f47ac10b-58cc-4372-a567-0e02b2c3d481,03.04.22,1e2b47e6-499e-41c6-91d3-09d12dddfbbd,Fresh Fruits c.o,Metal,"[{ ""contact_name"":""Para"", ""contact_surname"":""C...",Luke Skywalker
3,f47ac10b-58cc-4372-a567-0e02b2c3d482,14.07.21,1c4b0b50-1d5d-463a-b56e-1a6fd3aeb7d6,Seafood Supplier,Plastic,,"David Goliat, Leonard Cohen"
4,f47ac10b-58cc-4372-a567-0e02b2c3d483,23.10.22,34538e39-cd2e-4641-8d24-3c94146e6f16,Meat Packers Ltd,Plastic,,"Chris Pratt, David Henderson, Marianov Merschi..."


# **Data Ingestion - Orders**

**1. Define schema for the input DataFrame, and also for the nested contact_data**
    - Ensuring cp is treated as String for consistency
    
**2. Read the data into dataframe using the defined schema.**

**3. Clean the data to convert to tabular format.**
         - Clean the contact_data column by replacing "" with "
         - Clean the contact_data column by replacing the enclosing " with empty string
         - Ensure empty or null values are handled before parsing
         
**4. Parse the contact_data column into a nested structure**
    - Flatten the nested JSON using explode if needed

**5. Register DataFrame as a SQL temporary view**
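
The cleaning steps above can be checked without a Spark session. The following is a minimal pure-Python sketch, assuming the raw field arrives with doubled quotes and one enclosing pair of quotes. The function name `parse_contact_data` and the sample string are hypothetical and introduced only for illustration; they are not part of the notebook.

```python
import json
import re


def parse_contact_data(raw):
    """Return a list of contact dicts, or an empty list when nothing can be parsed."""
    if raw is None or raw.strip() == "":
        return []
    cleaned = raw.replace('""', '"')         # collapse doubled quotes
    cleaned = re.sub(r'^"|"$', "", cleaned)  # drop the enclosing quotes
    try:
        parsed = json.loads(cleaned)
    except json.JSONDecodeError:
        return []                            # unparseable input is treated as "no contact"
    return parsed if isinstance(parsed, list) else []


# Illustrative raw value (assumed shape, not copied from orders.csv)
sample = ('"[{ ""contact_name"":""Curtis"", ""contact_surname"":""Jackson"", '
          '""city"":""Chicago"", ""cp"":""12345""}]"')
contacts = parse_contact_data(sample)
print(contacts)
```

A function of this shape could serve as the basis for a unit test of the cleaning rules before they are applied with `regexp_replace` and `from_json`.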

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
from pyspark.sql.functions import col, from_json, trim, when, explode, regexp_replace, explode_outer

# Initialize Spark session
spark = SparkSession.builder.appName("OrderProcessing").getOrCreate()

# Define schema for the DataFrame
schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("date", StringType(), True),
    StructField("company_id", StringType(), True),
    StructField("company_name", StringType(), True),
    StructField("crate_type", StringType(), True),
    StructField("contact_data", StringType(), True),
    StructField("salesowners", StringType(), True)
])

# Define schema for contact_data (JSON array)
contact_schema = ArrayType(
    StructType([
        StructField("contact_name", StringType(), True),
        StructField("contact_surname", StringType(), True),
        StructField("city", StringType(), True),
        StructField("cp", StringType(), True)  # Ensuring cp is treated as String for consistency
    ])
)

# Read the CSV file
df = spark.read.option("header", True).option("delimiter", ";").schema(schema).csv("/app/data/orders.csv")

# Clean the contact_data column by replacing "" with "
df_temp2 = df.withColumn("contact_data", regexp_replace(col("contact_data"), '""', '"'))

#df_temp2.show(truncate=False)

# Clean the contact_data column by replacing the enclosing " with empty string
df_temp3 = df_temp2.withColumn("contact_data", regexp_replace(col("contact_data"), '^"|"$',''))

#df_temp3.show(truncate=False)

# Ensure empty or null values are handled before parsing
df_cleaned = df_temp3.withColumn(
    "contact_data",
    when(col("contact_data") == "", 'Unknown').otherwise(col("contact_data"))
)  

# Parse the contact_data column into a nested structure
df_parsed = df_cleaned.withColumn("contact_data_parsed", from_json(col("contact_data"), contact_schema))

# # Show the result
# df_parsed.show(truncate=False)

# df_parsed.printSchema()

# Flatten the nested JSON using explode if needed
df_flattened = df_parsed.select(
    "order_id", 
    "date", 
    "company_id",
    "company_name", 
    "crate_type", 
    "contact_data",
    "contact_data_parsed",
    explode_outer(col("contact_data_parsed")),
    "salesowners"
)

# df_flattened.show(truncate=False)

# df_flattened.printSchema()

df_final = df_flattened.select(
    "order_id",
    "date",
    "company_id",
    "company_name",
    "crate_type",
    col("contact_data_parsed.contact_name").alias("contact_name"),
    col("contact_data_parsed.contact_surname").alias("contact_surname"),
    col("contact_data_parsed.city").alias("contact_city"),
    col("contact_data_parsed.cp").alias("contact_cp"),
    "salesowners"  
)

# Show results
#df_final.show(truncate=False)

# # Register DataFrame as a SQL temporary view
df_final.createOrReplaceTempView("orders")

# # Run a Spark SQL query
df_result = spark.sql("SELECT * FROM orders")

# # Show the results
df_result.show(truncate=False)

df_result.describe().show()



+------------------------------------+--------+------------------------------------+---------------------+----------+------------+---------------+------------------------+----------+------------------------------------------------------------+
|order_id                            |date    |company_id                          |company_name         |crate_type|contact_name|contact_surname|contact_city            |contact_cp|salesowners                                                 |
+------------------------------------+--------+------------------------------------+---------------------+----------+------------+---------------+------------------------+----------+------------------------------------------------------------+
|f47ac10b-58cc-4372-a567-0e02b2c3d479|29.01.22|1e2b47e6-499e-41c6-91d3-09d12dddfbbd|Fresh Fruits Co      |Plastic   |[Curtis]    |[Jackson]      |[Chicago]               |[12345]   |Leonard Cohen, Luke Skywalker, Ammy Winehouse               |
|f47ac10b-58cc-4372-a567



+-------+--------------------+--------+--------------------+-------------------+----------+--------------------+
|summary|            order_id|    date|          company_id|       company_name|crate_type|         salesowners|
+-------+--------------------+--------+--------------------+-------------------+----------+--------------------+
|  count|                  62|      62|                  62|                 62|        62|                  62|
|   mean|                NULL|    NULL|                NULL|               NULL|      NULL|                NULL|
| stddev|                NULL|    NULL|                NULL|               NULL|      NULL|                NULL|
|    min|f47ac10b-58cc-437...|01.04.22|012f20c6-00d5-4f4...|      Farm Fresh Co|     Metal|Ammy Winehouse, L...|
|    max|f47ac10b-58cc-437...|31.12.22|fa14c3ed-3c48-49f...|healthy snacks c.o.|      Wood|Yuri Gagarin, Leo...|
+-------+--------------------+--------+--------------------+-------------------+----------+-----

# **Data Ingestion - Invoices**

    1. Load JSON file
    2. Extract invoices array and flatten it
    3. Register DataFrame as a SQL temporary view

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col

# Initialize Spark Session
spark = SparkSession.builder.appName("InvoiceProcessing").getOrCreate()

# Load JSON file
df = spark.read.option("multiline", "true").json("/app/data/invoicing_data.json")

# Extract invoices array and flatten it
df_flattened = df.select(explode(col("data.invoices")).alias("invoice")).select(
    col("invoice.id").alias("invoice_id"),
    col("invoice.orderId").alias("order_id"),
    col("invoice.companyId").alias("company_id"),
    col("invoice.grossValue").alias("gross_value"),
    col("invoice.vat").alias("vat")
)

# Show the result
# df_flattened.show(truncate=False)

# # Register DataFrame as a SQL temporary view
df_flattened.createOrReplaceTempView("invoices")

# # Run a Spark SQL query
df_invoices = spark.sql("SELECT * FROM invoices")

# # Show the results
df_invoices.show(truncate=False)

df_invoices.describe().show()




+------------------------------------+------------------------------------+------------------------------------+-----------+---+
|invoice_id                          |order_id                            |company_id                          |gross_value|vat|
+------------------------------------+------------------------------------+------------------------------------+-----------+---+
|e1e1e1e1-e1e1-e1e1-e1e1-e1e1e1e1e1e1|f47ac10b-58cc-4372-a567-0e02b2c3d479|1e2b47e6-499e-41c6-91d3-09d12dddfbbd|324222     |0  |
|e2e2e2e2-e2e2-e2e2-e2e2-e2e2e2e2e2e2|f47ac10b-58cc-4372-a567-0e02b2c3d480|0f05a8f1-2bdf-4be7-8c82-4c9b58f04898|193498     |19 |
|e3e3e3e3-e3e3-e3e3-e3e3-e3e3e3e3e3e3|f47ac10b-58cc-4372-a567-0e02b2c3d481|1e2b47e6-499e-41c6-91d3-09d12dddfbbd|345498     |21 |
|e4e4e4e4-e4e4-e4e4-e4e4-e4e4e4e4e4e4|f47ac10b-58cc-4372-a567-0e02b2c3d482|1c4b0b50-1d5d-463a-b56e-1a6fd3aeb7d6|245412     |34 |
|e5e5e5e5-e5e5-e5e5-e5e5-e5e5e5e5e5e5|f47ac10b-58cc-4372-a567-0e02b2c3d483|34538e39-cd2e-4641-8d2

# Create an Order View (ORDER_VW)

## Apply the following transformations:
    1. The contact_full_name field must contain the full name of the contact. In case this information is not available, the placeholder "John Doe" should be utilized.
    2. The field for contact_address should adhere to the following information and format: "city name, postal code". 
    3. In the event that the city name is not available, the placeholder "Unknown" should be used. 
    4. Similarly, if the postal code is not known, the placeholder "UNK00" should be used.
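
The placeholder rules above are easy to pin down with a small unit-testable sketch. The version below is pure Python and is only an illustration: the function names are hypothetical, and it assumes that an empty string counts as "not available" in the same way as a missing value, which the SQL in the notebook does not necessarily do.

```python
def contact_full_name(name, surname):
    """Full name, or the placeholder when either part is missing."""
    if not name or not surname:
        return "John Doe"
    return f"{name} {surname}"


def contact_address(city, postal_code):
    """Address in the form 'city name, postal code' with placeholders."""
    return f"{city or 'Unknown'}, {postal_code or 'UNK00'}"


print(contact_full_name("Curtis", "Jackson"))
print(contact_address(None, None))
```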


In [4]:
# # Run a Spark SQL query
#df_trans = spark.sql("SELECT * FROM orders limit 5")
#df_1 = spark.sql("SELECT order_id, (contact_name[0]|| ' '||contact_surname[0]) as contact_full_name FROM orders \
#where order_id in ('f47ac10b-58cc-4372-a567-0e02b2c3d482','f47ac10b-58cc-4372-a567-0e02b2c3d483','f47ac10b-58cc-4372-a567-0e02b2c3d484',\
#'f47ac10b-58cc-4372-a567-0e02b2c3d485')")

#df_trans.printSchema()   

# SQL query for the view
sql_query = """
    select 
        order_id, 
        date, 
        company_id,
        company_name, 
        crate_type,
        contact_name[0] as contact_name,  
        contact_surname[0] as contact_surname, 
        nvl((contact_name[0]|| ' '||contact_surname[0]), 'John Doe') as contact_full_name,
        nvl(contact_city[0], 'Unknown') as contact_city, 
        nvl(contact_cp[0],'UNK00') as contact_cp, 
        salesowners
    from orders
"""
# Execute SQL query
df_trans = spark.sql(sql_query)

df_trans.createOrReplaceTempView("orders_vw")

df_trans.show(truncate=False)

df_trans.describe().show()

+------------------------------------+--------+------------------------------------+---------------------+----------+------------+---------------+-----------------+----------------------+----------+------------------------------------------------------------+
|order_id                            |date    |company_id                          |company_name         |crate_type|contact_name|contact_surname|contact_full_name|contact_city          |contact_cp|salesowners                                                 |
+------------------------------------+--------+------------------------------------+---------------------+----------+------------+---------------+-----------------+----------------------+----------+------------------------------------------------------------+
|f47ac10b-58cc-4372-a567-0e02b2c3d479|29.01.22|1e2b47e6-499e-41c6-91d3-09d12dddfbbd|Fresh Fruits Co      |Plastic   |Curtis      |Jackson        |Curtis Jackson   |Chicago               |12345     |Leonard Cohen, Luke Sk

# Test 1: Distribution of Crate Type per Company
## Calculate the distribution of crate types per company (number of orders per type)
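
As a cross-check for the query in the next cell, the same counting logic can be sketched in plain Python. This is an illustration under assumptions: `crate_distribution` is a hypothetical helper, the short company ids are made up, and the rule of keeping the alphabetically first company name per id mirrors the `ROW_NUMBER() ... ORDER BY company_name` step.

```python
from collections import Counter


def crate_distribution(orders):
    """orders: iterable of (company_id, company_name, crate_type) tuples."""
    names = {}
    counts = Counter()
    for company_id, company_name, crate_type in orders:
        # Keep one name per company id: the alphabetically first one.
        if company_id not in names or company_name < names[company_id]:
            names[company_id] = company_name
        counts[(company_id, crate_type)] += 1
    return sorted((cid, names[cid], crate, n) for (cid, crate), n in counts.items())


# Illustrative rows with made-up ids
rows = [
    ("c1", "Fresh Fruits Co", "Plastic"),
    ("c1", "Fresh Fruits c.o", "Metal"),
    ("c1", "Fresh Fruits c.o", "Plastic"),
    ("c2", "Veggies Inc", "Wood"),
]
distribution = crate_distribution(rows)
print(distribution)
```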

In [5]:
# df_1 = spark.sql("SELECT order_id, contact_full_name, contact_city,contact_cp   FROM orders_vw \
# where order_id in ('f47ac10b-58cc-4372-a567-0e02b2c3d482','f47ac10b-58cc-4372-a567-0e02b2c3d483','f47ac10b-58cc-4372-a567-0e02b2c3d484',\
# 'f47ac10b-58cc-4372-a567-0e02b2c3d485')")

sql_query = """
    with get_unique_company_name as 
    (
     select Company_id, company_name 
     from (
         select company_id, company_name, ROW_NUMBER() OVER (PARTITION BY Company_id ORDER BY company_name asc) AS row_num 
         from orders_vw
         )
     where row_num=1
     )
    SELECT 
        ord.company_id,
        gc.company_name, 
        ord.crate_type, 
        count(ord.order_id) as crate_type_distribution 
    FROM orders_vw ord inner join get_unique_company_name gc on ord.company_id = gc.company_id
    group by 1, 2,3 
    order by 1, 2
"""

df_distribution = spark.sql(sql_query)

df_distribution.show(truncate=False)


+------------------------------------+---------------------+----------+-----------------------+
|company_id                          |company_name         |crate_type|crate_type_distribution|
+------------------------------------+---------------------+----------+-----------------------+
|012f20c6-00d5-4f45-999f-12e7639db623|Green World Ltd      |Plastic   |1                      |
|063a7dc7-b93a-4f38-b7f0-0e30b5b217ac|Fruit Kings Ltd      |Wood      |1                      |
|0b8755d4-3d28-4039-b9a7-b30cb5ff02ea|Seafood Network Ltd  |Wood      |1                      |
|0d09ae2b-d9a5-4d67-bb97-963be9379b4e|Healthy Eats Ltd     |Plastic   |1                      |
|0f05a8f1-2bdf-4be7-8c82-4c9b58f04898|Veggies Inc          |Wood      |2                      |
|0f05a8f1-2bdf-4be7-8c82-4c9b58f04898|Veggies Inc          |Plastic   |1                      |
|1b21f4a3-22d2-43f2-ae3f-e254d282d9e0|Fresh Veg Co         |Metal     |1                      |
|1c4b0b50-1d5d-463a-b56e-1a6fd3aeb7d6|Se

# **Test 2: DataFrame of Orders with Full Name of the Contact**

### Requirements:
    1. order_id - The order_id field must contain the unique identifier of the order.
    2. contact_full_name - The contact_full_name field must contain the full name of the contact. In case this information is not available, the placeholder "John Doe" should be utilized.
    

In [6]:
# df_1 = spark.sql("SELECT order_id, contact_full_name, contact_city,contact_cp   FROM orders_vw \
# where order_id in ('f47ac10b-58cc-4372-a567-0e02b2c3d482','f47ac10b-58cc-4372-a567-0e02b2c3d483','f47ac10b-58cc-4372-a567-0e02b2c3d484',\
# 'f47ac10b-58cc-4372-a567-0e02b2c3d485')")

df_1 = spark.sql("SELECT order_id, contact_full_name  FROM orders_vw group by 1, 2")

df_1.show(truncate=False)


+------------------------------------+-----------------+
|order_id                            |contact_full_name|
+------------------------------------+-----------------+
|f47ac10b-58cc-4372-a567-0e02b2c3d491|Bruce Wayne      |
|f47ac10b-58cc-4372-a567-0e02b2c3d516|Bruce Wayne      |
|f47ac10b-58cc-4372-a567-0e02b2c3d481|Para Cetamol     |
|f47ac10b-58cc-4372-a567-0e02b2c3d492|Clark Kent       |
|f47ac10b-58cc-4372-a567-0e02b2c3d512|Curtis Jackson   |
|f47ac10b-58cc-4372-a567-0e02b2c3d484|John Krasinski   |
|f47ac10b-58cc-4372-a567-0e02b2c3d529|Curtis Jackson   |
|f47ac10b-58cc-4372-a567-0e02b2c3d501|Barry Allen      |
|f47ac10b-58cc-4372-a567-0e02b2c3d503|John Doe         |
|f47ac10b-58cc-4372-a567-0e02b2c3d504|Diana Prince     |
|f47ac10b-58cc-4372-a567-0e02b2c3d508|John Doe         |
|f47ac10b-58cc-4372-a567-0e02b2c3d479|Curtis Jackson   |
|f47ac10b-58cc-4372-a567-0e02b2c3d535|John Krasinski   |
|f47ac10b-58cc-4372-a567-0e02b2c3d489|Anthony Pap      |
|f47ac10b-58cc-4372-a567-0e02b2

# Test 3: DataFrame of Orders with Contact Address

### Requirements:
    1. order_id - The order_id field must contain the unique identifier of the order.
    2. contact_address - The field for contact_address should adhere to the following information and format: "city name, postal code". 
            - In the event that the city name is not available, the placeholder "Unknown" should be used. 
            - Similarly, if the postal code is not known, the placeholder "UNK00" should be used.


In [7]:
# df_1 = spark.sql("SELECT order_id, contact_full_name, contact_city,contact_cp   FROM orders_vw \
# where order_id in ('f47ac10b-58cc-4372-a567-0e02b2c3d482','f47ac10b-58cc-4372-a567-0e02b2c3d483','f47ac10b-58cc-4372-a567-0e02b2c3d484',\
# 'f47ac10b-58cc-4372-a567-0e02b2c3d485')")

df_2 = spark.sql("SELECT order_id, contact_city || ','||contact_cp as contact_address FROM orders_vw")

df_2.show(truncate=False)

+------------------------------------+----------------------------+
|order_id                            |contact_address             |
+------------------------------------+----------------------------+
|f47ac10b-58cc-4372-a567-0e02b2c3d479|Chicago,12345               |
|f47ac10b-58cc-4372-a567-0e02b2c3d480|Calcutta,UNK00              |
|f47ac10b-58cc-4372-a567-0e02b2c3d481|Frankfurt am Oder,3934      |
|f47ac10b-58cc-4372-a567-0e02b2c3d482|Unknown,UNK00               |
|f47ac10b-58cc-4372-a567-0e02b2c3d483|Unknown,UNK00               |
|f47ac10b-58cc-4372-a567-0e02b2c3d484|New York,1203               |
|f47ac10b-58cc-4372-a567-0e02b2c3d485|Unknown,UNK00               |
|f47ac10b-58cc-4372-a567-0e02b2c3d486|Esplugues de Llobregat,UNK00|
|f47ac10b-58cc-4372-a567-0e02b2c3d487|Tel Aviv,UNK00              |
|f47ac10b-58cc-4372-a567-0e02b2c3d488|Chicago,12345               |
|f47ac10b-58cc-4372-a567-0e02b2c3d489|Barcelona,8023              |
|f47ac10b-58cc-4372-a567-0e02b2c3d490|Moscow,654

# Create a Salesowners View (SALESOWNERS_VW)

## Create a normalized view of sales owners. 
## High-level approach:
    1. SPLIT(salesowners, ',') → Splits the salesowners string into an array on each comma.
    2. EXPLODE() → Converts the array into multiple rows (one for each name).
    3. TRIM(salesowner) → Removes any leading or trailing spaces from names
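
The split, explode and trim steps can be sketched in plain Python to make the expected result explicit. This is an illustrative stand-in, not the notebook's implementation: `normalize_salesowners` is a hypothetical name, and the company ids in the sample are made up.

```python
def normalize_salesowners(rows):
    """rows: iterable of (company_id, salesowners_string).

    Returns sorted, unique (company_id, salesowner) pairs.
    """
    pairs = set()
    for company_id, owners in rows:
        if not owners:
            continue
        for owner in owners.split(","):  # SPLIT + EXPLODE
            owner = owner.strip()        # TRIM
            if owner:
                pairs.add((company_id, owner))
    return sorted(pairs)


# Illustrative rows with made-up ids
sample_rows = [
    ("c1", "Leonard Cohen, Luke Skywalker, Ammy Winehouse"),
    ("c1", "Luke Skywalker"),
    ("c2", "David Goliat, Leonard Cohen"),
]
normalized = normalize_salesowners(sample_rows)
print(normalized)
```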


In [8]:
# Company id
# 1e2b47e6-499e-41c6-91d3-09d12dddfbbd Fresh Fruits Co
# 27c59f76-5d26-4b82-a89b-59f8dfd2e9a7 Healthy Snacks
# 20dfef10-8f4e-45a1-82fc-123f4ab2a4a5 Healthy Snacks Co


# Execute Spark SQL
df_sales_owners = spark.sql("""
    with get_salesowners as 
    (
    SELECT 
        Company_id,
        company_name,
        TRIM(salesowner) AS salesowner
    FROM (
        SELECT 
            Company_id, 
            company_name, 
            EXPLODE(SPLIT(salesowners, ',')) AS salesowner
        FROM orders_vw 
            --where company_id='1e2b47e6-499e-41c6-91d3-09d12dddfbbd'
            --where company_id in ('27c59f76-5d26-4b82-a89b-59f8dfd2e9a7', '20dfef10-8f4e-45a1-82fc-123f4ab2a4a5')
            )
        group by 1,2,3
    ),
    get_unique_salesowners as 
    (
      SELECT 
            Company_id, 
            salesowner
        FROM get_salesowners 
         group by 1,2
    ),
    get_unique_company as 
    (
     select Company_id, company_name 
     from (
         select Company_id, company_name, ROW_NUMBER() OVER (PARTITION BY Company_id ORDER BY company_name) AS row_num 
         from get_salesowners
         )
     where row_num=1
    )
    Select gs.Company_id, gc.company_name, gs.salesowner
    from get_unique_salesowners gs inner join get_unique_company gc on gs.Company_id = gc.Company_id
    order by 1, 3 asc
    """)

# Show the result
df_sales_owners.show(truncate=False)

df_sales_owners.createOrReplaceTempView("salesowners_vw")

#df_3.show(truncate=False)

+------------------------------------+-------------------+---------------+
|Company_id                          |company_name       |salesowner     |
+------------------------------------+-------------------+---------------+
|012f20c6-00d5-4f45-999f-12e7639db623|Green World Ltd    |Chris Pratt    |
|012f20c6-00d5-4f45-999f-12e7639db623|Green World Ltd    |David Goliat   |
|063a7dc7-b93a-4f38-b7f0-0e30b5b217ac|Fruit Kings Ltd    |David Goliat   |
|063a7dc7-b93a-4f38-b7f0-0e30b5b217ac|Fruit Kings Ltd    |Leon Leonov    |
|0b8755d4-3d28-4039-b9a7-b30cb5ff02ea|Seafood Network Ltd|Ammy Winehouse |
|0b8755d4-3d28-4039-b9a7-b30cb5ff02ea|Seafood Network Ltd|David Goliat   |
|0b8755d4-3d28-4039-b9a7-b30cb5ff02ea|Seafood Network Ltd|Leon Leonov    |
|0d09ae2b-d9a5-4d67-bb97-963be9379b4e|Healthy Eats Ltd   |Ammy Winehouse |
|0d09ae2b-d9a5-4d67-bb97-963be9379b4e|Healthy Eats Ltd   |Yuri Gagarin   |
|0f05a8f1-2bdf-4be7-8c82-4c9b58f04898|Veggies Inc        |David Goliat   |
|0f05a8f1-2bdf-4be7-8c82-

# Test 5: DataFrame of Companies with Sales Owners

## Requirements:
    1. company_id - The company_id field must contain the unique identifier of the company.
    2. company_name - The company_name field must contain the name of the company.
    3. list_salesowners - The list_salesowners field should contain a unique and comma-separated list of salespeople who have participated in at least one order of the company. 
    4. Please ensure that the list is sorted in ascending alphabetical order of the first name.

## Solution Approach:
    1. COLLECT_LIST(salesowner) → Gathers all salesowner values for each Company_id into an array.
    2. SORT_ARRAY(COLLECT_LIST(salesowner)) → Sorts the list in ascending order.
    3. CONCAT_WS(', ', ...) → Converts the sorted list into a comma-separated string.
    4. In case of duplicate companies stored under multiple IDs, one name of the company is used (e.g. Healthy Snacks vs. Healthy Snacks Co: one name is selected for the report to eliminate the duplicate).
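
The collect, sort and join steps can be illustrated with a short pure-Python sketch. It assumes the input pairs come from a normalized sales owner view; `list_salesowners` and the sample ids are hypothetical. Sorting the full names as strings orders them by first name, since each name starts with the first name.

```python
from collections import defaultdict


def list_salesowners(pairs):
    """pairs: iterable of (company_id, salesowner). Returns {company_id: 'A, B, C'}."""
    grouped = defaultdict(set)
    for company_id, owner in pairs:
        grouped[company_id].add(owner)  # a set removes duplicates
    return {cid: ", ".join(sorted(owners)) for cid, owners in grouped.items()}


# Illustrative pairs with made-up ids
owners_by_company = list_salesowners([
    ("c1", "Luke Skywalker"),
    ("c1", "Ammy Winehouse"),
    ("c1", "Luke Skywalker"),
    ("c2", "David Goliat"),
])
print(owners_by_company)
```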


In [9]:
# Company id
# 1e2b47e6-499e-41c6-91d3-09d12dddfbbd Fresh Fruits Co
# 27c59f76-5d26-4b82-a89b-59f8dfd2e9a7 Healthy Snacks
# 20dfef10-8f4e-45a1-82fc-123f4ab2a4a5 Healthy Snacks Co

# Execute Spark SQL
df_3 = spark.sql("""
    SELECT 
        Company_id, 
        company_name, 
        CONCAT_WS(', ', SORT_ARRAY(COLLECT_LIST(salesowner))) AS list_salesowners
    FROM salesowners_vw
    --where company_id='1e2b47e6-499e-41c6-91d3-09d12dddfbbd'
    --where company_id in ('27c59f76-5d26-4b82-a89b-59f8dfd2e9a7', '20dfef10-8f4e-45a1-82fc-123f4ab2a4a5')
    GROUP BY Company_id, company_name
    order by 3 asc
""")

# Show results
df_3.show(truncate=False)


+------------------------------------+---------------------+---------------------------------------------------------------------------+
|Company_id                          |company_name         |list_salesowners                                                           |
+------------------------------------+---------------------+---------------------------------------------------------------------------+
|fa14c3ed-3c48-49f4-bd69-4d7f5b5f4b1b|Green Veg Co         |Ammy Winehouse, Chris Pratt, David Henderson, Leonard Cohen, Luke Skywalker|
|d66c0c95-1f86-4d55-9245-bfa98c0f8dcb|Healthy Choices Co   |Ammy Winehouse, Chris Pratt, Leon Leonov                                   |
|8f1c5d4a-9045-4be5-bb38-7f587f478a92|Farm Fresh Co        |Ammy Winehouse, Chris Pratt, Leonard Cohen                                 |
|7f80fdd9-1c1a-4ad4-9348-19e6b2b44bde|Veggie Haven Co      |Ammy Winehouse, Chris Pratt, Leonard Cohen                                 |
|27c59f76-5d26-4b82-a89b-59f8dfd2e9a7|Hea

# Create a Salesowners Commission View (sales_owner_commission_vw)

## Create a view of sales owners. 
## High-level approach:
- Identify the primary sales owner, Co-owner 1 (second in the list), and Co-owner 2 (third in the list) who have contributed to the acquisition process.
- Join Orders and Invoices based on the order ID, and get the invoiced value.
    - Assumption: VAT is not included in the calculation because the details are not clear.
- Calculate the commissions based on the below procedure:
    1. Main Owner: 6% of the net invoiced value.
    2. Co-owner 1 (second in the list): 2.5% of the net invoiced value.
    3. Co-owner 2 (third in the list): 0.95% of the net invoiced value.
    4. The rest of the co-owners do not receive anything.
- Raw amounts are represented in cents. Provide euro amounts with two decimal places in the results
- Columns for the view:
        Order_id,
        Company_id,
        company_name,
        primary_owner,
        co_owner_1,
        co_owner_2,
        invoice_id,
        gross_value,
        primary_commission_euro,
        co_owner_1_commission_euro,
        co_owner_2_commission_euro
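
The commission rule can be worked through for a single order in plain Python. The sketch below is an illustration under assumptions: `order_commissions` is a hypothetical helper, it uses `Decimal` with half-up rounding rather than Spark's `round` on doubles, and, like the query in the next cell, it applies the rates to the invoiced value in cents without any VAT adjustment.

```python
from decimal import Decimal, ROUND_HALF_UP

# Rates for main owner, co-owner 1, co-owner 2; further owners get nothing.
RATES = (Decimal("0.06"), Decimal("0.025"), Decimal("0.0095"))


def order_commissions(salesowners, value_cents):
    """Return [(owner, commission_in_euro)] for up to the first three owners."""
    owners = [o.strip() for o in salesowners.split(",") if o.strip()]
    result = []
    for owner, rate in zip(owners, RATES):  # zip stops after the third owner
        euros = (Decimal(value_cents) * rate / 100).quantize(
            Decimal("0.01"), rounding=ROUND_HALF_UP
        )
        result.append((owner, euros))
    return result


# Order ...d479 from the outputs above: value 324222 cents, three sales owners
commissions = order_commissions("Leonard Cohen, Luke Skywalker, Ammy Winehouse", 324222)
print(commissions)
```

For this order the three shares are 194.53, 81.06 and 30.80 euro. A fourth name in the list would simply be ignored.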


In [10]:
# Company id
# 1e2b47e6-499e-41c6-91d3-09d12dddfbbd Fresh Fruits Co
# 27c59f76-5d26-4b82-a89b-59f8dfd2e9a7 Healthy Snacks
# 20dfef10-8f4e-45a1-82fc-123f4ab2a4a5 Healthy Snacks Co

# Execute Spark SQL
df_sales_owner_commission = spark.sql("""
    with get_salesowners as 
    (
        SELECT 
            Order_id,
            Company_id, 
            company_name, 
            salesowners
        FROM orders_vw 
            --where company_id='1e2b47e6-499e-41c6-91d3-09d12dddfbbd'
            --where company_id in ('27c59f76-5d26-4b82-a89b-59f8dfd2e9a7', '20dfef10-8f4e-45a1-82fc-123f4ab2a4a5')
    ),
    sales_commission_team as
     (
         SELECT 
            Order_id,
            Company_id,
            company_name,
            SPLIT(salesowners, ', ')[0] AS primary_owner,
            SPLIT(salesowners, ', ')[1] AS co_owner_1,
            SPLIT(salesowners, ', ')[2] AS co_owner_2
        FROM get_salesowners
        ), 
    get_invoice as
    (
        SELECT 
            sct.Order_id,
            sct.Company_id,
            sct.company_name,
            sct.primary_owner,
            sct.co_owner_1,
            sct.co_owner_2,
            inv.invoice_id,
            inv.gross_value,
            inv.vat
        from sales_commission_team sct inner join invoices inv on sct.Order_id=inv.order_id           
    )
    select 
        Order_id,
        Company_id,
        company_name,
        primary_owner,
        co_owner_1,
        co_owner_2,
        invoice_id,
        gross_value,
        vat,
        (case when primary_owner is not null 
            then round((gross_value * .06)/100,2) else 0 end) as primary_commission_euro,
        (case when co_owner_1 is not null 
            then round((gross_value * .025)/100,2) else 0 end) as co_owner_1_commission_euro,
        (case when co_owner_2 is not null 
            then round((gross_value * .0095)/100,2) else 0 end) as co_owner_2_commission_euro
    from get_invoice
        """)

# Show the result
df_sales_owner_commission.show(truncate=False)

df_sales_owner_commission.createOrReplaceTempView("sales_owner_commission_vw")

#df_3.show(truncate=False)

+------------------------------------+------------------------------------+---------------------+-----------------+---------------+-----------------+------------------------------------+-----------+---+-----------------------+--------------------------+--------------------------+
|Order_id                            |Company_id                          |company_name         |primary_owner    |co_owner_1     |co_owner_2       |invoice_id                          |gross_value|vat|primary_commission_euro|co_owner_1_commission_euro|co_owner_2_commission_euro|
+------------------------------------+------------------------------------+---------------------+-----------------+---------------+-----------------+------------------------------------+-----------+---+-----------------------+--------------------------+--------------------------+
|f47ac10b-58cc-4372-a567-0e02b2c3d479|1e2b47e6-499e-41c6-91d3-09d12dddfbbd|Fresh Fruits Co      |Leonard Cohen    |Luke Skywalker |Ammy Winehouse   |e1e1e1e1

# Test 4: Calculation of Sales Team Commissions

### Requirements:
    - Provide a list of the distinct sales owners and their respective commission earnings. 
    - The list should be sorted in order of descending performance, with the sales owners who have generated the highest commissions appearing first.
    - Raw amounts are represented in cents. Please provide euro amounts with two decimal places in the results 
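
The aggregation and ordering described above can be sketched in plain Python, including the two-decimal formatting. This is only an illustration: `total_commissions` is a hypothetical helper, the sample amounts are made up, and ties are broken alphabetically as an assumption.

```python
from collections import defaultdict
from decimal import Decimal


def total_commissions(per_order):
    """per_order: iterable of (sales_owner, commission_in_euro as Decimal)."""
    totals = defaultdict(Decimal)  # Decimal() is 0
    for owner, amount in per_order:
        totals[owner] += amount
    # Highest total first; equal totals sorted by name (assumption).
    return sorted(totals.items(), key=lambda kv: (-kv[1], kv[0]))


# Made-up per-order commissions
ranking = total_commissions([
    ("Leonard Cohen", Decimal("194.53")),
    ("Luke Skywalker", Decimal("81.06")),
    ("Leonard Cohen", Decimal("5.57")),
])
for owner, total in ranking:
    print(f"{owner}: {total:.2f}")  # always two decimal places
```

Formatting with `:.2f` keeps trailing zeros, so a total of 200.1 is shown as 200.10.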


In [11]:
# Company id
# 1e2b47e6-499e-41c6-91d3-09d12dddfbbd Fresh Fruits Co
# 27c59f76-5d26-4b82-a89b-59f8dfd2e9a7 Healthy Snacks
# 20dfef10-8f4e-45a1-82fc-123f4ab2a4a5 Healthy Snacks Co

# Execute Spark SQL
df_total_sales_comm = spark.sql("""
        with aa as (
            select 
             primary_owner, co_owner_1, co_owner_2, gross_value, primary_commission_euro, 
             co_owner_1_commission_euro, co_owner_2_commission_euro
            from sales_owner_commission_vw
                --where company_id='1e2b47e6-499e-41c6-91d3-09d12dddfbbd'
                --where company_id in ('27c59f76-5d26-4b82-a89b-59f8dfd2e9a7', '20dfef10-8f4e-45a1-82fc-123f4ab2a4a5') 
            ),
            bb as (
            SELECT primary_owner AS sales_owner, primary_commission_euro AS commission 
            FROM aa
            WHERE primary_owner IS NOT NULL
            
            UNION ALL
            
            SELECT co_owner_1 AS sales_owner, co_owner_1_commission_euro AS commission 
            FROM aa
            WHERE co_owner_1 IS NOT NULL
            
            UNION ALL
            
            SELECT co_owner_2 AS sales_owner, co_owner_2_commission_euro AS commission 
            FROM aa
            WHERE co_owner_2 IS NOT NULL
            )
            select sales_owner, round(sum(commission), 2) as total_commission
            from bb
            group by 1
            order by 2 desc
             """)   

# Show results
df_total_sales_comm.show(truncate=False)


+-----------------+----------------+
|sales_owner      |total_commission|
+-----------------+----------------+
|Leonard Cohen    |746.1           |
|David Henderson  |586.48          |
|Luke Skywalker   |432.13          |
|Yuri Gagarin     |414.38          |
|David Goliat     |368.28          |
|Ammy Winehouse   |246.87          |
|Marianov Merschik|188.61          |
|Chris Pratt      |119.7           |
|Marie Curie      |85.33           |
|Vladimir Chukov  |72.83           |
|Leon Leonov      |72.07           |
|Markus Söder     |27.06           |
+-----------------+----------------+
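
Note that `round(..., 2)` fixes the numeric value but not how it is displayed, which is why totals such as 746.1 and 119.7 appear with a single decimal. If the two-decimal requirement is read as a display requirement, one option is to format the values after collecting them. The sketch below is a plain-Python illustration using three totals copied from the table above; the helper name `format_euro` is hypothetical and not part of the notebook.

```python
from decimal import Decimal, ROUND_HALF_UP


def format_euro(amount) -> str:
    """Render a euro amount with exactly two decimal places (hypothetical helper)."""
    # Go through str() so that a float such as 746.1 is read as the decimal 746.1.
    quantized = Decimal(str(amount)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
    return str(quantized)


# Three totals copied from the output above.
totals = {"Leonard Cohen": 746.1, "David Henderson": 586.48, "Chris Pratt": 119.7}
formatted = {owner: format_euro(value) for owner, value in totals.items()}

for owner, text in formatted.items():
    print(f"{owner}: {text}")
```

Inside Spark SQL, casting the sum to a `DECIMAL` type with scale 2 would likely be a comparable option, though that has not been verified against this dataset.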



## Test 6: Data Visualization. 

    The Sales team wants to understand which sales owners are particularly successful in creating orders for plastic crates. Create a set of appropriate visualizations / reports that help your stakeholders better understand the following aspects:

    1. What is the distribution of orders by crate type?
    2. Which sales owners need the most training to improve their plastic crate sales, based on the last 12 months of orders?
    3. Who are the top 5 performers selling plastic crates each month, for a rolling 3-month evaluation window?


### Create data set for "What is the distribution of orders by crate type?"

In [13]:
df_crate_order = spark.sql("SELECT crate_type, count(order_id) as order_count FROM orders_vw group by 1")

df_crate_order.show(truncate=False)

# Convert to Pandas for Streamlit
df_crate_order_distribution = df_crate_order.toPandas()

# Save DataFrame as CSV inside the container
df_crate_order_distribution.to_csv("/app/data/crate_order_distribution.csv", index=False)

+----------+-----------+
|crate_type|order_count|
+----------+-----------+
|Metal     |19         |
|Plastic   |26         |
|Wood      |17         |
+----------+-----------+
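
For a visualization, relative shares are often easier to read than raw counts. The following minimal sketch derives percentage shares from the three counts shown above; it is plain Python and independent of the Spark session.

```python
# Order counts per crate type, copied from the output above.
order_counts = {"Metal": 19, "Plastic": 26, "Wood": 17}

total_orders = sum(order_counts.values())

# Share of each crate type as a percentage of all orders, one decimal place.
shares = {
    crate: round(100 * count / total_orders, 1)
    for crate, count in order_counts.items()
}

# Print the crate types from the largest share to the smallest.
for crate, pct in sorted(shares.items(), key=lambda item: item[1], reverse=True):
    print(f"{crate}: {pct}%")
```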



### Create an Order_Sales_owner View (order_sales_owner_vw)

#### Create a normalized view of sales owners and orders. 
    Columns:
        order_id, 
        date, 
        company_id,
        company_name, 
        crate_type,
        contact_name,  
        contact_surname, 
        contact_full_name,
        contact_city, 
        contact_cp, 
        salesowner (one sales owner per row)
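
The normalization described above (one output row per sales owner) can be illustrated without Spark. The sketch below mimics splitting a comma-separated `salesowners` string and emitting one row per name; the helper `explode_sales_owners` and the sample order are made up for illustration.

```python
def explode_sales_owners(order: dict) -> list:
    """Return one row per sales owner for a single order (hypothetical helper)."""
    # Split on commas and trim whitespace around each name.
    owners = [name.strip() for name in order["salesowners"].split(",")]
    # Copy every other column unchanged into each output row.
    base = {key: value for key, value in order.items() if key != "salesowners"}
    return [{**base, "salesowner": owner} for owner in owners if owner]


# Made-up order, for illustration only.
order = {
    "order_id": "order-1",
    "crate_type": "Plastic",
    "salesowners": "Alice Example, Bob Example, Carol Example",
}

rows = explode_sales_owners(order)
for row in rows:
    print(row)
```

The order of the names is preserved, which matters elsewhere in this exercise because the first name in the list is treated as the primary owner.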


In [14]:
# Company id
# 1e2b47e6-499e-41c6-91d3-09d12dddfbbd Fresh Fruits Co
# 27c59f76-5d26-4b82-a89b-59f8dfd2e9a7 Healthy Snacks
# 20dfef10-8f4e-45a1-82fc-123f4ab2a4a5 Healthy Snacks Co


# Execute Spark SQL
df_order_sales_owner = spark.sql("""
    with aa as 
    (
        SELECT 
            order_id, 
            date, 
            company_id,
            company_name, 
            crate_type,
            contact_name,  
            contact_surname, 
            contact_full_name,
            contact_city, 
            contact_cp, 
            EXPLODE(SPLIT(salesowners, ', ')) AS salesowner
        FROM orders_vw 
            --where company_id='1e2b47e6-499e-41c6-91d3-09d12dddfbbd'
            --where company_id in ('27c59f76-5d26-4b82-a89b-59f8dfd2e9a7', '20dfef10-8f4e-45a1-82fc-123f4ab2a4a5')
        )
        SELECT distinct 
            order_id, 
            CAST(TO_TIMESTAMP(date, 'dd.MM.yy') AS DATE) as formatted_date, 
            month(CAST(TO_TIMESTAMP(date, 'dd.MM.yy') AS DATE)) as month,
            company_id,
            company_name, 
            crate_type,
            contact_name,  
            contact_surname, 
            contact_full_name,
            contact_city, 
            contact_cp, 
            TRIM(salesowner) AS salesowner
        FROM aa
    """)

# Show the result
df_order_sales_owner.show(truncate=False)

df_order_sales_owner.createOrReplaceTempView("order_sales_owner_vw")

#df_3.show(truncate=False)

+------------------------------------+--------------+-----+------------------------------------+-------------------+----------+------------+---------------+-----------------+----------------------+----------+---------------+
|order_id                            |formatted_date|month|company_id                          |company_name       |crate_type|contact_name|contact_surname|contact_full_name|contact_city          |contact_cp|salesowner     |
+------------------------------------+--------------+-----+------------------------------------+-------------------+----------+------------+---------------+-----------------+----------------------+----------+---------------+
|f47ac10b-58cc-4372-a567-0e02b2c3d502|2023-07-24    |7    |9e25a3d7-46b8-4c9d-b805-2f6c3be6f5a0|Healthy Organics Co|Wood      |Clark       |Kent           |Clark Kent       |Metropolis            |UNK00     |Leon Leonov    |
|f47ac10b-58cc-4372-a567-0e02b2c3d510|2024-02-13    |2    |4a7561b1-1de1-420a-93ed-2c12a5bbd1ab|Farm

### Create data set for "Which sales owners need the most training to improve their plastic crate sales, based on the last 12 months of orders?"

In [15]:
# Company id
# 1e2b47e6-499e-41c6-91d3-09d12dddfbbd Fresh Fruits Co
# 27c59f76-5d26-4b82-a89b-59f8dfd2e9a7 Healthy Snacks
# 20dfef10-8f4e-45a1-82fc-123f4ab2a4a5 Healthy Snacks Co

# Execute Spark SQL
df_7 = spark.sql("""
        SELECT 
            salesowner,
            crate_type,
            count(order_id) as order_count
        FROM order_sales_owner_vw 
            where formatted_date >= add_months(current_date(), -12)
            -- and company_id='1e2b47e6-499e-41c6-91d3-09d12dddfbbd'
            -- and company_id in ('27c59f76-5d26-4b82-a89b-59f8dfd2e9a7', '20dfef10-8f4e-45a1-82fc-123f4ab2a4a5')
        group by 1,2
  """)

# Show the result
df_7.show(truncate=False)

+-----------------+----------+-----------+
|salesowner       |crate_type|order_count|
+-----------------+----------+-----------+
|Marianov Merschik|Plastic   |2          |
|Yuri Gagarin     |Plastic   |2          |
|David Goliat     |Wood      |3          |
|Ammy Winehouse   |Metal     |3          |
|David Henderson  |Metal     |3          |
|David Goliat     |Metal     |2          |
|Marianov Merschik|Wood      |1          |
|Leonard Cohen    |Wood      |3          |
|Chris Pratt      |Metal     |2          |
|Markus Söder     |Metal     |1          |
|Ammy Winehouse   |Plastic   |4          |
|David Henderson  |Plastic   |4          |
|Yuri Gagarin     |Metal     |2          |
|Luke Skywalker   |Wood      |2          |
|Luke Skywalker   |Metal     |1          |
|Leon Leonov      |Plastic   |2          |
|Leon Leonov      |Wood      |3          |
|David Goliat     |Plastic   |3          |
|Leonard Cohen    |Metal     |4          |
|Chris Pratt      |Plastic   |6          |
+-----------------+----------+-----------+

In [16]:
# Write DataFrame to CSV
# Convert to Pandas for Streamlit
df_crate_sale_dist = df_7.toPandas()

# Save DataFrame as CSV inside the container
df_crate_sale_dist.to_csv("/app/data/crate_sale_distribution.csv", index=False)
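
One possible way to turn these counts into a training signal is the share of plastic orders per sales owner: a low share may indicate more room for improvement. The sketch below assumes that interpretation and uses a handful of rows copied from the output above. Because only a subset is used, the resulting shares are illustrative, not final results. The function name `plastic_share_by_owner` is hypothetical.

```python
from collections import defaultdict


def plastic_share_by_owner(rows):
    """Return (owner, plastic_share) pairs, lowest plastic share first."""
    totals = defaultdict(int)
    plastic = defaultdict(int)
    for owner, crate_type, order_count in rows:
        totals[owner] += order_count
        if crate_type == "Plastic":
            plastic[owner] += order_count
    shares = [(owner, plastic[owner] / totals[owner]) for owner in totals]
    # Sort by share, then by name so that ties are ordered deterministically.
    return sorted(shares, key=lambda pair: (pair[1], pair[0]))


# Subset of (salesowner, crate_type, order_count) rows from the output above.
sample_rows = [
    ("Chris Pratt", "Metal", 2),
    ("Chris Pratt", "Plastic", 6),
    ("David Goliat", "Wood", 3),
    ("David Goliat", "Metal", 2),
    ("David Goliat", "Plastic", 3),
    ("Leon Leonov", "Plastic", 2),
    ("Leon Leonov", "Wood", 3),
    ("Luke Skywalker", "Wood", 2),
    ("Luke Skywalker", "Metal", 1),
]

ranking = plastic_share_by_owner(sample_rows)
for owner, share in ranking:
    print(f"{owner}: {share:.0%} plastic")
```

With order counts this small, a ratio is a noisy measure, so it is probably best shown next to the absolute counts rather than on its own.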

### Create data set for "Who are the top 5 performers selling plastic crates each month, for a rolling 3-month evaluation window?"

In [17]:
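# Rank sales owners by plastic-crate order count within each calendar month of the last 12 months.
# Note: the `rolling_sales` column is computed but not selected, and the final `limit 5`
# returns five rows in total rather than five per month.
df_8 = spark.sql("""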
     WITH ranked_sales AS 
     (
        SELECT
            month,
            salesowner,
            crate_type,
            count(order_id) AS total_sales,
            ROW_NUMBER() OVER (PARTITION BY month ORDER BY count(order_id) DESC) AS rank
        FROM
                order_sales_owner_vw
        WHERE
            crate_type = 'Plastic'
            and formatted_date >= add_months(current_date(), -12)
            -- and company_id='1e2b47e6-499e-41c6-91d3-09d12dddfbbd'
            -- and company_id in ('27c59f76-5d26-4b82-a89b-59f8dfd2e9a7', '20dfef10-8f4e-45a1-82fc-123f4ab2a4a5')
        group by 1,2,3
        ),
      rolling_sales AS 
      (
            SELECT
                month,
                salesowner,
                crate_type,
                total_sales,
                rank,
                COUNT(*) OVER (PARTITION BY salesowner ORDER BY month ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS rolling_sales
            FROM
                ranked_sales
        )
        SELECT
            month,
            salesowner,
            total_sales
        FROM
            rolling_sales
        WHERE
            rank <= 5
        ORDER BY
            total_sales desc  
        limit 5
  """)

# Show the result
df_8.show(truncate=False)



+-----+---------------+-----------+
|month|salesowner     |total_sales|
+-----+---------------+-----------+
|3    |Ammy Winehouse |2          |
|3    |David Henderson|2          |
|4    |Leonard Cohen  |2          |
|1    |Leonard Cohen  |1          |
|1    |David Goliat   |1          |
+-----+---------------+-----------+
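
The requirement can also be read more literally: for each month, sum every sales owner's plastic orders over that month and the two preceding months, then keep the five highest totals. The following plain-Python sketch illustrates that reading under stated assumptions. The monthly counts are made up, the function names are hypothetical, and periods are keyed by (year, month) so that windows crossing a year boundary are handled.

```python
from collections import defaultdict


def month_index(year: int, month: int) -> int:
    """Map a (year, month) pair to a running month number."""
    return year * 12 + (month - 1)


def rolling_top_performers(monthly_counts, window=3, top_n=5):
    """Rank owners per month by orders summed over the trailing window.

    monthly_counts maps (year, month, owner) -> plastic order count.
    Returns {(year, month): [(owner, rolling_total), ...]}, at most top_n each.
    """
    months = sorted({(y, m) for y, m, _ in monthly_counts})
    result = {}
    for year, month in months:
        end = month_index(year, month)
        totals = defaultdict(int)
        for (y, m, owner), count in monthly_counts.items():
            # Keep the current month and the (window - 1) months before it.
            if end - window < month_index(y, m) <= end:
                totals[owner] += count
        # Highest total first; ties are broken alphabetically by owner name.
        ranked = sorted(totals.items(), key=lambda pair: (-pair[1], pair[0]))
        result[(year, month)] = ranked[:top_n]
    return result


# Made-up counts, for illustration only.
counts = {
    (2023, 12, "Owner A"): 2,
    (2024, 1, "Owner A"): 1,
    (2024, 1, "Owner B"): 3,
    (2024, 2, "Owner B"): 1,
    (2024, 3, "Owner C"): 5,
}

top = rolling_top_performers(counts)
for period, ranking in top.items():
    print(period, ranking)
```

Only months that contain at least one order are evaluated in this sketch. A Spark SQL equivalent might use a window ordered by a running month number with a `RANGE BETWEEN 2 PRECEDING AND CURRENT ROW` frame, followed by a per-month rank filter; that variant is not shown or tested here.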



In [18]:
# Write DataFrame to CSV
# Convert to Pandas for Streamlit
df_top_5 = df_8.toPandas()

# Save DataFrame as CSV inside the container
df_top_5.to_csv("/app/data/sales_top_5.csv", index=False)

In [19]:
!ls /app/data

crate_order_distribution.csv  invoicing_data.json  sales_top_5.csv
crate_sale_distribution.csv   orders.csv


In [20]:
!pwd

/app/notebooks
