# **Altimetrik Interview Questions**

### **Question 1:**
**Schema:** `order_id`, `order_no`, `updated_at`, `amount`\
\
(i) Identify duplicate `order_no` values.\
(ii) For each duplicated `order_no`, return only the row with the most recent record.\
(iii) Exclude order numbers that appear only once.

### **PySpark Solution**

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.types import *
from pyspark.sql import Row

In [2]:
spark = SparkSession.builder\
    .appName("OrderAnalysis")\
    .master("local[*]")\
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "2") \
    .getOrCreate()

# Reduce logs
spark.sparkContext.setLogLevel("ERROR")

print(f"Spark Session created and the version: {spark.version}")


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/06 18:44:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Session created and the version: 4.0.0


In [3]:
orders_data = [
    Row(order_id=1, order_no="A100", updated_at="2024-05-01 10:00:00", amount=250.00),
    Row(order_id=2, order_no="A100", updated_at="2024-05-01 10:00:00", amount=260.00),
    Row(order_id=3, order_no="A100", updated_at="2024-05-01 10:00:00", amount=265.00),
    Row(order_id=4, order_no="B200", updated_at="2024-05-01 10:00:00", amount=300.00),
    Row(order_id=5, order_no="C300", updated_at="2024-05-01 10:00:00", amount=150.00),
    Row(order_id=6, order_no="C300", updated_at="2024-05-01 10:00:00", amount=155.00),
    Row(order_id=7, order_no="D400", updated_at="2024-05-01 10:00:00", amount=180.00),
    Row(order_id=8, order_no="D400", updated_at="2024-05-01 10:00:00", amount=275.00),
    Row(order_id=9, order_no="E600", updated_at="2024-05-01 10:00:00", amount=350.00),
    Row(order_id=10, order_no="E700", updated_at="2024-05-01 10:00:00", amount=175.00)
    ]

df = spark.createDataFrame(orders_data)
df.show()

[Stage 0:>                                                          (0 + 1) / 1]

+--------+--------+-------------------+------+
|order_id|order_no|         updated_at|amount|
+--------+--------+-------------------+------+
|       1|    A100|2024-05-01 10:00:00| 250.0|
|       2|    A100|2024-05-01 10:00:00| 260.0|
|       3|    A100|2024-05-01 10:00:00| 265.0|
|       4|    B200|2024-05-01 10:00:00| 300.0|
|       5|    C300|2024-05-01 10:00:00| 150.0|
|       6|    C300|2024-05-01 10:00:00| 155.0|
|       7|    D400|2024-05-01 10:00:00| 180.0|
|       8|    D400|2024-05-01 10:00:00| 275.0|
|       9|    E600|2024-05-01 10:00:00| 350.0|
|      10|    E700|2024-05-01 10:00:00| 175.0|
+--------+--------+-------------------+------+



                                                                                

In [26]:
dupe_df = df.groupBy(col('order_no')).agg(count(col('order_no')).alias('order_count'))\
    .filter(col('order_count') > 1)
dupe_df.show()

+--------+-----------+
|order_no|order_count|
+--------+-----------+
|    A100|          3|
|    C300|          2|
|    D400|          2|
+--------+-----------+



In [27]:
joined_df = df.join(dupe_df,'order_no')

In [28]:
window_spec = Window.partitionBy('order_no','updated_at').orderBy(col('order_id').desc())
# Partitioning by these two columns will work only for this case since the timestamps are the same,
# IRL scenaios, partiton by order_no and order it by timestamp. 
filter_df = joined_df.select('*',row_number().over(window_spec).alias("rank"))

In [29]:
result = filter_df.select(col('order_id'),col('order_no'),col('updated_at'),col('amount')).filter(col('rank') == 1)

In [30]:
result.show()

+--------+--------+-------------------+------+
|order_id|order_no|         updated_at|amount|
+--------+--------+-------------------+------+
|       3|    A100|2024-05-01 10:00:00| 265.0|
|       6|    C300|2024-05-01 10:00:00| 155.0|
|       8|    D400|2024-05-01 10:00:00| 275.0|
+--------+--------+-------------------+------+



### **SQL Solution**

```sql
WITH filtered AS(
	SELECT 
		order_no,
        COUNT(*) total_orders
	FROM 
		orders
	GROUP BY 1
    HAVING COUNT(*) > 1
),
dupe AS(
	SELECT
		*,
		ROW_NUMBER() OVER(PARTITION BY order_no ORDER BY updated_at desc) rnk
	FROM
		orders
	JOIN 
		filtered 
	USING(order_no)
)
SELECT 
	order_id,
    order_no,
    updated_at,
    amount
FROM
	dupe
WHERE 
	rnk = 1;
```

### **Question 2:**