# Setup

In [14]:
pip install requests pandas "pyiceberg[duckdb]"

Defaulting to user installation because normal site-packages is not writeable
Collecting pyiceberg[duckdb]
  Using cached pyiceberg-0.6.1-cp312-cp312-win_amd64.whl
Collecting click<9.0.0,>=7.1.1 (from pyiceberg[duckdb])
  Using cached click-8.1.7-py3-none-any.whl.metadata (3.0 kB)
Collecting fsspec<2024.1.0,>=2023.1.0 (from pyiceberg[duckdb])
  Using cached fsspec-2023.12.2-py3-none-any.whl.metadata (6.8 kB)
Collecting mmhash3<4.0.0,>=3.0.0 (from pyiceberg[duckdb])
  Using cached mmhash3-3.0.1.tar.gz (11 kB)
  Installing build dependencies: started
  Installing build dependencies: finished with status 'done'
  Getting requirements to build wheel: started
  Getting requirements to build wheel: finished with status 'done'
  Installing backend dependencies: started
  Installing backend dependencies: finished with status 'done'
  Preparing metadata (pyproject.toml): started
  Preparing metadata (pyproject.toml): finished with status 'done'
Collecting pydantic!=2.4.0,!=2.4.1,<3.0,>=2.0 (fro

  error: subprocess-exited-with-error
  
  × Building wheel for mmhash3 (pyproject.toml) did not run successfully.
  │ exit code: 1
  ╰─> [5 lines of output]
      running bdist_wheel
      running build
      running build_ext
      building 'mmh3' extension
      error: Microsoft Visual C++ 14.0 or greater is required. Get it with "Microsoft C++ Build Tools": https://visualstudio.microsoft.com/visual-cpp-build-tools/
      [end of output]
  
  note: This error originates from a subprocess, and is likely not a problem with pip.
  ERROR: Failed building wheel for mmhash3
ERROR: Could not build wheels for mmhash3, which is required to install pyproject.toml-based projects


In [15]:
pip install wheel --upgrade

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [16]:
pip install "pyiceberg[duckdb]"

In [None]:
import pandas as pd
from datetime import datetime
import unittest
import duckdb
import requests

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import StringType, DoubleType, TimestampType, BooleanType
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table import Table
from pyiceberg.io import FileIO

ModuleNotFoundError: No module named 'pyiceberg'

# Functions

In [None]:
def fetch_covid_data():
    url = "https://api.corona-zahlen.org/states/history/incidence"
    response = requests.get(url)
    data = response.json()
    
    # Extract the relevant data
    records = []
    for state in data['data']:
        for record in data['data'][state]['history']:
            records.append({
                'state': state,
                'date': record['date'],
                'incidence': record['weekIncidence']
            })
    
    df = pd.DataFrame(records)
    df['date'] = pd.to_datetime(df['date'])
    return df

def create_duckdb_connection(db_path='covid_data.duckdb'):
    conn = duckdb.connect(database=db_path, read_only=False)
    return conn

def create_iceberg_table():
    catalog = load_catalog("my_catalog")  # Configure your catalog
    schema = Schema(
        StringType().field("state", 1),
        TimestampType().field("date", 2),
        DoubleType().field("incidence", 3),
        TimestampType().field("start_date", 4),
        TimestampType().field("end_date", 5),
        BooleanType().field("is_current", 6)
    )

    metadata = TableMetadata(schema=schema)
    table = Table.create("covid_data.covid_incidence", metadata=metadata, catalog=catalog)
    return table

def insert_data_to_iceberg(table: Table, df: pd.DataFrame):
    for index, row in df.iterrows():
        existing_data = list(table.scan().filter(f"state == '{row['state']}' and date == '{row['date']}' and is_current == True"))

        if existing_data:
            existing_record = existing_data[0]
            if existing_record['incidence'] != row['incidence']:
                # Update existing record: Set end_date and is_current to False
                table.update(existing_record['id'], {'end_date': datetime.utcnow(), 'is_current': False})
                
                # Insert the new record
                table.append({'state': row['state'], 'date': row['date'], 'incidence': row['incidence'], 'start_date': datetime.utcnow(), 'is_current': True})
        else:
            # Insert the new record if it doesn't exist
            table.append({'state': row['state'], 'date': row['date'], 'incidence': row['incidence'], 'start_date': datetime.utcnow(), 'is_current': True})



def main():
    conn = create_duckdb_connection()
    create_iceberg_table(conn)
    df = fetch_covid_data()
    insert_data_to_iceberg(conn, df)
    conn.close()
    

NameError: name 'Table' is not defined

# Main

In [None]:
if __name__ == "__main__":
    main()


ParserException: Parser Error: syntax error at or near "USING"

# Tests

In [None]:
import unittest
from unittest.mock import patch
import pandas as pd

class TestCovidDataFunctions(unittest.TestCase):

    @patch('requests.get')
    def test_fetch_covid_data(self, mock_get):
        mock_response = {
            'data': {
                'DE-BW': {
                    'history': [
                        {'date': '2023-05-01T00:00:00Z', 'weekIncidence': 50.0},
                        {'date': '2023-05-02T00:00:00Z', 'weekIncidence': 55.0}
                    ]
                }
            }
        }
        mock_get.return_value.json.return_value = mock_response

        expected_df = pd.DataFrame({
            'state': ['DE-BW', 'DE-BW'],
            'date': [pd.Timestamp('2023-05-01T00:00:00Z'), pd.Timestamp('2023-05-02T00:00:00Z')],
            'incidence': [50.0, 55.0]
        })

        result_df = fetch_covid_data()
        pd.testing.assert_frame_equal(result_df, expected_df)

if __name__ == "__main__":
    unittest.main(argv=['first-arg-is-ignored'], exit=False)


.
----------------------------------------------------------------------
Ran 1 test in 0.068s

OK


# Backup

In [None]:
# crontab -e
# 0 0 * * * /usr/bin/python3 ./showcase2.ipynb

1