Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions oap-formats/oap-logstream/oap-logstream-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<artifactId>oap-stdlib-test</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>oap</groupId>
<artifactId>oap-template-test</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import oap.json.Binder;
import oap.logstream.disk.DiskLoggerBackend;
import oap.logstream.formats.rowbinary.RowBinaryUtils;
import oap.template.TemplateEngineFixture;
import oap.template.Types;
import oap.testng.Fixtures;
import oap.testng.TestDirectoryFixture;
Expand All @@ -49,9 +50,11 @@

public class LoggerJsonTest extends Fixtures {
private final TestDirectoryFixture testDirectoryFixture;
private final TemplateEngineFixture templateEngineFixture;

public LoggerJsonTest() {
testDirectoryFixture = fixture( new TestDirectoryFixture() );
templateEngineFixture = fixture( new TemplateEngineFixture() );
}

@Test
Expand All @@ -61,7 +64,7 @@ public void diskJSON() throws IOException {
String content = "{\"title\":\"response\",\"status\":false,\"values\":[1,2,3]}";
String[] headers = new String[] { "test" };
byte[][] types = new byte[][] { new byte[] { Types.STRING.id } };
try( DiskLoggerBackend backend = new DiskLoggerBackend( testDirectoryFixture.testPath( "logs" ), BPH_12, DEFAULT_BUFFER, "localhost" ) ) {
try( DiskLoggerBackend backend = new DiskLoggerBackend( templateEngineFixture.templateEngine, testDirectoryFixture.testPath( "logs" ), BPH_12, DEFAULT_BUFFER, "localhost" ) ) {
Logger logger = new Logger( backend );

SimpleJson o = contentOfTestResource( getClass(), "simple_json.json", ofJson( SimpleJson.class ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import oap.logstream.net.server.SocketLoggerServer;
import oap.message.client.MessageSender;
import oap.message.server.MessageHttpHandler;
import oap.template.TemplateEngineFixture;
import oap.template.Types;
import oap.testng.Fixtures;
import oap.testng.Ports;
Expand Down Expand Up @@ -61,9 +62,11 @@
@Slf4j
public class LoggerTest extends Fixtures {
private final TestDirectoryFixture testDirectoryFixture;
private final TemplateEngineFixture templateEngineFixture;

public LoggerTest() {
testDirectoryFixture = fixture( new TestDirectoryFixture() );
templateEngineFixture = fixture( new TemplateEngineFixture() );
}

@Test
Expand All @@ -78,7 +81,7 @@ public void disk() throws IOException {
byte[] line2 = Compression.gzip( RowBinaryUtils.line( lineData2 ) );
String[] headers2 = new String[] { "TIMESTAMP", "REQUEST_ID2" };
byte[][] types2 = new byte[][] { new byte[] { Types.DATETIME.id }, new byte[] { Types.STRING.id } };
try( DiskLoggerBackend backend = new DiskLoggerBackend( testDirectoryFixture.testPath( "logs" ), BPH_12, DEFAULT_BUFFER, "localhost" ) ) {
try( DiskLoggerBackend backend = new DiskLoggerBackend( templateEngineFixture.templateEngine, testDirectoryFixture.testPath( "logs" ), BPH_12, DEFAULT_BUFFER, "localhost" ) ) {
Logger logger = new Logger( backend );
logger.log( "lfn1", Map.of(), "log", headers1, types1, line1 );
logger.log( "lfn2", Map.of(), "log", headers1, types1, line1 );
Expand Down Expand Up @@ -114,7 +117,7 @@ public void net() throws IOException {
String[] headers2 = new String[] { "TIMESTAMP", "REQUEST_ID2" };
byte[][] types2 = new byte[][] { new byte[] { Types.DATETIME.id }, new byte[] { Types.STRING.id } };

try( DiskLoggerBackend serverBackend = new DiskLoggerBackend( testDirectoryFixture.testPath( "logs" ), BPH_12, DEFAULT_BUFFER, "localhost" );
try( DiskLoggerBackend serverBackend = new DiskLoggerBackend( templateEngineFixture.templateEngine, testDirectoryFixture.testPath( "logs" ), BPH_12, DEFAULT_BUFFER, "localhost" );
SocketLoggerServer server = new SocketLoggerServer( serverBackend );
NioHttpServer mServer = new NioHttpServer( new NioHttpServer.DefaultPort( port ) );
MessageHttpHandler messageHttpHandler = new MessageHttpHandler( mServer, "/messages", controlStatePath, List.of( server ), -1 );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,23 @@

import oap.logstream.LogId;
import oap.logstream.Timestamp;
import oap.template.TemplateEngineFixture;
import oap.template.Types;
import oap.testng.Fixtures;
import oap.util.Dates;
import org.testng.annotations.Test;

import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;

public class AbstractWriterTest {
public class AbstractWriterTest extends Fixtures {
private final TemplateEngineFixture templateEngineFixture;

public AbstractWriterTest() {
templateEngineFixture = fixture( new TemplateEngineFixture() );
}

@Test
public void testFileName() {
String[] h1Headers = new String[] { "h1" };
Expand All @@ -44,15 +52,15 @@ public void testFileName() {

Dates.setTimeFixed( 2023, 1, 23, 21, 6, 0 );

assertThat( AbstractWriter.currentPattern( LogFormat.TSV_GZ, "${LOG_FORMAT_TSV_GZ}-${INTERVAL} -${LOG_VERSION}-#{if}(${ORGANIZATION})${ORGANIZATION}#{else}UNKNOWN#{end}.${LOG_FORMAT}", lid1, Timestamp.BPH_12, 1, Dates.nowUtc(), "localhost" ) )
assertThat( AbstractWriter.currentPattern( templateEngineFixture.templateEngine, LogFormat.TSV_GZ, "{{ LOG_FORMAT_TSV_GZ }}-{{ INTERVAL }}-{{ LOG_VERSION }}-{{% if ORGANIZATION }}{{ ORGANIZATION }}{{% else }}UNKNOWN{{% end }}.{{ LOG_FORMAT }}", lid1, Timestamp.BPH_12, 1, Dates.nowUtc(), "localhost" ) )
.isEqualTo( "ln/tsv.gz-01-855943970-1-UNKNOWN.tsv.gz.rb.gz" );

assertThat( AbstractWriter.currentPattern( LogFormat.TSV_GZ, "${INTERVAL}-${LOG_VERSION}-${ORGANIZATION}.${LOG_FORMAT}", lid1, Timestamp.BPH_12, 1, Dates.nowUtc(), "localhost" ) )
assertThat( AbstractWriter.currentPattern( templateEngineFixture.templateEngine, LogFormat.TSV_GZ, "{{ INTERVAL }}-{{ LOG_VERSION }}-{{ ORGANIZATION }}.{{ LOG_FORMAT }}", lid1, Timestamp.BPH_12, 1, Dates.nowUtc(), "localhost" ) )
.isEqualTo( "ln/01-855943970-1-.tsv.gz.rb.gz" );
assertThat( AbstractWriter.currentPattern( LogFormat.TSV_GZ, "${INTERVAL}-${LOG_VERSION}-${ORGANIZATION}.${LOG_FORMAT}", lid1, Timestamp.BPH_12, 1, Dates.nowUtc(), "localhost" ) )
assertThat( AbstractWriter.currentPattern( templateEngineFixture.templateEngine, LogFormat.TSV_GZ, "{{ INTERVAL }}-{{ LOG_VERSION}}-{{ ORGANIZATION }}.{{ LOG_FORMAT }}", lid1, Timestamp.BPH_12, 1, Dates.nowUtc(), "localhost" ) )
.isEqualTo( "ln/01-855943970-1-.tsv.gz.rb.gz" );

assertThat( AbstractWriter.currentPattern( LogFormat.TSV_GZ, "${LOG_TIME_INTERVAL}.log.gz", lid1, Timestamp.BPH_6, 1, Dates.nowUtc(), "localhost" ) )
assertThat( AbstractWriter.currentPattern( templateEngineFixture.templateEngine, LogFormat.TSV_GZ, "{{ LOG_TIME_INTERVAL }}.log.gz", lid1, Timestamp.BPH_6, 1, Dates.nowUtc(), "localhost" ) )
.isEqualTo( "ln/10.log.gz.rb.gz" );
}

Expand All @@ -64,11 +72,11 @@ public void testFileNameConditional() {
Dates.setTimeFixed( 2023, 1, 23, 21, 6, 0 );

LogId lid1 = new LogId( "ln", "lt", "chn", Map.of(), h1Headers, strTypes );
assertThat( AbstractWriter.currentPattern( LogFormat.TSV_GZ, "#{if}(${ORGANIZATION}&&${ACCOUNT})${ORGANIZATION}/${ACCOUNT}/#{end}${INTERVAL}-${LOG_VERSION}.${LOG_FORMAT}", lid1, Timestamp.BPH_12, 1, Dates.nowUtc(), "localhost" ) )
assertThat( AbstractWriter.currentPattern( templateEngineFixture.templateEngine, LogFormat.TSV_GZ, "{{% if ORGANIZATION and ACCOUNT }}{{ ORGANIZATION }}/{{ ACCOUNT }}/{{% end }}{{ INTERVAL }}-{{ LOG_VERSION }}.{{ LOG_FORMAT }}", lid1, Timestamp.BPH_12, 1, Dates.nowUtc(), "localhost" ) )
.isEqualTo( "ln/01-855943970-1.tsv.gz.rb.gz" );

lid1 = new LogId( "ln", "lt", "chn", Map.of( "ORGANIZATION", "org1", "ACCOUNT", "acc1" ), h1Headers, strTypes );
assertThat( AbstractWriter.currentPattern( LogFormat.PARQUET, "#{if}(${ORGANIZATION}&&${ACCOUNT})${ORGANIZATION}/${ACCOUNT}/#{end}${INTERVAL}-${LOG_VERSION}.${LOG_FORMAT}", lid1, Timestamp.BPH_12, 1, Dates.nowUtc(), "localhost" ) )
assertThat( AbstractWriter.currentPattern( templateEngineFixture.templateEngine, LogFormat.PARQUET, "{{% if ORGANIZATION and ACCOUNT }}{{ ORGANIZATION }}/{{ ACCOUNT }}/{{% end }}{{ INTERVAL }}-{{ LOG_VERSION }}.{{ LOG_FORMAT }}", lid1, Timestamp.BPH_12, 1, Dates.nowUtc(), "localhost" ) )
.isEqualTo( "ln/org1/acc1/01-855943970-1.parquet.rb.gz" );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import oap.logstream.Logger;
import oap.logstream.Timestamp;
import oap.logstream.formats.rowbinary.RowBinaryUtils;
import oap.template.TemplateEngineFixture;
import oap.template.Types;
import oap.testng.Fixtures;
import oap.testng.TestDirectoryFixture;
Expand All @@ -48,14 +49,16 @@

public class DiskLoggerBackendTest extends Fixtures {
private final TestDirectoryFixture testDirectoryFixture;
private final TemplateEngineFixture templateEngineFixture;

public DiskLoggerBackendTest() {
testDirectoryFixture = fixture( new TestDirectoryFixture() );
templateEngineFixture = fixture( new TemplateEngineFixture() );
}

@Test
public void spaceAvailable() {
try( DiskLoggerBackend backend = new DiskLoggerBackend( testDirectoryFixture.testPath( "logs" ), Timestamp.BPH_12, 4000, "localhost" ) ) {
try( DiskLoggerBackend backend = new DiskLoggerBackend( templateEngineFixture.templateEngine, testDirectoryFixture.testPath( "logs" ), Timestamp.BPH_12, 4000, "localhost" ) ) {
backend.start();

assertTrue( backend.isLoggingAvailable() );
Expand All @@ -73,7 +76,7 @@ public void testPatternByType() throws IOException {
byte[][] types = new byte[][] { new byte[] { Types.STRING.id }, new byte[] { Types.STRING.id } };
byte[] lines = Compression.gzip( RowBinaryUtils.lines( List.of( List.of( "12345678", "rrrr5678" ), List.of( "1", "2" ) ) ) );

try( DiskLoggerBackend backend = new DiskLoggerBackend( testDirectoryFixture.testPath( "logs" ), Timestamp.BPH_12, 4000, "localhost" ) ) {
try( DiskLoggerBackend backend = new DiskLoggerBackend( templateEngineFixture.templateEngine, testDirectoryFixture.testPath( "logs" ), Timestamp.BPH_12, 4000, "localhost" ) ) {
backend.filePattern = "${LOG_TYPE}_${LOG_VERSION}_${INTERVAL}.tsv.gz";
backend.filePatternByType.put( "LOG_TYPE_WITH_DIFFERENT_FILE_PATTERN",
new DiskLoggerBackend.FilePatternConfiguration( "${LOG_TYPE}_${LOG_VERSION}_${MINUTE}.parquet" ) );
Expand Down Expand Up @@ -106,7 +109,7 @@ public void testWriteSync() throws IOException {
byte[][] types = new byte[][] { new byte[] { Types.STRING.id }, new byte[] { Types.STRING.id } };
byte[] lines = Compression.gzip( RowBinaryUtils.lines( List.of( List.of( "12345678", "rrrr5678" ), List.of( "1", "2" ) ) ) );
//init new logger
try( DiskLoggerBackend backend = new DiskLoggerBackend( testDirectoryFixture.testPath( "logs" ), BPH_12, DEFAULT_BUFFER, "localhost" ) ) {
try( DiskLoggerBackend backend = new DiskLoggerBackend( templateEngineFixture.templateEngine, testDirectoryFixture.testPath( "logs" ), BPH_12, DEFAULT_BUFFER, "localhost" ) ) {
backend.start();

Logger logger = new Logger( backend );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import oap.compression.Compression;
import oap.logstream.LogId;
import oap.logstream.formats.rowbinary.RowBinaryUtils;
import oap.template.TemplateEngineFixture;
import oap.template.Types;
import oap.testng.Fixtures;
import oap.testng.TestDirectoryFixture;
Expand All @@ -49,9 +50,11 @@
public class RowBinaryWriterTest extends Fixtures {
private static final String FILE_PATTERN = "${p}-file-${INTERVAL}-${LOG_VERSION}.rb.gz";
private final TestDirectoryFixture testDirectoryFixture;
private final TemplateEngineFixture templateEngineFixture;

public RowBinaryWriterTest() {
testDirectoryFixture = fixture( new TestDirectoryFixture() );
templateEngineFixture = fixture( new TemplateEngineFixture() );
}

@Test
Expand All @@ -78,7 +81,7 @@ public void testWrite() throws IOException {
LogId logId = new LogId( "", "log", "log",
Map.of( "p", "1" ), headers, types );
Path logs = testDirectoryFixture.testPath( "logs" );
try( RowBinaryWriter writer = new RowBinaryWriter( logs, FILE_PATTERN, logId, 1024, BPH_12, 20, "localhost" ) ) {
try( RowBinaryWriter writer = new RowBinaryWriter( templateEngineFixture.templateEngine, logs, FILE_PATTERN, logId, 1024, BPH_12, 20, "localhost" ) ) {
writer.write( CURRENT_PROTOCOL_VERSION, content1 );
writer.write( CURRENT_PROTOCOL_VERSION, content2 );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,13 @@

package oap.logstream;

import oap.io.Closeables;
import oap.kubernetes.ReplicaUtils;
import oap.net.Inet;
import org.apache.velocity.VelocityContext;
import org.apache.velocity.app.VelocityEngine;
import org.apache.velocity.app.event.EventCartridge;
import org.apache.velocity.app.event.ReferenceInsertionEventHandler;
import org.apache.velocity.context.Context;
import oap.reflect.TypeRef;
import oap.template.TemplateAccumulators;
import oap.template.TemplateEngine;
import org.joda.time.DateTime;

import java.io.StringWriter;
import java.util.LinkedHashMap;
import java.util.Map;

Expand All @@ -45,8 +41,6 @@
* @see oap.logstream.disk.AbstractWriter
*/
public class LogIdTemplate {
private final VelocityEngine engine = new VelocityEngine();

private final LogId logId;
private final LinkedHashMap<String, String> variables = new LinkedHashMap<>();

Expand All @@ -66,29 +60,16 @@ public LogIdTemplate addVariables( Map<String, String> variables ) {
return this;
}

public String render( String template, DateTime time, Timestamp timestamp, int version, String hostname ) {
VelocityContext context = new VelocityContext();
EventCartridge eventCartridge = new EventCartridge();
context.attachEventCartridge( eventCartridge );
eventCartridge.addReferenceInsertionEventHandler( new ReferenceInsertionEventHandler() {
@Override
public Object referenceInsert( Context context, String s, Object o ) {
return o == null ? "" : o;
}
} );

public String render( TemplateEngine templateEngine, String template, DateTime time, Timestamp timestamp, int version, String hostname ) {
LinkedHashMap<String, String> context = new LinkedHashMap<>();
init( context, time, timestamp, version, hostname );

variables.forEach( context::put );

StringWriter writer = new StringWriter();
engine.evaluate( context, writer, "log-id-template", template );
Closeables.close( writer );
context.putAll( variables );

return writer.toString();
return templateEngine.getRuntimeTemplate( "LogIdTemplate", new TypeRef<Map<String, String>>() {}, template, TemplateAccumulators.STRING, _ -> {} ).render( context ).get();
}

public void init( VelocityContext context, DateTime time, Timestamp timestamp, int version, String hostname ) {
public void init( Map<String, String> context, DateTime time, Timestamp timestamp, int version, String hostname ) {
context.put( "LOG_TYPE", logId.logType );
context.put( "LOG_VERSION", getHashWithVersion( version, hostname ) );
context.put( "SERVER_HOST", Inet.HOSTNAME );
Expand All @@ -103,7 +84,7 @@ public void init( VelocityContext context, DateTime time, Timestamp timestamp, i
context.put( "LOG_TIME_INTERVAL", String.valueOf( 60 / timestamp.bucketsPerHour ) );
context.put( "REGION", System.getenv( "REGION" ) );

logId.properties.forEach( context::put );
context.putAll( logId.properties );
}

public String getHashWithVersion( int version, String hostname ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import oap.logstream.LogStreamProtocol.ProtocolVersion;
import oap.logstream.LoggerException;
import oap.logstream.Timestamp;
import oap.template.TemplateEngine;
import oap.util.Dates;
import org.codehaus.plexus.util.StringUtils;
import org.joda.time.DateTime;

import java.io.Closeable;
Expand All @@ -48,6 +48,7 @@
@Slf4j
public abstract class AbstractWriter<T extends Closeable> implements Closeable {
public final LogFormat logFormat;
protected final TemplateEngine templateEngine;
protected final Path logDirectory;
protected final String filePattern;
protected final LogId logId;
Expand All @@ -63,8 +64,9 @@ public abstract class AbstractWriter<T extends Closeable> implements Closeable {
protected int fileVersion = 1;
protected boolean closed = false;

protected AbstractWriter( LogFormat logFormat, Path logDirectory, String filePattern, LogId logId, int bufferSize, Timestamp timestamp,
protected AbstractWriter( TemplateEngine templateEngine, LogFormat logFormat, Path logDirectory, String filePattern, LogId logId, int bufferSize, Timestamp timestamp,
int maxVersions, String hostname ) {
this.templateEngine = templateEngine;
this.logFormat = logFormat;
this.logDirectory = logDirectory;
this.filePattern = filePattern;
Expand All @@ -82,7 +84,7 @@ protected AbstractWriter( LogFormat logFormat, Path logDirectory, String filePat
}

@SneakyThrows
static String currentPattern( LogFormat logFormat, String filePattern, LogId logId, Timestamp timestamp, int version, DateTime time, String hostname ) {
static String currentPattern( TemplateEngine templateEngine, LogFormat logFormat, String filePattern, LogId logId, Timestamp timestamp, int version, DateTime time, String hostname ) {
String suffix = filePattern;
if( filePattern.startsWith( "/" ) && filePattern.endsWith( "/" ) ) suffix = suffix.substring( 1 );
else if( !filePattern.startsWith( "/" ) && !logId.filePrefixPattern.endsWith( "/" ) ) suffix = "/" + suffix;
Expand All @@ -97,15 +99,15 @@ static String currentPattern( LogFormat logFormat, String filePattern, LogId log
logIdTemplate
.addVariable( "LOG_FORMAT", logFormat.extension )
.addVariable( "LOG_FORMAT_" + logFormat.name(), logFormat.extension );
return logIdTemplate.render( StringUtils.replace( pattern, " ", "" ), time, timestamp, version, hostname );
return logIdTemplate.render( templateEngine, pattern, time, timestamp, version, hostname );
}

protected String currentPattern( int version ) {
return currentPattern( logFormat, filePattern, logId, timestamp, version, Dates.nowUtc(), hostname );
return currentPattern( templateEngine, logFormat, filePattern, logId, timestamp, version, Dates.nowUtc(), hostname );
}

protected String currentPattern() {
return currentPattern( logFormat, filePattern, logId, timestamp, fileVersion, Dates.nowUtc(), hostname );
return currentPattern( templateEngine, logFormat, filePattern, logId, timestamp, fileVersion, Dates.nowUtc(), hostname );
}

public void write( ProtocolVersion protocolVersion, byte[] buffer ) throws LoggerException {
Expand Down
Loading
Loading