Skip to content

Commit

Permalink
Add source sink list (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
shoothzj committed Nov 30, 2021
1 parent 2b24090 commit 8dee723
Show file tree
Hide file tree
Showing 34 changed files with 1,521 additions and 87 deletions.
34 changes: 34 additions & 0 deletions lib/api/pulsar/pulsar_sink_api.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import 'dart:convert';
import 'dart:developer';

import 'package:http/http.dart' as http;
import 'package:paas_dashboard_flutter/api/http_util.dart';
import 'package:paas_dashboard_flutter/module/pulsar/pulsar_sink.dart';

class PulsarSinkAPi {
static Future<void> deleteSink(String host, int port, String tenant,
String namespace, String sinkName) async {
var url =
'http://$host:${port.toString()}/admin/v3/sinks/$tenant/$namespace/$sinkName';
final response = await http.delete(Uri.parse(url));
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception(
'ErrorCode is ${response.statusCode}, body is ${response.body}');
}
}

static Future<List<SinkResp>> getSinkList(
String host, int port, String tenant, String namespace) async {
var url =
'http://$host:${port.toString()}/admin/v3/sinks/$tenant/$namespace';
final response = await http.get(Uri.parse(url));
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception(
'ErrorCode is ${response.statusCode}, body is ${response.body}');
}
List jsonResponse = json.decode(response.body) as List;
return jsonResponse.map((name) => new SinkResp(name)).toList();
}
}
34 changes: 34 additions & 0 deletions lib/api/pulsar/pulsar_source_api.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import 'dart:convert';
import 'dart:developer';

import 'package:http/http.dart' as http;
import 'package:paas_dashboard_flutter/api/http_util.dart';
import 'package:paas_dashboard_flutter/module/pulsar/pulsar_source.dart';

class PulsarSourceAPi {
static Future<void> deleteSource(String host, int port, String tenant,
String namespace, String sourceName) async {
var url =
'http://$host:${port.toString()}/admin/v3/sources/$tenant/$namespace/$sourceName';
final response = await http.delete(Uri.parse(url));
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception(
'ErrorCode is ${response.statusCode}, body is ${response.body}');
}
}

static Future<List<SourceResp>> getSourceList(
String host, int port, String tenant, String namespace) async {
var url =
'http://$host:${port.toString()}/admin/v3/sources/$tenant/$namespace';
final response = await http.get(Uri.parse(url));
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception(
'ErrorCode is ${response.statusCode}, body is ${response.body}');
}
List jsonResponse = json.decode(response.body) as List;
return jsonResponse.map((name) => new SourceResp(name)).toList();
}
}
13 changes: 13 additions & 0 deletions lib/api/pulsar/pulsar_stat_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,17 @@ class PulsarStatAPi {
}
return response.body;
}

static Future<String> topicStats(String host, int port, String tenant,
String namespace, String topic) async {
var url =
'http://$host:${port.toString()}/admin/v2/persistent/$tenant/$namespace/$topic/stats';
final response = await http.get(Uri.parse(url));
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception(
'ErrorCode is ${response.statusCode}, body is ${response.body}');
}
return response.body;
}
}
135 changes: 135 additions & 0 deletions lib/api/pulsar/pulsar_topic_api.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import 'dart:convert';
import 'dart:developer';

import 'package:http/http.dart' as http;
import 'package:paas_dashboard_flutter/api/http_util.dart';
import 'package:paas_dashboard_flutter/api/pulsar/pulsar_stat_api.dart';
import 'package:paas_dashboard_flutter/module/pulsar/pulsar_produce.dart';
import 'package:paas_dashboard_flutter/module/pulsar/pulsar_subscription.dart';
import 'package:paas_dashboard_flutter/module/pulsar/pulsar_topic.dart';
import 'package:paas_dashboard_flutter/ui/util/string_util.dart';

class PulsarTopicAPi {
static Future<String> createTopic(String host, int port, String tenant,
String namespace, String topic) async {
var url =
'http://$host:${port.toString()}/admin/v2/persistent/$tenant/$namespace/$topic';
var response = await http.put(Uri.parse(url), headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
});
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception(
'ErrorCode is ${response.statusCode}, body is ${response.body}');
}
return response.body;
}

static Future<String> deleteTopic(String host, int port, String tenant,
String namespace, String topic) async {
var url =
'http://$host:${port.toString()}/admin/v2/persistent/$tenant/$namespace/$topic';
var response = await http.delete(Uri.parse(url), headers: <String, String>{
'Content-Type': 'application/json; charset=UTF-8',
});
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception(
'ErrorCode is ${response.statusCode}, body is ${response.body}');
}
return response.body;
}

static Future<List<TopicResp>> getTopics(
String host, int port, String tenant, String namespace) async {
var url =
'http://$host:${port.toString()}/admin/v2/persistent/$tenant/$namespace';
final response = await http.get(Uri.parse(url));
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception(
'ErrorCode is ${response.statusCode}, body is ${response.body}');
}
List jsonResponse = json.decode(response.body) as List;
return jsonResponse.map((name) => new TopicResp.fromJson(name)).toList();
}

static Future<List<SubscriptionResp>> getSubscription(String host, int port,
String tenant, String namespace, String topic) async {
String data = "";
await PulsarStatAPi.topicStats(host, port, tenant, namespace, topic)
.then((value) => {data = value});
List<SubscriptionResp> respList = new List.empty(growable: true);
Map statsMap = json.decode(data) as Map;
if (statsMap.containsKey("subscriptions")) {
Map subscriptionsMap = statsMap["subscriptions"] as Map<String, dynamic>;
subscriptionsMap.forEach((key, value) {
double rateOut = value["msgRateOut"];
int backlog = value["msgBacklog"];
SubscriptionResp subscriptionDetail =
new SubscriptionResp(key, backlog, rateOut);
respList.add(subscriptionDetail);
});
}
return respList;
}

static Future<String> clearBacklog(String host, int port, String tenant,
String namespace, String topic, String subscription) async {
var url =
'http://$host:${port.toString()}/admin/v2/persistent/$tenant/$namespace/$topic/subscription/$subscription/skip_all';
final response = await http.post(Uri.parse(url));
if (HttpUtil.abnormal(response.statusCode)) {
log('ErrorCode is ${response.statusCode}, body is ${response.body}');
throw Exception(
'ErrorCode is ${response.statusCode}, body is ${response.body}');
}
return response.body;
}

static Future<String> getSubscriptionBacklog(
String host,
int port,
String tenant,
String namespace,
String topic,
String subscription) async {
String data = PulsarStatAPi.topicStats(host, port, tenant, namespace, topic)
as String;

Map statsMap = json.decode(data) as Map;
if (statsMap.containsKey("subscriptions")) {
Map subscriptionsMap = statsMap["subscriptions"] as Map;

if (subscriptionsMap.containsKey(subscription)) {
Map subscriptionMap = statsMap[subscription] as Map;
return subscriptionMap["msgBacklog"];
}
}
return "";
}

static Future<List<ProducerResp>> getProducers(String host, int port,
String tenant, String namespace, String topic) async {
String data = "";
await PulsarStatAPi.topicStats(host, port, tenant, namespace, topic)
.then((value) => {data = value});
List<ProducerResp> respList = new List.empty(growable: true);
Map statsMap = json.decode(data) as Map;
if (statsMap.containsKey("publishers")) {
List publisherList = statsMap["publishers"] as List<dynamic>;
publisherList.forEach((element) {
String producerName = StringUtil.nullStr(element["producerName"]);
double rateIn = element["msgRateIn"];
double throughputIn = element["msgThroughputIn"];
String clientVersion = StringUtil.nullStr(element["clientVersion"]);
double averageMsgSize = element["averageMsgSize"];
String address = StringUtil.nullStr(element["address"]);
ProducerResp producerResp = new ProducerResp(producerName, rateIn,
throughputIn, clientVersion, averageMsgSize, address);
respList.add(producerResp);
});
}
return respList;
}
}
1 change: 1 addition & 0 deletions lib/generated/intl/messages_en.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// ignore_for_file:prefer_single_quotes,comment_references, directives_ordering
// ignore_for_file:annotate_overrides,prefer_generic_function_type_aliases
// ignore_for_file:unused_import, file_names, avoid_escaping_inner_quotes
// ignore_for_file:unnecessary_string_interpolations, unnecessary_string_escapes

import 'package:intl/intl.dart';
import 'package:intl/message_lookup_by_library.dart';
Expand Down
1 change: 1 addition & 0 deletions lib/generated/intl/messages_zh.dart
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// ignore_for_file:prefer_single_quotes,comment_references, directives_ordering
// ignore_for_file:annotate_overrides,prefer_generic_function_type_aliases
// ignore_for_file:unused_import, file_names, avoid_escaping_inner_quotes
// ignore_for_file:unnecessary_string_interpolations, unnecessary_string_escapes

import 'package:intl/intl.dart';
import 'package:intl/message_lookup_by_library.dart';
Expand Down
7 changes: 6 additions & 1 deletion lib/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_instance_view_model.dart
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_namespace_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_tenant_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_topic_view_model.dart';
import 'package:provider/provider.dart';

void main() {
Expand Down Expand Up @@ -76,10 +77,14 @@ class MyApp extends StatelessWidget {
final args = settings.arguments as PulsarNamespaceViewModel;
return RouteGen.pulsarNamespace(args);
}
if (settings.name == PageRouteConst.PulsarTopic) {
if (settings.name == PageRouteConst.PulsarPartitionedTopic) {
final args = settings.arguments as PulsarPartitionedTopicViewModel;
return RouteGen.pulsarPartitionedTopic(args);
}
if (settings.name == PageRouteConst.PulsarTopic) {
final args = settings.arguments as PulsarTopicViewModel;
return RouteGen.pulsarTopic(args);
}
},
);
}
Expand Down
9 changes: 9 additions & 0 deletions lib/module/pulsar/pulsar_sink.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class SinkResp {
final String sinkName;

SinkResp(this.sinkName);

SinkResp deepCopy() {
return new SinkResp(this.sinkName);
}
}
9 changes: 9 additions & 0 deletions lib/module/pulsar/pulsar_source.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class SourceResp {
final String sourceName;

SourceResp(this.sourceName);

SourceResp deepCopy() {
return new SourceResp(this.sourceName);
}
}
1 change: 1 addition & 0 deletions lib/route/page_route_const.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ class PageRouteConst {
static const String PulsarInstance = '/pulsar/instance';
static const String PulsarTenant = '/pulsar/instance/tenant';
static const String PulsarNamespace = '/pulsar/instance/tenant/namespace';
static const String PulsarPartitionedTopic = '/pulsar/instance/tenant/namespace/partitioned-topic';
static const String PulsarTopic = '/pulsar/instance/tenant/namespace/topic';
}
10 changes: 10 additions & 0 deletions lib/route/route_gen.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ import 'package:paas_dashboard_flutter/ui/pulsar/pulsar_instance.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/screen/pulsar_namespace.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/screen/pulsar_partitioned_topic.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/screen/pulsar_tenant.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/screen/pulsar_topic.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_instance_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_namespace_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_partitioned_topic_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_tenant_view_model.dart';
import 'package:paas_dashboard_flutter/vm/pulsar/pulsar_topic_view_model.dart';
import 'package:provider/provider.dart';

class RouteGen {
Expand Down Expand Up @@ -43,4 +45,12 @@ class RouteGen {
child: PulsarPartitionedTopic(),
));
}

static Route pulsarTopic(PulsarTopicViewModel viewModel) {
return MaterialPageRoute(
builder: (context) => ChangeNotifierProvider(
create: (context) => viewModel,
child: PulsarTopic(),
));
}
}
1 change: 0 additions & 1 deletion lib/ui/home/home_page.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import 'package:flutter/material.dart';
import 'package:paas_dashboard_flutter/ui/home/home_drawer.dart';
import 'package:paas_dashboard_flutter/ui/pulsar/pulsar_page.dart';

class HomePage extends StatelessWidget {
HomePage() : super();
Expand Down
Loading

0 comments on commit 8dee723

Please sign in to comment.