Skip to content

Commit

Permalink
organize interfaces, apply java 8, database schema to case class on sbt
Browse files Browse the repository at this point in the history
  • Loading branch information
torao committed Mar 22, 2014
1 parent 5226c0b commit 65ea163
Show file tree
Hide file tree
Showing 55 changed files with 4,409 additions and 563 deletions.
989 changes: 681 additions & 308 deletions .idea/workspace.xml

Large diffs are not rendered by default.

17 changes: 7 additions & 10 deletions client/src/main/scala/com/kazzla/client/storage/Storage.scala
Expand Up @@ -6,9 +6,8 @@
package com.kazzla.client.storage

import com.kazzla.asterisk._
import com.kazzla.node.StorageNode
import com.kazzla.service.Location
import com.kazzla.service.{Fragment, Status}
import com.kazzla.storage.StorageNode
import com.kazzla.storage.{Status, Fragment, Location, StorageService}
import java.io._
import java.net.InetSocketAddress
import java.nio.ByteBuffer
Expand All @@ -27,7 +26,7 @@ import scala.util.Try
*/
class Storage(session:Session)(implicit ctx:ExecutionContext) {

private[this] val storage = session.bind(classOf[com.kazzla.service.StorageService])
private[this] val storage = session.bind(classOf[StorageService])

// ==============================================================================================
// ファイルステータスの参照
Expand All @@ -48,15 +47,14 @@ class Storage(session:Session)(implicit ctx:ExecutionContext) {
* @return ディレクトリのステータス
*/
def list(path:String):Seq[String]
= Await.result(session.open(101, path){ _.src.toSeq }, Duration.Inf).asInstanceOf[Seq[String]]
= Await.result(session.open(101, path){ _.src.filterNot{ _.isEOF }.map{ _.getString("UTF-8") }.toSeq }, Duration.Inf).asInstanceOf[Seq[String]]

// ==============================================================================================
// ファイルフラグメントの参照
// ==============================================================================================
/**
* 指定されたファイルのフラグメントロケーションを取得します。非同期パイプに対してシリアライズされた [[com.kazzla.service.Fragment]]
* 指定されたファイルのフラグメントロケーションを取得します。非同期パイプに対してシリアライズされた [[Fragment]]
* をブロック送信します。
* @see [[com.kazzla.node.StorageNode.read()]]
*/
def getInputStream(path:String):InputStream
= Await.result(session.open(102, path){ read }, Duration.Inf).asInstanceOf[InputStream]
Expand All @@ -66,9 +64,8 @@ class Storage(session:Session)(implicit ctx:ExecutionContext) {
// ==============================================================================================
/**
* 指定されたファイルに対する領域割り当てを行います。非同期パイプに対して残りのデータサイズ (不明な場合は負の値)
* を送信することでリージョンサービスはファイルの新しい領域を割り当てて [[com.kazzla.service.Fragment]] で応答します。クライアント
* を送信することでリージョンサービスはファイルの新しい領域を割り当てて [[Fragment]] で応答します。クライアント
* は割り当てられた領域すべてにデータを書き終えたら残りのデータサイズを送信して次の領域を割り当てます。
* @see [[com.kazzla.node.StorageNode.write()]]
*/
def getOutputStream(path:String, option:Int):OutputStream = {
val out = Promise[OutputStream]()
Expand Down Expand Up @@ -207,7 +204,7 @@ private[storage] class O(pipe:Pipe, promise:Promise[Unit]) extends OutputStream
pipe.sink.send(minus)
// 割り当てられた領域を受信
val fragment = Fragment.fromBlock(queue.take())
fragment.locations.foreach{ case Location(host, port) =>
fragment.locations.foreach{ case Location(id, host, port) =>
try {
// 割り当てられた領域のノードと接続して書き込み処理を呼び出し
val future = pipe.session.node.connect(new InetSocketAddress(host, port), None)
Expand Down
16 changes: 11 additions & 5 deletions node/src/main/scala/com/kazzla/node/storage/Agent.scala
Expand Up @@ -5,21 +5,22 @@
*/
package com.kazzla.node.storage

import com.kazzla.asterisk.{Session, Node}
import com.kazzla.asterisk.codec.MsgPackCodec
import com.kazzla.asterisk.netty.Netty
import com.kazzla.asterisk.{Session, Node}
import com.kazzla.core.cert._
import com.kazzla.core.io._
import com.kazzla.node.Domain
import com.kazzla.service.Version
import java.io.{IOException, File}
import java.net.URL
import java.security.KeyStore
import java.util.concurrent.Executors
import javax.net.ssl.SSLContext
import org.slf4j.LoggerFactory
import scala.concurrent.duration.Duration
import scala.concurrent.Await
import scala.annotation.tailrec
import com.kazzla.service.Version
import scala.concurrent.duration.Duration
import scala.concurrent.{ExecutionContext, Await}

// ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// Agent
Expand All @@ -35,6 +36,10 @@ class Agent(dataDir:File, regionServices:Seq[URL]) {

val domain = new Domain(regionServices)

val storage = new Storage(dataDir)

implicit val threads = ExecutionContext.fromExecutorService(Executors.newCachedThreadPool())

// ==============================================================================================
// 実体ファイルの参照
// ==============================================================================================
Expand All @@ -46,7 +51,7 @@ class Agent(dataDir:File, regionServices:Seq[URL]) {
node = Some(Node("storage")
.bridge(Netty)
.codec(MsgPackCodec)
.serve(new StorageImpl(dataDir))
.serve(new StorageNodeImpl(storage))
.build())
logger.info(s"activate kazzla node: [${regionServices.mkString(",")}]")
val session = connect()
Expand All @@ -56,6 +61,7 @@ class Agent(dataDir:File, regionServices:Seq[URL]) {

def stop():Unit = {
node.foreach{ _.shutdown() }
threads.shutdown()
}

def ssl:SSLContext = {
Expand Down
98 changes: 98 additions & 0 deletions node/src/main/scala/com/kazzla/node/storage/RegionNodeImpl.scala
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2014 koiroha.org.
* All sources and related resources are available under Apache License 2.0.
* http://www.apache.org/licenses/LICENSE-2.0.html
*/
package com.kazzla.node.storage

import com.kazzla._
import com.kazzla.asterisk.Service
import com.kazzla.core.io._
import com.kazzla.storage.RegionNode
import java.util.UUID
import org.slf4j.LoggerFactory
import scala.concurrent.{ExecutionContext, Future}

// ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// StorageNodeImpl
// ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
/**
* @author Takami Torao
*/
class RegionNodeImpl(storage:Storage)(implicit ctx:ExecutionContext) extends Service(ctx) with RegionNode {

import RegionNodeImpl._

// ==============================================================================================
// 時刻同期
// ==============================================================================================
/**
* ノードとサービス間で時刻同期を行います。この機能はサービス側からの死活監視として呼び出されます。
*
* @param remote サービス側の現在時刻
* @return ノード側の現在時刻
*/
def sync(remote:Long):Future[Long] = Future(storage.sync(remote))

// ==============================================================================================
// UUID の参照
// ==============================================================================================
/**
* このリージョンノードが管理している全ブロックの UUID をブロック送信します。
*/
def lookup():Future[Unit] = withPipe { pipe =>
logger.trace("lookup()")
concurrent.future {
storage.lookup{ uuid => pipe.sink.sendDirect(uuid.toByteArray) }
pipe.sink.sendEOF()
}
}

// ==============================================================================================
// チェックサムの算出
// ==============================================================================================
/**
* 指定されたブロックのチェックサムを算出します。
* @param blockId ブロック ID
*/
def checksum(blockId:UUID, algorithm:String, challenge:Array[Byte]):Future[Array[Byte]] = {
logger.debug(s"checksum($blockId,$algorithm,${challenge.toHexString}})")
concurrent.future {
storage.checksum(blockId, algorithm, challenge)
}
}

// ==============================================================================================
// ブロックの作成
// ==============================================================================================
/**
* 指定されたブロックを作成します。
* @param blockId ブロック ID
* @param size ブロックサイズ
*/
def create(blockId:UUID, size:Long):Future[Unit] = {
logger.debug(s"create($blockId,$size)")
concurrent.future {
storage.create(blockId, size)
}
}

// ==============================================================================================
// ブロックの削除
// ==============================================================================================
/**
* 指定されたブロックを削除します。
* @param blockId ブロック ID
*/
def delete(blockId:UUID):Future[Unit] = {
logger.debug(s"delete($blockId)")
concurrent.future {
storage.delete(blockId)
}
}

}

object RegionNodeImpl {
private[RegionNodeImpl] val logger = LoggerFactory.getLogger(classOf[RegionNodeImpl])
}
26 changes: 15 additions & 11 deletions node/src/main/scala/com/kazzla/node/storage/Shell.scala
Expand Up @@ -13,7 +13,6 @@ import java.net.{URLEncoder, HttpURLConnection, URI, URL}
import java.security.KeyStore
import java.security.cert.{X509Certificate, CertificateFactory}
import scala.collection.JavaConversions._
import sun.security.tools.KeyTool

// ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
// Shell
Expand Down Expand Up @@ -86,7 +85,7 @@ object Shell extends ShellTools{

// 新しい鍵ペアを含むキーストアを作成
jks.delete()
KeyTool.main(Array[String](
keytool(
"-genkeypair",
"-dname", dname,
"-alias", "node",
Expand All @@ -95,7 +94,7 @@ object Shell extends ShellTools{
"-keystore", jks.getAbsolutePath,
"-storepass", "000000",
"-storetype", "JKS",
"-validity", "180"))
"-validity", "180")

// 作成した公開鍵の確認
val ks = KeyStore.getInstance("JKS")
Expand All @@ -119,14 +118,14 @@ object Shell extends ShellTools{
}

// CA 証明書のインポート
KeyTool.main(Array[String](
keytool(
"-import",
"-noprompt",
"-alias", "kazzla",
"-file", ca.getAbsolutePath,
"-keystore", jks.getAbsolutePath,
"-storetype", "JKS",
"-storepass", "000000"))
"-storepass", "000000")

nodeid
}
Expand All @@ -138,14 +137,14 @@ object Shell extends ShellTools{
* CSR を作成します。
*/
private[this] def createCSR(ks:File, file:File):Unit = {
KeyTool.main(Array[String](
keytool(
"-certreq",
"-alias", "node",
"-file", file.getAbsolutePath,
"-keypass", "000000",
"-keystore", ks.getAbsolutePath,
"-storetype", "JKS",
"-storepass", "000000"))
"-storepass", "000000")
}

// ============================================================================================
Expand Down Expand Up @@ -190,15 +189,15 @@ object Shell extends ShellTools{
* 証明書をインポートします。
*/
private[this] def importCerts(ks:File, certs:File):Unit = {
KeyTool.main(Array[String](
keytool(
"-importcert",
"-alias", "node",
"-file", certs.getAbsolutePath,
"-keypass", "000000",
"-noprompt",
"-keystore", ks.getAbsolutePath,
"-storetype", "JKS",
"-storepass", "000000"))
"-storepass", "000000")
}

// ============================================================================================
Expand All @@ -208,7 +207,7 @@ object Shell extends ShellTools{
* JKS 形式のキーストアを PKCS#12 に変換します。
*/
def jksToPkcs12(jks:File, p12:File):Unit = {
KeyTool.main(Array[String](
keytool(
"-importkeystore",
"-srckeystore", jks.getAbsolutePath,
"-srcstoretype", "JKS",
Expand All @@ -219,7 +218,12 @@ object Shell extends ShellTools{
"-deststoretype", "PKCS12",
"-deststorepass", "000000",
"-destkeypass", "000000",
"-noprompt"))
"-noprompt")
}

private[this] def keytool(args:String*):Unit = {
// sun.security.tools.KeyTool.main(args.toArray) // Java 7
sun.security.tools.keytool.Main.main(args.toArray) // Java 8
}

}

0 comments on commit 65ea163

Please sign in to comment.