In [2]:
import json
import time
import random
import uuid
import string
import psycopg2
from faker import Faker
import os
import subprocess
import datetime

In [3]:
import sys
sys.path.append(os.getcwd())
from config import CONFIG
from consts import CONSTS

In [4]:
def read_distinct_restaurant_ids(host, port,database, user, password, table):
    # Establish a connection to the PostgreSQL database
    conn = psycopg2.connect(
        host=host,
        port = port,
        database=database,
        user=user,
        password=password
    )
    
    try:
        # Create a cursor object to interact with the database
        cur = conn.cursor()

        # Execute the SQL query to retrieve distinct restaurant_id values
        cur.execute(f"SELECT DISTINCT restaurant_id FROM {table}")

        # Fetch all the distinct restaurant_id values
        distinct_restaurant_ids = cur.fetchall()

        # Commit the transaction
        conn.commit()

        # Return the distinct restaurant_ids as a list
        return [record[0] for record in distinct_restaurant_ids]

    except (Exception, psycopg2.DatabaseError) as error:
        print("Error retrieving distinct restaurant_ids:", error)

    finally:
        # Close the cursor and connection
        cur.close()
        conn.close()

def generate_random_timestamp(after=None):
    if after:
        # Generate a random timestamp after the provided timestamp
        return random.randint(after, after + 100000)
      
    # Generate a random timestamp after 30.07.2023
    return random.randint(1677736800, 2688036800)

def save_counter(counter,file_path):
    # Save the counter to a file
    with open(file_path, "w") as file:
        file.write(str(counter))
        
def read_counter(file_path):
    try:
        with open(file_path, "r") as file:
            counter = int(file.read())
            return counter
    except FileNotFoundError:        
        return 1
    except ValueError:
        print("Invalid counter value in the file.")
        return -1
    
# Incremental counter for message ID
def generate_message(restaurant_ids):
    counter = read_counter(f"{CONFIG['GEN_PATH']}/{CONFIG['COUNTER_FILE_NAME']}")    

    fake = Faker()

    # Randomly select values
    restaurant_id = random.choice(restaurant_ids)
    adv_campaign_id = str(uuid.uuid4())
    adv_campaign_content = random.choice(CONSTS['RESTAURANT_CAMPAIGN'])
    adv_campaign_owner = fake.name()
    adv_campaign_owner_contact = fake.email()
    adv_campaign_datetime_start = generate_random_timestamp()
    adv_campaign_datetime_end = generate_random_timestamp(after=adv_campaign_datetime_start)
    datetime_created = int(time.time())

    # Create the message dictionary
    message = {        
        "restaurant_id": restaurant_id,
        "adv_campaign_id": adv_campaign_id,
        "adv_campaign_content": adv_campaign_content,
        "adv_campaign_owner": adv_campaign_owner,
        "adv_campaign_owner_contact": adv_campaign_owner_contact,
        "adv_campaign_datetime_start": adv_campaign_datetime_start,
        "adv_campaign_datetime_end": adv_campaign_datetime_end,
        "datetime_created": datetime_created
    }      

    # Increment and save the counter to file
    counter += 1    
    save_counter(counter,f"{CONFIG['GEN_PATH']}/{CONFIG['COUNTER_FILE_NAME']}")
    
    return f"{counter}: {json.dumps(message)}"

In [5]:
def update_log(attemp, returncode, output):    
    file_path = f"{CONFIG['GEN_PATH']}/log.txt"
    try:
        with open(file_path,'a') as file:
            time =  datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            line = f"{time} attemp: {attemp}, Return Code: {returncode}, Output: {output}"
            file.write(f"{line}\n")
            print(line)
    except Exception:    
        print('Something is wrong ')

In [6]:
def save_to_file(path,how,messages):        
    try:
        with open(path, how) as file:
            for m in messages:
                file.write(f"{m}\n")            
    except Exception as e:
        print(f"Something is wrong {e}")

In [7]:
def save_data(messages):    
    file_path = f"{CONFIG['DATA_PATH']}/data"
    save_to_file(file_path,'w',messages)

In [8]:
def save_all_data(messages):
    file_path = f"{CONFIG['GEN_PATH']}/all_data.txt"
    save_to_file(file_path,'a',messages)

In [9]:


def execute_bash_command(command):
    try:
        result = subprocess.run(command, shell=True, check=True, capture_output=True, text=True)
        output = result.stdout.strip()
        return result.returncode, output
    except subprocess.CalledProcessError as e:
        return e.returncode, e.stderr.strip()

def send_to_kafka():
    attemp = 1
    returncode = -1
    output = ""
    command = CONFIG['BASH_KAFKA']
    while returncode != 0:
        returncode, output = execute_bash_command(command)
        update_log(attemp,returncode,output)
        time.sleep(3)
        attemp+=1


In [10]:
def generator(pack_size,pack_number):
    
    restaurant_ids = read_distinct_restaurant_ids(
            CONFIG['POSTGRE_METADATA_SERVER'], \
            CONFIG['POSTGRE_METADATA_PORT'], \
            CONFIG['POSTGRE_METADATA_DB'], \
            CONFIG['POSTGRE_METADATA_USERNAME'], \
            CONFIG['POSTGRE_METADATA_PASSWORD'], \
            CONFIG['POSTGRE_METADATA_TB'])
    
    for i in range(0,pack_number,1):
        messages = []
        for i in range(0,pack_size,1):
            messages.append(generate_message(restaurant_ids))        
        save_data(messages)
        save_all_data(messages)
        send_to_kafka()
        time.sleep(3)


In [36]:
generator(5,1)

2023-08-04 21:06:41 attemp: 1, Return Code: 0, Output: 
