We have a PySpark DataFrame that logs user login activity. Our task is to identify users who logged in from at least 3 different cities within 1 hour — a potential sign of account compromise.

In [5]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col,unix_timestamp,countDistinct,window,collect_set,size
from pyspark.sql.types import StructType, StructField, StringType,TimestampType

In [6]:
# Obtain the Spark session for this notebook under a recognisable app name.
spark = (
    SparkSession.builder
    .appName("loginPatternIdentify")
    .getOrCreate()
)
spark

In [66]:
# Sample login events as (user_id, login_time, city) tuples.
# abc1 appears in three different cities between 08:00 and 08:50,
# abc2 in two cities 2.5 hours apart, and abc3 in the same city twice.
data = [
    ("abc1", "2025-06-01 08:00:00", "Kathmandu"),
    ("abc1", "2025-06-01 8:30:00", "Pokhara"),
    ("abc1", "2025-06-01 8:50:00", "Solukhumbu"),
    ("abc2", "2025-06-01 9:00:00", "Chitwan"),
    ("abc2", "2025-06-01 11:30:00", "Dharan"),
    ("abc3", "2025-06-02 07:00:00", "Gorkha"),
    ("abc3", "2025-06-01 07:15:00", "Gorkha"),
]

In [67]:
# Echo the raw records as a quick sanity check before building the DataFrame.
data

[('abc1', '2025-06-01 08:00:00', 'Kathmandu'),
 ('abc1', '2025-06-01 8:30:00', 'Pokhara'),
 ('abc1', '2025-06-01 8:50:00', 'Solukhumbu'),
 ('abc2', '2025-06-01 9:00:00', 'Chitwan'),
 ('abc2', '2025-06-01 11:30:00', 'Dharan'),
 ('abc3', '2025-06-02 07:00:00', 'Gorkha'),
 ('abc3', '2025-06-01 07:15:00', 'Gorkha')]

In [68]:
# All three columns are declared as strings, in the same order as the tuples in `data`.
schema = StructType(
    [
        StructField(column_name, StringType())
        for column_name in ("user_id", "login_time", "City")
    ]
)

In [69]:
# Build the DataFrame from the in-memory records using the explicit schema.
df = spark.createDataFrame(data=data, schema=schema)

In [70]:
# withColumn returns a NEW DataFrame rather than modifying df in place, so the
# result has to be assigned back; otherwise the cast is thrown away and
# login_time silently remains a string column.
df = df.withColumn("login_time", col("login_time").cast(TimestampType()))

# Echo the DataFrame so the cell still displays the resulting column types.
df

DataFrame[user_id: string, login_time: timestamp, City: string]

Use a 1-hour time window per user and count the distinct cities seen within it; 3 or more distinct cities marks the user as suspicious.

In [71]:
# Exact trailing-window check.
# window("login_time", "1 hour") without a slide duration produces fixed,
# clock-aligned (tumbling) buckets, so a burst that straddles an hour boundary
# (e.g. 08:50, 09:10, 09:30) is split across two buckets and never reaches the
# threshold. Instead, for every login we look back over the same user's logins
# in the preceding hour (bounds inclusive) and count the distinct cities.
LOOKBACK_SECONDS = 3600

# Casting to timestamp first makes this work whether login_time is still a
# string or already a timestamp; unix_timestamp then yields epoch seconds,
# which is what a numeric range frame needs.
login_epoch = unix_timestamp(col("login_time").cast(TimestampType()))

trailing_hour = (
    Window.partitionBy("user_id")
    .orderBy(login_epoch)
    .rangeBetween(-LOOKBACK_SECONDS, Window.currentRow)
)

# countDistinct is not supported as a window function, so gather the distinct
# cities with collect_set and take the size of that set.
# Result: one row per login, with `unique_cities` = number of distinct cities
# the user logged in from during the hour ending at that login.
windowed = df.withColumn(
    "unique_cities",
    size(collect_set("City").over(trailing_hour)),
)

In [72]:
# Keep only the windows that saw three or more distinct cities, then reduce
# them to one row per affected user.
suspicious_users = (
    windowed
    .where(col("unique_cities") >= 3)
    .select("user_id")
    .distinct()
)

In [73]:
# Print the flagged user ids as a table.
suspicious_users.show()

+-------+
|user_id|
+-------+
|   abc1|
+-------+

