A professional MQTT microservice written in Go that serves as a client for a Laravel IoT Cloud backend. This microservice handles all MQTT communication (subscribe, publish, reconnects, handling SSL certificates), offloading the MQTT layer from the Laravel app.
- Supports multiple MQTT brokers simultaneously (HiveMQ Cloud, Mosquitto, etc.)
- Reads connection settings from environment variables
- SSL/TLS support with certificate validation
- Auto-reconnect logic and timeouts
- Simple HTTP API for Laravel to interact with MQTT
- Webhook notifications for received messages
- Health check endpoint
- Comprehensive logging
The microservice is built with a clean, modular architecture:
- Configuration Module: Loads and validates connection settings from environment variables
- MQTT Client Manager: Manages connections to multiple MQTT brokers
- HTTP API Server: Exposes endpoints for publishing messages, managing subscriptions, and checking status
- Webhook Notifier: Sends HTTP notifications to Laravel when messages are received
- Logging Utility: Provides consistent logging throughout the application
POST /publish
: Publish a message to a topicPOST /subscribe
: Subscribe to a topicPOST /unsubscribe
: Unsubscribe from a topicGET /status
: Get the status of all MQTT connectionsGET /healthz
: Health check endpoint
GET /metrics
: Get metrics about the MQTT microserviceGET /logs
: View logs
GET /messages
: Get messages from the databaseGET /messages/{id}
: Get a specific message by IDPOST /messages/{id}/confirm
: Confirm a messageDELETE /messages/{id}
: Delete a specific messageDELETE /messages/confirmed
: Delete all confirmed messages
GET /webhooks
: Get all webhooksPOST /webhooks
: Create a new webhookGET /webhooks/{id}
: Get a specific webhook by IDPUT /webhooks/{id}
: Update a webhookDELETE /webhooks/{id}
: Delete a webhook
The microservice is configured using environment variables. Example:
MQTT_DEFAULT_CONNECTION=hivemq
# HiveMQ Cloud connection settings
MQTT_HIVEMQ_HOST=hfghgfhgfhgfhgf
MQTT_HIVEMQ_PORT=8883
MQTT_HIVEMQ_CLIENT_ID=laravel-backend
MQTT_HIVEMQ_CLEAN_SESSION=true
MQTT_HIVEMQ_ENABLE_LOGGING=true
MQTT_HIVEMQ_LOG_CHANNEL=stack
MQTT_TLS_ENABLED=true
MQTT_TLS_VERIFY_PEER=true
MQTT_TLS_CA_FILE=certificates/www-hivemq-com.pem
MQTT_AUTH_USERNAME=hfghgfhgf
MQTT_AUTH_PASSWORD=shgfhgf
# Mosquitto connection settings
MQTT_MOSQUITTO_HOST=test.mosquitto.org
MQTT_MOSQUITTO_PORT=1883
MQTT_MOSQUITTO_CLIENT_ID=laravel-mosquitto
MQTT_MOSQUITTO_CLEAN_SESSION=true
MQTT_MOSQUITTO_ENABLE_LOGGING=true
# Webhook settings
WEBHOOK_ENABLED=true
WEBHOOK_URL=https://your-laravel-app.com/api/mqtt/webhook
WEBHOOK_METHOD=POST
WEBHOOK_TIMEOUT=10
WEBHOOK_RETRY_COUNT=3
WEBHOOK_RETRY_DELAY=5
- Go 1.22 or higher
- SSL certificates (if using TLS)
-
Clone the repository:
git clone cd MQTTmicroService
-
Download dependencies:
go mod download
-
Build the application:
go build -o mqtt-service main.go
-
Set up your environment variables in a
.env
file or export them directly. -
Run the service:
./mqtt-service
For production deployment, consider the following:
-
Use a process manager like systemd or supervisor to ensure the service stays running.
-
Set up proper logging to a file or a centralized logging system.
-
Use a reverse proxy like Nginx to handle SSL termination and load balancing.
-
Store SSL certificates securely and ensure they are regularly updated.
Endpoint: POST /publish
Request:
{
"topic": "sensors/temperature",
"payload": {"value": 23.5, "unit": "celsius"},
"qos": 1,
"retained": false,
"broker": "hivemq"
}
Response:
{
"status": "success",
"message": "Message published successfully"
}
Endpoint: POST /subscribe
Request:
{
"topic": "sensors/temperature",
"qos": 1,
"broker": "hivemq"
}
Response:
{
"status": "success",
"message": "Subscribed to topic sensors/temperature"
}
Endpoint: POST /unsubscribe
Request:
{
"topic": "sensors/temperature",
"broker": "hivemq"
}
Response:
{
"status": "success",
"message": "Unsubscribed from topic sensors/temperature"
}
Endpoint: GET /status
Response:
{
"status": "ok",
"brokers": {
"hivemq": {
"connected": true,
"subscriptions": ["sensors/temperature", "sensors/humidity"]
},
"mosquitto": {
"connected": false,
"subscriptions": []
}
},
"timestamp": "2023-04-27T16:43:42Z"
}
Endpoint: GET /healthz
Response:
{
"status": "ok"
}
Endpoint: GET /metrics
Response:
{
"messages": {
"published": 42,
"received": 18,
"failed": 2
},
"subscriptions": 5,
"connections": {
"attempts": 7,
"failures": 1,
"successes": 6,
"disconnections": 2
},
"api": {
"requests": 156,
"errors": 3
},
"latency": {
"publish": "15.2ms",
"subscribe": "22.7ms"
},
"last_updated": "2023-04-27T16:43:42Z"
}
Endpoint: GET /logs
Response: Plain text log output
Endpoint: GET /messages?confirmed=false&limit=10
Response:
{
"status": "success",
"messages": [
{
"id": "1682619845123456789",
"topic": "sensors/temperature",
"payload": {"value": 23.5, "unit": "celsius"},
"qos": 1,
"retained": false,
"timestamp": "2023-04-27T16:43:42Z",
"confirmed": false
},
{
"id": "1682619845987654321",
"topic": "sensors/humidity",
"payload": {"value": 45.2, "unit": "percent"},
"qos": 1,
"retained": false,
"timestamp": "2023-04-27T16:43:42Z",
"confirmed": false
}
],
"count": 2
}
Endpoint: GET /messages/{id}
Response:
{
"status": "success",
"message": {
"id": "1682619845123456789",
"topic": "sensors/temperature",
"payload": {"value": 23.5, "unit": "celsius"},
"qos": 1,
"retained": false,
"timestamp": "2023-04-27T16:43:42Z",
"confirmed": false
}
}
Endpoint: POST /messages/{id}/confirm
Response:
{
"status": "success",
"message": "Message 1682619845123456789 confirmed"
}
Endpoint: DELETE /messages/{id}
Response:
{
"status": "success",
"message": "Message 1682619845123456789 deleted"
}
Endpoint: DELETE /messages/confirmed
Response:
{
"status": "success",
"message": "5 confirmed messages deleted",
"count": 5
}
Endpoint: GET /webhooks
Response:
{
"status": "success",
"webhooks": [
{
"id": "1682619845123456789",
"name": "Temperature Webhook",
"url": "https://your-laravel-app.com/api/temperature",
"method": "POST",
"topic_filter": "sensors/temperature",
"enabled": true,
"headers": {
"X-API-Key": "your-api-key"
},
"timeout": 10,
"retry_count": 3,
"retry_delay": 5,
"created_at": "2023-04-27T16:43:42Z",
"updated_at": "2023-04-27T16:43:42Z"
}
],
"count": 1
}
Endpoint: GET /webhooks/{id}
Response:
{
"status": "success",
"webhook": {
"id": "1682619845123456789",
"name": "Temperature Webhook",
"url": "https://your-laravel-app.com/api/temperature",
"method": "POST",
"topic_filter": "sensors/temperature",
"enabled": true,
"headers": {
"X-API-Key": "your-api-key"
},
"timeout": 10,
"retry_count": 3,
"retry_delay": 5,
"created_at": "2023-04-27T16:43:42Z",
"updated_at": "2023-04-27T16:43:42Z"
}
}
Endpoint: POST /webhooks
Request:
{
"name": "Temperature Webhook",
"url": "https://your-laravel-app.com/api/temperature",
"method": "POST",
"topic_filter": "sensors/temperature",
"enabled": true,
"headers": {
"X-API-Key": "your-api-key"
},
"timeout": 10,
"retry_count": 3,
"retry_delay": 5
}
Response:
{
"status": "success",
"message": "Webhook created successfully",
"webhook": {
"id": "1682619845123456789",
"name": "Temperature Webhook",
"url": "https://your-laravel-app.com/api/temperature",
"method": "POST",
"topic_filter": "sensors/temperature",
"enabled": true,
"headers": {
"X-API-Key": "your-api-key"
},
"timeout": 10,
"retry_count": 3,
"retry_delay": 5,
"created_at": "2023-04-27T16:43:42Z",
"updated_at": "2023-04-27T16:43:42Z"
}
}
Endpoint: PUT /webhooks/{id}
Request:
{
"name": "Updated Temperature Webhook",
"url": "https://your-laravel-app.com/api/temperature/v2",
"method": "POST",
"topic_filter": "sensors/+/temperature",
"enabled": true,
"headers": {
"X-API-Key": "your-new-api-key"
},
"timeout": 15,
"retry_count": 5,
"retry_delay": 10
}
Response:
{
"status": "success",
"message": "Webhook updated successfully",
"webhook": {
"id": "1682619845123456789",
"name": "Updated Temperature Webhook",
"url": "https://your-laravel-app.com/api/temperature/v2",
"method": "POST",
"topic_filter": "sensors/+/temperature",
"enabled": true,
"headers": {
"X-API-Key": "your-new-api-key"
},
"timeout": 15,
"retry_count": 5,
"retry_delay": 10,
"created_at": "2023-04-27T16:43:42Z",
"updated_at": "2023-04-27T16:45:00Z"
}
}
Endpoint: DELETE /webhooks/{id}
Response:
{
"status": "success",
"message": "Webhook 1682619845123456789 deleted successfully"
}
The microservice can send webhook notifications to your Laravel application when messages are received on subscribed topics. This allows your Laravel application to react to MQTT messages without having to poll the microservice.
The microservice supports two types of webhook configurations:
- Global Webhook: Configured using environment variables
- Database Webhooks: Created and managed via API endpoints (see Webhook Management Endpoints)
When a message is received on a subscribed topic, the microservice sends notifications to both the global webhook (if enabled) and any matching webhooks from the database.
The global webhook is configured using environment variables:
WEBHOOK_ENABLED=true
WEBHOOK_URL=https://your-laravel-app.com/api/mqtt/webhook
WEBHOOK_METHOD=POST
WEBHOOK_TIMEOUT=10
WEBHOOK_RETRY_COUNT=3
WEBHOOK_RETRY_DELAY=5
WEBHOOK_ENABLED
: Set totrue
to enable global webhook notifications, orfalse
to disable itWEBHOOK_URL
: The URL to send webhook notifications toWEBHOOK_METHOD
: The HTTP method to use (default:POST
)WEBHOOK_TIMEOUT
: The timeout for webhook requests in seconds (default:10
)WEBHOOK_RETRY_COUNT
: The number of times to retry failed webhook requests (default:3
)WEBHOOK_RETRY_DELAY
: The delay between retries in seconds (default:5
)
Note: The global webhook is optional. If you set
WEBHOOK_ENABLED=false
or don't setWEBHOOK_URL
, the global webhook will be disabled, but database webhooks will still work.
In addition to or instead of the global webhook, you can create and manage webhooks via API endpoints. These webhooks are stored in the database and can be configured to match specific MQTT topics using wildcards.
You can use database webhooks exclusively by setting WEBHOOK_ENABLED=false
in your environment variables, which is useful if you need different webhooks for different topics or want to manage webhooks dynamically without restarting the service.
See the Webhook Management Endpoints section for details on how to create and manage database webhooks.
When a message is received on a subscribed topic, the microservice sends a webhook notification to the configured URL with the following payload:
{
"topic": "sensors/temperature",
"payload": {"value": 23.5, "unit": "celsius"},
"qos": 1,
"timestamp": "2023-04-27T16:43:42Z",
"broker": "hivemq"
}
topic
: The MQTT topic the message was received onpayload
: The message payload (parsed as JSON if possible, otherwise as a string)qos
: The QoS level of the messagetimestamp
: The time the message was receivedbroker
: The name of the broker the message was received from
To integrate with Laravel, create a route and controller to handle the webhook notifications:
// routes/api.php
Route::post('/mqtt/webhook', 'MqttWebhookController@handle');
// app/Http/Controllers/MqttWebhookController.php
namespace App\Http\Controllers;
use Illuminate\Http\Request;
class MqttWebhookController extends Controller
{
public function handle(Request $request)
{
$topic = $request->input('topic');
$payload = $request->input('payload');
$qos = $request->input('qos');
$timestamp = $request->input('timestamp');
$broker = $request->input('broker');
// Process the message
// For example, dispatch an event
event(new MqttMessageReceived($topic, $payload, $qos, $timestamp, $broker));
return response()->json(['status' => 'success']);
}
}