# Generate the dummy data


In [None]:
!pip install faker pandas numpy

import pandas as pd
import numpy as np
from faker import Faker
import random
from datetime import timedelta

# Seed every random source so repeated runs draw the same values.
# Faker keeps its own generator, so seeding `random` and NumPy alone would
# leave names, dates, UUIDs and phone numbers different on every run.
# (Generated dates are still relative to the day the script is executed.)
fake = Faker('en_IN')
Faker.seed(42)
random.seed(42)
np.random.seed(42)

num_customers = 60000
output_file = "customer_behavior_analytics_dataset_india.csv"

# Value pools the categorical columns are sampled from.
device_types = ['Mobile', 'Desktop', 'Tablet']
genders = ['Male', 'Female', 'Other']
membership_types = ['Basic', 'Premium', 'Gold']
locations = ['Delhi', 'Mumbai', 'Bangalore', 'Kolkata', 'Chennai', 'Hyderabad', 'Pune', 'Lucknow', 'Jaipur', 'Patna']
payment_methods = ['UPI', 'Cash on Delivery', 'Paytm', 'PhonePe', 'Google Pay', 'NetBanking']
categories = ['Groceries', 'Electronics', 'Mobile Recharge', 'Medicine', 'Stationery']

# One reference timestamp for the whole run: every row measures tenure and
# login gap against the same instant, and it is not rebuilt twice per row.
today = pd.Timestamp('today')

data = []

for _ in range(num_customers):
    # Dates are chained so that signup <= last login <= churn date.
    signup_date = fake.date_between(start_date='-3y', end_date='-1y')
    last_login = fake.date_between(start_date=signup_date, end_date='today')
    churned = random.choice([0, 1])
    churn_date = fake.date_between(start_date=last_login, end_date='today') if churned else None
    session_duration = random.randint(60, 900)  # in seconds
    pages_visited = random.randint(1, 15)
    clicks = random.randint(5, 50)
    purchases = random.randint(0, 10)
    avg_purchase = round(random.uniform(100, 2000), 2)
    total_spent = round(purchases * avg_purchase, 2)
    # Tenure is approximated as whole 30-day months since signup.
    tenure_months = (today - pd.to_datetime(signup_date)).days // 30
    monthly_spend = round(total_spent / tenure_months, 2) if tenure_months > 0 else 0
    cart_abandon_rate = round(random.uniform(0.0, 0.8), 2)
    feedback_score = random.randint(1, 5)
    support_tickets = random.randint(0, 3)
    # A resolution time only exists when at least one ticket was raised.
    ticket_resolution_time = random.randint(1, 48) if support_tickets > 0 else 0
    last_login_gap = (today - pd.to_datetime(last_login)).days
    discount_used = random.choice([0, 1])
    payment_method = random.choice(payment_methods)
    preferred_category = random.choice(categories)
    referral_used = random.choice([0, 1])
    referral_count = random.randint(0, 10) if referral_used else 0
    device_switch_count = random.randint(0, 5)

    data.append({
        "customer_id": fake.uuid4(),
        "name": fake.name(),
        "phone_number": fake.phone_number(),
        "age": random.randint(18, 65),
        "gender": random.choice(genders),
        "location": random.choice(locations),
        "signup_date": signup_date,
        "last_login_date": last_login,
        "membership_type": random.choice(membership_types),
        "device_type": random.choice(device_types),
        "session_duration": session_duration,
        "pages_visited": pages_visited,
        "clicks": clicks,
        "total_purchases": purchases,
        "avg_purchase_value_inr": avg_purchase,
        "total_spent_inr": total_spent,
        "monthly_spend_inr": monthly_spend,
        "last_purchase_date": fake.date_between(start_date=signup_date, end_date='today') if purchases > 0 else None,
        "cart_abandon_rate": cart_abandon_rate,
        "feedback_score": feedback_score,
        "support_tickets": support_tickets,
        "ticket_resolution_time": ticket_resolution_time,
        # is_active is the exact inverse of is_churned in this synthetic data.
        "is_active": bool(1 - churned),
        "churn_date": churn_date,
        "tenure": tenure_months,
        "is_churned": churned,
        "last_login_gap": last_login_gap,
        "discount_used": discount_used,
        "payment_method": payment_method,
        "preferred_category": preferred_category,
        "referral_used": referral_used,
        "referral_count": referral_count,
        "device_switch_count": device_switch_count
    })

df = pd.DataFrame(data)
df.to_csv(output_file, index=False)

# Report the real row count and file name instead of hard-coded text.
print(f"{len(df):,} rows created in '{output_file}'")

# Send data from the CSV to Kafka in real time
##### First of all, we need to start ZooKeeper: go to the Kafka folder
##### and start ZooKeeper with this command: bin\windows\zookeeper-server-start.bat config\zookeeper.properties
##### Next, we need to start the Kafka broker; again, go to the Kafka folder
##### and use this command: bin\windows\kafka-server-start.bat config\server.properties
##### Now we need to start the producer, using the code below
##### Create the topic with this command: bin\windows\kafka-topics.bat --create --topic 2805_for_model.csv --bootstrap-server localhost:9092
##### Then we need to run the consumer, using the code below
##### In the consumer code we can easily store our data in MongoDB





In [None]:
# PRODUCER 

import pandas as pd
import json
import time
from kafka import KafkaProducer

def _to_json_bytes(payload):
    """Serialize one message value to UTF-8 encoded JSON."""
    return json.dumps(payload).encode('utf-8')


# Point this at the CSV file whose rows should be streamed.
df = pd.read_csv('2805_for_model.csv')

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=_to_json_bytes,
)

# Publish the rows one at a time, each as a {column: value} dict.
for record in (row.to_dict() for _, row in df.iterrows()):
    producer.send('2805_for_model.csv', value=record)
    print("Sent:", record)
    # Pause between messages; raise or lower this to change the send rate.
    time.sleep(0.01)

# Block until every buffered message has been handed to the broker.
producer.flush()



In [None]:
# CONSUMER 

from kafka import KafkaConsumer
from pymongo import MongoClient
import json


# Consume the topic the producer cell writes to.
# - auto_offset_reset='earliest': on the first run, start from the oldest
#   message, so rows sent before this consumer started are not skipped
#   ('latest' would silently drop them).
# - group_id: lets the consumer commit its position, so a restart resumes
#   where it stopped instead of replaying (and re-inserting) the whole topic.
consumer = KafkaConsumer(
    '2805_for_model.csv',
    bootstrap_servers='localhost:9092',
    group_id='customer_events_mongo_writer',
    auto_offset_reset='earliest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

# if you have your own database and collection then use here
mongo_client = MongoClient('mongodb://localhost:27017/')
db = mongo_client['customer_db']
collection = db['customer_realtime_2805events']


print("Waiting for messages..")
try:
    # Iterating the consumer blocks and yields messages as they arrive.
    for msg in consumer:
        event = msg.value

        collection.insert_one(event)
        print("Inserted:", event)
except KeyboardInterrupt:
    # Interrupting the cell is the normal way to stop this endless loop.
    print("Stopping consumer.")
finally:
    # Release the broker and database connections however the loop ended.
    consumer.close()
    mongo_client.close()


# After storing the data in MongoDB, we need to train our model for prediction

In [None]:

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier  
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
import seaborn as sns
import joblib  


# replace with your file
df = pd.read_csv("1 final_customer_data._Chnagein isactive csv.csv")

# Identifier and raw date columns carry no signal the model should learn.
# phone_number is included because the generator cell writes it: left in,
# it is either a string that makes fit() fail or a meaningless numeric ID.
# errors="ignore" keeps this working when the CSV lacks some of the columns.
columns_to_drop = ["customer_id", "name", "phone_number", "signup_date",
                   "last_login_date", "last_purchase_date", "churn_date"]
df.drop(columns=columns_to_drop, errors="ignore", inplace=True)

# NOTE(review): in the generated dataset is_active is the exact inverse of
# is_churned, which would leak the target. Confirm this CSV's is_active
# column is independent of the label, or drop it as well.

categorical_cols = ['gender', 'location', 'membership_type', 'device_type',
                    'payment_method', 'preferred_category']

# One-hot encode the categoricals; drop_first removes one level per column.
df = pd.get_dummies(df, columns=categorical_cols, drop_first=True)

# Missing values become 0; anything scoring this model must do the same.
df.fillna(0, inplace=True)

X = df.drop("is_churned", axis=1)
y = df["is_churned"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

model = RandomForestClassifier(n_estimators=100, random_state=42)
model.fit(X_train, y_train)

y_pred = model.predict(X_test)
# Probability of the positive class (column 1), used for ROC AUC.
y_proba = model.predict_proba(X_test)[:, 1]

print("Confusion Matrix:")
print(confusion_matrix(y_test, y_pred))

print("\n Classification Report:")
print(classification_report(y_test, y_pred))

print("ROC AUC Score:", roc_auc_score(y_test, y_proba))

feature_importances = pd.Series(model.feature_importances_, index=X.columns)
top_features = feature_importances.sort_values(ascending=False).head(10)

plt.figure(figsize=(10,6))
sns.barplot(x=top_features.values, y=top_features.index)
plt.title("Top 10 Important Features for Churn Prediction")
plt.show()


joblib.dump(model, "churn_prediction_model.pkl")
print("Model saved as churn_prediction_model.pkl")

# Persist the exact column order so callers can build matching input rows.
joblib.dump(X.columns.tolist(), "model_features.pkl")
 

# Now that our model is trained, we need to create an API to automate the predictions

In [3]:
# Now we need to create an account on ngrok to get an ngrok auth token, and then fill that token in below

In [None]:
from pyngrok import ngrok


# Put the auth token from your ngrok account in place of the placeholder.
ngrok.set_auth_token(
    "FILL YOUR TOKENS HERE"
)


In [None]:
# This code creates the API and exposes it through a public ngrok URL
from flask import Flask, request, jsonify
import joblib
import numpy as np
import threading
from pyngrok import ngrok


# Load the trained classifier and its ordered feature list (the two files
# saved by the training cell), then create the Flask application.
model, feature_columns = (
    joblib.load(artifact)
    for artifact in ("churn_prediction_model.pkl", "model_features.pkl")
)


app = Flask(import_name=__name__)

@app.route('/')
def index():
    """Return a fixed status message showing the API is up."""
    status_message = " Churn Prediction API is Running"
    return status_message

@app.route('/predict', methods=['POST'])
def predict():
    """Score one customer and return the churn probability and label.

    Expects a JSON object mapping feature names to numeric values. Features
    missing from the body default to 0. ``null`` and NaN values are also
    replaced by 0, mirroring the ``fillna(0)`` applied before training.

    Returns:
        JSON with ``churn_probability`` (rounded to 4 decimals) and
        ``is_churned`` (1 when the probability exceeds 0.5, else 0), or a
        400 response with an ``error`` message when the body is not a JSON
        object or a feature value is not numeric.
    """
    # silent=True yields None instead of raising on a missing/invalid body,
    # so malformed requests get a clear 400 below rather than a 500.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object of feature values."}), 400

    row = []
    for col in feature_columns:
        value = data.get(col, 0)
        try:
            value = 0.0 if value is None else float(value)
        except (TypeError, ValueError):
            return jsonify({"error": f"Feature '{col}' must be numeric."}), 400
        # The model was trained with NaN filled by 0; do the same here.
        row.append(0.0 if np.isnan(value) else value)

    # Single-row matrix in the exact column order used for training.
    input_features = np.array([row], dtype=float)
    churn_probability = float(model.predict_proba(input_features)[0][1])
    is_churned = 1 if churn_probability > 0.5 else 0
    return jsonify({
        "churn_probability": round(churn_probability, 4),
        "is_churned": is_churned
    })

def run_flask():
    """Start the Flask server on 127.0.0.1:5000 without debug or reloader."""
    server_options = {
        "host": "127.0.0.1",
        "port": 5000,
        "debug": False,
        "use_reloader": False,
    }
    app.run(**server_options)

if __name__ == '__main__':
    # Run the server on its own thread so this cell can go on to open the
    # ngrok tunnel to the same port.
    server_thread = threading.Thread(target=run_flask)
    server_thread.start()
    public_url = ngrok.connect(5000)
    print(f" Public URL: {public_url}")


In [None]:
import pymongo
import pandas as pd
import requests
import joblib
from datetime import datetime
import time
import os

import math

# Ordered feature names saved by the training cell.
feature_columns = joblib.load("model_features.pkl")

client = pymongo.MongoClient("mongodb://localhost:27017/")
db = client["customer_db"]
collection = db["customer_realtime_2805events"]

# Put the public URL printed by the API cell here, keeping the /predict path.
NGROK_URL = "http://xyz123.ngrok.io/predict"

# The CSV lives in a OneDrive folder (presumably so it syncs automatically).
output_path = "C:/Users/Dell/OneDrive/apro/churn_predictions.csv"


def _feature_value(customer, col):
    """Return the model input for feature `col` from one MongoDB document.

    Missing features default to 0. None and NaN also become 0, matching the
    fillna(0) used in training; NaN would otherwise make the JSON request
    body invalid.
    """
    if col in customer:
        value = customer[col]
    else:
        value = 0
        # The feature may be a one-hot dummy named "<raw column>_<category>"
        # while the document still holds only the raw categorical column.
        # Rebuild the indicator then, instead of always sending 0.
        for pos, char in enumerate(col):
            if char == "_" and col[:pos] in customer:
                value = int(str(customer[col[:pos]]) == col[pos + 1:])
                break
    if value is None or (isinstance(value, float) and math.isnan(value)):
        return 0
    return value


# Only documents that have not been scored yet.
new_customers = list(collection.find({"is_predicted": {"$ne": True}}))
predictions = []
predicted_ids = []

if not new_customers:
    # Plain branch instead of exit(), which would kill a notebook kernel.
    print(" No new records found in MongoDB.")
else:
    print(f" Found {len(new_customers)} new records to predict.\n")

    for customer in new_customers:
        # Fall back to the MongoDB _id when the document has no customer_id.
        customer_id = customer.get("customer_id", str(customer["_id"]))
        payload = {col: _feature_value(customer, col) for col in feature_columns}

        try:
            response = requests.post(NGROK_URL, json=payload, timeout=10)
            if response.status_code == 200:
                result = response.json()

                predictions.append({
                    "customer_id": customer_id,
                    "churn_probability": result["churn_probability"],
                    "is_churned": result["is_churned"],
                    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    **payload
                })
                predicted_ids.append(customer["_id"])
                print(f" Predicted for {customer_id} → Churn: {result['is_churned']} ({result['churn_probability']})")
            else:
                print(f" Server error for {customer_id}: {response.status_code}")

        except requests.exceptions.RequestException as e:
            print(f" Request failed for {customer_id}: {e}")

        # Small pause between requests to avoid flooding the API.
        time.sleep(0.5)

    if predictions:
        # Create the target folder if needed so the write cannot fail on it.
        os.makedirs(os.path.dirname(output_path), exist_ok=True)
        df = pd.DataFrame(predictions)
        # Write the header only when the file is being created.
        file_exists = os.path.exists(output_path)
        df.to_csv(output_path, mode='a', header=not file_exists, index=False)
        # Flag documents only after their predictions are safely on disk;
        # a failed write then leaves them queued for the next run.
        collection.update_many(
            {"_id": {"$in": predicted_ids}},
            {"$set": {"is_predicted": True}},
        )
        print(f"\n💾 All predictions appended to {output_path}")
    else:
        print("\n No predictions saved. Something went wrong.")


In [None]:
# Now we will create a dashboard for customer churn insights, which we will use on a daily basis

In [None]:
# Now it's time for the batch processing, so first of all we need to follow the data analysis cycle

# We can see our data in MongoDB Compass as well as in the CLI

#### For the CLI
        open a terminal and type mongosh
        show dbs
        use your_db
        show collections
        db.your_collection.countDocuments() or db.your_collection.find()

#### For MongoDB Compass
        open it
        connect to the server
        go to Databases
        go to the collection

In [None]:
# We need to analyse our data, and we can do this in Excel because we have only 60k rows; whether to use SQL or Python instead
# is totally up to us

In [5]:
# Now we will make 4 dashboards
# 1 customer dashboard
# 2 revenue and purchase dashboard
# 3 engagement and activity dashboard
# 4 retention and churn dashboard