Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Notificar Cloudfare via API com novos IPs a serem bloqueados #438

Closed
2 of 3 tasks
berinhard opened this issue Sep 22, 2020 · 2 comments
Closed
2 of 3 tasks

Notificar Cloudfare via API com novos IPs a serem bloqueados #438

berinhard opened this issue Sep 22, 2020 · 2 comments

Comments

@berinhard
Copy link
Contributor

berinhard commented Sep 22, 2020

As regras de firewall do Cloudfare permitem bloqueios de IPs de acordo uma lista. Para isso vamos precisar interagir com a API de Firewalls e a API de Rules List. Além disso, ambém vai ser necessário gerar Tokens para a API.

Depois de termos entendido e validado a integração com a API do Cloudfare precisamos:

  • Criar uma tarefa assíncrona que rode de hora em hora;
  • Analisar a seguinte consulta BlockedRequest.objects.last_hour().count_by("source_ip").filter(total__gte=30));
  • Atualizar a lista de IPs bloqueados com esses novos IPs;

IMPORTANTE:

  1. Cada lista do Cloudfare tem o tamanho máximo de 1000 elementos (então pode ser que a gente tenha que criar mais de uma lista em algum momento);
  2. Para IPv6: pegar 4 primeiras parte e adicionar ::/64

Código rascunho para a tarefa:

with open("/data/input/bloquear.txt", mode="w") as fobj:
    qs = BlockedRequest.objects.last_hour().count_by("source_ip").filter(total__gte=30)
    for blocked in qs:
        ip = blocked["source_ip"]
        if ":" in ip:
            ip = ":".join(ip.split(":")[:4] + [":/64"])
        fobj.write(ip + "\n")
@berinhard
Copy link
Contributor Author

Relacionada com #426

@turicas
Copy link
Owner

turicas commented Sep 23, 2020

Acho que o bloqueio poderia ser pela última hora mas também pelo último dia, algo como:

from itertools import chain

def blocked_ips(hourly_max=30, daily_max=1200):
    qs_hour = BlockedRequest.objects.last_hour().count_by("source_ip").filter(total__gte=hourly_max)
    qs_day = BlockedRequest.objects.last_day().count_by("source_ip").filter(total__gte=daily_max)
    for blocked in chain(qs_hour, qs_day):
        ip = blocked["source_ip"]
        if ":" in ip:
            ip = ":".join(ip.split(":")[:4] + [":/64"])
        blocked["ip"] = ip
        yield blocked

# Para salvar num arquivo:
with open("/data/input/bloquear.txt", mode="w") as fobj:
    for blocked in blocked_ips():
        fobj.write(blocked["ip"] + "\n")

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants