feat(controller): build dataset in server #2497

Merged
50 commits, merged Jul 19, 2023

Commits
34cecaa
init
goldenxinxing Jul 10, 2023
3eb9a13
add build record(doing)
goldenxinxing Jul 11, 2023
bcbf75f
add build record(doing)
goldenxinxing Jul 11, 2023
42fb30b
add build record(doing)
goldenxinxing Jul 11, 2023
2cc3909
add build record(doing)
goldenxinxing Jul 12, 2023
3cf6957
add build record(doing)
goldenxinxing Jul 12, 2023
c2fa677
add build record(doing)
goldenxinxing Jul 12, 2023
179af77
add build record(doing)
goldenxinxing Jul 12, 2023
5e442e8
add build record(doing)
goldenxinxing Jul 12, 2023
9c8d704
modify entrypoint
goldenxinxing Jul 12, 2023
639075f
build and log
goldenxinxing Jul 12, 2023
38d73ec
add log ws and api
goldenxinxing Jul 13, 2023
9e4e72c
fix
goldenxinxing Jul 13, 2023
7153496
fix
goldenxinxing Jul 13, 2023
4a09ff1
fix
goldenxinxing Jul 13, 2023
163f9f6
fix
goldenxinxing Jul 13, 2023
5753627
fix
goldenxinxing Jul 13, 2023
ec7392b
fix
goldenxinxing Jul 13, 2023
204b75d
fix
goldenxinxing Jul 13, 2023
faab9b5
fix
goldenxinxing Jul 13, 2023
2576f7d
login in entrypoint
goldenxinxing Jul 13, 2023
dc8125b
add labels for pod
goldenxinxing Jul 13, 2023
822d5f2
add id for pod
goldenxinxing Jul 13, 2023
cdece8c
add check before build
goldenxinxing Jul 13, 2023
77217c1
add check before build
goldenxinxing Jul 13, 2023
f55acae
add check before build
goldenxinxing Jul 13, 2023
ac3d744
fix
goldenxinxing Jul 13, 2023
c81ceba
fix no shared
goldenxinxing Jul 13, 2023
1ca1157
fix online log
goldenxinxing Jul 13, 2023
2c37ba9
fix update shared
goldenxinxing Jul 13, 2023
840ed41
add delete file api
goldenxinxing Jul 13, 2023
1a0bf45
fix null time
goldenxinxing Jul 13, 2023
e074fc7
tune file delete api
goldenxinxing Jul 13, 2023
58c1e07
add ut
goldenxinxing Jul 13, 2023
4fb5f56
rename
goldenxinxing Jul 14, 2023
7de277b
fix test
goldenxinxing Jul 14, 2023
b1897d9
fix multi pods
goldenxinxing Jul 14, 2023
73cbce7
fix ut
goldenxinxing Jul 14, 2023
bb4baa4
fix gc
goldenxinxing Jul 14, 2023
905a512
fix log
goldenxinxing Jul 14, 2023
4f82025
fix log
goldenxinxing Jul 14, 2023
bea3169
fix online log
goldenxinxing Jul 14, 2023
600f1d7
check dataset name
goldenxinxing Jul 14, 2023
b63ec87
check dataset name
goldenxinxing Jul 17, 2023
5a67179
config resource
goldenxinxing Jul 17, 2023
8c7291b
download url with parallel
goldenxinxing Jul 17, 2023
d714992
optimise log collector
goldenxinxing Jul 17, 2023
8171d16
optimise log collector
goldenxinxing Jul 17, 2023
bc5542f
optimise status
goldenxinxing Jul 17, 2023
20f5a52
optimise list by status
goldenxinxing Jul 17, 2023
52 changes: 51 additions & 1 deletion client/scripts/sw-docker-entrypoint
@@ -191,7 +191,54 @@ run_code_server () {
echo "-->[Preparing] run code-server done."
}

ds_build_and_upload () {
echo "-->[Preparing] Downloading files..."
BUILD_DIR=$DATASET_BUILD_NAME
mkdir -p "$BUILD_DIR"
cd "$BUILD_DIR"

SIGNED_URLS=$(curl -X 'GET' "$SW_INSTANCE_URI/api/v1/filestorage/signedurl?pathPrefix=$DATASET_BUILD_DIR_PREFIX" -H 'accept: application/json' -H "Authorization: $SW_TOKEN" | jq ".data.signedUrls")

for entry in $(echo "$SIGNED_URLS" | jq -r 'to_entries|map("\(.key)@\(.value)")|.[]'); do
IFS='@' read -r file signedurl <<< "$entry"

filedir=$(dirname "$file")
if [ ! -d "$filedir" ]; then
mkdir -p "$filedir"
fi
echo "$file $signedurl"
done | xargs -n 2 -P 10 sh -c 'curl -o "$1" "$2"' sh # -n and -I are mutually exclusive in xargs; -n 2 alone passes each "file url" pair
cd -

cmd="swcli dataset build -n $DATASET_BUILD_NAME"
if [ "$DATASET_BUILD_TYPE" = "IMAGE" ]; then
cmd="$cmd -if $BUILD_DIR"
elif [ "$DATASET_BUILD_TYPE" = "VIDEO" ]; then
cmd="$cmd -vf $BUILD_DIR"
elif [ "$DATASET_BUILD_TYPE" = "AUDIO" ]; then
cmd="$cmd -af $BUILD_DIR"
elif [ "$DATASET_BUILD_TYPE" = "JSON" ]; then
cmd="$cmd -jf $BUILD_DIR"
# only pass --field-selector when extra build info was provided
if [ -n "$DATASET_BUILD_EXTRA" ]; then
cmd="$cmd --field-selector $DATASET_BUILD_EXTRA"
fi
elif [ "$DATASET_BUILD_TYPE" = "HANDLER" ]; then
cmd="$cmd -h $DATASET_BUILD_HANDLER"
elif [ "$DATASET_BUILD_TYPE" = "YAML" ]; then
cmd="$cmd -f $DATASET_BUILD_YAML"
else
echo "Unknown type: $DATASET_BUILD_TYPE" && exit 1
fi

echo "-->[Building] Start to build dataset: $DATASET_BUILD_NAME..."
eval "$cmd" || exit 1

echo "-->[Uploading] Start to upload dataset: $DATASET_BUILD_NAME..."
swcli instance login --token "$SW_TOKEN" --alias server "$SW_INSTANCE_URI"
swcli dataset copy --patch "$DATASET_BUILD_NAME"/version/latest cloud://server/project/"$SW_PROJECT" || exit 1
}

welcome "$1"
case "$1" in
pre_config)
pre_config
@@ -206,6 +253,9 @@ case "$1" in
prepare && install_code_server && run_code_server
tail -f /var/log/dev.log
;;
dataset_build)
ds_build_and_upload
;;
*)
prepare "starwhale" && exec "$@"
;;
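For context, a minimal sketch of how a build pod might invoke this entrypoint. The environment variable names are taken from the script itself; the image name, URL, token, and other values are placeholders, not the server's actual pod spec:

# Hypothetical invocation; the server launches an equivalent k8s pod.
docker run --rm \
  -e SW_INSTANCE_URI="https://server.example.com" \
  -e SW_TOKEN="<token>" \
  -e SW_PROJECT="demo" \
  -e DATASET_BUILD_NAME="mnist" \
  -e DATASET_BUILD_TYPE="IMAGE" \
  -e DATASET_BUILD_DIR_PREFIX="dataset/build/mnist" \
  starwhale/starwhale:latest dataset_build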
@@ -25,10 +25,13 @@
import ai.starwhale.mlops.api.protocol.dataset.DatasetViewVo;
import ai.starwhale.mlops.api.protocol.dataset.DatasetVo;
import ai.starwhale.mlops.api.protocol.dataset.RevertDatasetRequest;
import ai.starwhale.mlops.api.protocol.dataset.build.BuildRecordVo;
import ai.starwhale.mlops.api.protocol.dataset.build.DatasetBuildRequest;
import ai.starwhale.mlops.api.protocol.dataset.dataloader.DataConsumptionRequest;
import ai.starwhale.mlops.api.protocol.dataset.dataloader.DataIndexDesc;
import ai.starwhale.mlops.api.protocol.dataset.upload.DatasetUploadRequest;
import ai.starwhale.mlops.api.protocol.upload.UploadResult;
import ai.starwhale.mlops.domain.dataset.build.BuildStatus;
import com.github.pagehelper.PageInfo;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
@@ -430,4 +433,32 @@ ResponseEntity<?> headDataset(
@PathVariable("versionUrl")
String versionUrl);

@Operation(summary = "Build Dataset", description = "Build Dataset")
@ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")})
@PostMapping("/project/{projectUrl}/dataset/{datasetName}/build")
@PreAuthorize("hasAnyRole('OWNER', 'MAINTAINER')")
ResponseEntity<ResponseMessage<String>> buildDataset(
@Parameter(in = ParameterIn.PATH, required = true, schema = @Schema())
@PathVariable(name = "projectUrl")
String projectUrl,
@Parameter(in = ParameterIn.PATH, required = true, schema = @Schema())
@PathVariable(name = "datasetName")
String datasetName,
@Valid @RequestBody DatasetBuildRequest datasetBuildRequest);

@Operation(summary = "List Build Records", description = "List Build Records")
@ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")})
@GetMapping("/project/{projectUrl}/dataset/build/list")
@PreAuthorize("hasAnyRole('OWNER', 'MAINTAINER')")
ResponseEntity<ResponseMessage<PageInfo<BuildRecordVo>>> listBuildRecords(
@Parameter(in = ParameterIn.PATH, required = true, schema = @Schema())
@PathVariable(name = "projectUrl")
String projectUrl,
@RequestParam(value = "status", required = false)
BuildStatus status,
@Valid @RequestParam(value = "pageNum", required = false, defaultValue = "1")
Integer pageNum,
@Valid @RequestParam(value = "pageSize", required = false, defaultValue = "10")
Integer pageSize);

}
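For reference, a hedged curl sketch of these two endpoints, assuming the server's /api/v1 base path (matching the entrypoint script's calls) and reusing the SW_INSTANCE_URI/SW_TOKEN variables from above; the JSON fields mirror what the controller reads from DatasetBuildRequest, but the example values are placeholders:

# Kick off a server-side build for dataset "mnist" in project "demo".
# Field names follow DatasetBuildRequest; the values are placeholders.
curl -X POST "$SW_INSTANCE_URI/api/v1/project/demo/dataset/mnist/build" \
  -H "Authorization: $SW_TOKEN" -H "Content-Type: application/json" \
  -d '{"type": "IMAGE", "shared": 0, "storagePath": "dataset/build/mnist"}'

# Page through build records; "BUILDING" is an assumed BuildStatus constant.
curl "$SW_INSTANCE_URI/api/v1/project/demo/dataset/build/list?status=BUILDING&pageNum=1&pageSize=10" \
  -H "Authorization: $SW_TOKEN"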
@@ -0,0 +1,125 @@
/*
* Copyright 2022 Starwhale, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.starwhale.mlops.api;

import ai.starwhale.mlops.common.IdConverter;
import ai.starwhale.mlops.schedule.k8s.log.CancellableJobLogCollector;
import ai.starwhale.mlops.schedule.k8s.log.CancellableJobLogK8sCollectorFactory;
import io.kubernetes.client.openapi.ApiException;
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@ServerEndpoint("/api/v1/log/online/dataset/{name}/build/{id}")
public class DatasetBuildLogWsServer {

private static final ExecutorService executorService = Executors.newCachedThreadPool();

private static IdConverter idConvertor;

private static CancellableJobLogK8sCollectorFactory logCollectorFactory;

private Session session;

private String readerId;

private Long id;

private CancellableJobLogCollector logCollector;


@Autowired
public void setIdConvertor(IdConverter idConvertor) {
DatasetBuildLogWsServer.idConvertor = idConvertor;
}

@Autowired
public void setLogCollectorFactory(CancellableJobLogK8sCollectorFactory factory) {
DatasetBuildLogWsServer.logCollectorFactory = factory;
}

@OnOpen
public void onOpen(Session session, @PathParam("name") String name, @PathParam("id") String id) {
this.session = session;
this.readerId = session.getId();
this.id = idConvertor.revert(id);
try {
logCollector = logCollectorFactory.make(String.format("%s-%s", name, id));
} catch (IOException | ApiException e) {
log.error("make k8s log collector failed", e);
// bail out: without a collector, the reader loop below would throw an NPE
return;
}
log.info("Build log ws opened. reader={}, task={}", readerId, id);
executorService.submit(() -> {
String line;
while (true) {
try {
if ((line = logCollector.readLine()) == null) {
break;
}
sendMessage(line);
} catch (IOException e) {
log.error("read k8s log failed", e);
break;
}
}
});
}

@OnClose
public void onClose() {
cancelLogCollector();
log.info("Build log ws closed. reader={}, task={}", readerId, id);
}

@OnMessage
public void onMessage(String message, Session session) {
// no-op: this endpoint only streams log lines to the client
}

@OnError
public void onError(Session session, Throwable error) {
cancelLogCollector();
log.error("Task log ws error: reader={}, task={}, message={}", readerId, id, error.getMessage());
}

public void sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.error("ws send message", e);
}
}

private void cancelLogCollector() {
if (logCollector != null) {
logCollector.cancel();
logCollector = null;
}
}
}
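To follow a build's log stream outside the browser, something like the websocat CLI can speak to this endpoint; the path comes from the @ServerEndpoint annotation, while the host, port, and the converted build-record id ("1") are assumptions:

# Assumes websocat is installed; host/port and the id value are placeholders.
websocat "ws://localhost:8082/api/v1/log/online/dataset/mnist/build/1"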
@@ -24,6 +24,8 @@
import ai.starwhale.mlops.api.protocol.dataset.DatasetViewVo;
import ai.starwhale.mlops.api.protocol.dataset.DatasetVo;
import ai.starwhale.mlops.api.protocol.dataset.RevertDatasetRequest;
import ai.starwhale.mlops.api.protocol.dataset.build.BuildRecordVo;
import ai.starwhale.mlops.api.protocol.dataset.build.DatasetBuildRequest;
import ai.starwhale.mlops.api.protocol.dataset.dataloader.DataConsumptionRequest;
import ai.starwhale.mlops.api.protocol.dataset.dataloader.DataIndexDesc;
import ai.starwhale.mlops.api.protocol.dataset.upload.DatasetUploadRequest;
@@ -34,6 +36,8 @@
import ai.starwhale.mlops.domain.dataset.DatasetService;
import ai.starwhale.mlops.domain.dataset.bo.DatasetQuery;
import ai.starwhale.mlops.domain.dataset.bo.DatasetVersionQuery;
import ai.starwhale.mlops.domain.dataset.build.BuildStatus;
import ai.starwhale.mlops.domain.dataset.build.bo.CreateBuildRecordRequest;
import ai.starwhale.mlops.domain.dataset.dataloader.DataReadRequest;
import ai.starwhale.mlops.domain.dataset.dataloader.ReadMode;
import ai.starwhale.mlops.domain.dataset.objectstore.HashNamedDatasetObjectStoreFactory;
@@ -356,4 +360,26 @@ public ResponseEntity<?> headDataset(String projectUrl, String datasetUrl, String versionUrl) {
}
}

@Override
public ResponseEntity<ResponseMessage<String>> buildDataset(
String projectUrl, String datasetName, DatasetBuildRequest datasetBuildRequest) {
datasetService.build(CreateBuildRecordRequest.builder()
.datasetId(datasetBuildRequest.getDatasetId())
.datasetName(datasetName)
.shared(datasetBuildRequest.getShared())
.projectUrl(projectUrl)
.type(datasetBuildRequest.getType())
.storagePath(datasetBuildRequest.getStoragePath())
.build());
return ResponseEntity.ok(Code.success.asResponse("success"));
}

@Override
public ResponseEntity<ResponseMessage<PageInfo<BuildRecordVo>>> listBuildRecords(
String projectUrl, BuildStatus status, Integer pageNum, Integer pageSize) {
return ResponseEntity.ok(Code.success.asResponse(
datasetService.listBuildRecords(projectUrl, status, new PageParams(pageNum, pageSize))));
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2022 Starwhale, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ai.starwhale.mlops.api;

import ai.starwhale.mlops.api.protocol.ResponseMessage;
import ai.starwhale.mlops.api.protocol.filestorage.ApplySignedUrlRequest;
import ai.starwhale.mlops.api.protocol.filestorage.FileDeleteRequest;
import ai.starwhale.mlops.api.protocol.filestorage.SignedUrlResponse;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.responses.ApiResponses;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.http.ResponseEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;

@Tag(name = "File storage", description = "File storage operations")
@Validated
public interface FileStorageApi {

@Operation(summary = "Apply pathPrefix", description = "Apply pathPrefix")
@ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")})
@GetMapping("/filestorage/path/apply")
ResponseEntity<ResponseMessage<String>> applyPathPrefix(String flag);

@Operation(summary = "Apply signedUrls for put", description = "Apply signedUrls for put")
@ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")})
@PutMapping("/filestorage/signedurl")
ResponseEntity<ResponseMessage<SignedUrlResponse>> applySignedPutUrls(
@RequestBody ApplySignedUrlRequest applySignedUrlRequest);

@Operation(summary = "Apply signedUrls for get", description = "Apply signedUrls for get")
@ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")})
@GetMapping("/filestorage/signedurl")
ResponseEntity<ResponseMessage<SignedUrlResponse>> applySignedGetUrls(String pathPrefix);

@Operation(summary = "Delete path", description = "Delete path")
@ApiResponses(value = {@ApiResponse(responseCode = "200", description = "ok")})
@DeleteMapping("/filestorage/file")
ResponseEntity<ResponseMessage<String>> deletePath(@RequestBody FileDeleteRequest request);

}
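The GET variant here is exactly what the entrypoint script calls before downloading build files. A hedged sketch of all three operations, again assuming the /api/v1 prefix; the JSON bodies are guesses at the ApplySignedUrlRequest and FileDeleteRequest shapes:

# Signed GET urls for every file under a prefix (used by sw-docker-entrypoint).
curl "$SW_INSTANCE_URI/api/v1/filestorage/signedurl?pathPrefix=dataset/build/mnist" \
  -H "Authorization: $SW_TOKEN"

# Signed PUT urls for upload; the body shape is an assumption.
curl -X PUT "$SW_INSTANCE_URI/api/v1/filestorage/signedurl" \
  -H "Authorization: $SW_TOKEN" -H "Content-Type: application/json" \
  -d '{"pathPrefix": "dataset/build/mnist", "files": ["a.png"]}'

# Delete files under a path; the body shape is an assumption.
curl -X DELETE "$SW_INSTANCE_URI/api/v1/filestorage/file" \
  -H "Authorization: $SW_TOKEN" -H "Content-Type: application/json" \
  -d '{"pathPrefix": "dataset/build/mnist", "files": ["a.png"]}'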