In [2]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum,lit,when,format_number,row_number,col,filter
from pyspark.sql.types import StringType
from pyspark.sql.window import Window

from openpyxl import load_workbook
from openpyxl.styles import PatternFill, Font, Alignment
from openpyxl.worksheet.table import Table, TableStyleInfo

spark = SparkSession.builder \
    .appName("CSV Loader") \
    .config("spark.executor.memory", "32g") \
    .config("spark.driver.memory", "32g") \
    .config("spark.executor.memoryOverhead", "8192") \
    .config("spark.driver.memoryOverhead", "8192") \
    .config("spark.executor.cores", "7") \
    .config("spark.driver.cores", "7") \
    .config("spark.executor.instances", "2") \
    .getOrCreate()


In [4]:
morefire_file ='/Users/maralsheikhzadeh/documents/codes/Repeating-Analytics/morefire/HG_Salesvalidierung_01-31Mai25.xlsx'
data = []
for sheet_name in ['DE','AT','FR','CH']:
    df = pd.read_excel(morefire_file,header=0,sheet_name=sheet_name)
    df['ID'] = df['ID'].fillna(0).astype(int).astype(str)
    df['LAND'] = sheet_name
    data.append(df)

df_first = pd.concat(data)

  df_first = pd.concat(data)


## Update Originals first
With Server-connect folder **dbf_to_spark.ipynb**, first update all the csv files below so that we can find the EXT_Verweis in them. The following fields connect the data from all 4 countries into 3 unified tables to further connect them with the file from Morefire.

In [5]:
V2AD1096 = spark.read.csv('/Volumes/MARAL/CSV/F01/V2AD1096.csv',sep=';',header=True)
V2AD1056 = spark.read.csv('/Volumes/MARAL/CSV/F01/V2AD1056.csv',sep=';',header=True)
V2AD1156 = spark.read.csv('/Volumes/MARAL/CSV/F01/V2AD1156.csv',sep=';',header=True)

In [6]:
V2AD1096_FR = spark.read.csv('/Volumes/MARAL/CSV/f02/V2AD1096.csv',sep=';',header=True)
V2AD1056_FR = spark.read.csv('/Volumes/MARAL/CSV/f02/V2AD1056.csv',sep=';',header=True)
V2AD1156_FR = spark.read.csv('/Volumes/MARAL/CSV/f02/V2AD1156.csv',sep=';',header=True)

In [7]:
V2AD1096_AU = spark.read.csv('/Volumes/MARAL/CSV/F03/V2AD1096.csv',sep=';',header=True)
V2AD1056_AU = spark.read.csv('/Volumes/MARAL/CSV/F03/V2AD1056.csv',sep=';',header=True)
V2AD1156_AU = spark.read.csv('/Volumes/MARAL/CSV/F03/V2AD1156.csv',sep=';',header=True)

In [8]:
V2AD1096_CH = spark.read.csv('/Volumes/MARAL/CSV/F04/V2AD1096.csv',sep=';',header=True)
V2AD1056_CH = spark.read.csv('/Volumes/MARAL/CSV/F04/V2AD1056.csv',sep=';',header=True)
V2AD1156_CH = spark.read.csv('/Volumes/MARAL/CSV/F04/V2AD1156.csv',sep=';',header=True)

In [9]:
V2AD1096 = V2AD1096_FR.union(V2AD1096_AU).union(V2AD1096_CH).union(V2AD1096)
V2AD1056 = V2AD1056_FR.union(V2AD1056_AU).union(V2AD1056_CH).union(V2AD1056)
V2AD1156 = V2AD1156_FR.union(V2AD1156_AU).union(V2AD1156_CH).union(V2AD1156)


In [10]:
V2AD1096 = V2AD1096.withColumn('SHOPNUMMER', col('SHOPNUMMER').cast(StringType()))


In [11]:
df = spark.createDataFrame(df_first)
df = df.join(V2AD1096[['SHOPNUMMER','RECH_NR']],df['ID'] == V2AD1096['SHOPNUMMER'],how='left')

In [12]:
# merged_df['RECH_NR'] = merged_df['RECH_NR'].astype(str)
df = df.withColumn('RECH_NR', col('RECH_NR').cast(StringType()))
V2AD1056 = V2AD1056.withColumn('RECH_NR', col('RECH_NR').cast(StringType()))

In [13]:
df = df.join(V2AD1056[['VERWEIS','RECH_NR','BEST_WERT','MWST1','AUF_ANLAGE']],on='RECH_NR',how='left')
df = df.join(V2AD1156[['ART_NR','GROESSE','FARBE','PREIS','MWST','RECHNUNG']],df['RECH_NR']==V2AD1156['RECHNUNG'],how='left')

## Changing from Pyspark to Pandas
I used Pyspark to load and connect the data from our databases because the data is so large and Pandas is so slow. Now that everything is joined and the data is filtered to the original lines we had, it's easier to transform it back to Pandas to do the final analysis on it.

In [14]:
df = df.toPandas()


[Stage 12:>                                                       (0 + 16) / 16]

CodeCache: size=131072Kb used=37849Kb max_used=37874Kb free=93222Kb
 bounds [0x0000000104c18000, 0x0000000107148000, 0x000000010cc18000]
 total_blobs=12910 nmethods=11983 adapters=835
 compilation: disabled (not enough contiguous free space left)


                                                                                

In [16]:
df['GROESSE'] = df['GROESSE'].astype(str).fillna("")
df['FARBE'] = df['FARBE'].astype(str).fillna("")
df['ANR'] = df['ART_NR'].str.ljust(8) + df['GROESSE'].str.ljust(4) + df['FARBE'].str.ljust(2)
df['ANR'] = df['ANR'].str.strip()

In [17]:
df_new = df.copy()


### Transforming data

- We create a columnn for items that were **Not Found** in the VS4 Data. We make a * sign for them to know what is missing from data.
- The third line removes items that were not computed in Morefire's Sales_Betrag, namely: Versandkosten (VK) and Geschenkverpackung (011P00). Items may need to be added to this list in the future depending on the items that we see further that are not computed here.
- Then we filter only the columns we need. and change data types for our requirements.
- Finally, we aggregate all the lines from the same Rechnung and Extra so that we compute the Fakturierte Nettoumsatz based on the Nettoumsatz value for each product in the list.

In [38]:
df_new['Publisher_ID'] = df_new['Publisher_ID'].astype(str)
df_new['Not Found'] = ""
df_new.loc[df_new['SHOPNUMMER'].isna(),'Not Found'] = '*'

df_new = df_new[df_new['ART_NR'].isin(['VK','011P00'])==False]
# df_new = df_new[['ID', 'AWIN ID', 'Publisher_ID', 'Sale_Betrag', 'Provision', 'Datum',
#        'Site_Name', 'URL', 'Ablehnungs_Grund', 'Referrer', 'Publisher_URL',
#        'Transaktions_Teile', 'Land_nach_IP', 'Benutzerdef_Parameter',
#        'Produkte', 'getrackte_Teile', 'Netzwerkgebühr','Not Found']]
df_new['PREIS'] = df_new['PREIS'].astype(float).fillna(0)
df_new = df_new.drop_duplicates(subset=['ID','Datum','PREIS','ANR','RECHNUNG'])
df_grouped = df_new.groupby(
    ['ID']
).agg({'PREIS': 'sum','Not Found':'first'}).reset_index().rename(columns={'PREIS': 'Fakturierte Nettoumsatz'})
df_grouped = df_grouped.merge(df_first,on='ID',how='left')
df_grouped = df_grouped[['ID', 'AWIN ID', 'Publisher_ID', 'Sale_Betrag', 'Provision', 'Datum',
       'Site_Name', 'URL', 'Ablehnungs_Grund', 'Referrer', 'Publisher_URL',
       'Transaktions_Teile', 'Land_nach_IP', 'Benutzerdef_Parameter',
       'Produkte', 'getrackte_Teile', 'Netzwerkgebühr','Fakturierte Nettoumsatz','Not Found',"LAND"]]



In [39]:
df_grouped

Unnamed: 0,ID,AWIN ID,Publisher_ID,Sale_Betrag,Provision,Datum,Site_Name,URL,Ablehnungs_Grund,Referrer,Publisher_URL,Transaktions_Teile,Land_nach_IP,Benutzerdef_Parameter,Produkte,getrackte_Teile,Netzwerkgebühr,Fakturierte Nettoumsatz,Not Found,LAND
0,174609701033739,1844082293,626889,16.76,2.01,2025-05-01 12:58:00,adstrong CSS,https://preisvergleich.adstrong.de/,,https://www.hagengrote.de/,https://www.google.com/,"NEW:16,76",Germany,1: Hagen_Grote,"[{""product_id"":""027N18"",""product_name"":""Nachfü...","NEW:16,76",0.08,0.000000e+00,,DE
1,174610532269239,1844160552,176013,30.17,2.41,2025-05-01 15:16:00,Redbrain Ltd,https://www.redbrain.shop/,,https://www.hagengrote.de/,https://de.redbrain.shop/,"NEW:30,17",Germany,1: Hagen_Grote,"[{""product_id"":""222Y31R"",""product_name"":""Gussg...","NEW:30,17",0.15,1.776357e-15,,DE
2,174610839077439,1844189385,789945,12.56,0.75,2025-05-01 16:07:00,FatCoupon Technology Ltd,https://www.fatcoupon.com,,https://www.hagengrote.de/,https://fatcoupon.com/,"NEW:12,56",Germany,1: Hagen_Grote,"[{""product_id"":""649Z03"",""product_name"":""Hagert...","NEW:12,56",0.06,1.256000e+01,,DE
3,174611008325439,1844205020,397635,25.17,2.01,2025-05-01 16:35:00,Bauer Xcel Media Deutschland KG,http://wunderweib.de,,https://www.hagengrote.de/,https://www.lecker.de/,"NEW:25,17",Germany,1: Hagen_Grote,"[{""product_id"":""364M01"",""product_name"":""Edelst...","NEW:25,17",0.13,2.517000e+01,,DE
4,174618534471239,1844788085,626889,16.76,2.01,2025-05-02 13:29:00,adstrong CSS,https://preisvergleich.adstrong.de/,,https://www.hagengrote.de/,https://www.google.com/,"NEW:16,76",Germany,1: Hagen_Grote,"[{""product_id"":""027N18"",""product_name"":""Nachfü...","NEW:16,76",0.08,1.676000e+01,,DE
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
239,174869054645839,1863984047,176013,225.84,18.07,2025-05-31 13:24:00,Redbrain Ltd,https://www.redbrain.shop/,,https://www.hagengrote.de/,https://de.redbrain.shop/,"NEW:225,84",Germany,,"[{""product_id"":""266F02CR"",""product_name"":""Tisc...","NEW:225,84",1.13,2.258400e+02,,DE
240,174869289350139,1864001999,333895,109.75,6.59,2025-05-31 14:01:00,Atolls DE,https://www.cuponation.de,,https://www.hagengrote.de/,https://www.mydealz.de/,"EXISTING:109,75",Germany,,"[{""product_id"":""213HX08L"",""product_name"":""Ganz...","EXISTING:109,75",0.55,1.097400e+02,,DE
241,174870248491239,1864083952,176013,197.44,15.80,2025-05-31 16:42:00,Redbrain Ltd,https://www.redbrain.shop/,,,https://de.redbrain.shop/,"EXISTING:197,44",United States of America,1: gtm_s2s_stape_GTM-PXRSDGNR,,"EXISTING:197,44",0.99,0.000000e+00,,DE
242,174870810726639,1864142591,176013,83.99,6.72,2025-05-31 18:15:00,Redbrain Ltd,https://www.redbrain.shop/,,https://www.hagengrote.de/,https://de.redbrain.shop/,"NEW:83,99",Germany,,"[{""product_id"":""212HU12"",""product_name"":""Puris...","NEW:83,99",0.42,8.399000e+01,,DE


Finally, we compute the difference between our nettoumsatz and what is given by Morefire as Sales_Betrag and leave it in the Difference column.
When everything is computed. we save the data in an excel sheet and there do some minor formatting like formatting the data as a table and then using formatting cell to add Euro sign to columns with Euro values and so on.

In [45]:

df_grouped['Fakturierte Nettoumsatz'] = df_grouped['Fakturierte Nettoumsatz'].fillna(0)
# df_grouped.loc[df_grouped['Fakturierte Nettoumsatz']<0, 'Fakturierte Nettoumsatz'] = 0 
    
# 3. Create 'Difference' column
def calculate_difference(row):
    sale_betrag = row['Sale_Betrag']
    fakturierte_nettoumsatz = row['Fakturierte Nettoumsatz']
    difference = sale_betrag - fakturierte_nettoumsatz
    return '' if round(difference, 0) == 0 else f"{round(difference, 2):.2f}"

df_grouped['Difference'] = df_grouped.apply(calculate_difference, axis=1)


In [47]:
df_grouped

Unnamed: 0,ID,AWIN ID,Publisher_ID,Sale_Betrag,Provision,Datum,Site_Name,URL,Ablehnungs_Grund,Referrer,...,Transaktions_Teile,Land_nach_IP,Benutzerdef_Parameter,Produkte,getrackte_Teile,Netzwerkgebühr,Fakturierte Nettoumsatz,Not Found,LAND,Difference
0,174609701033739,1844082293,626889,16.76,2.01,2025-05-01 12:58:00,adstrong CSS,https://preisvergleich.adstrong.de/,,https://www.hagengrote.de/,...,"NEW:16,76",Germany,1: Hagen_Grote,"[{""product_id"":""027N18"",""product_name"":""Nachfü...","NEW:16,76",0.08,0.000000e+00,,DE,16.76
1,174610532269239,1844160552,176013,30.17,2.41,2025-05-01 15:16:00,Redbrain Ltd,https://www.redbrain.shop/,,https://www.hagengrote.de/,...,"NEW:30,17",Germany,1: Hagen_Grote,"[{""product_id"":""222Y31R"",""product_name"":""Gussg...","NEW:30,17",0.15,1.776357e-15,,DE,30.17
2,174610839077439,1844189385,789945,12.56,0.75,2025-05-01 16:07:00,FatCoupon Technology Ltd,https://www.fatcoupon.com,,https://www.hagengrote.de/,...,"NEW:12,56",Germany,1: Hagen_Grote,"[{""product_id"":""649Z03"",""product_name"":""Hagert...","NEW:12,56",0.06,1.256000e+01,,DE,
3,174611008325439,1844205020,397635,25.17,2.01,2025-05-01 16:35:00,Bauer Xcel Media Deutschland KG,http://wunderweib.de,,https://www.hagengrote.de/,...,"NEW:25,17",Germany,1: Hagen_Grote,"[{""product_id"":""364M01"",""product_name"":""Edelst...","NEW:25,17",0.13,2.517000e+01,,DE,
4,174618534471239,1844788085,626889,16.76,2.01,2025-05-02 13:29:00,adstrong CSS,https://preisvergleich.adstrong.de/,,https://www.hagengrote.de/,...,"NEW:16,76",Germany,1: Hagen_Grote,"[{""product_id"":""027N18"",""product_name"":""Nachfü...","NEW:16,76",0.08,1.676000e+01,,DE,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
239,174869054645839,1863984047,176013,225.84,18.07,2025-05-31 13:24:00,Redbrain Ltd,https://www.redbrain.shop/,,https://www.hagengrote.de/,...,"NEW:225,84",Germany,,"[{""product_id"":""266F02CR"",""product_name"":""Tisc...","NEW:225,84",1.13,2.258400e+02,,DE,
240,174869289350139,1864001999,333895,109.75,6.59,2025-05-31 14:01:00,Atolls DE,https://www.cuponation.de,,https://www.hagengrote.de/,...,"EXISTING:109,75",Germany,,"[{""product_id"":""213HX08L"",""product_name"":""Ganz...","EXISTING:109,75",0.55,1.097400e+02,,DE,
241,174870248491239,1864083952,176013,197.44,15.80,2025-05-31 16:42:00,Redbrain Ltd,https://www.redbrain.shop/,,,...,"EXISTING:197,44",United States of America,1: gtm_s2s_stape_GTM-PXRSDGNR,,"EXISTING:197,44",0.99,0.000000e+00,,DE,197.44
242,174870810726639,1864142591,176013,83.99,6.72,2025-05-31 18:15:00,Redbrain Ltd,https://www.redbrain.shop/,,https://www.hagengrote.de/,...,"NEW:83,99",Germany,,"[{""product_id"":""212HU12"",""product_name"":""Puris...","NEW:83,99",0.42,8.399000e+01,,DE,


In [50]:
# df_grouped.to_excel('morefire_list.xlsx', index=False)

with pd.ExcelWriter('MoreFire_Export.xlsx', engine='openpyxl') as writer:
    for land in ['DE','AT','FR','CH']:
        df_final = df_grouped[df_grouped['LAND'] == land].copy()
        df_final.drop(columns=['LAND'],inplace=True)
        df_final.to_excel(writer,sheet_name=land,index=False)

In [18]:
len(df)

2758