Skip to content

Commit

Permalink
Add export option for channel files
Browse files Browse the repository at this point in the history
  • Loading branch information
fireduck64 committed Feb 18, 2020
1 parent 26e5e84 commit 3ea2f99
Show file tree
Hide file tree
Showing 10 changed files with 342 additions and 35 deletions.
2 changes: 1 addition & 1 deletion WORKSPACE
Expand Up @@ -77,7 +77,7 @@ git_repository(
git_repository(
name = "duckutil",
remote = "https://github.com/fireduck64/duckutil",
commit = "a0319cf745eea482f6cab36fe54b811c7b15bb69",
commit = "e022564cedb0d241c8e9a297eca74f6d064f7aa8",
shallow_since = "1576867546 -0800",
)

Expand Down
33 changes: 23 additions & 10 deletions src/BlockGenUtils.java
Expand Up @@ -8,6 +8,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.TreeMap;
import java.util.logging.Logger;
import snowblossom.channels.proto.*;
import snowblossom.lib.AddressSpecHash;
import snowblossom.lib.AddressUtil;
Expand All @@ -20,6 +21,8 @@

public class BlockGenUtils
{
private static final Logger logger = Logger.getLogger("snowblossom.channels");

/** Creates a new channel and publishes block 0 for it */
public static ChannelID createChannel(ChannelNode node, WalletDatabase admin, String display_name)
throws ValidationException
Expand Down Expand Up @@ -201,18 +204,24 @@ public static void createBlockForFilesMultipart(ChannelContext ctx, MultipartSli
public static void createBlockForFiles(ChannelContext ctx, File base_path, WalletDatabase admin, StatusInterface status)
throws ValidationException, java.io.IOException
{

if (status == null) status = new DummyStatusInterface();

ProcessStatus ps = new ProcessStatus();
ps.set("cid", ctx.cid.toString());
ps.set("path", base_path.toString());
while(true)
{
if (createSingleBlockForFiles(ctx, base_path, admin, status)) return;
status.setStatus("Importing files: " + ps.getStatusLine());
if (createSingleBlockForFiles(ctx, base_path, admin, status, ps)) break;
}

status.setStatus("Import complete: " + ps.getStatusLine());
}

/**
* Creates a block for the files in the directory and broadcasts it to the channel
* @return true if all files fit in one block, false if there are blocks to write
*/
public static boolean createSingleBlockForFiles(ChannelContext ctx, File base_path, WalletDatabase admin, StatusInterface status)
public static boolean createSingleBlockForFiles(ChannelContext ctx, File base_path, WalletDatabase admin, StatusInterface status, ProcessStatus ps)
throws ValidationException, java.io.IOException
{
ChannelBlockSummary prev_sum = ctx.block_ingestor.getHead();
Expand All @@ -232,10 +241,8 @@ public static boolean createSingleBlockForFiles(ChannelContext ctx, File base_pa
ContentInfo.Builder file_map_ci = ContentInfo.newBuilder();
file_map_ci.setContentHash( ByteString.copyFrom(DigestUtil.getMD().digest(new byte[0])) );

if (status != null) status.setStatus("Adding files to block: " + header.getBlockHeight());
boolean all_fit = addFiles(ctx, base_path, "", blk, file_map_ci, admin);
boolean all_fit = addFiles(ctx, base_path, "", blk, file_map_ci, admin, status, ps);

if (status != null) status.setStatus("Building block:" + header.getBlockHeight() );
blk.addContent(
ChannelSigUtil.signMessage( admin.getAddresses(0),admin.getKeys(0),
SignedMessagePayload.newBuilder().setContentInfo(file_map_ci.build()).build()));
Expand All @@ -251,7 +258,7 @@ public static boolean createSingleBlockForFiles(ChannelContext ctx, File base_pa
blk.setSignedHeader( ChannelSigUtil.signMessage(admin.getAddresses(0), admin.getKeys(0),
SignedMessagePayload.newBuilder().setChannelBlockHeader(header.build()).build()));

if (status != null) status.setStatus("Ingesting block:" + header.getBlockHeight() );
ps.add("blocks_added");
ctx.block_ingestor.ingestBlock(blk.build());

return all_fit;
Expand All @@ -262,7 +269,8 @@ public static boolean createSingleBlockForFiles(ChannelContext ctx, File base_pa
* @return true iff we were able to add all files
*/
private static boolean addFiles(ChannelContext ctx, File path,
String prefix, ChannelBlock.Builder blk, ContentInfo.Builder file_map_ci, WalletDatabase sig)
String prefix, ChannelBlock.Builder blk, ContentInfo.Builder file_map_ci, WalletDatabase sig,
StatusInterface status, ProcessStatus ps)
throws ValidationException, java.io.IOException
{
if (blk.build().toByteString().size() + file_map_ci.build().toByteString().size() > Globals.MAX_BLOCK_SIZE*3/4) return false;
Expand All @@ -271,7 +279,7 @@ private static boolean addFiles(ChannelContext ctx, File path,
{
for(File f : path.listFiles())
{
boolean res = addFiles(ctx, f, prefix + "/" + f.getName(), blk, file_map_ci, sig);
boolean res = addFiles(ctx, f, prefix + "/" + f.getName(), blk, file_map_ci, sig, status, ps);
if (!res) return false;
}
}
Expand All @@ -281,6 +289,7 @@ private static boolean addFiles(ChannelContext ctx, File path,
ContentInfo.Builder ci = ContentInfo.newBuilder();
ci.setContentLength(len);


String mime = Mimer.guessContentType(path.getName());
if (mime != null)
{
Expand Down Expand Up @@ -332,6 +341,7 @@ private static boolean addFiles(ChannelContext ctx, File path,
SignedMessagePayload.newBuilder().setContentInfo(ci.build()).build());

blk.addContent(sm);
ps.add("files_saved");
file_map_ci.putChanMapUpdates("/web" + prefix, sm.getMessageId());

din = new DataInputStream(new FileInputStream(path));
Expand All @@ -352,6 +362,9 @@ private static boolean addFiles(ChannelContext ctx, File path,
.setChunkData(ByteString.copyFrom(b))
.build()
,true, ci.build());
ps.add("chunks_saved");
ps.add("total_bytes_saved", read_len);
status.setStatus("Importing files: " + ps.getStatusLine());

chunk_count++;
}
Expand Down
152 changes: 152 additions & 0 deletions src/BlockReadUtils.java
@@ -0,0 +1,152 @@
package snowblossom.channels;

import com.google.protobuf.ByteString;
import duckutil.AtomicFileOutputStream;
import java.io.File;
import java.io.OutputStream;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.logging.Level;
import java.util.logging.Logger;
import snowblossom.channels.proto.*;
import snowblossom.lib.ChainHash;
import snowblossom.lib.ValidationException;
import snowblossom.node.StatusInterface;

public class BlockReadUtils
{
private static final Logger logger = Logger.getLogger("snowblossom.channels");

public static void extractFiles(ChannelContext ctx, File base_path, StatusInterface status)
throws ValidationException, java.io.IOException
{

if (status == null) status = new DummyStatusInterface();

ProcessStatus ps = new ProcessStatus();

ps.set("cid", ctx.cid.toString());
ps.set("path", base_path.toString());

Map<String, ByteString> data_map = ChanDataUtils.getAllData(ctx, "/web/");

ps.add("total_files", data_map.size());

base_path.mkdirs();

for(Map.Entry<String, ByteString> me : data_map.entrySet())
{
status.setStatus("Extraction in progress: " + ps.getStatusLine());

// Now, we have to assume that the data is hostile
String key = me.getKey();

key = key.substring(5); // skip the "/web/" part
ChainHash msg_id = new ChainHash(me.getValue());

try
{
extractFile(ctx, base_path, key, msg_id, ps);
}
catch(Throwable t)
{
ps.add("error");
ps.add("error_undefined");
logger.log(Level.WARNING,"Extract file", t);
}
}

status.setStatus("Extraction complete: " + ps.getStatusLine());

}

private static void extractFile(ChannelContext ctx, File base_path, String file_key, ChainHash msg_id, ProcessStatus ps)
throws ValidationException, java.io.IOException
{
File curr_path = base_path;
File last_parent = null;

StringTokenizer stok = new StringTokenizer(file_key, "/");
while(stok.hasMoreTokens())
{
String name = stok.nextToken();
if (name.equals("..") || (name.length()==0) || (name.equals(".")))
{
ps.add("error");
ps.add("error_malformed_path");
logger.log(Level.WARNING,"rejected Key path: " + file_key);
return;
}

last_parent = curr_path;
last_parent.mkdir();

curr_path = new File(last_parent, name);

// Being extra paranoid.
// Only accept if the parent of the new file is actually the same as the parent
if (!last_parent.getCanonicalPath().equals( curr_path.getParentFile().getCanonicalPath()))
{
ps.add("error");
ps.add("error_malformed_path");
logger.log(Level.WARNING,"rejected Key path: " + file_key);
return;
}
}

SignedMessage sm = ctx.db.getContentMap().get(msg_id.getBytes());
if (sm == null)
{
ps.add("error");
ps.add("error_no_content");
return;
}

ContentInfo ci = ChannelSigUtil.quickPayload(sm).getContentInfo();

AtomicFileOutputStream out = new AtomicFileOutputStream(curr_path);
try
{
streamContentOut(ctx, msg_id, ci, out);

out.flush();
out.close();
ps.add("complete");
ps.add("total_bytes", ci.getContentLength());
}
catch(java.io.IOException e)
{
out.abort();
ps.add("error");
ps.add("error_io");
}

}


public static void streamContentOut(ChannelContext ctx, ChainHash msg_id, ContentInfo ci, OutputStream out)
throws java.io.IOException
{
// TODO - check hash of stream as we stream it
if (ci.getContentLength() == ci.getContent().size())
{
out.write(ci.getContent().toByteArray());
return;
}

int total_chunks = MiscUtils.getNumberOfChunks(ci);

for(int i=0; i<total_chunks; i++)
{
ByteString chunk_data = ChunkMapUtils.getChunk(ctx, msg_id, i);
logger.finer("Get chunk data: " + msg_id + " " + i + " sz:" + chunk_data.size());
if ((chunk_data == null) || (chunk_data.size() == 0))
{
logger.warning("Missing chunk data: " + msg_id + "/" + i);
throw new java.io.IOException("Missing chunk data: " + msg_id + "/" + i);
}
out.write(chunk_data.toByteArray());
}
}

}
25 changes: 25 additions & 0 deletions src/ChanDataUtils.java
@@ -1,10 +1,15 @@
package snowblossom.channels;

import com.google.protobuf.ByteString;
import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Logger;
import snowblossom.channels.proto.ChannelBlockSummary;

public class ChanDataUtils
{
private static final Logger logger = Logger.getLogger("snowblossom.channels");

public static ByteString getData(ChannelContext ctx, String key)
{
ChannelBlockSummary summary = ctx.block_ingestor.getHead();
Expand All @@ -14,6 +19,26 @@ public static ByteString getData(ChannelContext ctx, String key)
return ctx.db.getDataTrie().getLeafData(summary.getDataRootHash(), key_bs);
}

public static Map<String,ByteString> getAllData(ChannelContext ctx, String base_key)
{
TreeMap<String, ByteString> result_map = new TreeMap<>();

ChannelBlockSummary summary = ctx.block_ingestor.getHead();
if (summary == null)
{
logger.info("No head for channel to get data from");
}
else
{
TreeMap<ByteString, ByteString> m = ctx.db.getDataTrie().getDataMap(summary.getDataRootHash(), ByteString.copyFrom(base_key.getBytes()), 1000000);

for(Map.Entry<ByteString, ByteString> me : m.entrySet())
{
String k = new String(me.getKey().toByteArray());
result_map.put(k, me.getValue());
}
}
return result_map;
}

}
2 changes: 1 addition & 1 deletion src/ChannelGlobals.java
Expand Up @@ -3,7 +3,7 @@

public class ChannelGlobals
{
public static final String VERSION = "dev.2020.02.13.02";
public static final String VERSION = "dev.2020.02.18.00";

public static final String NODE_ADDRESS_STRING="node";
public static final String CHANNEL_ADDRESS_STRING="chan";
Expand Down
13 changes: 13 additions & 0 deletions src/DummyStatusInterface.java
@@ -0,0 +1,13 @@
package snowblossom.channels;
import snowblossom.node.StatusInterface;

public class DummyStatusInterface implements StatusInterface
{

@Override
public void setStatus(String s)
{

}

}

0 comments on commit 3ea2f99

Please sign in to comment.