Skip to content

Commit

Permalink
Merge pull request #1010 from Microsoft/feature/identity-date-column
Browse files Browse the repository at this point in the history
Filter logs from DB by AuthToken time period
  • Loading branch information
MatkovIvan committed Mar 20, 2019
2 parents 7308d34 + 7789599 commit 4477c1d
Show file tree
Hide file tree
Showing 10 changed files with 317 additions and 88 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
public class DefaultChannel implements Channel {

/**
* Persistence batch size for {@link Persistence#getLogs(String, Collection, int, List)} when clearing.
* Persistence batch size for {@link Persistence#getLogs(String, Collection, int, List, Date)} when clearing.
*/
@VisibleForTesting
static final int CLEAR_BATCH_SIZE = 100;
Expand Down Expand Up @@ -409,7 +409,7 @@ private void suspend(boolean deleteLogs, Exception exception) {

private void deleteLogsOnSuspended(final GroupState groupState) {
final List<Log> logs = new ArrayList<>();
mPersistence.getLogs(groupState.mName, Collections.<String>emptyList(), CLEAR_BATCH_SIZE, logs);
mPersistence.getLogs(groupState.mName, Collections.<String>emptyList(), CLEAR_BATCH_SIZE, logs, null);
if (logs.size() > 0 && groupState.mListener != null) {
for (Log log : logs) {
groupState.mListener.onBeforeSending(log);
Expand Down Expand Up @@ -457,7 +457,7 @@ private synchronized void triggerIngestion(final @NonNull GroupState groupState)
/* Get a batch from Persistence. */
final List<Log> batch = new ArrayList<>(maxFetch);
final int stateSnapshot = mCurrentState;
final String batchId = mPersistence.getLogs(groupState.mName, groupState.mPausedTargetKeys, maxFetch, batch);
final String batchId = mPersistence.getLogs(groupState.mName, groupState.mPausedTargetKeys, maxFetch, batch, null);

/* Decrement counter. */
groupState.mPendingLogCount -= maxFetch;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
Expand Down Expand Up @@ -61,6 +62,12 @@ public class DatabasePersistence extends Persistence {
@VisibleForTesting
static final int VERSION_TARGET_KEY = 3;

/**
* Version of the schema that introduced persistence priority for logs.
*/
@VisibleForTesting
static final int VERSION_PRIORITY_KEY = 4;

/**
* Table name.
*/
Expand All @@ -79,6 +86,12 @@ public class DatabasePersistence extends Persistence {
@VisibleForTesting
static final String COLUMN_LOG = "log";

/**
* Name of date column in the table.
*/
@VisibleForTesting
static final String COLUMN_TIMESTAMP = "timestamp";

/**
* Name of target token column in the table.
*/
Expand All @@ -103,12 +116,11 @@ public class DatabasePersistence extends Persistence {
@VisibleForTesting
static final String COLUMN_PRIORITY = "priority";


/**
* Table schema for Persistence.
*/
@VisibleForTesting
static final ContentValues SCHEMA = getContentValues("", "", "", "", "", 0);
static final ContentValues SCHEMA = getContentValues("", "", "", "", "", 0, 0L);

/**
* Database name.
Expand All @@ -119,7 +131,7 @@ public class DatabasePersistence extends Persistence {
/**
* Current version of the schema.
*/
private static final int VERSION = 4;
private static final int VERSION = 5;

/**
* Priority index.
Expand Down Expand Up @@ -217,7 +229,10 @@ public boolean onUpgrade(SQLiteDatabase db, int oldVersion, int newVersion) {
if (oldVersion < VERSION_TARGET_KEY) {
db.execSQL("ALTER TABLE " + TABLE + " ADD COLUMN `" + COLUMN_TARGET_KEY + "` TEXT");
}
db.execSQL("ALTER TABLE " + TABLE + " ADD COLUMN `" + COLUMN_PRIORITY + "` INTEGER DEFAULT " + PERSISTENCE_NORMAL);
if (oldVersion < VERSION_PRIORITY_KEY) {
db.execSQL("ALTER TABLE " + TABLE + " ADD COLUMN `" + COLUMN_PRIORITY + "` INTEGER DEFAULT " + PERSISTENCE_NORMAL);
}
db.execSQL("ALTER TABLE " + TABLE + " ADD COLUMN `" + COLUMN_TIMESTAMP + "` INTEGER DEFAULT 0");
createPriorityIndex(db);
return true;
}
Expand All @@ -238,14 +253,15 @@ public boolean onUpgrade(SQLiteDatabase db, int oldVersion, int newVersion) {
* @param priority The persistence priority.
* @return A {@link ContentValues} instance.
*/
private static ContentValues getContentValues(@Nullable String group, @Nullable String logJ, String targetToken, String type, String targetKey, int priority) {
private static ContentValues getContentValues(@Nullable String group, @Nullable String logJ, String targetToken, String type, String targetKey, int priority, Long timestamp) {
ContentValues values = new ContentValues();
values.put(COLUMN_GROUP, group);
values.put(COLUMN_LOG, logJ);
values.put(COLUMN_TARGET_TOKEN, targetToken);
values.put(COLUMN_DATA_TYPE, type);
values.put(COLUMN_TARGET_KEY, targetKey);
values.put(COLUMN_PRIORITY, priority);
values.put(COLUMN_TIMESTAMP, timestamp);
return values;
}

Expand Down Expand Up @@ -285,7 +301,7 @@ public long putLog(@NonNull Log log, @NonNull String group, @IntRange(from = Fla
throw new PersistenceException("Log is too large (" + payloadSize + " bytes) to store in database. " +
"Current maximum database size is " + maxSize + " bytes.");
}
contentValues = getContentValues(group, isLargePayload ? null : payload, targetToken, log.getType(), targetKey, Flags.getPersistenceFlag(flags, false));
contentValues = getContentValues(group, isLargePayload ? null : payload, targetToken, log.getType(), targetKey, Flags.getPersistenceFlag(flags, false), log.getTimestamp().getTime());
long databaseId = mDatabaseManager.put(contentValues, COLUMN_PRIORITY);
if (databaseId == -1) {
throw new PersistenceException("Failed to store a log to the Persistence database for log type " + log.getType() + ".");
Expand Down Expand Up @@ -408,16 +424,16 @@ public int countLogs(@NonNull String group) {

@Override
@Nullable
public String getLogs(@NonNull String group, @NonNull Collection<String> pausedTargetKeys, @IntRange(from = 0) int limit, @NonNull List<Log> outLogs) {
public String getLogs(@NonNull String group, @NonNull Collection<String> pausedTargetKeys, @IntRange(from = 0) int limit, @NonNull List<Log> outLogs, @Nullable Date timestamp) {

/* Log. */
AppCenterLog.debug(LOG_TAG, "Trying to get " + limit + " logs from the Persistence database for " + group);

/* Query database. */
SQLiteQueryBuilder builder = SQLiteUtils.newSQLiteQueryBuilder();
builder.appendWhere(COLUMN_GROUP + " = ?");
String[] selectionArgs = new String[pausedTargetKeys.size() + 1];
selectionArgs[0] = group;
List<String> selectionArgs = new ArrayList<>();
selectionArgs.add(group);
if (!pausedTargetKeys.isEmpty()) {
StringBuilder filter = new StringBuilder();
for (int i = 0; i < pausedTargetKeys.size(); i++) {
Expand All @@ -426,18 +442,26 @@ public String getLogs(@NonNull String group, @NonNull Collection<String> pausedT
filter.deleteCharAt(filter.length() - 1);
builder.appendWhere(" AND ");
builder.appendWhere(COLUMN_TARGET_KEY + " NOT IN (" + filter.toString() + ")");
System.arraycopy(pausedTargetKeys.toArray(new String[0]), 0, selectionArgs, 1, pausedTargetKeys.size());
selectionArgs.addAll(pausedTargetKeys);
}

/* Filter by time. */
if (timestamp != null) {
builder.appendWhere(" AND ");
builder.appendWhere(COLUMN_TIMESTAMP + " <= ?");
selectionArgs.add(String.valueOf(timestamp.getTime()));
}

/* Add logs to output parameter after deserialization if logs are not already sent. */
int count = 0;
Map<Long, Log> candidates = new LinkedHashMap<>();
List<Long> failedDbIdentifiers = new ArrayList<>();
File largePayloadGroupDirectory = getLargePayloadGroupDirectory(group);
String[] selectionArgsArray = selectionArgs.toArray(new String[0]);
Cursor cursor = null;
ContentValues values;
try {
cursor = mDatabaseManager.getCursor(builder, null, selectionArgs, GET_SORT_ORDER);
cursor = mDatabaseManager.getCursor(builder, null, selectionArgsArray, GET_SORT_ORDER);
} catch (RuntimeException e) {
AppCenterLog.error(LOG_TAG, "Failed to get logs: ", e);
}
Expand All @@ -454,7 +478,7 @@ public String getLogs(@NonNull String group, @NonNull Collection<String> pausedT
*/
if (dbIdentifier == null) {
AppCenterLog.error(LOG_TAG, "Empty database record, probably content was larger than 2MB, need to delete as it's now corrupted.");
List<Long> corruptedIds = getCorruptedIds(builder, selectionArgs);
List<Long> corruptedIds = getCorruptedIds(builder, selectionArgsArray);
for (Long corruptedId : corruptedIds) {
if (!mPendingDbIdentifiers.contains(corruptedId) && !candidates.containsKey(corruptedId)) {

Expand Down Expand Up @@ -586,4 +610,4 @@ private List<Long> getCorruptedIds(SQLiteQueryBuilder builder, String[] selectio
}
return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.io.Closeable;
import java.util.Collection;
import java.util.Date;
import java.util.List;

/**
Expand Down Expand Up @@ -69,13 +70,14 @@ public abstract long putLog(@NonNull Log log, @NonNull String group,
* @param pausedTargetKeys List of target token keys to exclude from the log query.
* @param limit The max number of logs to be returned.
* @param outLogs A list to receive {@link Log} objects.
* @param timestamp A time to select only logs with time before specified.
* @return An ID for {@code outLogs}. {@code null} if no logs exist.
*/
@Nullable
public abstract String getLogs(@NonNull String group, @NonNull Collection<String> pausedTargetKeys, @IntRange(from = 0) int limit, @NonNull List<Log> outLogs);
public abstract String getLogs(@NonNull String group, @NonNull Collection<String> pausedTargetKeys, @IntRange(from = 0) int limit, @NonNull List<Log> outLogs, @Nullable Date timestamp);

/**
* Clears all associations between logs of the {@code group} and ids returned by {@link #getLogs(String, Collection, int, List)}}.
* Clears all associations between logs of the {@code group} and ids returned by {@link #getLogs(String, Collection, int, List, Date)}}.
*/
public abstract void clearPendingLogState();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.junit.Test;

import java.io.IOException;
import java.util.Date;
import java.util.UUID;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -77,7 +78,7 @@ public void useAlternateIngestion() throws IOException {
Persistence mockPersistence = mock(Persistence.class);
Ingestion defaultIngestion = mock(Ingestion.class);
Ingestion alternateIngestion = mock(Ingestion.class);
when(mockPersistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), anyListOf(Log.class))).then(getGetLogsAnswer(1));
when(mockPersistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), anyListOf(Log.class), any(Date.class))).then(getGetLogsAnswer(1));
DefaultChannel channel = new DefaultChannel(mock(Context.class), UUIDUtils.randomUUID().toString(), mockPersistence, defaultIngestion, mAppCenterHandler);
channel.addGroup(TEST_GROUP, 1, BATCH_TIME_INTERVAL, MAX_PARALLEL_BATCHES, alternateIngestion, null);

Expand Down Expand Up @@ -114,7 +115,7 @@ public void startWithoutAppSecret() throws Persistence.PersistenceException {

/* Simulate we have 1 pending log in storage. */
when(mockPersistence.countLogs(anyString())).thenReturn(1);
when(mockPersistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), anyListOf(Log.class))).then(getGetLogsAnswer(1));
when(mockPersistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), anyListOf(Log.class), any(Date.class))).then(getGetLogsAnswer(1));

/* Create channel and groups. */
DefaultChannel channel = new DefaultChannel(mock(Context.class), null, mockPersistence, defaultIngestion, mAppCenterHandler);
Expand Down Expand Up @@ -158,7 +159,7 @@ public void sendPendingLogsAfterSettingAppSecret() {
Persistence mockPersistence = mock(Persistence.class);
Ingestion defaultIngestion = mock(Ingestion.class);
Ingestion alternateIngestion = mock(Ingestion.class);
when(mockPersistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), anyListOf(Log.class))).then(getGetLogsAnswer(1));
when(mockPersistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), anyListOf(Log.class), any(Date.class))).then(getGetLogsAnswer(1));

/* Simulate we have 1 pending log in storage. */
when(mockPersistence.countLogs(anyString())).thenReturn(1);
Expand All @@ -185,7 +186,7 @@ public void pendingLogsDisableSetAppSecretThenEnable() {
Persistence mockPersistence = mock(Persistence.class);
Ingestion defaultIngestion = mock(Ingestion.class);
Ingestion alternateIngestion = mock(Ingestion.class);
when(mockPersistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), anyListOf(Log.class))).then(getGetLogsAnswer(1));
when(mockPersistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), anyListOf(Log.class), any(Date.class))).then(getGetLogsAnswer(1));

/* Simulate we have 1 pending log in storage for App Center. */
when(mockPersistence.countLogs(appCenterGroup)).thenReturn(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.junit.Test;
import org.mockito.Matchers;

import java.util.Date;
import java.util.List;

import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void shutdown() {
Persistence mockPersistence = mock(Persistence.class);
AppCenterIngestion mockIngestion = mock(AppCenterIngestion.class);
Channel.GroupListener mockListener = mock(Channel.GroupListener.class);
when(mockPersistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), Matchers.<List<Log>>any()))
when(mockPersistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), Matchers.<List<Log>>any(), any(Date.class)))
.then(getGetLogsAnswer(1));
DefaultChannel channel = new DefaultChannel(mock(Context.class), UUIDUtils.randomUUID().toString(), mockPersistence, mockIngestion, mAppCenterHandler);
channel.addGroup(TEST_GROUP, 1, BATCH_TIME_INTERVAL, MAX_PARALLEL_BATCHES, null, mockListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.junit.Test;

import java.util.Collections;
import java.util.Date;
import java.util.UUID;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -46,7 +47,7 @@ public void pauseResumeGroup() throws Persistence.PersistenceException {
AppCenterIngestion mockIngestion = mock(AppCenterIngestion.class);
Channel.GroupListener mockListener = mock(Channel.GroupListener.class);

when(mockPersistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), anyListOf(Log.class))).then(getGetLogsAnswer(50));
when(mockPersistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), anyListOf(Log.class), any(Date.class))).then(getGetLogsAnswer(50));
when(mockIngestion.sendAsync(anyString(), anyString(), any(UUID.class), any(LogContainer.class), any(ServiceCallback.class))).then(getSendAsyncAnswer());

DefaultChannel channel = new DefaultChannel(mock(Context.class), UUIDUtils.randomUUID().toString(), mockPersistence, mockIngestion, mAppCenterHandler);
Expand Down Expand Up @@ -141,7 +142,7 @@ public void pauseResumeTargetToken() throws Persistence.PersistenceException {
channel.pauseGroup(TEST_GROUP, targetToken);

/* Mock the database to return logs now. */
when(persistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), anyListOf(Log.class))).then(getGetLogsAnswer(1));
when(persistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), anyListOf(Log.class), any(Date.class))).then(getGetLogsAnswer(1));
when(persistence.countLogs(TEST_GROUP)).thenReturn(1);

/* Enqueue a log. */
Expand Down Expand Up @@ -205,7 +206,7 @@ public void pauseGroupPauseTargetResumeGroupResumeTarget() throws Persistence.Pe
channel.pauseGroup(TEST_GROUP, targetToken);

/* Mock the database to return logs now. */
when(persistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), anyListOf(Log.class))).then(getGetLogsAnswer(1));
when(persistence.getLogs(any(String.class), anyListOf(String.class), anyInt(), anyListOf(Log.class), any(Date.class))).then(getGetLogsAnswer(1));
when(persistence.countLogs(TEST_GROUP)).thenReturn(1);

/* Enqueue a log. */
Expand Down
Loading

0 comments on commit 4477c1d

Please sign in to comment.