Skip to content
This repository has been archived by the owner on Apr 27, 2019. It is now read-only.

Commit

Permalink
DNSBL filters and compression.
Browse files Browse the repository at this point in the history
  • Loading branch information
m242 committed Sep 12, 2014
1 parent 07d55ee commit 4729575
Show file tree
Hide file tree
Showing 13 changed files with 207 additions and 148 deletions.
4 changes: 3 additions & 1 deletion common/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@ libraryDependencies ++= Seq(
"com.typesafe" % "config" % "1.2.1",
"com.typesafe.akka" % "akka-actor_2.10" % "2.3.6",
"com.etaty.rediscala" %% "rediscala" % "1.3.1",
"com.typesafe.play" %% "play-json" % "2.3.4"
"com.typesafe.play" %% "play-json" % "2.3.4",
"commons-codec" % "commons-codec" % "1.9",
"commons-io" % "commons-io" % "2.4"
)
2 changes: 1 addition & 1 deletion common/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ maildrop {
response = "You have been temporarily greylisted. Please try back later."
time = 5m
}
dnsbl = [ "zen.spamhaus.org.", "bl.mailspike.net.", "bb.barracudacentral.org.", "psbl.surriel.com.", "dnsbl.sorbs.net.", "bl.score.senderscore.com" ]
dnsbl = [ "zen.spamhaus.org.", "bl.mailspike.net.", "bb.barracudacentral.org.", "psbl.surriel.com.", "dnsbl.sorbs.net.", "bl.score.senderscore.com." ]
spf {
response = "Invalid sender - see http://www.openspf.org/Introduction"
}
Expand Down
1 change: 1 addition & 0 deletions common/src/main/resources/logback.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
<pattern>%d{HH:mm:ss.SSS} [%level] %logger{36} - %msg%n</pattern>
</encoder>
</appender>
<logger name="com.heluna.maildrop.smtp.filters.SPFLogger" level="ERROR"/>
<root level="info">
<appender-ref ref="STDOUT"/>
</root>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import java.io.File
import java.util.concurrent.TimeUnit
import com.typesafe.config.{Config, ConfigFactory}
import scala.util.Try
import scala.collection.JavaConversions._

/**
* common com.heluna.maildrop
Expand All @@ -28,7 +29,7 @@ object MailDropConfig {

def getString(key: String): Option[String] = Try(Option(config.getString(key))).toOption.flatten

def getStringList(key: String): List[String] = Try(getStringList(key)).toOption.getOrElse(List[String]())
def getStringList(key: String): List[String] = Try(config.getStringList(key).toList).toOption.getOrElse(List[String]())

def apply(key: String) = getString(key)

Expand Down
40 changes: 38 additions & 2 deletions common/src/main/scala/com/heluna/maildrop/util/Mailbox.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.heluna.maildrop.util

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.Date
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import org.apache.commons.codec.binary.Base64
import org.apache.commons.io.IOUtils
import play.api.libs.json.{JsValue, Json}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
Expand Down Expand Up @@ -29,18 +33,29 @@ object Mailbox {
"recipient" -> recipient,
"subject" -> subject,
"date" -> date.getTime,
"body" -> body
"body" -> compress(body)
)
Redis.client.lpush(rkey, Json.stringify(json))
Redis.client.ltrim(rkey, 0, maxLength - 1)
Redis.client.expire(rkey, expiry)
}

def uncompressJson(json: JsValue) = {
Json.obj(
"id" -> (json \ "id").as[String],
"sender" -> (json \ "sender").as[String],
"recipient" -> (json \ "recipient").as[String],
"subject" -> (json \ "subject").as[String],
"date" -> (json \ "date").as[Long],
"body" -> uncompress((json \ "body").as[String])
)
}

def list(recipient: String): Future[List[JsValue]] =
Redis.client.lrange[String](key(recipient), 0, maxLength).map(seq => seq.map(json => Json.parse(json)).toList)

def get(recipient: String, id: String): Future[Option[JsValue]] =
list(recipient).map(messages => messages.find(json => (json \ "id").as[String] == id))
list(recipient).map(messages => messages.find(json => (json \ "id").as[String] == id).map(jsval => uncompressJson(jsval)))

def delete(recipient: String, id: String): Unit = {
val rkey = key(recipient)
Expand All @@ -53,4 +68,25 @@ object Mailbox {

def shortId = Random.alphanumeric.take(6).mkString

def compress(raw: String): String = {
val baos = new ByteArrayOutputStream()
val gzip = new GZIPOutputStream(baos)
gzip.write(raw.getBytes("UTF-8"))
gzip.close()
new String(Base64.encodeBase64(baos.toByteArray))
}

def uncompress(compressed: String): String = {
Option(compressed).fold[String](null)(c =>
try {
IOUtils.toString(new GZIPInputStream(new ByteArrayInputStream(Base64.decodeBase64(c))))
} catch {
case e: java.util.zip.ZipException => c
case x: Exception =>
System.err.println("exception in uncompress" + x.toString)
c
}
)
}

}
23 changes: 12 additions & 11 deletions smtp/project/META-INF/MANIFEST.MF
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,17 @@ Class-Path: command-0.13.5.jar apply-macro-0.13.5.jar main-settings-0.
compiler-integration-0.13.5.jar relation-0.13.5.jar actions-0.13.5.ja
r jansi.jar scala-compiler.jar scala-library.jar io-0.13.5.jar sbt-0.
13.5.jar scala-reflect.jar logging-0.13.5.jar classpath-0.13.5.jar co
mpile-0.13.5.jar tasks-0.13.5.jar classfile-0.13.5.jar logback-core-1
.1.2.jar rediscala_2.10-1.3.1.jar scala-stm_2.10-0.7.jar activation-1
.1.1.jar play-datacommons_2.10-2.3.4.jar akka-actor_2.10-2.3.6.jar ja
ckson-databind-2.3.2.jar commons-cli-1.2.jar scala-reflect.jar jsr305
-1.3.9.jar logback-classic-1.1.2.jar play-functional_2.10-2.3.4.jar j
oda-time-2.3.jar config-1.2.1.jar jackson-annotations-2.3.2.jar jacks
on-core-2.3.2.jar subethasmtp-3.1.7.jar joda-convert-1.6.jar scala-lo
gging-slf4j_2.10-2.1.2.jar slf4j-api-1.7.7.jar log4j-1.2.14.jar play-
iteratees_2.10-2.3.4.jar scala-library.jar dnsjava-2.1.6.jar scala-lo
gging-api_2.10-2.1.2.jar play-json_2.10-2.3.4.jar mail-1.4.4.jar dnsj
nio-1.0.3.jar apache-jspf-resolver-1.0.0.jar
mpile-0.13.5.jar tasks-0.13.5.jar classfile-0.13.5.jar logback-classi
c-1.1.2.jar rediscala_2.10-1.3.1.jar scala-stm_2.10-0.7.jar play-iter
atees_2.10-2.3.4.jar dnsjnio-1.0.3.jar akka-actor_2.10-2.3.6.jar jack
son-databind-2.3.2.jar subethasmtp-3.1.7.jar scala-reflect.jar play-f
unctional_2.10-2.3.4.jar commons-io-2.4.jar joda-time-2.3.jar jsr305-
1.3.9.jar config-1.2.1.jar dnsjava-2.1.6.jar jackson-annotations-2.3.
2.jar jackson-core-2.3.2.jar apache-jspf-resolver-1.0.0.jar log4j-1.2
.14.jar activation-1.1.1.jar logback-core-1.1.2.jar joda-convert-1.6.
jar scala-library.jar commons-codec-1.9.jar scala-logging-slf4j_2.10-
2.1.2.jar commons-cli-1.2.jar play-json_2.10-2.3.4.jar play-datacommo
ns_2.10-2.3.4.jar scala-logging-api_2.10-2.1.2.jar mail-1.4.4.jar slf
4j-api-1.7.7.jar
Main-Class: com.heluna.maildrop.smtp.MailDropSMTP

30 changes: 30 additions & 0 deletions smtp/src/main/scala/com/heluna/maildrop/smtp/HostEntry.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.heluna.maildrop.smtp

import java.net.InetAddress
import com.heluna.maildrop.util.{MailDropConfig, Redis}
import com.typesafe.scalalogging.slf4j.LazyLogging

/**
* smtp com.heluna.maildrop.smtp
* User: markbe
* Date: 9/12/14
* Time: 8:48 AM
*/

object HostEntry extends LazyLogging {

val ttl = MailDropConfig.getSeconds("maildrop.sender.cache-time").getOrElse(86400L)

def key(inet: InetAddress, helo: String) = "host/" + inet.getHostAddress + "/" + helo

def apply(inet: InetAddress, helo: String) = {
val hkey = key(inet, helo)
logger.debug("loading " + hkey)
Redis.client.hgetall[String](hkey)
}

def touch(inet: InetAddress, helo: String) = {
Redis.client.expire(key(inet, helo), ttl)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,87 +42,17 @@ class MailDropMessageHandler(ctx: MessageContext) extends MessageHandler with La
// Wait maildrop.command-delay seconds
Thread.sleep(MailDropMessageHandler.threadDelay)

// Run the sender filter, either Greylist, Reject, Accept or Continue
// val future = MailDropMessageHandler.senderFilter(inet, helo, sender)
// Await.result(future, 2.minutes) match {
// case Greylist(reason) =>
// logger.info("Sender " + ip + " " + sender + " greylisted: " + reason)
// Metrics.blocked()
// throw new DropConnectionException(421, reason)
// case Reject(reason) =>
// logger.info("Sender " + ip + " " + sender + " rejected: " + reason)
// Metrics.blocked()
// throw new DropConnectionException(reason)
// case _ =>
// }
val greylistFuture = GreylistFilter(inet, helo)
val greylistResult = Try(Await.result(greylistFuture, 2.minutes)).getOrElse({
logger.error("GreylistFilter failed")
Continue()
})

greylistResult match {
case Greylist(reason) =>
logger.info("Sender " + ip + " " + sender + " greylisted: " + reason)
Metrics.blocked()
throw new DropConnectionException(421, reason)
case Reject(reason) =>
logger.info("Sender " + ip + " " + sender + " rejected: " + reason)
Metrics.blocked()
throw new DropConnectionException(reason)
case _ =>
}

val cacheFuture = CacheFilter(inet, helo)
val cacheResult = Try(Await.result(cacheFuture, 2.minutes)).getOrElse({
logger.error("CacheFilter failed")
Continue()
})

cacheResult match {
case Greylist(reason) =>
logger.info("Sender " + ip + " " + sender + " greylisted: " + reason)
Metrics.blocked()
throw new DropConnectionException(421, reason)
case Reject(reason) =>
logger.info("Sender " + ip + " " + sender + " rejected: " + reason)
Metrics.blocked()
throw new DropConnectionException(reason)
case _ =>
}

val dnsblFuture = DNSBLFilter(inet, helo)
val dnsblResult = Try(Await.result(dnsblFuture, 2.minutes)).getOrElse({
logger.error("DNSBLFilter failed")
Continue()
})

dnsblResult match {
case Greylist(reason) =>
logger.info("Sender " + ip + " " + sender + " greylisted: " + reason)
Metrics.blocked()
throw new DropConnectionException(421, reason)
case Reject(reason) =>
logger.info("Sender " + ip + " " + sender + " rejected: " + reason)
Metrics.blocked()
throw new DropConnectionException(reason)
case _ =>
}

val spfFuture = SPFFilter(inet, helo, sender)
val spfResult = Try(Await.result(spfFuture, 2.minutes)).getOrElse({
logger.error("SPFFilter failed")
Continue()
})

spfResult match {
val future = MailDropMessageHandler.senderFilter(inet, helo, sender)
Await.result(future, 2.minutes) match {
case Greylist(reason) =>
logger.info("Sender " + ip + " " + sender + " greylisted: " + reason)
Metrics.blocked()
throw new DropConnectionException(421, reason)
case Reject(reason) =>
logger.info("Sender " + ip + " " + sender + " rejected: " + reason)
Metrics.blocked()
CacheFilter.add(inet, helo, Reject(reason))
HostEntry.touch(inet, helo)
throw new DropConnectionException(reason)
case _ =>
}
Expand Down Expand Up @@ -166,11 +96,18 @@ class MailDropMessageHandler(ctx: MessageContext) extends MessageHandler with La
throw new RejectException(reason)
case Continue() =>
// Save the message
MailDropMessageHandler.saveMessage(sender, recipient, message)
logger.info("Message saved from " + ip + " " + sender + " to " + recipient)
// Cache this sender
CacheFilter.add(inet, helo, Accept())
Metrics.message()
Try(MailDropMessageHandler.saveMessage(sender, recipient, message)).toOption match {
case Some(x) =>
logger.info("Message saved from " + ip + " " + sender + " to " + recipient)
// Cache this sender
CacheFilter.add(inet, helo, Accept())
HostEntry.touch(inet, helo)
Metrics.message()
case None =>
logger.info("Data from " + ip + " " + sender + " to " + recipient + " rejected: no attachments")
Metrics.blocked()
throw new RejectException("No attachments allowed.")
}
}
case _ =>
}
Expand All @@ -181,6 +118,7 @@ class MailDropMessageHandler(ctx: MessageContext) extends MessageHandler with La
logger.debug("Ending connection from " + ip)
}


}


Expand All @@ -194,23 +132,18 @@ object MailDropMessageHandler extends LazyLogging {
val maxMessageReason = MailDropConfig("maildrop.data.max-message-reason").getOrElse("Message too large.")

def senderFilter(inet: InetAddress, helo: String, sender: String): Future[Product] = {
GreylistFilter(inet, helo) flatMap {
case Continue() => Try(CacheFilter(inet, helo)).getOrElse({
logger.error("CacheFilter failed")
Future.successful(Continue())
})
case result => Future.successful(result)
} flatMap {
case Continue() => Try(DNSBLFilter(inet, helo)).getOrElse({
logger.error("DNSBLFilter failed")
Future.successful(Continue())
})
case result => Future.successful(result)
} flatMap {
case Continue() => Try(SPFFilter(inet, helo, sender)).getOrElse({
logger.error("SPFFilter failed")
Future.successful(Continue())
})
for {
host <- HostEntry(inet, helo)
cache <- trySenderFilter(Continue(), { CacheFilter(host) })
greylist <- trySenderFilter(cache, { GreylistFilter(host, inet, helo) })
spf <- trySenderFilter(greylist, { SPFFilter(inet, helo, sender) })
dnsbl <- trySenderFilter(spf, { DNSBLFilter(inet, helo) })
} yield dnsbl
}

def trySenderFilter(prevResult: Product, filter: => Future[Product]): Future[Product] = {
prevResult match {
case Continue() => filter
case result => Future.successful(result)
}
}
Expand All @@ -234,13 +167,14 @@ object MailDropMessageHandler extends LazyLogging {
}
}

def saveMessage(sender: String, recipient: String, message: MimeMessage): Unit = {
def saveMessage(sender: String, recipient: String, message: MimeMessage): Boolean = {
// Strip attachments from the message
val newmessage = stripAttachments(message)
val baos = new ByteArrayOutputStream()
newmessage.writeTo(baos)
// Add the message to the mailbox
Mailbox.add(sender, recipient, Option(message.getSubject).getOrElse("(no subject)"), Option(message.getSentDate).getOrElse(new Date()), baos.toString("UTF-8"))
true
}

def stripAttachments(message: MimeMessage): MimeMessage = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package com.heluna.maildrop.smtp.filters

import java.net.InetAddress
import com.heluna.maildrop.smtp.{Continue, Reject, Accept}
import com.heluna.maildrop.util.{MailDropConfig, Redis}
import com.heluna.maildrop.smtp.{HostEntry, Continue, Reject, Accept}
import com.heluna.maildrop.util.Redis
import com.typesafe.scalalogging.slf4j.LazyLogging
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
Expand All @@ -16,27 +16,21 @@ import scala.concurrent.Future

object CacheFilter extends LazyLogging {

val ttl = MailDropConfig.getSeconds("maildrop.sender.cache-time").getOrElse(300L)

def key(inet: InetAddress, helo: String) = "cache:" + inet.getHostAddress + "/" + helo

def apply(inet: InetAddress, helo: String): Future[Product] = {
val ckey = key(inet, helo)
Redis.client.hgetall[String](ckey).map(m => m.get("action") match {
def apply(host: Map[String, String]): Future[Product] = Future {
host.get("action") match {
case Some(action) if action == "accept" => Accept()
case Some(action) if action == "reject" => Reject(m.getOrElse("reason", "Reject."))
case Some(action) if action == "reject" => Reject(host.getOrElse("reason", "Reject."))
case _ => Continue()
})
}
}

def add(inet: InetAddress, helo: String, action: Product): Unit = {
val ckey = key(inet, helo)
val ckey = HostEntry.key(inet, helo)
action match {
case Accept() => Redis.client.hmset(ckey, Map("action" -> "accept"))
case Reject(reason) => Redis.client.hmset(ckey, Map("action" -> "reject", "reason" -> reason))
case _ =>
}
Redis.client.expire(ckey, ttl)
}

}
Loading

0 comments on commit 4729575

Please sign in to comment.