Merged
24 commits
dc22d0b
Merge from subproc
mattbillenstein Feb 15, 2025
6b56316
Merge syncdir exclude
mattbillenstein Feb 15, 2025
a42389b
Simpler nexclude
mattbillenstein Feb 15, 2025
e03fab5
Add geventmp, update reqs
mattbillenstein Feb 15, 2025
d235797
Move some Reactor bits to Server / Client, prep for MP in server
mattbillenstein Feb 15, 2025
554e55e
Comments for the casual reader
mattbillenstein Feb 15, 2025
b08233e
Move get_meta to util, simplify imports
mattbillenstein Feb 15, 2025
eb78327
Flake8 fixes
mattbillenstein Feb 16, 2025
9124e78
Flake8 ignore, misc cleanup
mattbillenstein Feb 16, 2025
3c65046
Document skipped flake8 rules
mattbillenstein Feb 16, 2025
393e3ec
Rename Reactor
mattbillenstein Feb 16, 2025
c0529ca
Prep for client proc
mattbillenstein Feb 16, 2025
304640e
Client proc stuff, broken, listener seems to be inherited by child proc
mattbillenstein Feb 16, 2025
1a429d1
Ugly, but subprocess.Popen with pass_fds= mostly working...
mattbillenstein Feb 17, 2025
06dc95d
Cleaned up working subprocess implementation...
mattbillenstein Feb 18, 2025
fff535e
Cleanup logging
mattbillenstein Feb 18, 2025
b0f2591
flake8, some cleanup
mattbillenstein Feb 18, 2025
efc6719
More docs
mattbillenstein Feb 18, 2025
73e3819
Need crypto_pass to decrypt
mattbillenstein Feb 18, 2025
9e732e2
Use same exe for client_proc as server proc
mattbillenstein Feb 18, 2025
5bba232
Update README
mattbillenstein Feb 18, 2025
6eb15f7
Less verbose logging, more graceful disconnect/exit
mattbillenstein Feb 18, 2025
27b2633
Cleanup / docs
mattbillenstein Feb 19, 2025
7684d17
Apply cleanup and comments
mattbillenstein Feb 19, 2025
11 changes: 11 additions & 0 deletions .flake8
@@ -0,0 +1,11 @@
# E203 whitespace before ':'
# E302 expected 2 blank lines, found 1
# E305 expected 2 blank lines after class or function definition
# E402 module level import not at top of file
# W503 line break before binary operator
# W605 invalid escape sequence
# E731 do not assign a lambda expression
[flake8]
ignore = E203,E302,E305,E402,W503,W605,E731
exclude = .git,.venv,example
max-line-length = 250
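These are stylistic relaxations only. As a purely illustrative sketch (not code from this repository), the kind of patterns they permit looks like the following, e.g. gevent monkey-patching before the remaining imports (E402) and a lambda bound to a name (E731):

    # illustrative only -- patterns the relaxed flake8 rules allow
    from gevent import monkey
    monkey.patch_all()       # the usual gevent pattern: patch before other imports

    import socket            # E402: module level import not at top of file

    is_local = lambda host: host in ('localhost', '127.0.0.1')   # E731: lambda assigned to a name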
4 changes: 0 additions & 4 deletions .gitignore
@@ -1,7 +1,3 @@
# keys
*.pem
*.pass

# python
*.pyc
__pycache__
34 changes: 20 additions & 14 deletions README.txt
@@ -1,9 +1,9 @@
Salty - devops inspired by Saltstack, but simpler.

This is an experiment to see with how little code I could build a useful
SaltStack-like deployment system. As of July 2023, that is about 1300 LOC, and
I'm using it on a couple projects in production/staging/dev environments. It
supports Linux (primary tested on Ubuntu LTS) and MacOS using Python3.
SaltStack-like deployment system. As of February 2025, that is about 2000 LOC,
and I'm using it on a couple projects in production/staging/dev environments.
It supports Linux (primary tested on Ubuntu LTS) and MacOS using Python3.

It currently implements server/client over msgpack-rpc using gevent TLS/TCP
sockets, a simple request/response mechanism for triggering deployments across
@@ -110,50 +110,56 @@ Below is some simple documentation for what is currently available in writing
roles and I encourage you to consult the source in the "run" method in
operators.py:

https://github.com/mattbillenstein/salty/blob/master/operators.py
https://github.com/mattbillenstein/salty/blob/master/lib/operators.py

Common Imports available in roles/templates:
  os, os.path, json

Functions available in roles:
  File management:
    copy(src, dst, user=DEFAULT_USER, mode=0o644):
    copy(src, dst, user=DEFAULT_USER, mode=0o644)
      copy src file from server to dst path on client

    line_in_file(line, path, user=DEFAULT_USER, mode=0o644):
    remove(path):
      recursively remove a path (see shutil.rmtree)

    line_in_file(line, path, user=DEFAULT_USER, mode=0o644)
      ensure given line is in file at path, create the file if it doesn't exist

    makedirs(path, user=DEFAULT_USER, mode=0o755):
    makedirs(path, user=DEFAULT_USER, mode=0o755)
      make all directories up to final directory denoted by path

    render(src, dst, user=DEFAULT_USER, mode=0o644, **kw):
    render(src, dst, user=DEFAULT_USER, mode=0o644, **kw)
      render src template from server to dst path on client

    symlink(src, dst):
    symlink(src, dst)
      symlink src to dest on client

    syncdir(src, dst, user=DEFAULT_USER, mode=0o755):
    syncdir(src, dst, user=DEFAULT_USER, mode=0o755, exclude=None)
      Synchronize a src dir on the server to the dst dir on the client - ala
      rsync

  Shell commands:
    shell(cmds, **kw)
      Run a string of shell commands, **kw are optional Popen kwargs
      Run shell commands on client, **kw are optional Popen kwargs

    server_shell(cmds, **kw)
      Run shell commands on server, **kw are optional Popen kwargs

  User management:
    useradd(username, system=False)
      Add a user and group of same name

    usergroups(username, groups)
      Add/remove user's groups

  Misc
    print(s)
      Captures output for the role run response

    is_changed()
      True if command in the current role has changed

    get_ips(role, key='private_ip')
      get a list of ip addresses by role

Context available in templates:
  id:
    current host id
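To make the function list above concrete, here is a minimal sketch of a role file; roles are plain Python exec'd on the client with these functions injected into their namespace (see client.py below). The 'webapp' name, the paths, and the list-of-patterns exclude value are illustrative assumptions, not taken from the repository:

    # hypothetical role -- names and paths are made up for illustration
    useradd('webapp', system=True)
    makedirs('/opt/webapp', user='webapp')

    # ship a static file and render a template with extra context
    copy('files/webapp.env', '/opt/webapp/webapp.env', user='webapp', mode=0o600)
    render('templates/webapp.conf', '/etc/webapp.conf', listen_port=8080)

    # sync the application tree; the exclude format here is assumed
    syncdir('files/webapp', '/opt/webapp/src', user='webapp', exclude=['*.pyc'])

    # restart only if one of the commands above actually changed something
    if is_changed():
        shell('systemctl restart webapp')
        print('webapp restarted')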
193 changes: 193 additions & 0 deletions client.py
@@ -0,0 +1,193 @@
import os
import os.path
import socket
import time
import traceback
from queue import Queue

import gevent

from lib import operators
from lib.net import MsgMixin, ConnectionTimeout
from lib.util import elapsed, get_facts, log, log_error

PING_INTERVAL = 10.0
FACTS_INTERVAL = 60.0

class SaltyClient(MsgMixin):
    # The client process, we connect to the server, identify with id/facts,
    # start a ping process to keep our connection alive and periodically update
    # facts, and then wait for commands.

    def __init__(self, addr, keyroot=os.getcwd(), id=None, path=''):
        self.futures = {}
        self.keyroot = keyroot

        self.addr = addr
        self.id = id
        self.path = path

        self._last_pong = time.time()

    def connect(self):
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock = self.wrap_socket(sock)
        sock.connect(self.addr)
        return sock

    def handle_pong(self, msg, q):
        self._last_pong = time.time()

    def handle_run(self, msg, q):
        # Run a role on this client and report back results of each command
        start = time.time()

        # these closures capture the Queue that we write the rpc response to
        # during the run for these functions which get injected into the global
        # namespace for the role python file being exec'd
        def get_file(path, **opts):
            msg = {'type': 'get_file', 'path': path}
            msg.update(opts)
            res = self.do_rpc(msg, q)
            assert not res.get('error'), res['error']
            return res

        def syncdir_get_file(path):
            msg = {'type': 'syncdir_get_file', 'path': path}
            res = self.do_rpc(msg, q)
            assert not res.get('error'), res['error']
            return res

        def syncdir_scandir(path, exclude=None):
            msg = {'type': 'syncdir_scandir', 'path': path, 'exclude': exclude}
            res = self.do_rpc(msg, q)
            assert not res.get('error'), res['error']
            return res['data']

        def server_shell(cmds, **kwds):
            msg = {'type': 'server_shell', 'cmds': cmds, 'kwds': kwds}
            res = self.do_rpc(msg, q)
            assert not res.get('error'), res['error']
            return res['data']

        content = msg.pop('content')  # this is the role python file we will exec
        results, output = operators.run(
            content,
            msg['context'],
            start,
            self.path,
            get_file,
            syncdir_get_file,
            syncdir_scandir,
            server_shell,
        )

        msg['type'] = 'future'
        msg['result'] = {'results': results, 'output': '\n'.join(output), 'elapsed': elapsed(start)}
        q.put(msg)

        rc = sum(_['rc'] for _ in results)
        log(f'Run {msg["context"]["id"]} {msg["context"]["role"]} {rc} {msg["result"]["elapsed"]:.6f}')

    def _pinger(self, q):
        # Keep our long-lived socket alive, ping every N seconds and update
        # facts periodically, if we don't get a pong back in 1.5x the ping
        # interval, consider the socket stuck and exit - this will cause the
        # handle function to exit and the serve_forever loop to re-connect.
        last_facts = time.time()
        while 1:
            now = time.time()
            msg = {'type': 'ping'}
            if now - last_facts > FACTS_INTERVAL:
                # re-send facts every ~1m
                msg['id'] = self.id
                msg['facts'] = get_facts()
                last_facts = now

            q.put(msg)
            time.sleep(PING_INTERVAL)

            # if no pong back, break
            #
            # this could cause issues if we're sending very large files over
            # slow connections and the pong could be behind a lot of other
            # messages...
            #
            # e.g. with PING_INTERVAL=10.0, a pong more than ~15s older than
            # the start of this iteration trips the check
            if (now - self._last_pong) > (PING_INTERVAL * 1.5):
                log_error('MISSING PONG', now - self._last_pong)
                break

    def handle(self, sock, addr):
        # Connection message loop, handle messages received from the server and
        # create two threads; one that writes messages back to the socket from
        # a Queue, and another that pings the server every N seconds
        log(f'Connection established {addr[0]}:{addr[1]}')

        q = Queue()
        g = gevent.spawn(self._writer, q, sock)

        p = gevent.spawn(self._pinger, q)
        q.put({'type': 'identify', 'id': self.id, 'facts': get_facts()})

        try:
            while 1:
                try:
                    # if writer / pinger dead, break and eventually close the
                    # socket...
                    if g.dead or p.dead:
                        break

                    msg = self.recv_msg(sock)
                    if not msg:
                        break

                    self.handle_msg(msg, q)
                except OSError as e:
                    log_error(f'Connection lost {addr[0]}:{addr[1]} exc:{e}')
                    break
        finally:
            log(f'Connection lost {addr[0]}:{addr[1]}')
            [_.kill() for _ in (g, p) if not _.dead]
            sock.close()

    def serve_forever(self):
        # Connect loop, connect to the server creating a socket and pass that
        # to handle - If handle raises or exits, take a short break and
        # reconnect.
        while 1:
            sock = None
            try:
                sock = self.connect()
                self.handle(sock, self.addr)
            except KeyboardInterrupt:
                break
            except Exception:
                tb = traceback.format_exc().strip()
                log_error('Exception in client serve:\n', tb)
            finally:
                if sock:
                    sock.close()

            time.sleep(3)

    def run(self, msg):
        # Run a single message and return result - used by the cli for
        # apply/hosts/etc
        #
        # I don't use a Queue / greenlet writer here since there is only the
        # current thread interacting with this socket...

        sock = self.connect()
        self.send_msg(sock, msg)

        start = time.time()
        while 1:
            try:
                msg = self.recv_msg(sock, timeout=5)
                if not msg:
                    log_error('Server unexpectedly disconnected...')
                    break
                if msg['type'] != 'pong':
                    return msg
                log(f'Working {int(time.time()-start):} seconds ...', end='\r')
            except ConnectionTimeout:
                self.send_msg(sock, {'type': 'ping'})
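For orientation, a rough sketch of how this class might be driven; the server address, key directory, and host id below are placeholders, and in the project the client is presumably launched via the main salty executable rather than imported like this:

    # hypothetical driver -- argument values are placeholders
    from client import SaltyClient

    client = SaltyClient(('salty.example.com', 11111), keyroot='/etc/salty', id='web-1')
    client.serve_forever()   # identify, ping/pong keepalive, reconnect on errors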