In [1]:
import numpy as np
import pandas as pd
from confluent_kafka import Producer
from connect import get_connection
from random import randint
from simulation import transaktion_factory
from time import sleep
import socket


from confluent_kafka import Consumer

# Einlesen des Buchbestands

In [2]:
df_bestand = pd.read_csv('data/Bestand.csv', sep=';', encoding='unicode_escape')

In [3]:
bestand_cols=df_bestand.columns

# Einlesen der Tabellen des Producers

In [5]:
c = Consumer({
    'bootstrap.servers': '0.0.0.0:9092',
    'group.id': socket.gethostname(),
    'auto.offset.reset': 'latest'
})


In [6]:
prod_Transaktion_cols={
    0:'Aktion',
    1:'Datum',
    2:'Fernleihe',
    3:'ID_Exemplar',
    4:'ID_Kunde',
    5:'Titel',
    6:'Autor',
    7:'Jahr',
    8:'Art',
    9:'Kennung',
   10:'Zugriffsort'
}

prod_Bewertung_cols={
    0:'Aktion',
    1:'ID_Kunde',
    2:'ID_Buch',
    3:'Wertung',
    4:'Rezension'
}

prod_Neukunden_cols={
    0:'Aktion',
    1:'ID_Kunde',
    2:'c_count',
    3:'Kundennr',
    4:'Vorname_1',
    5:'Vorname_2',
    6:'Nachname',
    7:'Anrede',
    8:'PLZ',
    9:'Strasse',
   10:'Hausnr',
   11:'Mail',
   12:'Tel',
   13:'Geschlecht',
   14:'Geburtsdatum',
   15:'Beruf',
   16:'Titel',
   17:'PersoNr',
   18:'PersoValidTo' 
}

cols=[prod_Transaktion_cols,prod_Bewertung_cols,prod_Neukunden_cols]
topics=["Transaktion", "Bewertung", "Neukunden"]

dfd={}

In [7]:
for col, topic in zip(cols, topics):
    print(col, topic)

    dfd[topic] = pd.DataFrame(columns=list(col.values()))
    
    
    c.subscribe([topic])
    
    for i in range(60):
        msg = c.poll(1)
        if msg is None:
            continue
    
        mrow = pd.DataFrame([msg.value().decode('utf-8').split(";")]).rename(columns=col)

        dfd[topic]=pd.concat([dfd[topic], mrow])

    dfd[topic] = dfd[topic].reset_index(drop=True)
    #c.close()

{0: 'Aktion', 1: 'Datum', 2: 'Fernleihe', 3: 'ID_Exemplar', 4: 'ID_Kunde', 5: 'Titel', 6: 'Autor', 7: 'Jahr', 8: 'Art', 9: 'Kennung', 10: 'Zugriffsort'} Transaktion
{0: 'Aktion', 1: 'ID_Kunde', 2: 'ID_Buch', 3: 'Wertung', 4: 'Rezension'} Bewertung
{0: 'Aktion', 1: 'ID_Kunde', 2: 'c_count', 3: 'Kundennr', 4: 'Vorname_1', 5: 'Vorname_2', 6: 'Nachname', 7: 'Anrede', 8: 'PLZ', 9: 'Strasse', 10: 'Hausnr', 11: 'Mail', 12: 'Tel', 13: 'Geschlecht', 14: 'Geburtsdatum', 15: 'Beruf', 16: 'Titel', 17: 'PersoNr', 18: 'PersoValidTo'} Neukunden


In [None]:
dfd['Transaktion']

In [27]:
dfd['Bewertung']

Unnamed: 0,Aktion,ID_Kunde,ID_Buch,Wertung,Rezension
0,Bewertung,45,248,5,"Ein Buch, das weder besonders gut noch besond..."
1,Bewertung,227,747,9,"Ein nahezu makelloses Werk, das durch Tiefe u..."
2,Bewertung,237,544,9,"Ein beeindruckendes Buch, das den Leser mit j..."
3,Bewertung,204,594,4,"Es schien, als ob das Buch viel Potential hät..."


In [28]:
dfd['Neukunden']

Unnamed: 0,Aktion,ID_Kunde,c_count,Kundennr,Vorname_1,Vorname_2,Nachname,Anrede,PLZ,Strasse,Hausnr,Mail,Tel,Geschlecht,Geburtsdatum,Beruf,Titel,PersoNr,PersoValidTo
0,Neukunde,126.0,258,7296,Henry,,Schwarz,Hr.,53520,Schloßstraße,2.0,henry.schwarz@gmx.net,02694/835429,m,2005-10-14,Software Entwickler/in,,JTR20689,2025-08-05
1,Neukunde,126.0,259,8452,Marie,,Lange,Frau,99628,Eichenstraße 10,,marie.lange@gmx.net,036373/258767,W,12.09.1972,Data Scientist,,PMQ12380,2022-01-05
2,Neukunde,141.0,260,8232,Lena,,Neumann,Fr.,24619,Bergstraße,2.0,lena.neumann@web.de,04323/893429,W,1959-07-16,Immobilienkaufmann/frau,,IPS68537,24.04.2024
3,Neukunde,152.0,261,7338,Sofia,,Schulze,Frau,19339,Gartenstraße,5.0,sofia.schulze@t-online.de,038787/293582,w,2003-11-07,UX Designer/in,,RGX29678,28.09.2024
4,Neukunde,152.0,262,6025,Emma,,Kaiser,Frau,67117,Drosselweg,3.0,emma.kaiser@t-online.de,06236/563646,w,1943-12-14,Senior/in,,PMP96275,2025-01-21


%4|1727341870.513|MAXPOLL|rdkafka#consumer-3| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 494ms (adjust max.poll.interval.ms for long-running message processing): leaving group


# Cleaning der Tabellen

### Transaktion

In [35]:
# IDs zu int konvertieren
dfd['Transaktion']['ID_Exemplar'] = pd.to_numeric(dfd['Transaktion']['ID_Exemplar']).astype(int)
dfd['Transaktion']['ID_Kunde'] = pd.to_numeric(dfd['Transaktion']['ID_Kunde']).astype(int)
# Jahre zu int konvertieren
dfd['Transaktion']['Jahr'] = pd.to_numeric(dfd['Transaktion']['Jahr']).astype(int)

# Fernleihe zu true oder false konvertieren
dfd['Transaktion']['Fernleihe'] = np.where(dfd['Transaktion']['Fernleihe'].astype(str).str.contains("None"), False, dfd['Transaktion']['Fernleihe'] )
dfd['Transaktion']['Fernleihe'] = np.where(dfd['Transaktion']['Fernleihe'].astype(str).str.contains("False"), False, dfd['Transaktion']['Fernleihe'] )
dfd['Transaktion']['Fernleihe'] = np.where(dfd['Transaktion']['Fernleihe'].astype(str).str.contains("True"), True, dfd['Transaktion']['Fernleihe'] )


### Bewertung

In [37]:
# IDs zu int konvertieren
dfd['Bewertung']['ID_Kunde'] = pd.to_numeric(dfd['Bewertung']['ID_Kunde']).astype(int)
dfd['Bewertung']['ID_Buch'] = pd.to_numeric(dfd['Bewertung']['ID_Buch']).astype(int)
dfd['Bewertung']['Wertung'] = pd.to_numeric(dfd['Bewertung']['Wertung']).astype(int)

### Neukunden

In [38]:
# IDs zu int konvertieren
dfd['Neukunden']['ID_Kunde'] = pd.to_numeric(dfd['Neukunden']['ID_Kunde']).astype(int)
dfd['Neukunden']['Kundennr'] = pd.to_numeric(dfd['Neukunden']['Kundennr']).astype(int)

# Anreden konsistent machen 
dfd['Neukunden']['Anrede'] = dfd['Neukunden']['Anrede'].replace(['Fr\.'],'Frau', regex=True)
dfd['Neukunden']['Anrede'] = dfd['Neukunden']['Anrede'].replace(['Hr\.'],'Herr', regex=True)
# Titel: "None" to NaN
dfd['Neukunden']['Titel']  = np.where(dfd['Neukunden']['Titel'].str.contains("None"), np.nan, dfd['Neukunden']['Titel'] )
# Vorname_2: "nan" to NaN
dfd['Neukunden']['Vorname_2']  = np.where(dfd['Neukunden']['Vorname_2'].str.contains("nan"), np.nan, dfd['Neukunden']['Vorname_2'] )
# Hausnr: "nan" to NaN
dfd['Neukunden']['Hausnr']  = np.where(dfd['Neukunden']['Hausnr'].str.contains("nan"), np.nan, dfd['Neukunden']['Hausnr'] )
# 4-stellige PLZ mit "0" präfixen 
dfd['Neukunden']['PLZ']    = dfd['Neukunden']['PLZ'].str.strip().str.rjust(5, '0')

# Hausnummern aus Strasse-Spalte extrahieren
def extract_hausnr(strasse):
  if pd.isna(strasse):
    return np.nan
  parts = strasse.split()
  if len(parts) > 1 and parts[-1].isdigit():
    return parts[-1]
  else:
    return np.nan
#Apply extract_hausnr, wenn 'Hausnr' NaN und entferne die Hausnummern aus der Strasse Spalte
dfd['Neukunden'].loc[dfd['Neukunden']['Hausnr'].isnull(), 'Hausnr'] = dfd['Neukunden'].loc[dfd['Neukunden']['Hausnr'].isnull(), 'Strasse'].apply(extract_hausnr)
dfd['Neukunden']['Strasse'] = dfd['Neukunden']['Strasse'].str.rsplit(expand=True)[0]
# Convert Hausnr to int
dfd['Neukunden']['Hausnr'] = pd.to_numeric(dfd['Neukunden']['Hausnr']).astype(int)

# Geschlecht konsistent machen 
dfd['Neukunden']['Geschlecht'] = dfd['Neukunden']['Geschlecht'].str.strip().replace(["^w$", "^W$"],'Weiblich', regex=True)
dfd['Neukunden']['Geschlecht'] = dfd['Neukunden']['Geschlecht'].str.strip().replace(["^m$", "^M$"],'Männlich', regex=True)

# Datumsspalten konsistent machen
dfd['Neukunden']['Geburtsdatum'] = pd.to_datetime(dfd['Neukunden']['Geburtsdatum'],format='mixed', dayfirst=True)
dfd['Neukunden']['PersoValidTo'] = pd.to_datetime(dfd['Neukunden']['PersoValidTo'],format='mixed', dayfirst=True)

### Bestand

In [39]:
# Jahre zu int konvertieren
df_bestand['Jahr'] = pd.to_numeric(df_bestand['Jahr']).astype('Int64', errors='ignore')

In [8]:
dfd['Transaktion']

Unnamed: 0,Aktion,Datum,Fernleihe,ID_Exemplar,ID_Kunde,Titel,Autor,Jahr,Art,Kennung,Zugriffsort
0,Leihe,2024-10-01 11:00:00,False,184,3185,Wilde Schwäne,Jung Chang,1991.0,Roman,zRy45129,qRv65907
1,Leihe,2024-10-01 12:00:00,False,225,3236,Kritik der reinen Vernunft,Immanuel Kant,1781.0,Philosophie,lPh90743,RBM07364
2,Leihe,2024-10-01 13:00:00,False,161,948,Der Tod des Vergil,Hermann Broch,1945.0,Roman,FCJ75034,Qwg21308
3,Leihe,2024-10-01 13:00:00,False,184,5108,Grande Sertão,João Guimarães Rosa,1956.0,Roman,qRN31487,YhA17243
4,Rückgabe,2024-10-01 18:00:00,,13,5536,Die große Reise,Jorge Semprún,1963.0,Roman,Ocm29314,iQX07436
5,Leihe,2024-10-01 19:00:00,False,189,1687,Die Dämonen,Fjodor Dostojewski,1871.0,Roman,Foj32158,eXx65890
6,Leihe,2024-10-01 22:00:00,False,184,3931,Von der Freiheit eines Christenmenschen,Martin Luther,1520.0,Religion,Myf70528,KtQ58064
7,Leihe,2024-10-02 01:00:00,False,231,4665,Farm der Tiere,George Orwell,1946.0,Roman,LTd96150,eaF35478
8,Leihe,2024-10-02 04:00:00,False,212,6261,Caspar Hauser oder Die Trägheit des Herzens,Jakob Wassermann,1908.0,Roman,BQd82937,Fkf27830
9,Leihe,2024-10-02 04:00:00,False,224,3923,Hetärengespräche,Lukian von Samosata,160.0,Sachbuch,IhD93785,fDO58463


In [54]:
dfd['Transaktion'].to_csv("transaktionen_test.csv")

In [2]:
transaktionen = pd.read_csv("transaktionen_test.csv")
transaktionen

Unnamed: 0.1,Unnamed: 0,Aktion,Datum,Fernleihe,ID_Exemplar,ID_Kunde,Titel,Autor,Jahr,Art,Kennung,Zugriffsort
0,0,Leihe,2024-10-01 11:00:00,False,184,3185,Wilde Schwäne,Jung Chang,1991.0,Roman,zRy45129,qRv65907
1,1,Leihe,2024-10-01 12:00:00,False,225,3236,Kritik der reinen Vernunft,Immanuel Kant,1781.0,Philosophie,lPh90743,RBM07364
2,2,Leihe,2024-10-01 13:00:00,False,161,948,Der Tod des Vergil,Hermann Broch,1945.0,Roman,FCJ75034,Qwg21308
3,3,Leihe,2024-10-01 13:00:00,False,184,5108,Grande Sertão,João Guimarães Rosa,1956.0,Roman,qRN31487,YhA17243
4,4,Rückgabe,2024-10-01 18:00:00,,13,5536,Die große Reise,Jorge Semprún,1963.0,Roman,Ocm29314,iQX07436
5,5,Leihe,2024-10-01 19:00:00,False,189,1687,Die Dämonen,Fjodor Dostojewski,1871.0,Roman,Foj32158,eXx65890
6,6,Leihe,2024-10-01 22:00:00,False,184,3931,Von der Freiheit eines Christenmenschen,Martin Luther,1520.0,Religion,Myf70528,KtQ58064
7,7,Leihe,2024-10-02 01:00:00,False,231,4665,Farm der Tiere,George Orwell,1946.0,Roman,LTd96150,eaF35478
8,8,Leihe,2024-10-02 04:00:00,False,212,6261,Caspar Hauser oder Die Trägheit des Herzens,Jakob Wassermann,1908.0,Roman,BQd82937,Fkf27830
9,9,Leihe,2024-10-02 04:00:00,False,224,3923,Hetärengespräche,Lukian von Samosata,160.0,Sachbuch,IhD93785,fDO58463


    '''Tdnmas, lets change it. Heres the available columns
    prod_Transaktion_cols={
    0:'Aktion',
    1:'Datum',
    2:'Fernleihe',
    3:'ID_Exemplar',
    4:'ID_Kunde',
    5:'Titel',
    6:'Autor',
    7:'Jahr',
    8:'Art',
    9:'Kennung',
   10:'Zugriffsort'}
   
    Leihe_cols={
        0:'ID_Kunde',
        1:'ID_Exemplar',
        2:'Ausleihdatum',
        3:'Rueckgabedatum',
        4:'Verlaengerungsstatus',
        5:'Mahnstatus',
        6:'Fernleihe'
    }
   '''





    

In [66]:
    Leihe_cols={
        0:'ID_Kunde',
        1:'ID_Exemplar',
        2:'Ausleihdatum',
        3:'Rueckgabedatum',
        4:'Verlaengerungsstatus',
        5:'Mahnstatus',
        6:'Fernleihe'
    }

    loan_cols = { 
        'ID_Leihe' : pd.Series(dtype='int'),    
        'ID_Kunde' : pd.Series(dtype='int'),    
        'ID_Exemplar' : pd.Series(dtype='int'),                  
        'Ausleihdatum' : pd.Series(dtype='datetime64[ns]'),
        'Rueckgabedatum' : pd.Series(dtype='datetime64[ns]'),
        'Verlaengerungsstatus' : pd.Series(dtype='int'),    
        'Mahnstatus' : pd.Series(dtype='int'),    
        'Fernleihe' : pd.Series(dtype='bool')
}

In [67]:
leihen = pd.DataFrame(columns=loan_cols)
transaktionen = dfd['Transaktion']

In [69]:
leihen.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 0 entries
Data columns (total 8 columns):
 #   Column                Non-Null Count  Dtype 
---  ------                --------------  ----- 
 0   ID_Leihe              0 non-null      object
 1   ID_Kunde              0 non-null      object
 2   ID_Exemplar           0 non-null      object
 3   Ausleihdatum          0 non-null      object
 4   Rueckgabedatum        0 non-null      object
 5   Verlaengerungsstatus  0 non-null      object
 6   Mahnstatus            0 non-null      object
 7   Fernleihe             0 non-null      object
dtypes: object(8)
memory usage: 124.0+ bytes


In [11]:
USER = 'root'
PASSWORD = 'root'
HOST = 'localhost'          # see yml file
PORT = 3307
DATABASE = 'julian_leihe_test'

from mysql.connector import connect
import os
connection = connect(user=USER, password=PASSWORD, host=HOST, port=PORT)

cursor = connection.cursor()
query = ("SHOW DATABASES")
cursor.execute(query)
cursor.fetchall()
select_db_query = f"USE {DATABASE}"
cursor.execute(select_db_query)
create_leihe_table = '''CREATE TABLE IF NOT EXISTS leihe (
    LeihID INT AUTO_INCREMENT PRIMARY KEY ,
    KundenID INT NOT NULL,
    ExemplarID INT NOT NULL,
    Ausleihdatum DATETIME NOT NULL,
    Rueckgabedatum DATETIME,
    Verlaengerungsstatus INT,
    Mahnstatus INT,
    Fernleihe BOOLEAN
)'''
cursor.execute(create_leihe_table)


In [13]:
cursor = connection.cursor()
query = ("SHOW DATABASES")
cursor.execute(query)
result = cursor.fetchall()
result


[('information_schema',),
 ('bibdb',),
 ('julian_leihe_test',),
 ('mysql',),
 ('performance_schema',),
 ('sys',),
 ('test_db_1',)]

In [16]:
('julian_leihe_test',) in result

True

In [21]:
DATABASE = "zebrix"
cursor = connection.cursor()
query = ("SHOW DATABASES")
cursor.execute(query)
result = cursor.fetchall()
if (DATABASE,) not in result:
    create_query = f"CREATE DATABASE {DATABASE}" 
    cursor.execute(create_query)
    print(create_query)
cursor.execute(query)
cursor.fetchall()

[('information_schema',),
 ('bibdb',),
 ('julian_leihe_test',),
 ('mysql',),
 ('performance_schema',),
 ('sys',),
 ('test_db_1',),
 ('zebrix',)]

In [9]:
# gehe transaktionen durch

for index, row in transaktionen.iterrows():
    if row['Aktion'] == 'Leihe':
        
        print("Neue Leihe eintragen")
        #print(row)

        loan_query = f'''INSERT INTO leihe(KundenID, ExemplarID, Ausleihdatum, Fernleihe) VALUES ({row['ID_Kunde']}, {row['ID_Exemplar']}, "{row['Datum']}", {row['Fernleihe']})'''
        cursor.execute(loan_query)


    if row['Aktion'] == 'Rückgabe':
        print("Rückgabe")
        # suche nach Zeile mit ID_Kunde, ID_Exemplar, Rückgabedatum leer    
        find_loan = f"SELECT LeihID FROM leihe WHERE KundenID = {row['ID_Kunde']} AND ExemplarID = {row['ID_Exemplar']} AND Rueckgabedatum IS NULL"
        cursor.execute(find_loan)
        results = cursor.fetchall()
        # we get a list of tuple. every record is a list entry. 
        # pruefe ob zeile eindeutig
        if len(results) == 0:
            print(f"Warning! Could not find open loan! {find_loan}")
        if len(results) > 1:
            print(f"Warning! Found {len(results)} open loans! {find_loan}")
        if len(results) == 1:
            leih_id = results[0][0]
            # setze Rückgabedatum auf Datum
            enter_rueckgabedatum = f'''UPDATE leihe SET Rueckgabedatum = "{row['Datum']}" WHERE LeihID = {leih_id}'''
            cursor.execute(enter_rueckgabedatum)
        
        

        
        

     

Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Rückgabe
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Rückgabe
Rückgabe
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Rückgabe
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Neue Leihe eintragen
Rückgabe


In [10]:
show_table = "SELECT * FROM leihe"
cursor.execute(show_table)
cursor.fetchall()

[(27, 3185, 184, datetime.datetime(2024, 10, 1, 11, 0), None, None, None, 0),
 (28, 3236, 225, datetime.datetime(2024, 10, 1, 12, 0), None, None, None, 0),
 (29, 948, 161, datetime.datetime(2024, 10, 1, 13, 0), None, None, None, 0),
 (30, 5108, 184, datetime.datetime(2024, 10, 1, 13, 0), None, None, None, 0),
 (31, 1687, 189, datetime.datetime(2024, 10, 1, 19, 0), None, None, None, 0),
 (32, 3931, 184, datetime.datetime(2024, 10, 1, 22, 0), None, None, None, 0),
 (33,
  4665,
  231,
  datetime.datetime(2024, 10, 2, 1, 0),
  datetime.datetime(2024, 10, 2, 13, 0),
  None,
  None,
  0),
 (34, 6261, 212, datetime.datetime(2024, 10, 2, 4, 0), None, None, None, 0),
 (35, 3923, 224, datetime.datetime(2024, 10, 2, 4, 0), None, None, None, 0),
 (36, 3947, 148, datetime.datetime(2024, 10, 2, 4, 0), None, None, None, 0),
 (37, 6691, 117, datetime.datetime(2024, 9, 28, 1, 0), None, None, None, 0),
 (38, 5927, 219, datetime.datetime(2024, 9, 28, 6, 0), None, None, None, 1),
 (39,
  2130,
  247,
  d

In [119]:
connection.close()

In [112]:
find_loan = "SELECT LeihID FROM leihe WHERE KundenID = 4665 AND ExemplarID = 231 AND Rueckgabedatum IS NULL"
cursor.execute(find_loan)
results = cursor.fetchall()
# we get a list of tuple. every record is a list entry. 
if len(results) == 0:
    print(f"Could not find open loan! {find_loan}")
if len(results) > 1:
    print(f"Found {len(results)} open loans! {find_loan}")
if len(results) == 1:
    leih_id = results[0][0]
    print(leih_id)

7


# Aufbau der Tabellen für das Core DWH

In [33]:
Kunde_cols={
    0:'ID_Kunde',
    1:'ID_Mitgliedsstatus',
    2:'Kundennr',
    3:'Vorname_1',
    4:'Vorname_2',
    5:'Nachname',
    6:'Anrede',
    7:'Titel',
    8:'PLZ',
    9:'Strasse',
   10:'Hausnr',
   11:'Mail',
   12:'Tel',
   13:'Geschlecht',
   14:'Geburtsdatum',
   15:'Beruf',
   16:'PersoNr',
   17:'PersoValidTo', 
   18:'Mitglied seit',
   19:'Mitglied bis' 
}

Bewertung_cols={
    1:'ID_Kunde',
    2:'ID_Buch',
    3:'Wertung',
    4:'Rezension'
}

Buch_cols={
    1:'ID_Kunde',
    2:'ID_Buch',
    3:'Wertung',
    4:'Rezension'
}

Exemplare_cols={
    1:'ID_Bestand',
    2:'Nr',
    3:'Kennung',
    4:'Zugriffsort',
    5:'Zustand'
}

Buch_cols={
    1:'ID_Autor',
    2:'Nr',
    3:'Titel',
    4:'Jahr',
    5:'Art',
}

Leihe_cols={
    0:'ID_Kunde',
    1:'ID_Exemplar',
    2:'Ausleihdatum',
    3:'Rueckgabedatum',
    4:'Verlaengerungsstatus',
    5:'Mahnstatus',
    6:'Fernleihe'
}

Mitgliedsstatus_cols={
    0:'ID_Mitgliedsstatus',
    2:'Bezeichnung',
    3:'Jahresbeitrag',
    4:'Mahnbetrag'
}

Beitragszahlung_cols={
    0:'ID_Zahlung',
    2:'ID_Kunde',
    3:'Datum',
    4:'Betrag',
}

In [34]:
dfs['Kunde']     = dfd['Neukunden'].reindex(columns=list(Kunde_cols.values()))
dfs['Bewertung'] = dfd['Bewertung'].reindex(columns=list(Bewertung_cols.values()))
dfs['Exemplare'] = df_bestand.reindex(columns=list(Exemplare_cols.values())).rename(columns={
                                                                        'ID_Bestand':'ID_Exemplar',
                                                                        'Nr':'ID_Buch'    
                                                                        })

dfs['Autor']   = df_bestand[['Autor', 'Herkunft']].drop_duplicates().reset_index(drop=True).reset_index().rename(columns={'index':'ID_Autor'})

dfs['Buch']    = pd.merge(df_bestand, 
                          dfs['Autor'], 
                          on=['Autor']
                         )[list(Buch_cols.values())]                                  \
                                                    .drop_duplicates()                \
                                                    .reset_index(drop=True)           \
                                                    .rename(columns={'Nr':'ID_Buch'})

dfs['Leihe'] = dfd['Transaktion'].reindex(columns=list(Leihe_cols.values())).reset_index().rename(columns={'index':'ID_Leihe'})


### more ###
dfs['Mitgliedsstatus'] = pd.DataFrame(columns=list(Mitgliedsstatus_cols.values()))
dfs['Beitragszahlung'] = dfd['Neukunden'].reindex(columns=list(Beitragszahlung_cols.values()))


%4|1727339292.849|MAXPOLL|rdkafka#consumer-2| [thrd:main]: Application maximum poll interval (300000ms) exceeded by 453ms (adjust max.poll.interval.ms for long-running message processing): leaving group
%6|1727339433.127|FAIL|rdkafka#consumer-2| [thrd:0.0.0.0:9092/bootstrap]: 0.0.0.0:9092/bootstrap: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY)
%6|1727339433.128|FAIL|rdkafka#consumer-2| [thrd:127.0.0.1:9092/1]: 127.0.0.1:9092/1: Disconnected while requesting ApiVersion: might be caused by incorrect security.protocol configuration (connecting to a SSL listener?) or broker version is < 0.10 (see api.version.request) (after 0ms in state APIVERSION_QUERY)
%6|1727339433.128|FAIL|rdkafka#consumer-2| [thrd:GroupCoordinator]: GroupCoordinator: 127.0.0.1:9092: Disconnected (after 0ms in state APIVERSION_QUERY, 1 iden

# Schicke die Tabellen an den SQL Server


In [37]:
from sqlalchemy import create_engine, inspect, text

USER = 'root'
PASSWORD = 'root'
HOST = 'localhost'          # see yml file
PORT = 3307
DATABASE = 'test_db_1'
 

def get_connection(database=DATABASE):
    return create_engine(
        url="mysql+pymysql://{0}:{1}@{2}:{3}/{4}"\
            .format(USER, PASSWORD, HOST, PORT, database)
    )

engine = get_connection('mysql')
with engine.connect() as connection:
    insp = inspect(engine)
    db_list = insp.get_schema_names()
    print(f"Connection to the {HOST} for user {USER} created successfully.")
    if DATABASE not in db_list:
        sql = text(f"CREATE DATABASE {DATABASE} CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;")
        result = connection.execute(sql)
        if result:
            print(f"Database {DATABASE} created!")

Connection to the localhost for user root created successfully.


In [38]:
for Tabelle in dfs.keys():
    dfs[Tabelle].to_sql(Tabelle, con=engine,schema=DATABASE, index=False, if_exists='append') 