# Pipeline

In [None]:
#!pip install mysql-connector-python

In [15]:
import pandas as pd
import numpy as np
import re    
from time import time
from datetime import datetime 
import datavaultlib as dvl
#dvl.clean_all() #uncomment to delete project-database and start from scratch

# Create DB Connection  

In [60]:
#Loading Database MariaDB
connection, cursor =dvl.db_con()
#Setting global LOAD_DTS value
LOAD_DTS = str(datetime.now())

# Creating Staging Schema and Tables

## Creating Tables

In [17]:
#Creating Tables
stagingschema="bi_staging"
tableAuftraege="Staging_Auftraege"
cursor.execute(f'''CREATE SCHEMA IF NOT EXISTS {stagingschema}''')
columns=["LOAD_DTS TIMESTAMP", 
        "Auftragsnummer INT PRIMARY KEY",
        "Datum varchar(64)","Kundennummer INT",
        "Kundenname varchar(64)"," Strasse varchar(64)",
        "PLZ varchar(10)",
        "Ort varchar(64)",
        "BundeslandKurz varchar(64)",
        "Bundesland varchar(64)",
        "LandKurz varchar(64)",
        "Land varchar(64)",
        "Laengengrad varchar(64)",
        "Breitengrad varchar(64)",
        "Produktnummer INT",
        "Produktbezeichnung varchar(64)",
        "Verkaufspreis REAL",
        "Einkaufspreis REAL",
        "Jahrgang INT",
        "Herkunftsland varchar(64)",
        "Sortenkennzeichen varchar(64)",
        "Rebsorte varchar(64)",
        "Verkaeufernummer INT",
        "Verkaeufername varchar(64)",
        "Bereichsnummer INT",
        "Bereichsbezeichnung varchar(64)",
        "Provisionsfaktor REAL",
        "Stueckzahl INT",
        "Umsatz REAL",
        "Kosten REAL",
        "SRC varchar(64)"]
dvl.createtbl(schema=stagingschema,table=tableAuftraege, columns=columns, droptbl=False)

In [18]:
tablePlanzahlen="Staging_Planzahlen"
columns=[
    "LOAD_DTS TIMESTAMP",
    "Jahr INT",
    "Monat INT",
    "Verkaeufernummer INT",
    "Rebsorte varchar(20)",
    "Stueckzahl INT",
    "SRC varchar(64)"
]
dvl.createtbl(schema=stagingschema,table=tablePlanzahlen, columns=columns,droptbl=False)

## Loading Data

In [19]:
#Loading and pre-processing Auftraege Data
srcAuf="AuftraegeNew.csv"
df_Auf = pd.read_csv(f"../Data/{srcAuf}",sep="','", engine="python",encoding='latin1')
#Removing quotationmarks
df_Auf["'Auftragsnummer"].replace("(')","",regex=True,inplace=True)
df_Auf["Kosten'"].replace("(')","",regex=True,inplace=True)
columns=list(df_Auf.columns)
columns[0]="Auftragsnummer"
columns[-1]="Kosten"
df_Auf.columns=columns
#replacing decimal comma by point
for col in ['Kosten', 'Umsatz', 'Provisionsfaktor','Einkaufspreis','Verkaufspreis','Breitengrad','Laengengrad']:
    df_Auf[col] = pd.to_numeric(df_Auf[col].apply(lambda x: re.sub(',', '.', str(x))))
#Filling NaN Values in BundelandKurz Column
df_Auf.BundeslandKurz =df_Auf[["BundeslandKurz"]].fillna(0)


#Defining Column Order for DB Insert
columns_Auftraege=['LOAD_DTS', 'Auftragsnummer', 'Datum', 'Kundennummer', 'Kundenname', 'Strasse',
       'PLZ', 'Ort', 'BundeslandKurz', 'Bundesland', 'LandKurz', 'Land',
       'Laengengrad', 'Breitengrad', 'Produktnummer', 'Produktbezeichnung',
       'Verkaufspreis', 'Einkaufspreis', 'Jahrgang', 'Herkunftsland',
       'Sortenkennzeichen', 'Rebsorte', 'Verkaeufernummer', 'Verkaeufername',
       'Bereichsnummer', 'Bereichsbezeichnung', 'Provisionsfaktor',
       'Stueckzahl', 'Umsatz', 'Kosten','SRC']
df_Auf["LOAD_DTS"] = LOAD_DTS
df_Auf["SRC"]=srcAuf
df_Auf=df_Auf[columns_Auftraege]
tableAuftraege="Staging_Auftraege"
#Prepare Data for DB Insert
columns_Auftraege=list(df_Auf.columns)
AuftraegeLst= [tuple(row.values) for idx, row in df_Auf.iterrows()]

In [20]:
%%time
#Loading Data into DB
dvl.add2db(schema=stagingschema,tblname=tableAuftraege, columns=columns_Auftraege, records=AuftraegeLst)

#Freeing up memory
del(AuftraegeLst)
del(df_Auf)

1062 (23000): Duplicate entry '1' for key 'PRIMARY' 0 ('2021-12-13 09:48:54.847976', '1', '01.01.2012', 200040, 'abacus-wein.de', 'Am Binsengrund 33', '70839', 'Gerlingen', 'BW', 'Baden-Württemberg', 'D', 'Deutschland', 9.05, 48.79, 100340, 'Côtes du Rhone E. Guigal', 10.69, 7.87, 2011, 'Frankreich', 'R01', 'Shiraz', 4, 'Stefan Busch', 5, 'Champagner/Sekt', 0.03, 6.0, 64.15, 47.2, 'AuftraegeNew.csv')
1062 (23000): Duplicate entry '2' for key 'PRIMARY' 1 ('2021-12-13 09:48:54.847976', '2', '01.01.2012', 200050, 'Vinovero GmbH & Co.KG', 'Arnsberger Straße 47', '70372', 'Stuttgart', 'BW', 'Baden-Württemberg', 'D', 'Deutschland', 9.22, 48.81, 100380, 'Marie Vallé Spätburgunder', 57.83, 32.55, 2004, 'Deutschland', 'E02', 'Spätburgunder', 5, 'Harry Haller', 4, 'Exklusiv', 0.01, 6.0, 346.97, 195.31, 'AuftraegeNew.csv')
1062 (23000): Duplicate entry '3' for key 'PRIMARY' 2 ('2021-12-13 09:48:54.847976', '3', '01.01.2012', 200100, 'Aix Vinum Niklas P. Kudlek', 'Finkenwalder Weg 3', '52062', 'Aa

In [21]:
#Loading Planzahlen Data
srcPLA="Planzahlen.csv"
df_Pla=pd.read_csv(f"../Data/{srcPLA}",quotechar="'",encoding='latin1')
df_Pla.drop_duplicates(inplace=True)
#Inserting data into bi_staging.Staging_Planzahlen
df_Pla["LOAD_DTS"] = LOAD_DTS
df_Pla["SRC"]=srcPLA
columns_Planzahlen=['EXP Jahr', 'EXP Monat', 'EXP Verkaeufernummer', 'EXP Rebsorte', 'EXP Planstueckzahl', 'Jahr']
df_Pla.rename(columns=dict(list(zip(columns_Planzahlen,["Jahr","Monat","Verkaeufernummer","Rebsorte","Stueckzahl"]))),inplace=True)
#except: print("Column Names already updated, or unexpected inputs")
columns_Planzahlen=['LOAD_DTS',"Jahr","Monat","Verkaeufernummer","Rebsorte","Stueckzahl","SRC"]
df_Pla=df_Pla[columns_Planzahlen].copy()
PlanzahlLst= [tuple(row.values) for idx, row in df_Pla.iterrows()]

In [22]:
%%time
dvl.add2db(schema=stagingschema,tblname=tablePlanzahlen, columns=columns_Planzahlen, records=PlanzahlLst)

#Freeing up memory
del(df_Pla)
del(PlanzahlLst)

1050 records were added to Staging_Planzahlen
CPU times: user 365 ms, sys: 110 ms, total: 476 ms
Wall time: 19.1 s


## Initialization of CDWH Schema and Tables

### Table Initialization:

In [24]:
try: connection.close()
except: pass
finally: connection, cursor= dvl.db_con()
cdwhschema = "bi_cdwh"
cursor.execute(f'''CREATE SCHEMA IF NOT EXISTS {cdwhschema};''')

#HUB's
hublst=[("HUB_Kunde","Kundenname","VARCHAR(64)"),
        ("HUB_Verkaeufer","Verkaeufername","VARCHAR(64)"),
        ("HUB_Produkt","Produktbezeichnung","VARCHAR(64)"),
        ("HUB_Rebsorte","Rebsorte","VARCHAR(32)"),
        ("HUB_Periode","Periode","VARCHAR(32)"),#Periode is not a perfect business key, but it's structure is recognizable as YearMonth
        ("HUB_Auftrag","Auftragsnummer","INT"),#Auftragsnummer is not a valid business key considering the DV 2.0 specs, since it is an abstract number which could mean anything
       ]
dvl.hub_gen(hublst=hublst)


#LINK's
linklst=[("LINK_Planzahl",("HUB_Rebsorte","HUB_Verkaeufer","HUB_Periode")),
         ("LINK_Auftrag",("HUB_Kunde","HUB_Produkt","HUB_Verkaeufer","HUB_Auftrag")),
         ("LINK_Produkt",("HUB_Produkt","HUB_Verkaeufer","HUB_Rebsorte"))
        ]
dvl.link_gen(linklst=linklst)

#SAT's
dic={
0:"INT",
1:"VARCHAR(64)",
2:"CHAR(32)",
3:"REAL" 
}
satlst=[("SAT_Adresse","HUB_Kunde",[("Kundennummer",dic[1]),("Strasse",dic[1]),("PLZ",dic[1]),("Ort", dic[1]),("BundeslandKurz", dic[1]),("Bundesland",dic[1]),("LandKurz",dic[1]),("Land",dic[1]),("Laengengrad",dic[1]),("Breitengrad",dic[1])]),
        ("SAT_Auftrag","HUB_Auftrag",[("Datum",dic[1]),("Stueckzahl",dic[0]),("Provisionsfaktor",dic[3]),("Umsatz",dic[3]),("Kosten",dic[3])]),
        ("SAT_Verkaeufer","HUB_Verkaeufer",[("Verkaeufernummer",dic[1])]),
        ("SAT_ProduktBereich","HUB_Produkt",[("Bereichsnummer",dic[0]),("Bereichsbezeichnung",dic[1])]),
        ("SAT_ProduktAllgemein","HUB_Produkt",[("Produktnummer",dic[1]),("Verkaufspreis",dic[3]),("Einkaufspreis",dic[3]),("Jahrgang",dic[0]),("Herkunftsland",dic[1])]),
        ("SAT_Rebsorte","HUB_Rebsorte",[("Sortenkennzeichen",dic[1])]),
        ("SAT_Planzahl","LINK_Planzahl",[("Stueckzahl",dic[0])])
        
]
dvl.sat_gen(satlst=satlst)

HUB was created: HUB_Kunde
HUB was created: HUB_Verkaeufer
HUB was created: HUB_Produkt
HUB was created: HUB_Rebsorte
HUB was created: HUB_Periode
HUB was created: HUB_Auftrag
LINK was created: LINK_Planzahl
LINK was created: LINK_Auftrag
LINK was created: LINK_Produkt
SAT was created: SAT_Adresse
SAT was created: SAT_Auftrag
SAT was created: SAT_Verkaeufer
SAT was created: SAT_ProduktBereich
SAT was created: SAT_ProduktAllgemein
SAT was created: SAT_Rebsorte
SAT was created: SAT_Planzahl


### Importing data from CSV to Staging

#### Loading Process:

In [25]:
%%time
#Loading Data into bi_cdwh.HUB_Kunde
cdwhtablename="HUB_Kunde"
stagingtablename="Staging_Auftraege"
columns=["Kundenname","LOAD_DTS","SRC"]
bk="Kundenname"
dvl.hub_insert(bk=bk, cdwhtbl=cdwhtablename,stagingtbl=stagingtablename,columns=columns)

#Loading Data into bi_cdwh.HUB_Verkaeufer
cdwhtablename="HUB_Verkaeufer"
stagingtablename="Staging_Auftraege"
columns=["Verkaeufername","LOAD_DTS","SRC"]
bk="Verkaeufername"
dvl.hub_insert(bk=bk, cdwhtbl=cdwhtablename,stagingtbl=stagingtablename,columns=columns)

#Loading Data into bi_cdwh.HUB_Produkt
cdwhtablename="HUB_Produkt"
stagingtablename="Staging_Auftraege"
columns=["Produktbezeichnung","LOAD_DTS","SRC"]
bk="Produktbezeichnung"
dvl.hub_insert(bk=bk, cdwhtbl=cdwhtablename,stagingtbl=stagingtablename,columns=columns)

#Loading Data into bi_cdwh.HUB_Rebsorte
cdwhtablename="HUB_Rebsorte"
stagingtablename="Staging_Auftraege"
columns=["Rebsorte","LOAD_DTS","SRC"]
bk="Rebsorte"
dvl.hub_insert(bk=bk, cdwhtbl=cdwhtablename,stagingtbl=stagingtablename,columns=columns)

#Loading Data into bi_cdwh.HUB_Periode
cdwhtablename="HUB_Periode"
stagingtablename="Staging_Planzahlen"
columns=["CONCAT(Jahr,Monat)","LOAD_DTS","SRC"]
bk="CONCAT(Jahr,Monat)"
dvl.hub_insert(bk=bk, cdwhtbl=cdwhtablename,stagingtbl=stagingtablename,columns=columns)

#Loading Data into bi_cdwh.HUB_Auftrag
cdwhtablename="HUB_Auftrag"
stagingtablename="Staging_Auftraege"
columns=["Auftragsnummer","LOAD_DTS","SRC"]
bk="Auftragsnummer"
dvl.hub_insert(bk=bk, cdwhtbl=cdwhtablename,stagingtbl=stagingtablename,columns=columns)

Insert into HUB_Kundenname failed:
 1062 (23000): Duplicate entry 'e99e65de8d79898560c6061460dbfc28' for key 'PRIMARY'
Insert into HUB_Verkaeufername failed:
 1062 (23000): Duplicate entry '3084be171bb0fd9eb55dfb1ab1ca5099' for key 'PRIMARY'
Insert into HUB_Produktbezeichnung failed:
 1062 (23000): Duplicate entry '38fa69832e3ac283e2fb7ba7012af415' for key 'PRIMARY'
Insert into HUB_Rebsorte failed:
 1062 (23000): Duplicate entry '9a759551291fa9eb0c479ba0089f09ea' for key 'PRIMARY'
Insert into HUB_CONCAT(Jahr,Monat) failed:
 1062 (23000): Duplicate entry 'e0613000bcd426e19c29ad052aa7ea49' for key 'PRIMARY'
Insert into HUB_Auftragsnummer failed:
 1062 (23000): Duplicate entry 'c4ca4238a0b923820dcc509a6f75849b' for key 'PRIMARY'
CPU times: user 28 ms, sys: 10.7 ms, total: 38.8 ms
Wall time: 1.43 s


In [26]:
%%time
#Loading Data into bi_cdwh.LINK_Planzahl
try: connection.close()
except: pass
finally: connection, cursor= dvl.db_con()
fklst=["Rebsorte","Verkaeufername", "CONCAT(Jahr,Monat)"]
cdwhtbl="LINK_Planzahl"
stagingtbl="Staging_Planzahlen"
query='''
INSERT INTO bi_cdwh.LINK_Planzahl 
SELECT
    MD5(
        CONCAT(
            MD5(SP.Rebsorte),
            MD5(Verkaeufername),
            MD5(CONCAT(Jahr, Monat))
        )
    ),
    MD5(SP.Rebsorte),
    MD5(Verkaeufername),
    MD5(CONCAT(Jahr, Monat)),
    SP.LOAD_DTS,
    SP.SRC
FROM
    bi_staging.Staging_Planzahlen SP
LEFT JOIN bi_staging.Staging_Auftraege SA ON
    (SP.Verkaeufernummer = SA.Verkaeufernummer)
GROUP BY
    SP.Rebsorte,
    Verkaeufername,
    CONCAT(Jahr, Monat) 
    '''
try: 
    cursor.execute(query)
    print(f"Insert into {cdwhtbl} was successfull" )
except Exception as e: print(e)

1062 (23000): Duplicate entry '278f4769a87878c778d0a89da0713c41' for key 'PRIMARY'
CPU times: user 9.95 ms, sys: 9.85 ms, total: 19.8 ms
Wall time: 1min 19s


In [27]:
#Loading Data into bi_cdwh.LINK_Auftrag
fklst=["Kundenname","Produktbezeichnung", "Verkaeufername","Auftragsnummer"]
cdwhtbl="LINK_Auftrag"
stagingtbl="Staging_Auftraege"
dvl.link_insert(cdwhtbl=cdwhtbl,stagingtbl=stagingtbl,fklst=fklst)

#Loading Data into bi_cdwh.LINK_Produkt
fklst=["Produktbezeichnung","Verkaeufername","Rebsorte"]
cdwhtbl="LINK_Produkt"
stagingtbl="Staging_Auftraege"
dvl.link_insert(cdwhtbl=cdwhtbl,stagingtbl=stagingtbl,fklst=fklst)

Something went wrong 1062 (23000): Duplicate entry '827274b13381ec54a303befeb3f38034' for key 'PRIMARY'
Something went wrong 1062 (23000): Duplicate entry '7a92c89dbd63846f3621e9f46793ce9d' for key 'PRIMARY'


In [28]:
%%time
#Loading Data into SAT_Adresse
columns=["Kundennummer","Strasse","PLZ","Ort","BundeslandKurz","Bundesland","LandKurz","Land","Laengengrad","Breitengrad"]
bk="Kundenname"
cdwhtbl="SAT_Adresse"
stagingtbl="Staging_Auftraege"
dvl.sat_insert(bk=bk,columns=columns,cdwhtbl=cdwhtbl,stagingtbl=stagingtbl,hub="HUB_Kunde")

#Loading Data into SAT_Auftrag
columns=["Datum","Stueckzahl","Provisionsfaktor","Umsatz","Kosten"]
bk="Auftragsnummer"
cdwhtbl="SAT_Auftrag"
stagingtbl="Staging_Auftraege"
dvl.sat_insert(bk=bk,columns=columns,cdwhtbl=cdwhtbl,stagingtbl=stagingtbl,hub="HUB_Auftrag")

#Loading Data into SAT_ProduktAllgemein
columns=["Produktnummer","Verkaufspreis","Einkaufspreis","Jahrgang","Herkunftsland"]
bk="Produktbezeichnung"
cdwhtbl="SAT_ProduktAllgemein"
stagingtbl="Staging_Auftraege"
dvl.sat_insert(bk=bk,columns=columns,cdwhtbl=cdwhtbl,stagingtbl=stagingtbl,hub="HUB_Produkt")

#Loading Data into SAT_ProduktBereich
columns=["Bereichsnummer","Bereichsbezeichnung"]
bk="Produktbezeichnung"
cdwhtbl="SAT_ProduktBereich"
stagingtbl="Staging_Auftraege"
dvl.sat_insert(bk=bk,columns=columns,cdwhtbl=cdwhtbl,stagingtbl=stagingtbl,hub="HUB_Produkt")

#Loading Data into SAT_Rebsorte
columns=["Sortenkennzeichen"]
bk="Rebsorte"
cdwhtbl="SAT_Rebsorte"
stagingtbl="Staging_Auftraege"
dvl.sat_insert(bk=bk,columns=columns,cdwhtbl=cdwhtbl,stagingtbl=stagingtbl,hub="HUB_Rebsorte")

#Loading Data into SAT_Verkaeufer
columns=["Verkaeufernummer"]
bk="Verkaeufername"
cdwhtbl="SAT_Verkaeufer"
stagingtbl="Staging_Auftraege"
dvl.sat_insert(bk=bk,columns=columns,cdwhtbl=cdwhtbl,stagingtbl=stagingtbl,hub="HUB_Verkaeufer")

#Loading Data into SAT_Planzahl (Custome Query needed since sat_insert is only made for HUB_SAT's, not for LINK_SAT's)
columns=["Stueckzahl"]
bk=["Rebsorte","Verkaeufername","Jahr","Monat"]
cdwhtbl="SAT_Planzahl"
stagingtbl="Staging_Planzahlen"
stagingschema="Staging_Planzahlen"
query=f'''
INSERT INTO bi_cdwh.{cdwhtbl} (LOAD_DTS, LINK_Planzahl_HSH,Stueckzahl,SRC)
SELECT
    SP.LOAD_DTS,
    MD5(
        CONCAT(
            MD5(SP.Rebsorte),
            MD5(Verkaeufername),
            MD5(CONCAT(Jahr, Monat))
        )
    ),
    SP.Stueckzahl,
    SP.SRC
FROM
    bi_staging.Staging_Planzahlen SP LEFT JOIN bi_staging.Staging_Auftraege SA ON
    SP.Verkaeufernummer = SA.Verkaeufernummer
GROUP BY
    SP.Rebsorte,
    Verkaeufername,
    Jahr,
    Monat;'''
try: 
    cursor.execute(query)
    print(f"Insert into {cdwhtbl} was successfull")
except Exception as e: print(e,query)
connection.commit()

Record might already be existing, otherwise follow error message:
 1062 (23000): Duplicate entry '2021-12-02 14:51:26-e99e65de8d79898560c6061460dbfc28' for key 'PRIMARY' INSERT INTO bi_cdwh.SAT_Adresse (LOAD_DTS,HUB_Kunde_HSH,Kundennummer,Strasse,PLZ,Ort,BundeslandKurz,Bundesland,LandKurz,Land,Laengengrad,Breitengrad,SRC)
            SELECT LOAD_DTS,MD5(Kundenname),Kundennummer,Strasse,PLZ,Ort,BundeslandKurz,Bundesland,LandKurz,Land,Laengengrad,Breitengrad,SRC
            FROM bi_staging.Staging_Auftraege GROUP BY Kundenname;
Record might already be existing, otherwise follow error message:
 1062 (23000): Duplicate entry '2021-12-02 14:51:26-c4ca4238a0b923820dcc509a6f75849b' for key 'PRIMARY' INSERT INTO bi_cdwh.SAT_Auftrag (LOAD_DTS,HUB_Auftrag_HSH,Datum,Stueckzahl,Provisionsfaktor,Umsatz,Kosten,SRC)
            SELECT LOAD_DTS,MD5(Auftragsnummer),Datum,Stueckzahl,Provisionsfaktor,Umsatz,Kosten,SRC
            FROM bi_staging.Staging_Auftraege GROUP BY Auftragsnummer;
Record might alr

In [61]:
def execQuery(query):
    try: 
        cursor.execute(query)
        print(f"Query successfull")
    except Exception as e: print(e,query)
    connection.commit()



# Reporting Datamart


In [62]:
class Reporting_Datamart:
    query_create = """
    CREATE Table IF NOT EXISTS bi_datamart.Reporting_datamart( 
    Produktbezeichnung varchar(64),
    Verkaeufername varchar(64),
    Kundenname varchar(64),
    Datum timestamp,
    Jahr int,
    Monat int,
    Umsatz double,
    Land varchar(64),
    Laengengrad double, 
    Breitengrad double, 
    Stueckzahl int,
    Rebsorte varchar(64));
    """

    truncat_query = "TRUNCATE Table bi_datamart.Reporting_datamart;"

    insert_query="""
    
    INSERT INTO bi_datamart.Reporting_datamart

    SELECT Distinct Produktbezeichnung, Verkaeufername, Kundenname, STR_TO_DATE(Datum, '%d.%m.%Y') AS Auftragsdatum, 
    YEAR(STR_TO_DATE(Datum, '%d.%m.%Y')) As Jahr,
    MONTH(STR_TO_DATE(Datum, '%d.%m.%Y')) As Monat, Umsatz, LAND, Laengengrad, Breitengrad, Stueckzahl, Rebsorte
    FROM bi_cdwh.LINK_Auftrag 

    # Join Hubs 
    JOIN(bi_cdwh.HUB_Produkt,bi_cdwh.HUB_Auftrag, bi_cdwh.HUB_Kunde, bi_cdwh.HUB_Verkaeufer)
    ON 
    (LINK_Auftrag.HUB_Produkt_HSH = HUB_Produkt.HUB_Produkt_HSH 
    AND LINK_Auftrag.HUB_Auftrag_HSH = HUB_Auftrag.HUB_Auftrag_HSH AND
    LINK_Auftrag.HUB_Kunde_HSH = HUB_Kunde.HUB_Kunde_HSH AND 
    LINK_Auftrag.HUB_Verkaeufer_HSH = HUB_Verkaeufer.HUB_Verkaeufer_HSH) 

    # Join Sat
    JOIN(bi_cdwh.SAT_Auftrag) ON (HUB_Auftrag.HUB_Auftrag_HSH = SAT_Auftrag.HUB_Auftrag_HSH)
    JOIN(bi_cdwh.SAT_Adresse) ON(HUB_Kunde.HUB_Kunde_HSH = SAT_Adresse.HUB_Kunde_HSH)

    Join bi_cdwh.LINK_Produkt ON(LINK_Produkt.HUB_Produkt_HSH = HUB_Produkt.HUB_Produkt_HSH )
    Join bi_cdwh.HUB_Rebsorte ON(LINK_Produkt.HUB_Rebsorte_HSH = HUB_Rebsorte.HUB_Rebsorte_HSH);
    """



In [63]:
query_schema = f'''
CREATE SCHEMA IF NOT EXISTS bi_datamart;
'''

execQuery(query_schema)
execQuery(Reporting_Datamart.query_create)
execQuery(Reporting_Datamart.truncat_query)
execQuery(Reporting_Datamart.insert_query)


Insert into was successfull
Insert into was successfull
Insert into was successfull


# Pre Planning Datamart

In [64]:
class PrePlanning_Datamart:
    query_create = f"""
    CREATE TABLE IF NOT EXISTS bi_datamart.PrePlanning_datamart (
    Rebsorte varchar(64), 
    EXP_Stueck int, 
    EXP_Jahr int,
    EXP_Monat int, 
    Verkaeufername varchar(64));
    """
    truncat_query = "TRUNCATE Table bi_datamart.PrePlanning_datamart;"
    
    insert_query = f"""
    INSERT Into bi_datamart.PrePlanning_datamart
    Select Distinct  Rebsorte, SAT_Planzahl.Stueckzahl AS EXP_Stueck, CONVERT(SUBSTRING(Periode,1, 4), INTEGER) AS EXP_Jahr,    
    CONVERT(SUBSTRING(Periode,5,2),INTEGER) AS EXP_Monat, Verkaeufername 

    From bi_cdwh.LINK_Planzahl

    JOIN bi_cdwh.SAT_Planzahl On(LINK_Planzahl.LINK_Planzahl_HSH = SAT_Planzahl.LINK_Planzahl_HSH)
    JOIN bi_cdwh.HUB_Periode ON (LINK_Planzahl.HUB_Periode_HSH = HUB_Periode.HUB_Periode_HSH)
    JOIN bi_cdwh.HUB_Rebsorte ON (LINK_Planzahl.HUB_Rebsorte_HSH = HUB_Rebsorte.HUB_Rebsorte_HSH)
    JOIN bi_cdwh.HUB_Verkaeufer ON (LINK_Planzahl.HUB_Verkaeufer_HSH = HUB_Verkaeufer.HUB_Verkaeufer_HSH)

    JOIN bi_cdwh.LINK_Produkt ON(LINK_Produkt.HUB_Rebsorte_HSH = HUB_Rebsorte.HUB_Rebsorte_HSH);
    """

In [65]:
execQuery(PrePlanning_Datamart.query_create)
execQuery(PrePlanning_Datamart.truncat_query)
execQuery(PrePlanning_Datamart.insert_query)


Insert into was successfull
Insert into was successfull
Insert into was successfull


# Planning Datamart

In [66]:
class Planning_Datamart:
    query_create = f"""
    CREATE TABLE IF NOT EXISTS bi_datamart.Planning_datamart (
    Verkaeufername varchar(64),
    Rebsorte varchar(64),
    EXP_Stueck int, 
    Absatz int,
    EXP_Monat int, 
    EXP_Jahr int);"""

    truncat_query = "TRUNCATE Table bi_datamart.Planning_datamart;"
    
    insert_query = """

    INSERT Into bi_datamart.Planning_datamart
    SELECT PrePlanning_datamart.Verkaeufername, PrePlanning_datamart.Rebsorte, EXP_Stueck, sum(Stueckzahl) AS Absatz , 
    EXP_Monat, EXP_Jahr 

    FROM bi_datamart.PrePlanning_datamart 

    Left JOIN bi_datamart.Reporting_datamart 
    ON (Reporting_datamart.Verkaeufername = PrePlanning_datamart.Verkaeufername 
    AND Reporting_datamart.Rebsorte = PrePlanning_datamart.Rebsorte 
    AND Reporting_datamart.Monat = PrePlanning_datamart.EXP_Monat 
    AND  Reporting_datamart.Jahr = PrePlanning_datamart.EXP_Jahr)

    Group by PrePlanning_datamart.Rebsorte, PrePlanning_datamart.Verkaeufername, EXP_Monat, EXP_Jahr;
    """  

In [68]:

execQuery(Planning_Datamart.query_create)
execQuery(Planning_Datamart.truncat_query)
execQuery(Planning_Datamart.insert_query)


Insert into was successfull
Insert into was successfull
Insert into was successfull
