## **Exclava Interview**

### **Question 1:**

Orders Schema: `order_id`, `customer_id`, `order_date` \
\
Customers Schema: `customer_id`, `customer_name`, `email`

(i) Find customers who have placed orders in 3 consecutive months.



### **PySpark Solution:**

In [1]:
from pyspark.sql import Row, SparkSession
# Explicit imports avoid shadowing Python built-ins such as sum, min and max
from pyspark.sql.functions import col, count, month, row_number, year
from pyspark.sql.window import Window

In [2]:
spark = (
    SparkSession.builder
    .appName("OrderAnalysis")
    .master("local[*]")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

# Reduce logs
spark.sparkContext.setLogLevel("ERROR")

print(f"Spark Session created and the version: {spark.version}")


Spark Session created and the version: 4.0.0


In [4]:
# In this sample data, updated_at stands in for order_date from the Orders schema
orders_data = [
    Row(order_id=1, customer_id="C1", updated_at="2024-01-10"),
    Row(order_id=2, customer_id="C1", updated_at="2024-02-15"),
    Row(order_id=3, customer_id="C1", updated_at="2024-03-20"),
    Row(order_id=4, customer_id="C2", updated_at="2024-01-05"),
    Row(order_id=5, customer_id="C2", updated_at="2024-03-10"),
    Row(order_id=6, customer_id="C2", updated_at="2024-04-12"),
    Row(order_id=7, customer_id="C3", updated_at="2024-02-08"),
    Row(order_id=8, customer_id="C3", updated_at="2024-03-18")
]

df = spark.createDataFrame(orders_data)
df.show()

+--------+-----------+----------+
|order_id|customer_id|updated_at|
+--------+-----------+----------+
|       1|         C1|2024-01-10|
|       2|         C1|2024-02-15|
|       3|         C1|2024-03-20|
|       4|         C2|2024-01-05|
|       5|         C2|2024-03-10|
|       6|         C2|2024-04-12|
|       7|         C3|2024-02-08|
|       8|         C3|2024-03-18|
+--------+-----------+----------+



                                                                                

In [5]:
customer_data = [
    Row(customer_id = "C1", customer_name = "Bruno Fernandes", email = "bruno8@manutd.com", city = "Lisbon"),
    Row(customer_id = "C2", customer_name = "Lisandro Martinez", email = "licha6@manutd.com", city = "Buenos Aires"),
    Row(customer_id = "C3", customer_name = "Kobbie Mainoo", email = "kobbie37@manutd.com", city = "Stockport")
]

c_df = spark.createDataFrame(customer_data)
c_df.show()

+-----------+-----------------+-------------------+------------+
|customer_id|    customer_name|              email|        city|
+-----------+-----------------+-------------------+------------+
|         C1|  Bruno Fernandes|  bruno8@manutd.com|      Lisbon|
|         C2|Lisandro Martinez|  licha6@manutd.com|Buenos Aires|
|         C3|    Kobbie Mainoo|kobbie37@manutd.com|   Stockport|
+-----------+-----------------+-------------------+------------+



In [7]:
joined_data = df.join(c_df, "customer_id")
joined_data.show()

+-----------+--------+----------+-----------------+-------------------+------------+
|customer_id|order_id|updated_at|    customer_name|              email|        city|
+-----------+--------+----------+-----------------+-------------------+------------+
|         C1|       1|2024-01-10|  Bruno Fernandes|  bruno8@manutd.com|      Lisbon|
|         C1|       2|2024-02-15|  Bruno Fernandes|  bruno8@manutd.com|      Lisbon|
|         C1|       3|2024-03-20|  Bruno Fernandes|  bruno8@manutd.com|      Lisbon|
|         C2|       4|2024-01-05|Lisandro Martinez|  licha6@manutd.com|Buenos Aires|
|         C2|       5|2024-03-10|Lisandro Martinez|  licha6@manutd.com|Buenos Aires|
|         C2|       6|2024-04-12|Lisandro Martinez|  licha6@manutd.com|Buenos Aires|
|         C3|       7|2024-02-08|    Kobbie Mainoo|kobbie37@manutd.com|   Stockport|
|         C3|       8|2024-03-18|    Kobbie Mainoo|kobbie37@manutd.com|   Stockport|
+-----------+--------+----------+-----------------+-------------------+------------+

In [10]:
cte = joined_data.withColumn('month_group', year("updated_at") * 12 + (month("updated_at")))
cte.show()

+-----------+--------+----------+-----------------+-------------------+------------+-----------+
|customer_id|order_id|updated_at|    customer_name|              email|        city|month_group|
+-----------+--------+----------+-----------------+-------------------+------------+-----------+
|         C1|       1|2024-01-10|  Bruno Fernandes|  bruno8@manutd.com|      Lisbon|      24289|
|         C1|       2|2024-02-15|  Bruno Fernandes|  bruno8@manutd.com|      Lisbon|      24290|
|         C1|       3|2024-03-20|  Bruno Fernandes|  bruno8@manutd.com|      Lisbon|      24291|
|         C2|       4|2024-01-05|Lisandro Martinez|  licha6@manutd.com|Buenos Aires|      24289|
|         C2|       5|2024-03-10|Lisandro Martinez|  licha6@manutd.com|Buenos Aires|      24291|
|         C2|       6|2024-04-12|Lisandro Martinez|  licha6@manutd.com|Buenos Aires|      24292|
|         C3|       7|2024-02-08|    Kobbie Mainoo|kobbie37@manutd.com|   Stockport|      24290|
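|         C3|       8|2024-03-18|    Kobbie Mainoo|kobbie37@manutd.com|   Stockport|      24291|
+-----------+--------+----------+-----------------+-------------------+------------+-----------+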
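
The `month_group` expression turns each date into a running month number, so adjacent calendar months always differ by exactly 1, even across a year boundary. A minimal plain-Python sketch of the same arithmetic (the helper name `month_index` is made up for illustration and is not part of the notebook):

```python
from datetime import date

def month_index(d: date) -> int:
    """Map a date to a running month number: year * 12 + month."""
    return d.year * 12 + d.month

# Same values as the month_group column for January and February 2024.
jan = month_index(date(2024, 1, 10))   # 24289
feb = month_index(date(2024, 2, 15))   # 24290

# The index keeps counting across a year boundary, so December 2023
# and January 2024 are exactly one apart.
dec_prev = month_index(date(2023, 12, 31))  # 24288

print(feb - jan, jan - dec_prev)  # 1 1
```

Using `month("updated_at")` alone would place December (12) and January (1) eleven apart, which is why the year term is needed.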

In [12]:
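window_spec = Window.partitionBy('customer_id').orderBy('month_group')

# One row per customer per month, so repeat orders in a month cannot split a run;
# consecutive months then share the same (month_group - row_number) value.
res = cte.dropDuplicates(['customer_id', 'month_group']) \
         .withColumn('rank', col('month_group') - row_number().over(window_spec))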
res.show()

+-----------+--------+----------+-----------------+-------------------+------------+-----------+-----+
|customer_id|order_id|updated_at|    customer_name|              email|        city|month_group| rank|
+-----------+--------+----------+-----------------+-------------------+------------+-----------+-----+
|         C1|       1|2024-01-10|  Bruno Fernandes|  bruno8@manutd.com|      Lisbon|      24289|24288|
|         C1|       2|2024-02-15|  Bruno Fernandes|  bruno8@manutd.com|      Lisbon|      24290|24288|
|         C1|       3|2024-03-20|  Bruno Fernandes|  bruno8@manutd.com|      Lisbon|      24291|24288|
|         C2|       4|2024-01-05|Lisandro Martinez|  licha6@manutd.com|Buenos Aires|      24289|24288|
|         C2|       5|2024-03-10|Lisandro Martinez|  licha6@manutd.com|Buenos Aires|      24291|24289|
|         C2|       6|2024-04-12|Lisandro Martinez|  licha6@manutd.com|Buenos Aires|      24292|24289|
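|         C3|       7|2024-02-08|    Kobbie Mainoo|kobbie37@manutd.com|   Stockport|      24290|24289|
|         C3|       8|2024-03-18|    Kobbie Mainoo|kobbie37@manutd.com|   Stockport|      24291|24289|
+-----------+--------+----------+-----------------+-------------------+------------+-----------+-----+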
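
The `rank` column is the usual "gaps and islands" key: within a run of consecutive months, the month number and the row number both grow by 1, so their difference stays constant, and a skipped month changes it. The following plain-Python sketch illustrates that idea on the same sample dates. The function name `longest_month_run` is hypothetical, and the sketch deduplicates months first, since a second order in the same month would otherwise shift the row numbers and split a run.

```python
from datetime import date
from itertools import groupby

def longest_month_run(order_dates):
    """Length of the longest run of consecutive calendar months."""
    # Distinct month indexes, sorted: several orders in one month count once.
    months = sorted({d.year * 12 + d.month for d in order_dates})
    # Within a consecutive run, (month index - position) stays constant,
    # which mirrors month_group - row_number in the notebook.
    keyed = [(m - pos, m) for pos, m in enumerate(months, start=1)]
    runs = [len(list(group)) for _, group in groupby(keyed, key=lambda t: t[0])]
    return max(runs, default=0)

orders = {
    "C1": [date(2024, 1, 10), date(2024, 2, 15), date(2024, 3, 20)],
    "C2": [date(2024, 1, 5), date(2024, 3, 10), date(2024, 4, 12)],
    "C3": [date(2024, 2, 8), date(2024, 3, 18)],
}
qualifying = [c for c, dates in orders.items() if longest_month_run(dates) >= 3]
print(qualifying)  # ['C1']
```

C2 skips February, so its longest run is 2 (March and April), and C3 only has two months in total.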

In [25]:
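# Rows sharing a rank form one run of consecutive months; keep runs of 3 or more.
# distinct() lists a customer once even if they have several qualifying runs.
final = res.groupBy('customer_id', 'customer_name', 'rank').agg(count('*').alias('group')) \
            .filter(col('group') >= 3).select('customer_name').distinct()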
final.show()

+---------------+
|  customer_name|
+---------------+
|Bruno Fernandes|
+---------------+



### **SQL Solution:**

```sql
-- Step 1: one row per customer per calendar month (DISTINCT removes repeat orders)
WITH monthly_orders AS (
    SELECT DISTINCT
        o.customer_id,
        c.customer_name,
        EXTRACT(YEAR FROM o.order_date) * 12 + 
        EXTRACT(MONTH FROM o.order_date) AS month_num
    FROM orders o
    JOIN customers c USING(customer_id)
),
-- Step 2: consecutive months share the same (month_num - row number) value
consecutive_groups AS (
    SELECT
        customer_id,
        customer_name,
        month_num,
        month_num - ROW_NUMBER() OVER (
            PARTITION BY customer_id 
            ORDER BY month_num
        ) AS grp
    FROM monthly_orders
)
-- Step 3: keep customers with at least one run of 3 or more months
SELECT DISTINCT customer_name
FROM consecutive_groups
GROUP BY customer_id, customer_name, grp
HAVING COUNT(*) >= 3;
```
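
One way to try the query without a database server is Python's built-in `sqlite3` module. This is only a sketch under two assumptions: the bundled SQLite is version 3.25 or newer (needed for window functions), and `strftime` is swapped in for `EXTRACT`, which SQLite does not support. The table contents reuse the sample data from the PySpark cells, with the date column named `order_date` as in the schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (customer_id TEXT, customer_name TEXT, email TEXT);
    CREATE TABLE orders (order_id INTEGER, customer_id TEXT, order_date TEXT);
    INSERT INTO customers VALUES
        ('C1', 'Bruno Fernandes', 'bruno8@manutd.com'),
        ('C2', 'Lisandro Martinez', 'licha6@manutd.com'),
        ('C3', 'Kobbie Mainoo', 'kobbie37@manutd.com');
    INSERT INTO orders VALUES
        (1, 'C1', '2024-01-10'), (2, 'C1', '2024-02-15'), (3, 'C1', '2024-03-20'),
        (4, 'C2', '2024-01-05'), (5, 'C2', '2024-03-10'), (6, 'C2', '2024-04-12'),
        (7, 'C3', '2024-02-08'), (8, 'C3', '2024-03-18');
""")

query = """
WITH monthly_orders AS (
    SELECT DISTINCT
        o.customer_id,
        c.customer_name,
        -- SQLite has no EXTRACT, so strftime is used instead (an adaptation)
        CAST(strftime('%Y', o.order_date) AS INTEGER) * 12
        + CAST(strftime('%m', o.order_date) AS INTEGER) AS month_num
    FROM orders o
    JOIN customers c USING (customer_id)
),
consecutive_groups AS (
    SELECT
        customer_id,
        customer_name,
        month_num - ROW_NUMBER() OVER (
            PARTITION BY customer_id ORDER BY month_num
        ) AS grp
    FROM monthly_orders
)
SELECT DISTINCT customer_name
FROM consecutive_groups
GROUP BY customer_id, customer_name, grp
HAVING COUNT(*) >= 3
"""
names = [row[0] for row in conn.execute(query)]
print(names)  # ['Bruno Fernandes']
```

The result matches the PySpark output above. Other engines would keep `EXTRACT` as written in the original query.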