Skip to content

Commit

Permalink
Test kinesis + opensky + geoapify
Browse files Browse the repository at this point in the history
  • Loading branch information
gynvael515 committed Dec 20, 2020
1 parent b0df3cd commit 69cb6a7
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Kinesis_airplanes_test/airplanes.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
start "Consumer 1" python consumer.py 0
start "Consumer 2" python consumer.py 1
start "Producer" python producer.py
68 changes: 68 additions & 0 deletions Kinesis_airplanes_test/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from boto.kinesis.exceptions import ProvisionedThroughputExceededException
import datetime
import boto3
import time
import sys
import m_geoapify as GEOAPIFY
import json

credentials_path = "credentials.json"
credentials = json.load(open(credentials_path, "r"))
geo_apikey = credentials["geoapify"]["apikey"]
kinesis=boto3.client('kinesis')
shard_id = 'shardId-000000000000'
iterator_type = 'LATEST'

class KinesisConsumer:
"""Generic Consumer for Amazon Kinesis Streams"""
def __init__(self, stream_name, shard_id, iterator_type, sleep_interval=0.5, consumer_id="0"):

self.stream_name = stream_name
self.shard_id = str(shard_id)
self.iterator_type = iterator_type
self.sleep_interval = sleep_interval
self.consumer_id=str(consumer_id)

def process_records(self, records):
"""the main logic of the Consumer"""
for part_key, data in self.iter_records(records):
if part_key==self.consumer_id:
params = {
'lat': data[0],
'lon': data[1],
'apiKey': geo_apikey }
country = GEOAPIFY.get_country(params)
if country!="ERR":
print("KLIENT - ", self.consumer_id)
print("Dane lotu: ", data, " kraj: ", country)

@staticmethod
def iter_records(records):
for record in records:
part_key = record['PartitionKey']
data = record['Data']
data = [float(i) for i in data.split()]
yield part_key, data

def run(self):
"""poll stream for new records and pass them to process_records method"""
response = kinesis.get_shard_iterator(StreamName=self.stream_name,
ShardId=self.shard_id, ShardIteratorType=self.iterator_type)

next_iterator = response['ShardIterator']

while True:
try:
response = kinesis.get_records(ShardIterator=next_iterator, Limit=100)
records = response['Records']
if records:
self.process_records(records)
next_iterator = response['NextShardIterator']
time.sleep(self.sleep_interval)
except ProvisionedThroughputExceededException as ptee:
time.sleep(1)


if(len(sys.argv)>1):
worker = KinesisConsumer("testowy2", shard_id, iterator_type, 0.5, sys.argv[1]) #dodac sprawdzenie argumentu / wyjatek
worker.run()
10 changes: 10 additions & 0 deletions Kinesis_airplanes_test/m_geoapify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import requests
import json

def get_country(params):
api_result = requests.get("https://api.geoapify.com/v1/geocode/reverse", params)
api_response = api_result.json()
if len(api_response["features"])>0:
return api_response["features"][0]["properties"]["country"]
else:
return "ERR"
7 changes: 7 additions & 0 deletions Kinesis_airplanes_test/m_opensky.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from opensky_api import OpenSkyApi

def get_airplanes(username, password):
poland_bbox = (49.0273953314, 54.8515359564, 14.0745211117,24.0299857927)
api = OpenSkyApi(username, password)
return api.get_states(bbox=poland_bbox)

51 changes: 51 additions & 0 deletions Kinesis_airplanes_test/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import time
import threading
import boto3
import m_opensky as OPENSKY
import json
import struct

credentials_path = "credentials.json"
credentials = json.load(open(credentials_path, "r"))
username = credentials["opensky_api"]["username"]
password = credentials["opensky_api"]["password"]
kinesis=boto3.client('kinesis')

class KinesisProducer(threading.Thread):
"""Producer class for AWS Kinesis streams """

def __init__(self, stream_name, sleep_interval=None):
self.stream_name = stream_name
self.sleep_interval = sleep_interval
self.counter=0
super().__init__()

def prep_records(self):
data=OPENSKY.get_airplanes(username, password)
for s in data.states:
airplane = str(str(s.longitude) + " " + str(s.latitude) + " " + str(s.heading))
self.put_record(airplane)

def put_record(self, airplane):
"""put a single record to the stream"""
self.counter += 1
print(kinesis.put_record(StreamName = self.stream_name, Data = airplane, PartitionKey = str(self.counter%2) ))

def run_continously(self):
"""put a record at regular intervals"""
while True:
self.prep_records()
time.sleep(self.sleep_interval)

def run(self):
"""run the producer"""
if self.sleep_interval:
self.run_continously()
else:
self.prep_records()
#dodac wyjatek jezeli stream nie istnieje



Producer = KinesisProducer("testowy2", sleep_interval=10)
Producer.run()

0 comments on commit 69cb6a7

Please sign in to comment.