In [None]:
import json
import requests
import pandas as pd
import psycopg2
import os

# Connection settings, read from environment variables at import time.
# A missing variable raises KeyError immediately.

# Target database (datalake)
ENDPOINT = os.environ["ENDPOINT"]
END_DB_NAME = os.environ["END_DB_NAME"]
END_USERNAME = os.environ["END_USERNAME"]
END_PASSWORD = os.environ["END_PASSWORD"]

# Source database (holds traffic_table, used for the incremental watermark)
SOURCE_DB_NAME = os.environ["SOURCE_DB_NAME"]
SOURCE_PASSWORD = os.environ["SOURCE_PASSWORD"]
SOURCE_USERNAME = os.environ["SOURCE_USERNAME"]
SOURCE_POINT = os.environ["SOURCE_POINT"]

# Column layout of traffic_staging, shared by _CREATE_STAGING_SQL and
# _INSERT_STAGING_SQL. _record_to_row() yields values in exactly this order.
_STAGING_COLUMNS = (
    "zst_nr",
    "sitecode",
    "sitename",
    "datetimefrom",
    "datetimeto",
    "directionname",
    "lanecode",
    "lanename",
    "valuesapproved",
    "valuesedited",
    "traffictype",
    "total",
    "year",
    "month",
    "day",
    "weekday",
    "hourfrom",
    "date",
    "timefrom",
    "timeto",
    "dayofyear",
    "zst_id",
    "latitude",
    "longitude",
)

_CREATE_STAGING_SQL = """
    CREATE TABLE IF NOT EXISTS traffic_staging (
        zst_nr INT,
        sitecode VARCHAR(255),
        sitename VARCHAR(255),
        datetimefrom TIMESTAMP,
        datetimeto TIMESTAMP,
        directionname VARCHAR(255),
        lanecode INT,
        lanename VARCHAR(255),
        valuesapproved INT,
        valuesedited INT,
        traffictype VARCHAR(50),
        total INT,
        year INT,
        month INT,
        day INT,
        weekday INT,
        hourfrom INT,
        date VARCHAR(255),
        timefrom VARCHAR(255),
        timeto VARCHAR(255),
        dayofyear INT,
        zst_id INT,
        latitude FLOAT,
        longitude FLOAT
    )
"""

# Column names are fixed constants (not user input), so building the
# statement text with str.format is safe; the values stay parameterized.
_INSERT_STAGING_SQL = "INSERT INTO traffic_staging ({}) VALUES ({})".format(
    ", ".join(_STAGING_COLUMNS),
    ", ".join(["%s"] * len(_STAGING_COLUMNS)),
)

_API_URL = "https://data.bs.ch/api/explore/v2.1/catalog/datasets/100013/exports/json"

# (connect, read) timeout in seconds for the API call. Without a timeout a
# stalled request hangs until the Lambda itself is killed. The read value is
# an assumption for a potentially large export -- tune to the Lambda timeout.
_API_TIMEOUT = (10, 300)

# Transformations run after staging, as (error message, SQL) pairs in order.
# NOTE(review): nothing in this file clears traffic_staging, so each run
# re-aggregates every row staged by earlier runs into traffic_agg /
# datetime_info / location_info. Unless another job truncates the staging
# table between runs, consider truncating it before loading or restricting
# these SELECTs to the current batch.
_TRANSFORM_STEPS = (
    (
        "Error: inserting traffic_agg",
        """
        INSERT INTO traffic_agg (
            zst_id,
            datetimefrom,
            datetimeto,
            valuesapproved,
            valuesedited,
            traffictype,
            total_sum
        )
        SELECT
            zst_id,
            datetimefrom,
            datetimeto,
            valuesapproved,
            valuesedited,
            traffictype,
            SUM(total) AS total_sum
        FROM traffic_staging
        GROUP BY
            zst_id, datetimefrom, datetimeto, valuesapproved, valuesedited, traffictype;
        """,
    ),
    (
        "Error: inserting datetime_info",
        """
        INSERT INTO datetime_info (
            datetimefrom,
            datetimeto,
            year,
            month,
            day,
            weekday,
            hourfrom,
            date,
            timefrom,
            timeto,
            dayofyear
        )
        SELECT DISTINCT
            datetimefrom,
            datetimeto,
            year,
            month,
            day,
            weekday,
            hourfrom,
            TO_DATE(date, 'DD.MM.YYYY'),
            timefrom,
            timeto,
            dayofyear
        FROM traffic_staging;
        """,
    ),
    (
        "Error: inserting location_info",
        """
        INSERT INTO location_info (
            zst_id,
            sitename,
            latitude,
            longitude
        )
        SELECT DISTINCT
            zst_id,
            sitename,
            latitude,
            longitude
        FROM traffic_staging;
        """,
    ),
    (
        # Keep one row per zst_id: the smallest sitename wins, and among rows
        # with equal sitename the physically first (lowest ctid) is kept, so
        # exact duplicates are removed too. Rows with a NULL sitename never
        # compare true and are left untouched.
        "Error: deleting duplicate location_info rows",
        """
        DELETE FROM location_info t1
        WHERE EXISTS (
            SELECT 1
            FROM location_info t2
            WHERE t1.zst_id = t2.zst_id
                AND (t1.sitename > t2.sitename
                     OR (t1.sitename = t2.sitename AND t1.ctid > t2.ctid))
        );
        """,
    ),
)


def _record_to_row(record):
    """Flatten one API record into a tuple ordered like ``_STAGING_COLUMNS``.

    Every column except the last two is read from the record key of the same
    name; ``latitude``/``longitude`` come from ``record["geo_point_2d"]``.

    Raises:
        KeyError: a required field is missing.
        TypeError: the record or its ``geo_point_2d`` is not a mapping
            (e.g. ``geo_point_2d`` is ``null`` in the JSON).
    """
    values = tuple(record[column] for column in _STAGING_COLUMNS[:-2])
    geo_point = record["geo_point_2d"]
    return values + (geo_point["lat"], geo_point["lon"])


def _records_to_rows(records):
    """Convert API records to staging rows, printing and skipping bad ones."""
    rows = []
    for record in records:
        try:
            rows.append(_record_to_row(record))
        except KeyError as e:
            print(f"Error: Missing field in record {record}: {e}")
        except TypeError as e:
            print(f"Error: Malformed record {record}: {e}")
    return rows


def _fetch_latest_datetimefrom():
    """Read the watermark ``MAX(datetimefrom)`` from the source ``traffic_table``.

    Returns:
        ``(ok, value)``: ``ok`` is False if connecting or querying failed (the
        error has already been printed). ``value`` is the maximum
        ``datetimefrom``, or None when the table is empty.
    """
    try:
        # Keyword arguments let psycopg2 quote values (e.g. passwords with spaces).
        source_conn = psycopg2.connect(
            host=SOURCE_POINT,
            dbname=SOURCE_DB_NAME,
            user=SOURCE_USERNAME,
            password=SOURCE_PASSWORD,
        )
    except psycopg2.Error as e:
        print("Error: Could not make connection to database")
        print(e)
        return False, None

    try:
        try:
            source_cur = source_conn.cursor()
        except psycopg2.Error as e:
            print("Error: Could not get cursor to the Database")
            print(e)
            return False, None
        try:
            source_conn.set_session(autocommit=True)
            source_cur.execute("SELECT MAX(datetimefrom) FROM traffic_table;")
            return True, source_cur.fetchone()[0]
        except psycopg2.Error as e:
            print("Error: Couldn't get max datetime")
            print(e)
            return False, None
        finally:
            source_cur.close()
    finally:
        # Runs on every path, including the early returns above.
        source_conn.close()


def _fetch_api_records(latest_datetimefrom):
    """Fetch API records with ``datetimefrom`` later than the watermark.

    Returns the decoded JSON list, or None after printing the error when the
    request fails, the status is not 2xx, the body is not JSON, or the body
    is not a list.
    """
    params = {"where": f"datetimefrom > '{latest_datetimefrom}'"}
    try:
        response = requests.get(_API_URL, params=params, timeout=_API_TIMEOUT)
        # Without this an HTTP error body (a JSON object) would be treated
        # as the record list.
        response.raise_for_status()
    except requests.RequestException as e:
        print(f"An error occurred while making the request: {e}")
        return None

    # Decoding is kept in its own try: several requests exceptions also
    # subclass ValueError, and they must not be reported as parse errors.
    try:
        data = response.json()
    except ValueError as e:
        print(f"An error occurred while parsing the JSON response: {e}")
        return None

    if not isinstance(data, list):
        print(f"Error: Expected a JSON list from the API, got {type(data).__name__}")
        return None
    return data


def _stage_and_transform(target_cur, records):
    """Append ``records`` to ``traffic_staging``, then run ``_TRANSFORM_STEPS``.

    A failing step prints its error and skips the remaining steps.
    """
    # Local import so the module can be imported without psycopg2.extras.
    # execute_batch groups the INSERTs into fewer server round-trips.
    from psycopg2.extras import execute_batch

    try:
        target_cur.execute(_CREATE_STAGING_SQL)
    except psycopg2.Error as e:
        print("Error: Creating table")
        print(e)
        return

    rows = _records_to_rows(records)
    try:
        execute_batch(target_cur, _INSERT_STAGING_SQL, rows)
    except psycopg2.Error as e:
        print("Error: Inserting Rows")
        print(e)
        return

    for error_message, sql in _TRANSFORM_STEPS:
        try:
            target_cur.execute(sql)
        except psycopg2.Error as e:
            print(error_message)
            print(e)
            return


def _load_into_datalake(records):
    """Open the target (datalake) DB, stage ``records`` and transform them.

    The connection and cursor are closed on every path.
    """
    try:
        target_conn = psycopg2.connect(
            host=ENDPOINT,
            dbname=END_DB_NAME,
            user=END_USERNAME,
            password=END_PASSWORD,
        )
    except psycopg2.Error as e:
        print("Error: Could not make connection to database (datalake)")
        print(e)
        return

    try:
        try:
            target_cur = target_conn.cursor()
        except psycopg2.Error as e:
            print("Error: Could not get cursor to the Database (datalake)")
            print(e)
            return
        try:
            target_conn.set_session(autocommit=True)
            _stage_and_transform(target_cur, records)
        finally:
            target_cur.close()
    finally:
        target_conn.close()


def lambda_handler(event, context):
    """Incrementally load traffic records from the data.bs.ch export API.

    Steps:
      1. Read ``MAX(datetimefrom)`` from ``traffic_table`` in the source DB.
      2. Fetch API records with a later ``datetimefrom``.
      3. Append them to ``traffic_staging`` in the target DB, creating the
         table if needed.
      4. Fill ``traffic_agg``, ``datetime_info`` and ``location_info`` from
         ``traffic_staging``, then drop duplicate locations.

    ``event`` and ``context`` are not used. Each failure is printed and the
    handler returns None early; all connections opened are closed.
    """
    ok, latest_datetimefrom = _fetch_latest_datetimefrom()
    if not ok:
        return
    if latest_datetimefrom is None:
        # An empty traffic_table would otherwise send "datetimefrom > 'None'".
        print("Error: traffic_table has no datetimefrom to query the API from")
        return

    # Fetch before opening the target connection so it is not held open
    # during a potentially slow HTTP export.
    records = _fetch_api_records(latest_datetimefrom)
    if records is None:
        return

    _load_into_datalake(records)