In [4]:
!pip install pyspark



In [5]:
import random
from datetime import datetime, timedelta

import pandas as pd
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, avg

In [6]:
# --- Synthetic transaction-log generator ------------------------------------
# Builds `num_records` fake transaction rows, one per minute starting at
# `start_date`, writes them to enterprise_logs.csv and previews the first rows.

SEED = 42  # fixed so Restart & Run All regenerates the same CSV
num_records = 100000
countries = ["India", "USA", "Germany", "UK", "Singapore"]
devices = ["Mobile", "Desktop", "Tablet"]
start_date = datetime(2025, 1, 1)

# Nine "success" entries to one "failure" gives a 10% failure rate; the pool is
# built once instead of on every loop iteration.
STATUS_POOL = ["success"] * 9 + ["failure"]


def generate_records(n, seed=SEED):
    """Return ``n`` synthetic transaction records as a list of dicts.

    Args:
        n: Number of records to create; record ``i`` is stamped
            ``start_date`` plus ``i`` minutes.
        seed: Seed for the private random generator. The same ``n`` and
            ``seed`` always yield the same records.

    Returns:
        A list of dicts with keys ``timestamp``, ``user_id``,
        ``transaction_amount``, ``country``, ``device_type`` and ``status``.
    """
    # A private generator keeps the global `random` state untouched.
    rng = random.Random(seed)
    return [
        {
            "timestamp": start_date + timedelta(minutes=i),
            "user_id": rng.randint(1000, 5000),
            "transaction_amount": round(rng.uniform(10, 5000), 2),
            "country": rng.choice(countries),
            "device_type": rng.choice(devices),
            "status": rng.choice(STATUS_POOL),
        }
        for i in range(n)
    ]


data = generate_records(num_records)
df_pandas = pd.DataFrame(data)
df_pandas.to_csv("enterprise_logs.csv", index=False)
df_pandas.head()

Unnamed: 0,timestamp,user_id,transaction_amount,country,device_type,status
0,2025-01-01 00:00:00,4783,4298.51,UK,Desktop,success
1,2025-01-01 00:01:00,4071,802.17,India,Tablet,success
2,2025-01-01 00:02:00,3029,2327.88,India,Tablet,success
3,2025-01-01 00:03:00,4429,3158.88,Germany,Tablet,success
4,2025-01-01 00:04:00,4910,1514.08,UK,Tablet,success


In [7]:
# Start (or reuse) the Spark session used by the Spark cells in this notebook.
session_builder = SparkSession.builder.appName("Enterprise Log Analytics POC")
spark = session_builder.getOrCreate()
spark

In [8]:
# Load the generated log file with an explicit schema. Declaring the column
# types up front avoids the extra pass over the file that `inferSchema=True`
# needs, and keeps the column types fixed regardless of the file's contents.
LOG_SCHEMA = (
    "`timestamp` TIMESTAMP, `user_id` INT, `transaction_amount` DOUBLE, "
    "`country` STRING, `device_type` STRING, `status` STRING"
)
df = spark.read.csv("enterprise_logs.csv", header=True, schema=LOG_SCHEMA)
df.printSchema()
df.show(5)

root
 |-- timestamp: timestamp (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- transaction_amount: double (nullable = true)
 |-- country: string (nullable = true)
 |-- device_type: string (nullable = true)
 |-- status: string (nullable = true)

+-------------------+-------+------------------+-------+-----------+-------+
|          timestamp|user_id|transaction_amount|country|device_type| status|
+-------------------+-------+------------------+-------+-----------+-------+
|2025-01-01 00:00:00|   4783|           4298.51|     UK|    Desktop|success|
|2025-01-01 00:01:00|   4071|            802.17|  India|     Tablet|success|
|2025-01-01 00:02:00|   3029|           2327.88|  India|     Tablet|success|
|2025-01-01 00:03:00|   4429|           3158.88|Germany|     Tablet|success|
|2025-01-01 00:04:00|   4910|           1514.08|     UK|     Tablet|success|
+-------------------+-------+------------------+-------+-----------+-------+
only showing top 5 rows


In [9]:
# Discard every row that has a null in any column, then report how many remain.
df = df.na.drop(how="any")
df.count()

100000

In [10]:
# Tally rows for every (country, status) pair. Both "success" and "failure"
# rows are counted, despite the variable's name.
status_breakdown_keys = ["country", "status"]
failure_by_country = df.groupBy(*status_breakdown_keys).count()
failure_by_country.show()

+---------+-------+-----+
|  country| status|count|
+---------+-------+-----+
|  Germany|failure| 2051|
|Singapore|failure| 2015|
|      USA|success|17984|
|    India|success|17804|
|       UK|failure| 2018|
|    India|failure| 1948|
|      USA|failure| 2028|
|       UK|success|18252|
|  Germany|success|17837|
|Singapore|success|18063|
+---------+-------+-----+



In [11]:
# Mean transaction amount for each device type.
mean_amount = avg("transaction_amount").alias("avg_transaction_amount")
avg_transaction = df.groupBy("device_type").agg(mean_amount)
avg_transaction.show()

+-----------+----------------------+
|device_type|avg_transaction_amount|
+-----------+----------------------+
|     Mobile|    2484.8848334032423|
|     Tablet|    2522.0148919995113|
|    Desktop|    2500.8316336101084|
+-----------+----------------------+



In [12]:
# Pack the single numeric column into the "features" vector column that the
# clustering step consumes.
feature_columns = ["transaction_amount"]
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
# Rebinds `data`, which earlier held the list of generated records.
data = assembler.transform(df)
data.select("transaction_amount", "features").show(5)

+------------------+---------+
|transaction_amount| features|
+------------------+---------+
|           4298.51|[4298.51]|
|            802.17| [802.17]|
|           2327.88|[2327.88]|
|           3158.88|[3158.88]|
|           1514.08|[1514.08]|
+------------------+---------+
only showing top 5 rows


In [13]:
# Fit a 3-cluster k-means model with a fixed seed on the assembled data, then
# label every row with its cluster id and preview ten of them.
kmeans = KMeans().setK(3).setSeed(42)
model = kmeans.fit(data)
predictions = model.transform(data)
predictions.select("transaction_amount", "prediction").show(10)

+------------------+----------+
|transaction_amount|prediction|
+------------------+----------+
|           4298.51|         0|
|            802.17|         1|
|           2327.88|         2|
|           3158.88|         2|
|           1514.08|         1|
|            3431.0|         0|
|           1161.64|         1|
|           4914.02|         0|
|           3828.17|         0|
|           4134.26|         0|
+------------------+----------+
only showing top 10 rows


In [14]:
# Average transaction amount inside each k-means cluster.
cluster_mean = avg("transaction_amount").alias("avg_amount")
cluster_stats = predictions.groupBy("prediction").agg(cluster_mean)
cluster_stats.show()

+----------+------------------+
|prediction|        avg_amount|
+----------+------------------+
|         1| 843.8817999403709|
|         2|2515.5302991056465|
|         0| 4173.785612983305|
+----------+------------------+



In [15]:
# Number of rows assigned to each cluster.
rows_per_cluster = predictions.groupBy("prediction").count()
rows_per_cluster.show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|33540|
|         2|33433|
|         0|33027|
+----------+-----+



In [16]:
# Score the clustering with the silhouette metric. ClusteringEvaluator comes
# from the notebook's import cell, so all dependencies are declared in one place.
# The column and metric names are spelled out rather than left to defaults so
# the reader can see which columns of `predictions` are being scored.
evaluator = ClusteringEvaluator(
    featuresCol="features",
    predictionCol="prediction",
    metricName="silhouette",
)
silhouette = evaluator.evaluate(predictions)
print("Silhouette Score:", silhouette)

Silhouette Score: 0.7528132599925256


In [17]:
# Cluster sizes, kept in a named DataFrame so they can be reused.
grouped_by_cluster = predictions.groupBy("prediction")
cluster_counts = grouped_by_cluster.count()
cluster_counts.show()

+----------+-----+
|prediction|count|
+----------+-----+
|         1|33540|
|         2|33433|
|         0|33027|
+----------+-----+



In [19]:
# Set the Spark SQL shuffle-partition count on the live session.
# NOTE(review): in the visible notebook this runs after every query, so it can
# only influence cells executed afterwards; "200" is also, as far as I know,
# Spark's default for this key — confirm whether this cell is needed at all or
# should move next to the session creation.
shuffle_partitions = "200"
spark.conf.set("spark.sql.shuffle.partitions", shuffle_partitions)