In [1]:
pip install kafka-python

Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
     -------------------------------------- 246.5/246.5 kB 1.3 MB/s eta 0:00:00
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2
Note: you may need to restart the kernel to use updated packages.


In [1]:
import pandas as pd
from kafka import KafkaProducer
from time import sleep
from json import dumps
import json

In [2]:
producer = KafkaProducer(bootstrap_servers=['13.53.36.24:9092'],
                         value_serializer=lambda x: 
                         dumps(x, default=str).encode('utf-8'))

In [3]:
producer.send('demo_test', value={'hello':'world'})

<kafka.producer.future.FutureRecordMetadata at 0x229b27acdf0>

In [5]:
df = pd.read_excel("training/data/new_customers.xlsx")

In [6]:
df.head()

Unnamed: 0,Names,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Onboard_date,Location,Company
0,Andrew Mccall,37,9935.53,1,7.71,8,2011-08-29 18:37:00,"38612 Johnny Stravenue Nataliebury, WI 15717-8316",King Ltd
1,Michele Wright,23,7526.94,1,9.28,15,2013-07-22 18:19:00,"21083 Nicole Junction Suite 332, Youngport, ME...",Cannon-Benson
2,Jeremy Chang,65,100.0,1,1.0,15,2006-12-11 07:48:00,"085 Austin Views Lake Julialand, WY 63726-4298",Barron-Robertson
3,Megan Ferguson,32,6487.5,0,9.4,14,2016-10-28 05:32:00,"922 Wright Branch North Cynthialand, NC 64721",Sexton-Golden
4,Taylor Young,32,13147.71,1,10.0,8,2012-03-20 00:36:00,Unit 0789 Box 0734 DPO AP 39702,Wood LLC


In [7]:
import joblib
model = joblib.load('training/logistic_regression_model.pkl')

In [8]:
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, FunctionTransformer, LabelEncoder
import numpy as np

# Handle missing values in the dataframe
df.fillna(0, inplace=True)  # You can replace 0 with an appropriate value or use other imputation strategies

# Separate numeric and non-numeric columns
numeric_columns = df.select_dtypes(include=[np.number]).columns
non_numeric_columns = df.select_dtypes(exclude=[np.number]).columns

# Create transformers for each type of column
numeric_transformer = StandardScaler()

# Create a custom transformer for non-numeric columns
def label_encode_column(X):
    for column in X.columns:
        le = LabelEncoder()
        X[column] = le.fit_transform(X[column])
    return X

non_numeric_transformer = FunctionTransformer(label_encode_column)

# Create a column transformer
preprocessor = ColumnTransformer(
    transformers=[
        ('num', numeric_transformer, numeric_columns),
        ('non_num', non_numeric_transformer, non_numeric_columns),
    ]
)

# Apply the transformations to the new customers data
features = preprocessor.fit_transform(df)
print(features)

# Predict using the pre-trained model
predictions = model.predict(features)

# Add the predictions to the dataframe
df['Prediction'] = predictions

# Display the dataframe with predictions
print(df)

[[ 0.0642337   0.34109799  0.35355339  0.35903026 -1.33394594  0.
   4.          6.          2.        ]
 [-1.09197292 -0.28753489  0.35355339  0.91959354  1.21267813  6.
   7.          2.          1.        ]
 [ 2.37664694 -2.22593815  0.35355339 -2.0367529   1.21267813  3.
   2.          0.          0.        ]
 [-0.34869723 -0.5588248  -2.82842712  0.96243914  0.84887469  5.
   8.          7.          6.        ]
 [-0.34869723  1.17946482  0.35355339  1.17666714 -1.33394594  7.
   6.          8.          8.        ]
 [-1.1745591  -0.04785685  0.35355339 -1.15841809  0.84887469  4.
   3.          1.          4.        ]
 [-0.34869723  0.32794117  0.35355339  0.07696339 -0.9701425   8.
   0.          3.          7.        ]
 [ 0.55975082  1.41824765  0.35355339 -0.44432475 -0.24253563  1.
   5.          5.          3.        ]
 [ 0.31199226 -0.14659694  0.35355339  0.14480226 -0.24253563  2.
   1.          4.          5.        ]]
            Names  Age  Total_Purchase  Account_Manage

In [10]:
while True:
    # Sample a row from the dataframe
    dict_row = df.sample(1).to_dict(orient='records')[0]
    # Send the data row with the prediction to the Kafka topic
    producer.send('demo_test', value=dict_row)

    sleep(1)

KeyboardInterrupt: 

In [7]:
producer.flush() #clear data from kafka server