In [1]:
import boto3
import csv
import json 
import time

### Creating A Stream

In [12]:
#Preparing stream source
region = 'us-east-2'
streamName = 'blossom-data-eng-richmond'

#Intialize kinesis client
kinesis = boto3.client('kinesis', region_name = region)

#List all active client
kinesis.list_streams()

#Create a new stream and confirm that the stream is active
#response = kinesis.create_stream(StreamName = streamName, ShardCount = 2)
kinesis.list_streams()
stream_description = kinesis.describe_stream(StreamName = streamName)
stream_description.keys()
stream_description['StreamDescription']['Shards']
stream_description['StreamDescription']['Shards'][0]
stream_description['StreamDescription']['Shards'][1]

{'ShardId': 'shardId-000000000001',
 'HashKeyRange': {'StartingHashKey': '170141183460469231731687303715884105728',
  'EndingHashKey': '340282366920938463463374607431768211455'},
 'SequenceNumberRange': {'StartingSequenceNumber': '49601660204502860298160762987990152704400035993735921682'}}

### Creating A Producer 

In [19]:
from datetime import datetime
from meq_scraper import scrape_meqasa

In [20]:
kinesis = boto3.client('kinesis', region_name = 'us-east-2')

In [21]:
# A function to 'oroduce' (getdata from meqasa)
def get_data(criteria):
    data = scrape_meqasa(1) 
    #The argument 1 is the number of pages.
    data = data.head(2)
    #Our assumption; first 2 listings are new.
    
    #We may have some critieria to filter our data. eg 'address = Tema'
    if criteria:
        data = data.query(criteria)
        
    #For this exercise, we'll only select these columns
    data = data[['property','beds','area','garages','address']]
    
    return data

In [33]:
def listings_producer(steam_name, data):
    response = kinesis.put_record(
        StreamName = streamName,
        Data = data,
        PartitionKey = 'blossom'
    )
    if response['ResponseMetadata']['HTTPStatusCode'] == 200:
        print(f"Records pushed to {streamName} within kinesis")
    else:
        print("Records not failed to be pushed to kinesis")
        

In [None]:
#Time to run the scraping and pushing every 30 seconds
if __name__ == '__main__':
    try:
        while True:
            data = get_data(None)
            data = data.to_json()
            listings_producer('blossom-data-eng-richmond', data)
            time.sleep(10)
   
    except Exception as e:
        print(f"Writing failed. Exiting gracefully due to {e}")

Scraping page: 1 from Meqasa
Records pushed to blossom-data-eng-richmond within kinesis
Scraping page: 1 from Meqasa
Records pushed to blossom-data-eng-richmond within kinesis


### Implementing A Consumer

In [37]:
import pandas as pd
import datetime as dt

In [38]:
#Initiatie the kinesis client
kinesis = boto3.client('kinesis', region_name = 'us-east-2')

In [39]:
#Utility function to get the shard ids
def get_shard_iterator(stream_name, shard_id, iterator_type, date):
    shard_response = kinesis.get_shard_iterator(
        StreamName = stream_name,
        ShardId = shard_id,
        ShardIteratorType = iterator_type,
        Timestamp = date
    )
    
    return shard_response['ShardIterator']

In [None]:
#We want our listings consumer to connect to kinesis, and then periodically check for new data. (we set this to 5 seconds)
def listings_consumer(next_shard_iterator):
    start = dt.datetime.now()
    finish = start + dt.timedelta(seconds=60)
    
    #We'll only consume for 60 seconds
    while start < finish:
        response = kinesis.get_records(
            ShardIterator = next_shard_iterator,
            Limit = 20
        )
        
        try:
            data = response['Records'][0]['Data']
            data = json.loads(data)
            
            df = pd.DataFrame(data)
        except IndexError:
            #this error only occurs when all records have been consumed so far
            print("No new records have arrived")
            
        #get the next shard iterator ie where next to get data
        next_shard_iterator = response['NextShardIterator']
        
        #pause checking for new data every 5 seconds.
        time.sleep(5)

if __name__ == '__main__':
    stream_name = 'blossom-data-eng-richmond'
    shard_id = 'shardId-000000000001'
    iterator_type = 'AT_TIMESTAMP'
    date = dt.datetime.today().date().__str__()
    
    next_shard_iterator = get_shard_iterator(stream_name, shard_id, iterator_type, date)
    listings_consumer(next_shard_iterator)