┌────────────────────┐
│ DAGService │
└────────────────────┘
│
▼
┌────────────────────┐
│ BlockService │
└────────────────────┘
┌─────┴─────┐
▼ ▼
┌─────────┐ ┌───────┐
│IPFS Repo│ |Bitswap│
└─────────┘ └───────┘
代码位置:https://github.com/ipfs/go-ipld-format
定义了 merkledag 所需要的类型。我们看下常见的一些类型接口:
//所有的 IPLD 类型节点必须实现这个接口.
//Node 是不可变类型,线程安全
type Node interface {
blocks.Block
Resolver
ResolveLink(path []string) (*Link, []string, error)
//深拷贝
Copy() Node
//返回节点的链接
Links() []*Link
//返回节点状态
Stat() (*NodeStat, error)
//返回节点序列化后的长度
Size() (uint64, error)
}
// Link 代表 IPFS Merkle DAG 中节点之间的链接.
type Link struct {
//唯一名字 UTF-8 编码
Name string
//长度
Size uint64
// 目标对象的 multihash
Cid *cid.Cid
}
在代码 https://github.com/ipfs/go-merkledag
中:
merkledag.go
实现了此接口
var _ ipld.LinkGetter = &dagService{}
var _ ipld.NodeGetter = &dagService{}
var _ ipld.NodeGetter = &sesGetter{}
var _ ipld.DAGService = &dagService{}
看下其中的方法实现:
// Add adds a node to the dagService, storing the block in the BlockService
func (n *dagService) Add(ctx context.Context, nd ipld.Node) error {
....
return n.Blocks.AddBlock(nd)
}
// Get retrieves a node from the dagService, fetching the block in the BlockService
func (n *dagService) Get(ctx context.Context, c *cid.Cid) (ipld.Node, error) {
...
b, err := n.Blocks.GetBlock(ctx, c)
...
return ipld.Decode(b)
}
我们可以看到DagService
的方法最终调用的是BlockService
中的方法
代码位置 https://github.com/ipfs/go-blockservice
BlockService 添加block 最终调用 blockstore
来添加 block,并且放入到 bitswap 中,获取 block 时从 blockstore 中获取,如果没找到则从 bitswap 中获取。
// BlockService is a hybrid block datastore. It stores data in a local
// datastore and may retrieve data from a remote Exchange.
// It uses an internal `datastore.Datastore` instance to store values.
type BlockService interface {
io.Closer
BlockGetter
Blockstore() blockstore.Blockstore
// Exchange returns a reference to the underlying exchange (usually )
Exchange() exchange.Interface
// AddBlock puts a given block to the underlying datastore
AddBlock(o blocks.Block) error
// AddBlocks adds a slice of blocks at the same time using batching
// capabilities of the underlying datastore whenever possible.
AddBlocks(bs []blocks.Block) error
// DeleteBlock deletes the given block from the blockservice.
DeleteBlock(o *cid.Cid) error
}
// 将 block 添加进 blockstore
func (s *blockService) AddBlock(o blocks.Block) error {
c := o.Cid()
// hash 校验
err := verifcid.ValidateCid(c)
if s.checkFirst {
if has, err := s.blockstore.Has(c); has || err != nil {
return err
}
}
//添加进blockstore
if err := s.blockstore.Put(o); err != nil {
return err
}
if err := s.exchange.HasBlock(o); err != nil {
// TODO(#4623): really an error?
return errors.New("blockservice is closed")
}
return nil
}
func getBlock(ctx context.Context, c *cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) (blocks.Block, error) {
// hash 检查
err := verifcid.ValidateCid(c)
block, err := bs.Get(c)
if err == blockstore.ErrNotFound && f != nil {
log.Debug("Blockservice: Searching bitswap")
blk, err := f.GetBlock(ctx, c)
if err != nil {
if err == blockstore.ErrNotFound {
return nil, ErrNotFound
}
return nil, err
}
return blk, nil
}
if err == blockstore.ErrNotFound {
return nil, ErrNotFound
}
return nil, err
}
代码位置 https://github.com/ipfs/go-datastore
datastore 是数据存储和数据库访问的通用抽象层。 它设计了一套简单的API,它允许无需更改代码即可无缝交换数据。 因此可以实现不同的 datastore 来处理不同强度要求的数据。
datastore.go 接口中定义了对block的增删查改方法
type Datastore interface {
Put(key Key, value []byte) error
Get(key Key) (value []byte, err error)
Has(key Key) (exists bool, err error)
Delete(key Key) error
Query(q query.Query) (query.Results, error)
}
在代码 go-ipfs-config\datastore.go
中我们可以看到 ipfs 中采用了两种不同的 datastore 实现来存储数据,分别是 flatfs.datastore
和 leveldb.datastore
return Datastore{
StorageMax: "10GB",
StorageGCWatermark: 90, // 90%
GCPeriod: "1h",
BloomFilterSize: 0,
Spec: map[string]interface{}{
"type": "mount",
"mounts": []interface{}{
map[string]interface{}{
"mountpoint": "/blocks",
"type": "measure",
"prefix": "flatfs.datastore",
"child": map[string]interface{}{
"type": "flatfs",
"path": "blocks",
"sync": true,
"shardFunc": "/repo/flatfs/shard/v1/next-to-last/2",
},
},
map[string]interface{}{
"mountpoint": "/",
"type": "measure",
"prefix": "leveldb.datastore",
"child": map[string]interface{}{
"type": "levelds",
"path": "datastore",
"compression": "none",
},
},
},
},
}
代码位置 https://github.com/ipfs/go-ds-flatfs
调用操作系统 api 实现 datastore 存储
func (fs *Datastore) Put(key datastore.Key, value []byte) error {
...
for i := 1; i <= putMaxRetries; i++ {
err = fs.doWriteOp(&op{
typ: opPut,
key: key,
v: value,
})
}
...
return err
}
func (fs *Datastore) doWriteOp(oper *op) error {
...
err := fs.doOp(oper)
...
return err
}
func (fs *Datastore) doOp(oper *op) error {
switch oper.typ {
case opPut:
return fs.doPut(oper.key, oper.v)
case opDelete:
return fs.doDelete(oper.key)
case opRename:
return fs.renameAndUpdateDiskUsage(oper.tmp, oper.path)
default:
panic("bad operation, this is a bug")
}
}
//最终调用存储文件的方法
func (fs *Datastore) doPut(key datastore.Key, val []byte) error {
dir, path := fs.encode(key)
if err := fs.makeDir(dir); err != nil {
return err
}
tmp, err := ioutil.TempFile(dir, "put-")
if err != nil {
return err
}
closed := false
removed := false
defer func() {
if !closed {
// silence errcheck
_ = tmp.Close()
}
if !removed {
// silence errcheck
_ = os.Remove(tmp.Name())
}
}()
if _, err := tmp.Write(val); err != nil {
return err
}
if fs.sync {
if err := syncFile(tmp); err != nil {
return err
}
}
if err := tmp.Close(); err != nil {
return err
}
closed = true
err = fs.renameAndUpdateDiskUsage(tmp.Name(), path)
if err != nil {
return err
}
removed = true
if fs.sync {
if err := syncDir(dir); err != nil {
return err
}
}
return nil
}
func (fs *Datastore) encode(key datastore.Key) (dir, file string) {
noslash := key.String()[1:]
dir = filepath.Join(fs.path, fs.getDir(noslash))
file = filepath.Join(dir, noslash+extension)
return dir, file
}
func (fs *Datastore) Get(key datastore.Key) (value []byte, err error) {
_, path := fs.encode(key)
data, err := ioutil.ReadFile(path)
if err != nil {
if os.IsNotExist(err) {
return nil, datastore.ErrNotFound
}
return nil, err
}
return data, nil
}
代码位置 https://github.com/ipfs/go-ds-leveldb
采用 leveldb 存储
type datastore struct {
DB *leveldb.DB
path string
}
//初始化 leveldb 实例
func NewDatastore(path string, opts *Options) (*datastore, error) {
...
var db *leveldb.DB
db, err = leveldb.Open(storage.NewMemStorage(), &nopts)
return &datastore{
DB: db,
path: path,
}, nil
}
func (d *datastore) Put(key ds.Key, value []byte) (err error) {
return d.DB.Put(key.Bytes(), value, nil)
}
func (d *datastore) Get(key ds.Key) (value []byte, err error) {
val, err := d.DB.Get(key.Bytes(), nil)
if err != nil {
if err == leveldb.ErrNotFound {
return nil, ds.ErrNotFound
}
return nil, err
}
return val, nil
}
IPFS 节点的数据对象都存储在本地的 repo
中(类似于git)。根据所使用的存储介质不同,有不同的repo
实现。ipfs 节点使用fs-repo。
常见的repo
实现有:
Repo 存储了一组 IPLD 对象,分别表示:
- keys - 加密密钥,包括节点的标识
- config - 节点配置
- datastore - 本地存储的数据和索引数据
- logs - 调试的事件日志
- hooks - 脚本在预定义的时间运行(尚未实现)
--
tree ~/.ipfs
.ipfs/
├── api <--- running daemon api addr
├── blocks/ <--- objects stored directly on disk
│ └── aa <--- prefix namespacing like git
│ └── aa <--- N tiers
├── config <--- config file (json or toml)
├── hooks/ <--- hook scripts
├── keys/ <--- cryptographic keys
│ ├── id.pri <--- identity private key
│ └── id.pub <--- identity public key
├── datastore/ <--- datastore
├── logs/ <--- 1 or more files (log rotate)
│ └── events.log <--- can be tailed
├── repo.lock <--- mutex for repo
└── version <--- version file
./api
是一个存在的文件,表示要侦听的 api 的 endpoint。即便 endpoint 不再存在(即它是之前的 ./api
文件),它也可能存在。
在存在./api
文件的情况下,ipfs 工具(例如 go-ipfs ipfs daemon)必须尝试委托给 endpoint,如果能确信该文件是过时的文件,则可以删除该文件。(例如,endpoint 是本地的,但没有进程活动)
./api
文件与repo.lock
一起使用。 客户端可以选择使用 api 服务,或者等到持有repo.lock
的进程退出。 文件的内容是作为 multiaddr 的 api endpoint
> cat .ipfs/api
/ip4/127.0.0.1/tcp/5001
注意:
- API 服务器必须在释放
repo.lock
之前删除api
文件。 - 只是使用配置文件是不够的,因为守护进程的 API 地址可能已经通过 ENV 或 flag 覆盖。
在 go-ipfs 中, 下面的命令可以指定 API 的 endpoint:
ipfs --api /ip4/1.2.3.4/tcp/5001 <cmd>
blocks 包含表示本地存储的所有IPFS对象的原始数据,无论是固定(pinned)还是缓存(cached)的数据。 blocks 由 datastore 控制。 例如,它可以将它存储在 leveldb 中,也可以存储在 git 中。
在默认情况下,所有 block 存储于 fs-datastore 中。
配置文件是一个 JSON 或 TOML 文件。 它必须在repo.lock
时才能更改,否则可能会丢失数据。
hooks 目录包含可在特定事件上触发的可执行脚本以便更改 ipfs 节点行为。
keys 目录包含节点有权访问的所有密钥。 密钥以其 hash 命名,扩展名描述它们的密钥类型。格式为 id.{pub,sec}
<key>.pub is a public key
<key>.pri is a private key
<key>.sym is a symmetric secret key
datastore 目录包含用于存储在 leveldb 中用来操作 IPFS 节点的数据。 如果用户改为使用 boltdb datastore,则该目录将命名为boltdb。 因此,每个数据库的数据文件不会发生冲突。 这个目录将来可以能考虑改为 leveldb 命名。
IPFS 将事件日志文件放在 logs 目录中。 最新的日志文件放在 logs/events 中。
repo.lock 阻止对 repo 的并发访问。 它的内容应该是当前持有锁的进程的PID。允许客户端进行检查失败的锁和清理。
> cat .ipfs/repo.lock
42
> ps | grep "ipfs daemon"
42 ttys000 79:05.83 ipfs daemon
version 包含 repo 实现名称和版本。 此格式随时间而变化:
# in version 0
> cat $repo-at-version-0/version
cat: /Users/jbenet/.ipfs/version: No such file or directory
# in versions 1 and 2
> cat $repo-at-version-1/version
1
> cat $repo-at-version-2/version
2
# in versions >3
> cat $repo-at-version-3/version
fs-repo/3
- 目录
- 上一章:IPFS IPLD
- 下一章:IPFS Bitswap