In [1]:
import happybase as hb
import json
from kafka import KafkaConsumer
from time import sleep


In [2]:

# HBase table that connect_to_hbase() opens and that the consumer writes into.
table_name = 'covid_table'

def connect_to_hbase():
    """Open a connection to the HBase Thrift server and fetch the table handle.

    The connection targets localhost:9090 and is opened explicitly because
    it is created with ``autoconnect=False``. The table looked up is the one
    named by the module-level ``table_name``; its repr is printed.

    Returns:
        tuple: ``(connection, table)`` -- the opened happybase connection and
        the table handle obtained from it. The caller is responsible for
        closing the connection.
    """
    connection = hb.Connection(host='localhost', port=9090, autoconnect=False)
    connection.open()

    hbase_table = connection.table(table_name)
    print(hbase_table)

    return connection, hbase_table

def insert_row(table, row):
    """Store one record in HBase and log what was written.

    Args:
        table: Object exposing ``put(row_key, data)``; in this notebook it is
            the happybase table handle.
        row: Mapping with the keys ``id``, ``created_by``, ``text`` and
            ``date``. Every value is converted to ``str`` before it is stored.

    Raises:
        KeyError: If ``row`` lacks one of the four expected keys.
        Exception: Anything raised by ``table.put`` propagates to the caller,
            so a failed write is never reported as a successful insert.
    """
    row_id = str(row['id'])
    cb = str(row['created_by'])
    txt = str(row['text'])
    date = str(row['date'])

    # Column names keep the trailing colon (column family with an empty
    # qualifier): rows already stored in the table use exactly these names.
    table.put(row_id, {'created_by:': cb, 'text:': txt, 'date:': date})

    # Only reached when the put succeeded, so the log line can be trusted.
    print('Insert data: {},[{}, {}, {}]'.format(row_id, cb, txt, date))
    
# Helpers are defined above; open the shared connection and table handle now.
conn, table = connect_to_hbase()
status_template = 'Connect to HBase. table name: {}'
print(status_template.format(table_name))


<happybase.table.Table name=b'covid_table'>
Connect to HBase. table name: covid_table


In [3]:
if __name__ == '__main__':
    # Consume tweets from the Kafka topic and store each one as an HBase row.
    # Uses `table`, `conn` and `insert_row` defined in the earlier cell. The
    # HBase connection is closed when this cell finishes, so that earlier cell
    # must be re-run before running this one again.

    topic_name = 'covid'

    # auto_offset_reset='earliest' starts from the oldest available message
    # when no offset is stored; consumer_timeout_ms=1000 ends the iteration
    # below once no message has arrived for one second.
    consumer = KafkaConsumer(topic_name, auto_offset_reset='earliest',
                             bootstrap_servers=['localhost:9092'], api_version=(0, 10), consumer_timeout_ms=1000)

    # Counter used to build the row keys 'row1', 'row2', ... It is not named
    # `id`, which would shadow the builtin for every later cell.
    row_number = 0

    try:
        for msg in consumer:
            try:
                record = json.loads(msg.value)
                author = record['user']['name']
                text = record['text']
                date = record['created_at']
            except (KeyError, TypeError, ValueError) as err:
                # Invalid JSON or a payload without the tweet fields: skip it
                # rather than abort the whole run. No row number is consumed.
                print('Skip malformed message: {!r}'.format(err))
                continue

            row_number += 1
            row = 'row{}'.format(row_number)

            data = {'id': row, 'created_by': author, 'text': text, 'date': date}

            insert_row(table, data)

            # Throttle to one insert every three seconds.
            sleep(3)
    finally:
        # Release both connections even on an error or a KeyboardInterrupt;
        # the nested try guarantees the HBase connection is closed even if
        # closing the consumer fails.
        try:
            consumer.close()
        finally:
            conn.close()

Insert data: row1,[Marian, RT @chuckwoolery: Pamela Geller Sues Bill de Blasio for Forbidding New Yorkers to Protest during COVID-19 https://t.co/otatGhd0WH, Sat May 09 05:40:00 +0000 2020]
Insert data: row2,[Steve Van Camp, RT @maggieNYT: Many news outlets the president calls fake had held off on reporting who the staffer was who got COVID, out of respect for c…, Sat May 09 05:40:00 +0000 2020]
Insert data: row3,[babulal jakhar, RT @Rajniin48551148: #TrueBloodPump

Covid-19
This blood donation was done in many cities &amp; states to mark the occasion of World Thalassemi…, Sat May 09 05:40:00 +0000 2020]
Insert data: row4,[Charlotte Murder Hornets, RT @TraderJoesUnion: Please don’t complain to us that a store you went to asked you to wear a mask.

Our coworkers have died from COVID-19.…, Sat May 09 05:40:00 +0000 2020]
Insert data: row5,[Kristi, RT @mmpadellan: Katie Miller, wife of Stephen Miller, has been exposed to a disgusting, insidious virus that nobody in America wants to be…, S

KeyboardInterrupt: 

In [4]:
# Sanity check: open a separate connection and dump what is stored in the table.
connect = hb.Connection(host='localhost', port=9090, autoconnect=False)
connect.open()

tables = connect.table('covid_table')

# Scan starts at row key 'row' and prints every (key, columns) pair returned.
scanner = tables.scan(row_start='row')
for key, data in scanner:
    print(key, data)



b'row1' {b'created_by:': b'Marian', b'date:': b'Sat May 09 05:40:00 +0000 2020', b'text:': b'RT @chuckwoolery: Pamela Geller Sues Bill de Blasio for Forbidding New Yorkers to Protest during\xc2\xa0COVID-19 https://t.co/otatGhd0WH'}
b'row10' {b'created_by:': b'Bhavya\xf0\x9f\x87\xae\xf0\x9f\x87\xb3', b'date:': b'Sat May 09 05:40:00 +0000 2020', b'text:': b'RT @AmitShah: Had a meeting with DGs of all Central Armed Police Force.\n\nEntire nation is proud of our CAPF personnel for their contributio\xe2\x80\xa6'}
b'row11' {b'created_by:': b'WION', b'date:': b'Sat May 09 05:40:00 +0000 2020', b'text:': b'The total number of cases in #India has risen to 59,662, including 39,834 active cases\n\n#Coronavirus\xe2\x80\xa6 https://t.co/gpKSgSR2Y1'}
b'row12' {b'created_by:': b'Tony News Network', b'date:': b'Sat May 09 05:40:00 +0000 2020', b'text:': b'Saudi Arabia recorded its first COVID-19 infection on March 2, several weeks after the initial outbreak in Asia. https://t.co/f3y6zuDhjc'}
b'row13' 