In [None]:
from datetime import datetime, date
from airflow.operators.python import PythonOperator
from airflow import DAG
import pandas as pd
import os

# Defaults handed to every task created in this DAG. ``catchup`` is a DAG-level
# option and has no effect inside default_args, so it is passed to DAG() below.
default_args = {
    'start_date': datetime(2022, 1, 1),
}

# Manually triggered DAG (schedule_interval=None) with a single
# extract-and-transform task.
with DAG('extract_dag',
         default_args=default_args,
         schedule_interval=None,
         catchup=False) as dag:

    def transform_data():
        """Download the TLD dataset, keep generic entries, and save them as CSV.

        Rows whose ``Type`` column equals ``'generic'`` are kept, a ``Date``
        column holding today's date (``YYYY-MM-DD``) is added, and the result
        is written to ``automated/transformed.csv``. The output path is
        relative, so it resolves against the working directory of the process
        that runs the task.
        """
        today = date.today()
        url = "https://raw.githubusercontent.com/datasets/top-level-domain-names/master/data/top-level-domain-names.csv"
        df = pd.read_csv(url)

        # .copy() yields an independent frame, so adding a column below does
        # not write into a view of ``df``.
        generic_type_df = df[df['Type'] == 'generic'].copy()
        generic_type_df['Date'] = today.strftime("%Y-%m-%d")

        # Ensure the output directory exists before writing.
        os.makedirs("automated", exist_ok=True)

        # Save the transformed file (index column omitted).
        generic_type_df.to_csv("automated/transformed.csv", index=False)

    extract_and_transform_task = PythonOperator(
        task_id='extract_and_transform',
        python_callable=transform_data
    )


In [2]:
from datetime import datetime, date
from airflow.operators.python import PythonOperator
from airflow import DAG
import pandas as pd
import os

def transform_data():
    """Fetch the TLD list, keep the generic domains, and write them to CSV.

    The dataset is downloaded from GitHub, filtered to rows whose ``Type`` is
    ``'generic'``, stamped with today's date in a ``Date`` column
    (``YYYY-MM-DD``), and saved to ``automated/transformed.csv`` relative to
    the current working directory. The ``automated`` directory is created
    when missing.
    """
    source_url = "https://raw.githubusercontent.com/datasets/top-level-domain-names/master/data/top-level-domain-names.csv"
    run_date = date.today().strftime("%Y-%m-%d")

    tld_frame = pd.read_csv(source_url)

    # .loc with a boolean mask selects the generic rows; .assign returns a new
    # frame with the extra column, leaving the downloaded frame untouched.
    is_generic = tld_frame['Type'] == 'generic'
    generic_only = tld_frame.loc[is_generic].assign(Date=run_date)

    # Create the output directory on first run, then write without the index.
    os.makedirs("automated", exist_ok=True)
    generic_only.to_csv("automated/transformed.csv", index=False)

In [3]:
# Run the transform once interactively: as defined in the previous cell, this
# downloads the dataset and writes automated/transformed.csv relative to the
# notebook's working directory.
transform_data()

In [None]:
from datetime import datetime, date
from airflow.operators.python import PythonOperator
from airflow import DAG
import pandas as pd
import os

# Defaults handed to every task created in this DAG. ``catchup`` is a DAG-level
# option and has no effect inside default_args, so it is passed to DAG() below.
default_args = {
    'start_date': datetime(2022, 1, 1),
}

# Manually triggered DAG (schedule_interval=None) with a single
# extract-and-transform task.
with DAG('extract_dag',
         default_args=default_args,
         schedule_interval=None,
         catchup=False) as dag:

    def transform_data():
        """Download the TLD dataset, keep generic entries, and save them as CSV.

        Rows whose ``Type`` column equals ``'generic'`` are kept, a ``Date``
        column holding today's date (``YYYY-MM-DD``) is added, and the result
        is written to ``automated/transformed.csv``. The output path is
        relative, so it resolves against the working directory of the process
        that runs the task.
        """
        today = date.today()
        url = "https://raw.githubusercontent.com/datasets/top-level-domain-names/master/data/top-level-domain-names.csv"
        df = pd.read_csv(url)

        # .copy() yields an independent frame, so adding a column below does
        # not write into a view of ``df``.
        generic_type_df = df[df['Type'] == 'generic'].copy()
        generic_type_df['Date'] = today.strftime("%Y-%m-%d")

        # Ensure the output directory exists before writing.
        os.makedirs("automated", exist_ok=True)

        # Save the transformed file (index column omitted).
        generic_type_df.to_csv("automated/transformed.csv", index=False)

    extract_and_transform_task = PythonOperator(
        task_id='extract_and_transform',
        python_callable=transform_data
    )


In [5]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

import sqlite3
import csv
import os

def import_csv_to_sqlite():
    """Load the transformed TLD CSV into the ``domains`` table of a SQLite file.

    Reads ``~/airflow/dags/automated/transformed.csv`` (the header row is
    skipped, completely blank lines are ignored) and appends every data row
    to ``domains`` in ``~/airflow/my_database.db``, creating the table first
    if it does not exist. All rows are committed together at the end; if
    reading or inserting fails, no rows are committed and the connection is
    still closed.

    Each call appends, so running it twice on the same CSV stores the rows
    twice.

    Raises:
        FileNotFoundError: if the CSV file does not exist.
        sqlite3.Error: if a row does not supply exactly four values or the
            database cannot be written.
    """
    db_path = os.path.expanduser('~/airflow/my_database.db')  # or any path you prefer
    csv_path = os.path.expanduser('~/airflow/dags/automated/transformed.csv')

    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()

        # Create table
        cur.execute('''
            CREATE TABLE IF NOT EXISTS domains (
                domain TEXT,
                type TEXT,
                org TEXT,
                date DATE
            )
        ''')

        # newline='' is what the csv module expects for correct handling of
        # embedded line breaks; UTF-8 matches what pandas' to_csv writes by
        # default, independent of the platform's locale encoding.
        with open(csv_path, 'r', newline='', encoding='utf-8') as f:
            reader = csv.reader(f)
            next(reader, None)  # Skip header row (tolerates an empty file)
            cur.executemany(
                'INSERT INTO domains (domain, type, org, date) VALUES (?, ?, ?, ?)',
                (row for row in reader if row),  # drop blank lines
            )

        conn.commit()
    finally:
        # Always release the database handle, even when the load fails.
        conn.close()


# Manually triggered DAG (no schedule, catch-up disabled) whose single task
# loads the transformed CSV into SQLite.
with DAG(
    'import_tld_csv_to_sqlite',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2024, 1, 1),
) as dag:

    # The task simply invokes import_csv_to_sqlite() with no arguments.
    import_task = PythonOperator(
        python_callable=import_csv_to_sqlite,
        task_id='import_csv_to_sqlite',
    )


In [None]:
# NOTE: this cell defines no imports and no ``default_args`` of its own; it
# relies on ``DAG``, ``PythonOperator``, ``date``, ``pd``, ``os`` and
# ``default_args`` having been created by an earlier cell.
#
# ``catchup`` is a DAG-level option and has no effect inside default_args, so
# it is passed to DAG() explicitly here.
with DAG('extract_dag',
         default_args=default_args,
         schedule_interval=None,
         catchup=False) as dag:

    def transform_data():
        """Download the TLD dataset, keep generic entries, and save them as CSV.

        Rows whose ``Type`` column equals ``'generic'`` are kept, a ``Date``
        column holding today's date (``YYYY-MM-DD``) is added, and the result
        is written to ``automated/transformed.csv``. The output path is
        relative, so it resolves against the working directory of the process
        that runs the task.
        """
        today = date.today()
        url = "https://raw.githubusercontent.com/datasets/top-level-domain-names/master/data/top-level-domain-names.csv"
        df = pd.read_csv(url)

        # .copy() yields an independent frame, so adding a column below does
        # not write into a view of ``df``.
        generic_type_df = df[df['Type'] == 'generic'].copy()
        generic_type_df['Date'] = today.strftime("%Y-%m-%d")

        # Ensure the output directory exists before writing.
        os.makedirs("automated", exist_ok=True)

        # Save the transformed file (index column omitted).
        generic_type_df.to_csv("automated/transformed.csv", index=False)

    extract_and_transform_task = PythonOperator(
        task_id='extract_and_transform',
        python_callable=transform_data
    )


In [8]:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

import sqlite3
import csv
import os

def transform_data():
    """Download the TLD dataset, keep generic entries, and save them as CSV.

    Rows whose ``Type`` column equals ``'generic'`` are kept, a ``Date``
    column holding today's date (``YYYY-MM-DD``) is added, and the result is
    written to ``~/airflow/dags/automated/transformed.csv`` — the file that
    ``import_csv_to_sqlite`` reads — so the two tasks agree on the location
    regardless of the working directory the task runs in.
    """
    # Imported locally: this cell's import block provides neither ``date``
    # nor pandas, so the function would otherwise only work when an earlier
    # notebook cell happened to have imported them.
    from datetime import date
    import pandas as pd

    url = "https://raw.githubusercontent.com/datasets/top-level-domain-names/master/data/top-level-domain-names.csv"
    df = pd.read_csv(url)

    # .copy() yields an independent frame, so adding a column below does not
    # write into a view of ``df``.
    generic_type_df = df[df['Type'] == 'generic'].copy()
    generic_type_df['Date'] = date.today().strftime("%Y-%m-%d")

    # Ensure the output directory exists before writing.
    output_path = os.path.expanduser('~/airflow/dags/automated/transformed.csv')
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    # Save the transformed file (index column omitted).
    generic_type_df.to_csv(output_path, index=False)

def import_csv_to_sqlite():
    """Load the transformed TLD CSV into the ``domains`` table of a SQLite file.

    Reads ``~/airflow/dags/automated/transformed.csv`` (the header row is
    skipped, completely blank lines are ignored) and appends every data row
    to ``domains`` in ``my_database.db`` inside the same ``automated``
    directory, creating the table first if it does not exist. All rows are
    committed together at the end; if reading or inserting fails, no rows are
    committed and the connection is still closed.

    Each call appends, so running it twice on the same CSV stores the rows
    twice.

    Raises:
        FileNotFoundError: if the CSV file does not exist.
        sqlite3.Error: if a row does not supply exactly four values or the
            database cannot be opened or written.
    """
    db_path = os.path.expanduser('~/airflow//dags/automated/my_database.db')  # or any path you prefer
    csv_path = os.path.expanduser('~/airflow/dags/automated/transformed.csv')

    conn = sqlite3.connect(db_path)
    try:
        cur = conn.cursor()

        # Create table
        cur.execute('''
            CREATE TABLE IF NOT EXISTS domains (
                domain TEXT,
                type TEXT,
                org TEXT,
                date DATE
            )
        ''')

        # newline='' is what the csv module expects for correct handling of
        # embedded line breaks; UTF-8 matches what pandas' to_csv writes by
        # default, independent of the platform's locale encoding.
        with open(csv_path, 'r', newline='', encoding='utf-8') as f:
            reader = csv.reader(f)
            next(reader, None)  # Skip header row (tolerates an empty file)
            cur.executemany(
                'INSERT INTO domains (domain, type, org, date) VALUES (?, ?, ?, ?)',
                (row for row in reader if row),  # drop blank lines
            )

        conn.commit()
    finally:
        # Always release the database handle, even when the load fails.
        conn.close()


# Manually triggered two-step pipeline (no schedule, catch-up disabled):
# first build the transformed CSV, then load it into SQLite.
with DAG(
    'import_tld_csv_to_sqlite',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2024, 1, 1),
) as dag:

    # Step 1: run transform_data().
    ett = PythonOperator(
        python_callable=transform_data,
        task_id='ett',
    )

    # Step 2: run import_csv_to_sqlite().
    load_to_sqlite = PythonOperator(
        python_callable=import_csv_to_sqlite,
        task_id='import_csv_to_sqlite',
    )

    # The load task runs only after the transform task.
    ett >> load_to_sqlite
