In [2]:
import psycopg2
import pandas as pd

In [7]:
def get_db_connection():
    return psycopg2.connect(
    host='localhost',
    database='sdulogs',
    user='karina',
    password='12345',
    port = '5432'
)

In [8]:
 conn = get_db_connection()
cur = conn.cursor()

In [9]:
cur.execute('ROLLBACK;')

In [10]:
cur.execute("""CREATE TABLE IF not exists LOGIN_LOGS (
    user_id VARCHAR(64) NOT NULL, 
    user_ip VARCHAR(45),          
    device_info TEXT,             
    log_date TIMESTAMP,           
    login_status INTEGER          
);
""")

In [11]:
cur.execute("""insert into LOGIN_LOGS (user_id, user_ip, device_info, log_date, login_status) values ('4de7a7b4681ae9f4f25e0660d75c73be72e60537a48601acdef672f966221f3f', '176.69.24.126', 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_0_2 like Mac OS X) AppleWebKit/600.1.4 (KHTML, like Gecko) GSA/21.1.139288856 Mobile/14A456 Safari/600.1.4', to_date('20-12-2016 16:45:22', 'dd-mm-yyyy hh24:mi:ss'), 1);""")

In [30]:
cur.execute('select * from LOGIN_LOGS')
print(cur.fetchall())

[('4de7a7b4681ae9f4f25e0660d75c73be72e60537a48601acdef672f966221f3f', '176.69.24.126', 'Mozilla/5.0 (iPhone; CPU iPhone OS 10_0_2 like Mac OS X) AppleWebKit/600.1.4 (KHTML, like Gecko) GSA/21.1.139288856 Mobile/14A456 Safari/600.1.4', datetime.datetime(2016, 12, 20, 0, 0), 1)]


### Extract

In [48]:
with open('loginLogs-3-64.sql', 'r') as file:
    sql_queries = file.read()

### Transform & load

In [33]:
len(sql_queries)

64627585

In [49]:
from airflow.models.dag import DAG
import pendulum
from airflow.operators.python import PythonOperator
import psycopg2 

with DAG(
    "first_dag",
    default_args={"retries": 2},
    description="DAG tutorial",
    schedule=None,
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:
    
    def extract(**kwargs):
        ti = kwargs['ti']
        conn = get_db_connection()
        cur = conn.cursor()
        
        cur.execute("""CREATE TABLE IF NOT EXISTS LOGIN_LOGS (
            user_id VARCHAR(64) NOT NULL, 
            user_ip VARCHAR(45),          
            device_info TEXT,             
            log_date TIMESTAMP,           
            login_status INTEGER
        );""")
        
        conn.commit()


    def transform(**kwargs):         
        ti = kwargs['ti']
        
        with open('loginLogs-3-64.sql', 'r') as file:
            sql_queries = file.read()
        
        ti.xcom_push(key='sql_queries', value=sql_queries)
    
    def load(**kwargs):
        ti = kwargs['ti']
        
        sql_queries = ti.xcom_pull(task_ids="transform", key="sql_queries")
        
        if sql_queries:
            conn = get_db_connection()
            cur = conn.cursor()
            
            for query in sql_queries.split(';'):
                query = query.strip()
                if query:  
                    cur.execute(query)
            
            conn.commit()
            cur.close()
            conn.close()
    
  

In [50]:
extract_task = PythonOperator(
        task_id="extract",
        python_callable=extract,
        provide_context=True,
        dag=dag,
    )




In [51]:
transform_task = PythonOperator(
        task_id="transform",
        python_callable=transform,
        provide_context=True,
        dag=dag,
    )



In [52]:
load_task = PythonOperator(
        task_id="load",
        python_callable=load,
        provide_context=True,
        dag=dag,
    )


In [53]:
extract_task >> transform_task >> load_task


<Task(PythonOperator): load>