Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature] Add script method to get ip address #131

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
207 changes: 165 additions & 42 deletions cloudflare-ddns.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,69 @@
# A small, 🕵️ privacy centric, and ⚡
# lightning fast multi-architecture Docker image for self hosting projects.

__version__ = "1.0.2"
__version__ = "1.0.4"

import fcntl
import json
import os
import platform
import requests
import signal
import socket
import struct
import sys
import threading
import time
import requests
import subprocess
from ipaddress import ip_address

CONFIG_PATH = os.environ.get('CONFIG_PATH', os.getcwd())
shown_ipv4_warning = False
shown_ipv4_warning_secondary = False
shown_ipv6_warning = False
shown_ipv6_warning_secondary = False


def validate_ip_address(ip):
# Validate an IPv4 or IPv6 address
try:
ip_address(ip)
return True
except ValueError:
return False


def get_ip_address(ifname):
# helper function for getting the IPv4 address of an interface
# credit: https://stackoverflow.com/a/24196955
if platform.system() != 'Linux':
raise OSError("This method only works on Linux")

s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
return socket.inet_ntoa(fcntl.ioctl(
s.fileno(),
0x8915, # SIOCGIFADDR
struct.pack('256s', bytes(ifname[:15], 'utf-8'))
)[20:24])


def get_ipv6_address(ifname):
# helper function for getting the IPv6 address of an interface
# credit: https://stackoverflow.com/questions/20743709 and ChatGPT
if platform.system() != 'Linux':
raise OSError("This method only works on Linux")

with open('/proc/net/if_inet6') as f:
for line in f:
fields = line.split()
if_name = fields[-1]
# only use the global ipv6 address
if if_name == ifname and fields[0].startswith(('2', '3', '4', '5', '6')):
if_addr = fields[0]
ipv6 = ":".join([if_addr[i:i+4]
for i in range(0, len(if_addr), 4)])
return ipv6
return None


class GracefulExit:
Expand Down Expand Up @@ -56,53 +108,127 @@ def getIPs():
global ipv4_enabled
global ipv6_enabled
global purgeUnknownRecords
global shown_ipv4_warning
global shown_ipv4_warning_secondary
global shown_ipv6_warning
global shown_ipv6_warning_secondary

if ipv4_enabled:
try:
a = requests.get(
"https://1.1.1.1/cdn-cgi/trace").text.split("\n")
a.pop()
a = dict(s.split("=") for s in a)["ip"]
except Exception:
global shown_ipv4_warning
if not shown_ipv4_warning:
shown_ipv4_warning = True
print("🧩 IPv4 not detected via 1.1.1.1, trying 1.0.0.1")
# Try secondary IP check
method_v4 = config["ip"]["a"]["method"]
if method_v4 == 'http':
try:
a = requests.get(
"https://1.0.0.1/cdn-cgi/trace").text.split("\n")
"https://1.1.1.1/cdn-cgi/trace").text.split("\n")
a.pop()
a = dict(s.split("=") for s in a)["ip"]
except Exception:
global shown_ipv4_warning_secondary
if not shown_ipv4_warning_secondary:
shown_ipv4_warning_secondary = True
print("🧩 IPv4 not detected via 1.0.0.1. Verify your ISP or DNS provider isn't blocking Cloudflare's IPs.")
if not shown_ipv4_warning:
shown_ipv4_warning = True
print("🧩 IPv4 not detected via 1.1.1.1, trying 1.0.0.1")
# Try secondary IP check
try:
a = requests.get(
"https://1.0.0.1/cdn-cgi/trace").text.split("\n")
a.pop()
a = dict(s.split("=") for s in a)["ip"]
except Exception:
if not shown_ipv4_warning_secondary:
shown_ipv4_warning_secondary = True
print("🧩 IPv4 not detected via 1.0.0.1. Verify your ISP or DNS provider isn't blocking Cloudflare's IPs.")
if purgeUnknownRecords:
deleteEntries("A")
elif method_v4 == 'netif':
try:
interface_v4 = config["ip"]["a"]["interface"]
a = get_ip_address(interface_v4)
except Exception as e:
if not shown_ipv4_warning:
shown_ipv4_warning = True
if type(e) == KeyError:
print("⚙️ No config detected for interface to get IPv4 addr")
else:
print(f"🧩 IPv4 not detected via {interface_v4}. Verify your interface is up and has an IPv4 address.")
if purgeUnknownRecords:
deleteEntries("A")
elif method_v4 == 'script':
try:
script_v4 = config["ip"]["a"]["script"]
output = subprocess.check_output(
[script_v4], stderr=subprocess.STDOUT).decode("utf-8").strip()
a = str(ip_address(output))
except Exception as e:
if not shown_ipv4_warning:
shown_ipv4_warning = True
if type(e) == KeyError:
print("⚙️ No config detected for script to get IPv4 addr")
elif type(e) == PermissionError:
print(f"⚙️ Permission denied when running script {script_v4}, check your permission settings")
elif type(e) == ValueError:
print(f"🧩 Script {script_v4} returned invalid IPv4 address")
else:
print(f"🧩 An error occurs when running script {script_v4}, check your script")

if purgeUnknownRecords:
deleteEntries("A")
else:
print(f"⚙️ Method {method_v4} not supported")

if ipv6_enabled:
try:
aaaa = requests.get(
"https://[2606:4700:4700::1111]/cdn-cgi/trace").text.split("\n")
aaaa.pop()
aaaa = dict(s.split("=") for s in aaaa)["ip"]
except Exception:
global shown_ipv6_warning
if not shown_ipv6_warning:
shown_ipv6_warning = True
print("🧩 IPv6 not detected via 1.1.1.1, trying 1.0.0.1")
method_v6 = config["ip"]["aaaa"]["method"]
if method_v6 == 'http':
try:
aaaa = requests.get(
"https://[2606:4700:4700::1001]/cdn-cgi/trace").text.split("\n")
"https://[2606:4700:4700::1111]/cdn-cgi/trace").text.split("\n")
aaaa.pop()
aaaa = dict(s.split("=") for s in aaaa)["ip"]
except Exception:
global shown_ipv6_warning_secondary
if not shown_ipv6_warning_secondary:
shown_ipv6_warning_secondary = True
print("🧩 IPv6 not detected via 1.0.0.1. Verify your ISP or DNS provider isn't blocking Cloudflare's IPs.")
if not shown_ipv6_warning:
shown_ipv6_warning = True
print("🧩 IPv6 not detected via 1.1.1.1, trying 1.0.0.1")
try:
aaaa = requests.get("https://[2606:4700:4700::1001]/cdn-cgi/trace").text.split("\n")
aaaa.pop()
aaaa = dict(s.split("=") for s in aaaa)["ip"]
except Exception:
if not shown_ipv6_warning_secondary:
shown_ipv6_warning_secondary = True
print("🧩 IPv6 not detected via 1.0.0.1. Verify your ISP or DNS provider isn't blocking Cloudflare's IPs.")
if purgeUnknownRecords:
deleteEntries("AAAA")
elif method_v6 == 'netif':
try:
interface_v6 = config["ip"]["aaaa"]["interface"]
aaaa = get_ipv6_address(interface_v6)
except Exception as e:
if not shown_ipv6_warning:
shown_ipv6_warning = True
if type(e) == KeyError:
print("⚙️ No config detected for interface to get IPv6 addr")
else:
print(f"🧩 IPv6 not detected via {interface_v6}. Verify your interface is up and has an IPv6 address.")
if purgeUnknownRecords:
deleteEntries("AAAA")
elif method_v6 == 'script':
try:
script_v6 = config["ip"]["aaaa"]["script"]
output = subprocess.check_output(
[script_v6], stderr=subprocess.STDOUT).decode("utf-8").strip()
aaaa = str(ip_address(output))
except Exception as e:
if not shown_ipv6_warning:
shown_ipv6_warning = True
if type(e) == KeyError:
print("⚙️ No config detected for script to get IPv6 addr")
elif type(e) == PermissionError:
print(f"⚙️ Permission denied when running script {script_v6}, check your permission settings")
elif type(e) == ValueError:
print(f"🧩 Script {script_v6} returned invalid IPv6 address")
else:
print(f"🧩 An error occurs when running script {script_v6}, check your script")

if purgeUnknownRecords:
deleteEntries("AAAA")

ips = {}
if (a is not None):
ips["ipv4"] = {
Expand Down Expand Up @@ -203,7 +329,8 @@ def updateLoadBalancer(ip):
origins[idx]['address'] = ip['ip']
data = {'origins': origins}

response = cf_api(f'user/load_balancers/pools/{option["pool_id"]}', 'PATCH', option, {}, data)
response = cf_api(
f'user/load_balancers/pools/{option["pool_id"]}', 'PATCH', option, {}, data)


def cf_api(endpoint, method, config, headers={}, data=False):
Expand Down Expand Up @@ -242,14 +369,10 @@ def cf_api(endpoint, method, config, headers={}, data=False):
def updateIPs(ips):
for ip in ips.values():
commitRecord(ip)
#updateLoadBalancer(ip)
# updateLoadBalancer(ip)


if __name__ == '__main__':
shown_ipv4_warning = False
shown_ipv4_warning_secondary = False
shown_ipv6_warning = False
shown_ipv6_warning_secondary = False
ipv4_enabled = True
ipv6_enabled = True
purgeUnknownRecords = False
Expand All @@ -268,8 +391,8 @@ def updateIPs(ips):

if config is not None:
try:
ipv4_enabled = config["a"]
ipv6_enabled = config["aaaa"]
ipv4_enabled = config["ip"]["a"]["enabled"]
ipv6_enabled = config["ip"]["aaaa"]["enabled"]
except:
ipv4_enabled = True
ipv6_enabled = True
Expand Down Expand Up @@ -310,4 +433,4 @@ def updateIPs(ips):
print("❓ Unrecognized parameter '" +
sys.argv[1] + "'. Stopping now.")
else:
updateIPs(getIPs())
updateIPs(getIPs())
18 changes: 15 additions & 3 deletions config-example.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,20 @@
]
}
],
"a": true,
"aaaa": true,
"ip": {
"a": {
"enabled": true,
"method": "http", // or "netif" or "script"
"interface": "eth0", // interface name to get IPv4 address, used in interface mode
"script": "script_v4.sh" // path to script to get IPv4 address
},
"aaaa": {
"enabled": true,
"method": "http",
"interface": "eth0",
"script": "script_v6.sh"
}
},
"purgeUnknownRecords": false,
"ttl": 300
}
}