Skip to content

Commit

Permalink
#19 Implementation the count functions about messages
Browse files Browse the repository at this point in the history
  • Loading branch information
jameschen authored and jameschen committed Oct 27, 2019
2 parents 3fa9e1d + f409c91 commit 67398c3
Show file tree
Hide file tree
Showing 8 changed files with 367 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@
import im.turms.turms.common.TurmsStatusCode;
import im.turms.turms.constant.AdminPermission;
import im.turms.turms.constant.ChatType;
import im.turms.turms.constant.DivideBy;
import im.turms.turms.constant.MessageDeliveryStatus;
import im.turms.turms.pojo.domain.Message;
import im.turms.turms.service.message.MessageService;
import org.apache.commons.lang3.EnumUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.function.Function3;

import java.text.ParseException;
import java.util.Collections;
import java.util.Date;
import java.util.Set;
import java.util.*;

@RestController
@RequestMapping("/messages")
Expand Down Expand Up @@ -85,41 +85,104 @@ public Mono<ResponseEntity> deleteMessages(
@GetMapping("/count")
@RequiredPermission(AdminPermission.MESSAGE_QUERY)
public Mono<ResponseEntity> countMessages(
@RequestParam String chatType,
@RequestParam(required = false) String sendStartDate,
@RequestParam(required = false) String sendEndDate,
@RequestParam(required = false) String averageSendStartDate,
@RequestParam(required = false) String averageSendEndDate,
@RequestParam(required = false) String receiveStartDate,
@RequestParam(required = false) String receiveEndDate,
@RequestParam(required = false) String averageReceiveStartDate,
@RequestParam(required = false) String averageReceiveEndDate) {
ChatType type = EnumUtils.getEnum(ChatType.class, chatType);
Mono<Long> count = Mono.empty();
if (type != ChatType.UNRECOGNIZED) {
try {
if (sendStartDate != null || sendEndDate != null) {
count = messageService.countSendMessages(
DateTimeUtil.parseDay(sendStartDate),
DateTimeUtil.endOfDay(sendEndDate),
type);
} else if (averageSendStartDate != null || averageSendEndDate != null) {
// count = messageService.countAverageSentMessages(averageSendStartDate, averageSendEndDate, type);
} else if (receiveStartDate != null || receiveEndDate != null) {
count = messageService.countReceivedMessages(receiveStartDate, receiveEndDate, type);
} else if (averageReceiveStartDate != null || averageReceiveEndDate != null) {
// count = messageService.countAverageReceivedMessages(
// averageReceiveStartDate,
// averageReceiveEndDate, type);
}
return ResponseFactory.okWhenTruthy(
count.map(number -> Collections.singletonMap("count", number)),
true);
} catch (ParseException e) {
return ResponseFactory.code(TurmsStatusCode.ILLEGAL_DATE_FORMAT);
@RequestParam(required = false) ChatType chatType,
@RequestParam(required = false) Date deliveredStartDate,
@RequestParam(required = false) Date deliveredEndDate,
@RequestParam(required = false) Date deliveredOnAverageStartDate,
@RequestParam(required = false) Date deliveredOnAverageEndDate,
@RequestParam(required = false) Date acknowledgedStartDate,
@RequestParam(required = false) Date acknowledgedEndDate,
@RequestParam(required = false) Date acknowledgedOnAverageStartDate,
@RequestParam(required = false) Date acknowledgedOnAverageEndDate,
@RequestParam(defaultValue = "NOOP") DivideBy divideBy) {
if (chatType == ChatType.UNRECOGNIZED) {
return ResponseFactory.code(TurmsStatusCode.ILLEGAL_ARGUMENTS);
}
if (divideBy == null || divideBy == DivideBy.NOOP) {
List<Mono<Pair<String, Long>>> counts = new LinkedList<>();
if (deliveredStartDate != null || deliveredEndDate != null) {
counts.add(messageService.countDeliveredMessages(
deliveredStartDate,
deliveredEndDate,
chatType)
.map(total -> Pair.of("deliveredMessages", total)));
}
if (deliveredOnAverageStartDate != null || deliveredOnAverageEndDate != null) {
counts.add(messageService.countDeliveredMessagesOnAverage(
deliveredOnAverageStartDate,
deliveredOnAverageEndDate,
chatType)
.map(total -> Pair.of("deliveredMessagesOnAverage", total)));
}
if (acknowledgedStartDate != null || acknowledgedEndDate != null) {
counts.add(messageService.countAcknowledgedMessages(
acknowledgedStartDate,
acknowledgedEndDate,
chatType)
.map(total -> Pair.of("acknowledgedMessages", total)));
}
if (acknowledgedOnAverageStartDate != null || acknowledgedOnAverageEndDate != null) {
counts.add(messageService.countAcknowledgedMessagesOnAverage(
acknowledgedOnAverageStartDate,
acknowledgedOnAverageEndDate,
chatType)
.map(total -> Pair.of("acknowledgedMessagesOnAverage", total)));
}
if (counts.isEmpty()) {
return ResponseFactory.code(TurmsStatusCode.ILLEGAL_ARGUMENTS);
}
Mono<Map<String, Long>> resultMono = Flux.merge(counts)
.collectList()
.map(pairs -> {
Map<String, Long> resultMap = new HashMap<>(counts.size());
for (Pair<String, ?> pair : pairs) {
resultMap.put(pair.getLeft(), (Long) pair.getRight());
}
return resultMap;
});
return ResponseFactory.okWhenTruthy(resultMono);
} else {
return ResponseFactory.code(TurmsStatusCode.ILLEGAL_ARGUMENTS);
List<Mono<Pair<String, List<Map<String, ?>>>>> counts = new LinkedList<>();
if (deliveredStartDate != null && deliveredEndDate != null) {
counts.add(DateTimeUtil.queryBetweenDate(
"deliveredMessages",
deliveredStartDate,
deliveredEndDate,
divideBy,
(Function3<Date, Date, ChatType, Mono<Long>>) messageService::countDeliveredMessages,
chatType));
}
if (deliveredOnAverageStartDate != null && deliveredOnAverageEndDate != null) {
counts.add(DateTimeUtil.queryBetweenDate(
"deliveredMessagesOnAverage",
deliveredOnAverageStartDate,
deliveredOnAverageEndDate,
divideBy,
(Function3<Date, Date, ChatType, Mono<Long>>) messageService::countDeliveredMessagesOnAverage,
chatType));
}
if (acknowledgedStartDate != null && acknowledgedEndDate != null) {
counts.add(DateTimeUtil.queryBetweenDate(
"acknowledgedMessages",
acknowledgedStartDate,
acknowledgedEndDate,
divideBy,
(Function3<Date, Date, ChatType, Mono<Long>>) messageService::countAcknowledgedMessages,
chatType));
}
if (acknowledgedOnAverageStartDate != null && acknowledgedOnAverageEndDate != null) {
counts.add(DateTimeUtil.queryBetweenDate(
"acknowledgedMessagesOnAverage",
acknowledgedOnAverageStartDate,
acknowledgedOnAverageEndDate,
divideBy,
(Function3<Date, Date, ChatType, Mono<Long>>) messageService::countAcknowledgedMessagesOnAverage,
chatType));
}
if (counts.isEmpty()) {
return ResponseFactory.code(TurmsStatusCode.ILLEGAL_ARGUMENTS);
}
return ResponseFactory.okWhenTruthy(Flux.merge(counts));
}
}
}
50 changes: 50 additions & 0 deletions src/main/java/im/turms/turms/common/AggregationUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (C) 2019 The Turms Project
* https://github.com/turms-im/turms
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package im.turms.turms.common;

import im.turms.turms.pojo.domain.Message;
import org.bson.Document;
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.query.Criteria;
import reactor.core.publisher.Mono;

import javax.validation.constraints.NotNull;

import static im.turms.turms.common.Constants.TOTAL;

public class AggregationUtil {
private AggregationUtil() {
}

public static Mono<Long> countDistinct(
@NotNull ReactiveMongoTemplate mongoTemplate,
@NotNull Criteria criteria,
@NotNull String field,
Class<?> clazz) {
return mongoTemplate.aggregate(Aggregation.newAggregation(
Aggregation.match(criteria),
Aggregation.project(field),
Aggregation.group(field),
Aggregation.count().as(TOTAL))
, clazz, Document.class)
.single()
.map(document -> Long.valueOf((Integer) document.get(TOTAL)))
.defaultIfEmpty(0L);
}
}
1 change: 1 addition & 0 deletions src/main/java/im/turms/turms/common/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ private Constants() {
public static final String ACKNOWLEDGED = "acknowledged";
public static final String AUTHENTICATED = "authenticated";
public static final String STATUS = "status";
public static final String TOTAL = "total";

public static final long RESERVED_ID = 0L;
public static final long ADMIN_ROLE_ROOT_ID = RESERVED_ID;
Expand Down
131 changes: 96 additions & 35 deletions src/main/java/im/turms/turms/common/DateTimeUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,18 @@

package im.turms.turms.common;

import im.turms.turms.constant.ChatType;
import im.turms.turms.constant.DivideBy;
import lombok.Getter;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.commons.lang3.tuple.Pair;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.function.Function3;

import java.text.ParseException;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
Expand All @@ -35,45 +41,100 @@ public class DateTimeUtil {
@Getter
private static SimpleDateFormat dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH-mm-ss");

public static Date endOfDay(Date day) {
day = DateUtils.truncate(day, Calendar.DAY_OF_MONTH);
return new Date(day.getTime() + 86399000);
}

public static Date parseDay(String day) throws ParseException {
if (day != null) {
return DateUtils.parseDate(day, Locale.getDefault(), "yyyy-MM-dd");
} else {
return null;
}
public static String now() {
return LocalDateTime.now().format(dateTimeFormatter);
}

public static Date endOfDay(String day) throws ParseException {
if (day != null) {
return endOfDay(parseDay(day));
public static List<Pair<Date, Date>> divide(
@NotNull Date startDate,
@NotNull Date endDate,
@NotNull DivideBy divideBy) {
if (!endDate.after(startDate)) {
return Collections.emptyList();
} else {
return null;
switch (divideBy) {
case HOUR:
startDate = DateUtils.truncate(startDate, Calendar.HOUR);
endDate = DateUtils.truncate(endDate, Calendar.HOUR);
break;
case DAY:
startDate = DateUtils.truncate(startDate, Calendar.DAY_OF_MONTH);
endDate = DateUtils.truncate(endDate, Calendar.DAY_OF_MONTH);
break;
case MONTH:
startDate = DateUtils.truncate(startDate, Calendar.MONTH);
endDate = DateUtils.truncate(endDate, Calendar.MONTH);
break;
default:
throw new IllegalStateException("Unexpected value: " + divideBy);
}
if (startDate.getTime() == endDate.getTime()) {
return Collections.emptyList();
} else {
int unit;
switch (divideBy) {
case HOUR:
unit = Calendar.HOUR_OF_DAY;
break;
case DAY:
unit = Calendar.DAY_OF_YEAR;
break;
case MONTH:
unit = Calendar.MONTH;
break;
default:
throw new IllegalStateException("Unexpected value: " + divideBy);
}
List<Pair<Date, Date>> lists = new LinkedList<>();
while (true){
// Note: Do not use Instant because it doesn't support plus months
Calendar calendar = Calendar.getInstance();
calendar.setTime(startDate);
calendar.add(unit, 1);
Date currentEndDate = calendar.getTime();
if (currentEndDate.after(endDate)) {
break;
} else {
Pair<Date, Date> datePair = Pair.of(startDate, currentEndDate);
lists.add(datePair);
startDate = currentEndDate;
}
}
return lists;
}
}
}

public static String now() {
return LocalDateTime.now().format(dateTimeFormatter);
}

public static List<String> getDatesBetweenTwoDates(String startDate, String endDate, boolean included) {
LocalDate start = LocalDate.parse(startDate);
LocalDate end = LocalDate.parse(endDate);
List<String> totalDates = new LinkedList<>();
if (included) {
totalDates.add(startDate);
}
while (!start.isAfter(end)) {
totalDates.add(start.toString());
start = start.plusDays(1);
}
if (included) {
totalDates.add(endDate);
public static Mono<Pair<String, List<Map<String, ?>>>> queryBetweenDate(
@NotNull String title,
@NotNull Date startDate,
@NotNull Date endDate,
@NotNull DivideBy divideBy,
@NotNull Function3 function,
@Nullable ChatType chatType) {
List<Pair<Date, Date>> dates = DateTimeUtil.divide(startDate, endDate, divideBy);
List<Mono<Map<String, Object>>> monos = new ArrayList<>(dates.size());
for (Pair<Date, Date> datePair : dates) {
Mono<Long> result = (Mono<Long>) function.apply(
datePair.getLeft(),
datePair.getRight(),
chatType);
monos.add(result.map(total -> Map.of("startDate", datePair.getLeft(),
"endDate", datePair.getRight(),
"total", total)));
}
return totalDates;
Flux<Map<String, ?>> resultFlux = Flux.mergeOrdered((o1, o2) -> {
Date startDate1 = (Date) o1.get("startDate");
Date startDate2 = (Date) o2.get("startDate");
if (startDate1.before(startDate2)) {
return -1;
} else if (startDate1.after(startDate2)) {
return 1;
}
return 0;
}, Flux.merge(monos));
return resultFlux
.collectList()
.map(results -> Pair.of(title, results));
}
}
Loading

0 comments on commit 67398c3

Please sign in to comment.