In [1]:
# Data Handling
# ======================================================================
import pandas as pd

# Utility Functions
# ======================================================================
from utils.years_functions import *

# Logging and Event Handling
# ======================================================================
import logging as log

# Serialization and Deserialization
# ======================================================================
import json

# Kafka Producer
# ======================================================================
from kafka import KafkaProducer

# Time Handling
# ======================================================================
import time

# Machine Learning
# ======================================================================
from sklearn.model_selection import train_test_split


In [2]:
# Configure the root logger at INFO level for this notebook session.
log.basicConfig(level=log.INFO)

In [3]:
# ----------------------------------------------------------------------
# Load: `load_datasets` returns a mapping that is indexed by year below.
# ----------------------------------------------------------------------
data = load_datasets('data')

years = (2015, 2016, 2017, 2018, 2019)
frames_by_year = {year: data[year] for year in years}

# ----------------------------------------------------------------------
# Clean: only the 2018 frame has rows with missing values dropped.
# ----------------------------------------------------------------------
frames_by_year[2018] = frames_by_year[2018].dropna()

# ----------------------------------------------------------------------
# Normalize column names and add the year column.
# NOTE(review): the return value of `add_year_column` is never read; the
# per-year frames are taken from `normalize_columns` afterwards, so the
# helper presumably updates that mapping in place -- confirm in utils.
# ----------------------------------------------------------------------
normalize_columns = normalize_column_names(frames_by_year)
year_column = add_year_column(normalize_columns)

df_2015, df_2016, df_2017, df_2018, df_2019 = (
    normalize_columns[year] for year in years
)

# ----------------------------------------------------------------------
# Concatenate the per-year frames, then derive the continent feature.
# ----------------------------------------------------------------------
frames_for_concat = dict(zip(years, (df_2015, df_2016, df_2017, df_2018, df_2019)))
concatenated_df = concatenate_common_columns(frames_for_concat)
concatenated_df = map_country_to_continent(concatenated_df)

# One-hot encode the continent and drop the columns not used as features.
concatenated_df = pd.get_dummies(concatenated_df, columns=['Continent'], dtype=int)
concatenated_df = concatenated_df.drop(columns=['Country', 'Happiness_Rank'])

# Replace the space in the two "... America" dummy columns with an underscore.
new_columns = {
    'Continent_North America': 'Continent_North_America',
    'Continent_South America': 'Continent_South_America',
}
concatenated_df = concatenated_df.rename(columns=new_columns)

# ----------------------------------------------------------------------
# Interaction features: pairwise products of existing columns.
# ----------------------------------------------------------------------
concatenated_df = concatenated_df.assign(
    Economy_Health=lambda frame: frame['Economy'] * frame['Health'],
    Trust_Freedom=lambda frame: frame['Trust'] * frame['Freedom'],
    Economy_Trust=lambda frame: frame['Economy'] * frame['Trust'],
    Trust_Health=lambda frame: frame['Trust'] * frame['Health'],
)

# ----------------------------------------------------------------------
# Train / test split: 30% held out, fixed seed for reproducibility.
# ----------------------------------------------------------------------
y = concatenated_df['Happiness_Score']
X = concatenated_df.drop(columns=['Happiness_Score'])

X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.3,
    random_state=52,
)

log.info('Data Preprocessing Completed')

INFO:root:Loaded 5 datasets
INFO:root:Column names normalized
INFO:root:Data Preprocessing Completed


In [4]:
# Re-attach the target to the held-out features (aligned on the row index)
# so each record carries its Happiness_Score, then preview two rows.
X_test = X_test.assign(Happiness_Score=y_test)
X_test.iloc[:2]

Unnamed: 0,Trust,Economy,Social_Support,Freedom,Generosity,Year,Health,Continent_Africa,Continent_Asia,Continent_Europe,Continent_North_America,Continent_Oceania,Continent_South_America,Economy_Health,Trust_Freedom,Economy_Trust,Trust_Health,Happiness_Score
672,0.005,1.162,1.232,0.462,0.083,2019,0.825,0,0,1,0,0,0,0.95865,0.00231,0.00581,0.004125,6.07
735,0.072,0.45,1.134,0.292,0.153,2019,0.571,1,0,0,0,0,0,0.25695,0.021024,0.0324,0.041112,4.681


In [5]:
# Add an `id` column in first position whose value is the row's index label
# plus one. DataFrame.insert raises ValueError if the column already exists,
# so the guard keeps this cell safe to re-run without restarting the kernel.
if 'id' not in X_test.columns:
    X_test.insert(0, 'id', X_test.index + 1)

In [6]:
# Report the frame's dimensions through the project helper, then show the
# first two rows as the cell's displayed value.
length_dataset(X_test)
X_test.iloc[:2]

Number of Row : 235
Number of Columns : 19
-------------------------Structure of the DataFrame------------------------


Unnamed: 0,id,Trust,Economy,Social_Support,Freedom,Generosity,Year,Health,Continent_Africa,Continent_Asia,Continent_Europe,Continent_North_America,Continent_Oceania,Continent_South_America,Economy_Health,Trust_Freedom,Economy_Trust,Trust_Health,Happiness_Score
672,673,0.005,1.162,1.232,0.462,0.083,2019,0.825,0,0,1,0,0,0,0.95865,0.00231,0.00581,0.004125,6.07
735,736,0.072,0.45,1.134,0.292,0.153,2019,0.571,1,0,0,0,0,0,0.25695,0.021024,0.0324,0.041112,4.681


In [7]:
# Explicit column layout for the outgoing records.
# NOTE(review): presumably this order mirrors what the downstream consumer
# expects -- confirm against the consumer before changing it.
column_order = [
    'id',
    'Social_Support',
    'Year',
    'Trust',
    'Generosity',
    'Health',
    'Economy',
    'Freedom',
    'Happiness_Score',
    'Continent_Africa',
    'Continent_Asia',
    'Continent_Europe',
    'Continent_North_America',
    'Continent_Oceania',
    'Continent_South_America',
    'Economy_Health',
    'Trust_Freedom',
    'Economy_Trust',
    'Trust_Health',
]

# Select every row with the columns in the order listed above.
X_test = X_test.loc[:, column_order]

In [8]:
# Stream every row of X_test to the Kafka topic 'prueba-2', one message per
# row, pausing 0.2 s between sends.
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                        value_serializer=lambda v: json.dumps(v).encode('utf-8'))

try:
    # The row label is not needed; only the row's values are sent.
    for _, row in X_test.iterrows():
        dict_row = dict(row)
        # NOTE(review): the payload is JSON-encoded twice -- once here and
        # again by `value_serializer` -- so the message body is a JSON string
        # that itself contains JSON. Kept as-is so existing consumers of this
        # topic keep working; change producer and consumer together.
        json_row = json.dumps(dict_row)
        producer.send('prueba-2', value=json_row)
        time.sleep(0.2)
        formatted_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        log.info(f'Message sent at {formatted_time}')

    # send() only queues the record; block until everything buffered has
    # actually been delivered before reporting success.
    producer.flush()
    log.info('All messages sent')
finally:
    # Release the producer's network resources even if sending failed.
    producer.close()


INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:Probing node bootstrap-0 broker version
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
INFO:kafka.conn:Broker version identified as 2.5.0
INFO:kafka.conn:Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
INFO:kafka.conn:<BrokerConnection node_id=1 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection. 
INFO:root:Message sent at 2024-05-10 22:33:59
INFO:root: