Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(scanner): make scanner more flexible and remove its coupling #36

Merged
merged 1 commit into from
Jan 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 42 additions & 73 deletions docker/celeryapp/downloader/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import os
import calendar
import pandas as pd
from pathlib import Path
from loguru import logger
from datetime import datetime
from sqlalchemy import create_engine
Expand All @@ -35,7 +34,7 @@
)


@app.task(bind=True, name='extract_br_netcdf_monthly', retry_kwargs={'max_retries': 2})
@app.task(bind=True, name='extract_br_netcdf_monthly', retry_kwargs={'max_retries': 0})
def download_br_netcdf_monthly(self) -> None:
"""
This task will be responsible for downloading every data in copernicus
Expand Down Expand Up @@ -198,19 +197,10 @@ def scan_and_remove_inconsistent_data() -> None:
# each month has 8 values per day (copernicus_foz_do_iguacu)
# if month in incomplete:
# - drop all rows within this month range
# - delete local file
# - clean path from table

# - clean status from status table
with engine.connect() as conn:
date_ranges = _get_inconsistent_months(conn)
if any(date_ranges):
try:
_delete_entries_for(date_ranges, conn)
except Exception as e:
logger.error(e)
raise e
else:
logger.info('[SCAN] No inconsistent date were found')
_scan_and_delete_entries_for('fetch_copernicus_brasil', conn)
_scan_and_delete_entries_for('fetch_copernicus_foz', conn)


# ---
Expand Down Expand Up @@ -300,7 +290,7 @@ def _produce_next_month_to_update(conn, schema: str, table: str) -> tuple:
cur = conn.execute(f'SELECT MAX(date) FROM {_table}')
last_update_date = cur.fetchone()[0]

# DB is empty; Runs only at first initialization
# DB is empty
if not last_update_date:
raise RuntimeError('No data found on DB')

Expand Down Expand Up @@ -373,36 +363,17 @@ def _last_month_range(date: datetime.date) -> tuple:

# ---
# remove_inconsistent_data task
def _delete_entries_for(date_ranges: list[tuple], conn) -> None:
for date_range in date_ranges:
ini_m, end_m = date_range
cur = conn.execute(
'SELECT path, task_brasil_status, task_foz_status'
f' FROM weather.{STATUS_TABLE}'
f" WHERE date = '{end_m}'"
)
path = cur.fetchone()[0]
conn.execute(
'DELETE FROM weather.copernicus_brasil'
f" WHERE time BETWEEN '{ini_m}' AND '{end_m}'"
)
conn.execute(
'DELETE FROM weather.copernicus_foz_do_iguacu'
f" WHERE time BETWEEN '{ini_m}' AND '{end_m}'"
)
conn.execute(
f'UPDATE weather.{STATUS_TABLE} SET'
' path = NULL,'
' task_brasil_status = NULL'
f" WHERE date = '{end_m}'"
)
if path:
Path(path).unlink()
logger.warning(f'[SCAN] All data entries for {path} was deleted.')


def _get_inconsistent_months(conn) -> list:
date_ranges = []
def _scan_and_delete_entries_for(task: str, conn) -> None:
match task:
case 'fetch_copernicus_brasil':
status = 'task_brasil_status'
data = 'copernicus_brasil'
day_entries = 5570

case 'fetch_copernicus_foz':
status = 'task_foz_status'
data = 'copernicus_foz_do_iguacu'
day_entries = 8

cur = conn.execute(
' SELECT'
Expand All @@ -414,37 +385,35 @@ def _get_inconsistent_months(conn) -> list:
" (DATE_TRUNC('month', time) + interval '1 month - 1 day')::date "
'AS end_m,'
' count(*) AS tot'
' FROM weather.copernicus_brasil'
f' FROM weather.{data}'
" GROUP BY DATE_TRUNC('month', time)) AS res"
" WHERE to_char(((res.end_m - res.ini_m + '1 day') * 5570), 'DD')::integer != res.tot;"
' WHERE to_char(('
f" (res.end_m - res.ini_m + '1 day') * {day_entries}), 'DD'"
' )::integer != res.tot;'
)
br_dates = cur.fetchall()
date_ranges.extend(br_dates)

normalize = lambda dt: (
datetime(dt[0].year, dt[0].month, dt[0].day),
datetime(dt[1].year, dt[1].month, dt[1].day).date()
)
date_ranges = cur.fetchall()

date_ranges = [normalize(dt) for dt in date_ranges]
if any(date_ranges):
for date_range in date_ranges:
ini_m, end_m = date_range

cur = conn.execute(
' SELECT'
' res.ini_m,'
' res.end_m'
' FROM ('
' SELECT'
" DATE_TRUNC('month', time) AS ini_m,"
" (DATE_TRUNC('month', time) + interval '1 month - 1 day')::date "
'AS end_m,'
' count(*) AS tot'
' FROM weather.copernicus_foz_do_iguacu'
" GROUP BY DATE_TRUNC('month', time)) AS res"
" WHERE to_char(((res.end_m - res.ini_m + '1 day') * 8), 'DD')::integer != res.tot;"
)
foz_dates = cur.fetchall()
for date in foz_dates:
if date not in date_ranges:
date_ranges.append(date)
logger.warning(
'[SCAN] Inconcistency found for '
f'date {end_m} on {data}, removing entries'
)

conn.execute(
f'DELETE FROM weather.{data}'
f" WHERE time >= '{ini_m}'"
f" AND time <= '{end_m} 23:59:00'"
)

return date_ranges
conn.execute(
f'UPDATE weather.{STATUS_TABLE} SET'
f' {status} = NULL'
f" WHERE date = '{end_m}'"
)

else:
logger.info('[SCAN] No inconsistent date were found')