diff --git a/Kinesis_airplanes_test/airplanes.bat b/Kinesis_airplanes_test/airplanes.bat new file mode 100644 index 0000000..ee8d0b7 --- /dev/null +++ b/Kinesis_airplanes_test/airplanes.bat @@ -0,0 +1,3 @@ +start "Consumer 1" python consumer.py 0 +start "Consumer 2" python consumer.py 1 +start "Producer" python producer.py diff --git a/Kinesis_airplanes_test/consumer.py b/Kinesis_airplanes_test/consumer.py new file mode 100644 index 0000000..cc945ca --- /dev/null +++ b/Kinesis_airplanes_test/consumer.py @@ -0,0 +1,68 @@ +from boto.kinesis.exceptions import ProvisionedThroughputExceededException +import datetime +import boto3 +import time +import sys +import m_geoapify as GEOAPIFY +import json + +credentials_path = "credentials.json" +credentials = json.load(open(credentials_path, "r")) +geo_apikey = credentials["geoapify"]["apikey"] +kinesis=boto3.client('kinesis') +shard_id = 'shardId-000000000000' +iterator_type = 'LATEST' + +class KinesisConsumer: + """Generic Consumer for Amazon Kinesis Streams""" + def __init__(self, stream_name, shard_id, iterator_type, sleep_interval=0.5, consumer_id="0"): + + self.stream_name = stream_name + self.shard_id = str(shard_id) + self.iterator_type = iterator_type + self.sleep_interval = sleep_interval + self.consumer_id=str(consumer_id) + + def process_records(self, records): + """the main logic of the Consumer""" + for part_key, data in self.iter_records(records): + if part_key==self.consumer_id: + params = { + 'lat': data[0], + 'lon': data[1], + 'apiKey': geo_apikey } + country = GEOAPIFY.get_country(params) + if country!="ERR": + print("KLIENT - ", self.consumer_id) + print("Dane lotu: ", data, " kraj: ", country) + + @staticmethod + def iter_records(records): + for record in records: + part_key = record['PartitionKey'] + data = record['Data'] + data = [float(i) for i in data.split()] + yield part_key, data + + def run(self): + """poll stream for new records and pass them to process_records method""" + response = kinesis.get_shard_iterator(StreamName=self.stream_name, + ShardId=self.shard_id, ShardIteratorType=self.iterator_type) + + next_iterator = response['ShardIterator'] + + while True: + try: + response = kinesis.get_records(ShardIterator=next_iterator, Limit=100) + records = response['Records'] + if records: + self.process_records(records) + next_iterator = response['NextShardIterator'] + time.sleep(self.sleep_interval) + except ProvisionedThroughputExceededException as ptee: + time.sleep(1) + + +if(len(sys.argv)>1): + worker = KinesisConsumer("testowy2", shard_id, iterator_type, 0.5, sys.argv[1]) #dodac sprawdzenie argumentu / wyjatek + worker.run() \ No newline at end of file diff --git a/Kinesis_airplanes_test/m_geoapify.py b/Kinesis_airplanes_test/m_geoapify.py new file mode 100644 index 0000000..cc0df0f --- /dev/null +++ b/Kinesis_airplanes_test/m_geoapify.py @@ -0,0 +1,10 @@ +import requests +import json + +def get_country(params): + api_result = requests.get("https://api.geoapify.com/v1/geocode/reverse", params) + api_response = api_result.json() + if len(api_response["features"])>0: + return api_response["features"][0]["properties"]["country"] + else: + return "ERR" \ No newline at end of file diff --git a/Kinesis_airplanes_test/m_opensky.py b/Kinesis_airplanes_test/m_opensky.py new file mode 100644 index 0000000..c28b544 --- /dev/null +++ b/Kinesis_airplanes_test/m_opensky.py @@ -0,0 +1,7 @@ +from opensky_api import OpenSkyApi + +def get_airplanes(username, password): + poland_bbox = (49.0273953314, 54.8515359564, 14.0745211117,24.0299857927) + api = OpenSkyApi(username, password) + return api.get_states(bbox=poland_bbox) + diff --git a/Kinesis_airplanes_test/producer.py b/Kinesis_airplanes_test/producer.py new file mode 100644 index 0000000..a912b42 --- /dev/null +++ b/Kinesis_airplanes_test/producer.py @@ -0,0 +1,51 @@ +import time +import threading +import boto3 +import m_opensky as OPENSKY +import json +import struct + +credentials_path = "credentials.json" +credentials = json.load(open(credentials_path, "r")) +username = credentials["opensky_api"]["username"] +password = credentials["opensky_api"]["password"] +kinesis=boto3.client('kinesis') + +class KinesisProducer(threading.Thread): + """Producer class for AWS Kinesis streams """ + + def __init__(self, stream_name, sleep_interval=None): + self.stream_name = stream_name + self.sleep_interval = sleep_interval + self.counter=0 + super().__init__() + + def prep_records(self): + data=OPENSKY.get_airplanes(username, password) + for s in data.states: + airplane = str(str(s.longitude) + " " + str(s.latitude) + " " + str(s.heading)) + self.put_record(airplane) + + def put_record(self, airplane): + """put a single record to the stream""" + self.counter += 1 + print(kinesis.put_record(StreamName = self.stream_name, Data = airplane, PartitionKey = str(self.counter%2) )) + + def run_continously(self): + """put a record at regular intervals""" + while True: + self.prep_records() + time.sleep(self.sleep_interval) + + def run(self): + """run the producer""" + if self.sleep_interval: + self.run_continously() + else: + self.prep_records() + #dodac wyjatek jezeli stream nie istnieje + + + +Producer = KinesisProducer("testowy2", sleep_interval=10) +Producer.run() \ No newline at end of file