Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ services:
ports:
- "5432:5432"
volumes:
- ./docker-entrypoint:/docker-entrypoint-initdb.d
- ./config/docker-entrypoint:/docker-entrypoint-initdb.d
zookeeper:
image: confluentinc/cp-zookeeper:latest
container_name: spring-batch-zookeeper-demo
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>

<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>2.5.0</version>
</dependency>

<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,43 @@
package com.ittovative.batchprocessing;


import static com.ittovative.batchprocessing.constant.SwaggerConstant.CONTACT_EMAIL;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.CONTACT_NAME;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.CONTACT_URL;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.DESCRIPTION;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.DEVELOPMENT_SERVER_DESCRIPTION;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.DEVELOPMENT_SERVER_URL;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.TITLE;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.VERSION;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Contact;
import io.swagger.v3.oas.annotations.info.Info;
import io.swagger.v3.oas.annotations.servers.Server;

/**
* The type Batch processing application.
*/
@SpringBootApplication
@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
@OpenAPIDefinition(
info = @Info(
title = TITLE,
description = DESCRIPTION,
contact = @Contact(
email = CONTACT_EMAIL,
name = CONTACT_NAME,
url = CONTACT_URL
),
version = VERSION
),
servers = {
@Server(url = DEVELOPMENT_SERVER_URL, description = DEVELOPMENT_SERVER_DESCRIPTION)
}
)
public class BatchProcessingApplication {

/**
Expand All @@ -20,16 +49,4 @@ public static void main(String... args) {
SpringApplication.run(BatchProcessingApplication.class, args);
}

// uncomment to insert dummy orders to kafka at the app's startup

/*@Bean
public CommandLineRunner commandLineRunner(KafkaTemplate<Long, Order> kafkaTemplate) {
return args -> {
for (int i = 0; i < 500; i++) {
Order person = new Order(i,"order#"+i,"dummy description");
kafkaTemplate.send("orders",person);
}
};
}*/

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.ittovative.batchprocessing.config;

public final class APIResponseConstant {
public static final String ADD_ORDER_SUCCESSFUL_MESSAGE = "Order added successfully!";
public static final String BATCH_SUCCESS_MESSAGE = "Batch ended successfully!";

private APIResponseConstant() {
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.ittovative.batchprocessing.config;

import static com.ittovative.batchprocessing.constant.AppConstant.KAFKA_TOPIC;

import com.ittovative.batchprocessing.constant.AppConstant;
import com.ittovative.batchprocessing.model.Order;
import com.ittovative.batchprocessing.util.AppConstants;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
Expand Down Expand Up @@ -89,7 +91,7 @@ public Step databaseProcessOrder(JobRepository jobRepository,
DataSource dataSource,
PagingQueryProvider pagingQueryProvider) {
return new StepBuilder("database-order-processing-step", jobRepository)
.<Order, Order>chunk(AppConstants.CHUNK_SIZE, platformTransactionManager)
.<Order, Order>chunk(AppConstant.CHUNK_SIZE, platformTransactionManager)
.reader(jdbcOrderItemReader(dataSource, pagingQueryProvider))
.processor(itemProcessor())
.writer(flatFileItemWriter())
Expand All @@ -109,7 +111,7 @@ public Step kafkaProcessOrder(JobRepository jobRepository,
PlatformTransactionManager platformTransactionManager,
DefaultKafkaConsumerFactory<Long, Order> defaultKafkaConsumerFactory) {
return new StepBuilder("kafka-order-processing-step", jobRepository)
.<Order, Order>chunk(AppConstants.CHUNK_SIZE, platformTransactionManager)
.<Order, Order>chunk(AppConstant.CHUNK_SIZE, platformTransactionManager)
.reader(kafkaOrderItemReader(defaultKafkaConsumerFactory))
.processor(itemProcessor())
.writer(flatFileItemWriter())
Expand All @@ -131,7 +133,7 @@ public JdbcPagingItemReader<Order> jdbcOrderItemReader(DataSource dataSource,
.name("jdbc-item-reader")
.dataSource(dataSource)
.queryProvider(pagingQueryProvider)
.pageSize(AppConstants.PAGE_SIZE)
.pageSize(AppConstant.PAGE_SIZE)
.rowMapper((resultSet, rowNum) -> {
int id = resultSet.getInt("id");
String name = resultSet.getString("name");
Expand Down Expand Up @@ -173,7 +175,7 @@ public KafkaItemReader<Long, Order> kafkaOrderItemReader(
.name("orders-kafka-item-reader")
.partitions(0)
.saveState(true)
.topic("orders")
.topic(KAFKA_TOPIC)
.consumerProperties(kafkaConsumerProperties)
.partitionOffsets(new HashMap<>())
.build();
Expand All @@ -187,8 +189,8 @@ public KafkaItemReader<Long, Order> kafkaOrderItemReader(
@Bean
public ItemProcessor<Order, Order> itemProcessor() {
return item -> {
logger.info("Order: {" + item.getName().toLowerCase(Locale.ROOT) + "} is being processed!");
Thread.sleep(AppConstants.THREAD_SLEEP_TIME_MS); // simulating real processing time
logger.info("Order: {" + item.name().toLowerCase(Locale.ROOT) + "} is being processed!");
Thread.sleep(AppConstant.THREAD_SLEEP_TIME_MS); // simulating real processing time
return item;
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.ittovative.batchprocessing.constant;

public final class AppConstant {
public static final int THREAD_SLEEP_TIME_MS = 500;
public static final int CHUNK_SIZE = 5;
public static final int PAGE_SIZE = 5;

public static final String KAFKA_TOPIC = "dummy-orders";

private AppConstant() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.ittovative.batchprocessing.constant;

public class SwaggerConstant {
public static final String TITLE = "Demo Batch Processing";
public static final String VERSION = "0.0.1";
public static final String DESCRIPTION = "Demo of using spring batch";
public static final String CONTACT_NAME = "Mohanad Khaled";
public static final String CONTACT_EMAIL = "mohanadkhaled87@gmail.com";
public static final String CONTACT_URL = "https://github.com/orgs/ittovate/";
public static final String DEVELOPMENT_SERVER_URL = "http://localhost:8080";
public static final String DEVELOPMENT_SERVER_DESCRIPTION = "Development Server";

//================================================= Add Order =================================================//

public static final String ADD_ORDER_KAFKA_SUMMARY = "Add new order to kafka";
public static final String ADD_ORDER_DB_SUMMARY = "Add new order to db";

public static final String ADD_ORDER_DESCRIPTION = "Add a new order with the " +
"specified name and description to the application for processing.";
public static final String ADD_ORDER_CREATED = "201";
public static final String ADD_ORDER_OK_RESPONSE_DESCRIPTION = "When order is sent successfully";
public static final String ADD_ORDER_REQUEST_EXAMPLE = """
{
"name": "string",
"description": "string"
}
""";
public static final String ADD_ORDER_OK_RESPONSE_EXAMPLE = """
{
"statusCode": 201,
"message": "Order sent successfully!",
"body": null
}

""";
//================================================= Start Batch =================================================//
private static final String START_BATCH_SUMMARY = "Start batch processing";

public static final String START_BATCH_SUMMARY_KAFKA = START_BATCH_SUMMARY + " from kafka";
public static final String START_BATCH_SUMMARY_DB = START_BATCH_SUMMARY + " from db";

public static final String START_BATCH_DESCRIPTION = "Starts batch processing from the beginning unless there was a " +
"previous failuer that was handled manually in spring batch's database , in this case it will restart after the " +
"last successful commit successful.";
public static final String START_BATCH_OK = "200";
public static final String START_BATCH_SUCCESFUL_DESCRIPTION = "When batch is processed successfully";

public static final String START_BATCH_OK_RESPONSE_EXAMPLE = """
{
"statusCode": 200,
"message": "Batch ended successfully!!",
"body": null
}

""";
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,31 @@
package com.ittovative.batchprocessing.controller;

import com.ittovative.batchprocessing.dto.OrderDto;
import static com.ittovative.batchprocessing.config.APIResponseConstant.ADD_ORDER_SUCCESSFUL_MESSAGE;
import static com.ittovative.batchprocessing.config.APIResponseConstant.BATCH_SUCCESS_MESSAGE;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.ADD_ORDER_DB_SUMMARY;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.ADD_ORDER_DESCRIPTION;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.ADD_ORDER_OK_RESPONSE_EXAMPLE;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.ADD_ORDER_REQUEST_EXAMPLE;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.ADD_ORDER_KAFKA_SUMMARY;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.START_BATCH_DESCRIPTION;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.START_BATCH_OK;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.START_BATCH_OK_RESPONSE_EXAMPLE;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.START_BATCH_SUMMARY_DB;
import static com.ittovative.batchprocessing.constant.SwaggerConstant.START_BATCH_SUMMARY_KAFKA;
import static org.springframework.http.HttpStatus.CREATED;

import com.ittovative.batchprocessing.model.Order;
import com.ittovative.batchprocessing.service.OrderService;
import com.ittovative.batchprocessing.util.ApiResponse;
import com.ittovative.batchprocessing.util.BatchReadType;
import com.ittovative.batchprocessing.util.APIResponse;
import com.ittovative.batchprocessing.enums.BatchReadType;
import com.ittovative.batchprocessing.util.ResponseUtil;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.media.Content;
import io.swagger.v3.oas.annotations.media.ExampleObject;
import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
Expand All @@ -32,15 +51,22 @@ public OrderController(OrderService orderService) {
* @param order the order
* @return the response entity
*/
@Operation(summary = ADD_ORDER_KAFKA_SUMMARY, description = ADD_ORDER_DESCRIPTION,
requestBody = @io.swagger.v3.oas.annotations.parameters.RequestBody(required = true,
content = @Content(schema = @Schema(implementation = Order.class),
examples = @ExampleObject(ADD_ORDER_REQUEST_EXAMPLE))),
responses = @ApiResponse(
responseCode = "201",
content = @Content(
schema = @Schema(implementation = APIResponse.class),
examples = @ExampleObject(ADD_ORDER_OK_RESPONSE_EXAMPLE)
)
))
@PostMapping("/kafka")
public ResponseEntity<ApiResponse<String>> makeOrderKafka(@RequestBody OrderDto orderDto) {
public APIResponse<String> makeOrderKafka(@RequestBody Order orderDto) {
orderService.sendOrderToKafka(orderDto);
ApiResponse<String> apiResponse = new ApiResponse<>(
HttpStatus.OK.value(),
"Order sent to kafka successfully!",
null
);
return new ResponseEntity<>(apiResponse, HttpStatus.OK);
return ResponseUtil.createUnifiedResponse(CREATED.value(),
ADD_ORDER_SUCCESSFUL_MESSAGE,null);
}

/**
Expand All @@ -49,16 +75,21 @@ public ResponseEntity<ApiResponse<String>> makeOrderKafka(@RequestBody OrderDto
* @param order the order
* @return the response entity
*/
@Operation(summary = ADD_ORDER_DB_SUMMARY, description = ADD_ORDER_DESCRIPTION,
requestBody = @io.swagger.v3.oas.annotations.parameters.RequestBody(required = true,
content = @Content(schema = @Schema(implementation = Order.class),
examples = @ExampleObject(ADD_ORDER_REQUEST_EXAMPLE))), responses = @ApiResponse(
responseCode = "201",
content = @Content(
schema = @Schema(implementation = APIResponse.class),
examples = @ExampleObject(ADD_ORDER_OK_RESPONSE_EXAMPLE)
)
))
@PostMapping("/db")
public ResponseEntity<ApiResponse<String>> makeOrderDatabase(@RequestBody OrderDto order) {
Order convertedOrder = new Order(order.name(),order.description());
orderService.sendOrderToDatabase(convertedOrder);
ApiResponse<String> apiResponse = new ApiResponse<>(
HttpStatus.OK.value(),
"Order sent to database successfully!",
null
);
return new ResponseEntity<>(apiResponse, HttpStatus.OK);
public APIResponse<String> makeOrderDatabase(@RequestBody Order order) {
orderService.sendOrderToDatabase(order);
return ResponseUtil.createUnifiedResponse(CREATED.value(),ADD_ORDER_SUCCESSFUL_MESSAGE
,null);
}

/**
Expand All @@ -67,15 +98,19 @@ public ResponseEntity<ApiResponse<String>> makeOrderDatabase(@RequestBody OrderD
* @return the response entity
* @throws Exception the exception
*/
@Operation(summary = START_BATCH_SUMMARY_DB + " from database", description = START_BATCH_DESCRIPTION,
responses = @ApiResponse(
responseCode = START_BATCH_OK,
content = @Content(
schema = @Schema(implementation = APIResponse.class),
examples = @ExampleObject(START_BATCH_OK_RESPONSE_EXAMPLE)
)
))
@PostMapping("/batch-db")
public ResponseEntity<ApiResponse<String>> batchProcessDB() throws Exception {
public APIResponse<String> batchProcessDB() throws Exception {
orderService.batchProcess(BatchReadType.DATABASE);
ApiResponse<String> apiResponse = new ApiResponse<>(
HttpStatus.OK.value(),
"Batch processing from db started successfully!",
null
);
return new ResponseEntity<>(apiResponse, HttpStatus.OK);
return ResponseUtil.createUnifiedResponse(HttpStatus.OK.value(),
BATCH_SUCCESS_MESSAGE,null);
}

/**
Expand All @@ -84,15 +119,19 @@ public ResponseEntity<ApiResponse<String>> batchProcessDB() throws Exception {
* @return the response entity
* @throws Exception the exception
*/
@Operation(summary = START_BATCH_SUMMARY_KAFKA, description = START_BATCH_DESCRIPTION,
responses = @ApiResponse(
responseCode = START_BATCH_OK,
content = @Content(
schema = @Schema(implementation = APIResponse.class),
examples = @ExampleObject(START_BATCH_OK_RESPONSE_EXAMPLE)
)
))
@PostMapping("/batch-kafka")
public ResponseEntity<ApiResponse<String>> batchProcessKafka() throws Exception {
public APIResponse<String> batchProcessKafka() throws Exception {
orderService.batchProcess(BatchReadType.KAFKA);
ApiResponse<String> apiResponse = new ApiResponse<>(
HttpStatus.OK.value(),
"Batch processing from kafka started successfully!",
null
);
return new ResponseEntity<>(apiResponse, HttpStatus.OK);
return ResponseUtil.createUnifiedResponse(HttpStatus.OK.value(),
BATCH_SUCCESS_MESSAGE,null);
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.ittovative.batchprocessing.util;
package com.ittovative.batchprocessing.enums;

/**
* The enum Batch read type.
Expand Down
Loading