In [22]:
from kafka import KafkaConsumer,TopicPartition
from pymongo import MongoClient,errors
import json
import time
import os
import pandas as pd

import time
import logging
from jaeger_client import Config
from opentracing_instrumentation.request_context import get_current_span, span_in_context

def init_tracer(service):
    logging.getLogger('').handlers = []
    logging.basicConfig(format='%(message)s', level=logging.DEBUG)    
    config = Config(
        config={
            'sampler': {
                'type': 'const',
                'param': 1,
            },
            'logging': True,
        },
        service_name=service,
    )
    if config._initialized == False :
        return config.initialize_tracer()
    else :
        print("error : create now tracer...")
        return config.new_tracer()

def mongodb_connect(domain, port):
    domain_str = str(domain) + ":" + str(port)
    try:
        print ("Trying to connect to MongoDB server:", domain, "on port:", port)
        client = MongoClient(host = [domain_str],
                             serverSelectionTimeoutMS = 2000)
        #print ("server_info():", client.server_info())
    except errors.ServerSelectionTimeoutError as err:
        print ("pymongo ERROR:", err)
        client = None
    return client

def consumer_single_collection():
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    topic="eolienne_jour_1"
    tp = TopicPartition(topic,0)
    consumer.assign([tp])
    consumer.seek_to_end(tp)
    lastOffset = consumer.position(tp)
    print(lastOffset)
    consumer.seek_to_beginning(tp) 
    #end
    list_data=[]
    with tracer.start_span('collect_data',child_of=get_current_span()) as span:
        span.set_tag('kafka','collect_data_jour_1')
        with span_in_context(span):
            for message in consumer:
                msg = message.value.decode()
                mongodata={
                    'timestamp':msg.split(";")[0].split(":",1)[1],
                    'msg':msg.split(";",1)[1]
                }
                list_data.append(mongodata)
                if message.offset == lastOffset - 1:
                    print(message.offset)
                    break
            consumer.close()
            return list_data

def insert_one(list_data,collection):
    with tracer.start_span('insert_bulk',child_of=get_current_span()) as span:
        span.set_tag('mongodb','operation:insert_one')
        with span_in_context(span):
            try:
                for data in list_data:
                    collection.insert_one(data)
                return "sucess"
            except:
                return "error"
            
def test_insert_one(collection):
    with tracer.start_span('test_line_insertion') as span:
        span.set_tag('mongodb','insertion_test')
        with span_in_context(span):
            data=consumer_single_collection()
            result=insert_one(data,collection)
            return result
            
            
if __name__=="__main__":
    client = mongodb_connect("localhost", 28018)
    col=client.test['test_eolienne_insert']
    tracer = init_tracer('mongodb_test_eolienne_1_jour')
    result = test_insert_one(col)
    print(result)

Initializing Jaeger Tracer with UDP reporter
Using selector: EpollSelector
Using sampler ConstSampler(True)
Added sensor with name connections-closed
Added sensor with name connections-created
Added sensor with name select-time
Added sensor with name io-time
Initiating connection to node bootstrap-0 at localhost:9092
Added sensor with name bytes-sent-received
Added sensor with name bytes-sent
Added sensor with name bytes-received
Added sensor with name request-latency
Added sensor with name node-bootstrap-0.bytes-sent
Added sensor with name node-bootstrap-0.bytes-received
Added sensor with name node-bootstrap-0.latency
<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <disconnected> [unspecified None]>: creating new socket
<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <disconnected> [IPv6 ('::1', 9092, 0, 0)]>: setting socket option (6, 1, 1)
<BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9

Broker version identified as 1.0.0
Set configuration api_version=(1, 0, 0) to skip auto check_version requests on startup
Added sensor with name bytes-fetched
Added sensor with name records-fetched
Added sensor with name fetch-latency
Added sensor with name records-lag
Added sensor with name fetch-throttle-time
Added sensor with name heartbeat-latency
Added sensor with name join-latency
Added sensor with name sync-latency
group_id is None: disabling auto-commit.
Added sensor with name commit-latency
Seeking to end of partition TopicPartition(topic='eolienne_jour_1', partition=0)
Resetting offset for partition TopicPartition(topic='eolienne_jour_1', partition=0) to latest offset.
Partition TopicPartition(topic='eolienne_jour_1', partition=0) is unknown for fetching offset, wait for metadata refresh
Sending metadata request MetadataRequest_v1(topics=['eolienne_jour_1']) to node bootstrap-0
Sending request MetadataRequest_v1(topics=['eolienne_jour_1'])
<BrokerConnection node_id=bootstrap-

Trying to connect to MongoDB server: localhost on port: 28018
error : create now tracer...


Sending request OffsetRequest_v1(replica_id=-1, topics=[(topic='eolienne_jour_1', partitions=[(partition=0, timestamp=-1)])])
<BrokerConnection node_id=0 host=irlin328206:9092 <connected> [IPv6 ('fe80::9a90:96ff:fead:e31d', 9092, 0, 2)]> Request 1: OffsetRequest_v1(replica_id=-1, topics=[(topic='eolienne_jour_1', partitions=[(partition=0, timestamp=-1)])])
Received correlation id: 1
Processing response OffsetResponse_v1
<BrokerConnection node_id=0 host=irlin328206:9092 <connected> [IPv6 ('fe80::9a90:96ff:fead:e31d', 9092, 0, 2)]> Response 1 (2.2237300872802734 ms): OffsetResponse_v1(topics=[(topic='eolienne_jour_1', partitions=[(partition=0, error_code=0, timestamp=-1, offset=10776)])])
Handling ListOffsetResponse response for TopicPartition(topic='eolienne_jour_1', partition=0). Fetched offset 10776, timestamp -1
Seeking to beginning of partition TopicPartition(topic='eolienne_jour_1', partition=0)
Resetting offset for partition TopicPartition(topic='eolienne_jour_1', partition=0) to 

10776


Advance position for partition TopicPartition(topic='eolienne_jour_1', partition=0) from 9106 to 9236 (last message batch location plus one) to correct for deleted compacted messages
Adding fetch request for partition TopicPartition(topic='eolienne_jour_1', partition=0) at offset 9236
Sending FetchRequest to node 0
Sending request FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='eolienne_jour_1', partitions=[(partition=0, offset=9236, max_bytes=1048576)])])
<BrokerConnection node_id=0 host=irlin328206:9092 <connected> [IPv6 ('fe80::9a90:96ff:fead:e31d', 9092, 0, 2)]> Request 5: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='eolienne_jour_1', partitions=[(partition=0, offset=9236, max_bytes=1048576)])])
Received correlation id: 5
Processing response FetchResponse_v4
<BrokerConnection node_id=0 host=irlin328206:9092 <connected> [IPv6 ('fe80::9a90:96f

10775


Reporting span 82af0b792df61d47:62c843d3faca6e2d:889024776791c0d3:1 mongodb_test_eolienne_1_jour.insert_bulk
Reporting span 82af0b792df61d47:889024776791c0d3:0:1 mongodb_test_eolienne_1_jour.test_line_insertion


sucess


In [3]:
from kafka import KafkaConsumer,TopicPartition
from pymongo import MongoClient,errors
import json
import time
import os
import pandas as pd

import time
import logging
from jaeger_client import Config
from opentracing_instrumentation.request_context import get_current_span, span_in_context



def consumer_single_collection():
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
    topic="eolienne_jour_1"
    tp = TopicPartition(topic,0)
    consumer.assign([tp])
    consumer.seek_to_end(tp)
    lastOffset = consumer.position(tp)
    print(lastOffset)
    consumer.seek_to_beginning(tp) 
    #end
    list_data=[]
    for message in consumer:
        msg = message.value.decode()
        list_data.append(msg)
        if message.offset == lastOffset - 1:
            print(message.offset)
            break
    consumer.close()
    return list_data

test_data=consumer_single_collection()

7270
7269


In [19]:
def str_to_dict(list_data):
    col= ['Heure','Temps écoulé','Latitude','Longitude',
          'Altitude','Head. Rel. True North','Pressure',
          'Temperature','Humidity','MDA Wnd Dir','MDA Wnd Speed',
          'MWD Wind Dir','MWD Wind Speed','CavityPressure',
          'CavityTemp','CH4','CH4_dry','C2H6','C2H6_dry',
          '13CH4','H2O','CO2','C2C1Ratio','Delta_iCH4_Raw',
          'HP_Delta_iCH4_30s','HP_Delta_iCH4_2min','HP_Delta_iCH4_5min']
    new_dict=[]
    for item in list_data:
        info=item.split(":",1)[1].split(";")
        dict_item ={col[i]:info[i] for i in range(len(col))}
        new_dict.append(dict_item)
    return new_dict

def verify_empty_str(list_dict):
    col= ['Heure','Temps écoulé','Latitude','Longitude',
          'Altitude','Head. Rel. True North','Pressure',
          'Temperature','Humidity','MDA Wnd Dir','MDA Wnd Speed',
          'MWD Wind Dir','MWD Wind Speed','CavityPressure',
          'CavityTemp','CH4','CH4_dry','C2H6','C2H6_dry',
          '13CH4','H2O','CO2','C2C1Ratio','Delta_iCH4_Raw',
          'HP_Delta_iCH4_30s','HP_Delta_iCH4_2min','HP_Delta_iCH4_5min']        
    for index,item in enumerate(list_dict,start=0):
        for i in range(len(col)) :
            if item.get(col[i]):
                continue
            else:
                print("line: ",index,", position: ",i)

In [15]:
res = str_to_dict(test_data)

In [20]:
verify_empty_str(res)

line:  7270 , position:  0
line:  7271 , position:  0
line:  7272 , position:  0
line:  7273 , position:  0


In [17]:
print(res[7271])

{'Heure': '', 'Temps écoulé': '7269', 'Latitude': '43.4127', 'Longitude': '-0.6416', 'Altitude': '95.2', 'Head. Rel. True North': '73.2', 'Pressure': '1.017', 'Temperature': '16.9', 'Humidity': '79.1', 'MDA Wnd Dir': '257', 'MDA Wnd Speed': '7', 'MWD Wind Dir': '256.6', 'MWD Wind Speed': '6.8', 'CavityPressure': '148', 'CavityTemp': '45', 'CH4': '2.01', 'CH4_dry': '2.05', 'C2H6': '-0.01', 'C2H6_dry': '-0.01', '13CH4': '0.02', 'H2O': '1.69', 'CO2': '392.66', 'C2C1Ratio': '0', 'Delta_iCH4_Raw': '-52.53', 'HP_Delta_iCH4_30s': '-50.41', 'HP_Delta_iCH4_2min': '-48.54', 'HP_Delta_iCH4_5min': '-48.06'}


In [13]:
test_data.append('7269:;7269;43.4127;-0.6416;95.2;73.2;1.017;16.9;79.1;257;7;256.6;6.8;148;45;2.01;2.05;-0.01;-0.01;0.02;1.69;392.66;0;-52.53;-50.41;-48.54;-48.06')