In [None]:
from sklearn.cluster import MiniBatchKMeans
from sklearn.datasets import make_blobs as blobs
import matplotlib.pyplot as plt
import timeit as t
import numpy as np
import pandas as pd
import time
from datetime import datetime, date
import glob
from tqdm import tqdm

In [None]:
# Assume you have CSV files named data1.csv, data2.csv, etc., in a folder named 'data'
path = "Data/03-11/*.csv" 
columns = [' Source Port', ' Destination Port', ' Protocol',
       ' Flow Duration', ' Total Fwd Packets', ' Total Backward Packets',
       'Total Length of Fwd Packets', ' Total Length of Bwd Packets', 'Flow Bytes/s',
       ' Flow Packets/s',  ' Inbound', ' Label']
all_files = glob.glob(path)
df_list = []
start_time = datetime.now()

for f in range(len(all_files)):
    df = pd.read_csv(all_files[f],usecols = columns,low_memory = False)
    df_list.append(df)

combined_df1 = pd.concat(df_list, ignore_index=True)
print("time_taken:",datetime.now()-start_time)

In [None]:
combined_df1.shape

In [None]:
# Assume you have CSV files named data1.csv, data2.csv, etc., in a folder named 'data'
path = "Data/01-12/*.csv" 
columns = [' Source Port', ' Destination Port', ' Protocol',
       ' Flow Duration', ' Total Fwd Packets', ' Total Backward Packets',
       'Total Length of Fwd Packets', ' Total Length of Bwd Packets', 'Flow Bytes/s',
       ' Flow Packets/s',  ' Inbound', ' Label']
all_files = glob.glob(path)
df_list2 = []
start_time = datetime.now()
for f in range(len(all_files)):
    df = pd.read_csv(all_files[f],usecols = columns,low_memory = False)
    df_list2.append(df)

combined_df2 = pd.concat(df_list2, ignore_index=True)
print("time_taken:",datetime.now()-start_time)

In [None]:
df = pd.concat([combined_df2,combined_df1], ignore_index = True)
df.shape

In [None]:
print(df[' Label'].nunique())
print(df[' Label'].value_counts())

In [None]:
# Create a sample DataFrame
df = pd.DataFrame({
    'product_type': ['sandal', 'shoes', 'vest', 'tshirt', 'shoes', 'sandal'],
    'price': [25, 60, 30, 15, 75, 40]
})

# Define a mapping dictionary
category_map = {
    'sandal': 'footwear',
    'shoes': 'footwear',
    'vest': 'apparel',
    'tshirt': 'apparel'
}

# Apply the mapping to create a new 'category' column
df['category'] = df['product_type'].map(category_map)

print(df)

In [None]:
df.info()

In [None]:
df.describe().T

In [None]:
# Select the data
X = df.iloc[:,0:8]
y = df[' Label']
print(X.shape)

In [None]:
#X = X.dropna()
X.isnull().sum()

In [None]:
# Simulate a large dataset (e.g., 50 million samples)
n_samples = 50000000
n_features = 10
n_clusters = 4
columns = []   # fill the names
print("Generating large dataset...")
X, y = blobs(n_samples=n_samples, n_features=n_features, centers=n_clusters, random_state = 24)

print("data generated")


In [None]:
print("Fitting MiniBatchKMeans...")
minibatch_kmeans = MiniBatchKMeans(init='k-means++',n_clusters=n_clusters, batch_size=10000)

start_time = time.time()
minibatch_kmeans.fit(X)
time_taken = time.time() - start_time

# Get cluster assignments
labels = minibatch_kmeans.predict(X)
kmeans_inertia = minibatch_kmeans.inertia_
print("Clustering complete.")
#print("Cluster centers:", minibatch_kmeans.cluster_centers_)
print("First 20 labels:", labels[:10])
print(f"Mini Batch K-Means training time: {time_taken:.4f} seconds")


In [None]:
unique, counts = np.unique(labels, return_counts=True)
print(unique)
print(counts)

In [None]:
# Visualize the clustering results

# plot: Mini Batch K-Means
plt.scatter(X[:, 0], X[:, 1], c=minibatch_kmeans.labels_, s=1, cmap='viridis')
plt.scatter(minibatch_kmeans.cluster_centers_[:, 0], minibatch_kmeans.cluster_centers_[:, 1], s=200, c='red', marker='X')
plt.show()

In [None]:
df = pd.DataFrame(X)
#df = pd.DataFrame(X, columns=['feature_1', 'feature_2','feature_3', 'feature_4' ])

# 4. Add the cluster labels to your original DataFrame
df['cluster'] = labels
df['Original Clusters'] = y

print("Original Data with Cluster Assignments:")
print(df.head())
# 5. Find points belonging to a specific cluster (e.g., cluster 2)
cluster_2_points = df[df['cluster'] == 2]

print("\nPoints assigned to Cluster 2:")
print(cluster_2_points.head())

In [None]:
# Using scaling 
from sklearn.preprocessing import StandardScaler

df = pd.DataFrame(X)
# Optional: Scale features
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df)


# Clustering
print("Running MiniBatchKMeans...")
kmeans = MiniBatchKMeans(n_clusters=4, batch_size=10000)
kmeans.fit(X_scaled)

# Add labels to original data
df['cluster'] = kmeans.labels_
df['y'] = y
print("Done. Sample output:")
print(df.head())

In [None]:
# train on a sample first and 
#then assign clusters to the full dataset (much faster)
import numpy as np
from sklearn.cluster import MiniBatchKMeans
from sklearn.metrics import pairwise_distances_argmin

# Assume X is very large (numpy, dask, or on disk)

# Step 1: Take a random sample
sample_size = 100000
idx = np.random.choice(len(X), sample_size, replace=False)
X_sample = X[idx]

# Step 2: Train MiniBatchKMeans on the sample
k = 4
mbk = MiniBatchKMeans(n_clusters=k, batch_size=1000, random_state=42)
mbk.fit(X_sample)

# Step 3: Assign all points to nearest centroid
labels = pairwise_distances_argmin(X, mbk.cluster_centers_)
labels[:10]

In-Memory Clustering with MiniBatchKMeans
Best for: Medium-large datasets that fit in memory

In [None]:
import hdbscan

hdbscan_cluster = hdbscan.HDBSCAN(min_cluster_size=100)
labels = hdbscan_cluster.fit_predict(X)
labels[:10]

In [None]:
X.shape
X_chunk = X[:batch_size,: n_features]
type(X_chunk)
X_chunk.shape[1]

For very large datasets (out-of-core), one can load data in chunks and call kmeans.partial_fit(chunk) repeatedly.

In [None]:
import numpy as np
from sklearn.cluster import MiniBatchKMeans

# Parameters
n_clusters = 4
batch_size = 500000   # adjust depending on your RAM
n_features = 10      # number of features in your dataset

# Initialize MiniBatchKMeans
kmeans = MiniBatchKMeans(n_clusters=n_clusters,
                         batch_size=batch_size,
                         random_state=42)

# Example: Simulating streaming data (you'd replace this with file/DB chunks)
#n_samples = 1_000_000
for i in range(0, len(X), batch_size):
    # Generate a chunk of data (replace this with real chunk loading, e.g., from CSV)
    X_chunk = X[:batch_size, :n_features]
    
    # Update clusters with the chunk
    kmeans.partial_fit(X_chunk)

print("Cluster centers shape:", kmeans.cluster_centers_.shape)

# After training, you can assign labels for new data
labels = kmeans.predict(X)
print("Labels for new data:", labels[:10])

#### Clustering with Dask (out-of-core, single machine but larger-than-RAM)

Dask lets you process datasets that don’t fit into memory by chunking them. If dask is not installed please install with the following commands

In [None]:
!pip install dask
!pip install dask_ml

In [1]:
import dask.array as da
from dask_ml.cluster import KMeans
from dask.distributed import Client, LocalCluster

# Start a Dask client (optional, but good practice for distributed computing)
cluster = LocalCluster()
client = Client(cluster)

# Create some sample Dask array data
# make sure that X is Dask array.
X = da.random.random((1000000, 10), chunks=(100, 10))
#x_dask = da.from_array(X, chunks=len(X) // 10)
# Initialize and fit the KMeans model
kmeans = KMeans(n_clusters=4, random_state=0)
kmeans.fit(X)

 # Get the cluster labels
cluster_labels = kmeans.labels_

# You can compute the labels to view them (if they are a Dask array)
computed_labels = cluster_labels.compute()

# Print a portion of the labels
print(computed_labels[:10])


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/

[2 0 2 2 1 3 2 1 2 3]


In [2]:
#dask_df = dd.from_dask_array(dask_array, columns=['col1', 'col2', 'col3'])
import dask.dataframe as dd
dask_df = dd.from_dask_array(X)
pandas_df = dask_df.compute()
pandas_df['cluster'] = computed_labels
pandas_df['Original clusters'] = y
pandas_df.head()

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


NameError: name 'y' is not defined

In [6]:
## Check it. 

import dask.array as da
from sklearn.metrics import pairwise_distances_argmin_min
from sklearn.cluster import MiniBatchKMeans
from sklearn.metrics import pairwise_distances_argmin

# Assume X is very large (numpy, dask, or on disk)


# Step 2: Train MiniBatchKMeans on the sample
k = 10
mbk = MiniBatchKMeans(n_clusters=k, batch_size=1000, random_state=42)

# Assume X is a Dask array
X = da.random.random((1000000, 10), chunks=(1_000_000, 20))

# Train MiniBatchKMeans on a sample (in memory)
sample = X[:100000].compute()
mbk.fit(sample)

# Assign clusters for large dataset in parallel with Dask
def assign_clusters(chunk, centers):
    from sklearn.metrics import pairwise_distances_argmin
    return pairwise_distances_argmin(chunk, centers)

labels = X.map_blocks(assign_clusters, mbk.cluster_centers_, dtype=int)


Unnamed: 0,Array,Chunk
Bytes,800 B,800 B
Shape,"(10, 10)","(10, 10)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray
"Array Chunk Bytes 800 B 800 B Shape (10, 10) (10, 10) Dask graph 1 chunks in 3 graph layers Data type int64 numpy.ndarray",10  10,

Unnamed: 0,Array,Chunk
Bytes,800 B,800 B
Shape,"(10, 10)","(10, 10)"
Dask graph,1 chunks in 3 graph layers,1 chunks in 3 graph layers
Data type,int64 numpy.ndarray,int64 numpy.ndarray


#### Clustering with PySpark (distributed, multi-machine / cluster)

PySpark handles billions of rows across a cluster.

Use this if you have a Spark cluster or very large data (TB scale).

In [None]:
!pip install pyspark

In [None]:
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PandasToSpark").getOrCreate()

df = pd.DataFrame(X)
spark_df = spark.createDataFrame(df)
spark_df.show()

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Suppose df is a Spark DataFrame
assembler = VectorAssembler(inputCols=spark_df.columns, outputCol="features")
dataset = assembler.transform(spark_df)

kmeans = KMeans(k=10, seed=42)
model = kmeans.fit(dataset)
predictions = model.transform(dataset)

In [None]:
## Reading the file 

from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler

# Start Spark
spark = SparkSession.builder.appName("LargeClustering").getOrCreate()

# Load your big dataset (CSV/Parquet etc.)
df = spark.read.csv("big_dataset.csv", header=True, inferSchema=True)

# Assemble features into a single vector column
feature_cols = df.columns  # or select subset of columns
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
dataset = assembler.transform(df).select("features")

# Run KMeans clustering
kmeans = KMeans().setK(10).setSeed(42)
model = kmeans.fit(dataset)

# Assign cluster predictions
predictions = model.transform(dataset)

# Show sample results
predictions.show(5)

In [None]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import numpy as np

# Train KMeans on a sample (say 1% of data)
sample_df = dataset.sample(withReplacement=False, fraction=0.01, seed=42)
kmeans = KMeans().setK(10).setSeed(42)
model = kmeans.fit(sample_df)

# Get cluster centers
centers = np.array(model.clusterCenters())

# UDF to assign nearest cluster
def assign_cluster(point):
    distances = np.linalg.norm(centers - np.array(point), axis=1)
    return int(np.argmin(distances))

assign_cluster_udf = udf(assign_cluster, IntegerType())

# Apply to full dataset
predictions = dataset.withColumn("cluster", assign_cluster_udf("features"))


Add a final cluster visualization (e.g., 2D PCA/t-SNE projection of clusters) so you can actually see the separation?

Let’s add a final visualization step so you can actually see your clusters in 2D.
Since large datasets have many features, we’ll use PCA (fast) or t-SNE/UMAP (better separation but slower) for dimensionality 

In [None]:
##### Cluster Visualization Helper

import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE

def visualize_clusters(X, labels, method="pca", sample_size=5000):
    """
    Visualize clusters in 2D using PCA or t-SNE.
    
    X : numpy array (or Dask array -> .compute())
    labels : cluster assignments
    method : 'pca' or 'tsne'
    sample_size : number of points to plot
    """
    # Subsample for plotting
    if len(X) > sample_size:
        idx = np.random.choice(len(X), sample_size, replace=False)
        X = X[idx]
        labels = np.array(labels)[idx]
    
    # Dimensionality reduction
    if method == "pca":
        reducer = PCA(n_components=2)
    elif method == "tsne":
        reducer = TSNE(n_components=2, perplexity=30, random_state=42)
    else:
        raise ValueError("method must be 'pca' or 'tsne'")
    
    X_2d = reducer.fit_transform(X)
    
    # Plot
    plt.figure(figsize=(8,6))
    scatter = plt.scatter(X_2d[:,0], X_2d[:,1], c=labels, cmap="tab10", s=10, alpha=0.6)
    plt.title(f"Cluster Visualization ({method.upper()})")
    plt.xlabel("Dim 1")
    plt.ylabel("Dim 2")
    plt.colorbar(scatter, label="Cluster")
    plt.show()


How to Use After Pipeline
Dask Example

In [None]:
# Run clustering
X = da.random.random((200000, 20), chunks=(50000, 20))
out = cluster_pipeline_dask_auto(X, k_range=[2,4,6,8,10], sample_size=50000)

# Predict clusters for a sample
sample = X[:5000].compute()
labels = out["best_model"].predict(sample)

# Visualize clusters
visualize_clusters(sample, labels, method="pca")


In [None]:
# Run clustering
spark_df = spark.read.csv("big_dataset.csv", header=True, inferSchema=True)
out = cluster_pipeline_spark_auto(spark_df, feature_cols=spark_df.columns, k_range=[2,4,6,8,10])

# Collect small sample to driver for plotting
sample_df = spark_df.sample(fraction=0.01, seed=42).toPandas()
labels = out["best_model"].transform(
    assembler.transform(spark_df.sample(fraction=0.01, seed=42)).select("features")
).toPandas()["prediction"].values

# Visualize clusters
visualize_clusters(sample_df.values, labels, method="tsne")


https://www.tutorialspoint.com/mini-batch-k-means-clustering-algorithm-in-machine-learning

https://medium.com/@2328247224/mini-batch-k-means-an-efficient-clustering-algorithm-for-large-datasets-30b71a701ccc

https://medium.com/@tanvirhossen_29772/mini-batch-k-means-e3083cc765f5

https://medium.com/@2328247224/mini-batch-k-means-an-efficient-clustering-algorithm-for-large-datasets-30b71a701ccc

https://docs.w3cub.com/scikit_learn/auto_examples/cluster/plot_mini_batch_kmeans.html