Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 38 additions & 7 deletions apps/flowlord/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"embed"
"encoding/json"
"errors"
"fmt"
"html/template"
"io"
"io/fs"
Expand Down Expand Up @@ -118,7 +119,7 @@ func (tm *taskMaster) StartHandler() {
RunTime: gtools.PrintDuration(time.Since(tm.initTime)),
}
b, _ := json.Marshal(sts)
if err := tm.slack.Notify(string(b), slack.OK); err != nil {
if err := tm.notify.Notify(string(b), slack.OK); err != nil {
w.Write([]byte(err.Error()))
}
})
Expand All @@ -140,7 +141,15 @@ func (tm *taskMaster) StartHandler() {
}

log.Printf("starting handler on :%v", tm.port)
http.ListenAndServe(":"+strconv.Itoa(tm.port), router)
tm.httpServer = &http.Server{
Addr: ":" + strconv.Itoa(tm.port),
Handler: router,
}
go func() {
if err := tm.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Println("http server:", err)
}
}()
}

func (tm *taskMaster) Info(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -286,6 +295,10 @@ func (tm *taskMaster) refreshHandler(w http.ResponseWriter, _ *http.Request) {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if err := tm.taskCache.Sync(); err != nil {
http.Error(w, "sqlite backup: "+err.Error(), http.StatusInternalServerError)
return
}
v := struct {
Files []string `json:",omitempty"`
Cache string
Expand Down Expand Up @@ -693,9 +706,9 @@ func (tm *taskMaster) aboutHTML() []byte {
"SchemaVersion": tm.taskCache.GetSchemaVersion(),
"Retention": gtools.PrintDuration(tm.taskCache.Retention),
"TaskTTL": gtools.PrintDuration(tm.taskCache.TaskTTL),
"MinFrequency": gtools.PrintDuration(tm.slack.MinFrequency),
"MaxFrequency": gtools.PrintDuration(tm.slack.MaxFrequency),
"CurrentFrequency": gtools.PrintDuration(tm.slack.GetCurrentDuration()),
"MinFrequency": gtools.PrintDuration(tm.notify.MinFrequency),
"MaxFrequency": gtools.PrintDuration(tm.notify.MaxFrequency),
"CurrentFrequency": gtools.PrintDuration(tm.notify.GetAlertFrequency()),
"CurrentPage": "about",
"DateValue": "", // About page doesn't need date
"PageTitle": "System Information",
Expand Down Expand Up @@ -843,13 +856,31 @@ func (tm *taskMaster) Backloader(w http.ResponseWriter, r *http.Request) {

if req.Execute {
resp.Status = "Executed: " + resp.Status
attempted := len(resp.Tasks)
errs := appenderr.New()
failed := 0
for _, t := range resp.Tasks {
tm.taskCache.Add(t)
errs.Add(tm.producer.Send(t.Type, t.JSONBytes()))
if err := tm.producer.Send(t.Type, t.JSONBytes()); err != nil {
failed++
errs.Add(err)
}
}
if errs.ErrOrNil() != nil {
http.Error(w, "issue writing to producer "+errs.Error(), http.StatusInternalServerError)
// Sent vs attempted must come from the loop: appenderr merges duplicate messages,
// so it has no total count of failed sends.
sent := attempted - failed
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
_ = json.NewEncoder(w).Encode(map[string]interface{}{
"Status": fmt.Sprintf(
"%d/%d messages sent: %s",
sent,
attempted,
strings.TrimSpace(errs.Error()),
),
})
return
}
} else {
resp.Status = "DRY RUN ONLY: " + resp.Status
Expand Down
2 changes: 1 addition & 1 deletion apps/flowlord/handler/backload.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@

<!-- Preview Results Section -->
<div class="info-card" id="previewSection" style="display: none;">
<h3>Preview Results</h3>
<h3 id="previewResultsHeading">Preview Results</h3>
<div id="previewStatus" class="preview-status"></div>
<div class="table-container">
<table id="previewTable">
Expand Down
108 changes: 74 additions & 34 deletions apps/flowlord/handler/static/backload.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
elements.executeBtn = document.getElementById('executeBtn');
elements.resetBtn = document.getElementById('resetBtn');
elements.previewSection = document.getElementById('previewSection');
elements.previewResultsHeading = document.getElementById('previewResultsHeading');
elements.previewStatus = document.getElementById('previewStatus');
elements.previewTableBody = document.getElementById('previewTableBody');
elements.previewCount = document.getElementById('previewCount');
Expand Down Expand Up @@ -549,12 +550,22 @@
}
}

// Parse API error body: JSON Status or raw text
function messageFromApiResponse(responseText, fallback) {
if (responseText == null || responseText === '') {
return fallback || 'Request failed';
}
try {
const j = JSON.parse(responseText);
if (j && typeof j.Status === 'string' && j.Status.length > 0) {
return j.Status;
}
} catch (e) { /* use raw */ }
return responseText;
}

// Execute button click handler
async function handleExecuteClick() {
if (!confirm('Are you sure you want to execute this backload? This will create ' + previewTasks.length + ' tasks.')) {
return;
}

const request = buildRequest(true);
elements.requestBodyDisplay.textContent = JSON.stringify(request, null, 2);

Expand All @@ -568,31 +579,36 @@
});

const responseText = await response.text();
let data;

let data = null;
try {
data = JSON.parse(responseText);
} catch (e) {
throw new Error(responseText || 'Execution failed');
data = null;
}

if (!response.ok) {
throw new Error(data.Status || responseText || 'Execution failed');
const msg = messageFromApiResponse(responseText, 'Execution failed');
elements.executionSection.style.display = 'block';
elements.executionStatus.className = 'execution-status error';
elements.executionStatus.textContent = msg;
return;
}

elements.executionSection.style.display = 'block';
elements.executionStatus.className = 'execution-status success';
elements.executionStatus.innerHTML = `
<strong>Success!</strong><br>
${escapeHtml(data.Status)}<br>
Created ${data.Count} tasks.
`;
if (!data) {
elements.executionSection.style.display = 'block';
elements.executionStatus.className = 'execution-status error';
elements.executionStatus.textContent = responseText || 'Invalid JSON response';
return;
}

showExecutionResults(data);
elements.executionSection.style.display = 'none';
elements.executeBtn.style.display = 'none';

} catch (error) {
elements.executionSection.style.display = 'block';
elements.executionStatus.className = 'execution-status error';
elements.executionStatus.textContent = 'Error: ' + error.message;
elements.executionStatus.textContent = error.message || String(error);
} finally {
setButtonLoading(elements.executeBtn, false, 'Execute Backload');
}
Expand Down Expand Up @@ -620,19 +636,16 @@

hideTemplateInfo();
initializeDates();
if (elements.previewResultsHeading) {
elements.previewResultsHeading.textContent = 'Preview Results';
}
updatePreviewButton();
}

// Show preview results
function showPreviewResults(data) {
elements.previewSection.style.display = 'block';
elements.previewStatus.className = 'preview-status info';
elements.previewStatus.textContent = data.Status || 'Dry run complete';

function renderTasksIntoPreviewTable(tasks, emptyRowHtml) {
elements.previewTableBody.innerHTML = '';

if (data.Tasks && data.Tasks.length > 0) {
data.Tasks.forEach((task, index) => {
if (tasks && tasks.length > 0) {
tasks.forEach((task, index) => {
const row = document.createElement('tr');
row.innerHTML = `
<td class="num-cell num-column">${index + 1}</td>
Expand All @@ -643,24 +656,51 @@
`;
elements.previewTableBody.appendChild(row);
});

elements.previewCount.textContent = `Total tasks to be created: ${data.Count}`;
elements.executeBtn.style.display = 'inline-block';
elements.executeBtn.disabled = false;
} else {
elements.previewTableBody.innerHTML = '<tr><td colspan="5" class="no-tasks">No tasks would be created</td></tr>';
elements.previewCount.textContent = '';
elements.executeBtn.style.display = 'none';
elements.previewTableBody.innerHTML = emptyRowHtml || '<tr><td colspan="5" class="no-tasks">No tasks</td></tr>';
}

// Add expand/collapse functionality to cells
document.querySelectorAll('#previewTableBody .expandable').forEach(cell => {
cell.addEventListener('click', function() {
this.classList.toggle('expanded');
});
});
}

// Show preview results
function showPreviewResults(data) {
if (elements.previewResultsHeading) {
elements.previewResultsHeading.textContent = 'Preview Results';
}
elements.previewSection.style.display = 'block';
elements.previewStatus.className = 'preview-status info';
elements.previewStatus.textContent = data.Status || 'Dry run complete';

renderTasksIntoPreviewTable(data.Tasks, '<tr><td colspan="5" class="no-tasks">No tasks would be created</td></tr>');

if (data.Tasks && data.Tasks.length > 0) {
elements.previewCount.textContent = `Total tasks to be created: ${data.Count}`;
elements.executeBtn.style.display = 'inline-block';
elements.executeBtn.disabled = false;
} else {
elements.previewCount.textContent = '';
elements.executeBtn.style.display = 'none';
}
}

function showExecutionResults(data) {
if (elements.previewResultsHeading) {
elements.previewResultsHeading.textContent = 'Execution results';
}
elements.previewSection.style.display = 'block';
elements.previewStatus.className = 'preview-status execution-success';
elements.previewStatus.innerHTML =
'<strong>Executed (not a dry run)</strong><br>' +
escapeHtml(data.Status || '');
renderTasksIntoPreviewTable(data.Tasks, '<tr><td colspan="5" class="no-tasks">No tasks were sent</td></tr>');
const n = typeof data.Count === 'number' ? data.Count : (data.Tasks && data.Tasks.length) || 0;
elements.previewCount.textContent = `Created ${n} task(s). Jobs were sent to the task bus.`;
}

// Initialize date inputs with today's date
function initializeDates() {
const today = new Date().toISOString().split('T')[0];
Expand Down
6 changes: 6 additions & 0 deletions apps/flowlord/handler/static/style.css
Original file line number Diff line number Diff line change
Expand Up @@ -1760,6 +1760,12 @@ select.form-control:disabled {
border: 1px solid #c3e6cb;
}

.preview-status.execution-success {
background: #d4edda;
color: #155724;
border: 1px solid #c3e6cb;
}

#previewTable {
margin-top: 16px;
}
Expand Down
4 changes: 2 additions & 2 deletions apps/flowlord/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -652,15 +652,15 @@ func TestAboutHTML(t *testing.T) {
MinFrequency: 5 * time.Minute,
MaxFrequency: 30 * time.Minute,
}
notification.currentDuration.Store(int64(10 * time.Minute))
notification.alertFrequency.Store(int64(10 * time.Minute))

// Create a mock taskMaster with test data
tm := &taskMaster{
initTime: time.Now().Add(-2 * time.Hour), // 2 hours ago
nextUpdate: time.Now().Add(30 * time.Minute), // 30 minutes from now
lastUpdate: time.Now().Add(-15 * time.Minute), // 15 minutes ago
taskCache: taskCache,
slack: notification,
notify: notification,
}

// Generate HTML using the aboutHTML method
Expand Down
27 changes: 19 additions & 8 deletions apps/flowlord/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,25 +247,36 @@ func (o *SQLite) migrateSchema(currentVersion int) error {
// Close the DB connection and copy the current file to the backup location
func (o *SQLite) Close() error {
var errs []error
if err := o.db.Close(); err != nil {
if err := o.Sync(); err != nil {
errs = append(errs, err)
}
if o.BackupPath != "" {
log.Printf("Backing up DB to %s", o.BackupPath)
if err := o.Sync(); err != nil {
if o.db != nil {
if err := o.db.Close(); err != nil {
errs = append(errs, err)
}
}

if len(errs) > 0 {
return fmt.Errorf("close errors: %v", errs)
}
return nil
}

// Sync the local DB to the backup location
// Sync checkpoints WAL into the main DB file (so a plain file copy is consistent), then
// copies LocalPath to BackupPath. Holds o.mu for the duration so backups do not interleave
// with other cache operations that also take o.mu.
func (o *SQLite) Sync() error {
if o == nil || o.BackupPath == "" {
// no cache to backup
return nil
}
o.mu.Lock()
defer o.mu.Unlock()

if o.db != nil {
if _, err := o.db.Exec("PRAGMA wal_checkpoint(TRUNCATE);"); err != nil {
return fmt.Errorf("wal checkpoint: %w", err)
}
}
return copyFiles(o.LocalPath, o.BackupPath, o.fOpts)
}



Loading
Loading