Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/refactoring v2 #4

Merged
merged 2 commits into from Sep 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions .gitignore
Expand Up @@ -226,7 +226,7 @@ $RECYCLE.BIN/
bin/
integrations/*/.idea/
.env
journey_bot/.env
channels/journey_bot/.env
journey_central/.idea/
journey_central/.jython_cache/
journey_central/treeassistant-tatan.json
Expand All @@ -236,9 +236,9 @@ journey_core_nlp/.generated/
journey_core_nlp/__pycache__/
journey_core_nlp/elasticsearch-7.8.0/*
*/.env
journey_speech/__pycache__/
journey_speech/.env
journey_speech/treeassistant-tatan.json
channels/journey_speech/__pycache__/
channels/journey_speech/.env
channels/journey_speech/treeassistant-tatan.json
lib/
log/
venv/
Expand Down
6 changes: 6 additions & 0 deletions channels/JourneyTelegramBot/Dockerfile
@@ -0,0 +1,6 @@
# Telegram channel image.
FROM python:3.10
WORKDIR /app
# Copy only the requirements first so source edits don't bust the pip layer cache.
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY ./ /app
# -u: unbuffered stdout/stderr so container logs stream in real time.
CMD ["python","-u","main.py"]
9 changes: 9 additions & 0 deletions channels/JourneyTelegramBot/kafka_connector.py
@@ -0,0 +1,9 @@
import json

from kafka import KafkaProducer


def _serialize(value):
    """Encode an outgoing payload as UTF-8 JSON bytes."""
    return json.dumps(value).encode('utf-8')


# Single module-level producer shared by all sends (connects on import).
producer = KafkaProducer(bootstrap_servers='kafka:9092', value_serializer=_serialize)


def send(topic, message):
    """Publish *message* to *topic*, tagged as originating from Telegram."""
    payload = {"msg": message, "source": "telegram"}
    producer.send(topic, payload)
29 changes: 13 additions & 16 deletions journey_bot/bot_starter.py → channels/JourneyTelegramBot/main.py
@@ -1,42 +1,39 @@
# NOTE(review): this span is a rendered PR diff — removed and added lines are
# interleaved without +/- markers, so it is NOT runnable source as-is.
import os
import mindmeld
import logging
from dotenv import load_dotenv
from telegram.ext import (
Updater,
CommandHandler,
MessageHandler,
Filters,
ConversationHandler,
CallbackContext,
CallbackContext, Filters,
)
import logging
import requests
from dotenv import load_dotenv
from typing import TypeVar

import kafka_connector

# NOTE(review): old NLP HTTP endpoint; the added code path publishes to Kafka
# instead (see telegram_message_received below).
url = "http://localhost:7150/parse"

load_dotenv()

# Old (removed) line: token loaded from the environment — the safe form.
updater = Updater(token=os.getenv('BOT_TOKEN'), use_context=True)
# SECURITY(review): the new (added) line hard-codes the bot token in source.
# This token is now public — revoke/rotate it via BotFather and restore the
# environment-variable form above.
updater = Updater(token='5617224032:AAG7RzjQ6vva7nreeOaCk6IHFBE3t9k23VA', use_context=True)
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
dispatcher = updater.dispatcher
RT = TypeVar('RT')


def start(update, context):
# Greets the user on /start.
context.bot.send_message(chat_id=update.effective_chat.id, text="Hello, I'm Journey. What can I do for you?")


# Old handler header (removed side of the diff) immediately followed by the
# new header (added side):
def message_received(update: Updater, context: CallbackContext):
def telegram_message_received(update: Updater, context: CallbackContext) -> RT:
msg = update.message.text
# Old (removed) path: synchronous HTTP POST to the NLP service.
body = {"text": msg}
response = requests.post(url, json=body)
print(response.json())
context.bot.send_message(chat_id=update.effective_chat.id, text=response.json()["directives"][0].payload.text)
# New (added) path: acknowledge the user, then publish to Kafka for async NLP.
context.bot.send_message(chat_id=update.effective_chat.id, text=f"Sending '{msg}' to journey_nlp over kafka.")
kafka_connector.send('journey_nlp.incoming_messages', msg)
return 200


# Old registration (removed) vs the new one that excludes commands (added):
message_handler = MessageHandler(Filters.text, message_received)
message_handler = MessageHandler(Filters.text & (~Filters.command), telegram_message_received)
dispatcher.add_handler(message_handler)

start_handler = CommandHandler('start', start)
dispatcher.add_handler(start_handler)

Expand Down
3 changes: 3 additions & 0 deletions channels/JourneyTelegramBot/requirements.txt
@@ -0,0 +1,3 @@
python-telegram-bot<20  # main.py uses the v13 Updater(use_context=True) API, removed in v20
python-dotenv
kafka-python
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
12 changes: 12 additions & 0 deletions channels/journey_speech/google-application-credentials.json
@@ -0,0 +1,12 @@
{
"type": "service_account",
"project_id": "treeassistant-tatan",
"private_key_id": "c17762b3284ad4b2892da86b5acad31d81456480",
"private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQCxPkpEu2F/AgfT\nRN1E6d3AWKuUlN6EvWtbMkzJBTVPNE3dHNYwmXv+BdiPr63ZBIruRGngkzmlTWmA\nBnXzxL62ddyUUAGMix0Ei6iI9Dd31YUf8fpxfoesckoTVPVygIVjxaN9a3Bp3vl5\nHffGcMYCJqUjEempno0B4OM+d4NY4Z7giBkUrTJ1OPKWFwJm0EGHVoO+mmMLlnoj\ny4ybKlxzbY67jFRD6x4Ua9q5G3D19h2Xi1GnySIKhwCGRCcl6CIPJr0hJxyBzHS0\ny40QnBPYLL9cM8t8bUuMMvjWyz8On075aEyaUuqSrUE8aLn5vFBSiKHG6f+UFi/8\nW+YwEmxzAgMBAAECggEAB+t3a7ICKnSvkm2MHZCXPtY3d7PEBaMyvg51ukqOfHPv\n240QUERrcyK7vIZMrfSZm9ZYWZdg7Tv6SaUAQqlsUPOo3/YU1TzaFq8brSyMb430\nHUTovFPuqAv4HckFqd1VGIVnFF5nbxoH6uhZGMR6DidZgnZ1PgXYDL9wKAU0snuM\nHhr9DlvPeqh7Y6ghU11wTHEobZV5Ys9Kiz4cekWYKRe4/8yaYHOPThCOIMVukS5q\nxekTIkIcciFOSAqQ7g4U+PILf1+9ZxDbtsdLs0s5R37v8o7YpCxvFaZ6zNxAHrbS\noFrLi2hOUb1deFw5H1a2wADDR/UbcHAlgIj+Vl36cQKBgQDlS6slj52OXNHqiWar\nGkBwbVU9eBKOIT6G6TVSZEfVhUCB5eU+A+zofEnHxoeI6os7JcdW5SBzn7p7tJwN\nBgtvSFF0DLUMVGhyLH6VfCCLC9HsmY0b5f8Nf/+KJz3oZIADcLgtwK7cZwROGTVF\n9I5SxSq/5Y+P9CT82O9Bf7XQ1wKBgQDF4rW4eyjMDsE1orVKXr+K3ItAcOKg5LCC\np83iwtrmt6LiSRTe6JQsSmlkRZhV0z4zFjKqoyYem6u3yhCAUPk2V00vx2hy9gxz\nuGXtY0WZRnGZ4HG+mscORi5JpHr28WjJCSQDUA4jCtpJ4cSUizDikHOXDlKw4TGL\nweP7gAshxQKBgQCyrRVPrqrq7b+SEMHVYR6HvVroDHIBGTy9KmsnqgckBrMuxfOu\nMKbivz9LEpQ5B/O5AoZkWyW/k99gWGwl5ejxLZxzXxeR5RCCr9NwLwq471wFD96a\nt83ZOj3p7Qxb1DzA8YxD8tsP2qLLLO+LdLeXHhdi88K8PVa5RV6jXp5SfwKBgQCs\nQ/wmKTEO0xlnc8mWgH0GiY+//YWMwwjV9PbKnQ6rItntW+mLU7UKCY1N+TfwHwFa\nv8xDt9uLccE/MezDh7orkDxlh5Zp3cBhTPmOeokwMZ7bcgQTBfKbfoT1Hvbc2rN5\nrzdmmt5zqxL0llHnbnSUB2RY7+MUkDXtkFDDtGf8cQKBgQCAescmKqvwC+rXJb1c\nU+HOS9DJMzQzjI54MZqgfjcN68C1zm8Y6TZ9lBGkkGHOoRVolCY5QLTrgeFzCoTC\ns+uhss9CEewRJs8OxxfnAr373JG8ZiGIrw4PHFG3hlPiCCNKoOarEhWT09EAOPY0\ng/LjdJToLWRghp3sFC4v6FUKTA==\n-----END PRIVATE KEY-----\n",
"client_email": "treetester@treeassistant-tatan.iam.gserviceaccount.com",
"client_id": "109961751830646460170",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/treetester%40treeassistant-tatan.iam.gserviceaccount.com"
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
21 changes: 21 additions & 0 deletions channels/voice/__init__.py
@@ -0,0 +1,21 @@
class JourneyChannel:
    """Base messaging channel: tracks dialogue sessions and dispatches messages.

    Attributes:
        name: Human-readable channel name (e.g. "voice").
        _sessions: Maps a dialogue id to its session object.
    """

    def __init__(self, name__):
        self.name = name__
        self._sessions = dict()

    def start(self):
        """Start the channel. No-op in the base class; subclasses override."""
        pass

    def send_message(self, originator_id, message):
        """Deliver *message* to *originator_id*. Base class only logs it."""
        # NOTE(review): base implementation ignores originator_id; concrete
        # channels are expected to override this with real delivery.
        print("Received message: " + message)

    def process_message(self, func):
        """Decorator: wrap a message handler, logging after each invocation.

        Bug fix: the original wrapper referenced undefined names ``session``
        and ``message`` (NameError on every call) and dropped the handler's
        return value. The wrapper now forwards its arguments unchanged and
        returns the handler's result.
        """
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            print("after")
            return result
        return wrapper

    def get_session(self, dialogue_id):
        """Return the session for *dialogue_id*; raises KeyError if unknown."""
        return self._sessions[dialogue_id]
File renamed without changes.
149 changes: 84 additions & 65 deletions docker-compose.yml
@@ -1,80 +1,99 @@
version: '3.3'
version: '3.9'

services:
backend:
build:
context: ./
dockerfile: ./journey_central/Dockerfile
container_name: journey_central
restart: on-failure
ports:
- $SPRING_LOCAL_PORT:$SPRING_DOCKER_PORT
- 2375:2375
- 2376:2376
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
SPRING_APPLICATION_JSON: '{
"spring.datasource.url" : "jdbc:mysql://journey_database:$MYSQLDB_DOCKER_PORT/$MYSQLDB_DATABASE?useSSL=false",
"spring.datasource.username" : "$MYSQLDB_USER",
"spring.datasource.password" : "$MYSQLDB_ROOT_PASSWORD",
"spring.jpa.properties.hibernate.dialect" : "org.hibernate.dialect.MySQL5InnoDBDialect",
"spring.jpa.hibernate.ddl-auto" : "update"
}'
GOOGLE_APPLICATION_CREDENTIALS: "google-application-credentials.json"
tty: true
stdin_open: true
volumes:
- .m2:/root/.m2
- journey_nlp_domains:/usr/journey_core_nlp/domains/
- journey_nlp_entities:/usr/journey_core_nlp/entities/
- journey_nlp_data:/usr/journey_core_nlp/data/
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "22181:2181"
healthcheck:
test: ["CMD", "nc", "-vz", "localhost", "2181"]
start_period: 10s
interval: 10s
timeout: 5s
retries: 3

kafka:
image: confluentinc/cp-kafka:latest
restart: always
depends_on:
- journey_database

command: "sh test.sh"
journey_database:
#container_name: journey_central_db
restart: unless-stopped
image: mysql:5.7
command: --init-file /var/lib/mysql_journey/init.sql
zookeeper:
condition: service_healthy
ports:
- $MYSQLDB_LOCAL_PORT:$MYSQLDB_DOCKER_PORT
- "29092:9092"
environment:
MYSQL_ROOT_PASSWORD: $MYSQLDB_ROOT_PASSWORD
MYSQL_DATABASE: $MYSQLDB_DATABASE
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
healthcheck:
test: ["CMD", "nc", "-vz", "localhost", "9092"]
start_period: 15s
interval: 5s
timeout: 10s
retries: 10

journey_telegram_bot:
restart: always
depends_on:
kafka:
condition: service_healthy
journey_core_nlp:
condition: service_started
build:
context: ./channels/JourneyTelegramBot
volumes:
- my-db:/var/lib/mysql
- ./init.sql:/var/lib/mysql_journey/init.sql
journey_nlp:
build: ./journey_core_nlp
- ./channels/JourneyTelegramBot:/app
journey_core_nlp:
restart: always
depends_on:
kafka:
condition: service_healthy
build:
context: ./journey_core_nlp
volumes:
- journey_nlp_domains:/root/journey_nlp/domains/
- journey_nlp_entities:/root/journey_nlp/entities/
- journey_nlp_data:/root/journey_nlp/data/
- ./journey_core_nlp:/app

journey_central_database:
restart: always
image: mysql/mysql-server:8.0.23
ports:
- 7150:7150
journeyintegrations:
build:
context: ./
dockerfile: ./journey_integrations_executer/Dockerfile
- "3306:3306"
environment:
PULSE_SERVER: /mnt/wslg/PulseServer
ports:
- 9090:9090
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: dev_journey
MYSQL_USER: Journey
MYSQL_PASSWORD: test_password123
healthcheck:
test: "mysqladmin ping -h 127.0.0.1 -u root --password=root"
timeout: 20s
retries: 10
volumes:
- /mnt/wslg/:/mnt/wslg/
journey_speech:
build: ./journey_speech
environment:
PULSE_SERVER: /mnt/wslg/PulseServer
GOOGLE_APPLICATION_CREDENTIALS: "google-application-credentials.json"
- journey-central-db:/var/lib/mysql_journey
- ./mysql-init-files:/docker-entrypoint-initdb.d

journey_central:
restart: always
depends_on:
kafka:
condition: service_healthy
journey_central_database:
condition: service_started
build:
context: ./journey_central
volumes:
- /mnt/wslg/:/mnt/wslg/


- ./journey_central:/workspace/app
telegram_sender_bot:
depends_on:
- journey_central
build:
context: ./integrations/TelegramSenderIntegration
volumes:
- ./integrations/TelegramSenderIntegration:/app

volumes:
my-db:
journey_nlp_domains:
journey_nlp_entities:
journey_nlp_data:
journey-central-db:
6 changes: 6 additions & 0 deletions integrations/TelegramSenderIntegration/Dockerfile
@@ -0,0 +1,6 @@
# Telegram sender integration image.
FROM python:3.10
WORKDIR /app
# Copy only the requirements first so source edits don't bust the pip layer cache.
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY ./ /app
# -u: unbuffered stdout/stderr so container logs stream in real time.
CMD ["python","-u","main.py"]
78 changes: 78 additions & 0 deletions integrations/TelegramSenderIntegration/main.py
@@ -0,0 +1,78 @@
import json
import logging
import threading
from typing import TypeVar

from kafka import KafkaConsumer
from telegram.ext import (
Updater,
CommandHandler,
)

# SECURITY(review): Telegram bot token committed in plain text — this token is
# now public; revoke/rotate it via BotFather and load it from the environment
# (e.g. os.getenv('BOT_TOKEN')) instead of embedding it in source.
updater = Updater(token='5449115559:AAGcG99ZIIITgEvR7NGBnHrFbYLnxXeJtTg', use_context=True)
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
# Dispatcher routes incoming Telegram updates to the registered handlers.
dispatcher = updater.dispatcher
# Generic return-type variable used by handler annotations.
RT = TypeVar('RT')


def start(update, context):
    """Handle the /start command by greeting the chatting user."""
    chat_id = update.effective_chat.id
    greeting = "Hello, I'm the integration!"
    context.bot.send_message(chat_id=chat_id, text=greeting)


def send_message(bot, chat_id, message):
    """Forward *message* as a Telegram text message to *chat_id* via *bot*."""
    kwargs = {"chat_id": chat_id, "text": message}
    bot.send_message(**kwargs)


def handle_message(message):
    """Process one Kafka record and reply over Telegram when appropriate.

    Expects ``message.value`` to be a JSON object shaped like
    ``{"intent": ..., "entities": [{"attributes": {"chat_id": ...}}]}``
    (schema inferred from this handler — TODO confirm against the producer).
    Records that are empty, non-JSON, or missing the expected fields are
    logged and skipped; the original implementation raised KeyError /
    IndexError on such records inside the consumer loop.
    """
    if not message.value:
        return
    try:
        value = json.loads(message.value)
    except (ValueError, TypeError):
        print("Skipping non-JSON record")
        return
    if not isinstance(value, dict):
        print("Skipping non-object JSON record")
        return
    print(value)
    if value.get('intent') != 'telegram_message':
        return
    entities = value.get('entities') or []
    attributes = entities[0].get('attributes') if entities else None
    chat_id = (attributes or {}).get('chat_id')
    if chat_id is None:
        print("telegram_message record without a chat_id; skipping")
        return
    # NOTE(review): reply text is a hard-coded placeholder.
    send_message(updater.bot, chat_id, "We made a whole trip!")


class UpdaterPolling(threading.Thread):
    """Background thread that registers the /start handler and polls Telegram."""

    def __init__(self):
        super().__init__()

    def run(self):
        """Wire up the command handler, then begin long polling for updates."""
        dispatcher.add_handler(CommandHandler('start', start))
        updater.start_polling()


class Consumer(threading.Thread):
    """Thread that drains telegram_sender Kafka records until stopped."""

    def __init__(self):
        super().__init__()
        # Cooperative shutdown flag, checked between and within poll cycles.
        self.stop_event = threading.Event()

    def stop(self):
        """Request shutdown; run() exits after its current iteration."""
        self.stop_event.set()

    def run(self):
        """Consume and handle records in a loop until stop() is called."""
        source = KafkaConsumer(bootstrap_servers='kafka:9092',
                               auto_offset_reset='earliest',
                               consumer_timeout_ms=1000)
        source.subscribe(['telegram_sender.telegram_message'])
        print("Starting to consume messages")
        # The inner for-loop ends after consumer_timeout_ms of inactivity,
        # letting the outer while re-check the stop flag.
        while not self.stop_event.is_set():
            for record in source:
                handle_message(record)
                if self.stop_event.is_set():
                    break
        source.close()


# NOTE(review): redundant — logging was already configured at module import
# time above; this second basicConfig call is a no-op once handlers exist.
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
print("creating consumer")
# Start the Kafka consumer thread and the Telegram polling thread, then block
# on join() — the process runs until externally terminated.
# NOTE(review): consider an `if __name__ == "__main__":` guard so importing
# this module does not start threads as a side effect.
consumer = Consumer()
consumer.start()
poller = UpdaterPolling()
poller.start()
consumer.join()
poller.join()
2 changes: 2 additions & 0 deletions integrations/TelegramSenderIntegration/requirements.txt
@@ -0,0 +1,2 @@
kafka-python
python-telegram-bot<20  # main.py uses the v13 Updater(use_context=True) API, removed in v20
3 changes: 3 additions & 0 deletions journey_apis/python/kafka_producer.py
@@ -0,0 +1,3 @@
class KafkaProducer:
    """Placeholder for a journey-internal Kafka producer API (no-op stub).

    NOTE(review): shares its name with kafka-python's KafkaProducer — consider
    renaming once implemented to avoid import confusion.
    """

    def __init__(self):
        # No configuration or connection handling yet.
        pass