# Consumer & HBase

In [1]:
from kafka import KafkaConsumer
import json
import happybase

# Initialize Kafka consumer
consumer = KafkaConsumer(
    'publictransportstream',  # Topic name
    bootstrap_servers=['192.168.113.33:9092'],  # Adjust to your Kafka broker address
    auto_offset_reset='latest',
    enable_auto_commit=True,
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))  # Deserialize JSON data
)

# Connect to HBase Thrift Server
connection = happybase.Connection(host='localhost')  # Replace 'localhost' with HBase server address
connection.open()

# Define the table name and column family
table_name = 'public_transport'
column_family = 'cf'

# Check if the table exists; create it if it doesn't
if table_name.encode() not in connection.tables():
    connection.create_table(
        table_name,
        {column_family: dict()}  # Define the column family
    )

# Access the HBase table
table = connection.table(table_name)

# Consume messages and write to HBase
print("Listening for messages...")
for message in consumer:
    data = message.value  # Deserialize the message
    print(f"Received: {data}")
    
    # Prepare data for HBase (ensure column family prefix)
    row_key = str(data.get('timestamp', 'nan')) 
    hbase_data = {
        f"{column_family}:transition_date": data.get('transition_date', ''),
        f"{column_family}:transition_hour": str(data.get('transition_hour', '')),
        f"{column_family}:line": data.get('line', ''),
        f"{column_family}:number_of_passenger": str(data.get('number_of_passenger', '')),
        f"{column_family}:station_poi_desc_cd": str(data.get('station_poi_desc_cd', 'nan')),
        f"{column_family}:timestamp": str(data.get('timestamp', 'nan'))
    }
    
    # Write to HBase
    table.put(row_key, hbase_data)
    print(f"Data written to HBase with row key: {row_key}")

Listening for messages...
Received: {'transition_date': '2023-12-31', 'transition_hour': 12, 'line': 'BOSTANCI - DUDULLU', 'number_of_passenger': 4, 'station_poi_desc_cd': 'BOSTANCI', 'timestamp': '2023-12-31T12:00:00'}
Data written to HBase with row key: 2023-12-31T12:00:00
Received: {'transition_date': '2023-03-28', 'transition_hour': 19, 'line': 'YENIKAPI - HACIOSMAN', 'number_of_passenger': 5, 'station_poi_desc_cd': 'SISHANE GUNEY', 'timestamp': '2023-03-28T19:00:00'}
Data written to HBase with row key: 2023-03-28T19:00:00
Received: {'transition_date': '2023-12-14', 'transition_hour': 15, 'line': 'KADIKOY-KARTAL', 'number_of_passenger': 118, 'station_poi_desc_cd': 'SOGANLIK', 'timestamp': '2023-12-14T15:00:00'}
Data written to HBase with row key: 2023-12-14T15:00:00
Received: {'transition_date': '2023-03-03', 'transition_hour': 20, 'line': 'KADIKOY-KARTAL', 'number_of_passenger': 3, 'station_poi_desc_cd': 'KADIKOY (DOGU)', 'timestamp': '2023-03-03T20:00:00'}
Data written to HBase w

KeyboardInterrupt: 

# Show as Pandas DF

In [2]:
import happybase
import pandas as pd

# Connect to HBase Thrift Server
connection = happybase.Connection(host='localhost')  # Replace 'localhost' with your HBase server address
connection.open()

# Define the table name
table_name = 'public_transport'

# Access the HBase table
table = connection.table(table_name)

# Initialize a list to hold the rows
rows = []

# Scan the table and populate the list
for key, data in table.scan():
    row = {'row_key': key.decode('utf-8')}  # Start with the row key
    for column, value in data.items():
        row[column.decode('utf-8')] = value.decode('utf-8')  # Decode column names and values
    rows.append(row)

# Convert the list of rows into a Pandas DataFrame
df = pd.DataFrame(rows)

# Display the DataFrame
print("DataFrame created from HBase table:")
df


DataFrame created from HBase table:


Unnamed: 0,row_key,cf:line,cf:number_of_passenger,cf:station_poi_desc_cd,cf:timestamp,cf:transition_date,cf:transition_hour
0,2023-01-01T03:00:00,USKUDAR-CEKMEKOY,3,ALTUNIZADE 1,2023-01-01T03:00:00,2023-01-01,3
1,2023-01-01T06:00:00,HALKALI - GEBZE,2,AYRILIKCESMESI,2023-01-01T06:00:00,2023-01-01,6
2,2023-01-01T08:00:00,HALKALI - GEBZE,1,ERENKOY,2023-01-01T08:00:00,2023-01-01,8
3,2023-01-01T11:00:00,EMINONU-ALIBEYKOY,1,ALIBEYKOY,2023-01-01T11:00:00,2023-01-01,11
4,2023-01-01T14:00:00,KARAKOY-Y.KADIKOY,1,KARAKOY,2023-01-01T14:00:00,2023-01-01,14
...,...,...,...,...,...,...,...
3478,2023-12-31T18:00:00,KADIKOY-KARTAL,156,HUZUREVI,2023-12-31T18:00:00,2023-12-31,18
3479,2023-12-31T19:00:00,YENIKAPI - HACIOSMAN,1,YENIKAPI KUZEY,2023-12-31T19:00:00,2023-12-31,19
3480,2023-12-31T21:00:00,YENIKAPI - HACIOSMAN,1,OSMANBEY 2 GUNEY,2023-12-31T21:00:00,2023-12-31,21
3481,2023-12-31T22:00:00,KADIKOY-KARTAL,6,KUCUKYALI,2023-12-31T22:00:00,2023-12-31,22
