@@ -0,0 +1,102 @@
package com.kgribov.telegram

import java.time.{Instant, ZoneId, ZonedDateTime}

import com.fasterxml.jackson.annotation.JsonProperty
import com.kgribov.telegram.model._

package object json {

case class UpdateJson(@JsonProperty(required = true, value = "update_id") id: Int,
@JsonProperty(value = "message") message: Option[MessageJson],
@JsonProperty(value = "callback_query") callbackQuery: Option[CallbackQueryJson]) {

def toModel: Update = {
val messageModel = message.map(_.toModel)
val callbackModel = callbackQuery.map(_.toMessageModel)

Update(id, messageModel.getOrElse(callbackModel.get))
}
}

case class UserJson(@JsonProperty(required = true, value = "id") id: Int,
@JsonProperty(required = true, value = "is_bot") isBot: Boolean,
@JsonProperty(required = true, value = "first_name") firstName: String,
@JsonProperty(value = "last_name") lastName: Option[String],
@JsonProperty(value = "username") username: Option[String]) {

def toModel: User = {
User(id, isBot, firstName, lastName, username)
}
}

case class ChatJson(@JsonProperty(required = true, value = "id") id: Int,
@JsonProperty(value = "title") title: Option[String],
@JsonProperty(value = "description") description: Option[String],
@JsonProperty(required = true, value = "type") chatType: String) {

def toModel: Chat = {
Chat(id, title, description, chatType)
}
}

case class CallbackQueryJson(@JsonProperty(required = true, value = "id") id: String,
@JsonProperty(required = true, value = "from") from: UserJson,
@JsonProperty(value = "message") message: MessageJson,
@JsonProperty(value = "data") data: String) {

def toMessageModel: Message = {
Message(id, from.toModel, None, Some(message.toModel), true, message.getDate, message.toModel.chat, data)
}
}

case class MessageJson(@JsonProperty(required = true, value = "message_id") id: Int,
@JsonProperty(required = true, value = "from") from: UserJson,
@JsonProperty(required = true, value = "date") date: Int,
@JsonProperty(value = "reply_to_message") replyTo: Option[MessageJson],
@JsonProperty(required = true, value = "chat") chat: ChatJson,
@JsonProperty(value = "text") rawText: Option[String]) {

def getDate: ZonedDateTime = Instant.ofEpochSecond(date).atZone(ZoneId.of("Z"))

def command: Option[String] = {
if (splitText._1.isEmpty) {
None
} else {
Some(splitText._1)
}
}

def text: String = splitText._2

private def splitText: (String, String) = {
if (rawText.isEmpty) {
("", "")

} else if (rawText.isDefined && rawText.get.startsWith("/")) {
val commandRegex = "/(\\w+)(\\s?)(.*)".r
val commandRegex(command, _, messageText) = rawText.get
(command, messageText)
} else {
("", rawText.getOrElse(""))
}
}

def toModel: Message = {
Message(id.toString, from.toModel, command, replyTo.map(_.toModel), false, getDate, chat.toModel, text)
}
}

case class InlineKeyboardMarkup(@JsonProperty(required = true, value = "inline_keyboard") keyboard: Array[Array[InlineKeyboardButton]])

object InlineKeyboardMarkup {
def fromKeyboard(keyboard: Keyboard): InlineKeyboardMarkup = {
InlineKeyboardMarkup(keyboard.buttons.map(
button => Array(InlineKeyboardButton(button, Some(button)))
).toArray)
}
}

case class InlineKeyboardButton(@JsonProperty(required = true, value = "text") text: String,
@JsonProperty(value = "callback_data") callbackData: Option[String] = None)
}
@@ -0,0 +1,35 @@
package com.kgribov.telegram

import java.time.ZonedDateTime

package object model {

case class Update(id: Int, message: Message)

case class User(id: Int, isBot: Boolean, firstName: String, lastName: Option[String], username: Option[String])

case class Chat(id: Int, title: Option[String], description: Option[String], chatType: String)

case class MessageToSend(chatId: Int, text: String, replyKeyboard: Option[Keyboard] = None)

trait ToSendMessage { def toSend(chatId: Int): MessageToSend }

case class QuizMessage(question: String, options: List[String]) extends ToSendMessage {
def toSend(chatId: Int): MessageToSend = {
MessageToSend(chatId, question, Some(Keyboard(options)))
}
}

case class Message(id: String,
from: User,
command: Option[String] = None,
replyTo: Option[Message] = None,
replyToKeyboard: Boolean = false,
date: ZonedDateTime,
chat: Chat,
text: String)

case class KeyboardAlert(messageId: String, text: String = "", showAlert: Boolean = false)

case class Keyboard(buttons: List[String])
}
@@ -0,0 +1,50 @@
package com.kgribov.telegram

import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.kgribov.telegram.json.{InlineKeyboardMarkup, MessageJson, UpdateJson}
import com.kgribov.telegram.model.Keyboard
import com.typesafe.scalalogging.LazyLogging

package object parser extends LazyLogging {

private val mapper = {
val mapper = new ObjectMapper() with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
mapper
}

def parseUpdates(json: String): List[UpdateJson] = {
val response = mapper.readValue[UpdatesResponse](json)
if (response.ok) {
response.result
} else {
logger.error(s"Message with bad status: [$response]")
throw new Exception(s"Unable to parse bad message $json")
}
}

def parseMessageResponse(json: String): MessageJson = {
val response = mapper.readValue[MessageResponse](json)
if (response.ok) {
response.result
} else {
logger.error(s"Message with bad status: [$response]")
throw new Exception(s"Unable to parse bad message $json")
}
}

def parseMessage(json: String): MessageJson = {
mapper.readValue[MessageJson](json)
}

def keyboardToJson(keyboard: Keyboard): String = {
mapper.writeValueAsString(InlineKeyboardMarkup.fromKeyboard(keyboard))
}

private case class UpdatesResponse(ok: Boolean, result: List[UpdateJson])

private case class MessageResponse(ok: Boolean, result: MessageJson)
}
@@ -0,0 +1,37 @@
package com.kgribov.telegram.process

import com.kgribov.telegram.model.User

class DialogAnswers(answers: List[(String, Answer)] = List()) {

def withAnswer(question: String, answer: Answer): DialogAnswers = {
new DialogAnswers((question, answer) :: answers)
}

def lastAnswer: Answer = {
if (answers.isEmpty) {
throw new Exception("You should ask something before read answers")
}
answers.head._2
}

def lastTextAnswer: String = {
if (answers.isEmpty) {
throw new Exception("You should ask something before read answers")
}
answers.head._2.simpleAnswer.text
}

def allTextAnswers: Map[String, String] = answers.groupBy(_._1).map(answers =>
(answers._1, answers._2.head._2.simpleAnswer.text)
)

def lastAnswersFromUsers: List[(User, String)] =
answers
.head
._2
.answers
.groupBy(_.from)
.map(userAnswers => (userAnswers._1, userAnswers._2.head.text))
.toList
}
@@ -0,0 +1,48 @@
package com.kgribov.telegram.process

import java.time.{Clock, ZoneId, ZonedDateTime}

import com.kgribov.telegram.model.Message
import com.kgribov.telegram.sender.MessageSender
import com.typesafe.scalalogging.LazyLogging

import scala.collection.mutable
import scala.util.{Failure, Success, Try}

class DialogPool(messageSender: MessageSender,
clock: Clock = Clock.systemDefaultZone()) extends LazyLogging {

private val activeDialogs = new mutable.HashMap[Int, DialogProcessor]()

def activateDialog(chatId: Int, dialog: DialogProcessor): Unit = {
activeDialogs.put(chatId, dialog)
}

def processMessages(messages: List[Message]): Unit = {
cleanDialogs()

val messagesToChat = messages.groupBy(_.chat.id)
activeDialogs.keySet.foreach(key => {
val messagesForDialog = messagesToChat.get(key)
val dialogProcessor = activeDialogs(key)
processMessagesSafely(dialogProcessor, messagesForDialog)
})
}

private def processMessagesSafely(dialogProcessor: DialogProcessor, messages: Option[List[Message]]): Unit = {
val processResult = Try(dialogProcessor.processMessages(messages))
processResult match {
case Success(_) => logger.debug(s"Successfully process dialog messages: $messages")
case Failure(ex) => logger.error(s"Unable to process messages for dialog : [$messages]", ex)
}
}

private def cleanDialogs(): Unit = {
val activeChats = activeDialogs.keySet
activeChats
.filter(chatId => !activeDialogs(chatId).isActive(now))
.foreach(activeDialogs.remove)
}

private def now: ZonedDateTime = clock.instant().atZone(ZoneId.systemDefault())
}
@@ -0,0 +1,76 @@
package com.kgribov.telegram.process

import java.time.ZonedDateTime
import java.util.concurrent.TimeUnit

import com.kgribov.telegram.model.{Message, User}
import com.kgribov.telegram.sender.MessageSender

import scala.concurrent.duration.Duration

class DialogProcessor(chatId: Int,
dialogTimeout: Duration = Duration(5, TimeUnit.MINUTES),
messageSender: MessageSender,
questions: List[DialogAnswers => DialogQuestion],
withUserOnly: Option[User],
now: ZonedDateTime = ZonedDateTime.now()) {

private val expiredTime = now.plusNanos(dialogTimeout.toNanos)

private var questionsToAsk = questions
private var currentQuestion: Option[DialogQuestion] = None
private var answers = new DialogAnswers

def processMessages(messages: Option[List[Message]]): Unit = {
if (isActive()) {
if (questionIsNotAskedYet) {
askQuestion()
} else {
val orderedMes = messages.map(_.sortBy(_.date.toInstant.toEpochMilli))
orderedMes.foreach(_.foreach(process))
}

if (currentQuestion.exists(_.isDone().isDefined)) {
val question = currentQuestion.get
answers = answers.withAnswer(question.questionText, question.isDone().get)
moveToNextQuestion()
}
}
}

private def process(message: Message): Unit = {
currentQuestion.foreach(question => {
if (questionFromUser(message)) {
question.processReply(message)
}
})
}

def isActive(currentTime: ZonedDateTime = ZonedDateTime.now()): Boolean = {
expiredTime.isAfter(currentTime) && dialogIsNotEnd
}

private def questionIsNotAskedYet: Boolean = currentQuestion.isEmpty

private def moveToNextQuestion(): Unit = {
currentQuestion = None
questionsToAsk = questionsToAsk.tail
}

private def questionFromUser(message: Message): Boolean = {
if (withUserOnly.isEmpty) {
true
} else {
withUserOnly.get == message.from
}
}

private def askQuestion(): Unit = {
currentQuestion = Some(questionsToAsk.head(answers))
currentQuestion.get.askQuestion(chatId)
}

private def dialogIsNotEnd: Boolean = questionsToAsk.nonEmpty || currentQuestion.isDefined
}


@@ -0,0 +1,139 @@
package com.kgribov.telegram.process

import java.time.ZonedDateTime

import com.kgribov.telegram.model._
import com.kgribov.telegram.sender.MessageSender

trait DialogQuestion {

def askQuestion(chatId: Int): String

def isAsked: Boolean

def processReply(reply: Message)

def isDone(currentTime: ZonedDateTime = ZonedDateTime.now()): Option[Answer]

def questionText: String
}

case class Answer(answers: List[Message]) {

def isIgnored: Boolean = answers.isEmpty

def isSimple: Boolean = answers.size == 1

def simpleAnswer: Message = answers.head

def allAnswers: List[Message] = answers
}

case class KeyboardDialogQuestion(messageSender: MessageSender,
text: String,
possibleAnswers: List[String],
alertOnAnswer: String => String,
expiredAt: Option[ZonedDateTime] = None) extends DialogQuestion {

private var questionId: Option[String] = None
private var answers = List[Message]()

override def askQuestion(chatId: Int): String = {
questionId = Some(messageSender.send(question(chatId)).id)
questionId.get
}

override def isAsked: Boolean = questionId.isDefined

override def processReply(reply: Message): Unit = {
val isReplyToQuestion =
questionId.isDefined &&
reply.replyTo.exists(_.id == questionId.get) &&
possibleAnswers.contains(reply.text)

if (reply.replyToKeyboard && isReplyToQuestion) {
if (answers.exists(ans => ans.from == reply.from)) {
// already have answer from such user
messageSender.sendKeyboardAlert(KeyboardAlert(reply.id, "You have already gave an answer", showAlert = true))
} else {
answers = reply :: answers
messageSender.sendKeyboardAlert(KeyboardAlert(reply.id, alertOnAnswer(reply.text)))
}
}
}

override def isDone(currentTime: ZonedDateTime = ZonedDateTime.now()): Option[Answer] = {
if (expiredAt.isDefined) {
if (expiredAt.get.isBefore(currentTime)) {
Some(Answer(answers))
} else {
None
}

} else {
if (answers.isEmpty) {
None
} else {
Some(Answer(answers))
}
}
}

override def questionText: String = text


private def question(chatId: Int): MessageToSend = {
MessageToSend(chatId, text, Some(Keyboard(possibleAnswers)))
}
}

case class TextDialogQuestion(messageSender: MessageSender,
text: String,
alertOnAnswer: Option[String => String] = None) extends DialogQuestion {

private var questionId: Option[String] = None
private var answer: Option[Answer] = None

override def isAsked: Boolean = questionId.isDefined

override def askQuestion(chatId: Int): String = {
questionId = Some(messageSender.send(question(chatId)).id)
questionId.get
}

override def processReply(reply: Message): Unit = {
if (answer.isEmpty) {
answer = Some(Answer(List(reply)))
} else {
// user already gave the answer
}
}

override def isDone(currentTime: ZonedDateTime = ZonedDateTime.now()): Option[Answer] = {
answer
}

override def questionText: String = text

private def question(chatId: Int): MessageToSend = {
MessageToSend(chatId, text)
}
}

case class DummyDialogQuestion(dummyText: String, messageSender: MessageSender) extends DialogQuestion {

private var questionId: Option[String] = None

override def askQuestion(chatId: Int): String = {
questionId = Some(messageSender.send(MessageToSend(chatId, dummyText)).id)
questionId.get
}

override def processReply(reply: Message): Unit = {}

override def isDone(currentTime: ZonedDateTime): Option[Answer] = Some(Answer(List()))

override def questionText: String = "Dummy"

override def isAsked = questionId.isDefined
}
@@ -0,0 +1,61 @@
package com.kgribov.telegram.process

import java.time.Clock

import com.kgribov.telegram.model.{Message, MessageToSend}
import com.kgribov.telegram.sender.MessageSender

class MessageProcessor(anyMessageProcessors: List[Message => Option[String]],
simpleCommandsProcessors: Map[String, Message => Option[String]],
dialogsProcessors: Map[String, Message => DialogProcessor],
messageSender: MessageSender) {

private val dialogPool = new DialogPool(messageSender, Clock.systemDefaultZone())

def processMessages(messages: List[Message]): Unit = {
processAnyMessage(messages)
processCommands(messages)
processDialogs(messages)
}

private def processAnyMessage(messages: List[Message]): Unit = {
val replyMessages = messages.flatMap(message => {
anyMessageProcessors.map(processFun => {
processFun(message)
.map(replyText => MessageToSend(message.chat.id, replyText))
})
}).flatten
messageSender.sendMessages(replyMessages)
}

private def processCommands(messages: List[Message]): Unit = {
val replyMessages = messages
.filter(_.command.isDefined)
.flatMap(message => {
val command = message.command.get
simpleCommandsProcessors
.get(command)
.map(_.apply(message))
.flatMap(reply => reply.map(replyText => MessageToSend(message.chat.id, replyText)))
})
messageSender.sendMessages(replyMessages)
}

private def processDialogs(messages: List[Message]): Unit = {
createDialogs(messages)
dialogPool.processMessages(messages)
}

private def createDialogs(messages: List[Message]): Unit = {
val messageToCommand = messages
.filter(_.command.isDefined)
.filter(message => dialogsProcessors.contains(message.command.get))
.groupBy(_.command.get)

messageToCommand.foreach {
case (command, messagesOfCommand) => messagesOfCommand.foreach(message => {
dialogPool.activateDialog(message.chat.id, dialogsProcessors(command)(message))
})
}
}
}
@@ -0,0 +1,65 @@
package com.kgribov.telegram.sender

import com.kgribov.telegram.model.{KeyboardAlert, Message, MessageToSend}
import com.kgribov.telegram.parser._
import com.typesafe.scalalogging.LazyLogging

import scala.util.{Failure, Success, Try}
import scalaj.http.{Http, HttpRequest}

class MessageSender(apiKey: String,
retries: Int = 15,
sleepBetweenRetriesInMs: Int = 1000) extends LazyLogging {

def sendMessages(messages: List[MessageToSend]): List[Message] = {
messages.map(send)
}

def send(message: MessageToSend): Message = {
logger.info(s"Going to send message: $message")
val replyMarkup = message.replyKeyboard match {
case Some(keyboard) => Seq(("reply_markup", keyboardToJson(keyboard)))
case None => Seq[(String, String)]()
}

val params = Seq(
("chat_id", message.chatId.toString),
("text", message.text)
) ++ replyMarkup

val request = Http(getSendMessageUrl).postForm(params)
val response = requestForResponse(request)

parseMessageResponse(response).toModel
}

private def requestForResponse(request: HttpRequest, tryCount: Int = 0): String = {
val response = Try(request.asString.body)
response match {
case Success(body) => body
case Failure(ex) => {
logger.error(s"Unable to send request $request. Try count: $tryCount", ex)
if (tryCount == retries) {
throw new Exception("Max retries is reached for send request")
} else {
requestForResponse(request, tryCount + 1)
}
}
}
}

def sendKeyboardAlert(keyboardAlert: KeyboardAlert): Unit = {
Http(getAnswerCallbackUrl)
.postForm(Seq(
("callback_query_id", keyboardAlert.messageId.toString),
("text", keyboardAlert.text),
("show_alert", keyboardAlert.showAlert.toString)
)).asString
}

private def botHostName: String = s"https://api.telegram.org/bot$apiKey/"

private def getSendMessageUrl = botHostName + "sendMessage"

private def getAnswerCallbackUrl = botHostName + "answerCallbackQuery"
}
@@ -0,0 +1,12 @@
package com.kgribov.telegram.source

class FileBasedOffsetStore(fileName: String) extends OffsetStore {

override def loadOffset: Int = {
0
}

override def store(offset: Int): Unit = {
//nothing still
}
}
@@ -0,0 +1,14 @@
package com.kgribov.telegram.source

class InMemoryOffsetStore(startOffset: Int = 0) extends OffsetStore {

private var currentOffset = startOffset

override def loadOffset: Int = currentOffset

override def store(offset: Int): Unit = {
currentOffset = offset
}

def getCurrentOffset: Int = currentOffset
}
@@ -0,0 +1,27 @@
package com.kgribov.telegram.source

import com.kgribov.telegram.model.{Message, Update}
import com.typesafe.scalalogging.LazyLogging

class MessagesSource(loadUpdates: Int => List[Update], offsetStore: OffsetStore) extends LazyLogging {

def getNewMessages(): List[Message] = {
val offset = offsetStore.loadOffset
val updates = loadUpdates(offset)

logger.info(s"Load [${updates.size}] new messages")
logger.debug(s"Load next messages: $updates")

val nextOffset = if (updates.isEmpty) {
offset
} else {
updates.map(_.id).max + 1
}

logger.info(s"Next offset is [$nextOffset]")

offsetStore.store(nextOffset)

updates.filter(_.message != null).map(_.message)
}
}
@@ -0,0 +1,8 @@
package com.kgribov.telegram.source

trait OffsetStore {

def loadOffset: Int

def store(offset: Int)
}
@@ -0,0 +1,32 @@
package com.kgribov.telegram.source

import com.kgribov.telegram.model.Update
import com.kgribov.telegram.parser._
import com.typesafe.scalalogging.LazyLogging

import scalaj.http.Http

class TelegramUpdatesLoader(apiKey: String) extends LazyLogging {

def loadUpdates(fromOffset: Int): List[Update] = {
val response = Http(getUpdatesUrl)
.param("offset", fromOffset.toString)
.asString

if (response.isError) {
throw new UnableToGetUpdates(response.code)
}

val responseBody = response.body
logger.debug(s"Get next response: [$responseBody]")

parseUpdates(responseBody).map(_.toModel)
}

private def botHostName: String = s"https://api.telegram.org/bot$apiKey/"

private def getUpdatesUrl = botHostName + "getUpdates"
}

class UnableToGetUpdates(returnCode: Int)
extends Exception(s"Unable to get updates from Telegram server. Return code [$returnCode]")
@@ -0,0 +1,52 @@
{
"ok": true,
"result": [
{
"update_id": 776799926,
"message": {
"message_id": 2,
"from": {
"id": 135287549,
"is_bot": false,
"first_name": "Kirill",
"last_name": "Gribov",
"username": "kirilkadurilka",
"language_code": "en-RU"
},
"chat": {
"id": 135287549,
"first_name": "Kirill",
"last_name": "Gribov",
"username": "kirilkadurilka",
"type": "private",
"description": "lol"
},
"date": 1517256282,
"text": "hey"
}
},
{
"update_id": 776799927,
"message": {
"message_id": 3,
"from": {
"id": 135287549,
"is_bot": false,
"first_name": "Kirill",
"last_name": "Gribov",
"username": "kirilkadurilka",
"language_code": "en-RU"
},
"chat": {
"id": 135287549,
"first_name": "Kirill",
"last_name": "Gribov",
"username": "kirilkadurilka",
"type": "private"
},
"date": 1517257555,
"text": "hello"
}
}
]
}
@@ -0,0 +1,35 @@
package com.kgribov.telegram.model

import com.kgribov.telegram.json.MessageJson
import org.scalatest.{FunSuite, Matchers}

class MessageJsonTest extends FunSuite with Matchers {

test("model should return command from text") {
val messageText = Some("/testcommand haha haha")
val message = MessageJson(1, null, 1, null, rawText = messageText, chat = null)

message.command should be (Some("testcommand"))
}

test("model should return only text, if command is not present") {
val messageText = Some("just only text")
val message = MessageJson(1, null, 1, null, rawText = messageText, chat = null)

message.text should be ("just only text")
}

test("model should return text of command") {
val messageText = Some("/testcommand your text here")
val message = MessageJson(1, null, 1, null, rawText = messageText, chat = null)

message.text should be ("your text here")
}

test("model should return command without text") {
val messageText = Some("/testcommand")
val message = MessageJson(1, null, 1, null, rawText = messageText, chat = null)

message.command should be (Some("testcommand"))
}
}
@@ -0,0 +1,38 @@
package com.kgribov.telegram.parser

import org.scalatest.{FunSuite, Matchers}

import scala.io.Source

class ParserTest extends FunSuite with Matchers {

test("parser should read response from telegram correctly") {
val response = Source.fromResource("response_from_user.json").mkString

val updates = parseUpdates(response)

updates should have size 2

updates.head.id should be(776799926)
}

test("parser should return none if field is missing") {
val response = Source.fromResource("response_from_user.json").mkString

val updates = parseUpdates(response)

updates should have size 2

updates.head.message.get.chat.title should be (None)
}

test("parser should return some if field is present") {
val response = Source.fromResource("response_from_user.json").mkString

val updates = parseUpdates(response)

updates should have size 2

updates.head.message.get.chat.description should be (Some("lol"))
}
}
@@ -0,0 +1,87 @@
package com.kgribov.telegram.process

import java.time.ZonedDateTime
import java.util.UUID
import java.util.concurrent.TimeUnit

import com.kgribov.telegram.model.{Chat, Message, MessageToSend, User}
import com.kgribov.telegram.sender.MessageSender
import org.scalamock.scalatest.MockFactory
import org.scalatest.{FunSuite, Matchers}

import scala.concurrent.duration.Duration

class DialogProcessorTest extends FunSuite with Matchers with MockFactory {

private val chatId = 1
private val withUser = Some(User(1, false, "TestName", None, None))

test("should ask question if it was not asked") {
val questionText = "is test works?"
val sender = mock[MessageSender]
val processor = new DialogProcessor(
chatId = chatId,
messageSender = sender,
questions = List(askTextQuestion(questionText, sender)),
withUserOnly = withUser
)

(sender.send _).expects(MessageToSend(chatId, questionText)).returning(message(chatId))

processor.processMessages(None)
}

test("should not be active if ttl is reached") {
val questionText = "is test expired?"
val sender = stub[MessageSender]
val processor = new DialogProcessor(
chatId = chatId,
messageSender = sender,
questions = List(askTextQuestion(questionText, sender)),
withUserOnly = withUser,
dialogTimeout = Duration(1, TimeUnit.MINUTES),
now = ZonedDateTime.now().minusMinutes(2)
)

processor.processMessages(None)

(sender.send _).verify(MessageToSend(chatId, questionText)).never
}

test("should move to next question after answer") {
val questionOneText = "it is first question"
val questionSecondText = "it is second question"
val sender = mock[MessageSender]
val processor = new DialogProcessor(
chatId = chatId,
messageSender = sender,
questions = List(
askTextQuestion(questionOneText, sender),
askTextQuestion(questionSecondText, sender)
),
withUserOnly = withUser
)

(sender.send _).expects(MessageToSend(chatId, questionOneText)).returning(message(chatId))
processor.processMessages(None)

processor.processMessages(Some(List(message(chatId))))

(sender.send _).expects(MessageToSend(chatId, questionSecondText)).returning(message(chatId))
processor.processMessages(None)
}

private def message(chatId: Int, user: User = withUser.get): Message = Message(
UUID.randomUUID().toString,
user,
None,
None,
false,
ZonedDateTime.now(),
Chat(chatId, None, None, "test"),
""
)

private def askTextQuestion(text: String, messageSender: MessageSender): Function[DialogAnswers, DialogQuestion] =
(_: DialogAnswers) => TextDialogQuestion(messageSender, text)
}
@@ -0,0 +1,113 @@
package com.kgribov.telegram.process

import java.time.ZonedDateTime
import java.util.UUID

import com.kgribov.telegram.model._
import com.kgribov.telegram.sender.MessageSender
import org.scalamock.scalatest.MockFactory
import org.scalatest.{FunSuite, Matchers}

class KeyboardDialogQuestionTest extends FunSuite with Matchers with MockFactory {

private val user = User(1, false, "UserName", None, None)

test("KeyboardDialogQuestion should send success alert on answer") {
val sender = mock[MessageSender]
val chatId = 1
val questionText = "success keyboard question"
val successAlert = "Success"
val question = KeyboardDialogQuestion(sender, questionText, List("answer1", "answer2"), _ => successAlert)

val replyMessage = message(chatId)
(sender.send _).expects(where {
(message: MessageToSend) => message.replyKeyboard.isDefined
}).returning(replyMessage)

(sender.sendKeyboardAlert _).expects(where {
(alert: KeyboardAlert) => !alert.showAlert && alert.text == successAlert
})

question.askQuestion(chatId)

question.processReply(message(chatId, text = "answer1", replyTo = Some(replyMessage)))
}

test("KeyboardDialogQuestion should send fail alert on repeated answer") {
val sender = mock[MessageSender]
val chatId = 1
val questionText = "success keyboard question"
val successAlert = "Success"
val question = KeyboardDialogQuestion(sender, questionText, List("answer1", "answer2"), _ => successAlert)

val replyMessage = message(chatId)
(sender.send _).expects(where {
(message: MessageToSend) => message.replyKeyboard.isDefined
}).returning(replyMessage)

(sender.sendKeyboardAlert _).expects(where {
(alert: KeyboardAlert) => !alert.showAlert && alert.text == successAlert
})

(sender.sendKeyboardAlert _).expects(where {
(alert: KeyboardAlert) => alert.showAlert
})

question.askQuestion(chatId)

question.processReply(message(chatId, text = "answer1", replyTo = Some(replyMessage)))

question.processReply(message(chatId, text = "answer2", replyTo = Some(replyMessage)))
}

test("KeyboardDialogQuestion should returns answers on ttl") {
val sender = mock[MessageSender]
val chatId = 1
val questionText = "success keyboard question"
val successAlert = "Success"
val questionExpiredAt = Some(ZonedDateTime.now().minusMinutes(1))
val questionIsNotExpiredAt = ZonedDateTime.now().minusMinutes(2)

val question = KeyboardDialogQuestion(
sender,
questionText,
List("answer1", "answer2"),
_ => successAlert,
questionExpiredAt
)

val replyMessage = message(chatId)
(sender.send _).expects(where {
(message: MessageToSend) => message.replyKeyboard.isDefined
}).returning(replyMessage)

(sender.sendKeyboardAlert _).expects(where {
(alert: KeyboardAlert) => !alert.showAlert && alert.text == successAlert
})

question.askQuestion(chatId)

val answer = message(chatId, text = "answer1", replyTo = Some(replyMessage))
question.processReply(answer)

// answers is ready
question.isDone() should be (Some(Answer(List(answer))))

// answers not ready due to ttl
question.isDone(questionIsNotExpiredAt) should be (None)
}

private def message(chatId: Int,
replyTo: Option[Message] = None,
user: User = user,
text: String = ""): Message = Message(
UUID.randomUUID().toString,
user,
None,
replyTo,
true,
ZonedDateTime.now(),
Chat(chatId, None, None, "test"),
text
)
}
@@ -0,0 +1,54 @@
package com.kgribov.telegram.source

import java.time.ZonedDateTime

import com.kgribov.telegram.model.{Message, Update}
import org.scalatest.{FunSuite, Matchers}

class MessageSourceTest extends FunSuite with Matchers {

test("message source should return 0 messages if no updates available") {
val source = new MessagesSource(loadUpdates(Map()), new InMemoryOffsetStore)

val messages = source.getNewMessages()

messages should have size 0
}

test("message source should commit offset to offsetStore") {
val offsetStore = new InMemoryOffsetStore
val source = new MessagesSource(
loadUpdates(
Map(
0 -> List(update(10), update(0))
)),
offsetStore)

val messages = source.getNewMessages()

offsetStore.getCurrentOffset should be (11)
}

test("message source should load data by provided offset") {
val offsetStore = new InMemoryOffsetStore(100)
val source = new MessagesSource(
loadUpdates(
Map(
0 -> List(update(10), update(0)),
100 -> List(update(100))
)),
offsetStore)

val messages = source.getNewMessages()

messages.map(_.id).max should be (100)
}

private def loadUpdates(returnResults: Map[Int, List[Update]]): (Int) => List[Update] = {
(offset: Int) => returnResults.getOrElse(offset, List.empty[Update])
}

private def update(id: Int): Update = {
Update(id, Message(id.toString, null, None, None, false, ZonedDateTime.now(), null, "hello"))
}
}