In [8]:
from json import loads
import joblib
from kafka import KafkaConsumer
import pandas as pd
from db_queries import insert_data, create_table

In [13]:
# Kafka consumer: drain every message currently available on the topic into a
# DataFrame. The loop stops after MAX_EMPTY_POLLS *consecutive* polls that
# return nothing, i.e. once the topic has been idle for roughly
# MAX_EMPTY_POLLS * POLL_TIMEOUT_MS milliseconds.

KAFKA_TOPIC = 'test-data'
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
POLL_TIMEOUT_MS = 3000   # how long a single poll() waits for new records
MAX_EMPTY_POLLS = 3      # consecutive empty polls before we assume the topic is drained

# NOTE(review): no group_id is passed, so kafka-python does not commit offsets
# and enable_auto_commit has no effect; every run re-reads the topic from the
# earliest retained offset. Confirm this is intended.
consumer = KafkaConsumer(
    KAFKA_TOPIC,
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    value_deserializer=lambda m: loads(m.decode('utf-8')),
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
)

all_data_received = []
timeout_count = 0

try:
    while timeout_count < MAX_EMPTY_POLLS:
        # poll() returns a dict {partition: [records]}; it is empty on timeout.
        batch = consumer.poll(timeout_ms=POLL_TIMEOUT_MS)
        if batch:
            for records in batch.values():
                all_data_received.extend(record.value for record in records)
            # Reset so that only *consecutive* empty polls end the loop.
            timeout_count = 0
        else:
            print("Timeout")
            timeout_count += 1
finally:
    # Release the connection even if polling or deserialization raises.
    consumer.close()

df = pd.DataFrame(all_data_received)


Timeout
Timeout
Timeout


In [14]:
# Quick look at what the consumer delivered: the (rows, columns) shape on the
# first line, followed by the first five rows.

print(df.shape, df.head(), sep='\n')

(235, 6)
   gdp_per_capita  social_support  healthy_life_expectancy  \
0         1.10000         0.84200                  0.78500   
1         0.88180         0.74700                  0.61712   
2         0.74553         1.04356                  0.64425   
3         0.71206         1.07284                  0.07566   
4         0.85500         1.23000                  0.57800   

   freedom_to_make_life_choices  perceptions_of_corruption  happiness_score  
0                       0.30500                    0.12500            4.548  
1                       0.17288                    0.06324            4.194  
2                       0.57733                    0.09472            6.123  
3                       0.30658                    0.03060            4.867  
4                       0.44800                    0.02300            5.662  


In [11]:
# Load the trained model that is used to predict happiness_score below.
# NOTE: joblib.load unpickles the file, which can execute arbitrary code;
# only load model files that come from a trusted source.

MODEL_PATH = 'my_model.pkl'
model = joblib.load(MODEL_PATH)

In [12]:
# Prediction: score every received row with the loaded model and store the
# prediction as a new column next to the observed target.

TARGET_COLUMN = 'happiness_score'
PREDICTION_COLUMN = 'happiness_score_prediction'

# Features = every column except the target. Also drop a prediction column left
# by a previous run of this cell; otherwise re-running it would pass the old
# predictions to the model as an extra feature. The target drop stays strict
# (KeyError if missing); the prediction drop uses errors='ignore' because the
# column does not exist on the first run.
independent_variables_df = (
    df.drop(columns=[TARGET_COLUMN])
      .drop(columns=[PREDICTION_COLUMN], errors='ignore')
)

df[PREDICTION_COLUMN] = model.predict(independent_variables_df)

print(df.shape)
print(df.head(5))

(235, 7)
   gdp_per_capita  social_support  healthy_life_expectancy  \
0         1.10000         0.84200                  0.78500   
1         0.88180         0.74700                  0.61712   
2         0.74553         1.04356                  0.64425   
3         0.71206         1.07284                  0.07566   
4         1.24600         1.50400                  0.88100   

   freedom_to_make_life_choices  perceptions_of_corruption  happiness_score  \
0                       0.30500                    0.12500            4.548   
1                       0.17288                    0.06324            4.194   
2                       0.57733                    0.09472            6.123   
3                       0.30658                    0.03060            4.867   
4                       0.33400                    0.01400            6.198   

   happiness_score_prediction  
0                    5.441824  
1                    4.686299  
2                    5.421008  
3              

In [6]:
# Make sure the destination table exists before inserting any rows.
# create_table() returned 'ok' when this cell was run; keeping the result as
# the cell's last expression still displays that status.

create_table_status = create_table()
create_table_status

'ok'

In [7]:
# Write the received rows, together with their predictions, into the database.
# insert_data() returned 'ok' when this cell was run; keeping the result as the
# cell's last expression still displays that status.

insert_status = insert_data(df)
insert_status

'ok'